Spark的RDD操作之Join大全!
2016-11-18 08:42
225 查看
Spark的RDD操作之Join大全!
首先rdd1是一个行业基本RDD,包含ID和行业名称,rdd2是一个行业薪水RDD,包含ID和薪水。
DT大数据梦工厂王家林老师。
全部视频地址:http://www.tudou.com/home/_79823675/playlist
微博:http://weibo.com/ilovepains?is_all=1&retcode=6102
一、RDD的Join操作有哪些?
(一)Join:Join类似于SQL的inner join操作,返回结果是前面和后面集合中配对成功的,过滤掉关联不上的。源代码如下:
/** * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD. */ def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope { this.cogroup(other, partitioner).flatMapValues( pair => for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w) ) }
(二)leftOuterJoin:leftOuterJoin类似于SQL中的左外关联left outer join,返回结果以前面的RDD为主,关联不上的记录为空。声明如下:
/** * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the * pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to * partition the output RDD. */ def leftOuterJoin[W]( other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = self.withScope { this.cogroup(other, partitioner).flatMapValues { pair => if (pair._2.isEmpty) { pair._1.iterator.map(v => (v, None)) } else { for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, Some(w)) } } }
(三)rightOuterJoin:rightOuterJoin类似于SQL中的有外关联right outer join,返回结果以参数也就是右边的RDD为主,关联不上的记录为空。声明如下:
/** * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the * pair (k, (None, w)) if no elements in `this` have key k. Uses the given Partitioner to * partition the output RDD. */ def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner) : RDD[(K, (Option[V], W))] = self.withScope { this.cogroup(other, partitioner).flatMapValues { pair => if (pair._1.isEmpty) { pair._2.iterator.map(w => (None, w)) } else { for (v <- pair._1.iterator; w <- pair._2.iterator) yield (Some(v), w) } } }
二、实战操作
下面我们用一个非常简单的栗子,来进行比较说明:首先rdd1是一个行业基本RDD,包含ID和行业名称,rdd2是一个行业薪水RDD,包含ID和薪水。
//设置运行环境 val conf = new SparkConf().setAppName("SparkRDDJoinOps").setMaster("local[4]") val sc = new SparkContext(conf) //建立一个基本的键值对RDD,包含ID和名称,其中ID为1、2、3、4 val rdd1 = sc.makeRDD(Array(("1","Spark"),("2","Hadoop"),("3","Scala"),("4","Java")),2) //建立一个行业薪水的键值对RDD,包含ID和薪水,其中ID为1、2、3、5 val rdd2 = sc.makeRDD(Array(("1","30K"),("2","15K"),("3","25K"),("5","10K")),2) println("//下面做Join操作,预期要得到(1,×)、(2,×)、(3,×)") val joinRDD=rdd1.join(rdd2).collect.foreach(println) println("//下面做leftOutJoin操作,预期要得到(1,×)、(2,×)、(3,×)、(4,×)") val leftJoinRDD=rdd1.leftOuterJoin(rdd2).collect.foreach(println) println("//下面做rightOutJoin操作,预期要得到(1,×)、(2,×)、(3,×)、(5,×)") val rightJoinRDD=rdd1.rightOuterJoin(rdd2).collect.foreach(println) sc.stop()
三、结果如下:
//下面做Join操作,预期要得到(1,×)、(2,×)、(3,×) (2,(Hadoop,15K)) (3,(Scala,25K)) (1,(Spark,30K)) //下面做leftOutJoin操作,预期要得到(1,×)、(2,×)、(3,×)、(4,×) (4,(Java,None)) (2,(Hadoop,Some(15K))) (3,(Scala,Some(25K))) (1,(Spark,Some(30K))) //下面做rightOutJoin操作,预期要得到(1,×)、(2,×)、(3,×)、(5,×) (2,(Some(Hadoop),15K)) (5,(None,10K)) (3,(Some(Scala),25K)) (1,(Some(Spark),30K))结果就证明了我们的预期。
DT大数据梦工厂王家林老师。
全部视频地址:http://www.tudou.com/home/_79823675/playlist
微博:http://weibo.com/ilovepains?is_all=1&retcode=6102
相关文章推荐
- Spark算子:RDD键值转换操作(4)–cogroup、join
- Spark 实践 - RDD 的 join操作之需要注意的事项 - RDD为空的join操作
- Spark算子:RDD键值转换操作(5)–leftOuterJoin、rightOuterJoin、subtractByKey
- Spark算子:RDD键值转换操作(4)–cogroup、join
- Spark RDD转换操作union、join、cogroup
- Spark算子:RDD键值转换操作(5)–leftOuterJoin、rightOuterJoin、subtractByKey
- Spark算子:RDD键值转换操作(5)–leftOuterJoin、rightOuterJoin、subtractByKey
- 3.3 Spark RDD键值转换操作5-leftOuterJoin、rightOuterJoin、subtractByKey
- Spark算子:RDD键值转换操作(4)–cogroup、join
- Spark算子:RDD键值转换操作(4)–cogroup/join
- Spark算子:RDD键值转换操作(5)–leftOuterJoin、rightOuterJoin、subtractByKey
- 离线轻量级大数据平台Spark之JavaRDD关联join操作
- 【Spark】RDD操作详解1——Transformation和Actions概况
- spark1.4加载mysql数据 创建Dataframe及join操作连接方法问题
- spark RDD transformation操作
- Spark RDD操作(1)
- [spark]Spark算子:RDD基本转换操作(5)–mapPartitions、mapPartitionsWithIndex
- 大数据之Spark探秘:二、RDD的Transformations操作
- Spark算子:RDD基本转换操作(5)–mapPartitions、
- 【Spark】RDD操作详解4——Action算子