Spark Streaming WordCount over HDFS (Java version)
2017-06-05 15:36
Use Spark Streaming to run a word count over a directory in the HDFS file system.
The code is as follows:
```java
package sparkTestJava;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class HDFSWordCount {

    public static void main(String[] args) {
        // Local mode with 2 threads so that file monitoring and processing can run in parallel
        SparkConf conf = new SparkConf().setAppName("wordcount").setMaster("local[2]");
        // Batch interval of 5 seconds
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

        // Monitor the HDFS directory; each file newly moved into it is read as part of a batch
        JavaDStream<String> lines = jssc.textFileStream("hdfs://hadoop:9000/sparkTest");

        // Split each line into words (Spark 1.x API: FlatMapFunction returns an Iterable)
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });

        // Map each word to a (word, 1) pair
        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        // Sum the counts per word within each batch
        JavaPairDStream<String, Integer> wordcounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        wordcounts.print();

        jssc.start();
        jssc.awaitTermination(); // blocks until the context is stopped
        // reached only after the context has been stopped
        jssc.stop();
        jssc.close();
    }
}
```
Before running the program, start HDFS and Spark; that setup is not covered here.
The contents of test-data.txt are:
hadoop hadoop
hadoop1 hadoop1 hadoop1
hadoop2 hadoop2 hadoop2 hadoop2
hadoop3
spark spark
spark
spark1
Run the program first, then upload the test file test-data.txt to the corresponding sparkTest directory in HDFS; at that point the word counts for the batch are printed to the console.
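Since the streaming job simply splits lines on spaces and sums (word, 1) pairs per batch, the counts it should print for test-data.txt can be checked with plain Java, independent of Spark. This is just a local sketch of the same split-and-sum logic, not part of the streaming program:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LocalWordCountCheck {

    // Same logic as the DStream pipeline: split each line on spaces, sum counts per word
    static Map<String, Integer> count(List<String> lines) {
        Map<String, Integer> counts = new HashMap<String, Integer>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                Integer old = counts.get(word);
                counts.put(word, old == null ? 1 : old + 1);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // The same lines as test-data.txt
        List<String> lines = Arrays.asList(
            "hadoop hadoop",
            "hadoop1 hadoop1 hadoop1",
            "hadoop2 hadoop2 hadoop2 hadoop2",
            "hadoop3",
            "spark spark",
            "spark",
            "spark1");

        // counts: hadoop=2, hadoop1=3, hadoop2=4, hadoop3=1, spark=3, spark1=1
        System.out.println(count(lines));
    }
}
```

Each 5-second batch only sees the files added during that interval, so uploading test-data.txt once should produce exactly these per-batch counts in the job's console output.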