我的集群:1个主节点,11个从节点,每个节点有6gb内存。

我的设置:

spark.executor.memory=4g, Dspark.akka.frameSize=512

问题是这样的:

首先,我从HDFS读取一些数据(2.19 GB)到RDD:

val imageBundleRDD = sc.newAPIHadoopFile(...)

其次,在这个RDD上做一些事情:

val res = imageBundleRDD.map(data => {
                               val desPoints = threeDReconstruction(data._2, bg)
                                 (data._1, desPoints)
                             })

最后,输出到HDFS:

res.saveAsNewAPIHadoopFile(...)

当我运行我的程序时,它显示:

.....
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:24 as TID 33 on executor 9: Salve7.Hadoop (NODE_LOCAL)
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Serialized task 1.0:24 as 30618515 bytes in 210 ms
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:36 as TID 34 on executor 2: Salve11.Hadoop (NODE_LOCAL)
14/01/15 21:42:28 INFO cluster.ClusterTaskSetManager: Serialized task 1.0:36 as 30618515 bytes in 449 ms
14/01/15 21:42:28 INFO cluster.ClusterTaskSetManager: Starting task 1.0:32 as TID 35 on executor 7: Salve4.Hadoop (NODE_LOCAL)
Uncaught error from thread [spark-akka.actor.default-dispatcher-3] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[spark]
java.lang.OutOfMemoryError: Java heap space

任务太多?

PS:当输入数据约为225 MB时,一切正常。

我该如何解决这个问题呢?


当前回答

为了添加一个通常不被讨论的用例,我将在本地模式下通过Spark -submit提交Spark应用程序时提出一个解决方案。

根据Jacek Laskowski的giitbook Mastering Apache Spark:

您可以在本地模式下运行Spark。在这种非分布式单JVM部署模式下,Spark在同一个JVM中生成所有执行组件——驱动程序、执行程序、后端和主机。这是驱动程序用于执行的唯一模式。

因此,如果您在堆中遇到OOM错误,调整驱动程序内存而不是执行程序内存就足够了。

这里有一个例子:

spark-1.6.1/bin/spark-submit
  --class "MyClass"
  --driver-memory 12g
  --master local[*] 
  target/scala-2.10/simple-project_2.10-1.0.jar 

其他回答

广义上讲,spark Executor JVM内存可以分为两部分。Spark内存和User内存。这是由spark.memory.fraction属性控制的——值在0到1之间。 在spark应用程序中处理图像或执行内存密集型处理时,请考虑降低spark.memory.fraction。这将为应用程序工作提供更多内存。Spark可能溢出,所以它仍然可以在较少的内存共享下工作。

The second part of the problem is division of work. If possible, partition your data into smaller chunks. Smaller data possibly needs less memory. But if that is not possible, you are sacrifice compute for memory. Typically a single executor will be running multiple cores. Total memory of executors must be enough to handle memory requirements of all concurrent tasks. If increasing executor memory is not a option, you can decrease the cores per executor so that each task gets more memory to work with. Test with 1 core executors which have largest possible memory you can give and then keep increasing cores until you find the best core count.

根据我对上面提供的代码的理解,它加载文件并进行映射操作并保存回来。没有需要shuffle的操作。此外,没有任何操作需要将数据传输到驱动程序,因此调优与shuffle或驱动程序相关的任何内容都不会产生影响。当任务太多时,驱动程序确实会有问题,但这只是在spark 2.0.2版本之前。可能会有两件事出错。

There are only one or a few executors. Increase the number of executors so that they can be allocated to different slaves. If you are using yarn need to change num-executors config or if you are using spark standalone then need to tune num cores per executor and spark max cores conf. In standalone num executors = max cores / cores per executor . The number of partitions are very few or maybe only one. So if this is low even if we have multi-cores,multi executors it will not be of much help as parallelization is dependent on the number of partitions. So increase the partitions by doing imageBundleRDD.repartition(11)

您应该增加驱动程序内存。在$SPARK_HOME/conf文件夹中,你应该找到spark-defaults.conf文件,编辑并设置spark.driver.memory 4000m,这取决于你主内存的大小。 这就是为我解决问题的方法,一切都很顺利

你应该配置offHeap内存设置如下所示:

val spark = SparkSession
     .builder()
     .master("local[*]")
     .config("spark.executor.memory", "70g")
     .config("spark.driver.memory", "50g")
     .config("spark.memory.offHeap.enabled",true)
     .config("spark.memory.offHeap.size","16g")   
     .appName("sampleCodeForReference")
     .getOrCreate()

根据您机器的RAM可用性提供驱动程序内存和执行程序内存。如果仍然面临OutofMemory问题,可以增加offHeap大小。

简单,如果你正在使用一个脚本或juyter笔记本,然后只设置配置路径,当你开始构建一个spark会话…

spark = SparkSession.builder.master('local[*]').config("spark.driver.memory", "15g").appName('testing').getOrCreate()