
Spring Boot integration with spark-streaming-kafka

Main idea: the SparkContext is managed by the Spring container; once the container has finished starting up, the spark-streaming-kafka job is launched to fetch data from Kafka and process it.
1. Initialize the SparkContext in the Spring container; code snippet:
@Bean
@ConditionalOnMissingBean(SparkConf.class)
public SparkConf sparkConf() {
    SparkConf conf = new SparkConf()
            .setAppName(sparkAppName)
            .setMaster(sparkMaster)
            .set("spark.driver.memory", sparkDriverMemory)
            .set("spark.worker.memory", sparkWorkerMemory) // e.g. "26g"
            // .set("spark.shuffle.memoryFraction", "0")   // defaults to 0.2
            .set("spark.executor.memory", sparkExecutorMemory)
            .set("spark.rpc.message.maxSize", sparkRpcMessageMaxSize);
    // .setMaster("local[*]"); // only for local testing
    return conf;
}

@Bean
@ConditionalOnMissingBean(JavaSparkContext.class) // by default only one SparkContext may exist per JVM
public JavaSparkContext javaSparkContext(@Autowired SparkConf sparkConf) {
    return new JavaSparkContext(sparkConf);
}
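The bean methods above reference fields such as sparkAppName and sparkMaster that are not shown in the snippet. A minimal sketch of the surrounding configuration class, assuming the values are injected from application.properties (the property names here are illustrative, not from the original post):

@Configuration
public class SparkConfiguration {

    // Property names below are assumptions for illustration;
    // adjust them to match your own application.properties.
    @Value("${spark.app.name}")
    private String sparkAppName;

    @Value("${spark.master}")
    private String sparkMaster;

    @Value("${spark.driver.memory}")
    private String sparkDriverMemory;

    @Value("${spark.worker.memory}")
    private String sparkWorkerMemory;

    @Value("${spark.executor.memory}")
    private String sparkExecutorMemory;

    @Value("${spark.rpc.message.maxSize}")
    private String sparkRpcMessageMaxSize;

    // ... the sparkConf() and javaSparkContext() @Bean methods shown above go here
}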
2. The spark-streaming-kafka executor class:
@Component
public class SparkKafkaStreamExecutor implements Serializable, Runnable {

    private static final long serialVersionUID = 1L;
    private static final Logger log = LoggerFactory.getLogger(SparkKafkaStreamExecutor.class);

    @Value("${spark.stream.kafka.durations}")
    private String streamDurationTime;

    @Value("${kafka.broker.list}")
    private String metadatabrokerlist;

    @Value("${spark.kafka.topics}")
    private String topicsAll;

    @Autowired
    private transient Gson gson;

    private transient JavaStreamingContext jsc;

    @Autowired
    private transient JavaSparkContext javaSparkContext;

    @Override
    public void run() {
        startStreamTask();
    }

    public void startStreamTask() {
        // System.setProperty("hadoop.home.dir", "D:\\hadoop-2.7.5"); // only needed on Windows
        Set<String> topics = new HashSet<>(Arrays.asList(topicsAll.split(",")));
        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", metadatabrokerlist);

        jsc = new JavaStreamingContext(javaSparkContext,
                Durations.seconds(Integer.valueOf(streamDurationTime)));
        // Enable metadata checkpointing so the job can be recovered if the driver fails.
        jsc.checkpoint("checkpoint");

        // Create the direct Kafka input stream.
        final JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(jsc, String.class,
                String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);
        System.out.println("stream started!");
        stream.print();

        stream.foreachRDD(v -> {
            // Collect the batch on the driver and process each message individually.
            List<String> topicDatas = v.values().collect();
            for (String topicData : topicDatas) {
                List<Map<String, Object>> list = gson.fromJson(topicData,
                        new TypeToken<List<Map<String, Object>>>() {}.getType());
                list.parallelStream().forEach(m -> {
                    // do something
                    System.out.println(m);
                });
            }
            log.info("Finished processing one batch: {}", topicDatas);
        });

        jsc.start();
    }

    public void destroyStreamTask() {
        if (jsc != null) {
            jsc.stop();
        }
    }
}
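Note that destroyStreamTask() is never invoked by any of the snippets above. One way to wire it into the container lifecycle (a sketch, not part of the original code) is @PreDestroy together with the graceful overload JavaStreamingContext.stop(stopSparkContext, stopGracefully):

// Hypothetical variant of the shutdown method above; requires
// javax.annotation.PreDestroy. Called automatically when the Spring
// container shuts down.
@PreDestroy
public void destroyStreamTask() {
    if (jsc != null) {
        // true, true: also stop the underlying SparkContext and wait for
        // in-flight batches to finish before shutting down.
        jsc.stop(true, true);
    }
}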
3. Listener that runs once the container has finished loading:
public class ApplicationStartup implements ApplicationListener<ContextRefreshedEvent> {

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        ApplicationContext ac = event.getApplicationContext();
        SparkKafkaStreamExecutor sparkKafkaStreamExecutor = ac.getBean(SparkKafkaStreamExecutor.class);
        Thread thread = new Thread(sparkKafkaStreamExecutor);
        thread.start();
    }
}
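In a plain Spring Boot application there is a single context, but ContextRefreshedEvent can fire more than once when a parent/child context hierarchy is present (for example with Spring MVC). A defensive variant of the listener, using a hypothetical started flag that is not in the original code:

import java.util.concurrent.atomic.AtomicBoolean;

public class ApplicationStartup implements ApplicationListener<ContextRefreshedEvent> {

    // Guard so the stream is started only once even if the event fires
    // for several contexts in a hierarchy.
    private final AtomicBoolean started = new AtomicBoolean(false);

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        // Only react to the root context, and only the first time.
        if (event.getApplicationContext().getParent() == null
                && started.compareAndSet(false, true)) {
            SparkKafkaStreamExecutor executor =
                    event.getApplicationContext().getBean(SparkKafkaStreamExecutor.class);
            new Thread(executor, "spark-kafka-stream").start();
        }
    }
}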
4. Application entry point, registering the listener:
@SpringBootApplication
public class SampleApplication {

    public static void main(String[] args) {
        SpringApplication springApplication = new SpringApplication(SampleApplication.class);
        springApplication.addListeners(new ApplicationStartup());
        springApplication.run(args);
    }

    // Register Gson as a Spring-managed bean.
    @Bean
    public Gson gson() {
        return new Gson();
    }
}
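Not part of the original post, but useful for verification: to exercise the pipeline end to end you can publish a message whose value is a JSON array of objects, the shape the Gson call in step 2 expects. A sketch using the plain Kafka producer client; the topic name and broker address are placeholders:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Broker address and topic below are placeholders; use your own values.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The executor parses each message value as a JSON array of maps.
            String payload = "[{\"id\":\"1\",\"title\":\"hello\"}]";
            producer.send(new ProducerRecord<>("my-topic", payload));
        }
    }
}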
The main code is shown above; the full source has been uploaded to GitHub and Gitee.
Thanks for reading.