您的位置:首页 > 其它

Netty学习6-ChanelHandler【2】调用过程源码分析

2016-12-24 16:13 651 查看
1 概述

Netty中的ChannelPipeline类似于servlet,chanelHandler类似于filter。这类拦截器就是职责链设计模式,主要是事件拦截和用户业务逻辑定制。演示代码采用的是netty
3.10.5版本。调试步骤和测试代码如下:

1 netty源代码下载完成后,导入为maven项目,命名为MyNettySource

2
测试代码在configure build path时不要直接导入netty3的jar包,而是直接导入project项目,选择MyNettySource项目

3 对关注的方法打断点

4 跟踪调用链

import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.string.StringDecoder;

/**
* netty服务端入门
*/
public class Server {

public static void main(String[] args) {

// 服务类
ServerBootstrap bootstrap = new ServerBootstrap();
// boss线程监听端口,worker线程负责数据读写
ExecutorService boss = Executors.newCachedThreadPool();
ExecutorService worker = Executors.newCachedThreadPool();
// 设置niosocket工厂
bootstrap.setFactory(new NioServerSocketChannelFactory(boss, worker));
// 设置管道的工厂
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("xyHelloHandler", new HelloHandler()); // StringDecoder和HelloHandler都是Netty4中的inboundHandler
return pipeline;
}
});
bootstrap.bind(new InetSocketAddress(10101));
System.out.println("server3 start!!!");
}
}

import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;

/**
* 消息接受处理类
*/
public class HelloHandler extends SimpleChannelHandler {

// 接收消息
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
String messageReceived = (String) e.getMessage();
System.out.println(messageReceived);
super.messageReceived(ctx, e);
}

// 捕获异常
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
throws Exception {
System.out.println("exceptionCaught");
super.exceptionCaught(ctx, e);
}

// 新连接
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
System.out.println("channelConnected");
super.channelConnected(ctx, e);
}

// 必须是链接已经建立,关闭通道的时候才会触发
@Override
public void channelDisconnected(ChannelHandlerContext ctx,
ChannelStateEvent e) throws Exception {
System.out.println("channelDisconnected");
super.channelDisconnected(ctx, e);
}

// channel关闭的时候触发
@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
System.out.println("channelClosed");
super.channelClosed(ctx, e);
}
}


2 server启动

handler是通过pipeline的addLast方法添加的,那首先将断点定位在DefaultChannelPileline的addLast方法。

利用debug方式运行Server,注意查看调用链。从调用链中可看出入口是ServerBootStrap的bindAsync方法。

[1] 方法DefaultChannelPileline@addLast



public ChannelFuture bindAsync(final SocketAddress localAddress) {
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
Binder binder = new Binder(localAddress);
ChannelHandler parentHandler = getParentHandler();

ChannelPipeline bossPipeline = pipeline();
bossPipeline.addLast("binder", binder);

if (parentHandler != null) {
bossPipeline.addLast("userHandler", parentHandler);
}
}

bossPipeline.addLast("binder", binder)赋值了一个名为binder的处理器。继续跟进addLast方法。

private final Map<String, DefaultChannelHandlerContext> name2ctx =new HashMap<String, DefaultChannelHandlerContext>(4);
public synchronized void addLast(String name, ChannelHandler handler) {
if (name2ctx.isEmpty()) {
init(name, handler);
} else {
checkDuplicateName(name);
DefaultChannelHandlerContext oldTail = tail;
DefaultChannelHandlerContext newTail = new DefaultChannelHandlerContext(oldTail, null, name, handler);
callBeforeAdd(newTail);
oldTail.next = newTail;
tail = newTail;
name2ctx.put(name, newTail);
callAfterAdd(newTail);
}
}


[2] 方法DefaultChannelPileline@init

name2ctx是保存handlerContext的集合对象,此时是空的。断点走进init方法,逐行分析方法内的这几行代码。



private void init(String name, ChannelHandler handler) {
DefaultChannelHandlerContext ctx = new DefaultChannelHandlerContext(null, null, name, handler);
callBeforeAdd(ctx);
head = tail = ctx;
name2ctx.clear();
name2ctx.put(name, ctx);
callAfterAdd(ctx);
}

行1 声明了DefaultChannelHandlerContext对象,传入name值是binder,handler是名为binder的处理器,这两个值初始化的地方就在上文看到的调用链的BootStrap的bind方法中。先看DefaultChannelHandlerContext该类使用到的属性。很明显这是链表的数据结构,prev指向上个对象,next指向下个对象。

private final class DefaultChannelHandlerContext implements ChannelHandlerContext {
volatile DefaultChannelHandlerContext next;
volatile DefaultChannelHandlerContext prev;
private final String name;
private final ChannelHandler handler;
}

行2 binder并不是LifeCycleAwareChannelHandler类型,直接return掉。

private static void callBeforeAdd(ChannelHandlerContext ctx) {
if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) {
return;
}

行2 将链表中的第1个head和最后一个tail都赋值给了第1行中声明的对象。

private volatile DefaultChannelHandlerContext head;
private volatile DefaultChannelHandlerContext tail;

行4 清除Map对象中的值。

行5 将第1行声明的对象赋值给map集合。

行6 binder并不是LifeCycleAwareChannelHandler类型,直接return掉。第一个需要关注的断点到此结束。

private void callAfterAdd(ChannelHandlerContext ctx) {
if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) {
return;
}
}


3 client启动

启动client的方法由很多种,可以写Client的代码进行访问。还有一个简便的方法是利用telnet模拟客户端。

telnet 127.0.0.1 10101。设置断点在Server类中的Channels.pipeline()位置。

[1] 方法channels.pipeline()



新生成ChannelPipeline对象,从调用链可看出:

[1] boss线程池负责接收连接。

[2] 上游NioServerBoss@registerAcceptedChannel表明在建立连接时,该ChannelPipeline的所有handler将被设置完成。

private static void registerAcceptedChannel(NioServerSocketChannel parent, SocketChannel acceptedSocket,Thread currentThread) {
try {
ChannelSink sink = parent.getPipeline().getSink();
ChannelPipeline pipeline =parent.getConfig().getPipelineFactory().getPipeline();
}
}



[2] 方法pipeline.addLast(decoder)

添加StringDecoder这个处理器,流程和2.2中的所有步骤一致都走init方法,同样在callBeforeAdd和callAfterAdd中return了。唯一不同的是name叫做decoder。

[3] 方法pipeline.addLast(xyHelloHandler)

name2ctx不再empty,进入else分支。



checkDuplicateName检查是否有重名,有重名则直接抛出异常。

oldTail表示原来尾对象,新申明的xyHelloHandler将被作为新尾对象。再把原oldTail的next指向xyHelloHandler,再放入name2ctx集合。

[4] 方法AbstractNioSelector@select

继续跟着断点走,会走到AbstractNioSelector的select方法会阻塞,等待下一次事件。

4 client发送信息

在telnet窗口按下ctrl+] 会进入发送窗口。

现在关注以下2个方法的调用链,StringDecoder@decode、HelloHandler@messageReceived断点到这两个方法。

[1] 方法StringDecoder@decode



从NioWorker的read方法开始,执行到DefaultChannelPipeline的sendUpstream。标注559处。

public void sendUpstream(ChannelEvent e) {
DefaultChannelHandlerContext head = getActualUpstreamContext(this.head); // 获取该pipeline的HeadHandler就是StringDecoder
if (head == null) {
if (logger.isWarnEnabled()) {
logger.warn(
"The pipeline contains no upstream handlers; discarding: " + e);
}
return;
}
sendUpstream(head, e); // 进入该方法
}

进入该方法后会先调用StringDecoder的父类OnetoOneDecoder的handleUpStream(ctx,event)方法,标注559的下一行。

public void handleUpstream(
ChannelHandlerContext ctx, ChannelEvent evt) throws Exception {
if (!(evt instanceof MessageEvent)) {
ctx.sendUpstream(evt);
return;
}
MessageEvent e = (MessageEvent) evt;
Object originalMessage = e.getMessage();
Object decodedMessage = decode(ctx, e.getChannel(), originalMessage);
if (originalMessage == decodedMessage) {
ctx.sendUpstream(evt);
} else if (decodedMessage != null) {
fireMessageReceived(ctx, decodedMessage, e.getRemoteAddress()); // 向下传递
}
}

父类方法调用子类的decode方法,如此StringDecoder的decode方法就被调用到了。fireMessageReceived(ctx, decodedMessage, e.getRemoteAddress())方法会向下传递,进入该方法最终会执行到DefaultChannelPipeline的内部类DefaultChannelHandlerContext的sendUpstream方法

public void sendUpstream(ChannelEvent e) {
DefaultChannelHandlerContext next = getActualUpstreamContext(this.next); // 获取next
if (next != null) {
DefaultChannelPipeline.this.sendUpstream(next, e);
}
}


[2] 方法HelloHandler@messageReceived

next相当于获取了head的下一个handler对象即HelloHanlder。

public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
String messageReceived = (String) e.getMessage();
System.out.println(messageReceived);
super.messageReceived(ctx, e); // helloHandler若希望继续向下传递,则继续调用链式方法
}

从代码可看出handler往下传递对象的方法是sendUpstream(event)下方代码进行测试:

public class Client {
public static void main(String[] args) throws Exception {
Socket socket = new Socket("127.0.0.1", 10101);
socket.getOutputStream().write("hello".getBytes());
socket.close();
}
}

public class Server {
public static void main(String[] args) {
//服务类
ServerBootstrap bootstrap = new ServerBootstrap();
//boss线程监听端口,worker线程负责数据读写
ExecutorService boss = Executors.newCachedThreadPool();
ExecutorService worker = Executors.newCachedThreadPool();
//设置niosocket工厂
bootstrap.setFactory(new NioServerSocketChannelFactory(boss, worker));
//设置管道的工厂
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("handler1", new MyHandler1());
pipeline.addLast("handler2", new MyHandler2());
return pipeline;
}
});
bootstrap.bind(new InetSocketAddress(10101));
System.out.println("start!!!");
}
}

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.UpstreamMessageEvent;
public class MyHandler1 extends SimpleChannelHandler {

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
ChannelBuffer buffer = (ChannelBuffer)e.getMessage();
byte[] array = buffer.array();
String message = new String(array);
System.out.println("handler1:" + message);

// 传递给handler2。由于直接传递的是string那么handler2直接接收String即可
ctx.sendUpstream(new UpstreamMessageEvent(ctx.getChannel(), "abc", e.getRemoteAddress()));
ctx.sendUpstream(new UpstreamMessageEvent(ctx.getChannel(), "efg", e.getRemoteAddress()));
}
}

import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
public class MyHandler2 extends SimpleChannelHandler {

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
String message = (String)e.getMessage();
System.out.println("handler2:" + message);
}
}


5 简要流程图



流程图地址 http://blog.csdn.net/zxhoo/article/details/17264263
感觉有帮助请您赏一杯茶钱,金额随意。您的鼓励是我写作的动力。

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: