您的位置:首页 > 其它

Storm DRPC

2016-09-09 13:29 246 查看
1.DRPC的作用是什么?
2.DRPC工作流是怎样的?
3.DRPC分为几部分?
4.服务端有几部分组成?

1. DRPC介绍
1.Storm是一个分布式实时处理框架,它支持以DRPC方式调用.可以理解为Storm是一个集群,DRPC提供了集群中处理功能的访问接口.

其实即使不通过DRPC,而是通过在Topoloye中的spout中建立一个TCP/HTTP监听来接收数据,在最后一个Bolt中将数据发送到指定位置也是可以的。这是后话,后面再进行介绍。而DPRC则是Storm提供的一套开发组建,使用DRPC可以极大的简化这一过程。

Storm里面引入DRPC主要是利用storm的实时计算能力来并行化CPU intensive的计算。DRPC的storm topology以函数的参数流作为输入,而把这些函数调用的返回值作为topology的输出流。

DRPC其实不能算是storm本身的一个特性, 它是通过组合storm的原语spout,bolt, topology而成的一种模式(pattern)。本来应该把DRPC单独打成一个包的, 但是DRPC实在是太有用了,所以我们我们把它和storm捆绑在一起。

2.DRPC工作流介绍
Distributed RPC是由一个”DPRC Server”协调的(storm自带了一个实现)。DRPC服务器协调
1) 接收一个RPC请求。
2) 发送请求到storm topology 
3) 从storm topology接收结果。
4) 把结果发回给等待的客户端。
从客户端的角度来看一个DRPC调用跟一个普通的RPC调用没有任何区别。比如下面是客户端如何调用RPC: reach方法的,方法的参数是: http://twitter.com

DRPCClient client = new DRPCClient("drpc-host", 3772);

String result = client.execute("reach","http://twitter.com");

DRPC的工作流大致是这样的:



客户端给DRPC服务器发送要执行的方法的名字,以及这个方法的参数。实现了这个函数的topology使用DRPCSpout从DRPC服务器接收函数调用流。每个函数调用被DRPC服务器标记了一个唯一的id。 
这个topology然后计算结果,在topology的最后一个叫做ReturnResults的bolt会连接到DRPC服务器,并且把这个调用的结果发送给DRPC服务器(通过那个唯一的id标识)。DRPC服务器用那个唯一id来跟等待的客户端匹配上,唤醒这个客户端并且把结果发送给它。

DRPC包括服务端和客户端两部分

1)服务端
服务端由四部分组成:包括一个DRPC Server, 一个 DPRC Spout,一个Topology和一个ReturnResult。

在实际使用中,主要有三个步骤:

a.启动Storm中的DRPC Server;

   首先,修改Storm/conf/storm.yaml中的drpc server地址;需要注意的是:必须修改所有Nimbus和supervisor上的配置文件,设置drpc server地址。否则在运行过程中可能无法返回结果。

  然后,通过 storm drpc命令启动drpc server。

b.创建一个DRPC 的Topology,提交到storm中运行。

  该Toplogy和普通的Topology稍有不同,可以通过两种方式创建:

  创建方法一:直接使用 Storm 提供的LinearDRPCTopologyBuilder。 (不过该方法在0.82版本中显示为已过期,不建议使用)

                          LinearDRPCTopologyBuilder 可以很方便的创建一个DRPC 的Topology,

package storm.starter;

import backtype.storm.Config;

import backtype.storm.LocalCluster;

import backtype.storm.LocalDRPC;

import backtype.storm.StormSubmitter;

import backtype.storm.drpc.LinearDRPCTopologyBuilder;

import backtype.storm.topology.BasicOutputCollector;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseBasicBolt;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;

/**

 * This topology is a basic example of doing distributed RPC on top of Storm. It implements a function that appends a

 * "!" to any string you send the DRPC function.

 * <p/>

 * See https://github.com/nathanmarz/storm/wiki/Distributed-RPC for more information on doing distributed RPC on top of

 * Storm.

 */

public class BasicDRPCTopology {

  public static class ExclaimBolt extends BaseBasicBolt {

    @Override

    public void execute(Tuple tuple, BasicOutputCollector collector) {

      String input = tuple.getString(1);

      collector.emit(new Values(tuple.getValue(0), input + "!"));

    }

    @Override

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

      declarer.declare(new Fields("id", "result"));

    }

  }

  public static void main(String[] args) throws Exception {

    LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");

    builder.addBolt(new ExclaimBolt(), 3);

    Config conf = new Config();

    if (args == null || args.length == 0) {

      LocalDRPC drpc = new LocalDRPC();

      LocalCluster cluster = new LocalCluster();

      cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));

      for (String word : new String[]{ "hello", "goodbye" }) {

        System.out.println("Result for \"" + word + "\": " + drpc.execute("exclamation", word));

      }

      cluster.shutdown();

      drpc.shutdown();

    }

    else {

      conf.setNumWorkers(3);

      StormSubmitter.submitTopology(args[0], conf, builder.createRemoteTopology());

    }

  }

}

创建方法二:

直接使用 Storm 提供的通用TopologyBuilder。 不过需要自己手动加上开始的DRPCSpout和结束的ReturnResults。
其实Storm 提供的LinearDRPCTopologyBuilder也是通过这种封装而来的。

package storm.starter;

import backtype.storm.Config;

import backtype.storm.LocalCluster;

import backtype.storm.LocalDRPC;

import backtype.storm.drpc.DRPCSpout;

import backtype.storm.drpc.ReturnResults;

import backtype.storm.topology.BasicOutputCollector;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.TopologyBuilder;

import backtype.storm.topology.base.BaseBasicBolt;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;

public class ManualDRPC {

  public static class ExclamationBolt extends BaseBasicBolt {

    @Override

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

      declarer.declare(new Fields("result", "return-info"));
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
      String arg = tuple.getString(0);
      Object retInfo = tuple.getValue(1);
      collector.emit(new Values(arg + "!!!", retInfo));
    }
  }

  public static void main(String[] args) {
    TopologyBuilder builder = new TopologyBuilder();
    LocalDRPC drpc = new LocalDRPC();
    DRPCSpout spout = new DRPCSpout("exclamation", drpc);
    builder.setSpout("drpc", spout);
    builder.setBolt("exclaim", new ExclamationBolt(), 3).shuffleGrouping("drpc");
    builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim");
    LocalCluster cluster = new LocalCluster();
    Config conf = new Config();
    cluster.submitTopology("exclaim", conf, builder.createTopology());
    System.out.println(drpc.execute("exclamation", "aaa"));
    System.out.println(drpc.execute("exclamation", "bbb"));
  }
}

c.通过DRPCClient对Cluster进行访问
需要修改客户端配置文件 ~/.storm/storm.yaml,配置drpc server的地址。修改方法可storm服务端一样。

package
com.bbaiggey.storm_09_drpc;

import org.apache.thrift7.TException;

import backtype.storm.generated.DRPCExecutionException;

import backtype.storm.utils.DRPCClient;

public class MyDRPCclient {

/**
* @param args
*/
public static void main(String[] args) {
// TODO Auto-generated method stub

DRPCClient client = new DRPCClient("192.168.15.135", 3772);
try {
//方法名  方法参数
String result = client.execute("exclamation", "hello ");

System.out.println(result);
} catch (TException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (DRPCExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();


}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: