Storm code practice - WordCountTopology
2018-03-22 14:14
package WordCount;

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.task.ShellBolt;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * This topology demonstrates Storm's stream groupings and multilang capabilities.
 */
public class WordCountTopology {

    // Multilang bolt: the sentence splitting is delegated to splitsentence.py.
    public static class SplitSentence extends ShellBolt implements IRichBolt {

        public SplitSentence() {
            super("python", "splitsentence.py");
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }

    // Keeps a running count per word and emits (word, count) for every input tuple.
    public static class WordCount extends BaseBasicBolt {
        Map<String, Integer> counts = new HashMap<String, Integer>();

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String word = tuple.getString(0);
            Integer count = counts.get(word);
            if (count == null)
                count = 0;
            count++;
            counts.put(word, count);
            collector.emit(new Values(word, count));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        // RandomSentenceSpout is not defined in this listing; it must be on the classpath.
        builder.setSpout("spout", new RandomSentenceSpout(), 5);
        builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
        // Fields grouping on "word": the same word always goes to the same "count" task.
        builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

        Config conf = new Config();
        conf.setDebug(true);

        if (args != null && args.length > 0) {
            // A topology name was given: submit to a real cluster.
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            // No arguments: run in local mode for 10 seconds, then shut down.
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", conf, builder.createTopology());
            Thread.sleep(10000);
            cluster.shutdown();
        }
    }
}
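The topology above counts correctly only because the "count" bolt is wired with fieldsGrouping on "word": each WordCount task keeps its own private counts map, so every occurrence of a given word has to reach the same task. The following is a minimal, Storm-free sketch of that idea. The class FieldsGroupingSketch, the CountTask stand-in, and the hash-modulo rule in taskFor are all hypothetical names and assumptions made for illustration, not Storm's actual routing code. The whitespace split is likewise only an assumption about what splitsentence.py does, since that script is not shown in this post.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FieldsGroupingSketch {

    /** Stand-in for one "count" bolt task: it only sees the words routed to it. */
    static class CountTask {
        final Map<String, Integer> counts = new HashMap<>();

        void execute(String word) {
            // Same effect as the null check + increment in WordCount.execute.
            counts.merge(word, 1, Integer::sum);
        }
    }

    /** Assumed routing rule (illustration only): equal words map to the same task index. */
    static int taskFor(String word, int numTasks) {
        return Math.floorMod(word.hashCode(), numTasks);
    }

    /** Splits each sentence on whitespace and routes every word to its task. */
    static List<CountTask> run(List<String> sentences, int numTasks) {
        List<CountTask> tasks = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            tasks.add(new CountTask());
        }
        for (String sentence : sentences) {
            for (String word : sentence.trim().split("\\s+")) {
                if (word.isEmpty()) {
                    continue; // blank sentence yields one empty token; skip it
                }
                tasks.get(taskFor(word, numTasks)).execute(word);
            }
        }
        return tasks;
    }

    public static void main(String[] args) {
        // Sample input chosen for this sketch.
        List<String> sentences = List.of(
                "the cow jumped over the moon",
                "the man went to the store");
        List<CountTask> tasks = run(sentences, 3);
        for (int i = 0; i < tasks.size(); i++) {
            System.out.println("task " + i + " -> " + tasks.get(i).counts);
        }
    }
}
```

Running main prints one map per task. Each word appears in exactly one of them, with its full count. If words were instead routed at random, as shuffleGrouping does between "spout" and "split", the count for a single word could end up spread across several maps.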