您的位置:首页 > 其它

storm自带例子详解 (四)——ManualDRPC

2015-06-01 10:13 183 查看
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0 *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package storm.starter;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.drpc.DRPCSpout;
import backtype.storm.drpc.ReturnResults;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/*
** 手工(手动的)的分布式RPC
*/
public class ManualDRPC {
// 定义一个Bolt,继承自BaseBasicBolt
public static class ExclamationBolt extends BaseBasicBolt {

// 定义输出字段
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// 定义了两个输出字段,分别是:result、return-info
declarer.declare(new Fields("result", "return-info"));
}

// 执行函数
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
// 接收第一个字段的数据
String arg = tuple.getString(0);
// 接收第二个字段的数据
Object retInfo = tuple.getValue(1);
// 发射数据
collector.emit(new Values(arg + "!!!", retInfo));
}

}

public static void main(String[] args) {
// 拓扑构建器
TopologyBuilder builder = new TopologyBuilder();
// 本地分布式RPC
LocalDRPC drpc = new LocalDRPC();

// 定义一个分布式RPC的Spout,名叫exclamation
DRPCSpout spout = new DRPCSpout("exclamation", drpc);

// 设置拓扑的Spout,名为drpc
builder.setSpout("drpc", spout);
// 设置拓扑的Bolt,名为exclaim,接收drpc的数据,并行度是3
builder.setBolt("exclaim", new ExclamationBolt(), 3).shuffleGrouping("drpc");
// 设置拓扑的Bolt,名为return,接收exclaim的数据,并行度是3
builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim");

// 创建本地集群
LocalCluster cluster = new LocalCluster();
Config conf = new Config();
// 提交拓扑,该拓扑被命名为exclaim
cluster.submitTopology("exclaim", conf, builder.createTopology());

// 分布式RPC操作相应的数据,或者说,分布式RPC发送数据给Spout(exclamation)
System.out.println(drpc.execute("exclamation", "aaa"));
System.out.println(drpc.execute("exclamation", "bbb"));

}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: