在闭包外部调用函数时出现奇怪的行为:

当函数在一个对象中时,一切都在工作 当函数在类中,get:

任务不可序列化:java.io.NotSerializableException:测试

问题是我需要在类而不是对象中编写代码。知道为什么会这样吗?Scala对象是否序列化(默认?)?

这是一个工作代码示例:

object working extends App {
    val list = List(1,2,3)

    val rddList = Spark.ctx.parallelize(list)
    //calling function outside closure 
    val after = rddList.map(someFunc(_))

    def someFunc(a:Int)  = a+1

    after.collect().map(println(_))
}

这是一个无效的例子:

object NOTworking extends App {
  new testing().doIT
}

//adding extends Serializable wont help
class testing {  
  val list = List(1,2,3)  
  val rddList = Spark.ctx.parallelize(list)

  def doIT =  {
    //again calling the fucntion someFunc 
    val after = rddList.map(someFunc(_))
    //this will crash (spark lazy)
    after.collect().map(println(_))
  }

  def someFunc(a:Int) = a+1
}

当前回答

rdd扩展了Serialisable接口,所以这不是导致任务失败的原因。现在这并不意味着你可以用Spark序列化一个RDD并且避免NotSerializableException

Spark是一个分布式计算引擎,它的主要抽象是一个弹性分布式数据集(RDD),它可以被视为一个分布式集合。基本上,RDD的元素是在集群的节点上进行分区的,但是Spark将其从用户那里抽象出来,让用户与RDD(集合)交互,就像它是一个本地的集合一样。

不涉及太多细节,但当你在RDD上运行不同的转换(map, flatMap, filter和其他)时,你的转换代码(闭包)是:

在驱动节点上序列化, 发送到集群中的适当节点, 反序列化, 最后在节点上执行

当然,您可以在本地运行(如您的示例),但所有这些阶段(除了通过网络传输)仍然会发生。[这可以让您在部署到生产环境之前捕捉任何错误]

在第二种情况下,您正在调用一个方法,该方法是在map函数内部的类测试中定义的。Spark看到了这一点,因为方法不能单独序列化,所以Spark尝试序列化整个测试类,这样代码在另一个JVM中执行时仍然可以工作。你有两种可能:

要么你让类测试可序列化,这样整个类都可以被Spark序列化:

import org.apache.spark.{SparkContext,SparkConf}

object Spark {
  val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}

object NOTworking extends App {
  new Test().doIT
}

class Test extends java.io.Serializable {
  val rddList = Spark.ctx.parallelize(List(1,2,3))

  def doIT() =  {
    val after = rddList.map(someFunc)
    after.collect().foreach(println)
  }

  def someFunc(a: Int) = a + 1
}

或者你让someFunc函数而不是方法(函数在Scala中是对象),这样Spark就可以序列化它:

import org.apache.spark.{SparkContext,SparkConf}

object Spark {
  val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}

object NOTworking extends App {
  new Test().doIT
}

class Test {
  val rddList = Spark.ctx.parallelize(List(1,2,3))

  def doIT() =  {
    val after = rddList.map(someFunc)
    after.collect().foreach(println)
  }

  val someFunc = (a: Int) => a + 1
}

类似的,但不相同的类序列化问题可能会引起您的兴趣,您可以在Spark Summit 2013的演讲中阅读它。

作为旁注,你可以将rddList.map(someFunc(_))重写为rddList.map(someFunc),它们是完全相同的。通常情况下,首选第二种,因为它不那么冗长,读起来更清楚。

编辑(2015-03-15):Spark -5307引入了SerializationDebugger, Spark 1.3.0是第一个使用它的版本。它将序列化路径添加到NotSerializableException。当遇到NotSerializableException时,调试器访问对象图以查找无法序列化的对象的路径,并构造信息以帮助用户找到该对象。

在OP的例子中,这是打印到stdout的内容:

Serialization stack:
    - object not serializable (class: testing, value: testing@2dfe2f00)
    - field (class: testing$$anonfun$1, name: $outer, type: class testing)
    - object (class testing$$anonfun$1, <function1>)

其他回答

我不完全确定这适用于Scala,但在Java中,我通过重构我的代码解决了NotSerializableException,这样闭包就不会访问不可序列化的final字段。

def upper(name: String) : String = { 
var uppper : String  =  name.toUpperCase()
uppper
}

val toUpperName = udf {(EmpName: String) => upper(EmpName)}
val emp_details = """[{"id": "1","name": "James Butt","country": "USA"},
{"id": "2", "name": "Josephine Darakjy","country": "USA"},
{"id": "3", "name": "Art Venere","country": "USA"},
{"id": "4", "name": "Lenna Paprocki","country": "USA"},
{"id": "5", "name": "Donette Foller","country": "USA"},
{"id": "6", "name": "Leota Dilliard","country": "USA"}]"""

val df_emp = spark.read.json(Seq(emp_details).toDS())
val df_name=df_emp.select($"id",$"name")
val df_upperName= df_name.withColumn("name",toUpperName($"name")).filter("id='5'")
display(df_upperName)

这会产生错误 sparkexception:任务不可序列化 org.apache.spark.util.ClosureCleaner .ensureSerializable美元(ClosureCleaner.scala: 304)

解决方案-

import java.io.Serializable;

object obj_upper extends Serializable { 
  def upper(name: String) : String = 
  {
    var uppper : String  =  name.toUpperCase()
    uppper
  }
val toUpperName = udf {(EmpName: String) => upper(EmpName)}
}

val df_upperName= 
df_name.withColumn("name",obj_upper.toUpperName($"name")).filter("id='5'")
display(df_upperName)

