day17:RDD案例(join、cogroup、reduceByKey、groupByKey)
2016-02-24 17:32
549 查看
本文内容来源于DT大数据梦工厂整理,
DT大数据梦工厂
新浪微博: http://weibo.com/ilovepains 微信公众号: DT_Spark
博客: http://blog.sina.com.cn/ilovepains
手机:18610086859
qq:1740415547
邮箱:18610086859@vip.126.com
package cn.tan.bd.bdapp.bd.day17;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
/**
 * Demonstrates the {@code cogroup} transformation on Java pair RDDs.
 *
 * <p>For every key present in either RDD, {@code cogroup} produces one output
 * pair: the key together with (Iterable of all names for that key, Iterable of
 * all scores for that key). Unlike {@code join}, values are grouped rather
 * than combined pairwise.
 */
public class Cogroups {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("cogroups").setMaster("local"); // run locally in this JVM
        JavaSparkContext sc = new JavaSparkContext(conf);
        // id -> course name: exactly one entry per key.
        List<Tuple2<Integer, String>> nameList = Arrays.asList(
                new Tuple2<Integer, String>(1, "java"),
                new Tuple2<Integer, String>(2, "spark"),
                new Tuple2<Integer, String>(3, "hadoop"));
        // id -> score: keys repeat, so cogroup collects several scores per id.
        List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(
                new Tuple2<Integer, Integer>(1, 100),
                new Tuple2<Integer, Integer>(2, 90),
                new Tuple2<Integer, Integer>(3, 70),
                new Tuple2<Integer, Integer>(1, 60),
                new Tuple2<Integer, Integer>(2, 80),
                new Tuple2<Integer, Integer>(2, 50));
        JavaPairRDD<Integer, String> name = sc.parallelizePairs(nameList);
        JavaPairRDD<Integer, Integer> score = sc.parallelizePairs(scoreList);
        // (id, (all names for id, all scores for id))
        JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> cogroup = name.cogroup(score);
        cogroup.foreach(new VoidFunction<Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>>>() {
            private static final long serialVersionUID = 1L;

            public void call(Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> datas)
                    throws Exception {
                System.out.println("id:" + datas._1);
                System.out.println("name:" + datas._2._1);
                // Fixed: label previously read "score" without the colon,
                // inconsistent with the "id:" and "name:" labels above.
                System.out.println("score:" + datas._2._2);
                System.out.println(".................................");
            }
        });
        sc.close(); // release the SparkContext and its driver resources
    }
}
运行结果:
id:1
name:[java]
score[100, 60]
.................................
id:3
name:[hadoop]
score[70]
.................................
id:2
name:[spark]
score[90, 80, 50]
.................................
DT大数据梦工厂
新浪微博: http://weibo.com/ilovepains 微信公众号: DT_Spark
博客: http://blog.sina.com.cn/ilovepains
手机:18610086859
qq:1740415547
邮箱:18610086859@vip.126.com
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Demonstrates core RDD transformations: map, filter, flatMap,
 * groupByKey, reduceByKey and join. Each example is self-contained;
 * uncomment the call you want to run in `main`.
 */
object Tranformations {

  def main(args: Array[String]) {
    val sc = sparkContext("Tranformation Operations") // create the SparkContext
    // mapTranformation(sc)         // map example
    // filterTranformation(sc)      // filter example
    // flatMapTranformation(sc)     // flatMap example
    // groupByKeyTranformation(sc)  // groupByKey example
    // reduceByKeyTranformation(sc) // reduceByKey example
    joinTranformation(sc)           // join example
    sc.stop() // stop the SparkContext, destroy the Driver objects, release resources
  }

  /** Builds a local-mode SparkContext whose application name is `name`. */
  def sparkContext(name: String) = {
    // Fixed: the original ignored `name` and hard-coded the app name "work......".
    val conf = new SparkConf().setAppName(name).setMaster("local")
    val sc = new SparkContext(conf)
    sc
  }

  /** map: doubles every element of 1..10 and prints the results. */
  def mapTranformation(sc: SparkContext) {
    val mapData = sc.parallelize(1 to 10)
    val its = mapData.map(items => items * 2)
    its.collect.foreach(println)
  }

  /** filter: keeps only the even numbers in 1..20 and prints them. */
  def filterTranformation(sc: SparkContext) {
    val filters = sc.parallelize(1 to 20)
    // Fixed: the original used `map(fs => fs % 2 == 0)`, which printed
    // Booleans instead of filtering; `filter` keeps the even elements
    // as the method name intends.
    val fd = filters.filter(fs => fs % 2 == 0)
    fd.collect.foreach(println)
  }

