在闭包外部调用函数时出现奇怪的行为:
当函数在一个对象中时,一切都在工作
当函数在类中,get:
任务不可序列化:java.io.NotSerializableException:测试
问题是我需要在类而不是对象中编写代码。知道为什么会这样吗?Scala对象是否序列化(默认?)?
这是一个工作代码示例:
object working extends App {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
//calling function outside closure
val after = rddList.map(someFunc(_))
def someFunc(a:Int) = a+1
after.collect().map(println(_))
}
这是一个无效的例子:
object NOTworking extends App {
new testing().doIT
}
//adding extends Serializable wont help
class testing {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
def doIT = {
//again calling the fucntion someFunc
val after = rddList.map(someFunc(_))
//this will crash (spark lazy)
after.collect().map(println(_))
}
def someFunc(a:Int) = a+1
}
我也有过类似的经历。
当我在驱动程序(master)上初始化一个变量时触发了错误,但随后试图在其中一个工人上使用它。
当这种情况发生时,Spark Streaming将尝试序列化对象以将其发送给worker,如果对象不可序列化则失败。
我通过使变量为静态来解决这个错误。
以前的无效代码
private final PhoneNumberUtil phoneUtil = PhoneNumberUtil.getInstance();
工作代码
private static final PhoneNumberUtil phoneUtil = PhoneNumberUtil.getInstance();
学分:
https://learn.microsoft.com/en-us/answers/questions/35812/sparkexception-job-aborted-due-to-stage-failure-ta.html (pradeepcheekatla-msft的答案)
https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/javaionotserializableexception.html
完整的演讲充分解释了这个问题,并提出了一个很好的范式转换方法来避免这些序列化问题:https://github.com/samthebest/dump/blob/master/sams-scala-tutorial/serialization-exceptions-and-memory-leaks-no-ws.md
投票最多的答案基本上是建议抛弃整个语言特性——不再使用方法,只使用函数。的确,在函数式编程中,类中的方法应该避免,但是将它们转换为函数并不能解决设计问题(参见上面的链接)。
在这种特殊的情况下,你可以使用@transient注释告诉它不要试图序列化有问题的值(这里是Spark。ctx是一个自定义类,而不是Spark的OP命名):
@transient
val rddList = Spark.ctx.parallelize(list)
您还可以重新构造代码,使rddList存在于其他地方,但这也很麻烦。
未来可能是孢子
在将来,Scala将包括这些被称为“孢子”的东西,这将允许我们细粒度地控制闭包到底会吸引什么,不吸引什么。此外,这应该将所有意外拉入不可序列化类型(或任何不需要的值)的错误转变为编译错误,而不是现在可怕的运行时异常/内存泄漏。
http://docs.scala-lang.org/sips/pending/spores.html
关于Kryo序列化的提示
当使用kyro,使注册是必要的,这将意味着你得到错误,而不是内存泄漏:
最后,我知道kryo有kryo. setregistrationoptional (true),但我很难弄清楚如何使用它。当打开这个选项时,如果我没有注册类,kryo似乎仍然会抛出异常。”
用kryo注册类的策略
当然,这只能提供类型级别的控制,而不是值级别的控制。
... 还会有更多的想法。
在Spark 2.4中,很多人可能会遇到这个问题。Kryo序列化已经变得更好,但在许多情况下,您不能使用spark.kryo.unsafe=true或幼稚的Kryo序列化器。
为了快速修复,请尝试在Spark配置中更改以下内容
spark.kryo.unsafe="false"
OR
spark.serializer="org.apache.spark.serializer.JavaSerializer"
我通过使用显式广播变量和新的内置twitter-chill api来修改我遇到或亲自编写的自定义RDD转换,将它们从RDD转换过来。将row =>映射到rdd。mapPartitions(partition =>{函数。
例子
老方法(不太好)
val sampleMap = Map("index1" -> 1234, "index2" -> 2345)
val outputRDD = rdd.map(row => {
val value = sampleMap.get(row._1)
value
})
替代(更好的)方式
import com.twitter.chill.MeatLocker
val sampleMap = Map("index1" -> 1234, "index2" -> 2345)
val brdSerSampleMap = spark.sparkContext.broadcast(MeatLocker(sampleMap))
rdd.mapPartitions(partition => {
val deSerSampleMap = brdSerSampleMap.value.get
partition.map(row => {
val value = sampleMap.get(row._1)
value
}).toIterator
})
这种新方法将每个分区只调用广播变量一次,这更好。如果不注册类,仍然需要使用Java Serialization。
Grega的回答很好地解释了为什么原始代码不能工作以及解决这个问题的两种方法。然而,这种解决方案不是很灵活;考虑这样一种情况,闭包包含一个对你无法控制的非serializable类的方法调用。您既不能向该类添加Serializable标记,也不能更改底层实现以将方法更改为函数。
Nilesh提出了一个很好的解决方案,但解决方案可以更简洁和通用:
def genMapper[A, B](f: A => B): A => B = {
val locker = com.twitter.chill.MeatLocker(f)
x => locker.get.apply(x)
}
这个函数序列化器可以用来自动封装闭包和方法调用:
rdd map genMapper(someFunc)
这种技术还有一个好处,就是不需要额外的Shark依赖来访问KryoSerializationWrapper,因为Twitter的Chill已经被核心Spark引入了
我也遇到过类似的问题,从Grega的回答中我理解到
object NOTworking extends App {
new testing().doIT
}
//adding extends Serializable wont help
class testing {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
def doIT = {
//again calling the fucntion someFunc
val after = rddList.map(someFunc(_))
//this will crash (spark lazy)
after.collect().map(println(_))
}
def someFunc(a:Int) = a+1
}
你的doIT方法试图序列化someFunc(_)方法,但由于方法是不可序列化的,它试图序列化类测试,这也是不可序列化的。
为了让你的代码工作,你应该在doIT方法中定义someFunc。例如:
def doIT = {
def someFunc(a:Int) = a+1
//function definition
}
val after = rddList.map(someFunc(_))
after.collect().map(println(_))
}
如果有多个函数进入图中,那么所有这些函数都应该对父上下文可用。