您的位置:首页 > 其它

netty源码分析之ChannelPipeline

2013-10-11 18:19 971 查看
看了ChannelHandler我们就来看ChannelPipeline,这个类实现了责任链模式,我们就直接来看这个类的实现吧,看完后我们再看看javadoc的,这个写的很详细。

Java代码


static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);
static final ChannelSink discardingSink = new DiscardingChannelSink();

private volatile Channel channel;
private volatile ChannelSink sink;
private volatile DefaultChannelHandlerContext head;
private volatile DefaultChannelHandlerContext tail;
private final Map<String, DefaultChannelHandlerContext> name2ctx =
new HashMap<String, DefaultChannelHandlerContext>(4);

这个里面的很好理解,channel就是和这个pipeline相关联的,sink也是和这个chanel相关联的对象,在pipeline里面会调度channelhandler的运行,在运行之后交给sink去处理传输等比较底层的操作。head和tail和name2ctx是这个pipe里面的一个ChanelHandlerContext的一个链式结构。

Java代码


public ChannelSink getSink() {
ChannelSink sink = this.sink;
if (sink == null) {
return discardingSink;
}
return sink;
}

在这个里面可以看到单不传递sink的时候,默认使用discardingSink。

Java代码


public void attach(Channel channel, ChannelSink sink) {
if (channel == null) {
throw new NullPointerException("channel");
}
if (sink == null) {
throw new NullPointerException("sink");
}
if (this.channel != null || this.sink != null) {
throw new IllegalStateException("attached already");
}
this.channel = channel;
this.sink = sink;
}

这个里面实现channel和sink与这个pipeline的绑定,可以看出来,一个pipe会和一个channel和sink相绑定。

下来来看一下链表channelhandler的操作:

Java代码


public synchronized void addFirst(String name, ChannelHandler handler) {
if (name2ctx.isEmpty()) {
init(name, handler);
} else {
checkDuplicateName(name);
DefaultChannelHandlerContext oldHead = head;
DefaultChannelHandlerContext newHead = new DefaultChannelHandlerContext(null, oldHead, name, handler);

callBeforeAdd(newHead);

oldHead.prev = newHead;
head = newHead;
name2ctx.put(name, newHead);

callAfterAdd(newHead);
}
}

如果链表是空的时候实现初始化的一些操作,初始化head、tail变量,然后如果这个handler是实现LifeCycleChannelHandler的,就触发相应的动作。如果不是初始化的时候,就修改一下head指针,这个是比较好理解的。

同理addLast要修改相应的tail指针,addBefore修改prev,addAfter修改next指针,这些都比较好理解,就不多解释了。需要注意的是这些方法都是一些同步方法,都加了synchronized关键字,不知道这个地方是否可以使用ConcurrentHashMap。

中间的一些链表操作我们就不多讲了,但是写的感觉比较好,都比较精彩,作者的功底很高,我们接下来看一下调度:

Java代码


public void sendUpstream(ChannelEvent e) {
DefaultChannelHandlerContext head = getActualUpstreamContext(this.head);
if (head == null) {
logger.warn(
"The pipeline contains no upstream handlers; discarding: " + e);
return;
}

sendUpstream(head, e);
}

void sendUpstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
try {
((ChannelUpstreamHandler) ctx.getHandler()).handleUpstream(ctx, e);
} catch (Throwable t) {
notifyHandlerException(e, t);
}
}

sendUpstream可以看到是从head开始走,然后找到下一个可以处理upstream的handler进行处理,然后让HandlerContext找到当前的下一个到下一个,慢慢的处理upstream事件,最后让交给ChannelSink组件。

Java代码


public void sendDownstream(ChannelEvent e) {
DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail);
if (tail == null) {
try {
getSink().eventSunk(this, e);
return;
} catch (Throwable t) {
notifyHandlerException(e, t);
return;
}
}

sendDownstream(tail, e);
}

void sendDownstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
if (e instanceof UpstreamMessageEvent) {
throw new IllegalArgumentException("cannot send an upstream event to downstream");
}

try {
((ChannelDownstreamHandler) ctx.getHandler()).handleDownstream(ctx, e);
} catch (Throwable t) {
// Unlike an upstream event, a downstream event usually has an
// incomplete future which is supposed to be updated by ChannelSink.
// However, if an exception is raised before the event reaches at
// ChannelSink, the future is not going to be updated, so we update
// here.
e.getFuture().setFailure(t);
notifyHandlerException(e, t);
}
}

sendDownstream我们可以看到是找到最后一个Handler,然后找到前一个可以处理downstream事件的handler进行处理。

最后我们来看一下javadoc里面给的例子程序:

Java代码


ChannelPipeline p = Channels.pipeline();
p.addLast("1", new UpstreamHandlerA());
p.addLast("2", new UpstreamHandlerB());
p.addLast("3", new DownstreamHandlerA());
p.addLast("4", new DownstreamHandlerB());
p.addLast("5", new UpstreamHandlerX());

我们可以看到,upstream事件的执行顺序是从头部找到第一个可以处理upstream事件的handler开始执行,所以执行顺序应该是1 --> 2 --> 5.

downstream事件的执行顺序是从尾部找到第一个可以处理downstream事件的handler开始执行,所以执行顺序是 4 --> 3.

在javadoc里面给我们指出了handler很灵活可以在运行的过程中自动的增加和删除,但是同时指出了下面的实现是错误的:

Java代码


public class FirstHandler extends SimpleChannelUpstreamHandler {

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
// Remove this handler from the pipeline,
ctx.getPipeline().remove(this);
// And let SecondHandler handle the current event.
ctx.getPipeline().addLast("2nd", new SecondHandler());
ctx.sendUpstream(e);
}
}

我们看出这个是错误的,remove方法会破坏这个链,所有应该在remove之前add,这个相信大家都不会犯这样的错误的。

分享到:


内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: