
[1.5] A Hands-On Look at Classic RDD Action Operators

2016-10-09 18:57

Scenario

Usage examples for the commonly used action operators.

Analysis

The commonly used action operators in Spark are:

saveAsTextFile, reduce, count, collect, foreach, and take. Here the usage of saveAsTextFile, collect, and foreach is explained in detail.

saveAsTextFile: saves the RDD's contents as text files (named part-00000, part-00001, etc.); the specified output directory is created if it does not exist, and an exception is thrown if it already exists.

collect: iterates over the RDD's elements on the local (driver) node. This is discouraged because:

1. the data is pulled to the driver from remote hosts over the network, which performs poorly; 2. if the RDD holds a lot of data, the driver may run out of memory (OOM).

foreach: iterates over the RDD's elements on the remote cluster (inside the executors).
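The remaining operators listed above (reduce, count, take) can be sketched the same way. A minimal local-mode example (the local[*] master, app name, and Java 8 lambdas are assumptions for the sketch, not part of the original job):

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.Arrays;
import java.util.List;

public class OtherActionOps {
    public static void main(String[] args) {
        // local[*] so the sketch runs without a cluster
        SparkConf conf = new SparkConf().setAppName("other action ops").setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        JavaRDD<Integer> nums = jsc.parallelize(Arrays.asList(1, 3, 5, 7, 9));

        // reduce: folds all elements with a binary function (here: sum)
        int sum = nums.reduce((a, b) -> a + b);

        // count: returns the number of elements in the RDD
        long count = nums.count();

        // take: pulls the first n elements back to the driver
        List<Integer> firstTwo = nums.take(2);

        System.out.println(sum + " " + count + " " + firstTwo);
        jsc.close();
    }
}
```

Like collect, reduce and take bring data back to the driver, but take only moves n elements, so it is a safer way to inspect a large RDD.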

Experiment

package cool.pengych.bigdata.spark.core;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import java.util.Arrays;
import java.util.List;

/**
 * Created by 彭宇成 on 2016/10/9.
 * A hands-on look at action operators:
 * saveAsTextFile, reduce, count,
 * collect, foreach, take
 */
public class ActionOps
{
    public static void main(String[] args)
    {
        SparkConf conf = new SparkConf().setAppName("action ops");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        List<Integer> nums = Arrays.asList(1, 3, 5, 7, 9);
        JavaRDD<Integer> numsRDD = jsc.parallelize(nums);
        JavaRDD<Integer> mappedRDD = numsRDD.map(new Function<Integer, Integer>()
        {
            @Override
            public Integer call(Integer num)
            {
                return num * 2;
            }
        });

        /*
         * saveAsTextFile: persist the data to HDFS
         */
        mappedRDD.saveAsTextFile("hdfs://mvxl2766:8020/user/hive/pengyc");
        System.out.println("=== success to save data to hdfs  ===");

        /*
         * foreach: iterate over the RDD's elements on the remote cluster
         */
        System.out.println("=== foreach test  ===");
        mappedRDD.foreach(new VoidFunction<Integer>()
        {
            @Override
            public void call(Integer integer) throws Exception
            {
                System.out.println(integer);
            }
        });

        /*
         * collect: iterate over the RDD's elements on the driver (discouraged:
         * 1. pulling data from remote hosts over the network performs poorly;
         * 2. a large RDD may OOM the driver)
         */
        System.out.println("=== collect test  ===");
        List<Integer> integers = mappedRDD.collect();
        for (Integer num : integers)
        {
            System.out.println(num);
        }

        jsc.close();
    }
}
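With Java 8 the same pipeline can be written with lambdas instead of anonymous inner classes; the behavior is unchanged. A hedged sketch (the local[*] master is an assumption so it runs without YARN/HDFS, and the saveAsTextFile step is omitted):

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.Arrays;
import java.util.List;

public class ActionOpsLambda {
    public static void main(String[] args) {
        // local[*] master: assumption for a self-contained sketch, not the original YARN setup
        SparkConf conf = new SparkConf().setAppName("action ops lambda").setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        // map with a lambda replaces the Function<Integer, Integer> inner class
        JavaRDD<Integer> mapped = jsc.parallelize(Arrays.asList(1, 3, 5, 7, 9))
                                     .map(num -> num * 2);

        // foreach runs on the executors; collect pulls the results to the driver
        mapped.foreach(n -> System.out.println(n));
        List<Integer> result = mapped.collect();
        System.out.println(result);

        jsc.close();
    }
}
```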


Submission script

/opt/cloudera/parcels/CDH/lib/spark/bin/spark-submit \
--class cool.pengych.bigdata.spark.core.ActionOps \
--master yarn \
--deploy-mode client \
/var/lib/hive/pengyc/jobs/action-ops.jar


Execution output

16/10/09 18:06:28 INFO cluster.YarnScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool
16/10/09 18:06:28 INFO scheduler.DAGScheduler: Job 0 finished: saveAsTextFile at ActionOps.java:38, took 8.077504 s
=== success to save data to hdfs  ===
=== foreach test  ===
16/10/09 18:06:28 INFO spark.SparkContext: Starting job: foreach at ActionOps.java:46
16/10/09 18:06:28 INFO scheduler.DAGScheduler: Got job 1 (foreach at ActionOps.java:46) with 2 output partitions
16/10/09 18:06:28 INFO scheduler.DAGScheduler: Final stage: ResultStage 1 (foreach at ActionOps.java:46)
16/10/09 18:06:28 INFO scheduler.DAGScheduler: Parents of final stage: List()
16/10/09 18:06:28 INFO scheduler.DAGScheduler: Missing parents: List()
16/10/09 18:06:28 INFO scheduler.DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[1] at map at ActionOps.java:26), which has no missing parents
16/10/09 18:06:28 INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 2.3 KB, free 99.4 KB)
16/10/09 18:06:28 INFO storage.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1430.0 B, free 100.8 KB)
16/10/09 18:06:28 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.16.74.36:43858 (size: 1430.0 B, free: 530.3 MB)
16/10/09 18:06:28 INFO spark.SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
16/10/09 18:06:28 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ResultStage 1 (MapPartitionsRDD[1] at map at ActionOps.java:26)
16/10/09 18:06:28 INFO cluster.YarnScheduler: Adding task set 1.0 with 2 tasks
16/10/09 18:06:28 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, mvxl2769, partition 0,PROCESS_LOCAL, 2071 bytes)
16/10/09 18:06:28 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, mvxl2770, partition 1,PROCESS_LOCAL, 2073 bytes)
16/10/09 18:06:28 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on mvxl2769:36941 (size: 1430.0 B, free: 530.3 MB)
16/10/09 18:06:28 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on mvxl2770:51601 (size: 1430.0 B, free: 530.3 MB)
16/10/09 18:06:28 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 96 ms on mvxl2769 (1/2)
16/10/09 18:06:28 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 99 ms on mvxl2770 (2/2)
16/10/09 18:06:28 INFO scheduler.DAGScheduler: ResultStage 1 (foreach at ActionOps.java:46) finished in 0.107 s
16/10/09 18:06:28 INFO cluster.YarnScheduler: Removed TaskSet 1.0, whose tasks have all completed, from pool
16/10/09 18:06:28 INFO scheduler.DAGScheduler: Job 1 finished: foreach at ActionOps.java:46, took 0.125269 s
=== collect test  ===
16/10/09 18:06:28 INFO spark.SparkContext: Starting job: collect at ActionOps.java:59
16/10/09 18:06:28 INFO scheduler.DAGScheduler: Got job 2 (collect at ActionOps.java:59) with 2 output partitions
16/10/09 18:06:28 INFO scheduler.DAGScheduler: Final stage: ResultStage 2 (collect at ActionOps.java:59)
16/10/09 18:06:28 INFO scheduler.DAGScheduler: Parents of final stage: List()
16/10/09 18:06:28 INFO scheduler.DAGScheduler: Missing parents: List()
16/10/09 18:06:28 INFO scheduler.DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[1] at map at ActionOps.java:26), which has no missing parents
16/10/09 18:06:28 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 2.2 KB, free 103.0 KB)
16/10/09 18:06:28 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 1394.0 B, free 104.3 KB)
16/10/09 18:06:28 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.16.74.36:43858 (size: 1394.0 B, free: 530.3 MB)
16/10/09 18:06:28 INFO spark.SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006
16/10/09 18:06:28 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ResultStage 2 (MapPartitionsRDD[1] at map at ActionOps.java:26)
16/10/09 18:06:28 INFO cluster.YarnScheduler: Adding task set 2.0 with 2 tasks
16/10/09 18:06:28 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 2.0 (TID 4, mvxl2769, partition 0,PROCESS_LOCAL, 2071 bytes)
16/10/09 18:06:28 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 2.0 (TID 5, mvxl2770, partition 1,PROCESS_LOCAL, 2073 bytes)
16/10/09 18:06:28 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on mvxl2769:36941 (size: 1394.0 B, free: 530.3 MB)
16/10/09 18:06:28 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on mvxl2770:51601 (size: 1394.0 B, free: 530.3 MB)
16/10/09 18:06:28 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 2.0 (TID 5) in 67 ms on mvxl2770 (1/2)
16/10/09 18:06:29 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 2.0 (TID 4) in 78 ms on mvxl2769 (2/2)
16/10/09 18:06:29 INFO scheduler.DAGScheduler: ResultStage 2 (collect at ActionOps.java:59) finished in 0.080 s
16/10/09 18:06:29 INFO cluster.YarnScheduler: Removed TaskSet 2.0, whose tasks have all completed, from pool
16/10/09 18:06:29 INFO scheduler.DAGScheduler: Job 2 finished: collect at ActionOps.java:59, took 0.095753 s
2
6
10
14
18
16/10/09 18:06:29 INFO ui.SparkUI: Stopped Spark web UI at http://10.16.74.36:4040
16/10/09 18:06:29 INFO cluster.YarnClientSchedulerBackend: Shutting down all executors
16/10/09 18:06:29 INFO cluster.YarnClientSchedulerBackend: Interrupting monitor thread
16/10/09 18:06:29 INFO cluster.YarnClientSchedulerBackend: Asking each executor to shut down
16/10/09 18:06:29 INFO cluster.YarnClientSchedulerBackend: Stopped
16/10/09 18:06:29 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/10/09 18:06:29 INFO storage.MemoryStore: MemoryStore cleared
16/10/09 18:06:29 INFO storage.BlockManager: BlockManager stopped
16/10/09 18:06:29 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
16/10/09 18:06:29 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/10/09 18:06:29 INFO spark.SparkContext: Successfully stopped SparkContext
16/10/09 18:06:29 INFO util.ShutdownHookManager: Shutdown hook called
16/10/09 18:06:29 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/10/09 18:06:29 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-dd3ae8e9-6b1a-4a61-a261-c594382e57eb

[hive@mvxl2766 sh]$ hdfs dfs -ls hdfs://mvxl2766:8020/user/hive/pengyc
Found 3 items
-rw-r--r--   3 hive hive          0 2016-10-09 18:06 hdfs://mvxl2766:8020/user/hive/pengyc/_SUCCESS
-rw-r--r--   3 hive hive          4 2016-10-09 18:06 hdfs://mvxl2766:8020/user/hive/pengyc/part-00000
-rw-r--r--   3 hive hive          9 2016-10-09 18:06 hdfs://mvxl2766:8020/user/hive/pengyc/part-00001
[hive@mvxl2766 sh]$ hdfs dfs -cat hdfs://mvxl2766:8020/user/hive/pengyc/_SUCCESS
[hive@mvxl2766 sh]$ hdfs dfs -cat hdfs://mvxl2766:8020/user/hive/pengyc/part-00000
2
6
[hive@mvxl2766 sh]$ hdfs dfs -cat hdfs://mvxl2766:8020/user/hive/pengyc/part-00001
10
14
18
[hive@mvxl2766 sh]$


Summary

1. Each action operator triggers one job: job 0, job 1, and job 2 above.

2. How are the number and the location of the files saved to HDFS decided? This calls for further study of Spark's job and resource scheduling. In this run, the output files were written from tasks on mvxl2769 and mvxl2770.
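The file count, at least, is predictable: saveAsTextFile writes one part file per partition of the RDD (hence the two part files from the two parallelize partitions above), while the writing nodes depend on where YARN places the tasks. A minimal sketch of controlling the partition count (local[*] master is an assumption; coalesce is the standard operator for reducing partitions without a shuffle):

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.Arrays;

public class PartitionCountSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("partition count").setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        // parallelize with an explicit partition count: 2 partitions -> 2 part files on save
        JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1, 3, 5, 7, 9), 2);
        System.out.println(rdd.partitions().size()); // saveAsTextFile would emit part-00000 and part-00001

        // coalesce down to 1 partition -> a single part-00000
        JavaRDD<Integer> one = rdd.coalesce(1);
        System.out.println(one.partitions().size());

        jsc.close();
    }
}
```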