Storm Trident DRPC C/S 集群模式
2017-10-09 19:59
127 查看
Storm Trident DRPC C/S 集群模式
简介
基于 Storm Trident 框架,一个简单的 DRPC 服务端与客户端实现。具体可以参考 storm-starter, 基本概念:Storm 是一个分布式实时处理框架;
DRPC 是为了实现并发的 RPC 架构而实现的,其中D就是 Distributed,利用 Storm 分布式、并发的能力,实现 RPC 的高性能;
Trident 是Storm 0.8.0版本引入的新特性,为基于 Storm 元语进行实时计算的用户提供了一类更高级的抽象元语,能够同时满足高吞吐量(每秒百万级的消息)与低处理延时。
实现
服务端
由于 LinearDRPCTopologyBuilder 已弃用,官方建议用 TridentTopology 生成 DRPCSteam 流。public class DrpcTopology { public static class Split extends BaseFunction { @Override public void execute(TridentTuple tuple, TridentCollector collector) { String sentence = tuple.getString(0); for (String word : sentence.split(" ")) { collector.emit(new Values(word)); } } } public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException { Config conf = new Config(); conf.setMaxSpoutPending(20); if (args.length == 0) { LocalCluster cluster = new LocalCluster(); LocalDRPC drpc = new LocalDRPC(); cluster.submitTopology("wordCounter", conf, buildTopology(drpc)); System.out.println("DRPC RESULT: " + drpc.execute("data", "cat the dog jumped")); drpc.shutdown(); cluster.shutdown(); } else { conf.setNumWorkers(3); StormSubmitter.submitTopology(args[0], conf, buildTopology(null)); } } public static StormTopology buildTopology(LocalDRPC drpc) { TridentTopology topology = new TridentTopology(); topology.newDRPCStream("data", drpc) .each(new Fields("args"), new Split(), new Fields("result")) .project(new Fields("result")); return topology.build(); } }
上面代码,使用 TridentTopology 生成 DRPCStream 流,并以
data命名 drpc 函数。用 Split 按空格符分割字符串,并下发多个单词字段,返回给客户端。
客户端
public class DrpcClient { public static void main(String[] args) throws TException, DRPCExecutionException { DRPCClient client = new DRPCClient(args[0], args[1], null); System.out.println(client.execute("data","cat the dog jumped")); } }
args[0], args[1] 分别为服务端地址及端口,其值为 storm.yaml 中的设置值。 execute 方法的第一个参数为服务端DRPCSteam 流定义的函数名,这里为
data。提交服务器拓扑后(提交过程与普通拓扑一样),执行客户端,可看到返回数据如下
[["cat"],["the"],["dog"],["jumped"]]
集群配置
更新每个节点的配置文件 storm.yaml 增加参数drpc.servers: - "10.0.3.31"
drpc.port: 3772
启动 DRPC 服务命令
storm drpc
相关文章推荐
- Storm集群的DRPC模式
- Storm Trident+DRPC实例
- 大数据_Storm_Storm的集群模式与本地模式 (基于Storm 1.0.1)
- storm-0.8.2集群模式安装部署
- Storm集群扩容——从单机模式拓展到集群模式,以此类推
- 007-storm开发计数程序集群模式运行
- 获取Storm集群上TridentWordCount计算结果的方法
- Storm集群模式下cleanup解决方法
- hadoop2.x分布式集群安装配置 ~hadoop3种模式的介绍
- Zookeeper实战之嵌入式运行Zookeeper集群模式
- LVS集群之NAT模式实例(3)
- 第127讲:Hadoop集群管理之安全模式解析及动手实战学习笔记
- Storm集群安装实践
- windows下storm本地模式java开发实例
- spark学习笔记:spark独立集群模式配置及FIFO调度
- 负载均衡集群介绍、LVS介绍、LVS调度算法、LVS NAT模式搭建
- Storm集群的搭建
- 从Storm学习集群管理
- windows2003集群中的“群集操作模式”的单播和多播(转)
- linux lvs集群nat模式(比上一篇的lvs nat实用)