
Netty Quick Start Series - Chapter 7 Packet Protocol [Lecture 19] Solution: The Principle Behind Sticky and Split Packets

2018-03-26 11:33


1. How messages flow through the pipeline




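How does the current handler pass an object on to the handler after it?
A pipeline holds multiple handlers, and a handler passes an object on to the next one by calling sendUpstream(event).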
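The idea can be sketched without Netty on the classpath. The types below (UpstreamHandler, HandlerContext, UpstreamChainDemo) are hypothetical stand-ins invented for illustration, not Netty classes; the only thing borrowed from Netty is the shape of the call: each handler receives a context and passes its result on with ctx.sendUpstream(...).

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical, simplified stand-in for an upstream handler (not a Netty type).
interface UpstreamHandler {
    void handleUpstream(HandlerContext ctx, Object event);
}

// Hypothetical context: knows its position and how to reach the next handler.
final class HandlerContext {
    private final List<UpstreamHandler> handlers;
    private final int index;

    HandlerContext(List<UpstreamHandler> handlers, int index) {
        this.handlers = handlers;
        this.index = index;
    }

    // Pass the (possibly transformed) object to the next handler, if there is one.
    void sendUpstream(Object event) {
        int next = index + 1;
        if (next < handlers.size()) {
            handlers.get(next).handleUpstream(new HandlerContext(handlers, next), event);
        }
    }
}

final class UpstreamChainDemo {
    // Runs a three-handler pipeline and returns what the last handler received.
    static List<Object> run(byte[] input) {
        List<Object> received = new ArrayList<>();
        List<UpstreamHandler> handlers = new ArrayList<>();
        // Handler 1: turn raw bytes into a String, then pass it on.
        handlers.add((ctx, e) -> ctx.sendUpstream(new String((byte[]) e, StandardCharsets.UTF_8)));
        // Handler 2: transform the String, then pass it on.
        handlers.add((ctx, e) -> ctx.sendUpstream(((String) e).toUpperCase(Locale.ROOT)));
        // Handler 3: the last handler just records what arrived.
        handlers.add((ctx, e) -> received.add(e));
        // The event enters at the first handler.
        handlers.get(0).handleUpstream(new HandlerContext(handlers, 0), input);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run("hello".getBytes(StandardCharsets.UTF_8))); // [HELLO]
    }
}
```

Each handler sees whatever object the previous one chose to send, which is why a decoder can receive bytes and hand a decoded object to the handler behind it.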


Source code walkthrough:

The read method of NioWorker reads the raw bytes from the NIO channel.

    final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
    buffer.setBytes(0, bb);
    buffer.writerIndex(readBytes);

    // Fire the event. The data received by the first upstream handler is a ChannelBuffer.
    Channels.fireMessageReceived(channel, buffer);

The full method, from public class NioWorker extends AbstractNioWorker:

protected boolean read(SelectionKey k) {
    final SocketChannel ch = (SocketChannel) k.channel();
    final NioSocketChannel channel = (NioSocketChannel) k.attachment();

    final ReceiveBufferSizePredictor predictor =
            channel.getConfig().getReceiveBufferSizePredictor();
    final int predictedRecvBufSize = predictor.nextReceiveBufferSize();
    final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory();

    int ret = 0;
    int readBytes = 0;
    boolean failure = true;

    ByteBuffer bb = recvBufferPool.get(predictedRecvBufSize).order(bufferFactory.getDefaultOrder());
    try {
        while ((ret = ch.read(bb)) > 0) {
            readBytes += ret;
            if (!bb.hasRemaining()) {
                break;
            }
        }
        failure = false;
    } catch (ClosedChannelException e) {
        // Can happen, and does not need a user attention.
    } catch (Throwable t) {
        fireExceptionCaught(channel, t);
    }

    if (readBytes > 0) {
        bb.flip();

        final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
        buffer.setBytes(0, bb);
        buffer.writerIndex(readBytes);

        // Update the predictor.
        predictor.previousReceiveBufferSize(readBytes);

        // Fire the event.
        Channels.fireMessageReceived(channel, buffer);
    }

    if (ret < 0 || failure) {
        k.cancel(); // Some JDK implementations run into an infinite loop without this.
        close(channel, succeededFuture(channel));
        return false;
    }

    return true;
}
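The read loop above can be tried out with plain JDK NIO. The sketch below is a simplified illustration, not Netty code: ReadLoopDemo, readOnce and chunks are hypothetical names, a ByteArrayInputStream stands in for the socket, and a fixed predictedSize stands in for the ReceiveBufferSizePredictor. It shows that each read hands on however many bytes happened to fit, regardless of where the sender's messages begin and end, which is why the first upstream handler can see part of a message or several messages at once.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class ReadLoopDemo {
    // Reads up to predictedSize bytes and returns an exact-size copy,
    // or null when nothing could be read.
    static byte[] readOnce(ReadableByteChannel ch, int predictedSize) throws IOException {
        ByteBuffer bb = ByteBuffer.allocate(predictedSize);
        int ret;
        int readBytes = 0;
        while ((ret = ch.read(bb)) > 0) {
            readBytes += ret;
            if (!bb.hasRemaining()) {
                break; // receive buffer is full; the rest waits for the next read
            }
        }
        if (readBytes == 0) {
            return null;
        }
        bb.flip(); // switch the buffer from writing to reading
        byte[] message = new byte[readBytes];
        bb.get(message);
        return message;
    }

    // Splits the byte stream into the chunks that successive reads would fire upstream.
    static List<String> chunks(String data, int predictedSize) {
        ReadableByteChannel ch = Channels.newChannel(
                new ByteArrayInputStream(data.getBytes(StandardCharsets.US_ASCII)));
        List<String> out = new ArrayList<>();
        try {
            byte[] msg;
            while ((msg = readOnce(ch, predictedSize)) != null) {
                out.add(new String(msg, StandardCharsets.US_ASCII));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out;
    }

    public static void main(String[] args) {
        // The sender wrote "AAAA" and "BBBB"; the reader sees different boundaries.
        System.out.println(chunks("AAAABBBB", 6)); // [AAAABB, BB]
    }
}
```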

Channels: the Channel passed in is Netty's own channel abstraction; it holds the NIO channel, the sink, the pipeline, and so on.
The pipeline is a DefaultChannelPipeline, and UpstreamMessageEvent is a MessageEvent.

public static void fireMessageReceived(Channel channel, Object message, SocketAddress remoteAddress) {
    channel.getPipeline().sendUpstream(new UpstreamMessageEvent(channel, message, remoteAddress));
}
DefaultChannelPipeline: upstream events travel from head to tail.

getActualUpstreamContext(this.head) returns the context of the first upstream handler in the pipeline.



public void sendUpstream(ChannelEvent e) {
    DefaultChannelHandlerContext head = getActualUpstreamContext(this.head);
    if (head == null) {
        if (logger.isWarnEnabled()) {
            logger.warn(
                    "The pipeline contains no upstream handlers; discarding: " + e);
        }

        return;
    }

    sendUpstream(head, e);
}

void sendUpstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
    try {
        ((ChannelUpstreamHandler) ctx.getHandler()).handleUpstream(ctx, e);
    } catch (Throwable t) {
        notifyHandlerException(e, t);
    }
}
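The body of getActualUpstreamContext is not shown in this article. A plausible reading, given the canHandleUpstream flag that DefaultChannelHandlerContext keeps, is that it walks forward until it finds a context whose handler accepts upstream events. The sketch below models that assumption with hypothetical types (LookupCtx, UpstreamLookupDemo); it is not the Netty implementation.

```java
// Hypothetical, stripped-down context node; only the fields the lookup needs.
final class LookupCtx {
    final String name;
    final boolean canHandleUpstream;
    LookupCtx next;

    LookupCtx(String name, boolean canHandleUpstream) {
        this.name = name;
        this.canHandleUpstream = canHandleUpstream;
    }
}

final class UpstreamLookupDemo {
    // Assumed behavior: starting at ctx, return the first context that can
    // handle upstream events, or null if there is none.
    static LookupCtx actualUpstreamContext(LookupCtx ctx) {
        LookupCtx real = ctx;
        while (real != null && !real.canHandleUpstream) {
            real = real.next;
        }
        return real;
    }

    // Links the contexts in the given order and returns the head.
    static LookupCtx link(LookupCtx... ctxs) {
        for (int i = 0; i + 1 < ctxs.length; i++) {
            ctxs[i].next = ctxs[i + 1];
        }
        return ctxs[0];
    }

    public static void main(String[] args) {
        LookupCtx head = link(
                new LookupCtx("encoder", false),   // downstream-only handler
                new LookupCtx("decoder", true),
                new LookupCtx("business", true));
        // The downstream-only encoder at the head is skipped.
        System.out.println(actualUpstreamContext(head).name); // decoder
    }
}
```

When the lookup returns null, the pipeline has no upstream handler at all, which corresponds to the warning branch in sendUpstream above.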


ChannelHandlerContext: DefaultChannelHandlerContext stores the next and prev nodes, so the contexts form a doubly linked list.

ChannelHandler --> ChannelUpstreamHandler
((ChannelUpstreamHandler) ctx.getHandler()).handleUpstream(ctx, e);
handleUpstream is what goes on to find the next node that handles upstream events. Note that handleUpstream is given the current context, and the handler uses that context to find the next handler.

private final class DefaultChannelHandlerContext implements ChannelHandlerContext {

    volatile DefaultChannelHandlerContext next;
    volatile DefaultChannelHandlerContext prev;
    private final String name;
    private final ChannelHandler handler;
    private final boolean canHandleUpstream;
    private final boolean canHandleDownstream;
    private volatile Object attachment;

    DefaultChannelHandlerContext(
            DefaultChannelHandlerContext prev, DefaultChannelHandlerContext next,
            String name, ChannelHandler handler) {

        if (name == null) {
            throw new NullPointerException("name");
        }
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        canHandleUpstream = handler instanceof ChannelUpstreamHandler;
        canHandleDownstream = handler instanceof ChannelDownstreamHandler;

        if (!canHandleUpstream && !canHandleDownstream) {
            throw new IllegalArgumentException(
                    "handler must be either " +
                    ChannelUpstreamHandler.class.getName() + " or " +
                    ChannelDownstreamHandler.class.getName() + '.');
        }

        this.prev = prev;
        this.next = next;
        this.name = name;
        this.handler = handler;
    }

    public void sendDownstream(ChannelEvent e) {
        DefaultChannelHandlerContext prev = getActualDownstreamContext(this.prev);
        if (prev == null) {
            try {
                getSink().eventSunk(DefaultChannelPipeline.this, e);
            } catch (Throwable t) {
                notifyHandlerException(e, t);
            }
        } else {
            DefaultChannelPipeline.this.sendDownstream(prev, e);
        }
    }

    public void sendUpstream(ChannelEvent e) {
        DefaultChannelHandlerContext next = getActualUpstreamContext(this.next);
        if (next != null) {
            DefaultChannelPipeline.this.sendUpstream(next, e);
        }
    }

    // ...
}
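To make the two directions concrete, here is a small self-contained model of the doubly linked list. PipelineNode and TwoWayPipelineDemo are hypothetical names used only for this sketch, and the "sink" entry simply marks the point where the real code calls getSink().eventSunk(...). Upstream events follow next and skip nodes that cannot handle upstream; downstream events follow prev and fall through to the sink when no node is left.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of a pipeline node; not Netty's real class.
final class PipelineNode {
    final String name;
    final boolean canHandleUpstream;
    final boolean canHandleDownstream;
    PipelineNode prev;
    PipelineNode next;

    PipelineNode(String name, boolean up, boolean down) {
        this.name = name;
        this.canHandleUpstream = up;
        this.canHandleDownstream = down;
    }
}

final class TwoWayPipelineDemo {
    // Links the nodes in order into a doubly linked list and returns them.
    static PipelineNode[] link(PipelineNode... nodes) {
        for (int i = 0; i + 1 < nodes.length; i++) {
            nodes[i].next = nodes[i + 1];
            nodes[i + 1].prev = nodes[i];
        }
        return nodes;
    }

    // Handlers visited by an upstream event that enters at the head.
    static List<String> upstreamOrder(PipelineNode head) {
        List<String> order = new ArrayList<>();
        for (PipelineNode n = head; n != null; n = n.next) {
            if (n.canHandleUpstream) {
                order.add(n.name);
            }
        }
        return order;
    }

    // Handlers visited by a downstream event that enters at the tail,
    // followed by the sink once no previous node is left.
    static List<String> downstreamOrder(PipelineNode tail) {
        List<String> order = new ArrayList<>();
        for (PipelineNode n = tail; n != null; n = n.prev) {
            if (n.canHandleDownstream) {
                order.add(n.name);
            }
        }
        order.add("sink");
        return order;
    }

    public static void main(String[] args) {
        PipelineNode[] p = link(
                new PipelineNode("decoder", true, false),
                new PipelineNode("encoder", false, true),
                new PipelineNode("business", true, false));
        System.out.println(upstreamOrder(p[0]));   // [decoder, business]
        System.out.println(downstreamOrder(p[2])); // [encoder, sink]
    }
}
```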


SimpleChannelHandler, one of the ChannelUpstreamHandler implementations, calls ctx.sendUpstream(e) to find the next node that should handle the event.

public class SimpleChannelHandler implements ChannelUpstreamHandler, ChannelDownstreamHandler {

    public void handleUpstream(
            ChannelHandlerContext ctx, ChannelEvent e) throws Exception {

        if (e instanceof MessageEvent) {
            messageReceived(ctx, (MessageEvent) e);
        }
        // ... (handling of the other event types is elided here)
    }

    public void messageReceived(
            ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        ctx.sendUpstream(e);
    }

    // ...
}
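The practical consequence is that a handler decides whether an event keeps travelling: the default messageReceived forwards it, and a subclass that overrides messageReceived without calling ctx.sendUpstream(e) stops it. The sketch below models this with hypothetical types (Event, MessageEvt, ForwardingContext, SimpleHandlerModel, DropEmptyHandler); forwarding non-message events unchanged is an assumption made for the model, since that part of handleUpstream is elided in the listing above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical event types standing in for ChannelEvent / MessageEvent.
interface Event {}

final class MessageEvt implements Event {
    final Object message;

    MessageEvt(Object message) {
        this.message = message;
    }
}

// Hypothetical context: the only thing a handler needs is a way to forward.
interface ForwardingContext {
    void sendUpstream(Event e);
}

// Modeled loosely on SimpleChannelHandler: dispatch by event type,
// and forward messages by default.
class SimpleHandlerModel {
    void handleUpstream(ForwardingContext ctx, Event e) {
        if (e instanceof MessageEvt) {
            messageReceived(ctx, (MessageEvt) e);
        } else {
            ctx.sendUpstream(e); // assumption: other events pass through untouched
        }
    }

    void messageReceived(ForwardingContext ctx, MessageEvt e) {
        ctx.sendUpstream(e);
    }
}

// Overrides the hook and forwards only non-empty messages.
final class DropEmptyHandler extends SimpleHandlerModel {
    @Override
    void messageReceived(ForwardingContext ctx, MessageEvt e) {
        if (!"".equals(e.message)) {
            ctx.sendUpstream(e);
        }
    }
}

final class DispatchDemo {
    // Feeds the events to the handler and returns the messages it forwarded.
    static List<Object> forwarded(SimpleHandlerModel handler, Event... events) {
        List<Object> out = new ArrayList<>();
        ForwardingContext ctx =
                e -> out.add(e instanceof MessageEvt ? ((MessageEvt) e).message : "non-message");
        for (Event e : events) {
            handler.handleUpstream(ctx, e);
        }
        return out;
    }

    public static void main(String[] args) {
        Event[] events = { new MessageEvt("hi"), new MessageEvt("") };
        System.out.println(forwarded(new SimpleHandlerModel(), events).size()); // 2
        System.out.println(forwarded(new DropEmptyHandler(), events).size());   // 1
    }
}
```

A handler that buffers incomplete data would follow the same pattern: hold back the event until a whole message is available, and only then call sendUpstream.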
Download all source code: https://download.csdn.net/download/netcobol/10308871