在闭包外部调用函数时出现奇怪的行为:

当函数在一个对象中时,一切都在工作 当函数在类中,get:

任务不可序列化:java.io.NotSerializableException:测试

问题是我需要在类而不是对象中编写代码。知道为什么会这样吗?Scala对象是否序列化(默认?)?

这是一个工作代码示例:

object working extends App {
    val list = List(1,2,3)

    val rddList = Spark.ctx.parallelize(list)
    //calling function outside closure 
    val after = rddList.map(someFunc(_))

    def someFunc(a:Int)  = a+1

    after.collect().map(println(_))
}

这是一个无效的例子:

object NOTworking extends App {
  new testing().doIT
}

//adding extends Serializable wont help
class testing {  
  val list = List(1,2,3)  
  val rddList = Spark.ctx.parallelize(list)

  def doIT =  {
    //again calling the fucntion someFunc 
    val after = rddList.map(someFunc(_))
    //this will crash (spark lazy)
    after.collect().map(println(_))
  }

  def someFunc(a:Int) = a+1
}

当前回答

Scala类中定义的方法是不可序列化的,方法可以转换为函数来解决序列化问题。

方法的语法

def func_name (x String) : String = {
...
return x
}

函数的语法

val func_name = { (x String) => 
...
x
}

其他回答

def upper(name: String) : String = { 
var uppper : String  =  name.toUpperCase()
uppper
}

val toUpperName = udf {(EmpName: String) => upper(EmpName)}
val emp_details = """[{"id": "1","name": "James Butt","country": "USA"},
{"id": "2", "name": "Josephine Darakjy","country": "USA"},
{"id": "3", "name": "Art Venere","country": "USA"},
{"id": "4", "name": "Lenna Paprocki","country": "USA"},
{"id": "5", "name": "Donette Foller","country": "USA"},
{"id": "6", "name": "Leota Dilliard","country": "USA"}]"""

val df_emp = spark.read.json(Seq(emp_details).toDS())
val df_name=df_emp.select($"id",$"name")
val df_upperName= df_name.withColumn("name",toUpperName($"name")).filter("id='5'")
display(df_upperName)

这会产生错误 sparkexception:任务不可序列化 org.apache.spark.util.ClosureCleaner .ensureSerializable美元(ClosureCleaner.scala: 304)

解决方案-

import java.io.Serializable;

object obj_upper extends Serializable { 
  def upper(name: String) : String = 
  {
    var uppper : String  =  name.toUpperCase()
    uppper
  }
val toUpperName = udf {(EmpName: String) => upper(EmpName)}
}

val df_upperName= 
df_name.withColumn("name",obj_upper.toUpperName($"name")).filter("id='5'")
display(df_upperName)

Grega的回答很好地解释了为什么原始代码不能工作以及解决这个问题的两种方法。然而,这种解决方案不是很灵活;考虑这样一种情况,闭包包含一个对你无法控制的非serializable类的方法调用。您既不能向该类添加Serializable标记,也不能更改底层实现以将方法更改为函数。

Nilesh提出了一个很好的解决方案,但解决方案可以更简洁和通用:

def genMapper[A, B](f: A => B): A => B = {
  val locker = com.twitter.chill.MeatLocker(f)
  x => locker.get.apply(x)
}

这个函数序列化器可以用来自动封装闭包和方法调用:

rdd map genMapper(someFunc)

这种技术还有一个好处,就是不需要额外的Shark依赖来访问KryoSerializationWrapper,因为Twitter的Chill已经被核心Spark引入了

我用另一种方法解决了这个问题。您只需要在传递闭包之前序列化对象,然后反序列化。即使您的类不是Serializable,这种方法也很有效,因为它在幕后使用了Kryo。你只需要一些咖喱。;)

下面是我的一个例子:

def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)])
               (foo: Foo) : Bar = {
    kryoWrapper.value.apply(foo)
}
val mapper = genMapper(KryoSerializationWrapper(new Blah(abc))) _
rdd.flatMap(mapper).collectAsMap()

object Blah(abc: ABC) extends (Foo => Bar) {
    def apply(foo: Foo) : Bar = { //This is the real function }
}

你可以随心所欲地让Blah变得复杂,比如类、伴生对象、嵌套类、对多个第三方库的引用。

KryoSerializationWrapper是指:https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala

完整的演讲充分解释了这个问题,并提出了一个很好的范式转换方法来避免这些序列化问题:https://github.com/samthebest/dump/blob/master/sams-scala-tutorial/serialization-exceptions-and-memory-leaks-no-ws.md

投票最多的答案基本上是建议抛弃整个语言特性——不再使用方法,只使用函数。的确,在函数式编程中,类中的方法应该避免,但是将它们转换为函数并不能解决设计问题(参见上面的链接)。

在这种特殊的情况下,你可以使用@transient注释告诉它不要试图序列化有问题的值(这里是Spark。ctx是一个自定义类,而不是Spark的OP命名):

@transient
val rddList = Spark.ctx.parallelize(list)

您还可以重新构造代码,使rddList存在于其他地方,但这也很麻烦。

未来可能是孢子

在将来,Scala将包括这些被称为“孢子”的东西,这将允许我们细粒度地控制闭包到底会吸引什么,不吸引什么。此外,这应该将所有意外拉入不可序列化类型(或任何不需要的值)的错误转变为编译错误,而不是现在可怕的运行时异常/内存泄漏。

http://docs.scala-lang.org/sips/pending/spores.html

关于Kryo序列化的提示

当使用kyro,使注册是必要的,这将意味着你得到错误,而不是内存泄漏:

最后,我知道kryo有kryo. setregistrationoptional (true),但我很难弄清楚如何使用它。当打开这个选项时,如果我没有注册类,kryo似乎仍然会抛出异常。”

用kryo注册类的策略

当然,这只能提供类型级别的控制,而不是值级别的控制。

... 还会有更多的想法。

我也有过类似的经历。

当我在驱动程序(master)上初始化一个变量时触发了错误,但随后试图在其中一个工人上使用它。 当这种情况发生时,Spark Streaming将尝试序列化对象以将其发送给worker,如果对象不可序列化则失败。

我通过使变量为静态来解决这个错误。

以前的无效代码

  private final PhoneNumberUtil phoneUtil = PhoneNumberUtil.getInstance();

工作代码

  private static final PhoneNumberUtil phoneUtil = PhoneNumberUtil.getInstance();

学分:

https://learn.microsoft.com/en-us/answers/questions/35812/sparkexception-job-aborted-due-to-stage-failure-ta.html (pradeepcheekatla-msft的答案) https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/javaionotserializableexception.html