已复制
全屏展示
复制代码

Spark task多线程资源争抢问题

· 1 min read

实际问题

在多个task同时读取 object 单例的内部非线程安全的对象时会报错,示例代码如下:

package org.example

import org.apache.spark.{SparkConf, SparkContext}
import java.text.SimpleDateFormat
import org.apache.commons.lang3.time.FastDateFormat

object DateUtils {

  // 这个对象是线程不安全的,但是在task里面的多个线程会共用这个对象所有可能会有问题
  val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

  def parse(dateString: String) = {
    sdf.parse(dateString)
  }
}

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("yzyApp").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // 文件/tmp/data.txt中,一行一个时间字符串,数据尽量多一些,比如 2023-08-08 17:14:02
    val nums = sc.textFile("/tmp/data.txt")
    nums.map(line => {
      (line, DateUtils.parse(line))
    }).foreach(println)
  }
}

解决方法1

将线程不安全的换成线程安全的对象即可

val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

换成

val sdf = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

解决方法2

使用mapPartitions,同一个partition使用同一个对象,不会出现争抢问题。

package org.example

import org.apache.spark.{SparkConf, SparkContext}
import java.text.SimpleDateFormat

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("yzyApp").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // 文件/tmp/data.txt中,一行一个时间字符串,数据尽量多一些,比如 2023-08-08 17:14:02
    val nums = sc.textFile("/tmp/data.txt")
    nums.mapPartitions(it => {
      
      // 这个对象是线程不安全的,但是在单个 partition 顺序执行的不会出现争抢问题
      val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

      it.map(line => {
        (line, sdf.parse(line))
      })
    }).foreach(println)

  }
}
🔗

文章推荐