rdd扩展了Serialisable接口,所以这不是导致任务失败的原因。现在这并不意味着你可以用Spark序列化一个RDD并且避免NotSerializableException
Spark是一个分布式计算引擎,它的主要抽象是一个弹性分布式数据集(RDD),它可以被视为一个分布式集合。基本上,RDD的元素是在集群的节点上进行分区的,但是Spark将其从用户那里抽象出来,让用户与RDD(集合)交互,就像它是一个本地的集合一样。
不涉及太多细节,但当你在RDD上运行不同的转换(map, flatMap, filter和其他)时,你的转换代码(闭包)是:
在驱动节点上序列化,
发送到集群中的适当节点,
反序列化,
最后在节点上执行
当然,您可以在本地运行(如您的示例),但所有这些阶段(除了通过网络传输)仍然会发生。[这可以让您在部署到生产环境之前捕捉任何错误]
在第二种情况下,您正在调用一个方法,该方法是在map函数内部的类测试中定义的。Spark看到了这一点,因为方法不能单独序列化,所以Spark尝试序列化整个测试类,这样代码在另一个JVM中执行时仍然可以工作。你有两种可能:
要么你让类测试可序列化,这样整个类都可以被Spark序列化:
import org.apache.spark.{SparkContext,SparkConf}
object Spark {
val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}
object NOTworking extends App {
new Test().doIT
}
class Test extends java.io.Serializable {
val rddList = Spark.ctx.parallelize(List(1,2,3))
def doIT() = {
val after = rddList.map(someFunc)
after.collect().foreach(println)
}
def someFunc(a: Int) = a + 1
}
或者你让someFunc函数而不是方法(函数在Scala中是对象),这样Spark就可以序列化它:
import org.apache.spark.{SparkContext,SparkConf}
object Spark {
val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}
object NOTworking extends App {
new Test().doIT
}
class Test {
val rddList = Spark.ctx.parallelize(List(1,2,3))
def doIT() = {
val after = rddList.map(someFunc)
after.collect().foreach(println)
}
val someFunc = (a: Int) => a + 1
}
类似的,但不相同的类序列化问题可能会引起您的兴趣,您可以在Spark Summit 2013的演讲中阅读它。
作为旁注,你可以将rddList.map(someFunc(_))重写为rddList.map(someFunc),它们是完全相同的。通常情况下,首选第二种,因为它不那么冗长,读起来更清楚。
编辑(2015-03-15):Spark -5307引入了SerializationDebugger, Spark 1.3.0是第一个使用它的版本。它将序列化路径添加到NotSerializableException。当遇到NotSerializableException时,调试器访问对象图以查找无法序列化的对象的路径,并构造信息以帮助用户找到该对象。
在OP的例子中,这是打印到stdout的内容:
Serialization stack:
- object not serializable (class: testing, value: testing@2dfe2f00)
- field (class: testing$$anonfun$1, name: $outer, type: class testing)
- object (class testing$$anonfun$1, <function1>)