我的集群:1个主节点,11个从节点,每个节点有6gb内存。

我的设置:

spark.executor.memory=4g, Dspark.akka.frameSize=512

问题是这样的:

首先,我从HDFS读取一些数据(2.19 GB)到RDD:

val imageBundleRDD = sc.newAPIHadoopFile(...)

其次,在这个RDD上做一些事情:

val res = imageBundleRDD.map(data => {
                               val desPoints = threeDReconstruction(data._2, bg)
                                 (data._1, desPoints)
                             })

最后,输出到HDFS:

res.saveAsNewAPIHadoopFile(...)

当我运行我的程序时,它显示:

.....
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:24 as TID 33 on executor 9: Salve7.Hadoop (NODE_LOCAL)
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Serialized task 1.0:24 as 30618515 bytes in 210 ms
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:36 as TID 34 on executor 2: Salve11.Hadoop (NODE_LOCAL)
14/01/15 21:42:28 INFO cluster.ClusterTaskSetManager: Serialized task 1.0:36 as 30618515 bytes in 449 ms
14/01/15 21:42:28 INFO cluster.ClusterTaskSetManager: Starting task 1.0:32 as TID 35 on executor 7: Salve4.Hadoop (NODE_LOCAL)
Uncaught error from thread [spark-akka.actor.default-dispatcher-3] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[spark]
java.lang.OutOfMemoryError: Java heap space

任务太多?

PS:当输入数据约为225 MB时,一切正常。

我该如何解决这个问题呢?


当前回答

看看启动脚本,Java堆大小设置在那里,看起来你在运行Spark worker之前没有设置这个。

# Set SPARK_MEM if it isn't already set since we also use it for this process
SPARK_MEM=${SPARK_MEM:-512m}
export SPARK_MEM

# Set JAVA_OPTS to be able to load native libraries and to set heap size
JAVA_OPTS="$OUR_JAVA_OPTS"
JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$SPARK_LIBRARY_PATH"
JAVA_OPTS="$JAVA_OPTS -Xms$SPARK_MEM -Xmx$SPARK_MEM"

您可以在这里找到部署脚本的文档。

其他回答

在使用动态资源分配时,我经常遇到这个问题。我原以为它会利用我的集群资源来最适合这个应用程序。

但事实上,动态资源分配并没有设置驱动程序内存,而是将其保持为默认值,即1G。

我通过将spark.driver.memory设置为适合我的驱动器内存的数字来解决这个问题(对于32GB ram,我将其设置为18G)。

可以使用spark submit命令进行设置,方法如下:

spark-submit --conf spark.driver.memory=18g

非常重要的一点是,如果你从代码中设置这个属性,将不会被考虑,根据Spark文档-动态加载Spark属性:

Spark properties mainly can be divided into two kinds: one is related to deploy, like “spark.driver.memory”, “spark.executor.instances”, this kind of properties may not be affected when setting programmatically through SparkConf in runtime, or the behavior is depending on which cluster manager and deploy mode you choose, so it would be suggested to set through configuration file or spark-submit command line options; another is mainly related to Spark runtime control, like “spark.task.maxFailures”, this kind of properties can be set in either way.

你应该配置offHeap内存设置如下所示:

val spark = SparkSession
     .builder()
     .master("local[*]")
     .config("spark.executor.memory", "70g")
     .config("spark.driver.memory", "50g")
     .config("spark.memory.offHeap.enabled",true)
     .config("spark.memory.offHeap.size","16g")   
     .appName("sampleCodeForReference")
     .getOrCreate()

根据您机器的RAM可用性提供驱动程序内存和执行程序内存。如果仍然面临OutofMemory问题,可以增加offHeap大小。

为了添加一个通常不被讨论的用例,我将在本地模式下通过Spark -submit提交Spark应用程序时提出一个解决方案。

根据Jacek Laskowski的giitbook Mastering Apache Spark:

您可以在本地模式下运行Spark。在这种非分布式单JVM部署模式下,Spark在同一个JVM中生成所有执行组件——驱动程序、执行程序、后端和主机。这是驱动程序用于执行的唯一模式。

因此,如果您在堆中遇到OOM错误,调整驱动程序内存而不是执行程序内存就足够了。

这里有一个例子:

spark-1.6.1/bin/spark-submit
  --class "MyClass"
  --driver-memory 12g
  --master local[*] 
  target/scala-2.10/simple-project_2.10-1.0.jar 

您应该增加驱动程序内存。在$SPARK_HOME/conf文件夹中,你应该找到spark-defaults.conf文件,编辑并设置spark.driver.memory 4000m,这取决于你主内存的大小。 这就是为我解决问题的方法,一切都很顺利

设置内存堆大小的位置(至少在spark-1.0.0中)在conf/spark-env中。 相关变量为SPARK_EXECUTOR_MEMORY和SPARK_DRIVER_MEMORY。 部署指南中有更多的文档

此外,不要忘记将配置文件复制到所有从节点。