您的位置:首页 > Web前端 > BootStrap

Netty源码学习-ServerBootstrap启动及事件处理过程

2014-05-07 00:27 871 查看
Netty是采用了Reactor模式的多线程版本,建议先看下面这篇文章了解一下Reactor模式:

/article/3737996.html

Netty的启动及事件处理的流程,基本上是按照上面这篇文章来走的

文章里面提到的操作,每一步都能在Netty里面找到对应的代码

其中Reactor里面的Acceptor就对应Netty的ServerBootstrap.boss;

而Reactor里面的Handler就对应Netty里面各ChannelHandler(在worker里面跑)

由于流程涉及到比较多的类和方法,我提取一下Netty的骨架:

Java代码


ServerBootstrap.bind(localAddress)

|-->newServerSocketChannel & fireChannelOpen (得到ServerSocketChannel[server])

-->

Binder.channelOpen

|-->Channels.bind(that is : sendDownstream of ChannelState.BOUND)

-->

In DefaultChannelPipeline, No downstreamHandler, jump to

NioServerSocketPipelineSink.bind (关键)

|-->1.do the REAL java.net.ServerSocket.bind (server绑定端口)

2.start bossThread in bossExecutor

3.do "accept & dispatch" in a endless loop of bossThread(得到SocketChannel[client])

|--> registerAcceptedChannel, start worker in workerPool

|-->worker.run

|-->processSelectedKeys(selector.selectedKeys())

|--> read & fireMessageReceived (开始调用各Handler)

下面就对照上面的“骨架”,把关键的代码拿出来读一下

其中关键的步骤,我用“===[关键步骤]===”的形式标记出来了

Netty的Server端是从ServerBootstrap.bind方法开始的:

Java代码


public class ServerBootstrap extends Bootstrap {

public Channel bind(final SocketAddress localAddress) {

final BlockingQueue<ChannelFuture> futureQueue =

new LinkedBlockingQueue<ChannelFuture>();

ChannelHandler binder = new Binder(localAddress, futureQueue);

ChannelPipeline bossPipeline = Channels.pipeline();

bossPipeline.addLast("binder", binder);

/*===OPEN===

NioServerSocketChannelFactory.newChannel返回一个NioServerSocketChannel

在NioServerSocketChannel的构造函数里,调用ServerSocketChannel.open()并触发channelOpen事件

这个事件由上面的“binder”来处理并返回Future(非阻塞),详见Binder

最后将Future放入futureQueue,以便在接下来的while循环里面取

*/

Channel channel = getFactory().newChannel(bossPipeline);

// Wait until the future is available.

ChannelFuture future = null;

boolean interrupted = false;

do {

try {

future = futureQueue.poll(Integer.MAX_VALUE, TimeUnit.SECONDS);

} catch (InterruptedException e) {

interrupted = true;

}

} while (future == null);

//处理中断的一种方式,详见《Java并发编程实践》

if (interrupted) {

Thread.currentThread().interrupt();

}

// Wait for the future.

future.awaitUninterruptibly();

return channel;

}

//主要是处理channelOpen事件

private final class Binder extends SimpleChannelUpstreamHandler {

private final SocketAddress localAddress;

private final BlockingQueue<ChannelFuture> futureQueue;

private final Map<String, Object> childOptions =

new HashMap<String, Object>();

Binder(SocketAddress localAddress, BlockingQueue<ChannelFuture> futureQueue) {

this.localAddress = localAddress;

this.futureQueue = futureQueue;

}

public void channelOpen(

ChannelHandlerContext ctx,

ChannelStateEvent evt) {

try {

//处理各种option,例如keep alive,nodelay等等,省略代码

} finally {

ctx.sendUpstream(evt);

}

/*

重点在这里

这里bind方法只是触发sendDownstream(ChannelState.BOUND)

而此时pipeline里面还没有ChannelDownstreamHandler(只有一个handler:“binder”):

public void sendDownstream(ChannelEvent e) {

DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail);

if (tail == null) {

try {

getSink().eventSunk(this, e);

return;

}

}

sendDownstream(tail, e);

}

因此ChannelState.BOUND会去到pipeline里面的sink,在sink里面执行最终的java.net.ServerSocket.bind操作

详见NioServerSocketPipelineSink.bind

*/

boolean finished = futureQueue.offer(evt.getChannel().bind(localAddress));

assert finished;

}

}

}

