Storm Trident MapReduce
2018-03-04 13:48
package com.mark.storm.mapreduce.v2;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.Consumer;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.operation.builtin.Count;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.trident.testing.MemoryMapState;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

/**
 * Created by fellowlei on 2018/3/4
 */
public class TridentMapReduce {

    public static void main(String[] args) {
        // Spout that endlessly cycles over two fixed sentences,
        // emitting at most one tuple per batch (maxBatchSize = 1)
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 1,
                new Values("spark hadoop"),
                new Values("hadoop hive"));
        spout.setCycle(true);

        TridentTopology topology = new TridentTopology();

        // Split each sentence into words, group by word, and maintain a
        // running count per word in an in-memory state
        TridentState wordCounts = topology.newStream("spout1", spout).parallelismHint(1)
                .each(new Fields("sentence"), new SplitFunction(), new Fields("word"))
                .groupBy(new Fields("word"))
                .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
                .parallelismHint(1);

        // Debug: print every updated (word, count) pair from the state
        wordCounts.newValuesStream().peek(new Consumer() {
            @Override
            public void accept(TridentTuple tridentTuple) {
                System.out.println("###" + tridentTuple);
            }
        });

        Config config = new Config();
        config.setDebug(false);
        config.setNumWorkers(1);

        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("mydemo", config, topology.build());
    }

    // Splits each sentence on spaces and emits one tuple per word
    static class SplitFunction extends BaseFunction {
        @Override
        public void execute(TridentTuple tridentTuple, TridentCollector tridentCollector) {
            String sentence = tridentTuple.getString(0);
            for (String word : sentence.split(" ")) {
                tridentCollector.emit(new Values(word));
            }
        }
    }
}
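For intuition, the `each(SplitFunction)` + `groupBy` + `persistentAggregate(Count)` pipeline above computes the same result as the plain-Java word count sketched below. This is an illustration only, not Storm code: Trident performs the aggregation incrementally, batch by batch, with the running counts held in the `MemoryMapState`, whereas this sketch processes all input at once.

```java
import java.util.HashMap;
import java.util.Map;

public class WordCountSketch {

    // Equivalent of SplitFunction followed by groupBy + Count:
    // split each sentence on spaces, then tally occurrences per word.
    public static Map<String, Long> count(String[] sentences) {
        Map<String, Long> counts = new HashMap<>();
        for (String sentence : sentences) {
            for (String word : sentence.split(" ")) {   // SplitFunction
                counts.merge(word, 1L, Long::sum);      // groupBy + Count
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Same two sentences the FixedBatchSpout emits (one cycle's worth)
        Map<String, Long> counts = count(new String[]{"spark hadoop", "hadoop hive"});
        System.out.println(counts); // hadoop counted twice, spark and hive once
    }
}
```

After one full cycle of the spout, the Trident state would hold the same totals; because `setCycle(true)` replays the sentences forever, the real topology's counts keep growing on every pass.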