[1.5] Classic RDD Action Operators: A Hands-On Walkthrough
2016-10-09 18:57
Scenario
Worked examples of the commonly used action operators.
The action operators most commonly used in Spark include saveAsTextFile, reduce, count, collect, foreach, and take. Here the usage of saveAsTextFile, collect, and foreach is explained in detail by example.
saveAsTextFile: saves the RDD's elements as text files. If the specified output directory does not exist it is created; if it already exists, an exception is thrown. The output files are named part-00000, part-00001, and so on.
collect: brings the RDD's elements back to the driver for local iteration. This is not recommended, because:
1. pulling the data from remote hosts to the driver over the network performs poorly; 2. if the RDD holds a large amount of data, the driver may run out of memory (OOM).
foreach: iterates over the RDD's elements on the remote cluster.
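The difference between collect and take can be illustrated with a toy driver-side model. This is not Spark's actual implementation (the real take() fetches partitions incrementally over the network), but the core contrast is the same: collect materializes every partition on the driver, while take stops as soon as it has n elements.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model contrasting collect() (fetch everything) with take(n) (stop early). */
public class CollectVsTake {
    // collect(): concatenates every partition's data on the driver.
    static List<Integer> collect(List<List<Integer>> partitions) {
        List<Integer> all = new ArrayList<>();
        for (List<Integer> p : partitions) all.addAll(p);
        return all;
    }

    // take(n): stops reading partitions as soon as n elements are gathered.
    static List<Integer> take(List<List<Integer>> partitions, int n) {
        List<Integer> out = new ArrayList<>();
        for (List<Integer> p : partitions) {
            for (Integer e : p) {
                if (out.size() == n) return out;
                out.add(e);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Two "partitions" mirroring the doubled values used in this article.
        List<List<Integer>> parts =
                Arrays.asList(Arrays.asList(2, 6), Arrays.asList(10, 14, 18));
        System.out.println(collect(parts)); // [2, 6, 10, 14, 18]
        System.out.println(take(parts, 3)); // [2, 6, 10]
    }
}
```

take(3) never touches the last two elements of the second partition, which is why take is the safer choice when you only need a sample of a large RDD.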
Experiment
package cool.pengych.bigdata.spark.core;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;

import java.util.Arrays;
import java.util.List;

/**
 * Created by 彭宇成 on 2016/10/9.
 * Hands-on walkthrough of the action operators:
 * saveAsTextFile, reduce, count,
 * collect, foreach, take
 */
public class ActionOps {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("action ops");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        List<Integer> nums = Arrays.asList(1, 3, 5, 7, 9);
        JavaRDD<Integer> numsRDD = jsc.parallelize(nums);
        JavaRDD<Integer> mapedRDD = numsRDD.map(new Function<Integer, Integer>() {
            @Override
            public Integer call(Integer num) {
                return num * 2;
            }
        });

        /*
         * saveAsTextFile: save the data to HDFS
         */
        mapedRDD.saveAsTextFile("hdfs://mvxl2766:8020/user/hive/pengyc");
        System.out.println("=== success to save data to hdfs ===");

        /*
         * foreach: iterate over the RDD's elements on the remote cluster
         */
        System.out.println("=== foreach test ===");
        mapedRDD.foreach(new VoidFunction<Integer>() {
            @Override
            public void call(Integer integer) throws Exception {
                System.out.println(integer);
            }
        });

        /*
         * collect: iterate over the RDD's elements on the driver (not recommended:
         * 1. pulling data to the driver over the network performs poorly;
         * 2. a large RDD may cause an OOM on the driver)
         */
        System.out.println("=== collect test ===");
        List<Integer> integers = mapedRDD.collect();
        for (Integer num : integers) {
            System.out.println(num);
        }

        jsc.close();
    }
}
Launch script
/opt/cloudera/parcels/CDH/lib/spark/bin/spark-submit \
  --class cool.pengych.bigdata.spark.core.ActionOps \
  --master yarn \
  --deploy-mode client \
  /var/lib/hive/pengyc/jobs/action-ops.jar
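For quick testing without a YARN cluster, the same jar can also be submitted in local mode. This is a sketch that assumes spark-submit is on the PATH and reuses the class and jar paths from above:

```shell
spark-submit \
  --class cool.pengych.bigdata.spark.core.ActionOps \
  --master "local[2]" \
  /var/lib/hive/pengyc/jobs/action-ops.jar
```

One notable difference: in local mode, the println inside foreach appears in the driver console, whereas under YARN it goes to the executors' stdout logs rather than the console shown below.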
Execution output
16/10/09 18:06:28 INFO cluster.YarnScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool
16/10/09 18:06:28 INFO scheduler.DAGScheduler: Job 0 finished: saveAsTextFile at ActionOps.java:38, took 8.077504 s
=== success to save data to hdfs ===
=== foreach test ===
16/10/09 18:06:28 INFO spark.SparkContext: Starting job: foreach at ActionOps.java:46
16/10/09 18:06:28 INFO scheduler.DAGScheduler: Got job 1 (foreach at ActionOps.java:46) with 2 output partitions
16/10/09 18:06:28 INFO scheduler.DAGScheduler: Final stage: ResultStage 1 (foreach at ActionOps.java:46)
16/10/09 18:06:28 INFO scheduler.DAGScheduler: Parents of final stage: List()
16/10/09 18:06:28 INFO scheduler.DAGScheduler: Missing parents: List()
16/10/09 18:06:28 INFO scheduler.DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[1] at map at ActionOps.java:26), which has no missing parents
16/10/09 18:06:28 INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 2.3 KB, free 99.4 KB)
16/10/09 18:06:28 INFO storage.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1430.0 B, free 100.8 KB)
16/10/09 18:06:28 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.16.74.36:43858 (size: 1430.0 B, free: 530.3 MB)
16/10/09 18:06:28 INFO spark.SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
16/10/09 18:06:28 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ResultStage 1 (MapPartitionsRDD[1] at map at ActionOps.java:26)
16/10/09 18:06:28 INFO cluster.YarnScheduler: Adding task set 1.0 with 2 tasks
16/10/09 18:06:28 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, mvxl2769, partition 0,PROCESS_LOCAL, 2071 bytes)
16/10/09 18:06:28 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, mvxl2770, partition 1,PROCESS_LOCAL, 2073 bytes)
16/10/09 18:06:28 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on mvxl2769:36941 (size: 1430.0 B, free: 530.3 MB)
16/10/09 18:06:28 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on mvxl2770:51601 (size: 1430.0 B, free: 530.3 MB)
16/10/09 18:06:28 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 96 ms on mvxl2769 (1/2)
16/10/09 18:06:28 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 99 ms on mvxl2770 (2/2)
16/10/09 18:06:28 INFO scheduler.DAGScheduler: ResultStage 1 (foreach at ActionOps.java:46) finished in 0.107 s
16/10/09 18:06:28 INFO cluster.YarnScheduler: Removed TaskSet 1.0, whose tasks have all completed, from pool
16/10/09 18:06:28 INFO scheduler.DAGScheduler: Job 1 finished: foreach at ActionOps.java:46, took 0.125269 s
=== collect test ===
16/10/09 18:06:28 INFO spark.SparkContext: Starting job: collect at ActionOps.java:59
16/10/09 18:06:28 INFO scheduler.DAGScheduler: Got job 2 (collect at ActionOps.java:59) with 2 output partitions
16/10/09 18:06:28 INFO scheduler.DAGScheduler: Final stage: ResultStage 2 (collect at ActionOps.java:59)
16/10/09 18:06:28 INFO scheduler.DAGScheduler: Parents of final stage: List()
16/10/09 18:06:28 INFO scheduler.DAGScheduler: Missing parents: List()
16/10/09 18:06:28 INFO scheduler.DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[1] at map at ActionOps.java:26), which has no missing parents
16/10/09 18:06:28 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 2.2 KB, free 103.0 KB)
16/10/09 18:06:28 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 1394.0 B, free 104.3 KB)
16/10/09 18:06:28 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.16.74.36:43858 (size: 1394.0 B, free: 530.3 MB)
16/10/09 18:06:28 INFO spark.SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006
16/10/09 18:06:28 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ResultStage 2 (MapPartitionsRDD[1] at map at ActionOps.java:26)
16/10/09 18:06:28 INFO cluster.YarnScheduler: Adding task set 2.0 with 2 tasks
16/10/09 18:06:28 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 2.0 (TID 4, mvxl2769, partition 0,PROCESS_LOCAL, 2071 bytes)
16/10/09 18:06:28 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 2.0 (TID 5, mvxl2770, partition 1,PROCESS_LOCAL, 2073 bytes)
16/10/09 18:06:28 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on mvxl2769:36941 (size: 1394.0 B, free: 530.3 MB)
16/10/09 18:06:28 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on mvxl2770:51601 (size: 1394.0 B, free: 530.3 MB)
16/10/09 18:06:28 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 2.0 (TID 5) in 67 ms on mvxl2770 (1/2)
16/10/09 18:06:29 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 2.0 (TID 4) in 78 ms on mvxl2769 (2/2)
16/10/09 18:06:29 INFO scheduler.DAGScheduler: ResultStage 2 (collect at ActionOps.java:59) finished in 0.080 s
16/10/09 18:06:29 INFO cluster.YarnScheduler: Removed TaskSet 2.0, whose tasks have all completed, from pool
16/10/09 18:06:29 INFO scheduler.DAGScheduler: Job 2 finished: collect at ActionOps.java:59, took 0.095753 s
2
6
10
14
18
16/10/09 18:06:29 INFO ui.SparkUI: Stopped Spark web UI at http://10.16.74.36:4040
16/10/09 18:06:29 INFO cluster.YarnClientSchedulerBackend: Shutting down all executors
16/10/09 18:06:29 INFO cluster.YarnClientSchedulerBackend: Interrupting monitor thread
16/10/09 18:06:29 INFO cluster.YarnClientSchedulerBackend: Asking each executor to shut down
16/10/09 18:06:29 INFO cluster.YarnClientSchedulerBackend: Stopped
16/10/09 18:06:29 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/10/09 18:06:29 INFO storage.MemoryStore: MemoryStore cleared
16/10/09 18:06:29 INFO storage.BlockManager: BlockManager stopped
16/10/09 18:06:29 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
16/10/09 18:06:29 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/10/09 18:06:29 INFO spark.SparkContext: Successfully stopped SparkContext
16/10/09 18:06:29 INFO util.ShutdownHookManager: Shutdown hook called
16/10/09 18:06:29 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/10/09 18:06:29 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-dd3ae8e9-6b1a-4a61-a261-c594382e57eb
[hive@mvxl2766 sh]$ hdfs dfs -ls hdfs://mvxl2766:8020/user/hive/pengyc
Found 3 items
-rw-r--r-- 3 hive hive 0 2016-10-09 18:06 hdfs://mvxl2766:8020/user/hive/pengyc/_SUCCESS
-rw-r--r-- 3 hive hive 4 2016-10-09 18:06 hdfs://mvxl2766:8020/user/hive/pengyc/part-00000
-rw-r--r-- 3 hive hive 9 2016-10-09 18:06 hdfs://mvxl2766:8020/user/hive/pengyc/part-00001
[hive@mvxl2766 sh]$ hdfs dfs -cat hdfs://mvxl2766:8020/user/hive/pengyc/_SUCCESS
[hive@mvxl2766 sh]$ hdfs dfs -cat hdfs://mvxl2766:8020/user/hive/pengyc/part-00000
2
6
[hive@mvxl2766 sh]$ hdfs dfs -cat hdfs://mvxl2766:8020/user/hive/pengyc/part-00001
10
14
18
[hive@mvxl2766 sh]$
Summary
1. Each action operator triggers one job: here job 0, job 1, and job 2.
2. How are the number and placement of the output files on HDFS decided? That calls for further study of Spark's job and resource scheduling; in this example the output files were written by the executors on mvxl2769 and mvxl2770.
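On the file-count question, one observation can be made already: saveAsTextFile writes one part file per partition, and parallelize without an explicit slice count fell back to a default of 2 here (the logs report "2 output partitions"), hence part-00000 and part-00001. The sketch below reimplements the integer-arithmetic slicing that Spark's ParallelCollectionRDD is understood to use (an assumption, not a call into Spark) and reproduces the 2-element / 3-element split seen in the hdfs dfs -cat output:

```java
import java.util.Arrays;
import java.util.List;

/** Sketch of slicing a parallelized collection into numSlices partitions. */
public class SliceDemo {
    // Partition i of numSlices gets elements [i*n/numSlices, (i+1)*n/numSlices).
    static <T> List<T> slice(List<T> data, int numSlices, int i) {
        int n = data.size();
        int start = i * n / numSlices;
        int end = (i + 1) * n / numSlices;
        return data.subList(start, end);
    }

    public static void main(String[] args) {
        // The mapped values from the experiment above, split into 2 partitions.
        List<Integer> mapped = Arrays.asList(2, 6, 10, 14, 18);
        System.out.println(slice(mapped, 2, 0)); // [2, 6]       -> part-00000
        System.out.println(slice(mapped, 2, 1)); // [10, 14, 18] -> part-00001
    }
}
```

Passing an explicit count, e.g. jsc.parallelize(nums, 4), would accordingly produce four part files.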