
Day 17: RDD Examples (join, cogroup, reduceByKey, groupByKey)

2016-02-24 17:32
This article is based on material compiled by DT大数据梦工厂 (DT Big Data Dream Factory).

DT大数据梦工厂
Sina Weibo: http://weibo.com/ilovepains
WeChat official account: DT_Spark
Blog: http://blog.sina.com.cn/ilovepains
Phone: 18610086859
QQ: 1740415547
Email: 18610086859@vip.126.com

import org.apache.spark.{SparkConf, SparkContext}

object Transformations {

  def main(args: Array[String]): Unit = {
    val sc = sparkContext("Transformation Operations") // create the SparkContext
    // mapTransformation(sc)         // map example
    // filterTransformation(sc)      // filter example
    // flatMapTransformation(sc)     // flatMap example
    // groupByKeyTransformation(sc)  // groupByKey example
    // reduceByKeyTransformation(sc) // reduceByKey example
    joinTransformation(sc) // join example

    sc.stop() // stop the SparkContext, tearing down the Driver and releasing resources
  }

  def sparkContext(name: String): SparkContext = {
    val conf = new SparkConf().setAppName(name).setMaster("local")
    new SparkContext(conf)
  }

  def mapTransformation(sc: SparkContext): Unit = {
    val mapData = sc.parallelize(1 to 10)
    val its = mapData.map(item => item * 2) // double every element
    its.collect.foreach(println)
  }
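  // Expected output: 2, 4, 6, ..., 20, one number per line
  // (parallelize preserves the order of 1 to 10, so collect returns the doubled values in order).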

  def filterTransformation(sc: SparkContext): Unit = {
    val filters = sc.parallelize(1 to 20)
    val fd = filters.filter(fs => fs % 2 == 0) // keep only the even numbers
    fd.collect.foreach(println)
  }

  def flatMapTransformation(sc: SparkContext): Unit = {
    val flatMap = Array("scala spark", "hadoop scala", "Java tachyon", "Java hadoop")
    val ds = sc.parallelize(flatMap)
    val lds = ds.flatMap(line => line.split(" ")) // split each line and flatten the results
    lds.collect.foreach(println)
  }
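  // Expected output, one token per line:
  //   scala, spark, hadoop, scala, Java, tachyon, Java, hadoop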

  def groupByKeyTransformation(sc: SparkContext): Unit = {
    val data = Array((100, "java"), (90, "scala"), (80, "oracle"), (90, "spark"), (100, "oracle"))
    val ss = sc.parallelize(data)
    val rdds = ss.groupByKey() // gather all values sharing the same key
    rdds.collect.foreach(println)
  }
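  // Expected output (key order after the shuffle may vary, and the exact
  // rendering of the grouped Iterable depends on the Spark version; recent
  // versions print a CompactBuffer):
  //   (100,CompactBuffer(java, oracle))
  //   (90,CompactBuffer(scala, spark))
  //   (80,CompactBuffer(oracle))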
  def reduceByKeyTransformation(sc: SparkContext): Unit = {
    val rdds = sc.textFile("D://googledown//datas.txt")
    val words = rdds.flatMap(lines => lines.split(" "))
    val wd = words.map(word => (word, 1))
    val wordOrder = wd.reduceByKey(_ + _) // sum the 1s per word
    wordOrder.collect.foreach(pairs => println(pairs._1 + "...." + pairs._2))
  }
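  // A minimal self-contained variant of the word count above, for readers
  // without a local datas.txt file; the sample lines are illustrative only.
  def reduceByKeyInMemory(sc: SparkContext): Unit = {
    val lines = sc.parallelize(Seq("spark hadoop spark", "hadoop scala"))
    lines.flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .collect.foreach { case (word, count) => println(word + "...." + count) }
  }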

def joinTranformation(sc: SparkContext){
/* val  studentName = Array{
Tuple3(1, "zhangsan");
Tuple3(2, "lisi", "sh");
Tuple3(3, "wangwu", "sz");
Tuple3(4, "wangba", "gz" )
}

val studentScore = Array{
Tuple3(1, 90, "hb");
Tuple3(2, 80, "zj");
Tuple3(3, 70, "gd");
Tuple3(4, 90, "gd")
}*/
val studentNames = Array(
Tuple2(1,"Spark"),
Tuple2(2,"Tachyon"),
Tuple2(3,"Hadoop")
)

val studentScores = Array(
Tuple2(1,100),
Tuple2(2,95),
Tuple2(3,65)
)
val names = sc.parallelize(studentNames)
val score = sc.parallelize(studentScores)
val joindatas = names.join(score)
joindatas.collect.foreach(println)
}
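  // Expected output (row order after the shuffle may vary):
  //   (1,(Spark,100))
  //   (2,(Tachyon,95))
  //   (3,(Hadoop,65))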
}


package cn.tan.bd.bdapp.bd.day17;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

/**
 * Created: 2016-02-28 08:15:13
 *
 * cogroup example: for every key, gather all values from both RDDs.
 */
public class Cogroups {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("cogroups").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Tuple2<Integer, String>> nameList = Arrays.asList(
                new Tuple2<Integer, String>(1, "java"),
                new Tuple2<Integer, String>(2, "spark"),
                new Tuple2<Integer, String>(3, "hadoop")
        );

        List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(
                new Tuple2<Integer, Integer>(1, 100),
                new Tuple2<Integer, Integer>(2, 90),
                new Tuple2<Integer, Integer>(3, 70),
                new Tuple2<Integer, Integer>(1, 60),
                new Tuple2<Integer, Integer>(2, 80),
                new Tuple2<Integer, Integer>(2, 50)
        );

        JavaPairRDD<Integer, String> name = sc.parallelizePairs(nameList);
        JavaPairRDD<Integer, Integer> score = sc.parallelizePairs(scoreList);

        // cogroup pairs every id with all of its names and all of its scores
        JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> cogroup = name.cogroup(score);

        cogroup.foreach(new VoidFunction<Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>>>() {

            private static final long serialVersionUID = 1L;

            public void call(Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> datas)
                    throws Exception {
                System.out.println("id:" + datas._1);
                System.out.println("name:" + datas._2._1);
                System.out.println("score" + datas._2._2);
                System.out.println(".................................");
            }
        });

        sc.close();
    }
}

Run results:

id:1
name:[java]
score[100, 60]
.................................
id:3
name:[hadoop]
score[70]
.................................
id:2
name:[spark]
score[90, 80, 50]
.................................
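For comparison, here is a minimal Scala sketch of the same cogroup example, written in the style of the Transformations object above (the method name cogroupTransformation is ours, not from the original; the data mirrors the Java version):

  def cogroupTransformation(sc: SparkContext): Unit = {
    val names  = sc.parallelize(Seq((1, "java"), (2, "spark"), (3, "hadoop")))
    val scores = sc.parallelize(Seq((1, 100), (2, 90), (3, 70), (1, 60), (2, 80), (2, 50)))
    // cogroup yields, for every id, an Iterable of names and an Iterable of scores
    names.cogroup(scores).collect.foreach { case (id, (ns, ss)) =>
      println("id:" + id)
      println("name:" + ns.mkString("[", ", ", "]"))
      println("score" + ss.mkString("[", ", ", "]"))
      println(".................................")
    }
  }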