首页 > 编程学习 > RDD 持久化

文章目录

  • RDD 持久化
    • 1、RDD Cache 缓存
    • 2、持久化的作用
    • 3、RDD ChechPoint 检查点
      • 1) 说明
      • 2) 代码示例
    • 4、Cache & Persist & Checkpoint 区别

RDD 持久化

1、RDD Cache 缓存

RDD 通过 Cache 或者 Persist 方法将前面的计算结果缓存,默认情况下会把数据以缓存在JVM的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的 action 算子时,该RDD将会被缓存计算节点的内存中,并供后面重新用。
RDD不存储数,如果一个RDD要重复使用,那么需要从头再次执行来获取数据,RDD对象是可以重用的,但是数据无法重用。所以要是想重用那么可以使用,cache() 方法是缓存在内存中,persist()方法可以选择是保存在文件磁盘或者内存中。
persist()方法的存储的级别

级别使用的空间CPU时间是否在内存中是否在磁盘上备注
MEMORY_ONLY
MEMORY_ONLY_SER
MEMORY_AND_DISK中等部分部分如果数据在内存中放不下,则溢写到磁盘上
MEMORY_AND_DISK_SER部分部分如果数据在内存中放不下,则溢写到磁盘上。在内存中存放序列化后的数据
DISK_ONLY
缓存有可能丢失,或者存储于内存的数据由于内存不是而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个 Partition 是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部 Partition。

注意:持久化操作必须是在行动算子执行时完成的,因为只有行动算子在执行时,才会有数据,不然数据都没有,要这持久化干什么。

package com.atguigu.bigdata.spark.core.wc.Depimport org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}//这个是说的持久化,由于内容比较少,难得键一个新的包了
class Spark02_RDD_chijiu {}
object Spark02_RDD_chijiu{def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")val context = new SparkContext(conf)val rdd = context.makeRDD(List("hello word", "Hello word", "Hello scala"))val flatRDD = rdd.flatMap(_.split(" "))val mapRDD = flatRDD.map(num => {println("@@@@@@@@@@@@@@@@@@")(num, 1)})//持久化操作必须是在行动算子执行时完成的//mapRDD.cache() // cache()进行缓存,后面RDD执行就不会重头开始了mapRDD.persist(StorageLevel.DISK_ONLY) // cache() 是放在内存中,persist是放在文件中 这两个方法完全一样val result = mapRDD.reduceByKey(_ + _)result.collect().foreach(println)println("================")val result2 = mapRDD.groupByKey()result2.collect().foreach(println)context.stop()}
}

2、持久化的作用

RDD对象的持久化操作不一定是为了重用,在数据执行较长,或数据比较重要的场和也可以采用持久化操作。

3、RDD ChechPoint 检查点

所谓的检查点其实就是通过将RDD中间结果写入磁盘,由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。对RDD进行 chechpoint 操作并不会马上被执行,必须执行 Action 操作才能触发。

1) 说明

checkpoint 需要落盘,需要指定检查点保存路径,检查点路径保存的文件,当作业执行完毕后,不会被删除,一般保存路径都是在分布式存储系统:比如说hdfs。checkpoint 有个路径参数,需要有一个保存检查点的文件路径,SparkContext.setCheckpointDir()使用这个方法来保存路径,里面的参数是路径。
它的使用方法和上面的那个持久化的位置是一样的,他们的功能也是比较像,都是将数据保存下来,然后要重新执行RDD,可以直接从这里开始。

2) 代码示例

package com.atguigu.bigdata.spark.core.wc.Depimport org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel// 持久化的检查点
class Spark02_RDD_chijiu2 {}
object Spark02_RDD_chijiu2{def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")val context = new SparkContext(conf)context.setCheckpointDir("cp") // setCheckpointDir()这个是下面的检查点的保存路径val rdd = context.makeRDD(List("hello word", "Hello word", "Hello scala"))val flatRDD = rdd.flatMap(_.split(" "))val mapRDD = flatRDD.map(num => {println("@@@@@@@@@@@@@@@@@@")(num, 1)})//checkpoint 需要落盘,需要指定检查点保存路径//检查点路径保存的文件,当作业执行完毕后,不会被删除//一般保存路径都是在分布式存储系统:比如说hdfsmapRDD.checkpoint() //检查点val result = mapRDD.reduceByKey(_ + _)result.collect().foreach(println)println("================")val result2 = mapRDD.groupByKey()result2.collect().foreach(println)context.stop()}
}

4、Cache & Persist & Checkpoint 区别

cache:将数据临时存储在内存中进行数据重用。会在血缘关系中添加新的依赖。一旦出现问题,可以从头读取数据。
persist:将数据临时存储在磁盘文件中,进行数据重用,涉及到磁盘IO,性能较低,但是数据安全,如果作业执行完毕,临时保存的数据文件就会丢失。
checkpoint:将数据长久的保存在磁盘文件中进行数据重用,涉及到磁盘IO,性能较低,但是数据安全,为了保证数据安全,所以一般情况下,会独立执行作业,为了能够提高效率,一般情况下,是需要和cache联合使用,先cache 然后再行执行 checkpoint 操作。执行过程中,会切断血缘关系,重新建立新的血缘关系。
注意:checkpoint 等同于改变数据源,相当于检查点作为了新的数据源。所以直接切断了之前的血缘关系,重新开了一脉,自己这里开始。

package com.atguigu.bigdata.spark.core.wc.Depimport org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel// 持久化的检查点
class Spark02_RDD_chijiu2 {}
object Spark02_RDD_chijiu2{def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")val context = new SparkContext(conf)context.setCheckpointDir("cp") // setCheckpointDir()这个是下面的检查点的保存路径val rdd = context.makeRDD(List("hello word", "Hello word", "Hello scala"))val flatRDD = rdd.flatMap(_.split(" "))val mapRDD = flatRDD.map(num => {println("@@@@@@@@@@@@@@@@@@")(num, 1)})//checkpoint 需要落盘,需要指定检查点保存路径//检查点路径保存的文件,当作业执行完毕后,不会被删除//一般保存路径都是在分布式存储系统:比如说hdfsmapRDD.cache().checkpoint()//mapRDD.checkpoint() //检查点val result = mapRDD.reduceByKey(_ + _)result.collect().foreach(println)println("================")val result2 = mapRDD.groupByKey()result2.collect().foreach(println)context.stop()}
}

本文链接:https://www.ngui.cc/article/show-862280.html
Copyright © 2010-2022 ngui.cc 版权所有 |关于我们| 联系方式| 豫B2-20100000