storm自带例子详解 (三)——ExclamationTopology
2015-06-01 09:56
399 查看
/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package storm.starter; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.testing.TestWordSpout; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.TopologyBuilder; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils; import java.util.Map; /** * This is a basic example of a Storm topology. */ /* ** 一个基础的拓扑 */ public class ExclamationTopology { // 定义一个Bolt,继承自BaseRichBolt public static class ExclamationBolt extends BaseRichBolt { // 输出收集器 OutputCollector _collector; // 准备函数 @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { _collector = collector; } // 执行函数 @Override public void execute(Tuple tuple) { // 先接收一个单词,然后在他后面加上!!!,然后发送出去 _collector.emit(tuple, new Values(tuple.getString(0) + "!!!")); // 确认发送,保证了准确性 _collector.ack(tuple); } // 输出字段定义函数 @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // 定义一个字段word declarer.declare(new Fields("word")); } } public static void main(String[] args) throws Exception { // 创建一个拓扑构建器 TopologyBuilder builder = new TopologyBuilder(); // 设置Spout,该Spout的名字是word,并行度是10 builder.setSpout("word", new TestWordSpout(), 10); // 设置Bolt,该Bolt的名字是exclaim1,并行度是3,它的上游是Spout(word)(即,接收名为word的Bolt的数据) builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word"); // 设置Bolt,该Bolt的名字是exclaim2,并行度是2,它的上游是Bolt(exclaim1)(即,接收名为exclaim2的Bolt的数据) builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1"); Config conf = new Config(); // 设置为debug模式 conf.setDebug(true); if (args != null && args.length > 0) { // 集群模式 // 设置工作进程的数量为3 conf.setNumWorkers(3); // 提交拓扑 StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); } else { // 本地模式 // 构建一个本地集群 LocalCluster cluster = new LocalCluster(); // 提交一个名为test的拓扑 cluster.submitTopology("test", conf, builder.createTopology()); // 睡眠一段时间 Utils.sleep(10000); // 杀死拓扑 cluster.killTopology("test"); cluster.shutdown(); } } }
相关文章推荐
- openwrt下安装和配置ser2net
- linux查看java jdk安装路径和设置环境变量
- storm自带例子详解 (二)——BasicDRPCTopology
- 什么事JPA,JPA和Hibernate、TopLink等ORM框架的关系
- linux top命令详解
- [转载] 运维平台规划体系全介绍
- Windows下apache ant安装、环境变量配置教程
- Linux下的Bind服务
- Linux运维 第三阶段 (四) SAMBA
- 鸟哥的linux私房菜学习笔记 ---第6章-1
- Linux 的多线程编程的高效开发经验
- 20个Linux防火墙应用技巧
- 如何在Fedora或CentOS上使用Samba共享文件夹
- 查看当前系统shell
- 查看当前系统shell 分类: Ubuntu学习笔记 2015-06-01 08:34 28人阅读 评论(0) 收藏
- Linux学习笔记(一)
- OpenGL之坐标转换(好文-清晰版)
- 5 个基于Linux命令行的文件下载和网站浏览工具
- Tomcat简介
- 在Mac下配置php开发环境:Apache+php+MySql