在闭包外部调用函数时出现奇怪的行为:
当函数在一个对象中时,一切都在工作
当函数在类中,get:
任务不可序列化:java.io.NotSerializableException:测试
问题是我需要在类而不是对象中编写代码。知道为什么会这样吗?Scala对象是否序列化(默认?)?
这是一个工作代码示例:
object working extends App {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
//calling function outside closure
val after = rddList.map(someFunc(_))
def someFunc(a:Int) = a+1
after.collect().map(println(_))
}
这是一个无效的例子:
object NOTworking extends App {
new testing().doIT
}
//adding extends Serializable wont help
class testing {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
def doIT = {
//again calling the fucntion someFunc
val after = rddList.map(someFunc(_))
//this will crash (spark lazy)
after.collect().map(println(_))
}
def someFunc(a:Int) = a+1
}
我也有过类似的经历。
当我在驱动程序(master)上初始化一个变量时触发了错误,但随后试图在其中一个工人上使用它。
当这种情况发生时,Spark Streaming将尝试序列化对象以将其发送给worker,如果对象不可序列化则失败。
我通过使变量为静态来解决这个错误。
以前的无效代码
private final PhoneNumberUtil phoneUtil = PhoneNumberUtil.getInstance();
工作代码
private static final PhoneNumberUtil phoneUtil = PhoneNumberUtil.getInstance();
学分:
https://learn.microsoft.com/en-us/answers/questions/35812/sparkexception-job-aborted-due-to-stage-failure-ta.html (pradeepcheekatla-msft的答案)
https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/javaionotserializableexception.html
def upper(name: String) : String = {
var uppper : String = name.toUpperCase()
uppper
}
val toUpperName = udf {(EmpName: String) => upper(EmpName)}
val emp_details = """[{"id": "1","name": "James Butt","country": "USA"},
{"id": "2", "name": "Josephine Darakjy","country": "USA"},
{"id": "3", "name": "Art Venere","country": "USA"},
{"id": "4", "name": "Lenna Paprocki","country": "USA"},
{"id": "5", "name": "Donette Foller","country": "USA"},
{"id": "6", "name": "Leota Dilliard","country": "USA"}]"""
val df_emp = spark.read.json(Seq(emp_details).toDS())
val df_name=df_emp.select($"id",$"name")
val df_upperName= df_name.withColumn("name",toUpperName($"name")).filter("id='5'")
display(df_upperName)
这会产生错误
sparkexception:任务不可序列化
org.apache.spark.util.ClosureCleaner .ensureSerializable美元(ClosureCleaner.scala: 304)
解决方案-
import java.io.Serializable;
object obj_upper extends Serializable {
def upper(name: String) : String =
{
var uppper : String = name.toUpperCase()
uppper
}
val toUpperName = udf {(EmpName: String) => upper(EmpName)}
}
val df_upperName=
df_name.withColumn("name",obj_upper.toUpperName($"name")).filter("id='5'")
display(df_upperName)
我也有过类似的经历。
当我在驱动程序(master)上初始化一个变量时触发了错误,但随后试图在其中一个工人上使用它。
当这种情况发生时,Spark Streaming将尝试序列化对象以将其发送给worker,如果对象不可序列化则失败。
我通过使变量为静态来解决这个错误。
以前的无效代码
private final PhoneNumberUtil phoneUtil = PhoneNumberUtil.getInstance();
工作代码
private static final PhoneNumberUtil phoneUtil = PhoneNumberUtil.getInstance();
学分:
https://learn.microsoft.com/en-us/answers/questions/35812/sparkexception-job-aborted-due-to-stage-failure-ta.html (pradeepcheekatla-msft的答案)
https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/javaionotserializableexception.html
Grega的回答很好地解释了为什么原始代码不能工作以及解决这个问题的两种方法。然而,这种解决方案不是很灵活;考虑这样一种情况,闭包包含一个对你无法控制的非serializable类的方法调用。您既不能向该类添加Serializable标记,也不能更改底层实现以将方法更改为函数。
Nilesh提出了一个很好的解决方案,但解决方案可以更简洁和通用:
def genMapper[A, B](f: A => B): A => B = {
val locker = com.twitter.chill.MeatLocker(f)
x => locker.get.apply(x)
}
这个函数序列化器可以用来自动封装闭包和方法调用:
rdd map genMapper(someFunc)
这种技术还有一个好处,就是不需要额外的Shark依赖来访问KryoSerializationWrapper,因为Twitter的Chill已经被核心Spark引入了