您的位置:首页 > 其它

Spark join和cogroup算子

2016-06-16 09:39 357 查看
转载请标明出处:小帆的帆的专栏

join

下面的代码包括RDD和DataFrame的join操作, 注释中有详细描述

Row(t._1, t._2)), schema1)

val schema2 = StructType(Array(StructField("id", DataTypes.IntegerType, nullable = true), StructField("age", DataTypes.IntegerType, nullable = true)))
val idAgeDF = sqlContext.createDataFrame(idAge.map(t => Row(t._1, t._2)), schema2)
println("*********************************DataFrame**********************************")

println("\n内关联(inner join)\n")
// 相当于调用, idNameDF.join(idAgeDF, Seq("id"), "inner").collect().foreach(println)
// 这里只是调用了封装的API
idNameDF.join(idAgeDF, "id").collect().foreach(println)

println("\n左外关联(left out join)\n")
idNameDF.join(idAgeDF, Seq("id"), "left_outer").collect().foreach(println)

println("\n右外关联(right outer join)\n")
idNameDF.join(idAgeDF, Seq("id"), "right_outer").collect().foreach(println)

println("\n全外关联(full outer join)\n")
idNameDF.join(idAgeDF, Seq("id"), "outer").collect().foreach(println)

println("\nleft semi join\n")
// left semi join
// 左边的id, 在右边有, 就保留左边的数据; 右边的数据不保留, 只有id的有意义的
/**
* [1,zhangsan]
* [2,lisi]
*/
idNameDF.join(idAgeDF, Seq("id"), "leftsemi").collect().foreach(println)
}

}" data-snippet-id="ext.2c586ec3b86bc54fbd9294e1bb8bce55" data-snippet-saved="false" data-codota-status="done">[code]import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object Run {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("test").setMaster("local")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
val sqlContext = new SQLContext(sc)

/**
* id      name
* 1       zhangsan
* 2       lisi
* 3       wangwu
*/
val idName = sc.parallelize(Array((1, "zhangsan"), (2, "lisi"), (3, "wangwu")))

/**
* id      age
* 1       30
* 2       29
* 4       21
*/
val idAge = sc.parallelize(Array((1, 30), (2, 29), (4, 21)))

/** *******************************RDD **********************************/

println("*********************************RDD**********************************")

println("\n内关联(inner join)\n")
// 内关联(inner join)
//  只保留两边id相等的部分
/**
* (1,(zhangsan,30))
* (2,(lisi,29))
*/
idName.join(idAge).collect().foreach(println)

println("\n左外关联(left out join)\n")
// 左外关联(left out join)
// 以左边的数据为标准, 左边的数据一律保留
// 右边分三情况:
//      一: 左边的id, 右边有, 则合并数据; (1,(zhangsan,Some(30)))
//      二: 左边的id, 右边没有, 则右边为空; (3,(wangwu,None))
//      三: 右边的id, 左边没有, 则不保留; 右边有id为4的行, 但结果中并未保留
/**
* (1,(zhangsan,Some(30)))
* (2,(lisi,Some(29)))
* (3,(wangwu,None))
*/
idName.leftOuterJoin(idAge).collect().foreach(println)

println("\n右外关联(right outer join)\n")
// 右外关联(right outer join)
// 以右边的数据为标准, 右边的数据一律保留
// 左边分三种情况:
//      一: 右边的id, 左边有, 则合并数据; (1,(Some(zhangsan),30))
//      二: 右边的id, 左边没有, 则左边为空; (4,(None,21))
//      三: 左边的id, 右边没有, 则不保留; 左边有id为3的行, 但结果中并为保留
/**
* (1,(Some(zhangsan),30))
* (2,(Some(lisi),29))
* (4,(None,21))
*/
idName.rightOuterJoin(idAge).collect().foreach(println)

println("\n全外关联(full outer join)\n")
// 全外关联(full outer join)
/**
*
* (1,(Some(zhangsan),Some(30)))
* (2,(Some(lisi),Some(29)))
* (3,(Some(wangwu),None))
* (4,(None,Some(21)))
*/
idName.fullOuterJoin(idAge).collect().foreach(println)

