spark 的transformations之bykey的区别
2016-09-23 15:53
309 查看
spark中提供的bykey相关的transforms有以下几个
这几个操作基本都需要是原始输入是(K,V).
这些都会引发shuffle操作
groupByKey的输入是(K,V),输出是(K,Iterable<V>)。
reduceByKey的输入是(K,V),输出也是(K,V)。对于相同的key的值,会执行func进行聚合。
sortByKey 会根据key进行排序。
arrgegateByKey (zeroValue)(seqOp,combOp) (K,V) => 对于每一个分区,执行sqlOp函数(zeroValue,V). combOp(K,V)会根据K值对V进行聚合
这几个操作基本都需要是原始输入是(K,V).
这些都会引发shuffle操作
groupByKey的输入是(K,V),输出是(K,Iterable<V>)。
reduceByKey的输入是(K,V),输出也是(K,V)。对于相同的key的值,会执行func进行聚合。
sortByKey 会根据key进行排序。
arrgegateByKey (zeroValue)(seqOp,combOp) (K,V) => 对于每一个分区,执行sqlOp函数(zeroValue,V). combOp(K,V)会根据K值对V进行聚合
package com.le.spark.scala.examples.Transformations import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD.rddToOrderedRDDFunctions import org.apache.spark.rdd.RDD.rddToPairRDDFunctions object bykey { // transform: // map(func) Return a new distributed dataset formed by passing each element of the source through a function func. // filter(func) Return a new dataset formed by selecting those elements of the source on which func returns true. def main(args:Array[String]){ val sparkconf = new SparkConf().setAppName("test1").setMaster("local") val sc = new SparkContext(sparkconf) val list = List((1,1),(1,2),(1,3),(1,4),(2,6)) val rddmap = sc.parallelize(list, 4); // val rddmap = rdd.map (x=>(x,x)) // val list = List((0,1),(8,9),(2,5),(4,3),(6,7),(1,2),(1,4)) // // val rddmap = sc.parallelize(list, 1); //groupbykey: When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. val rddgroupbykey=rddmap.groupByKey() rddgroupbykey.foreach(x=>println(x._1+" "+x._2)) rddgroupbykey.foreach(x=>{val it=x._2.iterator; var c=0 ; while(it.hasNext) c=c+it.next() ; println(x._1+" "+c) }) val sortbykey= rddmap.sortByKey(true) sortbykey.foreach(x=>println("sort "+x._1+" :"+x._2)) val reducebykey = rddmap.reduceByKey((v1,v2)=>v1+v2); reducebykey.foreach(x=>println("reducebykey "+x._1+" :"+x._2)) //aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) //seqOp : 在每一个分区执行 //combOp: 在不同的分区之间执行 val aggregatebykey = rddmap.aggregateByKey("100")(((u,v)=>{println("u:"+u+" v:"+v); u+v}),((v1,v2)=>{println("v1:"+v1+" v2:"+v2); v1+v2})) aggregatebykey.foreach((x)=>println("aggregatebykey:"+x)) sc.stop() } }
相关文章推荐
- Spark中的aggregate和aggregateByKey的区别及疑惑
- 【Spark系列2】reduceByKey和groupByKey区别与用法
- 请教Spark 中 combinebyKey 和 reduceByKey的传入函数参数的区别?
- spark中reduce和reduceByKey的区别
- Spark中groupBy groupByKey reduceByKey的区别
- Spark中的aggregate和aggregateByKey的区别及疑惑
- Spark中groupByKey与reduceByKey算子之间的区别
- Spark groupByKey,reduceByKey,sortByKey算子的区别
- 【Spark系列2】reduceByKey和groupByKey区别与用法
- Spark中 groupBy() 与groupByKey()的区别
- 【转载】Spark中:reduceByKey和groupByKey区别与用法
- 在Spark中关于groupByKey与reduceByKey的区别
- Controlling the number of Partitions in Spark for shuffle transformations (Ex. reduceByKey)
- spark中groupByKey与reducByKey的区别
- Spark: sortBy和sortByKey函数详解
- [spark]groupbykey reducebykey
- Spark RDD中Transformation的combineByKey、reduceByKey,join详解
- spark reduceByKey()和 reduceByKey(,para)的时间差
- Spark中的combineByKey
- spark streaming - kafka updateStateByKey 统计用户消费金额