实际问题
在多个task同时读取 object 单例的内部非线程安全的对象时会报错,示例代码如下:
package org.example
import org.apache.spark.{SparkConf, SparkContext}
import java.text.SimpleDateFormat
import org.apache.commons.lang3.time.FastDateFormat
object DateUtils {
// 这个对象是线程不安全的,但是在task里面的多个线程会共用这个对象所有可能会有问题
val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
def parse(dateString: String) = {
sdf.parse(dateString)
}
}
object WordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("yzyApp").setMaster("local[*]")
val sc = new SparkContext(conf)
// 文件/tmp/data.txt中,一行一个时间字符串,数据尽量多一些,比如 2023-08-08 17:14:02
val nums = sc.textFile("/tmp/data.txt")
nums.map(line => {
(line, DateUtils.parse(line))
}).foreach(println)
}
}
解决方法1
将线程不安全的换成线程安全的对象即可
把
val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
换成
val sdf = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
解决方法2
使用mapPartitions,同一个partition使用同一个对象,不会出现争抢问题。
package org.example
import org.apache.spark.{SparkConf, SparkContext}
import java.text.SimpleDateFormat
object WordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("yzyApp").setMaster("local[*]")
val sc = new SparkContext(conf)
// 文件/tmp/data.txt中,一行一个时间字符串,数据尽量多一些,比如 2023-08-08 17:14:02
val nums = sc.textFile("/tmp/data.txt")
nums.mapPartitions(it => {
// 这个对象是线程不安全的,但是在单个 partition 顺序执行的不会出现争抢问题
val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
it.map(line => {
(line, sdf.parse(line))
})
}).foreach(println)
}
}