
Java Game Server Network Layer: Netty's ChannelPipeline

2017-05-26 00:37
What ChannelPipeline does is documented in detail in the source code itself:

* <h3>Creation of a pipeline</h3>
*
* Each channel has its own pipeline and it is created automatically when a new channel is created.
*
* <h3>How an event flows in a pipeline</h3>
*
* The following diagram describes how I/O events are processed by {@link ChannelHandler}s in a {@link ChannelPipeline}
* typically. An I/O event is handled by either a {@link ChannelInboundHandler} or a {@link ChannelOutboundHandler}
* and be forwarded to its closest handler by calling the event propagation methods defined in
* {@link ChannelHandlerContext}, such as {@link ChannelHandlerContext#fireChannelRead(Object)} and
* {@link ChannelHandlerContext#write(Object)}.
*
* <pre>
*                                                 I/O Request
*                                            via {@link Channel} or
*                                        {@link ChannelHandlerContext}
*                                                      |
*  +---------------------------------------------------+---------------+
*  |                           ChannelPipeline         |               |
*  |                                                  \|/              |
*  |    +---------------------+            +-----------+----------+    |
*  |    | Inbound Handler  N  |            | Outbound Handler  1  |    |
*  |    +----------+----------+            +-----------+----------+    |
*  |              /|\                                  |               |
*  |               |                                  \|/              |
*  |    +----------+----------+            +-----------+----------+    |
*  |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
*  |    +----------+----------+            +-----------+----------+    |
*  |              /|\                                  .               |
*  |               .                                   .               |
*  | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
*  |        [ method call]                       [method call]         |
*  |               .                                   .               |
*  |               .                                  \|/              |
*  |    +----------+----------+            +-----------+----------+    |
*  |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
*  |    +----------+----------+            +-----------+----------+    |
*  |              /|\                                  |               |
*  |               |                                  \|/              |
*  |    +----------+----------+            +-----------+----------+    |
*  |    | Inbound Handler  1  |            | Outbound Handler  M  |    |
*  |    +----------+----------+            +-----------+----------+    |
*  |              /|\                                  |               |
*  +---------------+-----------------------------------+---------------+
*                  |                                  \|/
*  +---------------+-----------------------------------+---------------+
*  |               |                                   |               |
*  |       [ Socket.read() ]                    [ Socket.write() ]     |
*  |                                                                   |
*  |  Netty Internal I/O Threads (Transport Implementation)            |
*  +-------------------------------------------------------------------+
* </pre>
* An inbound event is handled by the inbound handlers in the bottom-up direction as shown on the left side of the
* diagram.  An inbound handler usually handles the inbound data generated by the I/O thread on the bottom of the
* diagram.  The inbound data is often read from a remote peer via the actual input operation such as
* {@link SocketChannel#read(ByteBuffer)}.  If an inbound event goes beyond the top inbound handler, it is discarded
* silently, or logged if it needs your attention.
* <p>
* An outbound event is handled by the outbound handler in the top-down direction as shown on the right side of the
* diagram.  An outbound handler usually generates or transforms the outbound traffic such as write requests.
* If an outbound event goes beyond the bottom outbound handler, it is handled by an I/O thread associated with the
* {@link Channel}. The I/O thread often performs the actual output operation such as
* {@link SocketChannel#write(ByteBuffer)}.
* <p>
* For example, let us assume that we created the following pipeline:
* <pre>
* {@link ChannelPipeline} p = ...;
* p.addLast("1", new InboundHandlerA());
* p.addLast("2", new InboundHandlerB());
* p.addLast("3", new OutboundHandlerA());
* p.addLast("4", new OutboundHandlerB());
* p.addLast("5", new InboundOutboundHandlerX());
* </pre>
* In the example above, the class whose name starts with {@code Inbound} means it is an inbound handler.
* The class whose name starts with {@code Outbound} means it is an outbound handler.
* <p>
* In the given example configuration, the handler evaluation order is 1, 2, 3, 4, 5 when an event goes inbound.
* When an event goes outbound, the order is 5, 4, 3, 2, 1.  On top of this principle, {@link ChannelPipeline} skips
* the evaluation of certain handlers to shorten the stack depth:
* <ul>
* <li>3 and 4 don't implement {@link ChannelInboundHandler}, and therefore the actual evaluation order of an inbound
*     event will be: 1, 2, and 5.</li>
* <li>1 and 2 don't implement {@link ChannelOutboundHandler}, and therefore the actual evaluation order of an
*     outbound event will be: 5, 4, and 3.</li>
* <li>If 5 implements both {@link ChannelInboundHandler} and {@link ChannelOutboundHandler}, the evaluation order of
*     an inbound and an outbound event could be 125 and 543 respectively.</li>
* </ul>
*


Let's walk through the example above.

p.addLast("1", new InboundHandlerA());
p.addLast("2", new InboundHandlerB());
p.addLast("3", new OutboundHandlerA());
p.addLast("4", new OutboundHandlerB());
p.addLast("5", new InboundOutboundHandlerX());


Once these handlers are added to the ChannelPipeline, the linked list can be walked in either direction:

1->2->3->4->5
5->4->3->2->1


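Five handlers were added here. Handlers 1, 2 and 5 implement the ChannelInboundHandler interface, so when data moves from the network layer up to the business logic layer it enters at the head of the list and travels to the tail, at each step looking for the next handler that implements ChannelInboundHandler. When data is written from the logic layer down to the transport, the pipeline instead looks for handlers that implement ChannelOutboundHandler, starting from the tail, so the order is 5, 4, 3.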
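The filtering rule described above can be tried out without Netty. The following is a minimal sketch, not Netty code: the `Entry` class and the `inboundOrder`/`outboundOrder` helpers are hypothetical names introduced only for illustration, and each handler is reduced to a name plus two flags.

```java
import java.util.ArrayList;
import java.util.List;

public class PipelineOrderSketch {
    /** Hypothetical stand-in for a handler: a name plus the directions it handles. */
    static final class Entry {
        final String name;
        final boolean inbound;
        final boolean outbound;

        Entry(String name, boolean inbound, boolean outbound) {
            this.name = name;
            this.inbound = inbound;
            this.outbound = outbound;
        }
    }

    /** Builds the five-handler configuration from the Javadoc example. */
    public static List<Entry> examplePipeline() {
        List<Entry> p = new ArrayList<Entry>();
        p.add(new Entry("1", true, false));  // InboundHandlerA
        p.add(new Entry("2", true, false));  // InboundHandlerB
        p.add(new Entry("3", false, true));  // OutboundHandlerA
        p.add(new Entry("4", false, true));  // OutboundHandlerB
        p.add(new Entry("5", true, true));   // InboundOutboundHandlerX
        return p;
    }

    /** Inbound events walk head to tail and skip non-inbound handlers. */
    public static List<String> inboundOrder(List<Entry> pipeline) {
        List<String> order = new ArrayList<String>();
        for (Entry e : pipeline) {
            if (e.inbound) {
                order.add(e.name);
            }
        }
        return order;
    }

    /** Outbound events walk tail to head and skip non-outbound handlers. */
    public static List<String> outboundOrder(List<Entry> pipeline) {
        List<String> order = new ArrayList<String>();
        for (int i = pipeline.size() - 1; i >= 0; i--) {
            Entry e = pipeline.get(i);
            if (e.outbound) {
                order.add(e.name);
            }
        }
        return order;
    }

    public static void main(String[] args) {
        List<Entry> p = examplePipeline();
        System.out.println("inbound:  " + inboundOrder(p));   // prints "inbound:  [1, 2, 5]"
        System.out.println("outbound: " + outboundOrder(p));  // prints "outbound: [5, 4, 3]"
    }
}
```

The real pipeline does not build lists like this; it follows prev/next pointers one context at a time, but the resulting visiting order is the same.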

With the purpose clear from the example above, we can look at how Netty actually implements it, starting with how the pipeline is created.

protected AbstractChannel(Channel parent, ChannelId id) {
    this.parent = parent;
    this.id = id;
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}


The channel constructor calls newChannelPipeline(), which creates a DefaultChannelPipeline object:

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}


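While constructing itself, the pipeline also creates the head and tail handler contexts. The contexts are where the handler references are stored. Handlers are usually added by calling the pipeline's addLast method: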

@Override
public final ChannelPipeline addLast(String name, ChannelHandler handler) {
    return addLast(null, name, handler);
}

@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        checkMultiplicity(handler);

        newCtx = newContext(group, filterName(name, handler), handler);

        addLast0(newCtx);

        // If the registered is false it means that the channel was not registered on an eventloop yet.
        // In this case we add the context to the pipeline and add a task that will call
        // ChannelHandler.handlerAdded(...) once the channel is registered.
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }

        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            newCtx.setAddPending();
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    callHandlerAdded0(newCtx);
                }
            });
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
}


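As you can see, the EventExecutorGroup passed in is null, so the handler runs on the channel's own EventLoop. Each time a handler is added, a DefaultChannelHandlerContext is created and the list references are updated: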

private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;
}
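The head/tail setup in the constructor and the four pointer updates in addLast0 can be reproduced in isolation. The sketch below is a simplified model under stated assumptions: `Node` is a hypothetical stand-in for AbstractChannelHandlerContext, it stores only a name, and it does no locking.

```java
public class SentinelListSketch {
    /** Hypothetical stand-in for a handler context: a name plus prev/next links. */
    static final class Node {
        final String name;
        Node prev;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    private final Node head = new Node("head");
    private final Node tail = new Node("tail");

    public SentinelListSketch() {
        // Same wiring as the pipeline constructor: the two sentinels point at each other.
        head.next = tail;
        tail.prev = head;
    }

    /** Inserts a new node just before the tail sentinel, as addLast0 does. */
    public SentinelListSketch addLast(String name) {
        Node newNode = new Node(name);
        Node prev = tail.prev;
        newNode.prev = prev;
        newNode.next = tail;
        prev.next = newNode;
        tail.prev = newNode;
        return this;
    }

    /** Walks the next pointers from head to tail. */
    public String forward() {
        StringBuilder sb = new StringBuilder();
        for (Node n = head; n != null; n = n.next) {
            if (sb.length() > 0) {
                sb.append("->");
            }
            sb.append(n.name);
        }
        return sb.toString();
    }

    /** Walks the prev pointers from tail to head. */
    public String backward() {
        StringBuilder sb = new StringBuilder();
        for (Node n = tail; n != null; n = n.prev) {
            if (sb.length() > 0) {
                sb.append("->");
            }
            sb.append(n.name);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        SentinelListSketch list = new SentinelListSketch().addLast("1").addLast("2").addLast("3");
        System.out.println(list.forward());   // prints "head->1->2->3->tail"
        System.out.println(list.backward());  // prints "tail->3->2->1->head"
    }
}
```

Because the sentinels always exist, addLast never has to handle an empty list as a special case, which is presumably why the pipeline creates head and tail up front.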


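The rest of addLast deals with what happens after the DefaultChannelHandlerContext has been linked in: it makes sure the handler's handlerAdded method is called to notify the handler that its context has been added, either immediately or later if the channel is not yet registered or the caller is not on the handler's event loop.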
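The "notify now or notify later" decision can be sketched as follows. This is a single-threaded simplification, not Netty's implementation: the class, the `pending` queue and the `register()` method are hypothetical, and the branch that hands the callback to another event loop thread is left out entirely.

```java
import java.util.ArrayList;
import java.util.List;

public class DeferredAddedSketch {
    private boolean registered;                                      // false until register() is called
    private final List<String> pending = new ArrayList<String>();   // handlers waiting for their callback
    private final List<String> notified = new ArrayList<String>();  // handlers whose callback has run

    /** Adds a handler by name; the "handlerAdded" notification is deferred if not registered yet. */
    public DeferredAddedSketch addLast(String name) {
        if (!registered) {
            pending.add(name);   // analogous to setAddPending() + callHandlerCallbackLater(...)
            return this;
        }
        notified.add(name);      // analogous to calling callHandlerAdded0(...) right away
        return this;
    }

    /** Marks the channel as registered and runs the deferred notifications in insertion order. */
    public void register() {
        registered = true;
        for (String name : pending) {
            notified.add(name);
        }
        pending.clear();
    }

    /** Returns a copy of the handler names that have been notified so far. */
    public List<String> notified() {
        return new ArrayList<String>(notified);
    }

    public static void main(String[] args) {
        DeferredAddedSketch p = new DeferredAddedSketch();
        p.addLast("a").addLast("b");
        System.out.println(p.notified());  // prints "[]"
        p.register();
        System.out.println(p.notified());  // prints "[a, b]"
        p.addLast("c");
        System.out.println(p.notified());  // prints "[a, b, c]"
    }
}
```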

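The other methods can mostly be ignored for now. Next, let's follow fireChannelRead to see how data flows. The read() method inside the AbstractNioByteChannel class calls pipeline.fireChannelRead(byteBuf), which is this method: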

@Override
public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}


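This passes the head node to invokeChannelRead in AbstractChannelHandlerContext. The snippet below is the channelRegistered counterpart; the per-context invokeChannelRead appears to follow the same pattern.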

private void invokeChannelRegistered() {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelRegistered(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRegistered();
    }
}


The invokeHandler() check used in this code is:

private boolean invokeHandler() {
    // Store in local variable to reduce volatile reads.
    int handlerState = this.handlerState;
    return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
}


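Why can invokeChannelRegistered be so sure that the current handler is a ChannelInboundHandler? That was my open question. The likely answer is that a context only gets here through a path that has already selected an inbound context: fireChannelRegistered() looks up the next inbound context before invoking it, and the head context that the pipeline starts from is, as far as I can tell from these sources, itself an inbound handler. If the invokeHandler() check fails (the handler's handlerAdded has not been called yet), the code falls through to fireChannelRegistered, which searches for the next inbound handler, and handlers keep being invoked this way along the chain.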
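The search for the next inbound handler can be modelled in a few lines. This is a sketch under assumptions, not the Netty source: `Ctx` is a hypothetical stand-in for a handler context, the `inbound` flag replaces the real type check, and head and tail are both assumed to be inbound.

```java
import java.util.ArrayList;
import java.util.List;

public class InboundSearchSketch {
    /** Hypothetical stand-in for a handler context with an "is inbound" flag. */
    static final class Ctx {
        final String name;
        final boolean inbound;
        Ctx next;

        Ctx(String name, boolean inbound) {
            this.name = name;
            this.inbound = inbound;
        }
    }

    /** Skips forward from the given context until the next inbound context (or the end of the list). */
    static Ctx findContextInbound(Ctx from) {
        Ctx ctx = from;
        do {
            ctx = ctx.next;
        } while (ctx != null && !ctx.inbound);
        return ctx;
    }

    /** Simulates an inbound event that every visited handler forwards to the next inbound one. */
    public static List<String> fireFrom(Ctx head) {
        List<String> visited = new ArrayList<String>();
        for (Ctx ctx = head; ctx != null; ctx = findContextInbound(ctx)) {
            // Only inbound contexts are ever reached here, so an inbound cast would be safe.
            visited.add(ctx.name);
        }
        return visited;
    }

    /** Builds head -> 1 -> 2 -> 3 -> 4 -> 5 -> tail with the flags from the Javadoc example. */
    public static Ctx exampleChain() {
        Ctx[] all = {
            new Ctx("head", true),   // assumption: the head context is inbound
            new Ctx("1", true),
            new Ctx("2", true),
            new Ctx("3", false),
            new Ctx("4", false),
            new Ctx("5", true),
            new Ctx("tail", true)    // assumption: the tail context is inbound
        };
        for (int i = 0; i < all.length - 1; i++) {
            all[i].next = all[i + 1];
        }
        return all[0];
    }

    public static void main(String[] args) {
        System.out.println(fireFrom(exampleChain()));  // prints "[head, 1, 2, 5, tail]"
    }
}
```

Contexts 3 and 4 are never visited, which is the point: the lookup filters by direction before any handler method is called.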

After reading the code, the purpose and basic mechanics of the pipeline are clear, but some questions remain that need further exploration.