我的集群:1个主节点,11个从节点,每个节点有6gb内存。

我的设置:

spark.executor.memory=4g, Dspark.akka.frameSize=512

问题是这样的:

首先,我从HDFS读取一些数据(2.19 GB)到RDD:

val imageBundleRDD = sc.newAPIHadoopFile(...)

其次,在这个RDD上做一些事情:

val res = imageBundleRDD.map(data => {
                               val desPoints = threeDReconstruction(data._2, bg)
                                 (data._1, desPoints)
                             })

最后,输出到HDFS:

res.saveAsNewAPIHadoopFile(...)

当我运行我的程序时,它显示:

.....
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:24 as TID 33 on executor 9: Salve7.Hadoop (NODE_LOCAL)
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Serialized task 1.0:24 as 30618515 bytes in 210 ms
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:36 as TID 34 on executor 2: Salve11.Hadoop (NODE_LOCAL)
14/01/15 21:42:28 INFO cluster.ClusterTaskSetManager: Serialized task 1.0:36 as 30618515 bytes in 449 ms
14/01/15 21:42:28 INFO cluster.ClusterTaskSetManager: Starting task 1.0:32 as TID 35 on executor 7: Salve4.Hadoop (NODE_LOCAL)
Uncaught error from thread [spark-akka.actor.default-dispatcher-3] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[spark]
java.lang.OutOfMemoryError: Java heap space

任务太多?

PS:当输入数据约为225 MB时,一切正常。

我该如何解决这个问题呢?


当前回答

我有一些建议:

If your nodes are configured to have 6g maximum for Spark (and are leaving a little for other processes), then use 6g rather than 4g, spark.executor.memory=6g. Make sure you're using as much memory as possible by checking the UI (it will say how much mem you're using) Try using more partitions, you should have 2 - 4 per CPU. IME increasing the number of partitions is often the easiest way to make a program more stable (and often faster). For huge amounts of data you may need way more than 4 per CPU, I've had to use 8000 partitions in some cases! Decrease the fraction of memory reserved for caching, using spark.storage.memoryFraction. If you don't use cache() or persist in your code, this might as well be 0. It's default is 0.6, which means you only get 0.4 * 4g memory for your heap. IME reducing the mem frac often makes OOMs go away. UPDATE: From spark 1.6 apparently we will no longer need to play with these values, spark will determine them automatically. Similar to above but shuffle memory fraction. If your job doesn't need much shuffle memory then set it to a lower value (this might cause your shuffles to spill to disk which can have catastrophic impact on speed). Sometimes when it's a shuffle operation that's OOMing you need to do the opposite i.e. set it to something large, like 0.8, or make sure you allow your shuffles to spill to disk (it's the default since 1.0.0). Watch out for memory leaks, these are often caused by accidentally closing over objects you don't need in your lambdas. The way to diagnose is to look out for the "task serialized as XXX bytes" in the logs, if XXX is larger than a few k or more than an MB, you may have a memory leak. See https://stackoverflow.com/a/25270600/1586965 Related to above; use broadcast variables if you really do need large objects. If you are caching large RDDs and can sacrifice some access time consider serialising the RDD http://spark.apache.org/docs/latest/tuning.html#serialized-rdd-storage. Or even caching them on disk (which sometimes isn't that bad if using SSDs). (Advanced) Related to above, avoid String and heavily nested structures (like Map and nested case classes). If possible try to only use primitive types and index all non-primitives especially if you expect a lot of duplicates. Choose WrappedArray over nested structures whenever possible. Or even roll out your own serialisation - YOU will have the most information regarding how to efficiently back your data into bytes, USE IT! (bit hacky) Again when caching, consider using a Dataset to cache your structure as it will use more efficient serialisation. This should be regarded as a hack when compared to the previous bullet point. Building your domain knowledge into your algo/serialisation can minimise memory/cache-space by 100x or 1000x, whereas all a Dataset will likely give is 2x - 5x in memory and 10x compressed (parquet) on disk.

http://spark.apache.org/docs/1.2.1/configuration.html

编辑:(所以我可以谷歌自己更容易)下面也表明了这个问题:

java.lang.OutOfMemoryError : GC overhead limit exceeded

其他回答

你应该配置offHeap内存设置如下所示:

val spark = SparkSession
     .builder()
     .master("local[*]")
     .config("spark.executor.memory", "70g")
     .config("spark.driver.memory", "50g")
     .config("spark.memory.offHeap.enabled",true)
     .config("spark.memory.offHeap.size","16g")   
     .appName("sampleCodeForReference")
     .getOrCreate()

根据您机器的RAM可用性提供驱动程序内存和执行程序内存。如果仍然面临OutofMemory问题,可以增加offHeap大小。

对于上面提到的错误,我没有什么建议。

检查执行程序分配的内存可能必须处理需要比分配的内存更多的分区。

尝试查看是否有更多的shuffle是实时的,因为shuffle是昂贵的操作,因为它们涉及磁盘I/O、数据序列化和网络I/O

●使用广播连接

避免使用groupByKeys,尽量用ReduceByKey代替

●避免在任何发生洗牌的地方使用巨大的Java对象

您应该增加驱动程序内存。在$SPARK_HOME/conf文件夹中,你应该找到spark-defaults.conf文件,编辑并设置spark.driver.memory 4000m,这取决于你主内存的大小。 这就是为我解决问题的方法,一切都很顺利

设置内存堆大小的位置(至少在spark-1.0.0中)在conf/spark-env中。 相关变量为SPARK_EXECUTOR_MEMORY和SPARK_DRIVER_MEMORY。 部署指南中有更多的文档

此外,不要忘记将配置文件复制到所有从节点。

看看启动脚本,Java堆大小设置在那里,看起来你在运行Spark worker之前没有设置这个。

# Set SPARK_MEM if it isn't already set since we also use it for this process
SPARK_MEM=${SPARK_MEM:-512m}
export SPARK_MEM

# Set JAVA_OPTS to be able to load native libraries and to set heap size
JAVA_OPTS="$OUR_JAVA_OPTS"
JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$SPARK_LIBRARY_PATH"
JAVA_OPTS="$JAVA_OPTS -Xms$SPARK_MEM -Xmx$SPARK_MEM"

您可以在这里找到部署脚本的文档。