您的位置:首页 > 运维架构

storm自带例子详解 (三)——ExclamationTopology

2015-06-01 09:56 399 查看
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0 *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package storm.starter;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

import java.util.Map;

/**
* This is a basic example of a Storm topology.
*/

/*
** 一个基础的拓扑
*/
public class ExclamationTopology {

// 定义一个Bolt,继承自BaseRichBolt
public static class ExclamationBolt extends BaseRichBolt {
// 输出收集器
OutputCollector _collector;

// 准备函数
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}

// 执行函数
@Override
public void execute(Tuple tuple) {
// 先接收一个单词,然后在他后面加上!!!,然后发送出去
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
// 确认发送,保证了准确性
_collector.ack(tuple);
}

// 输出字段定义函数
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// 定义一个字段word
declarer.declare(new Fields("word"));
}

}

public static void main(String[] args) throws Exception {
// 创建一个拓扑构建器
TopologyBuilder builder = new TopologyBuilder();

// 设置Spout,该Spout的名字是word,并行度是10
builder.setSpout("word", new TestWordSpout(), 10);
// 设置Bolt,该Bolt的名字是exclaim1,并行度是3,它的上游是Spout(word)(即,接收名为word的Bolt的数据)
builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
// 设置Bolt,该Bolt的名字是exclaim2,并行度是2,它的上游是Bolt(exclaim1)(即,接收名为exclaim2的Bolt的数据)
builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");

Config conf = new Config();
// 设置为debug模式
conf.setDebug(true);

if (args != null && args.length > 0) {
// 集群模式

// 设置工作进程的数量为3
conf.setNumWorkers(3);
// 提交拓扑
StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
}
else {

// 本地模式

// 构建一个本地集群
LocalCluster cluster = new LocalCluster();
// 提交一个名为test的拓扑
cluster.submitTopology("test", conf, builder.createTopology());
// 睡眠一段时间
Utils.sleep(10000);
// 杀死拓扑
cluster.killTopology("test");
cluster.shutdown();
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: