How to use the Spark 2.2.0 transformation operators (Java edition)
2018-01-12 17:36
The examples below walk through the common RDD transformation operators in the Spark 2.2.0 Java API: map, filter, flatMap, groupByKey, reduceByKey, sortByKey, join, and cogroup. Each method builds a small local dataset, applies one operator, and prints the result; the full source file is at the end.

map:
private static void map() {
    SparkConf conf = new SparkConf().setAppName("map").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);

    List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
    JavaRDD<Integer> numberRDD = sc.parallelize(numbers);

    // map applies the function to each element and emits exactly one output element per input.
    JavaRDD<Integer> multipleNumberRDD = numberRDD.map(new Function<Integer, Integer>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Integer call(Integer v1) throws Exception {
            return v1 * 2;
        }
    });

    multipleNumberRDD.foreach(new VoidFunction<Integer>() {
        private static final long serialVersionUID = 1L;

        @Override
        public void call(Integer t) throws Exception {
            System.out.println(t);
        }
    });

    sc.close();
}
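Spark 2.2.0 runs on Java 8, so the anonymous classes above can be replaced by lambdas (org.apache.spark.api.java.function.Function is a functional interface). A minimal sketch of the same pipeline, reusing numberRDD from the snippet:

    JavaRDD<Integer> multipleNumberRDD = numberRDD.map(v1 -> v1 * 2);  // same doubling logic
    multipleNumberRDD.foreach(t -> System.out.println(t));

Running map() prints each element doubled: 2, 4, 6, 8, 10.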
filter:

private static void filter() {
    SparkConf conf = new SparkConf().setAppName("filter").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);

    List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    JavaRDD<Integer> numberRDD = sc.parallelize(numbers);

    // filter keeps only the elements for which the predicate returns true.
    JavaRDD<Integer> evenNumberRDD = numberRDD.filter(new Function<Integer, Boolean>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Boolean call(Integer v1) throws Exception {
            return v1 % 2 == 0;
        }
    });

    evenNumberRDD.foreach(new VoidFunction<Integer>() {
        private static final long serialVersionUID = 1L;

        @Override
        public void call(Integer t) throws Exception {
            System.out.println(t);
        }
    });

    sc.close();
}
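The lambda form of the predicate, a sketch against the same numberRDD:

    JavaRDD<Integer> evenNumberRDD = numberRDD.filter(v1 -> v1 % 2 == 0);  // keep evens only

filter() prints the even numbers: 2, 4, 6, 8, 10.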
flatMap:

private static void flatMap() {
    SparkConf conf = new SparkConf().setAppName("flatMap").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);

    List<String> lineList = Arrays.asList("hello you", "hello me", "hello world");
    JavaRDD<String> lines = sc.parallelize(lineList);

    // flatMap can emit zero or more output elements per input; the function
    // returns an Iterator over those elements.
    JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Iterator<String> call(String t) throws Exception {
            return Arrays.asList(t.split(" ")).iterator();
        }
    });

    words.foreach(new VoidFunction<String>() {
        private static final long serialVersionUID = 1L;

        @Override
        public void call(String t) throws Exception {
            System.out.println(t);
        }
    });

    sc.close();
}
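Note that in Spark 2.x the FlatMapFunction returns an Iterator (the 1.x API returned an Iterable), which is why the snippet calls .iterator(). As a lambda, a sketch over the same lines RDD:

    JavaRDD<String> words = lines.flatMap(t -> Arrays.asList(t.split(" ")).iterator());

flatMap() prints one word per line: hello, you, hello, me, hello, world.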
groupByKey:

private static void groupByKey() {
    SparkConf conf = new SparkConf().setAppName("groupByKey").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);

    List<Tuple2<String, Integer>> scoreList = Arrays.asList(
            new Tuple2<String, Integer>("class1", 80),
            new Tuple2<String, Integer>("class2", 75),
            new Tuple2<String, Integer>("class1", 90),
            new Tuple2<String, Integer>("class2", 65));
    JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList);

    // groupByKey gathers all values that share a key into a single Iterable.
    JavaPairRDD<String, Iterable<Integer>> groupedScores = scores.groupByKey();

    groupedScores.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
        private static final long serialVersionUID = 1L;

        @Override
        public void call(Tuple2<String, Iterable<Integer>> t) throws Exception {
            System.out.println("class: " + t._1);
            Iterator<Integer> ite = t._2.iterator();
            while (ite.hasNext()) {
                System.out.println(ite.next());
            }
            System.out.println("=====================================");
        }
    });

    sc.close();
}
reduceByKey:

private static void reduceByKey() {
    SparkConf conf = new SparkConf().setAppName("reduceByKey").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);

    List<Tuple2<String, Integer>> scoreList = Arrays.asList(
            new Tuple2<String, Integer>("class1", 80),
            new Tuple2<String, Integer>("class2", 75),
            new Tuple2<String, Integer>("class1", 90),
            new Tuple2<String, Integer>("class2", 65));
    JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList);

    // reduceByKey merges the values of each key pairwise with the given function,
    // here summing the scores per class.
    JavaPairRDD<String, Integer> totalScores = scores.reduceByKey(new Function2<Integer, Integer, Integer>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Integer call(Integer v1, Integer v2) throws Exception {
            return v1 + v2;
        }
    });

    totalScores.foreach(new VoidFunction<Tuple2<String, Integer>>() {
        private static final long serialVersionUID = 1L;

        @Override
        public void call(Tuple2<String, Integer> t) throws Exception {
            System.out.println(t._1 + ":" + t._2);
        }
    });

    sc.close();
}
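For the same score list, groupByKey() prints each class followed by its raw scores (class1: 80, 90 and class2: 75, 65; key order is not guaranteed), while reduceByKey() prints the per-class sums class1:170 and class2:140. The reduce function collapses to a one-line lambda, a sketch over the same scores RDD:

    JavaPairRDD<String, Integer> totalScores = scores.reduceByKey((v1, v2) -> v1 + v2);

For aggregations, prefer reduceByKey over groupByKey: it combines values on the map side before the shuffle, so much less data moves across the network.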
sortByKey:

private static void sortByKey() {
    SparkConf conf = new SparkConf().setAppName("sortByKey").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);

    List<Tuple2<Integer, String>> scoreList = Arrays.asList(
            new Tuple2<Integer, String>(65, "leo"),
            new Tuple2<Integer, String>(50, "tom"),
            new Tuple2<Integer, String>(100, "marry"),
            new Tuple2<Integer, String>(80, "jack"));
    JavaPairRDD<Integer, String> scores = sc.parallelizePairs(scoreList);

    // sortByKey(false) sorts the pairs by key in descending order.
    JavaPairRDD<Integer, String> sortedScores = scores.sortByKey(false);

    sortedScores.foreach(new VoidFunction<Tuple2<Integer, String>>() {
        private static final long serialVersionUID = 1L;

        @Override
        public void call(Tuple2<Integer, String> t) throws Exception {
            System.out.println(t._1 + ": " + t._2);
        }
    });

    sc.close();
}
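sortByKey() prints the pairs by descending key: 100: marry, 80: jack, 65: leo, 50: tom. Passing true (or no argument) sorts ascending:

    JavaPairRDD<Integer, String> ascendingScores = scores.sortByKey();  // ascending variant

Since sortByKey is itself a transformation, the ordering only materializes when an action such as foreach or collect runs.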
join:

private static void join() {
    SparkConf conf = new SparkConf().setAppName("join").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);

    List<Tuple2<Integer, String>> studentList = Arrays.asList(
            new Tuple2<Integer, String>(1, "leo"),
            new Tuple2<Integer, String>(2, "jack"),
            new Tuple2<Integer, String>(3, "tom"));
    List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(
            new Tuple2<Integer, Integer>(1, 100),
            new Tuple2<Integer, Integer>(2, 90),
            new Tuple2<Integer, Integer>(3, 60));
    JavaPairRDD<Integer, String> students = sc.parallelizePairs(studentList);
    JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(scoreList);

    // join performs an inner join on the key: only keys present in both RDDs
    // appear in the result, paired as (key, (leftValue, rightValue)).
    JavaPairRDD<Integer, Tuple2<String, Integer>> studentScores = students.join(scores);

    studentScores.foreach(new VoidFunction<Tuple2<Integer, Tuple2<String, Integer>>>() {
        private static final long serialVersionUID = 1L;

        @Override
        public void call(Tuple2<Integer, Tuple2<String, Integer>> t) throws Exception {
            System.out.println(t._1 + ":" + t._2._1 + ":" + t._2._2);
        }
    });

    sc.close();
}
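join() prints 1:leo:100, 2:jack:90, 3:tom:60 (key order is not guaranteed). If a key occurred several times in either RDD, join would emit one output pair per left/right combination. A sketch of the left outer variant, which keeps unmatched left-side keys by wrapping the right side in Spark's Optional:

    JavaPairRDD<Integer, Tuple2<String, org.apache.spark.api.java.Optional<Integer>>> all =
            students.leftOuterJoin(scores);  // students with no score are kept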
cogroup:

private static void cogroup() {
    SparkConf conf = new SparkConf().setAppName("cogroup").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);

    List<Tuple2<Integer, String>> studentList = Arrays.asList(
            new Tuple2<Integer, String>(1, "leo"),
            new Tuple2<Integer, String>(2, "jack"),
            new Tuple2<Integer, String>(3, "tom"));
    List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(
            new Tuple2<Integer, Integer>(1, 100),
            new Tuple2<Integer, Integer>(2, 90),
            new Tuple2<Integer, Integer>(3, 60),
            new Tuple2<Integer, Integer>(1, 70),
            new Tuple2<Integer, Integer>(2, 80),
            new Tuple2<Integer, Integer>(3, 50));
    JavaPairRDD<Integer, String> students = sc.parallelizePairs(studentList);
    JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(scoreList);

    // cogroup groups, for every key, ALL values from both RDDs into two Iterables,
    // instead of emitting one pair per value combination like join does.
    JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> studentScores = students.cogroup(scores);

    studentScores.foreach(new VoidFunction<Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>>>() {
        private static final long serialVersionUID = 1L;

        @Override
        public void call(Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> t) throws Exception {
            System.out.println(t._1 + ":" + t._2._1 + ":" + t._2._2);
        }
    });

    sc.close();
}
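Unlike join, cogroup() emits exactly one record per key, pairing all values from each side as two Iterables; with the duplicate score keys above it prints something like 1:[leo]:[100, 70], 2:[jack]:[90, 80], 3:[tom]:[60, 50] (the actual Iterable rendering is Spark's internal CompactBuffer(...) form).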
Full file:

package cn.spark.study.core;
import java.util.Iterator;
import java.util.List;
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
public class TransformationOperation {
public static void main(String[] args) {
// Uncomment one operator at a time to try it:
// map();
// filter();
// flatMap();
// groupByKey();
// reduceByKey();
// sortByKey();
// join();
cogroup();
}
private static void map() {
    SparkConf conf = new SparkConf().setAppName("map").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
    JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
    JavaRDD<Integer> multipleNumberRDD = numberRDD.map(new Function<Integer, Integer>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Integer call(Integer v1) throws Exception {
            return v1 * 2;
        }
    });
    multipleNumberRDD.foreach(new VoidFunction<Integer>() {
        private static final long serialVersionUID = 1L;

        @Override
        public void call(Integer t) throws Exception {
            System.out.println(t);
        }
    });
    sc.close();
}
private static void filter() {
    SparkConf conf = new SparkConf().setAppName("filter").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
    JavaRDD<Integer> evenNumberRDD = numberRDD.filter(new Function<Integer, Boolean>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Boolean call(Integer v1) throws Exception {
            return v1 % 2 == 0;
        }
    });
    evenNumberRDD.foreach(new VoidFunction<Integer>() {
        private static final long serialVersionUID = 1L;

        @Override
        public void call(Integer t) throws Exception {
            System.out.println(t);
        }
    });
    sc.close();
}
private static void flatMap() {
    SparkConf conf = new SparkConf().setAppName("flatMap").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    List<String> lineList = Arrays.asList("hello you", "hello me", "hello world");
    JavaRDD<String> lines = sc.parallelize(lineList);
    JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Iterator<String> call(String t) throws Exception {
            return Arrays.asList(t.split(" ")).iterator();
        }
    });
    words.foreach(new VoidFunction<String>() {
        private static final long serialVersionUID = 1L;

        @Override
        public void call(String t) throws Exception {
            System.out.println(t);
        }
    });
    sc.close();
}
private static void groupByKey() {
    SparkConf conf = new SparkConf().setAppName("groupByKey").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    List<Tuple2<String, Integer>> scoreList = Arrays.asList(
            new Tuple2<String, Integer>("class1", 80),
            new Tuple2<String, Integer>("class2", 75),
            new Tuple2<String, Integer>("class1", 90),
            new Tuple2<String, Integer>("class2", 65));
    JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList);
    JavaPairRDD<String, Iterable<Integer>> groupedScores = scores.groupByKey();
    groupedScores.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
        private static final long serialVersionUID = 1L;

        @Override
        public void call(Tuple2<String, Iterable<Integer>> t) throws Exception {
            System.out.println("class: " + t._1);
            Iterator<Integer> ite = t._2.iterator();
            while (ite.hasNext()) {
                System.out.println(ite.next());
            }
            System.out.println("=====================================");
        }
    });
    sc.close();
}
private static void reduceByKey() {
    SparkConf conf = new SparkConf().setAppName("reduceByKey").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    List<Tuple2<String, Integer>> scoreList = Arrays.asList(
            new Tuple2<String, Integer>("class1", 80),
            new Tuple2<String, Integer>("class2", 75),
            new Tuple2<String, Integer>("class1", 90),
            new Tuple2<String, Integer>("class2", 65));
    JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList);
    JavaPairRDD<String, Integer> totalScores = scores.reduceByKey(new Function2<Integer, Integer, Integer>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Integer call(Integer v1, Integer v2) throws Exception {
            return v1 + v2;
        }
    });
    totalScores.foreach(new VoidFunction<Tuple2<String, Integer>>() {
        private static final long serialVersionUID = 1L;

        @Override
        public void call(Tuple2<String, Integer> t) throws Exception {
            System.out.println(t._1 + ":" + t._2);
        }
    });
    sc.close();
}
private static void sortByKey() {
    SparkConf conf = new SparkConf().setAppName("sortByKey").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    List<Tuple2<Integer, String>> scoreList = Arrays.asList(
            new Tuple2<Integer, String>(65, "leo"),
            new Tuple2<Integer, String>(50, "tom"),
            new Tuple2<Integer, String>(100, "marry"),
            new Tuple2<Integer, String>(80, "jack"));
    JavaPairRDD<Integer, String> scores = sc.parallelizePairs(scoreList);
    JavaPairRDD<Integer, String> sortedScores = scores.sortByKey(false);
    sortedScores.foreach(new VoidFunction<Tuple2<Integer, String>>() {
        private static final long serialVersionUID = 1L;

        @Override
        public void call(Tuple2<Integer, String> t) throws Exception {
            System.out.println(t._1 + ": " + t._2);
        }
    });
    sc.close();
}
private static void join() {
    SparkConf conf = new SparkConf().setAppName("join").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    List<Tuple2<Integer, String>> studentList = Arrays.asList(
            new Tuple2<Integer, String>(1, "leo"),
            new Tuple2<Integer, String>(2, "jack"),
            new Tuple2<Integer, String>(3, "tom"));
    List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(
            new Tuple2<Integer, Integer>(1, 100),
            new Tuple2<Integer, Integer>(2, 90),
            new Tuple2<Integer, Integer>(3, 60));
    JavaPairRDD<Integer, String> students = sc.parallelizePairs(studentList);
    JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(scoreList);
    JavaPairRDD<Integer, Tuple2<String, Integer>> studentScores = students.join(scores);
    studentScores.foreach(new VoidFunction<Tuple2<Integer, Tuple2<String, Integer>>>() {
        private static final long serialVersionUID = 1L;

        @Override
        public void call(Tuple2<Integer, Tuple2<String, Integer>> t) throws Exception {
            System.out.println(t._1 + ":" + t._2._1 + ":" + t._2._2);
        }
    });
    sc.close();
}
private static void cogroup() {
    SparkConf conf = new SparkConf().setAppName("cogroup").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    List<Tuple2<Integer, String>> studentList = Arrays.asList(
            new Tuple2<Integer, String>(1, "leo"),
            new Tuple2<Integer, String>(2, "jack"),
            new Tuple2<Integer, String>(3, "tom"));
    List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(
            new Tuple2<Integer, Integer>(1, 100),
            new Tuple2<Integer, Integer>(2, 90),
            new Tuple2<Integer, Integer>(3, 60),
            new Tuple2<Integer, Integer>(1, 70),
            new Tuple2<Integer, Integer>(2, 80),
            new Tuple2<Integer, Integer>(3, 50));
    JavaPairRDD<Integer, String> students = sc.parallelizePairs(studentList);
    JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(scoreList);
    JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> studentScores = students.cogroup(scores);
    studentScores.foreach(new VoidFunction<Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>>>() {
        private static final long serialVersionUID = 1L;

        @Override
        public void call(Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> t) throws Exception {
            System.out.println(t._1 + ":" + t._2._1 + ":" + t._2._2);
        }
    });
    sc.close();
}
}