您的位置:首页 > Web前端 > BootStrap

Netty4启动ServerBootStrap源码分析

2017-07-23 12:41 1526 查看
首记:

来分析下Netty4中的核心NIO模型的启动过程, 如何 bind -> accept -> process -> …. 这些过程,在分析之前,先来熟悉下jdk中原生的NIO模型,

因为Netty中的NIO是基于此上面进行封装的。

一,java nio 模型

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Minimal single-threaded HTTP responder built directly on JDK NIO.
 *
 * <p>{@link #run()} binds a {@link ServerSocketChannel}, registers it with a {@link Selector}
 * for {@code OP_ACCEPT} and then loops on the selector: accepted connections are registered
 * for {@code OP_READ}, and every chunk of bytes read is printed and answered with a fixed
 * "Hello World!" HTTP response. {@link #shutdown()} may be called from another thread to stop
 * the loop.
 *
 * <p>NOTE: client connections that are still open when {@link #shutdown()} runs are not
 * closed by this class; only the listening channel and the selector are.
 */
class NioServer implements Runnable {

    private static final int DEFAULT_SEND_BUF_SIZE = 65535;
    private static final int DEFAULT_RECV_BUF_SIZE = 65535;
    private static final int DEFAULT_READ_TIMEOUT = 30000;
    private static final int DEFAULT_BACKLOG = 1024;
    private static final int MAX_PORT = 65535;
    private static final int SELECT_TIMEOUT_MS = 500;
    private static final int READ_BUFFER_SIZE = 1024;

    /** Fixed response; Content-Length (38) matches the length of the HTML body. */
    private static final byte[] HTTP_RESPONSE = ("HTTP/1.1 200 OK\r\n" +
            "Content-Length: 38\r\n" +
            "Content-Type: text/html\r\n" +
            "\r\n" +
            "<html><body>Hello World!</body></html>").getBytes(StandardCharsets.UTF_8);

    private final String host;
    private final int port;

    private final int sendBufSize;
    private final int recvBufSize;

    // milliseconds
    private final int readTimeout;

    // Written by the thread executing run(), read by the thread calling shutdown().
    private volatile ServerSocketChannel serverSocketChannel;
    private volatile Selector selector;
    private volatile InetSocketAddress serverLocalAddress;

    // server is starting up
    private final AtomicBoolean isStartingUp = new AtomicBoolean(false);

    // server is running
    private volatile boolean isRunning = false;

    // server is shutting down
    private final AtomicBoolean isShutdownIng = new AtomicBoolean(false);

    // server shutdown has completed
    private volatile boolean isShutdownComplete = false;

    /** Listens on all local addresses at {@code port} with default buffer sizes and timeout. */
    public NioServer(int port) {
        this(null, port);
    }

    /** Listens on {@code host:port} ({@code host == null} means the wildcard address). */
    public NioServer(String host, int port) {
        this(host, port, DEFAULT_SEND_BUF_SIZE, DEFAULT_RECV_BUF_SIZE, DEFAULT_READ_TIMEOUT);
    }

    /**
     * @param host        local host name/address to bind, or {@code null} for the wildcard address
     * @param port        local port in {@code [0, 65535]}; 0 lets the OS pick a free port
     * @param sendBufSize SO_SNDBUF for accepted sockets; non-positive selects the default
     * @param recvBufSize SO_RCVBUF; non-positive selects the default
     * @param readTimeout SO_TIMEOUT in milliseconds; non-positive selects the default
     * @throws IllegalArgumentException if {@code port} is outside {@code [0, 65535]}
     */
    public NioServer(String host, int port, int sendBufSize, int recvBufSize, int readTimeout) {
        if (port < 0 || port > MAX_PORT) {
            throw new IllegalArgumentException("port must be in range [0, " + MAX_PORT + "], got " + port);
        }
        this.host = host;
        this.port = port;
        this.sendBufSize = sendBufSize > 0 ? sendBufSize : DEFAULT_SEND_BUF_SIZE;
        this.recvBufSize = recvBufSize > 0 ? recvBufSize : DEFAULT_RECV_BUF_SIZE;
        this.readTimeout = readTimeout > 0 ? readTimeout : DEFAULT_READ_TIMEOUT;
    }

    /** Opens, configures and binds the listening channel; closes it again if any step fails. */
    private void initServerSocket() throws IOException {
        ServerSocketChannel channel = ServerSocketChannel.open();
        boolean bound = false;
        try {
            channel.socket().setSoTimeout(this.readTimeout);
            channel.socket().setReceiveBufferSize(this.recvBufSize);
            channel.socket().setReuseAddress(true);

            InetSocketAddress address = host != null
                    ? new InetSocketAddress(host, port)
                    : new InetSocketAddress(port);

            channel.bind(address, DEFAULT_BACKLOG);
            this.serverLocalAddress = (InetSocketAddress) channel.getLocalAddress();
            System.out.printf("********************  Bind Local Address [%s:%d]  ************************\n",
                    this.serverLocalAddress.getHostName(), this.serverLocalAddress.getPort());

            this.serverSocketChannel = channel;
            bound = true;
        } finally {
            if (!bound) {
                // Do not leak the file descriptor when configuration or bind fails.
                closeQuietly(channel);
            }
        }
    }

    private void initSelector() throws IOException {
        this.selector = Selector.open();
    }

    /**
     * Binds the server socket and registers it for OP_ACCEPT.
     *
     * @return {@code true} if this call started the server; {@code false} if another start is
     *         in progress or the server is already running
     * @throws IOException if the socket or selector could not be set up (everything opened so
     *                     far is closed again)
     */
    private boolean init() throws IOException {
        if (!isStartingUp.compareAndSet(false, true)) {
            return false;
        }
        try {
            if (isRunning) {
                return false;
            }

            initServerSocket();
            try {
                initSelector();
                this.serverSocketChannel.configureBlocking(false);
                this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
            } catch (IOException | RuntimeException e) {
                closeQuietly(this.selector);
                closeQuietly(this.serverSocketChannel);
                throw e;
            }

            this.isShutdownComplete = false;
            this.isRunning = true;
            return true;
        } finally {
            // Always release the start-up guard, even when initialisation throws.
            this.isStartingUp.set(false);
        }
    }

    /**
     * Starts the server and runs the select loop until {@link #shutdown()} is called.
     * Returns immediately if start-up fails or the server is already running.
     */
    @Override
    public void run() {
        try {
            if (!init()) {
                return;
            }
        } catch (IOException e) {
            e.printStackTrace();
            // Without a selector there is nothing to loop on.
            return;
        }

        final Selector sel = this.selector;
        try {
            while (isRunning && sel.isOpen()) {
                int ready;
                try {
                    ready = sel.select(SELECT_TIMEOUT_MS);
                } catch (IOException e) {
                    e.printStackTrace();
                    continue;
                }

                if (ready <= 0 || !isRunning) {
                    continue;
                }

                Iterator<SelectionKey> it = sel.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    try {
                        if (!key.isValid()) {
                            continue;
                        }
                        if (key.isAcceptable()) {
                            handleAccept(sel, key);
                        } else if (key.isReadable()) {
                            handleRead(key);
                        }
                    } catch (CancelledKeyException ignored) {
                        // The key was cancelled between isValid() and the readiness check; skip it.
                    }
                }
            }
        } catch (ClosedSelectorException ignored) {
            // shutdown() closed the selector while this thread was about to use it: normal exit.
        }
    }

    /** Accepts one pending connection, configures it and registers it for OP_READ. */
    private void handleAccept(Selector sel, SelectionKey key) {
        ServerSocketChannel listener = (ServerSocketChannel) key.channel();
        SocketChannel client = null;
        boolean registered = false;
        try {
            client = listener.accept();
            if (client == null) {
                // Non-blocking accept: no connection is pending any more.
                return;
            }

            client.socket().setSendBufferSize(this.sendBufSize);
            client.socket().setSoTimeout(this.readTimeout);
            client.socket().setReceiveBufferSize(this.recvBufSize);
            client.socket().setKeepAlive(true);
            client.socket().setReuseAddress(true);
            client.socket().setTcpNoDelay(true);
            client.configureBlocking(false);

            InetSocketAddress local = (InetSocketAddress) client.getLocalAddress();
            InetSocketAddress remote = (InetSocketAddress) client.getRemoteAddress();

            System.out.printf("Accept Channel : [%s:%d] -> [%s:%d]\n",
                    remote.getHostName(), remote.getPort(),
                    local.getHostName(), local.getPort());

            client.register(sel, SelectionKey.OP_READ);
            registered = true;
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (!registered) {
                // Accepted but never handed to the selector: close it so it is not leaked.
                closeQuietly(client);
            }
        }
    }

    /** Reads available bytes, prints them and answers with the fixed response; closes on EOF or error. */
    private void handleRead(SelectionKey key) {
        SocketChannel client = (SocketChannel) key.channel();
        if (!client.isOpen()) {
            return;
        }
        try {
            ByteBuffer buffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
            int len = client.read(buffer);
            if (len > 0) {
                System.out.println(new String(buffer.array(), 0, len, StandardCharsets.UTF_8));
                // NOTE: a single write is attempted; a partial write is not retried.
                client.write(ByteBuffer.wrap(HTTP_RESPONSE));
            } else if (len < 0) {
                // End of stream: the peer closed the connection, so release the socket.
                closeClient(key, client);
            }
        } catch (IOException e) {
            e.printStackTrace();
            closeClient(key, client);
        }
    }

    private static void closeClient(SelectionKey key, SocketChannel client) {
        key.cancel();
        closeQuietly(client);
    }

    private static void closeQuietly(AutoCloseable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Stops the server: closes the listening channel and the selector, which ends the loop in
     * {@link #run()}. Does nothing if the server is not running or a shutdown is already in
     * progress.
     *
     * @throws RuntimeException wrapping the {@link IOException} if closing fails
     */
    public void shutdown() {
        if (!this.isShutdownIng.compareAndSet(false, true)) {
            return;
        }
        try {
            if (this.isShutdownComplete || !this.isRunning) {
                // Never started (channel and selector are still null) or already stopped.
                return;
            }

            isRunning = false;

            try {
                try {
                    // Closing the channel also closes its socket.
                    this.serverSocketChannel.close();
                } finally {
                    // Closing the selector also wakes up a thread blocked in select().
                    this.selector.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
                System.out.println("Fail to shutdown server ... ");
                throw new RuntimeException(e);
            }

            this.isShutdownComplete = true;

            System.out.printf("**************** shutdown server[%s:%d] successfully **************\n",
                    this.serverLocalAddress.getHostName(), this.serverLocalAddress.getPort());
        } finally {
            // Release the guard on every path so a later shutdown() is not blocked forever.
            this.isShutdownIng.set(false);
        }
    }

}

/** Entry point for the plain-NIO demo: serve on port 8888 for 100 seconds, then stop. */
public class NioDemo {

    public static void main(String[] args) throws InterruptedException {
        final NioServer nioServer = new NioServer(8888);

        // The server's select loop runs on its own thread; main only waits and stops it.
        final Thread serverThread = new Thread(nioServer);
        serverThread.start();

        final long serveMillis = 100_000L;
        Thread.sleep(serveMillis);

        nioServer.shutdown();
    }
}


上面实现一个简单的Nio socket server, 用浏览器可访问
http://localhost:8888


二,Netty ServerBootstrap 源码分析

先用netty来简单实现上面例子的功能:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

import java.net.InetSocketAddress;

/**
 * Netty version of the "Hello World!" HTTP responder: one acceptor event loop, four worker
 * event loops, listening on port 8888.
 */
public class NettyServerDemo {

    /** Fixed response; Content-Length (38) matches the length of the HTML body. */
    private static final String HTTP_RESPONSE = "HTTP/1.1 200 OK\r\n" +
            "Content-Length: 38\r\n" +
            "Content-Type: text/html\r\n" +
            "\r\n" +
            "<html><body>Hello World!</body></html>";

    public static void main(String[] args) {

        EventLoopGroup accept = new NioEventLoopGroup(1);
        EventLoopGroup worker = new NioEventLoopGroup(4);

        try {
            ServerBootstrap b = new ServerBootstrap();

            b.group(accept, worker)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_REUSEADDR, true)
                    .option(ChannelOption.SO_RCVBUF, 65535)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new SimpleChannelInboundHandler<ByteBuf>() {
                                @Override
                                protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
                                    // Decode exactly the readable bytes (honours readerIndex) with an explicit charset.
                                    System.out.println(msg.toString(java.nio.charset.StandardCharsets.UTF_8));

                                    ByteBuf buffer = ctx.alloc().buffer();
                                    buffer.writeBytes(HTTP_RESPONSE.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                                    ctx.writeAndFlush(buffer);
                                }
                            });
                        }
                    })
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childOption(ChannelOption.SO_REUSEADDR, true)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childOption(ChannelOption.SO_RCVBUF, 65535)
                    .childOption(ChannelOption.SO_SNDBUF, 65535)
                    .localAddress(8888);

            ChannelFuture channelFuture = b.bind().awaitUninterruptibly();

            if (!channelFuture.isSuccess()) {
                // Report the failure instead of leaving the JVM running with idle event loops.
                System.err.println("Failed to bind local port 8888");
                if (channelFuture.cause() != null) {
                    channelFuture.cause().printStackTrace();
                }
                return;
            }

            Channel channel = channelFuture.channel();
            InetSocketAddress address = (InetSocketAddress) channel.localAddress();
            System.out.printf("********* BIND LOCAL ADDRESS [%s:%d] SUCCESSFULLY ********\n",
                    address.getHostName(), address.getPort());

            // Keep serving until the server channel is closed.
            channel.closeFuture().awaitUninterruptibly();
        } finally {
            // Release the event-loop threads on every exit path.
            worker.shutdownGracefully();
            accept.shutdownGracefully();
        }
    }
}


