版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/stark_summer/article/details/49304913
异常信息
这里关于调用外部的closure时出现了一些错误,当函数是一个对象时一切正常,当函数是一个类时则出现如下报错:
Task not serializable: java.io.NotSerializableException: testing
下面是能正常工作的代码示例:
object working extends App {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
//calling function outside closure
val after = rddList.map(someFunc(_))
def someFunc(a:Int) = a+1
after.collect().map(println(_))
}
不能正常工作的代码:
object NOTworking extends App {
new testing().doIT
}
//adding extends Serializable wont help
class testing {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
def doIT = {
//again calling the fucntion someFunc
val after = rddList.map(someFunc(_))
//this will crash (spark lazy)
after.collect().map(println(_))
}
def someFunc(a:Int) = a+1
}
解决方案
Spark是一个分布式计算引擎,它抽象的提出一种弹性分布式数据集(RDD),RDD可以被看作是一个分布式集合。RDD的元素是在集群的节点分区,但Spark将用户抽象的运用到这里,让用户使用RDD(集合)事感到就是在本地进行交互。
第二个例子不能正常运行的原因是从map中定义一个类—testing,这样调用函数的方法是不对的,Spark检测到了这一点,一旦没有对自己进行序列化的方法,Spark会尝试序列化整个testing类。这样当在另一个JVM中执行代码仍然可以运行。在这里有两种可能性:
首先你可以做一个类的序列化测试,以确保整个类都可以被Spark进行序列化:
import org.apache.spark.SparkContext
object Spark {
val ctx = new SparkContext("local", "test")
}
object NOTworking extends App {
new Test().doIT
}
class Test extends java.io.Serializable {
val rddList = Spark.ctx.parallelize(List(1,2,3))
def doIT() = {
val after = rddList.map(someFunc)
after.collect().foreach(println)
}
def someFunc(a: Int) = a + 1
}
或者用函数someFunc来替代的方法,来确保Spark能够进行序列化:
import org.apache.spark.SparkContext
object Spark {
val ctx = new SparkContext("local", "test")
}
object NOTworking extends App {
new Test().doIT
}
class Test {
val rddList = Spark.ctx.parallelize(List(1,2,3))
def doIT() = {
val after = rddList.map(someFunc)
after.collect().foreach(println)
}
val someFunc = (a: Int) => a + 1
}