
Spark Operators [04]: map, flatMap, mapToPair, flatMapToPair

Input: spark.txt

hadoop hive spark flume
hdfs spark zookeeper storm
flume hue flume hdfs
spark hive hdfs spark


map

Scala version:

scala> val lines = sc.textFile("/spark.txt")
scala> val words = lines.map(line => line.split(" "))
scala> words.collect


Result (each line is mapped to one array of its words):

Array[Array[String]] = Array(Array(hadoop, hive, spark, flume), Array(hdfs, spark, zookeeper, storm), Array(flume, hue, flume, hdfs), Array(spark, hive, hdfs, spark))

Java version:

JavaRDD<String> lines = sc.textFile("C:\\Users\\chenhaolin\\Desktop\\spark.txt");
// map: each input line becomes exactly one String[] holding that line's words
JavaRDD<String[]> words = lines.map(new Function<String, String[]>() {
    @Override
    public String[] call(String v1) throws Exception {
        return v1.split(" ");
    }
});


flatMap

Scala version:

scala> val lines = sc.textFile("/spark.txt")
scala> val words = lines.flatMap(line => line.split(" "))
scala> words.collect


Result (unlike map, flatMap flattens the per-line arrays into a single RDD of words):

Array[String] = Array(hadoop, hive, spark, flume, hdfs, spark, zookeeper, storm, flume, hue, flume, hdfs, spark, hive, hdfs, spark)

Java version:

JavaRDD<String> lines = sc.textFile("C:\\Users\\chenhaolin\\Desktop\\spark.txt");
// flatMap: split each line, then flatten all the words into a single RDD
// (in Spark 2.x, FlatMapFunction.call returns an Iterator)
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterator<String> call(String s) throws Exception {
        return Arrays.asList(s.split(" ")).iterator();
    }
});


mapToPair

Scala version

Scala has no mapToPair function; in Scala, a plain map is all you need:

scala> val lines = sc.textFile("/spark.txt")
scala> val pairs = lines.flatMap(line => line.split(" ")).map(word => (word,1))
scala> pairs.collect


Result:

Array[(String, Int)] = Array((hadoop,1), (hive,1), (spark,1), (flume,1), (hdfs,1), (spark,1), (zookeeper,1), (storm,1), (flume,1), (hue,1), (flume,1), (hdfs,1), (spark,1), (hive,1), (hdfs,1), (spark,1))

Java version 1

JavaRDD<String> lines = sc.textFile("C:\\Users\\chenhaolin\\Desktop\\spark.txt");

// First flatten every line into individual words
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterator<String> call(String s) throws Exception {
        return Arrays.asList(s.split(" ")).iterator();
    }
});

// mapToPair: turn each word into a (word, 1) tuple, producing a JavaPairRDD
JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(String s) throws Exception {
        return new Tuple2<String, Integer>(s, 1);
    }
});


Java version 2

JavaRDD<String> lines = sc.textFile("C:\\Users\\chenhaolin\\Desktop\\spark.txt");

// Same as version 1, written with Java 8 lambdas
JavaPairRDD<String, Integer> pair1 = lines
        .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
        .mapToPair(x -> new Tuple2<String, Integer>(x, 1));


flatMapToPair

Scala version

Scala has no flatMapToPair function either; a flatMap followed by a map gives the same result, exactly as in the mapToPair section above (see the sketch below).
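
For reference, a minimal Scala sketch of that equivalent chain; it simply reuses the flatMap-then-map pipeline from the mapToPair section and assumes the same /spark.txt input:

scala> val lines = sc.textFile("/spark.txt")
scala> val pairs = lines.flatMap(line => line.split(" ")).map(word => (word, 1))
scala> pairs.collect

This yields the same Array[(String, Int)] of (word, 1) tuples shown above.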

Java version 1

// flatMapToPair: expand each line into multiple (word, 1) tuples in a single step
JavaPairRDD<String, Integer> pair = lines.flatMapToPair(new PairFlatMapFunction<String, String, Integer>() {
    @Override
    public Iterator<Tuple2<String, Integer>> call(String s) throws Exception {
        String[] strs = s.split(" "); // the sample spark.txt is space-separated
        ArrayList<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>();
        for (String str : strs) {
            Tuple2<String, Integer> tuple2 = new Tuple2<String, Integer>(str, 1);
            list.add(tuple2);
        }
        return list.iterator();
    }
});


Java version 2

// Same as version 1, written with a Java 8 lambda
JavaPairRDD<String, Integer> pair = lines.flatMapToPair(line -> {
    String[] words = line.split(" ");
    ArrayList<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>();
    for (String word : words) {
        list.add(new Tuple2<String, Integer>(word, 1));
    }
    return list.iterator();
});