以上面例子说起, 开始初始化了两个线程组 accept, worker

EventLoopGroup accept = new NioEventLoopGroup(1);
EventLoopGroup worker = new NioEventLoopGroup(4);


这两个线程组是 Nio server 的 io 事件处理线程:顾名思义,accept 用于处理 ServerSocketChannel 的 accept io 事件,worker 用于处理 SocketChannel 的 read、write 读写 io 事件。线程组中的每个线程都对应唯一的一个 Selector(也就是 NioEventLoop 与 Selector 一一对应),用来轮询 io 事件。等会儿可以简单地看看,因为这不是这篇文章的重点,以后可以专门写篇文章来剖析一下 Netty 中对 nio 线程对象的封装。

现在来简单的看下
NioEventLoopGroup
的初始化过程, 不能详细展开 ,不然就写不完了。



// No-arg constructor: delegates with nThreads = 0.
public NioEventLoopGroup() {
this(0);
}

// Delegates with a null Executor; the cast selects the Executor overload over the ThreadFactory one.
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}

// ThreadFactory variant: fills in SelectorProvider.provider() as the selector provider.
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
this(nThreads, threadFactory, SelectorProvider.provider());
}

// Executor variant: fills in SelectorProvider.provider() as the selector provider.
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}

// ThreadFactory variant: fills in DefaultSelectStrategyFactory.INSTANCE as the select strategy.
public NioEventLoopGroup(
int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
this(nThreads, threadFactory, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}

// ThreadFactory variant: hands everything to the superclass with RejectedExecutionHandlers.reject().
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory,
final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, threadFactory, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}

// Executor variant: fills in DefaultSelectStrategyFactory.INSTANCE as the select strategy.
public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}

// Executor variant: hands everything to the superclass with RejectedExecutionHandlers.reject().
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}

// Same as above but additionally accepts an EventExecutorChooserFactory, passed through to the superclass.
public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
RejectedExecutionHandlers.reject());
}

// Most complete overload: every collaborator, including the RejectedExecutionHandler, is caller-supplied.
public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory,
final RejectedExecutionHandler rejectedExecutionHandler) {
super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, rejectedExecutionHandler);
}


这是所有的构造方法, 如果没有指定线程数, 会由MultithreadEventLoopGroup 默认线程数指定

DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));


还可以指定 Executor,SelectorProvider 等等, 不再赘述。

正式进入ServerBootstrap的分析, 继承于抽象父类AbstractBootstrap

b.group(accept, worker)


用于指定acceptor, worker io线程,

/**
 * Sets the parent (acceptor) group via the superclass and the child (worker) group on this
 * bootstrap.
 *
 * @throws NullPointerException  if {@code childGroup} is {@code null}
 * @throws IllegalStateException if a child group has already been set
 */
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
    // The parent group is handed to the superclass first, before the child group is validated.
    super.group(parentGroup);

    final boolean missing = childGroup == null;
    if (missing) {
        throw new NullPointerException("childGroup");
    }

    final boolean alreadySet = this.childGroup != null;
    if (alreadySet) {
        throw new IllegalStateException("childGroup set already");
    }

    this.childGroup = childGroup;
    return this;
}


调用父类的方法如下:

/**
 * Stores the {@link EventLoopGroup} for this bootstrap; it may be set only once.
 *
 * @throws NullPointerException  if {@code group} is {@code null}
 * @throws IllegalStateException if a group has already been set
 */
public B group(EventLoopGroup group) {
    if (group == null) {
        throw new NullPointerException("group");
    } else if (this.group != null) {
        throw new IllegalStateException("group set already");
    }

    this.group = group;

    // Return this instance typed as B so calls can be chained.
    final B self = (B) this;
    return self;
}


指定Channel类型,
.channel(NioServerSocketChannel.class)


AbstractBootstrap.java

/**
 * Selects the channel implementation by class; the class is wrapped in a
 * {@link ReflectiveChannelFactory} and passed to {@code channelFactory(...)}.
 *
 * @throws NullPointerException if {@code channelClass} is {@code null}
 */
public B channel(Class<? extends C> channelClass) {
    if (channelClass != null) {
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }
    throw new NullPointerException("channelClass");
}


这个ReflectiveChannelFactory会在后面的初始化Channel的时候用到.

.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_RCVBUF, 65535)
.option(ChannelOption.SO_BACKLOG, 1024)

用于设置ServerSocketChannel属性


.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.SO_REUSEADDR, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_RCVBUF, 65535)
.childOption(ChannelOption.SO_SNDBUF, 65535)