/** *******************************DataFrame **********************************/
val schema1 = StructType(Array(StructField("id", DataTypes.IntegerType, nullable = true), StructField("name", DataTypes.StringType, nullable = true)))
val idNameDF = sqlContext.createDataFrame(idName.map(t => Row(t._1, t._2)), schema1)

val schema2 = StructType(Array(StructField("id", DataTypes.IntegerType, nullable = true), StructField("age", DataTypes.IntegerType, nullable = true)))
val idAgeDF = sqlContext.createDataFrame(idAge.map(t => Row(t._1, t._2)), schema2)
println("*********************************DataFrame**********************************")

println("\n内关联(inner join)\n")
// 相当于调用, idNameDF.join(idAgeDF, Seq("id"), "inner").collect().foreach(println)
// 这里只是调用了封装的API
idNameDF.join(idAgeDF, "id").collect().foreach(println)

println("\n左外关联(left out join)\n")
idNameDF.join(idAgeDF, Seq("id"), "left_outer").collect().foreach(println)

println("\n右外关联(right outer join)\n")
idNameDF.join(idAgeDF, Seq("id"), "right_outer").collect().foreach(println)

println("\n全外关联(full outer join)\n")
idNameDF.join(idAgeDF, Seq("id"), "outer").collect().foreach(println)

println("\nleft semi join\n")
// left semi join
// 左边的id, 在右边有, 就保留左边的数据; 右边的数据不保留, 只有id的有意义的
/**
* [1,zhangsan]
* [2,lisi]
*/
idNameDF.join(idAgeDF, Seq("id"), "leftsemi").collect().foreach(println)
}

}


cogroup与join的笛卡尔积

当出现相同Key时, join会出现笛卡尔积, 而cogroup的处理方式不同

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object Run {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("test").setMaster("local")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
val sqlContext = new SQLContext(sc)

/**
* id      name
* 1       zhangsan
* 2       lisi
* 3       wangwu
*/
val idName = sc.parallelize(Array((1, "zhangsan"), (2, "lisi"), (3, "wangwu")))

/**
* id      age
* 1       30
* 2       29
* 4       21
*/
val idAge = sc.parallelize(Array((1, 30), (2, 29), (4, 21)))

println("\ncogroup\n")

/**
* (1,(CompactBuffer(zhangsan),CompactBuffer(30)))
* (2,(CompactBuffer(lisi),CompactBuffer(29)))
* (3,(CompactBuffer(wangwu),CompactBuffer()))
* (4,(CompactBuffer(),CompactBuffer(21)))
*/
idName.cogroup(idAge).collect().foreach(println)

println("\njoin\n")
// fullOuterJoin于cogroup的结果类似, 只是数据结构不一样
/**
* (1,(Some(zhangsan),Some(30)))
* (2,(Some(lisi),Some(29)))
* (3,(Some(wangwu),None))
* (4,(None,Some(21)))
*/
idName.fullOuterJoin(idAge).collect().foreach(println)

/**
* id      score
* 1       100
* 2       90
* 2       95
*/
val idScore = sc.parallelize(Array((1, 100), (2, 90), (2, 95)))

println("\ncogroup, 出现相同id时\n")

/**
* (1,(CompactBuffer(zhangsan),CompactBuffer(100)))
* (2,(CompactBuffer(lisi),CompactBuffer(90, 95)))
* (3,(CompactBuffer(wangwu),CompactBuffer()))
*/
idName.cogroup(idScore).collect().foreach(println)

println("\njoin, 出现相同id时\n")

/**
* (1,(Some(zhangsan),Some(100)))
* (2,(Some(lisi),Some(90)))
* (2,(Some(lisi),Some(95)))
* (3,(Some(wangwu),None))
*/
idName.fullOuterJoin(idScore).collect().foreach(println)
}

}


参考链接

Hive中Join的类型和用法
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark