您的位置:首页 > 其它

RocketMQ原理解析-Remoting1. 通信层实现

2014-09-09 10:49 861 查看
Rocketmq的通信层是基于通信框架netty 4.0.21.Final之上做了简单的协议封装,是强依赖。



一: NettyRemotingAbstract  Server与Client公用抽象类

ResponseFuture模式:

        invokeSyncImpl和invokeAsyncImpl都使用了

       请求方会new一个ResponseFuture对象缓存起来ConcurrentHashMap<Integer/* opaque */, ResponseFuture>,并且设置opaque 值

       Broker接收请求将opaque 直接把这个值设置回响应对象,客户端接收到这个响应,通过opaque从缓存查找对应的ResponseFuture对象

 

1. invokeSyncImpl
同步调用实现

       构建ResponseFuture,设置opaque值

        netty发送请送并且设置监听器回调响应

          发送成功设置ResponseFuture发送成功,退出监听器

         发送失败设置ResponseFuture发送失败,并且从缓存中移除ResponseFuture(没有响应过来,就用不到缓存中的ResponseFuturel)

        responseFuture.waitResponse(timeoutMillis)获取响应

 发送成功,没有响应对象说明超时



2. invokeAsyncImpl
异步调用实现

        异步一般链路耗时比较长, 为了防止本地缓存的netty请求过多, 使用信号量控制上限默认2048个

        获取是否可以处理请求

        构建一次释放对象

        构建responseFuture对象,设置opaque, callback, once,超时时间等值,并放入缓存集合

        通过netty发送请求,设置listener, 

                   发送成功responseFuture.setSendRequestOK(true);

                   发送失败responseFuture.setSendRequestOK(false), 信号量通过once释放, 删除缓存

         Netty接收server端响应,根据opaque从缓存获取responseFuture,调用回调方法即接口InvokeCallback实现



3. invokeOnewayImpl  单向请求

        标记onewayRpc

        用信号量控制并发的数 //这是我对在这里用新号量控制的理解

 

4 scanResponseTable  

由定时任务启动,定时查看超时的缓存请求,有callback的执行callback,让后从缓存中移除再释放请求

 

5 processRequestCommand
接收请求处理

        根据请求code查找对应的处理器线程池pair, 没有用默认的

        有处理器处理请求返回RemotingCommand对象的响应response

        若不是onewayRpc 给response设置opaque

                       标记响应类型

                       通过netty写入响应

 

6 processResponseCommand
接收响应处理

 当client向server发送请求的时候,server处理后向client反馈处理结果。

        根据RemotingCommand的opaque,从缓存中取出对应的ResponseFuture

        ResponseFuture设置响应对象RemotingCommand

        responseFuture释放信号量

        有callback的执行callback(通过线程池), 没有的putResponse(这个方法同步调用使用,来countDownLatch, 因为调用线程在等待呢)

 

 

 


二:NettyRemotingServer Remoting 服务端实现

broker启动初始化NettyRometingServer

向netty注册handler

 NettyEncoder协议编码器,将RemotingCommand转换为字节,给netty传输

 NettyDecoder协议解码器, 将netty接收的输入流,转换成RemotingCommand

NettyConnetManageHandler 处理register,unregiter, active, inactive, exception

NettyServerHandler  netty处理请求的业

 

向NettyRemotingServer注册业务处理器

  server接收client请求根据RequestCode选择具体的处理器RequestProcessor,就是利用RequestCode进行策略的选择

  server(broker,namesrv)在启动的时候会把RequestCode与对应的RequestProcessor和处理线程池注册到NettyRemotingServer中去,代码类似如下:

         remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, SendMessageProcessor, sendMessageExecutor)
 


  

三:NetttyRemotingClient

向netty注册handler

NettyEncoder协议编码器,将RemotingCommand转换为字节,给netty传输

         NettyDecoder协议解码器, 将netty接收的输入流,转换成RemotingCommand

NettyConnetManageHandler 处理register,unregiter, active, inactive, exception

NettyClientHandler   netty 处理请求的业

 

Client与通信层的交互封了MQClientAPIImpl统一处理,在MQClientAPIImpl构造的时候注册了ClientRemotingProcessor来处理server的请求
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息