Spark task 序列化总结
一. 概要说明
我们在 driver 中定义的数据,在 executor 中执行时是无法获取到 driver 中的数据的( 会报错Task not serializable
),task 分为 ShuffleMapTask 和 ResultTask,这两种 task 都已经实现了序列化了,所以如果我读取在 driver 中定义的对象,还需要将数据序列化。
- 其中一种方法是使用广播变量
- 另一种方法是我们手动编写数据的序列化方法。
二. 序列化示例
- 如下示例中,
Person
类的实例在传入到task中时无法序列化,会报如下错误diagnostics: User class threw exception: org.apache.spark.SparkException: Task not serializable
package com.yuchaoshui
import java.net.InetAddress
import org.apache.spark.{SparkConf, SparkContext}
class Person(name: String, age: Int)
object WordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("yzyApp")
val sc = new SparkContext(conf)
var peopleMap = Map(
1 -> new Person("zhang", 20),
2 -> new Person("zhao", 40),
3 -> new Person("li", 10),
4 -> new Person("song", 90),
5 -> new Person("yu", 80),
6 -> new Person("qin", 140)
)
val nums = sc.parallelize(List(1, 2, 3, 4, 5, 6), 6)
val value = nums.map(x => (
x, peopleMap(x), Thread.currentThread().getId, InetAddress.getLocalHost.getHostName)
)
value.saveAsTextFile("/tmp/out1")
}
}
- 按照如下修改
Person
类的定义以后
class Person(name: String, age: Int) extends Serializable
// 结果:
// (1,com.yuchaoshui.Person@66102630,31,node3)
// (2,com.yuchaoshui.Person@4894fa21,31,node2)
// (3,com.yuchaoshui.Person@657f2a02,31,node2)
// (4,com.yuchaoshui.Person@2fbc16a,31,node3)
// (5,com.yuchaoshui.Person@64cb6ea5,31,node2)
// (6,com.yuchaoshui.Person@7f8ea18b,31,node3)
三. 序列化总结
如果在 Driver 初始化了一个 object 或者 new 了一个 class 实例,然后在函数中使用,必须实现序列化接口。
- 在一个 executor 内部,多个 task 会共用同一个 object,因为 object 是一个单列对象,一个进程只能有一个。
- 在一个 executor 内部,每个 task 会拥有自己的实例(new 的 class),一个task独享一个实例。