用于设置accept之后建立的SocketChannel属性


.handler(new LoggingHandler(LogLevel.INFO))

在accept过程链路中加上的 日志handler 这里涉及到ChannelPipeline,ChannelHandler,ChannelHandlerContext这三者的关系,不是这篇的核心重点, 以后可以专门写篇文章详细剖析一下。


.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new SimpleChannelInboundHandler<ByteBuf>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
int len = msg.readableBytes();
byte[] bytes = new byte[len];
msg.getBytes(0, bytes);
System.out.println(new String(bytes));
String httpResponse = "HTTP/1.1 200 OK\r\n" +
"Content-Length: 38\r\n" +
"Content-Type: text/html\r\n" +
"\r\n" +
"<html><body>Hello World!</body></html>";

ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes(httpResponse.getBytes("UTF8"));
ctx.writeAndFlush(buffer);
}
});
}
})

在SocketChannel对应的Pipeline中添加业务handler,即对于请求的处理以及响应。


以上的都不是重点, 核心的部分来了,主要是解决 Netty 如何处理accept 以及之后的SocketChannel的初始化和读写过程。

ChannelFuture channelFuture = b.bind().awaitUninterruptibly();

bind方法就是整个Netty启动的入口,来看看它经过了哪些步骤:

AbstractBootstrap.java

/**
 * Validates the bootstrap and binds a new channel to the configured local address.
 *
 * @throws IllegalStateException if no local address has been configured
 */
public ChannelFuture bind() {
    validate();

    final SocketAddress target = this.localAddress;
    if (target != null) {
        return doBind(target);
    }
    throw new IllegalStateException("localAddress not set");
}

/**
 * Creates and registers the server channel, then binds it to {@code localAddress}.
 *
 * @param localAddress the address to bind to
 * @return the failed registration future if registration already failed; otherwise a promise
 *         that is passed to {@code doBind0} for the actual bind
 */
private ChannelFuture doBind(final SocketAddress localAddress) {
    // Creates the channel and registers it with an event loop of the acceptor group.
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    // This call had been fused into the comment line above and therefore never ran.
                    promise.registered();

                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}


1. Netty对于java中ServerSocketChannel的初始化,以及注册到selector上的过程剖析

现在来重点分析下
final ChannelFuture regFuture = initAndRegister();
这个方法

// Creates a channel through channelFactory, runs init(channel) on it, then registers it with
// the group returned by config().group(). Returns the registration future, or an
// already-failed promise when creation or init throws.
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// Instantiate the channel using the factory configured earlier through channel(...)/channelFactory(...).
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {

// The channel was created but init(...) failed: force it closed.
channel.unsafe().closeForcibly();
}

// Report the failure through a promise that uses GlobalEventExecutor.INSTANCE as its executor.
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}

// Register the channel with the bootstrap's group (the acceptor group for a server bootstrap).
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
// Registration failed: close normally if the channel got registered, otherwise force-close.
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}

ReflectiveChannelFactory.java 默认就是用的这个

/**
 * Creates a channel by reflectively invoking the no-arg constructor of {@code clazz}
 * (for example {@code NioServerSocketChannel.class}).
 *
 * @throws ChannelException wrapping any failure raised during instantiation
 */
public T newChannel() {
    final T instance;
    try {
        instance = clazz.newInstance();
    } catch (Throwable cause) {
        throw new ChannelException("Unable to create Channel from class " + clazz, cause);
    }
    return instance;
}

NioServerSocketChannel.java 的初始化

// No-arg constructor (the one ReflectiveChannelFactory invokes): opens a JDK
// ServerSocketChannel through DEFAULT_SELECTOR_PROVIDER and delegates.
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

/**
 * Opens a JDK {@link ServerSocketChannel} through the given provider.
 *
 * @throws ChannelException wrapping the {@link IOException} if the channel cannot be opened
 */
private static ServerSocketChannel newSocket(SelectorProvider provider) {
    final ServerSocketChannel jdkChannel;
    try {
        jdkChannel = provider.openServerSocketChannel();
    } catch (IOException cause) {
        throw new ChannelException(
                "Failed to open a server socket.", cause);
    }
    return jdkChannel;
}

/**
* Create a new instance using the given {@link SelectorProvider}.
* The JDK channel is opened via {@code newSocket(provider)} and passed to the
* {@link ServerSocketChannel} constructor.
*/
public NioServerSocketChannel(SelectorProvider provider) {
this(newSocket(provider));
}

/**
* Create a new instance using the given {@link ServerSocketChannel}.
*/
public NioServerSocketChannel(ServerSocketChannel channel) {
// Call the parent constructor with no parent channel and SelectionKey.OP_ACCEPT as the read interest op.
super(null, channel, SelectionKey.OP_ACCEPT);
// Channel configuration backed by javaChannel().socket(), i.e. the socket of the JDK channel passed in above;
// per the article, the option(...) values are later applied through this config.
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

AbstractNioChannel.java  设置interest ops SelectionKey.OP_ACCEPT 并且设置ch.configureBlocking(false);
// Stores the JDK channel and the interest op to use for reads, and switches the JDK channel
// to non-blocking mode. If that fails, the channel is closed and a ChannelException is thrown.
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
try {
// Best-effort cleanup of the partially initialised channel.
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}

throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}

server register to selector 过程
SingleThreadEventLoop.java
// Registers the promise's channel with this event loop by delegating to the channel's unsafe.
// Returns the same promise that was passed in.
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
// The unsafe object is created together with the channel (in the AbstractChannel constructor).
promise.channel().unsafe().register(this, promise);
return promise;
}

unsafe 和pipeline 初始化,有兴趣的可以自己看下
// Stores parent and id, then creates this channel's unsafe and pipeline via the
// newUnsafe() / newChannelPipeline() factory methods.
protected AbstractChannel(Channel parent, ChannelId id) {
this.parent = parent;
this.id = id;
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}

register 注册过程最终是在unsafe中完成的  看看
// Factory for this channel's unsafe implementation.
@Override
protected AbstractNioUnsafe newUnsafe() {
// Per the article, this is the override that applies to NioServerSocketChannel: its unsafe is a NioMessageUnsafe.
return new NioMessageUnsafe();
}

第一步 AbstractUnsafe.java
// Binds the channel to the given event loop and runs register0(promise) on that loop.
// Fails the promise if the channel is already registered or the loop type is incompatible.
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}

// Bind the event loop to this channel. Per the article: a channel belongs to exactly one event loop
// (thread) while one loop may serve many channels, so a pipeline's handlers are run by a single thread.
AbstractChannel.this.eventLoop = eventLoop;

// Run register0 directly when already on the channel's event loop; otherwise submit it as a task to that loop.
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
// The event loop did not accept the task: close the channel and fail the promise.
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}

// Performs the registration and fires the resulting pipeline events.
// Any Throwable force-closes the channel and fails the promise.
private void register0(ChannelPromise promise) {
try {

// Abort if the promise was cancelled or the channel is no longer open.
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// Remember whether this is the first registration before the flag is cleared below.
boolean firstRegistration = neverRegistered;
// The actual registration (see doRegister()).
doRegister();
neverRegistered = false;
registered = true;

// Invoke pending handlerAdded callbacks before the promise is completed.
pipeline.invokeHandlerAddedIfNeeded();

safeSetSuccess(promise);
pipeline.fireChannelRegistered();

if (isActive()) {
// First registration of an active channel fires channelActive;
// a re-registration only resumes reading when auto-read is enabled.
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {

beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}

AbstractNioChannel.java
// Registers the JDK channel with the event loop's selector and stores the resulting key.
// On CancelledKeyException it calls selectNow() once and retries; a second failure is rethrown.
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// The familiar JDK call: register with the selector using interest ops 0 and this channel as the attachment.
// Per the article, registering an accepted SocketChannel for read/write follows the same path.
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {

// First failure: run selectNow() and retry the registration.
eventLoop().selectNow();
selected = true;
} else {

// Already retried once: give up and propagate.
throw e;
}
}
}
}


重新回到initAndRegister()这个过程:channelFactory.newChannel() 创建完channel之后、注册到selector之前,会调用抽象的init()方法,它有两种实现 ServerBootstrap(服务端) 和Bootstrap (客户端),这里主要是看服务端的过程:

// Server-side channel initialisation: applies options and attributes to the channel, snapshots
// the child-channel settings, and adds an initializer that installs the configured handler and
// a ServerBootstrapAcceptor into the channel's pipeline.
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
// Apply the option(...) values configured on the bootstrap to the channel.
setChannelOptions(channel, options, logger);
}

final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}

ChannelPipeline p = channel.pipeline();

// Snapshot the child settings so the acceptor created below works on fixed copies.
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}

// Important: accepted channels are handled by the ServerBootstrapAcceptor handler installed here.
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
// Add the handler configured via handler(...), if any.
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}

