Integrating Kafka with Spark Streaming in Java
2017-07-19 00:00
1. Download the dependency JARs
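The code below imports org.apache.spark.streaming.kafka.KafkaUtils with kafka.serializer.StringDecoder, i.e. the Kafka 0.8 connector for Spark Streaming. A minimal sketch of the Maven dependencies that match those imports, assuming Spark 2.1.1 on Scala 2.11 (adjust the versions to your cluster):

<!-- Sketch only: versions assume Spark 2.1.1 / Scala 2.11. -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.1.1</version>
</dependency>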
2. Create a Java project
This is simple enough to skip.

3. Working examples
The Spark distribution ships with plenty of examples; look under spark-2.1.1-bin-hadoop2.7\examples. The first one uses the direct (receiver-less) API and talks to the Kafka brokers directly.

JavaDirectKafkaWordCount.java
package com.spark.test;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

import scala.Tuple2;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.Durations;

public class JavaDirectKafkaWordCount {
    public static void main(String[] args) throws Exception {
        // String brokers = args[0];
        // String topics = args[1];

        // Create a context with a 2-second batch interval.
        // setMaster("local[2]"): at least two threads are required, one to
        // receive messages and one to process them.
        SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount").setMaster("local[2]");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));

        Set<String> topicsSet = new HashSet<>(Arrays.asList("test"));
        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", "192.168.168.200:9092");

        // Create a direct Kafka stream from the brokers and topics.
        JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
                jssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                topicsSet
        );

        // Get the lines, split them into words, count the words, and print.
        JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            public Iterator<String> call(String line) {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });
        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<>(s, 1);
                    }
                }).reduceByKey(
                new Function2<Integer, Integer, Integer>() {
                    public Integer call(Integer i1, Integer i2) {
                        return i1 + i2;
                    }
                });
        wordCounts.print();

        // Start the computation.
        jssc.start();
        jssc.awaitTermination();
    }
}
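For reference, the same pipeline is much shorter with lambdas. A sketch, assuming your build targets Java 8; it drops into the same main method in place of the anonymous classes above:

// Same word-count pipeline, Java 8 style.
JavaDStream<String> lines = messages.map(tuple2 -> tuple2._2());
JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
JavaPairDStream<String, Integer> wordCounts = words
        .mapToPair(s -> new Tuple2<>(s, 1))
        .reduceByKey((i1, i2) -> i1 + i2);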
JavaKafkaWordCount.java
package com.spark.test;

import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.HashMap;

import scala.Tuple2;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class JavaKafkaWordCount {
    public static void main(String[] args) throws InterruptedException {
        SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount").setMaster("local[2]");
        // Create the context with a 2-second batch size.
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

        int numThreads = Integer.parseInt("2");
        Map<String, Integer> topicMap = new HashMap<>();
        String[] topics = "test".split(",");
        for (String topic : topics) {
            topicMap.put(topic, numThreads);
        }

        // Receiver-based stream: connects through ZooKeeper rather than the brokers.
        JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(jssc, "192.168.168.200:2181", "test-group", topicMap);

        JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });
        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<>(s, 1);
                    }
                }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer i1, Integer i2) {
                        return i1 + i2;
                    }
                });
        wordCounts.print();

        jssc.start();
        jssc.awaitTermination();
    }
}
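Unlike the direct approach, this receiver-based approach tracks offsets in ZooKeeper and can lose data buffered in a receiver if it dies. The usual mitigation is to enable the write-ahead log together with checkpointing. A minimal sketch (the checkpoint path is just an example; use a reliable store such as HDFS in production):

SparkConf sparkConf = new SparkConf()
        .setAppName("JavaKafkaWordCount")
        .setMaster("local[2]")
        // Persist received blocks to the checkpoint directory before acknowledging them.
        .set("spark.streaming.receiver.writeAheadLog.enable", "true");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
jssc.checkpoint("/tmp/kafka-wordcount-checkpoint"); // hypothetical path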
JavaLocalWordCount.java
package com.spark.test;

import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

public class JavaLocalWordCount {
    public static void main(String[] args) {
        /**
         * Step 1: Create the SparkConf object, which holds the program's runtime
         * configuration. setMaster sets the URL of the Spark master the program
         * connects to; "local" runs the program locally, which suits low-spec machines.
         */
        SparkConf sparkConf = new SparkConf().setAppName("LocalWordCountByJava").setMaster("local");
        /**
         * Step 2: Create the SparkContext object.
         * SparkContext is the single entry point to all Spark functionality. Whether
         * you use Scala, Java, Python, or R, you need a SparkContext (the concrete
         * class name differs per language; in Java it is JavaSparkContext). It also
         * registers the program with the master, making it one of the most important
         * objects in any Spark application.
         */
        JavaSparkContext jsc = new JavaSparkContext(sparkConf); // backed by a Scala SparkContext
        /**
         * Step 3: Create a JavaRDD from the concrete data source (HDFS, HBase, local
         * FS, a database, S3, ...) through the JavaSparkContext. A JavaRDD can be
         * created in three ways: from an external source (e.g. HDFS), from a Scala
         * collection, or from another RDD. The data is split into a series of
         * Partitions, and the data in each Partition is handled by one Task.
         */
        JavaRDD<String> lines = jsc.textFile("words.txt");
        // In Scala, thanks to SAM conversion, this can be written as:
        // val words = lines.flatMap { line => line.split(" ") }
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });
        /**
         * Step 4: Apply Transformation-level operations (map, filter, and other
         * higher-order functions) to the initial JavaRDD.
         * Step 4.1: Map every word produced by the split to a count of 1,
         * i.e. word => (word, 1).
         */
        JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });
        /**
         * Sum the counts per word.
         */
        JavaPairRDD<String, Integer> wordCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        wordCount.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            public void call(Tuple2<String, Integer> pairs) throws Exception {
                System.out.println(pairs._1() + ":" + pairs._2());
            }
        });
        jsc.close();
    }
}
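Spark prints the pairs in no particular order. If you want the output ordered by frequency, one sketch (Java 8 lambdas assumed; also import java.util.List) is to swap each pair, sort by key, and take the top entries:

List<Tuple2<String, Integer>> top = wordCount
        .mapToPair(t -> new Tuple2<>(t._2(), t._1())) // swap to (count, word)
        .sortByKey(false)                             // sort descending by count
        .mapToPair(t -> new Tuple2<>(t._2(), t._1())) // swap back to (word, count)
        .take(10);                                    // bring the top 10 to the driver
for (Tuple2<String, Integer> t : top) {
    System.out.println(t._1() + ":" + t._2());
}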
JavaClusterWordCount.java
package com.spark.test;

import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

public class JavaClusterWordCount {
    public static void main(String[] args) {
        /**
         * Step 1: Create the SparkConf object, which holds the program's runtime
         * configuration. setMaster sets the URL of the Spark master the program
         * connects to. "local" is kept here so the example runs as-is; on a real
         * cluster you would normally omit setMaster() and pass --master to spark-submit.
         */
        SparkConf sparkConf = new SparkConf().setAppName("ClusterWordCountByJava").setMaster("local");
        /**
         * Step 2: Create the SparkContext object.
         * SparkContext is the single entry point to all Spark functionality. Whether
         * you use Scala, Java, Python, or R, you need a SparkContext (the concrete
         * class name differs per language; in Java it is JavaSparkContext). It also
         * registers the program with the master, making it one of the most important
         * objects in any Spark application.
         */
        JavaSparkContext jsc = new JavaSparkContext(sparkConf); // backed by a Scala SparkContext
        /**
         * Step 3: Create a JavaRDD from the concrete data source (HDFS, HBase, local
         * FS, a database, S3, ...) through the JavaSparkContext. A JavaRDD can be
         * created in three ways: from an external source (e.g. HDFS), from a Scala
         * collection, or from another RDD. The data is split into a series of
         * Partitions, and the data in each Partition is handled by one Task.
         */
        JavaRDD<String> lines = jsc.textFile("hdfs://192.168.168.200:9000/input/words.txt");
        // In Scala, thanks to SAM conversion, this can be written as:
        // val words = lines.flatMap { line => line.split(" ") }
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });
        /**
         * Step 4: Apply Transformation-level operations (map, filter, and other
         * higher-order functions) to the initial JavaRDD.
         * Step 4.1: Map every word produced by the split to a count of 1,
         * i.e. word => (word, 1).
         */
        JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });
        /**
         * Sum the counts per word.
         */
        JavaPairRDD<String, Integer> wordCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        wordCount.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            public void call(Tuple2<String, Integer> pairs) throws Exception {
                System.out.println(pairs._1() + ":" + pairs._2());
            }
        });
        jsc.close();
    }
}
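One caveat when this runs on an actual cluster: the System.out.println inside foreach() executes on the executors, so the output lands in the executor logs, not the driver console. A small sketch to print on the driver instead (suitable for small result sets only, since collect() pulls everything to the driver):

for (Tuple2<String, Integer> pair : wordCount.collect()) {
    System.out.println(pair._1() + ":" + pair._2());
}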