Spark算子详解之reduceByKey_sample_take_takeSample_distinct_sortByKey_saveAsTextFile_intersection
2018-01-04 20:40
561 查看
XML Code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 | package com.lyzx.day16 import org.apache.spark.{SparkContext, SparkConf} class T2 { /** * reduceByKey = groupByKey + reduce *从功能上讲相当于先做GroupByKey然后再做reduce操作但是比groupByKey+reduce操作效率要高 * 即对于键值对的RDD来说,应用于所有的key相同的值做操作 * * 由于reduceByKey是先做groupByKey后做reduce * 所以它包含shuffle操作(groupByKey包含shuffle操作) *前面说过 * 功能上:reduceByKey = groupByKey + reduce * 效率上:reduceByKey > groupByKey + reduce * 原因是reduceByKey在map端自带Combiner * 例如wordCount例子中map端 * [("java",(1,1,1,1)),("c",(1,1,1,1))] * 如果在map做Combiner就像[("java",(4)),("c",(4))] * 在reduce端fatch时效率会高 */ def f1(sc:SparkContext): Unit ={ val arr = List(1,2,3,4,5,5,4,3,2,1) //rdd中的数据如下[(1,2,3,4,5,5,4,3,2,1)] val rdd = sc.parallelize(arr) //mapRdd中的数据如下[((1,1),(2,2),(3,3),...(5,5),....,(1,1))] val mapRdd = rdd.map(item=>(item,item*10)) val reduceRdd = mapRdd.reduceByKey(_+_) reduceRdd.foreach(println) } /** * sample方法用于随机的采样即随机的从rdd中取出一定个数的数量 * 参数1 Boolean 取样的方式 是不是取完后放回去再取 * 参数2 Double 取多少 * 参数3 Double 种子 */ def f2(sc:SparkContext): Unit ={ val arr = List(1,2,3,4,5,6,7,8,9,10) val rdd = sc.parallelize(arr) val sampleRdd = rdd.sample(false,0.5) sampleRdd.foreach(println) } /** * take 取出前N个元素 */ def f3(sc:SparkContext): Unit ={ val arr = (1 to 10) val rdd = sc.parallelize(arr) val list = rdd.take(4) list.foreach(println) } /** * takeSample 随机的取N个元素 * 参数1 随机取元素时要不要放回去再取 * 参数2 随机取几个 */ def f4(sc:SparkContext): Unit ={ val arr = (1 to 10) val rdd = sc.parallelize(arr) val list = rdd.takeSample(true,2) list.foreach(println) } /** *distinct是去重功能 * 有shuffle操作 * 其内部实现是通过reduceByKey实现 */ def f5(sc:SparkContext): Unit ={ val arr1 = List(1 to 10) val arr2 = List(1 to 10) val rdd = sc.parallelize(arr1.++(arr2)) rdd.foreach(println) println("=================================") rdd.distinct().foreach(println) } /** sortByKey *通过对键排序 */ def f6(sc:SparkContext): Unit ={ val rdd = sc.parallelize((1 to 10)) val mapRdd = rdd.map(item=>(item,item)) val sortRdd = mapRdd.sortByKey(false) sortRdd.foreach(println) } /** * 把计算结果保存为一个文本文件 * 传入的路径必须是不存在否则会抛异常,试想一下如果计算了很久的结果被覆盖这会有多酸爽啊~~~ * @param sc */ def f7(sc:SparkContext): Unit ={ val rdd = sc.parallelize((1 to 10)) val mapRdd = rdd.map(item=>(item,item)) mapRdd.saveAsTextFile("./k") } /** * intersection 交点 * 求出两个RDD的交集 * @param sc */ def f8(sc:SparkContext): Unit ={ val rdd1 = sc.parallelize((1 to 10)) val rdd2 = sc.parallelize((5 to 15)) rdd1.intersection(rdd2).foreach(println) } } object T2{ def main(args: Array[String]) { val conf = new SparkConf().setAppName("myTest").setMaster("local") val sc = new SparkContext(conf) val t = new T2() // t.f1(sc) // t.f2(sc) // t.f3(sc) // t.f4(sc) // t.f5(sc) // t.f6(sc) // t.f7(sc) t.f8(sc) sc.stop() } } |
相关文章推荐
- Spark代码3之Action:reduce,reduceByKey,sorted,lookup,take,saveAsTextFile
- spark RDD算子(六)之键值对聚合操作reduceByKey,foldByKey,排序操作sortByKey
- Spark算子[13]:sortByKey、sortBy、二次排序 源码实例详解
- Spark编程之基本的RDD算子之glom,substract,substractByKey,intersection,distinct,union
- Spark算子[18]:saveAsTextFile、saveAsObjectFile 源码实例详解
- Spark groupByKey,reduceByKey,sortByKey算子的区别
- Spark算子[12]:groupByKey、cogroup、join、lookup 源码实例详解
- Spark算子--reduceByKey
- Spark: sortBy和sortByKey函数详解
- 大数据:Spark 算子(一)排序算子sortByKey来看大数据平台下如何做排序
- Spark groupByKey、sortByKey、reduceByKey Java实现
- spark中算子详解:combineByKey
- Spark算子:RDD行动Action操作(5)–saveAsTextFile、saveAsSequenceFile、saveAsObjectFile
- 大数据Spark “蘑菇云”行动第40课:Spark编程实战之aggregateByKey、reduceByKey、groupByKey、sortByKey深度解密
- Spark RDD中Transformation的combineByKey、reduceByKey,join详解
- Spark算子:RDD行动Action操作(5)–saveAsTextFile、saveAsSequenceFile、saveAsObjectFile
- Spark算子[10]:foldByKey、fold 源码实例详解
- 深入理解Spark算子之 reduceByKey
- Spark API 详解/大白话解释 之 reduce、reduceByKey
- spark中算子详解:aggregateByKey