您的位置:首页 > 其它

Storm Trident DRPC C/S 集群模式

2017-10-09 19:59 127 查看

Storm Trident DRPC C/S 集群模式

简介

  基于 Storm Trident 框架,一个简单的 DRPC 服务端与客户端实现。具体可以参考 storm-starter, 基本概念:

  Storm 是一个分布式实时处理框架;

  DRPC 是为了实现并发的 RPC 架构而实现的,其中D就是 Distributed,利用 Storm 分布式、并发的能力,实现 RPC 的高性能;

  Trident 是Storm 0.8.0版本引入的新特性,为基于 Storm 元语进行实时计算的用户提供了一类更高级的抽象元语,能够同时满足高吞吐量(每秒百万级的消息)与低处理延时。

实现

服务端

  由于 LinearDRPCTopologyBuilder 已弃用,官方建议用 TridentTopology 生成 DRPCSteam 流。

public class DrpcTopology {
public static class Split extends BaseFunction {
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
String sentence = tuple.getString(0);
for (String word : sentence.split(" ")) {
collector.emit(new Values(word));
}
}
}

public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
Config conf = new Config();
conf.setMaxSpoutPending(20);
if (args.length == 0) {
LocalCluster cluster = new LocalCluster();
LocalDRPC drpc = new LocalDRPC();
cluster.submitTopology("wordCounter", conf, buildTopology(drpc));
System.out.println("DRPC RESULT: " + drpc.execute("data", "cat the dog jumped"));
drpc.shutdown();
cluster.shutdown();
} else {
conf.setNumWorkers(3);
StormSubmitter.submitTopology(args[0], conf, buildTopology(null));
}
}
public static StormTopology buildTopology(LocalDRPC drpc) {
TridentTopology topology = new TridentTopology();
topology.newDRPCStream("data", drpc)
.each(new Fields("args"), new Split(), new Fields("result"))
.project(new Fields("result"));
return topology.build();
}
}


  上面代码,使用 TridentTopology 生成 DRPCStream 流,并以
data
命名 drpc 函数。用 Split 按空格符分割字符串,并下发多个单词字段,返回给客户端。

客户端

public class DrpcClient {
public static void main(String[] args) throws TException, DRPCExecutionException {
DRPCClient client = new DRPCClient(args[0], args[1], null);
System.out.println(client.execute("data","cat the dog jumped"));
}
}


  args[0], args[1] 分别为服务端地址及端口,其值为 storm.yaml 中的设置值。 execute 方法的第一个参数为服务端DRPCSteam 流定义的函数名,这里为
data
。提交服务器拓扑后(提交过程与普通拓扑一样),执行客户端,可看到返回数据如下

[["cat"],["the"],["dog"],["jumped"]]


集群配置

  更新每个节点的配置文件 storm.yaml 增加参数

drpc.servers:    - "10.0.3.31"


drpc.port: 3772


  启动 DRPC 服务命令

storm drpc
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: