您的位置:首页 > 其它

spark 的transformations之bykey的区别

2016-09-23 15:53 309 查看
        spark中提供的bykey相关的transforms有以下几个

         


    这几个操作基本都需要是原始输入是(K,V).

     这些都会引发shuffle操作

     groupByKey的输入是(K,V),输出是(K,Iterable<V>)。

     reduceByKey的输入是(K,V),输出也是(K,V)。对于相同的key的值,会执行func进行聚合。

     sortByKey     会根据key进行排序。

     arrgegateByKey (zeroValue)(seqOp,combOp)     (K,V) =>     对于每一个分区,执行sqlOp函数(zeroValue,V).      combOp(K,V)会根据K值对V进行聚合

     

package com.le.spark.scala.examples.Transformations

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD.rddToOrderedRDDFunctions
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions

object bykey {
//    transform:
//    map(func) 	Return a new distributed dataset formed by passing each element of the source through a function func.
//    filter(func) 	Return a new dataset formed by selecting those elements of the source on which func returns true.

def main(args:Array[String]){

val sparkconf = new SparkConf().setAppName("test1").setMaster("local")
val sc = new SparkContext(sparkconf)

val list = List((1,1),(1,2),(1,3),(1,4),(2,6))

val rddmap = sc.parallelize(list, 4);
//      val rddmap = rdd.map (x=>(x,x))

//       val list = List((0,1),(8,9),(2,5),(4,3),(6,7),(1,2),(1,4))
//
//      val rddmap = sc.parallelize(list, 1);
//groupbykey: When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.
val rddgroupbykey=rddmap.groupByKey()
rddgroupbykey.foreach(x=>println(x._1+" "+x._2))
rddgroupbykey.foreach(x=>{val it=x._2.iterator; var c=0 ;  while(it.hasNext) c=c+it.next() ; println(x._1+"  "+c) })

val sortbykey= rddmap.sortByKey(true)
sortbykey.foreach(x=>println("sort "+x._1+" :"+x._2))

val reducebykey =  rddmap.reduceByKey((v1,v2)=>v1+v2);
reducebykey.foreach(x=>println("reducebykey "+x._1+" :"+x._2))
//aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
//seqOp : 在每一个分区执行
//combOp: 在不同的分区之间执行
val aggregatebykey = rddmap.aggregateByKey("100")(((u,v)=>{println("u:"+u+" v:"+v);  u+v}),((v1,v2)=>{println("v1:"+v1+" v2:"+v2);    v1+v2}))
aggregatebykey.foreach((x)=>println("aggregatebykey:"+x))

sc.stop()
}
}


    

        

 

     

    

      

     
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: