文章目录
- RDD 持久化
- 1、RDD Cache 缓存
- 2、持久化的作用
- 3、RDD ChechPoint 检查点
- 1) 说明
- 2) 代码示例
- 4、Cache & Persist & Checkpoint 区别
RDD 持久化
1、RDD Cache 缓存
RDD 通过 Cache 或者 Persist 方法将前面的计算结果缓存,默认情况下会把数据以缓存在JVM的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的 action 算子时,该RDD将会被缓存计算节点的内存中,并供后面重新用。
RDD不存储数,如果一个RDD要重复使用,那么需要从头再次执行来获取数据,RDD对象是可以重用的,但是数据无法重用。所以要是想重用那么可以使用,cache()
方法是缓存在内存中,persist()
方法可以选择是保存在文件磁盘或者内存中。
persist()方法的存储的级别:
级别 | 使用的空间 | CPU时间 | 是否在内存中 | 是否在磁盘上 | 备注 |
---|---|---|---|---|---|
MEMORY_ONLY | 高 | 低 | 是 | 否 | |
MEMORY_ONLY_SER | 低 | 高 | 是 | 否 | |
MEMORY_AND_DISK | 高 | 中等 | 部分 | 部分 | 如果数据在内存中放不下,则溢写到磁盘上 |
MEMORY_AND_DISK_SER | 低 | 高 | 部分 | 部分 | 如果数据在内存中放不下,则溢写到磁盘上。在内存中存放序列化后的数据 |
DISK_ONLY | 低 | 高 | 否 | 是 | |
缓存有可能丢失,或者存储于内存的数据由于内存不是而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个 Partition 是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部 Partition。 |
注意:持久化操作必须是在行动算子执行时完成的,因为只有行动算子在执行时,才会有数据,不然数据都没有,要这持久化干什么。
package com.atguigu.bigdata.spark.core.wc.Depimport org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}//这个是说的持久化,由于内容比较少,难得键一个新的包了
class Spark02_RDD_chijiu {}
object Spark02_RDD_chijiu{def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")val context = new SparkContext(conf)val rdd = context.makeRDD(List("hello word", "Hello word", "Hello scala"))val flatRDD = rdd.flatMap(_.split(" "))val mapRDD = flatRDD.map(num => {println("@@@@@@@@@@@@@@@@@@")(num, 1)})//持久化操作必须是在行动算子执行时完成的//mapRDD.cache() // cache()进行缓存,后面RDD执行就不会重头开始了mapRDD.persist(StorageLevel.DISK_ONLY) // cache() 是放在内存中,persist是放在文件中 这两个方法完全一样val result = mapRDD.reduceByKey(_ + _)result.collect().foreach(println)println("================")val result2 = mapRDD.groupByKey()result2.collect().foreach(println)context.stop()}
}
2、持久化的作用
RDD对象的持久化操作不一定是为了重用,在数据执行较长,或数据比较重要的场和也可以采用持久化操作。
3、RDD ChechPoint 检查点
所谓的检查点其实就是通过将RDD中间结果写入磁盘,由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。对RDD进行 chechpoint 操作并不会马上被执行,必须执行 Action 操作才能触发。
1) 说明
checkpoint 需要落盘,需要指定检查点保存路径,检查点路径保存的文件,当作业执行完毕后,不会被删除,一般保存路径都是在分布式存储系统:比如说hdfs。checkpoint 有个路径参数,需要有一个保存检查点的文件路径,SparkContext.setCheckpointDir()
使用这个方法来保存路径,里面的参数是路径。
它的使用方法和上面的那个持久化的位置是一样的,他们的功能也是比较像,都是将数据保存下来,然后要重新执行RDD,可以直接从这里开始。
2) 代码示例
package com.atguigu.bigdata.spark.core.wc.Depimport org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel// 持久化的检查点
class Spark02_RDD_chijiu2 {}
object Spark02_RDD_chijiu2{def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")val context = new SparkContext(conf)context.setCheckpointDir("cp") // setCheckpointDir()这个是下面的检查点的保存路径val rdd = context.makeRDD(List("hello word", "Hello word", "Hello scala"))val flatRDD = rdd.flatMap(_.split(" "))val mapRDD = flatRDD.map(num => {println("@@@@@@@@@@@@@@@@@@")(num, 1)})//checkpoint 需要落盘,需要指定检查点保存路径//检查点路径保存的文件,当作业执行完毕后,不会被删除//一般保存路径都是在分布式存储系统:比如说hdfsmapRDD.checkpoint() //检查点val result = mapRDD.reduceByKey(_ + _)result.collect().foreach(println)println("================")val result2 = mapRDD.groupByKey()result2.collect().foreach(println)context.stop()}
}
4、Cache & Persist & Checkpoint 区别
cache:将数据临时存储在内存中进行数据重用。会在血缘关系中添加新的依赖。一旦出现问题,可以从头读取数据。
persist:将数据临时存储在磁盘文件中,进行数据重用,涉及到磁盘IO,性能较低,但是数据安全,如果作业执行完毕,临时保存的数据文件就会丢失。
checkpoint:将数据长久的保存在磁盘文件中进行数据重用,涉及到磁盘IO,性能较低,但是数据安全,为了保证数据安全,所以一般情况下,会独立执行作业,为了能够提高效率,一般情况下,是需要和cache
联合使用,先cache 然后再行执行 checkpoint 操作。执行过程中,会切断血缘关系,重新建立新的血缘关系。
注意:checkpoint 等同于改变数据源,相当于检查点作为了新的数据源。所以直接切断了之前的血缘关系,重新开了一脉,自己这里开始。
package com.atguigu.bigdata.spark.core.wc.Depimport org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel// 持久化的检查点
class Spark02_RDD_chijiu2 {}
object Spark02_RDD_chijiu2{def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")val context = new SparkContext(conf)context.setCheckpointDir("cp") // setCheckpointDir()这个是下面的检查点的保存路径val rdd = context.makeRDD(List("hello word", "Hello word", "Hello scala"))val flatRDD = rdd.flatMap(_.split(" "))val mapRDD = flatRDD.map(num => {println("@@@@@@@@@@@@@@@@@@")(num, 1)})//checkpoint 需要落盘,需要指定检查点保存路径//检查点路径保存的文件,当作业执行完毕后,不会被删除//一般保存路径都是在分布式存储系统:比如说hdfsmapRDD.cache().checkpoint()//mapRDD.checkpoint() //检查点val result = mapRDD.reduceByKey(_ + _)result.collect().foreach(println)println("================")val result2 = mapRDD.groupByKey()result2.collect().foreach(println)context.stop()}
}
本文链接:https://www.ngui.cc/article/show-862280.html