// The acceptor is added from a task submitted to the channel's event loop.
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}

先来看看 server处理SocketChannel的过程 ,等会儿再来看是如何进行accept过程并把获得的SocketChannel发送到该ServerBootstrapAcceptor  handler中进行处理的

// Handles one accepted child channel: installs the child handler, applies child options and
// attributes, and registers the child with the child group. The child is force-closed if
// registration fails.
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// The message is the accepted channel; per the article it is a NioSocketChannel wrapping the JDK SocketChannel.
final Channel child = (Channel) msg;

// Install the childHandler configured on the bootstrap.
child.pipeline().addLast(childHandler);

// Apply the childOption(...) values to the child channel.
setChannelOptions(child, childOptions, logger);

for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}

try {
// Register the child with the child (worker) group; per the article this follows the same
// path as the server channel's registration.
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}

接下来看看accept这个过程,因为上面ServerSocketChannel已经注册了accept事件,一旦有客户端accept就绪,就会触发此事件, 可以简单的来看看NioEventLoop中对于selector io 事件的处理过程, 不详细了

// Dispatches the ready operations of one selection key to the channel's unsafe:
// connect completion, pending flush, then read/accept.
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
// For the server channel discussed in the article this is its NioMessageUnsafe.
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
// NOTE(review): eventLoop is assigned but never read in this excerpt; presumably a check
// from the upstream source was trimmed by the article. Verify against Netty.
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
return;
}

// The key is no longer valid: close the channel.
unsafe.close(unsafe.voidPromise());
return;
}

try {
int readyOps = k.readyOps();

if ((readyOps & SelectionKey.OP_CONNECT) != 0) {

// Clear OP_CONNECT from the interest set before finishing the connect.
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);

unsafe.finishConnect();
}

if ((readyOps & SelectionKey.OP_WRITE) != 0) {

// The channel is writable: flush pending data.
ch.unsafe().forceFlush();
}

if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
// Read or accept is ready (or readyOps is 0): for the server channel this is where accept is handled.
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}

// unsafe.read(): how accept is processed.
// Message-oriented unsafe: read() collects messages via doReadMessages(...) and fires each one
// through the pipeline as a channelRead event.
private final class NioMessageUnsafe extends AbstractNioUnsafe {

// Messages collected during one read() call; cleared after they have been fired.
private final List<Object> readBuf = new ArrayList<Object>();

@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
// Handle of the RecvByteBufAllocator; used below to count messages and to decide whether to keep reading.
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);

boolean closed = false;
Throwable exception = null;
try {
try {
do {
// Entry point of accept: see the NioServerSocketChannel implementation of doReadMessages.
int localRead = doReadMessages(readBuf);
// 0 means nothing was read; a negative value marks the channel as closed.
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}

allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
// Remember the failure; messages read so far are still delivered below.
exception = t;
}

// Fire every collected message through the pipeline.
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();

if (exception != null) {
closed = closeOnReadError(exception);

pipeline.fireExceptionCaught(exception);
}

if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {

// No read pending and auto-read disabled: remove the read interest op.
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}

// Accepts one connection from the JDK server channel and adds it to buf wrapped in a
// NioSocketChannel. Returns 1 when a channel was added, otherwise 0.
protected int doReadMessages(List<Object> buf) throws Exception {
// The accept itself; per the article the helper essentially calls serverSocketChannel.accept().
SocketChannel ch = SocketUtils.accept(javaChannel());

try {
if (ch != null) {
// Wrap the JDK SocketChannel in a NioSocketChannel. The read() loop of NioMessageUnsafe then
// fires it with pipeline.fireChannelRead(readBuf.get(i)), which passes it along the inbound
// handlers until it reaches ServerBootstrapAcceptor.channelRead; that completes the accept flow.
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);

// Wrapping failed: close the accepted socket.
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}

return 0;
}


到此就差不多结束,简单的过了一下,篇幅不够 ,要想详细了解其过程,还得看源码。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  netty nio java 源码
相关文章推荐