完整的演讲充分解释了这个问题,并提出了一个很好的范式转换方法来避免这些序列化问题:https://github.com/samthebest/dump/blob/master/sams-scala-tutorial/serialization-exceptions-and-memory-leaks-no-ws.md

投票最多的答案基本上是建议抛弃整个语言特性——不再使用方法,只使用函数。的确,在函数式编程中,类中的方法应该避免,但是将它们转换为函数并不能解决设计问题(参见上面的链接)。

在这种特殊的情况下,你可以使用@transient注释告诉它不要试图序列化有问题的值(这里是Spark。ctx是一个自定义类,而不是Spark的OP命名):

@transient
val rddList = Spark.ctx.parallelize(list)

您还可以重新构造代码,使rddList存在于其他地方,但这也很麻烦。

未来可能是孢子

在将来,Scala将包括这些被称为“孢子”的东西,这将允许我们细粒度地控制闭包到底会吸引什么,不吸引什么。此外,这应该将所有意外拉入不可序列化类型(或任何不需要的值)的错误转变为编译错误,而不是现在可怕的运行时异常/内存泄漏。

http://docs.scala-lang.org/sips/pending/spores.html

关于Kryo序列化的提示

当使用kyro,使注册是必要的,这将意味着你得到错误,而不是内存泄漏:

最后,我知道kryo有kryo. setregistrationoptional (true),但我很难弄清楚如何使用它。当打开这个选项时,如果我没有注册类,kryo似乎仍然会抛出异常。”

用kryo注册类的策略

当然,这只能提供类型级别的控制,而不是值级别的控制。

... 还会有更多的想法。

rdd扩展了Serialisable接口,所以这不是导致任务失败的原因。现在这并不意味着你可以用Spark序列化一个RDD并且避免NotSerializableException

Spark是一个分布式计算引擎,它的主要抽象是一个弹性分布式数据集(RDD),它可以被视为一个分布式集合。基本上,RDD的元素是在集群的节点上进行分区的,但是Spark将其从用户那里抽象出来,让用户与RDD(集合)交互,就像它是一个本地的集合一样。

不涉及太多细节,但当你在RDD上运行不同的转换(map, flatMap, filter和其他)时,你的转换代码(闭包)是:

在驱动节点上序列化, 发送到集群中的适当节点, 反序列化, 最后在节点上执行

当然,您可以在本地运行(如您的示例),但所有这些阶段(除了通过网络传输)仍然会发生。[这可以让您在部署到生产环境之前捕捉任何错误]

在第二种情况下,您正在调用一个方法,该方法是在map函数内部的类测试中定义的。Spark看到了这一点,因为方法不能单独序列化,所以Spark尝试序列化整个测试类,这样代码在另一个JVM中执行时仍然可以工作。你有两种可能:

要么你让类测试可序列化,这样整个类都可以被Spark序列化:

import org.apache.spark.{SparkContext,SparkConf}

object Spark {
  val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}

object NOTworking extends App {
  new Test().doIT
}

class Test extends java.io.Serializable {
  val rddList = Spark.ctx.parallelize(List(1,2,3))

  def doIT() =  {
    val after = rddList.map(someFunc)
    after.collect().foreach(println)
  }

  def someFunc(a: Int) = a + 1
}

或者你让someFunc函数而不是方法(函数在Scala中是对象),这样Spark就可以序列化它:

import org.apache.spark.{SparkContext,SparkConf}

object Spark {
  val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}

object NOTworking extends App {
  new Test().doIT
}

class Test {
  val rddList = Spark.ctx.parallelize(List(1,2,3))

  def doIT() =  {
    val after = rddList.map(someFunc)
    after.collect().foreach(println)
  }

  val someFunc = (a: Int) => a + 1
}

类似的,但不相同的类序列化问题可能会引起您的兴趣,您可以在Spark Summit 2013的演讲中阅读它。

作为旁注,你可以将rddList.map(someFunc(_))重写为rddList.map(someFunc),它们是完全相同的。通常情况下,首选第二种,因为它不那么冗长,读起来更清楚。

编辑(2015-03-15):Spark -5307引入了SerializationDebugger, Spark 1.3.0是第一个使用它的版本。它将序列化路径添加到NotSerializableException。当遇到NotSerializableException时,调试器访问对象图以查找无法序列化的对象的路径,并构造信息以帮助用户找到该对象。

在OP的例子中,这是打印到stdout的内容:

Serialization stack:
    - object not serializable (class: testing, value: testing@2dfe2f00)
    - field (class: testing$$anonfun$1, name: $outer, type: class testing)
    - object (class testing$$anonfun$1, <function1>)

我也遇到过类似的问题,从Grega的回答中我理解到

object NOTworking extends App {
 new testing().doIT
}
//adding extends Serializable wont help
class testing {

val list = List(1,2,3)

val rddList = Spark.ctx.parallelize(list)

def doIT =  {
  //again calling the fucntion someFunc 
  val after = rddList.map(someFunc(_))
  //this will crash (spark lazy)
  after.collect().map(println(_))
}

def someFunc(a:Int) = a+1

}

你的doIT方法试图序列化someFunc(_)方法,但由于方法是不可序列化的,它试图序列化类测试,这也是不可序列化的。

为了让你的代码工作,你应该在doIT方法中定义someFunc。例如:

def doIT =  {
 def someFunc(a:Int) = a+1
  //function definition
 }
 val after = rddList.map(someFunc(_))
 after.collect().map(println(_))
}

如果有多个函数进入图中,那么所有这些函数都应该对父上下文可用。