
How to use Spark 2.2.0's transformation operators (Java version)

map:

private static void map(){
SparkConf conf =new SparkConf().setAppName("map").setMaster("local");
JavaSparkContext sc =new JavaSparkContext(conf);
List<Integer> numbers = Arrays.asList(1,2,3,4,5);
JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
JavaRDD<Integer> multipleNumberRDD = numberRDD.map(new Function<Integer,Integer>(){

/**
*
*/
private static final long serialVersionUID = 1L;

@Override
public Integer call(Integer v1) throws Exception {
// double each element
return v1 * 2;
}
});
multipleNumberRDD.foreach(new VoidFunction<Integer>() {

/**
*
*/
private static final long serialVersionUID = 1L;

@Override
public void call(Integer t) throws Exception {
System.out.println(t);

}
});
sc.close();
}
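Since Spark 2.x's Function and VoidFunction are single-method interfaces, the anonymous classes above can also be written as Java 8 lambdas. A minimal sketch (not from the original post) of the same doubling logic, reusing the numberRDD built inside map():

JavaRDD<Integer> doubled = numberRDD.map(v1 -> v1 * 2);   // same as the Function above
doubled.foreach(t -> System.out.println(t));              // same as the VoidFunction above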
filter:

private static void filter(){
SparkConf conf =new SparkConf().setAppName("filter").setMaster("local");
JavaSparkContext sc =new JavaSparkContext(conf);
List<Integer> numbers =Arrays.asList(1,2,3,4,5,6,7,8,9,10);
JavaRDD<Integer> numberRDD =sc.parallelize(numbers);
JavaRDD<Integer> evennumberRDD= numberRDD.filter(new Function<Integer, Boolean>() {

/**
*
*/
private static final long serialVersionUID = 1L;

@Override
public Boolean call(Integer v1) throws Exception {
// keep only the even numbers
return v1 % 2 == 0;
}
});
evennumberRDD.foreach(new VoidFunction<Integer>() {

/**
*
*/
private static final long serialVersionUID = 1L;

@Override
public void call(Integer t) throws Exception {
System.out.println(t);

}

});
sc.close();
}
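Because transformations are lazy, filter and map can also be chained before the foreach action triggers the job. A hedged lambda sketch (not from the original post), assuming it replaces the anonymous classes inside filter():

JavaRDD<Integer> doubledEvens = numberRDD
        .filter(v1 -> v1 % 2 == 0)   // keep even numbers
        .map(v1 -> v1 * 2);          // then double them
doubledEvens.foreach(t -> System.out.println(t));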
flatMap:

private static void flatMap(){
SparkConf conf =new SparkConf().setAppName("flatMap").setMaster("local");
JavaSparkContext sc =new JavaSparkContext(conf);
List<String> linelist = Arrays.asList("hello you","hello me","hello world");
JavaRDD<String> lines = sc.parallelize(linelist);
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String,String>() {

@Override
public Iterator<String> call(String t) throws Exception {

return Arrays.asList(t.split(" ")).iterator();
}
});
words.foreach(new VoidFunction<String>() {

/**
*
*/
private static final long serialVersionUID = 1L;

@Override
public void call(String t) throws Exception {
System.out.println(t);

}

});
sc.close();
}

groupByKey:
private static void groupByKey(){
SparkConf conf =new SparkConf().setAppName("groupByKey").setMaster("local");
JavaSparkContext sc =new JavaSparkContext(conf);
List<Tuple2<String, Integer>> scoreList = Arrays.asList(
new Tuple2<String, Integer>("class1",80),
new Tuple2<String, Integer>("class2",75),
new Tuple2<String, Integer>("class1",90),
new Tuple2<String, Integer>("class2",65));
JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList);
JavaPairRDD<String, Iterable<Integer>> groupScorese=scores.groupByKey();
groupScorese.foreach(new VoidFunction<Tuple2<String,Iterable<Integer>>>() {

/**
*
*/
private static final long serialVersionUID = 1L;

@Override
public void call(Tuple2<String, Iterable<Integer>> t)
throws Exception {
System.out.println("class: " + t._1);
Iterator<Integer> ite =t._2.iterator();
while(ite.hasNext()){
System.out.println(ite.next());
}
System.out.println("=====================================");

}

});

sc.close();
}
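Once the scores are grouped, each group can be aggregated directly. A minimal sketch (not from the original post, assuming it is placed before sc.close() inside groupByKey()) that computes every class's average score with mapValues:

JavaPairRDD<String, Double> avgScores = groupScorese.mapValues(values -> {
    int sum = 0, count = 0;
    for (Integer v : values) {   // iterate the grouped scores for this class
        sum += v;
        count++;
    }
    return (double) sum / count;
});
avgScores.foreach(t -> System.out.println(t._1 + " avg: " + t._2));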
reduceByKey:
private static void reduceByKey(){
SparkConf conf =new SparkConf().setAppName("reduceByKey").setMaster("local");
JavaSparkContext sc =new JavaSparkContext(conf);
List<Tuple2<String, Integer>> scoreList = Arrays.asList(
new Tuple2<String, Integer>("class1",80),
new Tuple2<String, Integer>("class2",75),
new Tuple2<String, Integer>("class1",90),
new Tuple2<String, Integer>("class2",65));
JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList);
JavaPairRDD<String, Integer> reduceScorese=scores.reduceByKey(new Function2<Integer, Integer, Integer>(){

@Override
public Integer call(Integer arg0, Integer arg1) throws Exception {
// add the two scores for the same key
return arg0 + arg1;
}});
reduceScorese.foreach(new VoidFunction<Tuple2<String,Integer>>() {

@Override
public void call(Tuple2<String, Integer> t) throws Exception {
System.out.println(t._1+":"+t._2);

}
});

sc.close();
}

sortByKey:
private static void sortByKey(){
SparkConf conf =new SparkConf().setAppName("sortByKey").setMaster("local");
JavaSparkContext sc =new JavaSparkContext(conf);
List<Tuple2<Integer, String>> scoreList = Arrays.asList(
new Tuple2<Integer, String>(65,"leo"),
new Tuple2<Integer, String>(50,"tom"),
new Tuple2<Integer, String>(100,"marry"),
new Tuple2<Integer, String>(80,"jack"));

JavaPairRDD<Integer, String> scores = sc.parallelizePairs(scoreList);
JavaPairRDD<Integer,String> sortScores =scores.sortByKey(false);
sortScores.foreach(new VoidFunction<Tuple2<Integer,String>>(){

@Override
public void call(Tuple2<Integer, String> t) throws Exception {
System.out.println(t._1+ ": "+t._2);

}

});
sc.close();
}

join:
private static void join(){
SparkConf conf =new SparkConf().setAppName("join").setMaster("local");
JavaSparkContext sc =new JavaSparkContext(conf);
List<Tuple2<Integer,String>> studentList =Arrays.asList(
new Tuple2<Integer,String>(1, "leo"),
new Tuple2<Integer,String>(2, "jack"),
new Tuple2<Integer,String>(3, "tom"));
List<Tuple2<Integer,Integer>> scoreList =Arrays.asList(
new Tuple2<Integer,Integer>(1, 100),
new Tuple2<Integer,Integer>(2, 90),
new Tuple2<Integer,Integer>(3, 60));
JavaPairRDD<Integer, String> students = sc.parallelizePairs(studentList);
JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(scoreList);
JavaPairRDD<Integer, Tuple2<String, Integer>> studentScores = students.join(scores);
studentScores.foreach(new VoidFunction<Tuple2<Integer,Tuple2<String,Integer>>>() {

@Override
public void call(Tuple2<Integer, Tuple2<String, Integer>> t)
throws Exception {
System.out.println(t._1+":"+t._2._1+":"+t._2._2);

}
});

sc.close();
}

cogroup:
private static void cogroup(){
SparkConf conf =new SparkConf().setAppName("cogroup").setMaster("local");
JavaSparkContext sc =new JavaSparkContext(conf);
List<Tuple2<Integer,String>> studentList =Arrays.asList(
new Tuple2<Integer,String>(1, "leo"),
new Tuple2<Integer,String>(2, "jack"),
new Tuple2<Integer,String>(3, "tom"));
List<Tuple2<Integer,Integer>> scoreList =Arrays.asList(
new Tuple2<Integer,Integer>(1, 100),
new Tuple2<Integer,Integer>(2, 90),
new Tuple2<Integer,Integer>(3, 60),
new Tuple2<Integer,Integer>(1, 70),
new Tuple2<Integer,Integer>(2, 80),
new Tuple2<Integer,Integer>(3, 50));
JavaPairRDD<Integer, String> students = sc.parallelizePairs(studentList);
JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(scoreList);
JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> studentScores = students.cogroup(scores);
studentScores.foreach(new VoidFunction<Tuple2<Integer,Tuple2<Iterable<String>,Iterable<Integer>>>>() {

@Override
public void call(
Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> t)
throws Exception {
System.out.println(t._1+":"+t._2._1+":"+t._2._2);

}
}

);

sc.close();
}
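Unlike join, cogroup keeps every value for a key on each side as an Iterable, so per-student aggregates fall out directly. A hedged sketch (not from the original post, assuming it is placed before sc.close() inside cogroup()) that pairs each student's name with the sum of that student's scores:

JavaPairRDD<Integer, Tuple2<String, Integer>> totals = studentScores.mapValues(v -> {
    // take the single name for this id; guard against an empty Iterable just in case
    String name = v._1.iterator().hasNext() ? v._1.iterator().next() : "unknown";
    int sum = 0;
    for (Integer s : v._2) {   // sum all scores cogrouped under this id
        sum += s;
    }
    return new Tuple2<String, Integer>(name, sum);
});
totals.foreach(t -> System.out.println(t._1 + ":" + t._2._1 + ":" + t._2._2));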
Full source file:

package cn.spark.study.core;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class TransformationOperation {
public static void main(String[] args) {
// map();
// filter();
// flatMap();
// groupByKey();
// reduceByKey();
// sortByKey();
// join();
cogroup();
}

private static void map(){
SparkConf conf =new SparkConf().setAppName("map").setMaster("local");
JavaSparkContext sc =new JavaSparkContext(conf);
List<Integer> numbers = Arrays.asList(1,2,3,4,5);
JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
JavaRDD<Integer> multipleNumberRDD = numberRDD.map(new Function<Integer,Integer>(){

/**
*
*/
private static final long serialVersionUID = 1L;

@Override
public Integer call(Integer v1) throws Exception {
// double each element
return v1 * 2;
}
});
multipleNumberRDD.foreach(new VoidFunction<Integer>() {

/**
*
*/
private static final long serialVersionUID = 1L;

@Override
public void call(Integer t) throws Exception {
System.out.println(t);

}
});
sc.close();
}
private static void filter(){
SparkConf conf =new SparkConf().setAppName("filter").setMaster("local");
JavaSparkContext sc =new JavaSparkContext(conf);
List<Integer> numbers =Arrays.asList(1,2,3,4,5,6,7,8,9,10);
JavaRDD<Integer> numberRDD =sc.parallelize(numbers);
JavaRDD<Integer> evennumberRDD= numberRDD.filter(new Function<Integer, Boolean>() {

/**
*
*/
private static final long serialVersionUID = 1L;

@Override
public Boolean call(Integer v1) throws Exception {
// keep only the even numbers
return v1 % 2 == 0;
}
});
evennumberRDD.foreach(new VoidFunction<Integer>() {

/**
*
*/
private static final long serialVersionUID = 1L;

@Override
public void call(Integer t) throws Exception {
System.out.println(t);

}

});
sc.close();
}
private static void flatMap(){
SparkConf conf =new SparkConf().setAppName("flatMap").setMaster("local");
JavaSparkContext sc =new JavaSparkContext(conf);
List<String> linelist = Arrays.asList("hello you","hello me","hello world");
JavaRDD<String> lines = sc.parallelize(linelist);
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String,String>() {

@Override
public Iterator<String> call(String t) throws Exception {

return Arrays.asList(t.split(" ")).iterator();
}
});
words.foreach(new VoidFunction<String>() {

/**
*
*/
private static final long serialVersionUID = 1L;

@Override
public void call(String t) throws Exception {
System.out.println(t);

}

});
sc.close();
}
private static void groupByKey(){
SparkConf conf =new SparkConf().setAppName("groupByKey").setMaster("local");
JavaSparkContext sc =new JavaSparkContext(conf);
List<Tuple2<String, Integer>> scoreList = Arrays.asList(
new Tuple2<String, Integer>("class1",80),
new Tuple2<String, Integer>("class2",75),
new Tuple2<String, Integer>("class1",90),
new Tuple2<String, Integer>("class2",65));
JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList);
JavaPairRDD<String, Iterable<Integer>> groupScorese=scores.groupByKey();
groupScorese.foreach(new VoidFunction<Tuple2<String,Iterable<Integer>>>() {

/**
*
*/
private static final long serialVersionUID = 1L;

@Override
public void call(Tuple2<String, Iterable<Integer>> t)
throws Exception {
System.out.println("class: " + t._1);
Iterator<Integer> ite =t._2.iterator();
while(ite.hasNext()){
System.out.println(ite.next());
}
System.out.println("=====================================");

}

});

sc.close();
}
private static void reduceByKey(){
SparkConf conf =new SparkConf().setAppName("reduceByKey").setMaster("local");
JavaSparkContext sc =new JavaSparkContext(conf);
List<Tuple2<String, Integer>> scoreList = Arrays.asList(
new Tuple2<String, Integer>("class1",80),
new Tuple2<String, Integer>("class2",75),
new Tuple2<String, Integer>("class1",90),
new Tuple2<String, Integer>("class2",65));
JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList);
JavaPairRDD<String, Integer> reduceScorese=scores.reduceByKey(new Function2<Integer, Integer, Integer>(){

@Override
public Integer call(Integer arg0, Integer arg1) throws Exception {
// add the two scores for the same key
return arg0 + arg1;
}});
reduceScorese.foreach(new VoidFunction<Tuple2<String,Integer>>() {

@Override
public void call(Tuple2<String, Integer> t) throws Exception {
System.out.println(t._1+":"+t._2);

}
});

sc.close();
}
private static void sortByKey(){
SparkConf conf =new SparkConf().setAppName("sortByKey").setMaster("local");
JavaSparkContext sc =new JavaSparkContext(conf);
List<Tuple2<Integer, String>> scoreList = Arrays.asList(
new Tuple2<Integer, String>(65,"leo"),
new Tuple2<Integer, String>(50,"tom"),
new Tuple2<Integer, String>(100,"marry"),
new Tuple2<Integer, String>(80,"jack"));

JavaPairRDD<Integer, String> scores = sc.parallelizePairs(scoreList);
JavaPairRDD<Integer,String> sortScores =scores.sortByKey(false);
sortScores.foreach(new VoidFunction<Tuple2<Integer,String>>(){

@Override
public void call(Tuple2<Integer, String> t) throws Exception {
System.out.println(t._1+ ": "+t._2);

}

});
sc.close();
}
private static void join(){
SparkConf conf =new SparkConf().setAppName("join").setMaster("local");
JavaSparkContext sc =new JavaSparkContext(conf);
List<Tuple2<Integer,String>> studentList =Arrays.asList(
new Tuple2<Integer,String>(1, "leo"),
new Tuple2<Integer,String>(2, "jack"),
new Tuple2<Integer,String>(3, "tom"));
List<Tuple2<Integer,Integer>> scoreList =Arrays.asList(
new Tuple2<Integer,Integer>(1, 100),
new Tuple2<Integer,Integer>(2, 90),
new Tuple2<Integer,Integer>(3, 60));
JavaPairRDD<Integer, String> students = sc.parallelizePairs(studentList);
JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(scoreList);
JavaPairRDD<Integer, Tuple2<String, Integer>> studentScores = students.join(scores);
studentScores.foreach(new VoidFunction<Tuple2<Integer,Tuple2<String,Integer>>>() {

@Override
public void call(Tuple2<Integer, Tuple2<String, Integer>> t)
throws Exception {
System.out.println(t._1+":"+t._2._1+":"+t._2._2);

}
});

sc.close();
}
private static void cogroup(){
SparkConf conf =new SparkConf().setAppName("cogroup").setMaster("local");
JavaSparkContext sc =new JavaSparkContext(conf);
List<Tuple2<Integer,String>> studentList =Arrays.asList(
new Tuple2<Integer,String>(1, "leo"),
new Tuple2<Integer,String>(2, "jack"),
new Tuple2<Integer,String>(3, "tom"));
List<Tuple2<Integer,Integer>> scoreList =Arrays.asList(
new Tuple2<Integer,Integer>(1, 100),
new Tuple2<Integer,Integer>(2, 90),
new Tuple2<Integer,Integer>(3, 60),
new Tuple2<Integer,Integer>(1, 70),
new Tuple2<Integer,Integer>(2, 80),
new Tuple2<Integer,Integer>(3, 50));
JavaPairRDD<Integer, String> students = sc.parallelizePairs(studentList);
JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(scoreList);
JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> studentScores = students.cogroup(scores);
studentScores.foreach(new VoidFunction<Tuple2<Integer,Tuple2<Iterable<String>,Iterable<Integer>>>>() {

@Override
public void call(
Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> t)
throws Exception {
System.out.println(t._1+":"+t._2._1+":"+t._2._2);

}
}

);

sc.close();
}
}