  /** flatMap: splits each line into words and prints every word. */
  def flatMapTranformation(sc: SparkContext) {
    val flatMap = Array("scala spark", "hadoop scala", "Java tachyon", "Java hadoop")
    val ds = sc.parallelize(flatMap)
    val lds = ds.flatMap(line => line.split(" "))
    lds.collect.foreach(println)
  }

  /** groupByKey: groups the course names sharing the same score key. */
  def groupByKeyTranformation(sc: SparkContext) {
    val data = Array(Tuple2(100, "java"), Tuple2(90, "scala"), Tuple2(80, "oracle"),
      Tuple2(90, "spark"), Tuple2(100, "oracle"))
    val ss = sc.parallelize(data)
    val rdds = ss.groupByKey()
    rdds.collect.foreach(println)
  }

  /** reduceByKey: classic word count over a local text file. */
  def reduceByKeyTranformation(sc: SparkContext) {
    val rdds = sc.textFile("D://googledown//datas.txt")
    val words = rdds.flatMap(lines => lines.split(" "))
    val wd = words.map(word => (word, 1))
    val wordOrder = wd.reduceByKey(_ + _)
    wordOrder.collect.foreach(pairs => println(pairs._1 + "...." + pairs._2))
  }

  /** join: pairs student names with their scores by shared integer key. */
  def joinTranformation(sc: SparkContext) {
    val studentNames = Array(
      Tuple2(1, "Spark"),
      Tuple2(2, "Tachyon"),
      Tuple2(3, "Hadoop"))
    val studentScores = Array(
      Tuple2(1, 100),
      Tuple2(2, 95),
      Tuple2(3, 65))
    val names = sc.parallelize(studentNames)
    val score = sc.parallelize(studentScores)
    val joindatas = names.join(score) // (key, (name, score)) for keys in both RDDs
    joindatas.collect.foreach(println)
  }
}
package cn.tan.bd.bdapp.bd.day17;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
/**
 * Cogroup example: for each key shared by two pair RDDs, emits the key with
 * the full group of values from each side — (key, (names, scores)).
 */
public class Cogroups {
    public static void main(String[] args) {
        final SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName("cogroups").setMaster("local");
        final JavaSparkContext context = new JavaSparkContext(sparkConf);

        // Scores keyed by course id; several scores may share one id.
        final List<Tuple2<Integer, Integer>> scores = Arrays.asList(
                new Tuple2<Integer, Integer>(1, 100),
                new Tuple2<Integer, Integer>(2, 90),
                new Tuple2<Integer, Integer>(3, 70),
                new Tuple2<Integer, Integer>(1, 60),
                new Tuple2<Integer, Integer>(2, 80),
                new Tuple2<Integer, Integer>(2, 50));
        // Course names keyed by the same ids, one name per id.
        final List<Tuple2<Integer, String>> courseNames = Arrays.asList(
                new Tuple2<Integer, String>(1, "java"),
                new Tuple2<Integer, String>(2, "spark"),
                new Tuple2<Integer, String>(3, "hadoop"));

        final JavaPairRDD<Integer, String> nameRdd = context.parallelizePairs(courseNames);
        final JavaPairRDD<Integer, Integer> scoreRdd = context.parallelizePairs(scores);
        final JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> grouped =
                nameRdd.cogroup(scoreRdd);

        // Print each cogrouped entry on the executors (local mode: this JVM).
        grouped.foreach(new VoidFunction<Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>>>() {
            private static final long serialVersionUID = 1L;

            public void call(final Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> entry)
                    throws Exception {
                System.out.println("id:" + entry._1);
                System.out.println("name:" + entry._2._1);
                System.out.println("score" + entry._2._2);
                System.out.println(".................................");
            }
        });
        context.close();
    }
}
运行结果:
id:1
name:[java]
score[100, 60]
.................................
id:3
name:[hadoop]
score[70]
.................................
id:2
name:[spark]
score[90, 80, 50]
.................................
相关文章推荐
- 连接oracle数据库异常System.Data.OracleClient 需要 Oracle 客户端软件 8.1.7 或更高版本
- cookie/session 转载文章
- htmlparser新建tag类(以iframe标签为例)
- 图片压缩处理方法
- 门户平台
- [LeetCode]Decode Ways
- JavaEE多语言功能实现
- linux定时任务crond那些事
- 查看oracle数据库全局数据库名和sid
- day5:Scala隐式转换和并发编程
- easyui学习
- iperf使用方法
- 效验手机号码正则表达式
- 隐藏APK在Launcher中的启动图标 android开发教程
- 企业门户平台解决方案
- 安装Android模拟器Genymotion【Android学习入门】
- lintcode-easy-Count and Say
- sublime 3103liense
- LCD时序中设计到的VSPW/VBPD/VFPD/HSPW/HBPD/HFPD总结
- JAVA正则表达式语法大全