
Integrating Kafka with Spark Streaming in Java


1. Download the dependency JARs

For the specific JARs, see the companion post on Spark Streaming and Kafka integration. Note that the examples below use the old 0.8 integration API (kafka.serializer.StringDecoder and org.apache.spark.streaming.kafka.KafkaUtils), so for Spark 2.1.1 the key dependency is spark-streaming-kafka-0-8_2.11 together with its Kafka client JARs.

2. Create a Java project

This is straightforward, so it is omitted here.

3. Working examples

The Spark distribution ships with many examples; they live under spark-2.1.1-bin-hadoop2.7\examples.

JavaDirectKafkaWordCount.java



package com.spark.test;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

import scala.Tuple2;
import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.Durations;

public class JavaDirectKafkaWordCount {

  public static void main(String[] args) throws Exception {
    // String brokers = args[0];
    // String topics = args[1];

    // Create the context with a 2-second batch interval.
    // setMaster("local[2]"): specify at least two threads when running locally --
    // one to receive messages and one to process them.
    SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount").setMaster("local[2]");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));

    Set<String> topicsSet = new HashSet<>(Arrays.asList("test"));
    Map<String, String> kafkaParams = new HashMap<>();
    kafkaParams.put("metadata.broker.list", "192.168.168.200:9092");

    // Create a direct Kafka stream from the brokers and topics.
    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topicsSet
    );

    // Get the lines, split them into words, count the words and print.
    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
      @Override
      public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
      }
    });
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterator<String> call(String line) {
        return Arrays.asList(line.split(" ")).iterator();
      }
    });
    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
        new PairFunction<String, String, Integer>() {
          @Override
          public Tuple2<String, Integer> call(String s) {
            return new Tuple2<>(s, 1);
          }
        }).reduceByKey(
        new Function2<Integer, Integer, Integer>() {
          @Override
          public Integer call(Integer i1, Integer i2) {
            return i1 + i2;
          }
        });
    wordCounts.print();

    // Start the computation.
    jssc.start();
    jssc.awaitTermination();
  }
}
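
To see output from the direct stream you need messages on the "test" topic. Below is a minimal sketch of a standalone test producer (my addition, not part of the original post); it assumes the kafka-clients JAR is on the classpath and reuses the broker address from the example above.

package com.spark.test;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical helper: pushes a few lines to the "test" topic so the
// streaming word count has something to consume.
public class TestMessageProducer {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "192.168.168.200:9092"); // broker from the example above
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      producer.send(new ProducerRecord<>("test", "hello spark streaming"));
      producer.send(new ProducerRecord<>("test", "hello kafka"));
      producer.flush(); // make sure the records leave the client before exiting
    }
  }
}

Each 2-second batch that contains these messages should then print the per-word counts on the driver console.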

JavaKafkaWordCount.java



package com.spark.test;

import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.HashMap;

import scala.Tuple2;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class JavaKafkaWordCount {

  public static void main(String[] args) throws InterruptedException {
    SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount").setMaster("local[2]");
    // Create the context with a 2-second batch size.
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

    int numThreads = Integer.parseInt("2");
    Map<String, Integer> topicMap = new HashMap<>();
    String[] topics = "test".split(",");
    for (String topic : topics) {
      topicMap.put(topic, numThreads);
    }

    JavaPairReceiverInputDStream<String, String> messages =
        KafkaUtils.createStream(jssc, "192.168.168.200:2181", "test-group", topicMap);

    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
      @Override
      public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
      }
    });
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterator<String> call(String line) {
        return Arrays.asList(line.split(" ")).iterator();
      }
    });
    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
        new PairFunction<String, String, Integer>() {
          @Override
          public Tuple2<String, Integer> call(String s) {
            return new Tuple2<>(s, 1);
          }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
          @Override
          public Integer call(Integer i1, Integer i2) {
            return i1 + i2;
          }
        });
    wordCounts.print();

    jssc.start();
    jssc.awaitTermination();
  }
}
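
Unlike the direct stream in the first example, createStream is receiver-based: it registers a consumer group through ZooKeeper (hence the 2181 address), and received data is buffered by a receiver on an executor before being processed. If you want to guard against losing that buffered data on driver failure, a common addition (my assumption, not something the original post configures) is to enable checkpointing together with the receiver write-ahead log:

// Sketch only: changes one would typically make to the receiver-based example above.
// The checkpoint directory is a placeholder path; point it at HDFS in a real deployment.
sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true"); // persist received data to a WAL
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
jssc.checkpoint("hdfs://192.168.168.200:9000/spark/checkpoint");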

JavaLocalWordCount.java



package com.spark.test;

import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class JavaLocalWordCount {

  public static void main(String[] args) {
    /**
     * Step 1: create the SparkConf object holding the runtime configuration of the Spark
     * program. setMaster sets the URL of the Spark master to connect to; "local" runs the
     * program locally, which is handy on machines with modest resources.
     */
    SparkConf sparkConf = new SparkConf().setAppName("LocalWordCountByJava").setMaster("local");

    /**
     * Step 2: create the SparkContext.
     * The SparkContext is the single entry point to all Spark functionality. Whether the
     * program is written in Scala, Java, Python or R, it needs a SparkContext (the concrete
     * class differs per language; in Java it is JavaSparkContext). It also registers the
     * application with the master, making it one of the most important objects in a Spark job.
     */
    JavaSparkContext jsc = new JavaSparkContext(sparkConf); // backed by a Scala SparkContext

    /**
     * Step 3: create a JavaRDD from the concrete data source (HDFS, HBase, local FS, a database,
     * S3, ...) via the JavaSparkContext. An RDD can be created in three ways: from an external
     * source (e.g. HDFS), from a Scala collection, or by transforming another RDD. The data is
     * split into a series of partitions, and the data of each partition is handled by one task.
     */
    JavaRDD<String> lines = jsc.textFile("words.txt");

    // In Scala, thanks to SAM conversion, this could be written as: val words = lines.flatMap { line => line.split(" ") }
    JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterator<String> call(String line) {
        return Arrays.asList(line.split(" ")).iterator();
      }
    });

    /**
     * Step 4: apply transformations (map, filter and other higher-order functions) to the
     * initial RDD to do the actual computation.
     * Step 4.1: on top of the word split, map each word occurrence to a count of 1,
     * i.e. word => (word, 1).
     */
    JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
      public Tuple2<String, Integer> call(String word) throws Exception {
        return new Tuple2<String, Integer>(word, 1);
      }
    });

    /**
     * Sum the counts per word.
     */
    JavaPairRDD<String, Integer> wordCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
      public Integer call(Integer v1, Integer v2) throws Exception {
        return v1 + v2;
      }
    });

    wordCount.foreach(new VoidFunction<Tuple2<String, Integer>>() {
      public void call(Tuple2<String, Integer> pairs) throws Exception {
        System.out.println(pairs._1() + ":" + pairs._2());
      }
    });

    jsc.close();
  }
}
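
On Java 8 the anonymous inner classes above can be replaced with lambdas, since Spark's Function interfaces are single-method types. A shorter equivalent of the core of JavaLocalWordCount (same logic, only the syntax changes; my addition, assuming the jsc and imports from the example above) looks like this:

JavaRDD<String> lines = jsc.textFile("words.txt");
JavaPairRDD<String, Integer> wordCount = lines
    .flatMap(line -> Arrays.asList(line.split(" ")).iterator()) // split each line into words
    .mapToPair(word -> new Tuple2<>(word, 1))                   // word => (word, 1)
    .reduceByKey((v1, v2) -> v1 + v2);                          // sum counts per word
wordCount.foreach(pair -> System.out.println(pair._1() + ":" + pair._2()));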

JavaClusterWordCount.java



package com.spark.test;

import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class JavaClusterWordCount {

  public static void main(String[] args) {
    // Steps 1 and 2 are the same as in JavaLocalWordCount: create the SparkConf and the
    // JavaSparkContext. For a real cluster run you would normally drop setMaster("local")
    // here and pass --master to spark-submit instead.
    SparkConf sparkConf = new SparkConf().setAppName("ClusterWordCountByJava").setMaster("local");
    JavaSparkContext jsc = new JavaSparkContext(sparkConf); // backed by a Scala SparkContext

    // Step 3: create the JavaRDD, this time from a file on HDFS instead of the local file system.
    JavaRDD<String> lines = jsc.textFile("hdfs://192.168.168.200:9000/input/words.txt");

    // In Scala, thanks to SAM conversion, this could be written as: val words = lines.flatMap { line => line.split(" ") }
    JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterator<String> call(String line) {
        return Arrays.asList(line.split(" ")).iterator();
      }
    });

    // Step 4: map each word occurrence to (word, 1) ...
    JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
      public Tuple2<String, Integer> call(String word) throws Exception {
        return new Tuple2<String, Integer>(word, 1);
      }
    });

    // ... and sum the counts per word.
    JavaPairRDD<String, Integer> wordCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
      public Integer call(Integer v1, Integer v2) throws Exception {
        return v1 + v2;
      }
    });

    wordCount.foreach(new VoidFunction<Tuple2<String, Integer>>() {
      public void call(Tuple2<String, Integer> pairs) throws Exception {
        System.out.println(pairs._1() + ":" + pairs._2());
      }
    });

    jsc.close();
  }
}
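
One caveat with the cluster version: foreach runs on the executors, so the System.out.println output ends up in the executor logs rather than on the driver console. Two common alternatives (a sketch of my own, not from the original post; the output path is a placeholder) are to collect the small result back to the driver, or to write it back to HDFS:

// Option 1: bring the (small) result back to the driver and print it there.
for (Tuple2<String, Integer> pair : wordCount.collect()) {
  System.out.println(pair._1() + ":" + pair._2());
}

// Option 2: write the result back to HDFS next to the input.
wordCount.saveAsTextFile("hdfs://192.168.168.200:9000/output/wordcount");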