spring boot 集成spark-streaming-kafka
2018-02-24 14:42
846 查看
主要思路:SparkContext由spring容器管理,在spring容器启动完毕后,执行spark-streaming-kafka,获取数据并处理。
1.spring容器中初始化SparkContext,代码片段如下:
@Bean
@ConditionalOnMissingBean(SparkConf.class)
public SparkConf sparkConf() {
    // Build the Spark configuration from externally injected properties.
    SparkConf conf = new SparkConf();
    conf.setAppName(sparkAppName);
    conf.setMaster(sparkMasteer); // use "local[*]" when testing locally
    conf.set("spark.driver.memory", sparkDriverMemory);
    conf.set("spark.worker.memory", sparkWorkerMemory); // e.g. "26g"
    conf.set("spark.executor.memory", sparkExecutorMemory);
    conf.set("spark.rpc.message.maxSize", sparkRpcMessageMaxSize);
    return conf;
}
@Bean
@ConditionalOnMissingBean(JavaSparkContext.class) // a JVM may hold only one active SparkContext
public JavaSparkContext javaSparkContext(SparkConf sparkConf) {
    // @Autowired is redundant on @Bean method parameters: Spring resolves
    // them from the container automatically.
    return new JavaSparkContext(sparkConf);
}
2.spark-streaming-kafka 执行类:
@Component
public class SparkKafkaStreamExecutor implements Serializable,Runnable{
private static final long serialVersionUID = 1L;
private static final Logger log = LoggerFactory.getLogger(SparkKafkaStreamExecutor.class);
@Value("${spark.stream.kafka.durations}")
private String streamDurationTime;
@Value("${kafka.broker.list}")
private String metadatabrokerlist;
@Value("${spark.kafka.topics}")
private String topicsAll;
@Autowired
private transient Gson gson;
private transient JavaStreamingContext jsc;
@Autowired
private transient JavaSparkContext javaSparkContext;
@Override
public void run() {
startStreamTask();
}
public void startStreamTask() {
// System.setProperty("hadoop.home.dir", "D:\\hadoop-2.7.5");
Set<String> topics = new HashSet<String>(Arrays.asList(topicsAll.split(",")));
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", metadatabrokerlist);
jsc = new JavaStreamingContext(javaSparkContext,
Durations.seconds(Integer.valueOf(streamDurationTime)));
jsc.checkpoint("checkpoint"); //保证元数据恢复,就是Driver端挂了之后数据仍然可以恢复
// 得到数据流
final JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(jsc, String.class,
String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);
System.out.println("stream started!");
stream.print();
stream.foreachRDD(v -> {
//针对单篇文章流式处理
List<String> topicDatas = v.values().collect();
for (String topicData : topicDatas) {
List<Map<String, Object>> list = gson
.fromJson(topicData, new TypeToken<List<Map<String, String>>>() {}.getType());
list.parallelStream().forEach(m->{
//do something
System.out.println(m);
});
}
log.info("一批次数据流处理完: {}",topicDatas);
});
jsc.start();
}
public void destoryStreamTask() {
if(jsc!=null) {
jsc.stop();
}
}
3.容器加载完成后容器监听执行类:
public class ApplicationStartup implements ApplicationListener<ContextRefreshedEvent> {

    /**
     * Starts the Spark streaming task once the Spring context has refreshed.
     * ContextRefreshedEvent fires once per context in a parent/child hierarchy,
     * so react only to the root context to avoid launching the stream twice.
     */
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        ApplicationContext ac = event.getApplicationContext();
        if (ac.getParent() != null) {
            return; // child-context refresh: the root context already started the task
        }
        SparkKafkaStreamExecutor sparkKafkaStreamExecutor = ac.getBean(SparkKafkaStreamExecutor.class);
        Thread thread = new Thread(sparkKafkaStreamExecutor, "spark-kafka-stream");
        thread.start();
    }
}
4.项目启动类,注册监听类:
@SpringBootApplication
public class SampleApplication {

    public static void main(String[] args) {
        SpringApplication app = new SpringApplication(SampleApplication.class);
        // Register the listener that kicks off the Spark streaming task
        // after the container has finished starting.
        app.addListeners(new ApplicationStartup());
        app.run(args);
    }

