【Spark篇】---Spark中控制算子
2018-02-09 16:49
113 查看
一、前述
Spark中控制算子也是懒执行的,需要Action算子触发才能执行,主要是为了对数据进行缓存。
控制算子有三种,cache,persist,checkpoint,以上算子都可以将RDD持久化,持久化的单位是partition。cache和persist都是懒执行的。必须有一个action类算子触发执行。checkpoint算子不仅能将RDD持久化到磁盘,还能切断RDD之间的依赖关系。
二、具体算子
1、 cache 默认将RDD的数据持久化到内存中。cache是懒执行。chche () = persist()=persist(StorageLevel.Memory_Only) SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("CacheTest");
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD<String> lines = jsc.textFile("./NASA_access_log_Aug95");
lines = lines.cache();
long startTime = System.currentTimeMillis();
long count = lines.count();//count是action算子,到这里才能触发cache执行,所以这一次coun加载是从磁盘读数据,然后拉回到drive端。
long endTime = System.currentTimeMillis();
System.out.println("共"+count+ "条数据,"+"初始化时间+cache时间+计算时间="+
(endTime-startTime));
long countStartTime = System.currentTimeMillis();
long countrResult = lines.count();//这一次是从内存种中读数据
long countEndTime = System.currentTimeMillis();
System.out.println("共"+countrResult+ "条数据,"+"计算时间="+ (countEndTime-
countStartTime));
jsc.stop();
2、persist([b]可以指定持久化的级别)[/b]
解释:
1、MEMORY_AND_DISK 意思是先往内存中放数据,内存不够再放磁盘
2、最常用的是MEMORY_ONLY和MEMORY_AND_DISK。”_2”表示有副本数。
3、选择的原则是:首先考虑内存,然后考虑序列化之后再放入内存,最后考虑内存加磁盘。
4、尽量避免使用“_2”和DISK_ONLY级别。
5、deserialized是不序列化的意思。
注意事项:
1、cache和persist都是懒执行,必须有一个action类算子触发执行。
2、cache和persist算子的返回值可以赋值给一个变量,在其他job中直接使用这个变量就是使用持久化的数据了。持久化的单位是partition。
3、cache和persist算子后不能立即紧跟action算子。
错误:
rdd.cache().count() 返回的不是持久化的RDD,而是一个数值了
3、 Checkpoint(对Lineage非常长时使用)
1、概念和特征:
不仅可以将数据持久化到磁盘,还可以切断RDD之间的依赖关系,checkpoint也是懒执行。 Checkpoin不仅存储结果,还会存储逻辑,还可以存储元数据。 Persisit切断不了RDD的依赖关系。 2、checkpoint 的执行原理:
2.1.Spark job执行完之后,spark会从finalRDD从后往前回溯。
2.2.当回溯到对某个RDD进行了checkpoint,会对这个RDD标记。
2.3.回溯完成之后,Spark会重新计算标记RDD的结果,然后将结果保存到Checkpint目录中。
3、优化checekpoint
因为最后是要触发当前application的action算子,所以在触发之前加一层cache操作,一样会往前执行cache操作,实现对数据的cache ,所以考虑将cache优化到checkpoin的优化流程里。
对RDD执行checkpoint之前,最好对这个RDD先执行cache,这样新启动的job(回溯完成之后重新开的job)只需要将内存中的数据(cache缓存好的checkpoint那个点的数据)拷贝到HDFS上就可以。
省去了重新计算这一步,不需要重头开始来走到checkpoint这个点了。
总结:
持久化的最小单位是partition!!!
Spark中控制算子也是懒执行的,需要Action算子触发才能执行,主要是为了对数据进行缓存。
控制算子有三种,cache,persist,checkpoint,以上算子都可以将RDD持久化,持久化的单位是partition。cache和persist都是懒执行的。必须有一个action类算子触发执行。checkpoint算子不仅能将RDD持久化到磁盘,还能切断RDD之间的依赖关系。
二、具体算子
1、 cache 默认将RDD的数据持久化到内存中。cache是懒执行。chche () = persist()=persist(StorageLevel.Memory_Only) SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("CacheTest");
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD<String> lines = jsc.textFile("./NASA_access_log_Aug95");
lines = lines.cache();
long startTime = System.currentTimeMillis();
long count = lines.count();//count是action算子,到这里才能触发cache执行,所以这一次coun加载是从磁盘读数据,然后拉回到drive端。
long endTime = System.currentTimeMillis();
System.out.println("共"+count+ "条数据,"+"初始化时间+cache时间+计算时间="+
(endTime-startTime));
long countStartTime = System.currentTimeMillis();
long countrResult = lines.count();//这一次是从内存种中读数据
long countEndTime = System.currentTimeMillis();
System.out.println("共"+countrResult+ "条数据,"+"计算时间="+ (countEndTime-
countStartTime));
jsc.stop();
2、persist([b]可以指定持久化的级别)[/b]
解释:
1、MEMORY_AND_DISK 意思是先往内存中放数据,内存不够再放磁盘
2、最常用的是MEMORY_ONLY和MEMORY_AND_DISK。”_2”表示有副本数。
3、选择的原则是:首先考虑内存,然后考虑序列化之后再放入内存,最后考虑内存加磁盘。
4、尽量避免使用“_2”和DISK_ONLY级别。
5、deserialized是不序列化的意思。
注意事项:
1、cache和persist都是懒执行,必须有一个action类算子触发执行。
2、cache和persist算子的返回值可以赋值给一个变量,在其他job中直接使用这个变量就是使用持久化的数据了。持久化的单位是partition。
3、cache和persist算子后不能立即紧跟action算子。
错误:
rdd.cache().count() 返回的不是持久化的RDD,而是一个数值了
3、 Checkpoint(对Lineage非常长时使用)
1、概念和特征:
不仅可以将数据持久化到磁盘,还可以切断RDD之间的依赖关系,checkpoint也是懒执行。 Checkpoin不仅存储结果,还会存储逻辑,还可以存储元数据。 Persisit切断不了RDD的依赖关系。 2、checkpoint 的执行原理:
2.1.Spark job执行完之后,spark会从finalRDD从后往前回溯。
2.2.当回溯到对某个RDD进行了checkpoint,会对这个RDD标记。
2.3.回溯完成之后,Spark会重新计算标记RDD的结果,然后将结果保存到Checkpint目录中。
3、优化checekpoint
因为最后是要触发当前application的action算子,所以在触发之前加一层cache操作,一样会往前执行cache操作,实现对数据的cache ,所以考虑将cache优化到checkpoin的优化流程里。
对RDD执行checkpoint之前,最好对这个RDD先执行cache,这样新启动的job(回溯完成之后重新开的job)只需要将内存中的数据(cache缓存好的checkpoint那个点的数据)拷贝到HDFS上就可以。
省去了重新计算这一步,不需要重头开始来走到checkpoint这个点了。
总结:
持久化的最小单位是partition!!!
相关文章推荐
- Spark独到见解--3控制算子
- 【Spark篇】---SparkStreaming算子操作transform和updateStateByKey
- 【Spark篇】---Spark中Action算子
- Spark 常用算子
- Spark算子[09]:aggregateByKey、aggregate详解
- SparkRDDAPI常用算子说明
- Spark:控制日志输出级别
- Hive压缩和spark算子,RDD总结
- 【SPARK】基础知识(RDD两大类算子常见函数)
- Spark算子--RDD的基本转换
- java 写的spark 控制hbase
- spark 算子
- Spark算子使用示例
- 3.1 Spark-RDD算子分类
- spark RDD算子学习(基本命令)
- Spark算子:RDD基本转换操作(5)–mapPartitions/mapPartitionsWithIndex
- Spark算子篇 --Spark算子之combineByKey详解
- spark 算子之RDD
- Spark算子:RDD基本转换操作(2)–coalesce、repartition
- Spark算子--coalesce和repartition