您的位置:首页 > 其它

9.2 服务端接收请求消息并发送响应消息源码

2017-11-12 16:58 363 查看
一 总体流程图

服务端接收请求消息
NettyHandler.messageReceived(ChannelHandlerContext ctx, MessageEvent e)
-->MultiMessageHandler.received(Channel channel, Object message)
-->HeartbeatHandler.received(Channel channel, Object message)
-->AllChannelHandler.received(Channel channel, Object message)
-->ExecutorService cexecutor = getExecutorService()
-->cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message))
-->ChannelEventRunnable.run()
-->DecodeHandler.received(Channel channel, Object message)
-->decode(Object message)
-->HeaderExchangeHandler.received(Channel channel, Object message)
-->Response response = handleRequest(exchangeChannel, request)
-->DubboProtocol.requestHandler.reply(ExchangeChannel channel, Object message)//这里的message就是上边的RpcInvocation
//首先获取exporter,之后再获取invoker
-->getInvoker(Channel channel, Invocation inv)//组装serviceKey=com.alibaba.dubbo.demo.DemoService:20880
-->(DubboExporter<?>) exporterMap.get(serviceKey)//从Map<String, Exporter<?>> exporterMap中根据serviceKey获取DubboExport实例,
-->exporter.getInvoker()//获取RegistryProtocol$InvokerDelegete实例
//执行filter链
-->EchoFilter.invoke(Invoker<?> invoker, Invocation inv)
-->ClassLoaderFilter.nvoke(Invoker<?> invoker, Invocation invocation)
-->GenericFilter.invoke(Invoker<?> invoker, Invocation inv)
-->ContextFilter.invoke(Invoker<?> invoker, Invocation invocation)
-->TraceFilter.invoke(Invoker<?> invoker, Invocation invocation)
-->TimeoutFilter.invoke(Invoker<?> invoker, Invocation invocation)
-->MonitorFilter.invoke(Invoker<?> invoker, Invocation invocation)
-->ExceptionFilter.invoke(Invoker<?> invoker, Invocation invocation)
//执行真正的invoker调用
-->AbstractProxyInvoker.invoke(Invocation invocation)
-->JavassistProxyFactory$AbstractProxyInvoker.doInvoke
-->Wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments)
-->DemoServiceImpl.sayHello(String name)
-->new RpcResult(Object result)//将返回值result包装成RpcResult(最后该参数会被包装为Response)
服务端发送响应消息
-->channel.send(response)//NettyChannel
-->NioAcceptedSocketChannel.write(Object message)//已经是netty的东西了,这里的message=Response实例:最重要的是RpcResult [result=Hello world, response form provider: 10.211.55.2:20880, exception=null]


二 源码解析

netty通信是在netty的handler中进行消息的接收处理和发送。来看一下NettyServer的handler。

1     protected void doOpen() throws Throwable {
2         ...
3         final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
4         ...
5         bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
6             public ChannelPipeline getPipeline() {
7                 NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
8                 ChannelPipeline pipeline = Channels.pipeline();
9                 pipeline.addLast("decoder", adapter.getDecoder());
10                 pipeline.addLast("encoder", adapter.getEncoder());
11                 pipeline.addLast("handler", nettyHandler);
12                 return pipeline;
13             }
14         });
15         ...
16     }


NettyHandler.messageReceived

1     private final ChannelHandler handler;//NettyServer
2
3     @Override
4     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
5         NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
6         try {
7             handler.received(channel, e.getMessage());
8         } finally {
9             NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
10         }
11     }


首先会执行NettyServer父类AbstractPeer的received方法,其调用MultiMessageHandler.received:

1     protected ChannelHandler handler;//HeartbeatHandler
2     public void received(Channel channel, Object message) throws RemotingException {
3         if (message instanceof MultiMessage) {
4             MultiMessage list = (MultiMessage) message;
5             for (Object obj : list) {
6                 handler.received(channel, obj);
7             }
8         } else {
9             handler.received(channel, message);
10         }
11     }


HeartbeatHandler.received(Channel channel, Object message)

1     protected ChannelHandler handler;//AllChannelHandler
2     public void received(Channel channel, Object message) throws RemotingException {
3         setReadTimestamp(channel);
4         if (isHeartbeatRequest(message)) {
5             ...
6             return;
7         }
8         if (isHeartbeatResponse(message)) {
9            ...
10             return;
11         }
12         handler.received(channel, message);
13     }


AllChannelHandler.received(Channel channel, Object message)