    /** Expose a shared Gson instance as a Spring-managed bean. */
    @Bean
    public Gson gson() {
        return new Gson();
    }
}
主要代码已贴上,全部代码已上传至github 和 码云
谢谢阅读。
1.spring容器中初始化SparkContext,代码片段如下:
@Bean
@ConditionalOnMissingBean(SparkConf.class)
public SparkConf sparkConf() {
    // Build the Spark configuration from externally injected properties.
    SparkConf conf = new SparkConf();
    conf.setAppName(sparkAppName);
    conf.setMaster(sparkMasteer); // use "local[*]" when testing locally
    conf.set("spark.driver.memory", sparkDriverMemory);
    conf.set("spark.worker.memory", sparkWorkerMemory); // e.g. "26g"
    conf.set("spark.executor.memory", sparkExecutorMemory);
    conf.set("spark.rpc.message.maxSize", sparkRpcMessageMaxSize);
    return conf;
}
@Bean
@ConditionalOnMissingBean(JavaSparkContext.class) // a JVM may hold only one active SparkContext
public JavaSparkContext javaSparkContext(SparkConf sparkConf) {
    // @Autowired is redundant on @Bean method parameters: Spring resolves
    // them from the container automatically.
    return new JavaSparkContext(sparkConf);
}
2.spark-streaming-kafka 执行类:
@Component
public class SparkKafkaStreamExecutor implements Serializable,Runnable{
private static final long serialVersionUID = 1L;
private static final Logger log = LoggerFactory.getLogger(SparkKafkaStreamExecutor.class);
@Value("${spark.stream.kafka.durations}")
private String streamDurationTime;
@Value("${kafka.broker.list}")
private String metadatabrokerlist;
@Value("${spark.kafka.topics}")
private String topicsAll;
@Autowired
private transient Gson gson;
private transient JavaStreamingContext jsc;
@Autowired
private transient JavaSparkContext javaSparkContext;
@Override
public void run() {
startStreamTask();
}
public void startStreamTask() {
// System.setProperty("hadoop.home.dir", "D:\\hadoop-2.7.5");
Set<String> topics = new HashSet<String>(Arrays.asList(topicsAll.split(",")));
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", metadatabrokerlist);
jsc = new JavaStreamingContext(javaSparkContext,
Durations.seconds(Integer.valueOf(streamDurationTime)));
jsc.checkpoint("checkpoint"); //保证元数据恢复,就是Driver端挂了之后数据仍然可以恢复
// 得到数据流
final JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(jsc, String.class,
String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);
System.out.println("stream started!");
stream.print();
stream.foreachRDD(v -> {
//针对单篇文章流式处理
List<String> topicDatas = v.values().collect();
for (String topicData : topicDatas) {
List<Map<String, Object>> list = gson
.fromJson(topicData, new TypeToken<List<Map<String, String>>>() {}.getType());
list.parallelStream().forEach(m->{
//do something
System.out.println(m);
});
}
log.info("一批次数据流处理完: {}",topicDatas);
});
jsc.start();
}
public void destoryStreamTask() {
if(jsc!=null) {
jsc.stop();
}
}
3.容器加载完成后容器监听执行类:
public class ApplicationStartup implements ApplicationListener<ContextRefreshedEvent> {

    /**
     * Starts the Spark streaming task once the Spring context has refreshed.
     * ContextRefreshedEvent fires once per context in a parent/child hierarchy,
     * so react only to the root context to avoid launching the stream twice.
     */
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        ApplicationContext ac = event.getApplicationContext();
        if (ac.getParent() != null) {
            return; // child-context refresh: the root context already started the task
        }
        SparkKafkaStreamExecutor sparkKafkaStreamExecutor = ac.getBean(SparkKafkaStreamExecutor.class);
        Thread thread = new Thread(sparkKafkaStreamExecutor, "spark-kafka-stream");
        thread.start();
    }
}
4.项目启动类,注册监听类:
@SpringBootApplication
public class SampleApplication {

    public static void main(String[] args) {
        SpringApplication app = new SpringApplication(SampleApplication.class);
        // Register the listener that kicks off the Spark streaming task
        // after the container has finished starting.
        app.addListeners(new ApplicationStartup());
        app.run(args);
    }

    /** Expose a shared Gson instance as a Spring-managed bean. */
    @Bean
    public Gson gson() {
        return new Gson();
    }
}
主要代码已贴上,全部代码已上传至github 和 码云
谢谢阅读。
相关文章推荐
- spring-sparkstreaming-kafka10集成实现和疑难杂症解决
- Kafka和Spark Streaming Java版本集成并将数据实时写入HBase及代码
- Spark Streaming + Kafka集成指南
- spring boot kafka集成
- 【Spark】SparkStreaming-Kafka-Redis-集成-基础参考资料
- sparkstreaming和kafka集成的两种方式(最全)
- Spark(1.2.0) Streaming 集成 Kafka 总结 [复制链接]
- sparkstreaming接受kafka数据实时存入hbse并集成rest服务
- java实现spark streaming与kafka集成进行流式计算
- 【Spark】SparkStreaming-Kafka-集成-终极参考资料
- Spring boot 集成 kafka及storm
- springboot kafka集成(实现producer和consumer)
- Kafka和Spark Streaming Java版本集成并将数据实时写入HBase
- spark-streaming集成Kafka处理实时数据
- 利用kafka与springboot高消费集成
- Spark Streaming 和kafka 集成指导(kafka 0.8.2.1 或以上版本)
- spring boot与kafka集成(spring boot 1.5.1版本)
- sparkStreaming集成Kafka
- 随着spring boot 1.5版本的发布,在spring项目中与kafka集成更为简便。
- spring boot 集成kafka (多线程,消费者使用kafka的原生api实现,因为@KakfkaListener修改groupId无效)