已复制
全屏展示
复制代码

spark rdd 持久化数据


· 2 min read

一. 简要概述

我们在操作 rdd 的过程中,避免多次计算同一个 rdd,我们可以使用spark提供的持久化功能,一个rdd若果已经持久化过了,就不需要再次计算了,直接从内存的持久化中获取结果就行,如果持久化的数据过多,内存放不下,spark会使用LRU策略把最老的缓存清除掉。

二. rdd.cache

  • cache 可以将 rdd 数据缓存到内存
  • 它是一个惰性求值,当执行 cache ,被标记该 rdd 要缓存,后续有一次 action 时才会缓存
  • 缓存是以分区为单位的,所以当内存不足时,有些分区没有缓存
  • 底层调用 persist

三. rdd.persist

  • presist 默认使用内存
  • 如有必要,可以通过在存储级别的末尾加上 _2 来把持久化数据存为两份
import org.apache.spark.storage.StorageLevel

val result = sc.parallelize(List(1,2,3,4)).map(x => x * x)
result.persist(StorageLevel.DISK_ONLY)
println(result.count())
  • 存储级别列表
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

四. rdd.unpersist

  • RDD 还有一个方法叫作 unpersist(),调用该方法可以手动把持久化的RDD从缓存中移除
  • 可以传入bool参数,true表示同步,false表示异步。

五. rdd.checkPoint

  • 作用:避免数据丢失,可以将数据写入hdfs上,以便后续继续使用。
  • checkpoint也是惰性的,在有一次action以后才会checkpoint,将数据写入hdfs上
  • checkpoint以后得到ReliableCheckPointRDDData,它是一个rdd的引用
  • 如果hdfs上的checkpoint丢失,会直接报错(因为spark认为写入hdfs的数据是不会丢失的)。
  • 注意:如果需要很高的可靠性和速度,可以先调用rdd.cache,然后调用rdd.checkpoint,这样数据会在内存一份,hdfs一份,使用时会优先使用内存。
  • 它会额外触发一个 job,这个job的目的就是保存结果到hdfs。
// 必须提前设置目录
sc.setCheckpointDir("hdfs://xxxx:9000/checkpoint") 
val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9))
x.checkpoint()
x.count

文章推荐