SparkStreaming updateStateByKey 基本操作
2016-05-27 16:17
435 查看
1.代码
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("UpdateStateByKeyDemo");
JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));
//报错解决办法做checkpoint,开启checkpoint机制,把checkpoint中的数据放在这里设置的目录中,
//生产环境下一般放在HDFS中
jsc.checkpoint("/checkPoint");
JavaReceiverInputDStream lines =
jsc.socketTextStream("master", 9999);
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
//如果是Scala,由于SAM转换,所以可以写成val words = lines.flatMap { line => line.split(" ")}
@Override
public Iterable<String> call(String
line) throws Exception {
return Arrays.asList(line.split(" "));
}
});
JavaPairDStream<String, Integer> pairs =
words.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String
word) throws Exception {
return new Tuple2<String, Integer>(word, 1);
}
});
//关键方法updateStateByKey,其中有关于状态的方法实现。
//注意此处的optionnals一定要导入com.google.common.base.Optional;
/*在这里是通过updateStateByKey来以Batch Interval为单位来对历史状态进行更新,
* 这是功能上的一个非常大的改进,否则的话需要完成同样的目的,就可能需要把数据保存在Redis、
* Tagyon或者HDFS或者HBase或者数据库中来不断的完成同样一个key的State更新,如果你对性能有极为苛刻的要求,
* 且数据量特别大的话,可以考虑把数据放在分布式的Redis或者Tachyon内存文件系统中;
* 当然从Spark1.6.x开始可以尝试使用mapWithState,Spark2.X后mapWithState应该非常稳定了。*/
JavaPairDStream<String, Integer> wordsCount =
pairs.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
//对相同的Key,进行Value的累计(包括Local和Reducer级别同时Reduce)
@Override
public Optional<Integer> call(List<Integer>
values, Optional<Integer>
state) throws Exception {
Integer updatedValue = 0 ;
if(state.isPresent()){
updatedValue =
state.get();
}
for(Integer
value: values){
updatedValue +=
value;
}
return Optional.of(updatedValue);
}
});
wordsCount.print();
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("UpdateStateByKeyDemo");
JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));
//报错解决办法做checkpoint,开启checkpoint机制,把checkpoint中的数据放在这里设置的目录中,
//生产环境下一般放在HDFS中
jsc.checkpoint("/checkPoint");
JavaReceiverInputDStream lines =
jsc.socketTextStream("master", 9999);
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
//如果是Scala,由于SAM转换,所以可以写成val words = lines.flatMap { line => line.split(" ")}
@Override
public Iterable<String> call(String
line) throws Exception {
return Arrays.asList(line.split(" "));
}
});
JavaPairDStream<String, Integer> pairs =
words.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String
word) throws Exception {
return new Tuple2<String, Integer>(word, 1);
}
});
//关键方法updateStateByKey,其中有关于状态的方法实现。
//注意此处的optionnals一定要导入com.google.common.base.Optional;
/*在这里是通过updateStateByKey来以Batch Interval为单位来对历史状态进行更新,
* 这是功能上的一个非常大的改进,否则的话需要完成同样的目的,就可能需要把数据保存在Redis、
* Tagyon或者HDFS或者HBase或者数据库中来不断的完成同样一个key的State更新,如果你对性能有极为苛刻的要求,
* 且数据量特别大的话,可以考虑把数据放在分布式的Redis或者Tachyon内存文件系统中;
* 当然从Spark1.6.x开始可以尝试使用mapWithState,Spark2.X后mapWithState应该非常稳定了。*/
JavaPairDStream<String, Integer> wordsCount =
pairs.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
//对相同的Key,进行Value的累计(包括Local和Reducer级别同时Reduce)
@Override
public Optional<Integer> call(List<Integer>
values, Optional<Integer>
state) throws Exception {
Integer updatedValue = 0 ;
if(state.isPresent()){
updatedValue =
state.get();
}
for(Integer
value: values){
updatedValue +=
value;
}
return Optional.of(updatedValue);
}
});
wordsCount.print();
相关文章推荐
- Java集合类详解
- 二叉树(二叉链表实现)JAVA代码
- vim简介及配置
- uses crt;
- PHP explode 按照固定字符截字符串成数组
- EBS查看报表或者账单的职责
- SparkStreaming中Tanformations和状态管理
- 魔兽世界任务制作教学,已经本人自己服务器中测试
- EM(期望最大化)聚类详细推导
- CocoPods的安装步骤
- 实践2.3 程序破解
- Excel 文件读取
- Winform窗体事件发生顺序
- Java 比较两个字符串的大小
- SparkStreaming基于Kafka Direct
- 可变参数
- mongodb查看当前操作db.currentOp()
- linux中常用的一些命令
- C# SMTP 邮件发送之QQ邮箱篇
- static 全局变量 和 普通全局变量区别