已复制
全屏展示
复制代码

Spark task 序列化总结


· 2 min read

一. 概要说明

我们在 driver 中定义的数据,在 executor 中执行时是无法获取到 driver 中的数据的( 会报错Task not serializable),task 分为 ShuffleMapTask 和 ResultTask,这两种 task 都已经实现了序列化了,所以如果我读取在 driver 中定义的对象,还需要将数据序列化。

  • 其中一种方法是使用广播变量
  • 另一种方法是我们手动编写数据的序列化方法。

二. 序列化示例

  • 如下示例中,Person类的实例在传入到task中时无法序列化,会报如下错误
    diagnostics: User class threw exception: org.apache.spark.SparkException: Task not serializable
package com.yuchaoshui

import java.net.InetAddress
import org.apache.spark.{SparkConf, SparkContext}

class Person(name: String, age: Int)

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("yzyApp")
    val sc = new SparkContext(conf)
    
    var peopleMap = Map(
      1 -> new Person("zhang", 20),
      2 -> new Person("zhao", 40),
      3 -> new Person("li", 10),
      4 -> new Person("song", 90),
      5 -> new Person("yu", 80),
      6 -> new Person("qin", 140)
    )
    
    val nums = sc.parallelize(List(1, 2, 3, 4, 5, 6), 6)
    val value = nums.map(x => (
      x, peopleMap(x), Thread.currentThread().getId, InetAddress.getLocalHost.getHostName)
                        )
    value.saveAsTextFile("/tmp/out1")
  }
}
  • 按照如下修改Person类的定义以后
class Person(name: String, age: Int) extends Serializable

// 结果:
// (1,com.yuchaoshui.Person@66102630,31,node3)
// (2,com.yuchaoshui.Person@4894fa21,31,node2)
// (3,com.yuchaoshui.Person@657f2a02,31,node2)
// (4,com.yuchaoshui.Person@2fbc16a,31,node3)
// (5,com.yuchaoshui.Person@64cb6ea5,31,node2)
// (6,com.yuchaoshui.Person@7f8ea18b,31,node3)

三. 序列化总结

如果在 Driver 初始化了一个 object 或者 new 了一个 class 实例,然后在函数中使用,必须实现序列化接口。

  • 在一个 executor 内部,多个 task 会共用同一个 object,因为 object 是一个单列对象,一个进程只能有一个。
  • 在一个 executor 内部,每个 task 会拥有自己的实例(new 的 class),一个task独享一个实例。
🔗

文章推荐