我的集群:1个主节点,11个从节点,每个节点有6gb内存。

我的设置:

spark.executor.memory=4g, Dspark.akka.frameSize=512

问题是这样的:

首先,我从HDFS读取一些数据(2.19 GB)到RDD:

val imageBundleRDD = sc.newAPIHadoopFile(...)

其次,在这个RDD上做一些事情:

val res = imageBundleRDD.map(data => {
                               val desPoints = threeDReconstruction(data._2, bg)
                                 (data._1, desPoints)
                             })

最后,输出到HDFS:

res.saveAsNewAPIHadoopFile(...)

当我运行我的程序时,它显示:

.....
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:24 as TID 33 on executor 9: Salve7.Hadoop (NODE_LOCAL)
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Serialized task 1.0:24 as 30618515 bytes in 210 ms
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:36 as TID 34 on executor 2: Salve11.Hadoop (NODE_LOCAL)
14/01/15 21:42:28 INFO cluster.ClusterTaskSetManager: Serialized task 1.0:36 as 30618515 bytes in 449 ms
14/01/15 21:42:28 INFO cluster.ClusterTaskSetManager: Starting task 1.0:32 as TID 35 on executor 7: Salve4.Hadoop (NODE_LOCAL)
Uncaught error from thread [spark-akka.actor.default-dispatcher-3] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[spark]
java.lang.OutOfMemoryError: Java heap space

任务太多?

PS:当输入数据约为225 MB时,一切正常。

我该如何解决这个问题呢?


当前回答

设置内存堆大小的位置(至少在spark-1.0.0中)在conf/spark-env中。 相关变量为SPARK_EXECUTOR_MEMORY和SPARK_DRIVER_MEMORY。 部署指南中有更多的文档

此外,不要忘记将配置文件复制到所有从节点。

其他回答

为了添加一个通常不被讨论的用例,我将在本地模式下通过Spark -submit提交Spark应用程序时提出一个解决方案。

根据Jacek Laskowski的giitbook Mastering Apache Spark:

您可以在本地模式下运行Spark。在这种非分布式单JVM部署模式下,Spark在同一个JVM中生成所有执行组件——驱动程序、执行程序、后端和主机。这是驱动程序用于执行的唯一模式。

因此,如果您在堆中遇到OOM错误,调整驱动程序内存而不是执行程序内存就足够了。

这里有一个例子:

spark-1.6.1/bin/spark-submit
  --class "MyClass"
  --driver-memory 12g
  --master local[*] 
  target/scala-2.10/simple-project_2.10-1.0.jar 

对于上面提到的错误,我没有什么建议。

检查执行程序分配的内存可能必须处理需要比分配的内存更多的分区。

尝试查看是否有更多的shuffle是实时的,因为shuffle是昂贵的操作,因为它们涉及磁盘I/O、数据序列化和网络I/O

●使用广播连接

避免使用groupByKeys,尽量用ReduceByKey代替

●避免在任何发生洗牌的地方使用巨大的Java对象

堆空间错误通常是由于将太多数据带回驱动程序或执行程序而发生的。 在您的代码中,似乎没有将任何东西带回驱动程序,相反,您可能重载了使用threeDReconstruction()方法将一个输入记录/行映射到另一个输入记录/行的执行器。我不确定在方法定义中是什么,但这肯定会导致执行器的重载。 现在你有两个选择,

编辑你的代码,以更有效的方式进行三维重建。 不要编辑代码,但是给你的执行程序更多的内存,以及更多的内存开销。[spark.executor。内存或spark.driver.memoryOverhead]

我建议谨慎使用,只使用你需要的量。就内存需求而言,每个作业都是独一无二的,所以我建议根据经验尝试不同的值,每次增加2的幂(256M,512M,1G ..)等等)

您将得到一个可以工作的执行程序内存的值。尝试使用此值重新运行作业3或5次,然后再接受此配置。

广义上讲,spark Executor JVM内存可以分为两部分。Spark内存和User内存。这是由spark.memory.fraction属性控制的——值在0到1之间。 在spark应用程序中处理图像或执行内存密集型处理时,请考虑降低spark.memory.fraction。这将为应用程序工作提供更多内存。Spark可能溢出,所以它仍然可以在较少的内存共享下工作。

The second part of the problem is division of work. If possible, partition your data into smaller chunks. Smaller data possibly needs less memory. But if that is not possible, you are sacrifice compute for memory. Typically a single executor will be running multiple cores. Total memory of executors must be enough to handle memory requirements of all concurrent tasks. If increasing executor memory is not a option, you can decrease the cores per executor so that each task gets more memory to work with. Test with 1 core executors which have largest possible memory you can give and then keep increasing cores until you find the best core count.

看看启动脚本,Java堆大小设置在那里,看起来你在运行Spark worker之前没有设置这个。

# Set SPARK_MEM if it isn't already set since we also use it for this process
SPARK_MEM=${SPARK_MEM:-512m}
export SPARK_MEM

# Set JAVA_OPTS to be able to load native libraries and to set heap size
JAVA_OPTS="$OUR_JAVA_OPTS"
JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$SPARK_LIBRARY_PATH"
JAVA_OPTS="$JAVA_OPTS -Xms$SPARK_MEM -Xmx$SPARK_MEM"

您可以在这里找到部署脚本的文档。