Spark-MapReduce编程-自连接(Scala)
2014-11-25 18:47
127 查看
关于SQL和Hadoop的实现参考这里 MapReduce编程-自连接
这里用相同的原理,使用spark实现。本人也是刚学Scala,可能写的不好,还请指正。
object SelfUion {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("SelfUnion")
val sc = new SparkContext(conf)
val cpFile = sc.textFile("cp.txt")
//val strs = Array[String]("a", "b")
// 1) 生成两张表
val res = cpFile.flatMap(line => {
val strs: Array[String] = line.split(" ");
if (strs.length == 2) Array(strs(0) + " parent_" + strs(1), strs(1) + " child_" + strs(0)) else Array[String]()
})
// 2) 转化为key-value形式
.map(line => {
val kv = line.split(" ")
(kv(0), kv(1))
})
// 3) 列的相等匹配
.groupByKey()
// 4) 解析value,得到结果
.flatMapValues(values => {
val childList = new ArrayBuffer[String]();
val parentList = new ArrayBuffer[String]();
values.foreach(
name => {
if (name.startsWith("child_")) childList += name
else if (name.startsWith("parent_")) parentList += name;
})
for (c <- childList; p <- parentList) yield
c.substring(6) + " " + p.substring(7)
}).values
.saveAsTextFile("selfunion_out")
}
}
cp.txt的内容为:
Tom Lucy
Tom Jack
Jone Lucy
Jone Jack
Lucy Mary
Lucy Ben
Jack Alice
Jack Jesse
Terry Tom
输出为:
Terry Lucy
Terry Jack
Tom Mary
Tom Ben
Jone Mary
Jone Ben
Tom Alice
Tom Jesse
Jone Alice
Jone Jesse
纯Scala实现
另外,附带上使用Scala的函数式编程实现。注意,下面是用的Scala原有的map,flatMap() 等方法,而不是RDD。
使用一个ArrayBuffer[String]带入输入文件,每一行为一个item.
def main(args: Array[String]) {
val list = new ArrayBuffer[String]();
list+="Tom Lucy"
list += "Tom Jack"
list += "Jone Lucy"
list += "Jone Jack"
list += "Lucy Mary"
list += "Lucy Ben"
list += "Jack Alice"
list += "Jack Jesse"
list += "Terry Tom"
val list2 = list.flatMap(line => {
val strs: Array[String] = line.split(" ");
if (strs.length == 2) Array(strs(0) + " parent_" + strs(1), strs(1) + " child_" + strs(0)) else Array[String]()
} )
//println("list2 : " + list2)
val mapout = list2.map(line => line.split(" "))
//println("mapout : " + mapout)
//for(item <- mapout) print(item(0) + " " + item(1) + " ; " )
// println()
val groupbyout = mapout.groupBy(_(0)) //same as groupByKey, The first element of the Array
val res = groupbyout.values.flatMap(values => {
val childList = new ArrayBuffer[String]();
val parentList = new ArrayBuffer[String]();
values.foreach(
names => {
if (names(1).startsWith("child_")) childList += names(1)
else if (names(1).startsWith("parent_")) parentList += names(1);
})
for (c <- childList; p <- parentList) yield
Array[String](c,p)
})
for(arr <- res){
println(arr(0).substring(6) + " " + arr(1).substring(7))
}
}
可以看出,MR和函数式编程有很多相似之处
这里用相同的原理,使用spark实现。本人也是刚学Scala,可能写的不好,还请指正。
object SelfUion {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("SelfUnion")
val sc = new SparkContext(conf)
val cpFile = sc.textFile("cp.txt")
//val strs = Array[String]("a", "b")
// 1) 生成两张表
val res = cpFile.flatMap(line => {
val strs: Array[String] = line.split(" ");
if (strs.length == 2) Array(strs(0) + " parent_" + strs(1), strs(1) + " child_" + strs(0)) else Array[String]()
})
// 2) 转化为key-value形式
.map(line => {
val kv = line.split(" ")
(kv(0), kv(1))
})
// 3) 列的相等匹配
.groupByKey()
// 4) 解析value,得到结果
.flatMapValues(values => {
val childList = new ArrayBuffer[String]();
val parentList = new ArrayBuffer[String]();
values.foreach(
name => {
if (name.startsWith("child_")) childList += name
else if (name.startsWith("parent_")) parentList += name;
})
for (c <- childList; p <- parentList) yield
c.substring(6) + " " + p.substring(7)
}).values
.saveAsTextFile("selfunion_out")
}
}
cp.txt的内容为:
Tom Lucy
Tom Jack
Jone Lucy
Jone Jack
Lucy Mary
Lucy Ben
Jack Alice
Jack Jesse
Terry Tom
输出为:
Terry Lucy
Terry Jack
Tom Mary
Tom Ben
Jone Mary
Jone Ben
Tom Alice
Tom Jesse
Jone Alice
Jone Jesse
纯Scala实现
另外,附带上使用Scala的函数式编程实现。注意,下面是用的Scala原有的map,flatMap() 等方法,而不是RDD。
使用一个ArrayBuffer[String]带入输入文件,每一行为一个item.
def main(args: Array[String]) {
val list = new ArrayBuffer[String]();
list+="Tom Lucy"
list += "Tom Jack"
list += "Jone Lucy"
list += "Jone Jack"
list += "Lucy Mary"
list += "Lucy Ben"
list += "Jack Alice"
list += "Jack Jesse"
list += "Terry Tom"
val list2 = list.flatMap(line => {
val strs: Array[String] = line.split(" ");
if (strs.length == 2) Array(strs(0) + " parent_" + strs(1), strs(1) + " child_" + strs(0)) else Array[String]()
} )
//println("list2 : " + list2)
val mapout = list2.map(line => line.split(" "))
//println("mapout : " + mapout)
//for(item <- mapout) print(item(0) + " " + item(1) + " ; " )
// println()
val groupbyout = mapout.groupBy(_(0)) //same as groupByKey, The first element of the Array
val res = groupbyout.values.flatMap(values => {
val childList = new ArrayBuffer[String]();
val parentList = new ArrayBuffer[String]();
values.foreach(
names => {
if (names(1).startsWith("child_")) childList += names(1)
else if (names(1).startsWith("parent_")) parentList += names(1);
})
for (c <- childList; p <- parentList) yield
Array[String](c,p)
})
for(arr <- res){
println(arr(0).substring(6) + " " + arr(1).substring(7))
}
}
可以看出,MR和函数式编程有很多相似之处
相关文章推荐
- 学习spark系列---scala 编程基础
- 一步步学spark之一scala函数编程中序列,可变列表与不可变列表3.2
- 用户行为日志的统计,Java mapreduce与Scala spark的代码存档...
- 大数据Spark “蘑菇云”行动前传第14课Scala集合上的函数式编程实战及Spark源码鉴赏
- 3000门徒内部训练绝密视频(泄密版)第3课:Scala中函数式编程彻底精通及Spark源码阅读
- 用Spark实现K-means(scala:面向函数式编程风格)
- 初解,Scala语言中基于Actor的并发编程的机制,并展示了在Spark中基于Scala语言的Actor而产生的消息驱动框架Akka的使用,
- (升级版)Spark从入门到精通(Scala编程、案例实战、高级特性、Spark内核源码剖析、Hadoop高端)
- Spark1.0.0 多语言编程之Scala实现
- (升级版)Spark从入门到精通(Scala编程、案例实战、高级特性、Spark内核源码剖析、Hadoop高端)
- 第51讲:Scala中链式调用风格的实现代码实战及其在Spark编程中的广泛运用学习笔记
- (升级版)Spark从入门到精通(Scala编程、案例实战、高级特性、Spark内核源码剖析、Hadoop高端)
- scala用actor并发编程写一个单机版的WorldCount(类似Hadoop的MapReduce思想)
- Spark基础-Scala类型参数编程
- spark入门知识讲解和基础数据操作编程(统一用scala编程实例)
- 第51讲:Scala中链式调用风格的实现代码实战及其在Spark编程中的广泛运用
- Spark1.0.0 多语言编程之Scala实现
- Spark 2.0从入门到精通:Scala编程、大数据开发、上百个实战案例、内核源码深度剖析(278讲全)
- 3000门徒内部训练绝密视频(泄密版)第5课:彻底精通Scala隐式转换和并发编程及Spark源码阅读
- Spark源码中的Scala模式匹配编程和Scala模式匹配编程操作实战