Netty学习6-ChanelHandler【2】调用过程源码分析
2016-12-24 16:13
651 查看
1 概述
Netty中的ChannelPipeline类似于servlet,chanelHandler类似于filter。这类拦截器就是职责链设计模式,主要是事件拦截和用户业务逻辑定制。演示代码采用的是netty
3.10.5版本。调试步骤和测试代码如下:
1 netty源代码下载完成后,导入为maven项目,命名为MyNettySource
2
测试代码在configure build path时不要直接导入netty3的jar包,而是直接导入project项目,选择MyNettySource项目
3 对关注的方法打断点
4 跟踪调用链
2 server启动
handler是通过pipeline的addLast方法添加的,那首先将断点定位在DefaultChannelPileline的addLast方法。
利用debug方式运行Server,注意查看调用链。从调用链中可看出入口是ServerBootStrap的bindAsync方法。
[1] 方法DefaultChannelPileline@addLast
bossPipeline.addLast("binder", binder)赋值了一个名为binder的处理器。继续跟进addLast方法。
[2] 方法DefaultChannelPileline@init
name2ctx是保存handlerContext的集合对象,此时是空的。断点走进init方法,逐行分析方法内的这几行代码。
行1 声明了DefaultChannelHandlerContext对象,传入name值是binder,handler是名为binder的处理器,这两个值初始化的地方就在上文看到的调用链的BootStrap的bind方法中。先看DefaultChannelHandlerContext该类使用到的属性。很明显这是链表的数据结构,prev指向上个对象,next指向下个对象。
行2 binder并不是LifeCycleAwareChannelHandler类型,直接return掉。
行2 将链表中的第1个head和最后一个tail都赋值给了第1行中声明的对象。
行4 清除Map对象中的值。
行5 将第1行声明的对象赋值给map集合。
行6 binder并不是LifeCycleAwareChannelHandler类型,直接return掉。第一个需要关注的断点到此结束。
3 client启动
启动client的方法由很多种,可以写Client的代码进行访问。还有一个简便的方法是利用telnet模拟客户端。
telnet 127.0.0.1 10101。设置断点在Server类中的Channels.pipeline()位置。
[1] 方法channels.pipeline()
新生成ChannelPipeline对象,从调用链可看出:
[1] boss线程池负责接收连接。
[2] 上游NioServerBoss@registerAcceptedChannel表明在建立连接时,该ChannelPipeline的所有handler将被设置完成。
[2] 方法pipeline.addLast(decoder)
添加StringDecoder这个处理器,流程和2.2中的所有步骤一致都走init方法,同样在callBeforeAdd和callAfterAdd中return了。唯一不同的是name叫做decoder。
[3] 方法pipeline.addLast(xyHelloHandler)
name2ctx不再empty,进入else分支。
checkDuplicateName检查是否有重名,有重名则直接抛出异常。
oldTail表示原来尾对象,新申明的xyHelloHandler将被作为新尾对象。再把原oldTail的next指向xyHelloHandler,再放入name2ctx集合。
[4] 方法AbstractNioSelector@select
继续跟着断点走,会走到AbstractNioSelector的select方法会阻塞,等待下一次事件。
4 client发送信息
在telnet窗口按下ctrl+] 会进入发送窗口。
现在关注以下2个方法的调用链,StringDecoder@decode、HelloHandler@messageReceived断点到这两个方法。
[1] 方法StringDecoder@decode
从NioWorker的read方法开始,执行到DefaultChannelPipeline的sendUpstream。标注559处。
进入该方法后会先调用StringDecoder的父类OnetoOneDecoder的handleUpStream(ctx,event)方法,标注559的下一行。
父类方法调用子类的decode方法,如此StringDecoder的decode方法就被调用到了。fireMessageReceived(ctx, decodedMessage, e.getRemoteAddress())方法会向下传递,进入该方法最终会执行到DefaultChannelPipeline的内部类DefaultChannelHandlerContext的sendUpstream方法
[2] 方法HelloHandler@messageReceived
next相当于获取了head的下一个handler对象即HelloHanlder。
从代码可看出handler往下传递对象的方法是sendUpstream(event)下方代码进行测试:
5 简要流程图
流程图地址 http://blog.csdn.net/zxhoo/article/details/17264263
感觉有帮助请您赏一杯茶钱,金额随意。您的鼓励是我写作的动力。
Netty中的ChannelPipeline类似于servlet,chanelHandler类似于filter。这类拦截器就是职责链设计模式,主要是事件拦截和用户业务逻辑定制。演示代码采用的是netty
3.10.5版本。调试步骤和测试代码如下:
1 netty源代码下载完成后,导入为maven项目,命名为MyNettySource
2
测试代码在configure build path时不要直接导入netty3的jar包,而是直接导入project项目,选择MyNettySource项目
3 对关注的方法打断点
4 跟踪调用链
import java.net.InetSocketAddress; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.jboss.netty.bootstrap.ServerBootstrap; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; import org.jboss.netty.handler.codec.string.StringDecoder; /** * netty服务端入门 */ public class Server { public static void main(String[] args) { // 服务类 ServerBootstrap bootstrap = new ServerBootstrap(); // boss线程监听端口,worker线程负责数据读写 ExecutorService boss = Executors.newCachedThreadPool(); ExecutorService worker = Executors.newCachedThreadPool(); // 设置niosocket工厂 bootstrap.setFactory(new NioServerSocketChannelFactory(boss, worker)); // 设置管道的工厂 bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipeline = Channels.pipeline(); pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("xyHelloHandler", new HelloHandler()); // StringDecoder和HelloHandler都是Netty4中的inboundHandler return pipeline; } }); bootstrap.bind(new InetSocketAddress(10101)); System.out.println("server3 start!!!"); } } import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelHandler; /** * 消息接受处理类 */ public class HelloHandler extends SimpleChannelHandler { // 接收消息 @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { String messageReceived = (String) e.getMessage(); System.out.println(messageReceived); super.messageReceived(ctx, e); } // 捕获异常 @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { System.out.println("exceptionCaught"); super.exceptionCaught(ctx, e); } // 新连接 @Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { System.out.println("channelConnected"); super.channelConnected(ctx, e); } // 必须是链接已经建立,关闭通道的时候才会触发 @Override public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { System.out.println("channelDisconnected"); super.channelDisconnected(ctx, e); } // channel关闭的时候触发 @Override public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { System.out.println("channelClosed"); super.channelClosed(ctx, e); } }
2 server启动
handler是通过pipeline的addLast方法添加的,那首先将断点定位在DefaultChannelPileline的addLast方法。
利用debug方式运行Server,注意查看调用链。从调用链中可看出入口是ServerBootStrap的bindAsync方法。
[1] 方法DefaultChannelPileline@addLast
public ChannelFuture bindAsync(final SocketAddress localAddress) { if (localAddress == null) { throw new NullPointerException("localAddress"); } Binder binder = new Binder(localAddress); ChannelHandler parentHandler = getParentHandler(); ChannelPipeline bossPipeline = pipeline(); bossPipeline.addLast("binder", binder); if (parentHandler != null) { bossPipeline.addLast("userHandler", parentHandler); } }
bossPipeline.addLast("binder", binder)赋值了一个名为binder的处理器。继续跟进addLast方法。
private final Map<String, DefaultChannelHandlerContext> name2ctx =new HashMap<String, DefaultChannelHandlerContext>(4); public synchronized void addLast(String name, ChannelHandler handler) { if (name2ctx.isEmpty()) { init(name, handler); } else { checkDuplicateName(name); DefaultChannelHandlerContext oldTail = tail; DefaultChannelHandlerContext newTail = new DefaultChannelHandlerContext(oldTail, null, name, handler); callBeforeAdd(newTail); oldTail.next = newTail; tail = newTail; name2ctx.put(name, newTail); callAfterAdd(newTail); } }
[2] 方法DefaultChannelPileline@init
name2ctx是保存handlerContext的集合对象,此时是空的。断点走进init方法,逐行分析方法内的这几行代码。
private void init(String name, ChannelHandler handler) { DefaultChannelHandlerContext ctx = new DefaultChannelHandlerContext(null, null, name, handler); callBeforeAdd(ctx); head = tail = ctx; name2ctx.clear(); name2ctx.put(name, ctx); callAfterAdd(ctx); }
行1 声明了DefaultChannelHandlerContext对象,传入name值是binder,handler是名为binder的处理器,这两个值初始化的地方就在上文看到的调用链的BootStrap的bind方法中。先看DefaultChannelHandlerContext该类使用到的属性。很明显这是链表的数据结构,prev指向上个对象,next指向下个对象。
private final class DefaultChannelHandlerContext implements ChannelHandlerContext { volatile DefaultChannelHandlerContext next; volatile DefaultChannelHandlerContext prev; private final String name; private final ChannelHandler handler; }
行2 binder并不是LifeCycleAwareChannelHandler类型,直接return掉。
private static void callBeforeAdd(ChannelHandlerContext ctx) { if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) { return; }
行2 将链表中的第1个head和最后一个tail都赋值给了第1行中声明的对象。
private volatile DefaultChannelHandlerContext head; private volatile DefaultChannelHandlerContext tail;
行4 清除Map对象中的值。
行5 将第1行声明的对象赋值给map集合。
行6 binder并不是LifeCycleAwareChannelHandler类型,直接return掉。第一个需要关注的断点到此结束。
private void callAfterAdd(ChannelHandlerContext ctx) { if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) { return; } }
3 client启动
启动client的方法由很多种,可以写Client的代码进行访问。还有一个简便的方法是利用telnet模拟客户端。
telnet 127.0.0.1 10101。设置断点在Server类中的Channels.pipeline()位置。
[1] 方法channels.pipeline()
新生成ChannelPipeline对象,从调用链可看出:
[1] boss线程池负责接收连接。
[2] 上游NioServerBoss@registerAcceptedChannel表明在建立连接时,该ChannelPipeline的所有handler将被设置完成。
private static void registerAcceptedChannel(NioServerSocketChannel parent, SocketChannel acceptedSocket,Thread currentThread) { try { ChannelSink sink = parent.getPipeline().getSink(); ChannelPipeline pipeline =parent.getConfig().getPipelineFactory().getPipeline(); } }
[2] 方法pipeline.addLast(decoder)
添加StringDecoder这个处理器,流程和2.2中的所有步骤一致都走init方法,同样在callBeforeAdd和callAfterAdd中return了。唯一不同的是name叫做decoder。
[3] 方法pipeline.addLast(xyHelloHandler)
name2ctx不再empty,进入else分支。
checkDuplicateName检查是否有重名,有重名则直接抛出异常。
oldTail表示原来尾对象,新申明的xyHelloHandler将被作为新尾对象。再把原oldTail的next指向xyHelloHandler,再放入name2ctx集合。
[4] 方法AbstractNioSelector@select
继续跟着断点走,会走到AbstractNioSelector的select方法会阻塞,等待下一次事件。
4 client发送信息
在telnet窗口按下ctrl+] 会进入发送窗口。
现在关注以下2个方法的调用链,StringDecoder@decode、HelloHandler@messageReceived断点到这两个方法。
[1] 方法StringDecoder@decode
从NioWorker的read方法开始,执行到DefaultChannelPipeline的sendUpstream。标注559处。
public void sendUpstream(ChannelEvent e) { DefaultChannelHandlerContext head = getActualUpstreamContext(this.head); // 获取该pipeline的HeadHandler就是StringDecoder if (head == null) { if (logger.isWarnEnabled()) { logger.warn( "The pipeline contains no upstream handlers; discarding: " + e); } return; } sendUpstream(head, e); // 进入该方法 }
进入该方法后会先调用StringDecoder的父类OnetoOneDecoder的handleUpStream(ctx,event)方法,标注559的下一行。
public void handleUpstream( ChannelHandlerContext ctx, ChannelEvent evt) throws Exception { if (!(evt instanceof MessageEvent)) { ctx.sendUpstream(evt); return; } MessageEvent e = (MessageEvent) evt; Object originalMessage = e.getMessage(); Object decodedMessage = decode(ctx, e.getChannel(), originalMessage); if (originalMessage == decodedMessage) { ctx.sendUpstream(evt); } else if (decodedMessage != null) { fireMessageReceived(ctx, decodedMessage, e.getRemoteAddress()); // 向下传递 } }
父类方法调用子类的decode方法,如此StringDecoder的decode方法就被调用到了。fireMessageReceived(ctx, decodedMessage, e.getRemoteAddress())方法会向下传递,进入该方法最终会执行到DefaultChannelPipeline的内部类DefaultChannelHandlerContext的sendUpstream方法
public void sendUpstream(ChannelEvent e) { DefaultChannelHandlerContext next = getActualUpstreamContext(this.next); // 获取next if (next != null) { DefaultChannelPipeline.this.sendUpstream(next, e); } }
[2] 方法HelloHandler@messageReceived
next相当于获取了head的下一个handler对象即HelloHanlder。
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { String messageReceived = (String) e.getMessage(); System.out.println(messageReceived); super.messageReceived(ctx, e); // helloHandler若希望继续向下传递,则继续调用链式方法 }
从代码可看出handler往下传递对象的方法是sendUpstream(event)下方代码进行测试:
public class Client { public static void main(String[] args) throws Exception { Socket socket = new Socket("127.0.0.1", 10101); socket.getOutputStream().write("hello".getBytes()); socket.close(); } } public class Server { public static void main(String[] args) { //服务类 ServerBootstrap bootstrap = new ServerBootstrap(); //boss线程监听端口,worker线程负责数据读写 ExecutorService boss = Executors.newCachedThreadPool(); ExecutorService worker = Executors.newCachedThreadPool(); //设置niosocket工厂 bootstrap.setFactory(new NioServerSocketChannelFactory(boss, worker)); //设置管道的工厂 bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipeline = Channels.pipeline(); pipeline.addLast("handler1", new MyHandler1()); pipeline.addLast("handler2", new MyHandler2()); return pipeline; } }); bootstrap.bind(new InetSocketAddress(10101)); System.out.println("start!!!"); } } import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelHandler; import org.jboss.netty.channel.UpstreamMessageEvent; public class MyHandler1 extends SimpleChannelHandler { @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { ChannelBuffer buffer = (ChannelBuffer)e.getMessage(); byte[] array = buffer.array(); String message = new String(array); System.out.println("handler1:" + message); // 传递给handler2。由于直接传递的是string那么handler2直接接收String即可 ctx.sendUpstream(new UpstreamMessageEvent(ctx.getChannel(), "abc", e.getRemoteAddress())); ctx.sendUpstream(new UpstreamMessageEvent(ctx.getChannel(), "efg", e.getRemoteAddress())); } } import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelHandler; public class MyHandler2 extends SimpleChannelHandler { @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { String message = (String)e.getMessage(); System.out.println("handler2:" + message); } }
5 简要流程图
流程图地址 http://blog.csdn.net/zxhoo/article/details/17264263
感觉有帮助请您赏一杯茶钱,金额随意。您的鼓励是我写作的动力。
相关文章推荐
- spring源码学习之路---深度分析IOC容器初始化过程(四)
- Netty源码学习——EventLoopGroup原理:NioEventLoopGroup分析
- 源码分析netty服务器创建过程vs java nio服务器创建
- netty 5 alph1源码分析(服务端创建过程)
- Openstack Nova 源码分析 — RPC 远程调用过程
- spring源码学习之路---深度分析IOC容器初始化过程(四)
- TQ2440 学习笔记—— 30、移植U-Boot【U-Boot 的启动过程第一阶段源码分析】
- 韩顺平 javascript教学视频_学习笔记9_js函数调用过程内存分析_js函数细节
- Netty 源码分析(三):服务器端的初始化和注册过程
- 从Spark-Shell到SparkContext的函数调用路径过程分析(源码)
- Netty源码学习-ServerBootstrap启动及事件处理过程
- Hama框架学习(一) 从源码角度分析job的提交和运行过程
- Android Framework学习——Launcher启动应用程序过程源码分析
- 汇编学习笔记:函数调用过程中的堆栈分析
- nginx 源码学习笔记(十九)—— nginx启动过程函数调用图
- Netty源码学习——ChannelPipeline模型分析
- shiro学习笔记——从源码角度分析shiro身份验证过程
- Netty 4.0源码分析1:服务端启动过程中的Channel与EventLoopGroup的注册
- Netty源码分析之服务端启动过程
- struts2请求过程源码分析(转载学习)