1     protected final ExecutorService executor;//ThreadPoolExecutor
2     protected final ChannelHandler handler;//DecodeHandler
3
4     public void received(Channel channel, Object message) throws RemotingException {
5         ExecutorService cexecutor = getExecutorService();
6         try {
7             cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
8         } catch (Throwable t) {
9             ...
10             throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
11         }
12     }
13
14     private ExecutorService getExecutorService() {
15         ExecutorService cexecutor = executor;
16         if (cexecutor == null || cexecutor.isShutdown()) {
17             cexecutor = SHARED_EXECUTOR;
18         }
19         return cexecutor;
20     }


这里首先创建了一个线程任务ChannelEventRunnable,之后丢入线程池进行执行。

ChannelEventRunnable.run()

1     private final ChannelHandler handler;//DecodeHandler
2     public void run() {
3         switch (state) {
4             case CONNECTED:
5                 ...
6                 break;
7             case DISCONNECTED:
8                 ...
9                 break;
10             case SENT:
11                 ...
12                 break;
13             case RECEIVED:
14                 try {
15                     handler.received(channel, message);
16                 } catch (Exception e) {
17                     logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
18                             + ", message is " + message, e);
19                 }
20                 break;
21             case CAUGHT:
22                 ...
23                 break;
24             default:
25                 logger.warn("unknown state: " + state + ", message is " + message);
26         }
27     }


DecodeHandler.received(Channel channel, Object message)

1     protected ChannelHandler handler;//HeaderExchangeHandler
2     public void received(Channel channel, Object message) throws RemotingException {
3         if (message instanceof Decodeable) {
4             decode(message);
5         }
6
7         if (message instanceof Request) {
8             decode(((Request) message).getData());//解码
9         }
10
11         if (message instanceof Response) {
12             decode(((Response) message).getResult());
13         }
14
15         handler.received(channel, message);
16     }


HeaderExchangeHandler.received(Channel channel, Object message)

1     private final ExchangeHandler handler;//DubboProtocol$ExchangeHandler
2
3     public void received(Channel channel, Object message) throws RemotingException {
4         channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
5         ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
6         try {
7             if (message instanceof Request) {
8                 // handle request.
9                 Request request = (Request) message;
10                 if (request.isEvent()) {
11                     handlerEvent(channel, request);
12                 } else {
13                     if (request.isTwoWay()) {
14                         Response response = handleRequest(exchangeChannel, request);
15                         channel.send(response);
16                     } else {
17                         handler.received(exchangeChannel, request.getData());
18                     }
19                 }
20             } else if (message instanceof Response) {
21                 handleResponse(channel, (Response) message);
22             } else if (message instanceof String) {
23                 if (isClientSide(channel)) {
24                     Exception e = new Exception(...);
25                 } else {
26                     String echo = handler.telnet(channel, (String) message);
27                     if (echo != null && echo.length() > 0) {
28                         channel.send(echo);
29                     }
30                 }
31             } else {
32                 handler.received(exchangeChannel, message);
33             }
34         } finally {
35             HeaderExchangeChannel.removeChannelIfDisconnected(channel);
36         }
37     }
38
39     Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
40         Response res = new Response(req.getId(), req.getVersion());
41         if (req.isBroken()) {
42             Object data = req.getData();
43
44             String msg;
45             if (data == null) msg = null;
46             else if (data instanceof Throwable) msg = StringUtils.toString((Throwable) data);
47             else msg = data.toString();
48             res.setErrorMessage("Fail to decode request due to: " + msg);
49             res.setStatus(Response.BAD_REQUEST);
50
51             return res;
52         }
53         // find handler by message class.
54         Object msg = req.getData();
55         try {
56             // handle data.
57             Object result = handler.reply(channel, msg);
58             res.setStatus(Response.OK);
59             res.setResult(result);
60         } catch (Throwable e) {
61             res.setStatus(Response.SERVICE_ERROR);
62             res.setErrorMessage(StringUtils.toString(e));
63         }
64         return res;
65     }


DubboProtocol$ExchangeHandler.reply(ExchangeChannel channel, Object message)

1         public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
2             if (message instanceof Invocation) {
3                 Invocation inv = (Invocation) message;
4                 Invoker<?> invoker = getInvoker(channel, inv);
5                 ...
6                 return invoker.invoke(inv);
7             }
8             throw new RemotingException(...);
9         }


首先是获取Invoker,之后使用该invoker执行真正调用。

1     protected final Map<String, Exporter<?>> exporterMap = new ConcurrentHashMap<String, Exporter<?>>();
2
3     Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
4         ...
5         int port = channel.getLocalAddress().getPort();//20880
6         String path = inv.getAttachments().get(Constants.PATH_KEY);
7         ...
8         String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));
9
10         DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
11
12         if (exporter == null)
13             throw new RemotingException(...);
14
15         return exporter.getInvoker();
16     }


这里serviceKey是:com.alibaba.dubbo.demo.DemoService:20880。实际上是group/serviceName:serviceVersion:port。

