在闭包外部调用函数时出现奇怪的行为:

当函数在一个对象中时,一切都在工作 当函数在类中,get:

任务不可序列化:java.io.NotSerializableException:测试

问题是我需要在类而不是对象中编写代码。知道为什么会这样吗?Scala对象是否序列化(默认?)?

这是一个工作代码示例:

object working extends App {
    val list = List(1,2,3)

    val rddList = Spark.ctx.parallelize(list)
    //calling function outside closure 
    val after = rddList.map(someFunc(_))

    def someFunc(a:Int)  = a+1

    after.collect().map(println(_))
}

这是一个无效的例子:

object NOTworking extends App {
  new testing().doIT
}

//adding extends Serializable wont help
class testing {  
  val list = List(1,2,3)  
  val rddList = Spark.ctx.parallelize(list)

  def doIT =  {
    //again calling the fucntion someFunc 
    val after = rddList.map(someFunc(_))
    //this will crash (spark lazy)
    after.collect().map(println(_))
  }

  def someFunc(a:Int) = a+1
}

当前回答

我用另一种方法解决了这个问题。您只需要在传递闭包之前序列化对象,然后反序列化。即使您的类不是Serializable,这种方法也很有效,因为它在幕后使用了Kryo。你只需要一些咖喱。;)

下面是我的一个例子:

def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)])
               (foo: Foo) : Bar = {
    kryoWrapper.value.apply(foo)
}
val mapper = genMapper(KryoSerializationWrapper(new Blah(abc))) _
rdd.flatMap(mapper).collectAsMap()

object Blah(abc: ABC) extends (Foo => Bar) {
    def apply(foo: Foo) : Bar = { //This is the real function }
}

你可以随心所欲地让Blah变得复杂,比如类、伴生对象、嵌套类、对多个第三方库的引用。

KryoSerializationWrapper是指:https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala

其他回答

我用另一种方法解决了这个问题。您只需要在传递闭包之前序列化对象,然后反序列化。即使您的类不是Serializable,这种方法也很有效,因为它在幕后使用了Kryo。你只需要一些咖喱。;)

下面是我的一个例子:

def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)])
               (foo: Foo) : Bar = {
    kryoWrapper.value.apply(foo)
}
val mapper = genMapper(KryoSerializationWrapper(new Blah(abc))) _
rdd.flatMap(mapper).collectAsMap()

object Blah(abc: ABC) extends (Foo => Bar) {
    def apply(foo: Foo) : Bar = { //This is the real function }
}

你可以随心所欲地让Blah变得复杂,比如类、伴生对象、嵌套类、对多个第三方库的引用。

KryoSerializationWrapper是指:https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala

我的解决方案是添加一个同伴类来处理类中不可序列化的所有方法。

Scala类中定义的方法是不可序列化的,方法可以转换为函数来解决序列化问题。

方法的语法

def func_name (x String) : String = {
...
return x
}

函数的语法

val func_name = { (x String) => 
...
x
}

在Spark 2.4中,很多人可能会遇到这个问题。Kryo序列化已经变得更好,但在许多情况下,您不能使用spark.kryo.unsafe=true或幼稚的Kryo序列化器。

为了快速修复,请尝试在Spark配置中更改以下内容

spark.kryo.unsafe="false"

OR

spark.serializer="org.apache.spark.serializer.JavaSerializer"

我通过使用显式广播变量和新的内置twitter-chill api来修改我遇到或亲自编写的自定义RDD转换,将它们从RDD转换过来。将row =>映射到rdd。mapPartitions(partition =>{函数。

例子

老方法(不太好)

val sampleMap = Map("index1" -> 1234, "index2" -> 2345)
val outputRDD = rdd.map(row => {
    val value = sampleMap.get(row._1)
    value
})

替代(更好的)方式

import com.twitter.chill.MeatLocker
val sampleMap = Map("index1" -> 1234, "index2" -> 2345)
val brdSerSampleMap = spark.sparkContext.broadcast(MeatLocker(sampleMap))

rdd.mapPartitions(partition => {
    val deSerSampleMap = brdSerSampleMap.value.get
    partition.map(row => {
        val value = sampleMap.get(row._1)
        value
    }).toIterator
})

这种新方法将每个分区只调用广播变量一次,这更好。如果不注册类,仍然需要使用Java Serialization。

我也有过类似的经历。

当我在驱动程序(master)上初始化一个变量时触发了错误,但随后试图在其中一个工人上使用它。 当这种情况发生时,Spark Streaming将尝试序列化对象以将其发送给worker,如果对象不可序列化则失败。

我通过使变量为静态来解决这个错误。

以前的无效代码

  private final PhoneNumberUtil phoneUtil = PhoneNumberUtil.getInstance();

工作代码

  private static final PhoneNumberUtil phoneUtil = PhoneNumberUtil.getInstance();

学分:

https://learn.microsoft.com/en-us/answers/questions/35812/sparkexception-job-aborted-due-to-stage-failure-ta.html (pradeepcheekatla-msft的答案) https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/javaionotserializableexception.html