
Spark/MapReduce Programming: Self-Join (Scala)

2014-11-25 18:47
For the SQL and Hadoop MapReduce implementations of this problem, see the earlier post: MapReduce编程-自连接.

This post applies the same principle in Spark: given a table of child-parent pairs, join the table with itself to produce grandchild-grandparent pairs. I am new to Scala myself, so the code may not be ideal; corrections are welcome.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // pair-RDD operations (needed on older Spark versions)
import scala.collection.mutable.ArrayBuffer

object SelfUnion {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SelfUnion")
    val sc = new SparkContext(conf)
    val cpFile = sc.textFile("cp.txt")

    cpFile
      // 1) Build the two "tables": each (child, parent) line is emitted twice,
      //    once under each name, tagged with its role relative to that name
      .flatMap(line => {
        val strs = line.split(" ")
        if (strs.length == 2)
          Array(strs(0) + " parent_" + strs(1), strs(1) + " child_" + strs(0))
        else
          Array[String]()
      })
      // 2) Convert to key-value form: (person, tagged relative)
      .map(line => {
        val kv = line.split(" ")
        (kv(0), kv(1))
      })
      // 3) Match rows on the join column (the person's name)
      .groupByKey()
      // 4) Parse the values of each group: pair every child with every parent,
      //    i.e. the child's grandparent is that parent
      .flatMapValues(values => {
        val childList = new ArrayBuffer[String]()
        val parentList = new ArrayBuffer[String]()
        values.foreach { name =>
          if (name.startsWith("child_")) childList += name
          else if (name.startsWith("parent_")) parentList += name
        }
        for (c <- childList; p <- parentList)
          yield c.substring(6) + " " + p.substring(7) // strip the "child_" / "parent_" tags
      })
      .values
      .saveAsTextFile("selfunion_out")
  }
}

The contents of cp.txt (one child-parent pair per line):
Tom Lucy
Tom Jack
Jone Lucy
Jone Jack
Lucy Mary
Lucy Ben
Jack Alice
Jack Jesse
Terry Tom

The output is:

Terry Lucy
Terry Jack
Tom Mary
Tom Ben
Jone Mary
Jone Ben
Tom Alice
Tom Jesse
Jone Alice
Jone Jesse
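
As a side note (my own addition, not part of the original MapReduce article): Spark's pair-RDD API can also express this self-join directly with join, without the string tagging. A minimal sketch under the same assumptions (a SparkContext sc and the cp.txt above; the output path selfjoin_out is made up):

// Sketch: the same self-join via RDD.join instead of tagging + groupByKey
val pairs = sc.textFile("cp.txt").map(_.split(" ")).filter(_.length == 2)
val childToParent = pairs.map(a => (a(0), a(1))) // keyed by child:  (child, parent)
val parentToChild = pairs.map(a => (a(1), a(0))) // keyed by parent: (parent, child)
// Join on the shared person X: (X, (X's child, X's parent)) = (grandchild, grandparent)
parentToChild.join(childToParent)
  .values
  .map { case (grandchild, grandparent) => grandchild + " " + grandparent }
  .saveAsTextFile("selfjoin_out")

This avoids building the intermediate tag strings, at the cost of deriving two keyed RDDs from the same input.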

Pure Scala implementation

In addition, here is the same algorithm written with Scala's ordinary functional collections. Note that the code below uses the built-in map, flatMap, etc. of Scala collections, not the RDD methods.

An ArrayBuffer[String] stands in for the input file, with one item per line.

import scala.collection.mutable.ArrayBuffer

def main(args: Array[String]) {
  val list = new ArrayBuffer[String]()
  list += "Tom Lucy"
  list += "Tom Jack"
  list += "Jone Lucy"
  list += "Jone Jack"
  list += "Lucy Mary"
  list += "Lucy Ben"
  list += "Jack Alice"
  list += "Jack Jesse"
  list += "Terry Tom"

  // Tag each record with its role, exactly as in the Spark version
  val list2 = list.flatMap(line => {
    val strs = line.split(" ")
    if (strs.length == 2)
      Array(strs(0) + " parent_" + strs(1), strs(1) + " child_" + strs(0))
    else
      Array[String]()
  })
  // Split into (key, value) arrays; element 0 is the person's name
  val mapout = list2.map(line => line.split(" "))
  // groupBy on the first element plays the same role as groupByKey
  val groupbyout = mapout.groupBy(_(0))
  val res = groupbyout.values.flatMap(values => {
    val childList = new ArrayBuffer[String]()
    val parentList = new ArrayBuffer[String]()
    values.foreach { names =>
      if (names(1).startsWith("child_")) childList += names(1)
      else if (names(1).startsWith("parent_")) parentList += names(1)
    }
    for (c <- childList; p <- parentList)
      yield Array[String](c, p)
  })

  for (arr <- res)
    println(arr(0).substring(6) + " " + arr(1).substring(7))
}

As you can see, MapReduce and functional programming have a great deal in common.
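
The key correspondence is the grouping step; a tiny illustrative snippet (my own summary, simplified types, not from the original post):

// Scala collections: groupBy plays the role of Spark's groupByKey
val pairs = Seq(("Tom", "parent_Lucy"), ("Tom", "child_Terry"))
val grouped: Map[String, Seq[(String, String)]] = pairs.groupBy(_._1)
println(grouped) // Map(Tom -> List((Tom,parent_Lucy), (Tom,child_Terry)))
// In Spark, pairsRdd.groupByKey() yields RDD[(String, Iterable[String])]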
Tags: scala spark mapreduce