Spark join和cogroup算子
2016-06-16 09:39
357 查看
转载请标明出处:小帆的帆的专栏
join
下面的代码包括RDD和DataFrame的join操作, 注释中有详细描述Row(t._1, t._2)), schema1) val schema2 = StructType(Array(StructField("id", DataTypes.IntegerType, nullable = true), StructField("age", DataTypes.IntegerType, nullable = true))) val idAgeDF = sqlContext.createDataFrame(idAge.map(t => Row(t._1, t._2)), schema2) println("*********************************DataFrame**********************************") println("\n内关联(inner join)\n") // 相当于调用, idNameDF.join(idAgeDF, Seq("id"), "inner").collect().foreach(println) // 这里只是调用了封装的API idNameDF.join(idAgeDF, "id").collect().foreach(println) println("\n左外关联(left out join)\n") idNameDF.join(idAgeDF, Seq("id"), "left_outer").collect().foreach(println) println("\n右外关联(right outer join)\n") idNameDF.join(idAgeDF, Seq("id"), "right_outer").collect().foreach(println) println("\n全外关联(full outer join)\n") idNameDF.join(idAgeDF, Seq("id"), "outer").collect().foreach(println) println("\nleft semi join\n") // left semi join // 左边的id, 在右边有, 就保留左边的数据; 右边的数据不保留, 只有id的有意义的 /** * [1,zhangsan] * [2,lisi] */ idNameDF.join(idAgeDF, Seq("id"), "leftsemi").collect().foreach(println) } }" data-snippet-id="ext.2c586ec3b86bc54fbd9294e1bb8bce55" data-snippet-saved="false" data-codota-status="done">[code]import org.apache.spark.sql.types.{DataTypes, StructField, StructType} import org.apache.spark.sql.{Row, SQLContext} import org.apache.spark.{SparkConf, SparkContext} object Run { def main(args: Array[String]) { val conf = new SparkConf().setAppName("test").setMaster("local") val sc = new SparkContext(conf) sc.setLogLevel("ERROR") val sqlContext = new SQLContext(sc) /** * id name * 1 zhangsan * 2 lisi * 3 wangwu */ val idName = sc.parallelize(Array((1, "zhangsan"), (2, "lisi"), (3, "wangwu"))) /** * id age * 1 30 * 2 29 * 4 21 */ val idAge = sc.parallelize(Array((1, 30), (2, 29), (4, 21))) /** *******************************RDD **********************************/ println("*********************************RDD**********************************") println("\n内关联(inner join)\n") // 内关联(inner join) // 只保留两边id相等的部分 /** * (1,(zhangsan,30)) * (2,(lisi,29)) */ idName.join(idAge).collect().foreach(println) println("\n左外关联(left out join)\n") // 左外关联(left out join) // 以左边的数据为标准, 左边的数据一律保留 // 右边分三情况: // 一: 左边的id, 右边有, 则合并数据; (1,(zhangsan,Some(30))) // 二: 左边的id, 右边没有, 则右边为空; (3,(wangwu,None)) // 三: 右边的id, 左边没有, 则不保留; 右边有id为4的行, 但结果中并未保留 /** * (1,(zhangsan,Some(30))) * (2,(lisi,Some(29))) * (3,(wangwu,None)) */ idName.leftOuterJoin(idAge).collect().foreach(println) println("\n右外关联(right outer join)\n") // 右外关联(right outer join) // 以右边的数据为标准, 右边的数据一律保留 // 左边分三种情况: // 一: 右边的id, 左边有, 则合并数据; (1,(Some(zhangsan),30)) // 二: 右边的id, 左边没有, 则左边为空; (4,(None,21)) // 三: 左边的id, 右边没有, 则不保留; 左边有id为3的行, 但结果中并为保留 /** * (1,(Some(zhangsan),30)) * (2,(Some(lisi),29)) * (4,(None,21)) */ idName.rightOuterJoin(idAge).collect().foreach(println) println("\n全外关联(full outer join)\n") // 全外关联(full outer join) /** * * (1,(Some(zhangsan),Some(30))) * (2,(Some(lisi),Some(29))) * (3,(Some(wangwu),None)) * (4,(None,Some(21))) */ idName.fullOuterJoin(idAge).collect().foreach(println) /** *******************************DataFrame **********************************/ val schema1 = StructType(Array(StructField("id", DataTypes.IntegerType, nullable = true), StructField("name", DataTypes.StringType, nullable = true))) val idNameDF = sqlContext.createDataFrame(idName.map(t => Row(t._1, t._2)), schema1) val schema2 = StructType(Array(StructField("id", DataTypes.IntegerType, nullable = true), StructField("age", DataTypes.IntegerType, nullable = true))) val idAgeDF = sqlContext.createDataFrame(idAge.map(t => Row(t._1, t._2)), schema2) println("*********************************DataFrame**********************************") println("\n内关联(inner join)\n") // 相当于调用, idNameDF.join(idAgeDF, Seq("id"), "inner").collect().foreach(println) // 这里只是调用了封装的API idNameDF.join(idAgeDF, "id").collect().foreach(println) println("\n左外关联(left out join)\n") idNameDF.join(idAgeDF, Seq("id"), "left_outer").collect().foreach(println) println("\n右外关联(right outer join)\n") idNameDF.join(idAgeDF, Seq("id"), "right_outer").collect().foreach(println) println("\n全外关联(full outer join)\n") idNameDF.join(idAgeDF, Seq("id"), "outer").collect().foreach(println) println("\nleft semi join\n") // left semi join // 左边的id, 在右边有, 就保留左边的数据; 右边的数据不保留, 只有id的有意义的 /** * [1,zhangsan] * [2,lisi] */ idNameDF.join(idAgeDF, Seq("id"), "leftsemi").collect().foreach(println) } }
cogroup与join的笛卡尔积
当出现相同Key时, join会出现笛卡尔积, 而cogroup的处理方式不同import org.apache.spark.sql.SQLContext import org.apache.spark.{SparkConf, SparkContext} object Run { def main(args: Array[String]) { val conf = new SparkConf().setAppName("test").setMaster("local") val sc = new SparkContext(conf) sc.setLogLevel("ERROR") val sqlContext = new SQLContext(sc) /** * id name * 1 zhangsan * 2 lisi * 3 wangwu */ val idName = sc.parallelize(Array((1, "zhangsan"), (2, "lisi"), (3, "wangwu"))) /** * id age * 1 30 * 2 29 * 4 21 */ val idAge = sc.parallelize(Array((1, 30), (2, 29), (4, 21))) println("\ncogroup\n") /** * (1,(CompactBuffer(zhangsan),CompactBuffer(30))) * (2,(CompactBuffer(lisi),CompactBuffer(29))) * (3,(CompactBuffer(wangwu),CompactBuffer())) * (4,(CompactBuffer(),CompactBuffer(21))) */ idName.cogroup(idAge).collect().foreach(println) println("\njoin\n") // fullOuterJoin于cogroup的结果类似, 只是数据结构不一样 /** * (1,(Some(zhangsan),Some(30))) * (2,(Some(lisi),Some(29))) * (3,(Some(wangwu),None)) * (4,(None,Some(21))) */ idName.fullOuterJoin(idAge).collect().foreach(println) /** * id score * 1 100 * 2 90 * 2 95 */ val idScore = sc.parallelize(Array((1, 100), (2, 90), (2, 95))) println("\ncogroup, 出现相同id时\n") /** * (1,(CompactBuffer(zhangsan),CompactBuffer(100))) * (2,(CompactBuffer(lisi),CompactBuffer(90, 95))) * (3,(CompactBuffer(wangwu),CompactBuffer())) */ idName.cogroup(idScore).collect().foreach(println) println("\njoin, 出现相同id时\n") /** * (1,(Some(zhangsan),Some(100))) * (2,(Some(lisi),Some(90))) * (2,(Some(lisi),Some(95))) * (3,(Some(wangwu),None)) */ idName.fullOuterJoin(idScore).collect().foreach(println) } }
参考链接
Hive中Join的类型和用法相关文章推荐
- Spark RDD API详解(一) Map和Reduce
- 使用spark和spark mllib进行股票预测
- Spark随谈——开发指南(译)
- Spark,一种快速数据分析替代方案
- eclipse 开发 spark Streaming wordCount
- Understanding Spark Caching
- ClassNotFoundException:scala.PreDef$
- Windows 下Spark 快速搭建Spark源码阅读环境
- Spark中将对象序列化存储到hdfs
- 使用java代码提交Spark的hive sql任务,run as java application
- Spark机器学习(一) -- Machine Learning Library (MLlib)
- Spark机器学习(二) 局部向量 Local-- Data Types - MLlib
- Spark机器学习(三) Labeled point-- Data Types
- Spark初探
- Spark Streaming初探
- Spark本地开发环境搭建
- 搭建hadoop/spark集群环境
- Spark HA部署方案
- Spark HA原理架构图
- spark内存概述