1     public static String serviceKey(int port, String serviceName, String serviceVersion, String serviceGroup) {
2         StringBuilder buf = new StringBuilder();
3         if (serviceGroup != null && serviceGroup.length() > 0) {
4             buf.append(serviceGroup);
5             buf.append("/");
6         }
7         buf.append(serviceName);
8         if (serviceVersion != null && serviceVersion.length() > 0 && !"0.0.0".equals(serviceVersion)) {
9             buf.append(":");
10             buf.append(serviceVersion);
11         }
12         buf.append(":");
13         buf.append(port);
14         return buf.toString();
15     }


Map<String, Exporter<?>> exporterMap在服务暴露时就已经初始化好了。"com.alibaba.dubbo.demo.DemoService:20880"->DubboExporter实例。该实例包含一个呗filter链包裹的Invoker实例:RegistryProtocol$InvokerDelegete实例。

之后开始执行filter链了,直到最后执行到RegistryProtocol$InvokerDelegete.invoke,该方法实际上是在RegistryProtocol$InvokerDelegete的父类InvokerWrapper执行,InvokerWrapper调用AbstractProxyInvoker.invoke(Invocation invocation)。

1     private final T proxy;//DemoServiceImpl实例
2
3     public Result invoke(Invocation invocation) throws RpcException {
4         try {
5             return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));
6         } catch (InvocationTargetException e) {
7             return new RpcResult(e.getTargetException());
8         } catch (Throwable e) {
9             throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
10         }
11     }


这里先调用子类JavassistProxyFactory$AbstractProxyInvoker.doInvoke,之后将返回结果封装为RpcResult返回。

1 protected Object doInvoke(T proxy, String methodName,
2                                       Class<?>[] parameterTypes,
3                                       Object[] arguments) throws Throwable {
4                 return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
5             }


这里调用了Wrapper类的invokeMethod方法,Wrapper是一个动态生成的类,笔者给出:

1 import com.alibaba.dubbo.common.bytecode.Wrapper;
2 import java.util.HashMap;
3
4 public class Wrapper1 extends Wrapper {
5
6     public static String[] pns;//property name array
7     public static java.util.Map pts = new HashMap();//<property key, property value>
8     public static String[] mns;//method names
9     public static String[] dmns;//
10     public static Class[] mts0;
11     /**
12      * @param o  实现类
13      * @param n  方法名称
14      * @param p  参数类型
15      * @param v  参数名称
16      * @return
17      * @throws java.lang.reflect.InvocationTargetException
18      */
19     public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException {
20         com.alibaba.dubbo.demo.provider.DemoServiceImpl w;
21         try {
22             w = ((com.alibaba.dubbo.demo.provider.DemoServiceImpl) o);
23         } catch (Throwable e) {
24             throw new IllegalArgumentException(e);
25         }
26         try {
27             if ("sayHello".equals(n) && p.length == 1) {
28                 return ($w) w.sayHello((java.lang.String) v[0]);
29             }
30         } catch (Throwable e) {
31             throw new java.lang.reflect.InvocationTargetException(e);
32         }
33         throw new com.alibaba.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + n + "\" in class com.alibaba.dubbo.demo.provider.DemoServiceImpl.");
34     }
35 }


这里距执行到了DemoServiceImpl的sayHello(String name)方法。之后将返回结果封装为RpcResult并返回,一直返回到HeaderExchangeHandler的received(Channel channel, Object message)

1     public void received(Channel channel, Object message) throws RemotingException {
2         channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
3         ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
4         try {
5             if (message instanceof Request) {
6                 // handle request.
7                 Request request = (Request) message;
8                 if (request.isEvent()) {
9                     handlerEvent(channel, request);
10                 } else {
11                     if (request.isTwoWay()) {
12                         Response response = handleRequest(exchangeChannel, request);
13                         channel.send(response);
14                     } else {
15                         handler.received(exchangeChannel, request.getData());
16                     }
17                 }
18             } else if (message instanceof Response) {
19                 handleResponse(channel, (Response) message);
20             } else if (message instanceof String) {
21                 if (isClientSide(channel)) {
22                     Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
23                     logger.error(e.getMessage(), e);
24                 } else {
25                     String echo = handler.telnet(channel, (String) message);
26                     if (echo != null && echo.length() > 0) {
27                         channel.send(echo);
28                     }
29                 }
30             } else {
31                 handler.received(exchangeChannel, message);
32             }
33         } finally {
34             HeaderExchangeChannel.removeChannelIfDisconnected(channel);
35         }
36     }


之后将响应结果返回给客户端,这里的channel是NettyChannel,执行NettyChannel的send方法,其调用NioAcceptedSocketChannel.write(Object message)将消息写会给客户端,结束!
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: