
updateStateByKey--word count

2015-09-04 00:00
Summary: Use updateStateByKey to keep the result of the previous computation, so that word counts accumulate across batches.
http://blog.selfup.cn/619.html
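import java.util.Arrays;
import java.util.regex.Pattern;

import com.google.common.base.Optional; // assumption: Spark 1.x, whose Java API uses Guava's Optional
import org.apache.spark.examples.streaming.StreamingExamples;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

// The class name is an assumption, taken from the application name used below.
public class JavaNetworkWordCount {
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) {
        StreamingExamples.setStreamingLogLevels();

        // Local mode with 2 threads and a 10-second batch interval
        JavaStreamingContext jssc = new JavaStreamingContext("local[2]",
                "JavaNetworkWordCount", new Duration(10000));
        // updateStateByKey() requires a checkpoint directory
        jssc.checkpoint(".");
        // Connect to a text source listening on localhost:9999
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
        // Split each input line on spaces
        JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(SPACE.split(line)));
        // Map each word to a pair such as (word, 1)
        JavaPairDStream<String, Integer> pairs = words.mapToPair(word -> new Tuple2<>(word, 1));
        // Add this batch's counts to the running total kept for each word
        JavaPairDStream<String, Integer> counts = pairs.updateStateByKey((values, state) -> {
            Integer newSum = state.or(0); // previous total, or 0 for a word seen for the first time
            for (Integer i : values) {
                newSum += i;
            }
            return Optional.of(newSum);
        });
        counts.print();
        jssc.start();
        jssc.awaitTermination();
    }
}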
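To see what the update function in the code above does from one batch to the next, here is a minimal plain-Java sketch that runs without Spark. It is only an illustration under assumptions: `RunningWordCount`, `update` and `processBatch` are hypothetical names, `java.util.Optional` stands in for the Optional type used by Spark, and a `Map` plays the role of the checkpointed state. The update function is called for every key that already has state or has new values in the batch; a key with no new values gets an empty list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical stand-in for the streaming job: no Spark involved.
class RunningWordCount {
    // Same shape as the update function in the post:
    // previous total (0 if the key is new) plus this batch's values.
    public static Optional<Integer> update(List<Integer> values, Optional<Integer> state) {
        int newSum = state.orElse(0);
        for (int v : values) {
            newSum += v;
        }
        return Optional.of(newSum);
    }

    // Applies one batch of lines to the running state, loosely mimicking one batch interval.
    public static void processBatch(Map<String, Integer> state, List<String> lines) {
        // Build (word, [1, 1, ...]) groups for this batch.
        Map<String, List<Integer>> grouped = new HashMap<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                grouped.computeIfAbsent(word, k -> new ArrayList<>()).add(1);
            }
        }
        // Visit every key that has old state or new values.
        Set<String> keys = new HashSet<>(state.keySet());
        keys.addAll(grouped.keySet());
        for (String key : keys) {
            List<Integer> values = grouped.getOrDefault(key, Collections.emptyList());
            Optional<Integer> updated = update(values, Optional.ofNullable(state.get(key)));
            if (updated.isPresent()) {
                state.put(key, updated.get());
            } else {
                state.remove(key); // an empty result drops the key from the state
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> state = new TreeMap<>(); // sorted only to make the output stable
        processBatch(state, Arrays.asList("a b a"));
        System.out.println(state); // {a=2, b=1}
        processBatch(state, Arrays.asList("b c"));
        System.out.println(state); // {a=2, b=2, c=1}
    }
}
```

In the second batch the word `a` does not appear, yet its count stays at 2 because the previous state is carried forward. That is the behaviour the post relies on updateStateByKey for.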