NioServerSocketPipelineSink:

Java代码


class NioServerSocketPipelineSink extends AbstractNioChannelSink {

private void bind(

NioServerSocketChannel channel, ChannelFuture future,

SocketAddress localAddress) {

try {

//在这里执行真正的===BIND===

channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());

bound = true;

Executor bossExecutor =

((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;

//java.net.ServerSocket.bind完成,接下来可以accept了,详见Boss类的run方法

//===BOSS start===,放入线程池里跑(bossExecutor)

DeadLockProofWorker.start(bossExecutor,

new ThreadRenamingRunnable(new Boss(channel),

"New I/O server boss #" + id + " (" + channel + ')'));

bossStarted = true;

}

}

private final class Boss implements Runnable {

private final Selector selector;

private final NioServerSocketChannel channel;

/*

===REGISTER[server]===

注意到每新建一个Boss,就会新建一个selector

*/

Boss(NioServerSocketChannel channel) throws IOException {

this.channel = channel;

selector = Selector.open();

channel.socket.register(selector, SelectionKey.OP_ACCEPT);

registered = true;

channel.selector = selector;

}

/*

===ACCEPT&DISPATCH===

boss不断地接受Client的连接并将连接成功的SocketChannel交由worker处理

*/

public void run() {

for (;;) {

SocketChannel acceptedSocket = channel.socket.accept();

if (acceptedSocket == null) {

break;

}

//把acceptedSocket交由worker处理

registerAcceptedChannel(acceptedSocket, currentThread);

}

}

/*

这里面的worker(implements Runnable)就相当于“Reactor Pattern”里面“Handler”

handler需要两方面信息:selector和acceptedSocket,其中后者已经传递过来了,而selector则

在worker.register里新创建一个

*/

private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) {

ChannelPipeline pipeline = channel.getConfig().getPipelineFactory().getPipeline();

//从WorkerPool里面取:workers[Math.abs(workerIndex.getAndIncrement() % workers.length)]

//可见worker是re-used的

NioWorker worker = nextWorker();

/*

值得注意的是new NioAcceptedSocketChannel(...)包含了一个关键操作:

将pipeline与channel关联起来,一对一;见AbstractChannel类:

protected AbstractChannel(

Channel parent, ChannelFactory factory,

ChannelPipeline pipeline, ChannelSink sink) {

this.parent = parent;

this.factory = factory;

this.pipeline = pipeline;

id = allocateId(this);

pipeline.attach(this, sink);

}

*/

worker.register(new NioAcceptedSocketChannel(

channel.getFactory(), pipeline, channel,

NioServerSocketPipelineSink.this, acceptedSocket,

worker, currentThread), null);

}

}

}

worker.register,主要工作是创建registerTask(implements Runnable)并放入registerTaskQueue

对应的类是NioWorker 和AbstractNioWorker:

Java代码


void register(AbstractNioChannel<?> channel, ChannelFuture future) {

//只是创建Runnable,未启动。在worker的run方法中,processRegisterTaskQueue时候才执行

Runnable registerTask = createRegisterTask(channel, future);

//在start()里面启动worker线程

Selector selector = start();

boolean offered = registerTaskQueue.offer(registerTask);

assert offered;

if (wakenUp.compareAndSet(false, true)) {

selector.wakeup();

}

}

private Selector start() {

synchronized (startStopLock) {

if (!started) {

selector = Selector.open();

//===WORKER start===

DeadLockProofWorker.start(executor, new ThreadRenamingRunnable(this, "New I/O worker #" + id));

}

}

return selector;

}

private final class RegisterTask implements Runnable {

private final NioSocketChannel channel;

private final ChannelFuture future;

private final boolean server;

public void run() {

try {

synchronized (channel.interestOpsLock) {

//===REGISTER[client]=== 初始的state(getRawInterestOps)是OP_READ

channel.channel.register(selector, channel.getRawInterestOps(), channel);

}

fireChannelConnected(channel, remoteAddress);

}

}

}

worker线程的run操作:

Java代码


public void run() {

for (;;) {

//===SELECT===

SelectorUtil.select(selector);

processRegisterTaskQueue();

processEventQueue();

processWriteTaskQueue();

//在这里面,就会遍历selectedKey并调用相关联的handler

processSelectedKeys(selector.selectedKeys());

}

}

private void processSelectedKeys(Set<SelectionKey> selectedKeys) throws IOException {

for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {

SelectionKey k = i.next();

i.remove();

int readyOps = k.readyOps();

//下面的“与”操作等价于k.isReadable

if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {

//执行读操作

if (!read(k)) {

continue;

}

}

//执行写操作

if ((readyOps & SelectionKey.OP_WRITE) != 0) {

writeFromSelectorLoop(k);

}

}

}

/*

主要是两个操作:

1.从channel里面读取数据

2.读取完成后,fireMessageReceived,从channel(k.attachment)

可以得到与它关联的pipeline,从而触发pipeline里面的handler

*/

protected boolean read(SelectionKey k) {

/*

变量“ch”的类型是java.nio.channels.SocketChannel,是“the channel for which this key was created”

变量“channel”的类型是org.jboss.netty.channel.socket.nio.NioSocketChannel,是“the attachment for which this key was created”

因此,“ch”的作用就是读数据,而“channel”的作用则是取得pipeline后开始处理数据

但,“channel”似乎是“包含”了“ch”?

我们知道,org.jboss.netty.channel.socket.nio.NioSocketChannel是对java.nio.channels.SocketChannel的封装,

正如org.jboss.netty.channel.socket.nio.NioServerSocketChannel是对java.nio.channels.ServerSocketChannel的封装

而查看RegisterTask的run方法,register并返回SelectionKey:

channel.channel.register(

selector, channel.getRawInterestOps(), channel);

变量的命名让人糊涂,翻译一下:

acceptedNioSocketChannel.channel.register(selector, ops, acceptedNioSocketChannel)

注意到acceptedNioSocketChannel.channel的真实类型,其实就是java.nio.channels.SocketChannel,

它就是下面代码里的“acceptedSocket”:

worker.register(new NioAcceptedSocketChannel(

channel.getFactory(), pipeline, channel,

NioServerSocketPipelineSink.this, acceptedSocket,

worker, currentThread), null);

因此,是不是可以认为“channel”与“ch”的关系是:

ch = channel.channel

*/

final SocketChannel ch = (SocketChannel) k.channel();

final NioSocketChannel channel = (NioSocketChannel) k.attachment();

//会根据这一次接收到的数据的大小,来预测下一次接收数据的大小

//并以此为依据来决定ByteBuffer的大小

final ReceiveBufferSizePredictor predictor =

channel.getConfig().getReceiveBufferSizePredictor();

final int predictedRecvBufSize = predictor.nextReceiveBufferSize();

int ret = 0;

int readBytes = 0;

boolean failure = true;

ByteBuffer bb = recvBufferPool.acquire(predictedRecvBufSize);

try {

while ((ret = ch.read(bb)) > 0) {

readBytes += ret;

if (!bb.hasRemaining()) {

break;

}

}

failure = false;

}

if (readBytes > 0) {

bb.flip();

final ChannelBufferFactory bufferFactory =

channel.getConfig().getBufferFactory();

final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);

buffer.setBytes(0, bb);

buffer.writerIndex(readBytes);

recvBufferPool.release(bb);

// Update the predictor.

predictor.previousReceiveBufferSize(readBytes);

// Fire the event.

fireMessageReceived(channel, buffer);

}

return true;

}
http://bylijinnan.iteye.com/blog/1992345
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: