您的位置:首页 > 运维架构

EventLoop与EventLoopGroup

2017-05-22 19:57 155 查看
转载自:http://www.cnblogs.com/wade-luffy/p/6226026.html

              http://blog.csdn.net/bdmh/article/details/49945765
              https://segmentfault.com/a/1190000007403873

线程模型

       在正入主题之前,先说下Netty的线程模型。

       说到线程模型,一般首先会想到的是经典的Reactor线程模型,尽管不同的NIO框架对于Reactor模式的实现存在差异,但本质上还是遵循了Reactor的基础线程模型。

Reactor单线程模型

       Reactor单线程模型,是指所有的I/O操作都在同一个NIO线程上面完成。NIO线程的职责如下:

       1. 作为NIO服务端,接收客户端的TCP连接;

       2. 作为NIO客户端,向服务端发起TCP连接;

       3. 读取通信对端的请求或者应答消息;

       4. 向通信对端发送消息请求或者应答消息。

     


       由于Reactor模式使用的是异步非阻塞I/O,所有的I/O操作都不会导致阻塞,理论上一个线程可以独立处理所有I/O相关的操作。从架构层面看,一个NIO线程确实可以完成其承担的职责。例如,通过Acceptor类接收客户端的TCP连接请求消息,当链路建立成功之后,通过Dispatch将对应的ByteBuffer派发到指定的Handler上,进行消息解码。用户线程消息编码后通过NIO线程将消息发送给客户端。

       在一些小容量应用场景下,可以使用单线程模型。但是这对于高负载、大并发的应用场景却不合适,主要原因如下:

       1. 一个NIO线程同时处理成百上千的链路,性能上无法支撑,即便NIO线程的CPU负荷达到100%,也无法满足海量消息的编码、解码、读取和发送。

       2. 当NIO线程负载过重之后,处理速度将变慢,这会导致大量客户端连接超时,超时之后往往会进行重发,这更加重了NIO线程的负载,最终会导致大量消息积压和处理超时,成为系统的性能瓶颈。

       3. 可靠性问题:一旦NIO线程意外跑飞,或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障。

Reactor多线程模型

       Rector多线程模型与单线程模型最大的区别就是有一组NIO线程来处理I/O操作。

       Reactor多线程模型的特点如下:

       1. 有专门一个NIO线程——Acceptor线程用于监听服务端,接收客户端的TCP连接请求。

       2. 网络I/O操作——读、写等由一个NIO线程池负责,线程池可以采用标准的JDK线程池实现,它包含一个任务队列和N个可用的线程,由这些NIO线程负责消息的读取、解码、编码和发送。

       3. 一个NIO线程可以同时处理N条链路,但是一个链路只对应一个NIO线程,防止发生并发操作问题。

     


       在绝大多数场景下,Reactor多线程模型可以满足性能需求。但是,在个别特殊场景中,一个NIO线程负责监听和处理所有的客户端连接可能会存在性能问题。例如并发百万客户端连接,或者服务端需要对客户端握手进行安全认证,但是认证本身非常损耗性能。在这类场景下,单独一个Acceptor线程可能会存在性能不足的问题,为了解决性能问题,产生了第三种Reactor线程模型——主从Reactor多线程模型。

主从Reactor多线程模型

       主从Reactor线程模型的特点是:服务端用于接收客户端连接的不再是一个单独的NIO线程,而是一个独立的NIO线程池。

       Acceptor接收到客户端TCP连接请求并处理完成后(可能包含接入认证等),将新创建的SocketChannel注册到I/O线程池(sub reactor线程池)的某个I/O线程上,由它负责SocketChannel的读写和编解码工作。Acceptor线程池仅仅用于客户端的登录、握手和安全认证,一旦链路建立成功,就将链路注册到后端subReactor线程池的I/O线程上,由I/O线程负责后续的I/O操作。

       利用主从NIO线程模型,可以解决一个服务端监听线程无法有效处理所有客户端连接的性能不足问题。因此,在Netty的官方demo中,推荐使用该线程模型。

       


Netty的线程模型

       Netty的线程模型并不是一成不变的,它实际取决于用户的启动参数配置。通过设置不同的启动参数,Netty可以同时支持Reactor单线程模型、多线程模型和主从Reactor多线层模型。

       下面是Netty服务端启动代码:
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer() {
@Override
public void initChannel(Channel ch)throws IOException{
ch.pipeline().addLast(new NettyMessageDecoder(1024 * 1024, 4, 4));
ch.pipeline().addLast(new NettyMessageEncoder());
ch.pipeline().addLast("readTimeoutHandler",new ReadTimeoutHandler(50));
ch.pipeline().addLast(new LoginAuthRespHandler());
ch.pipeline().addLast("HeartBeatHandler",new HeartBeatRespHandler());
}
});

// 绑定端口,同步等待成功
b.bind(NettyConstant.REMOTEIP, NettyConstant.PORT).sync();
       服务端启动的时候,创建了两个NioEventLoopGroup,它们实际是两个独立的Reactor线程池。一个用于接收客户端的TCP连接,另一个用于处理I/O相关的读写操作,或者执行系统Task、定时任务Task等。

       Netty用于接收客户端请求的线程池职责如下:

          1)接收客户端TCP连接,初始化Channel参数;

          2)将链路状态变更事件通知给ChannelPipeline。

       Netty处理I/O操作的Reactor线程池职责如下。

          1)异步读取通信对端的数据报,发送读事件到ChannelPipeline;

          2)异步发送消息到通信对端,调用ChannelPipeline的消息发送接口;

          3)执行系统调用Task;

          4)执行定时任务Task,例如链路空闲状态监测定时任务。

       通过调整线程池的线程个数、是否共享线程池等方式,Netty的Reactor线程模型可以在单线程、多线程和主从多线程间切换,这种灵活的配置方式可以最大程度地满足不同用户的个性化定制。

Netty的多线程编程最佳实践如下:

       1)创建两个NioEventLoopGroup,用于逻辑隔离NIO Acceptor和NIO I/O线程。

       2)尽量不要在ChannelHandler中启动用户线程(解码后用于将POJO消息派发到后端业务线程的除外)。

       3)解码要放在NIO线程调用的解码Handler中进行,不要切换到用户线程中完成消息的解码。

       4)如果业务逻辑操作非常简单,没有复杂的业务逻辑计算,没有可能会导致线程被阻塞的磁盘操作、数据库操作、网路操作等,可以直接在NIO线程上完成业务逻辑编排,不需要切换到用户线程。

       5)如果业务逻辑处理复杂,不要在NIO线程上完成,建议将解码后的POJO消息封装成Task,派发到业务线程池中由业务线程执行,以保证NIO线程尽快被释放,处理其他的I/O操作。

NioEventLoopGroup

       Netty内部都是通过线程在处理各种数据,EventLoopGroup就是用来管理调度他们的,注册Channel,管理他们的生命周期,下面就来看看EventLoopGroup是怎样工作的(基于4.1.11.Final源码)。
public class NioEventLoopGroup extends MultithreadEventLoopGroup {

/**
* Create a new instance using the default number of threads, the default {@link ThreadFactory} and
* the {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
*/
public NioEventLoopGroup() {
this(0);
}

/**
* Create a new instance using the specified number of threads, {@link ThreadFactory} and the
* {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
*/
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}

/**
* Create a new instance using the specified number of threads, the given {@link ThreadFactory} and the
* {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
*/
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
this(nThreads, threadFactory, SelectorProvider.provider());
}

public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}

……

public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory,
final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, threadFactory, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}

……
       NioEventLoopGroup会调用其父类MultithreadEventLoopGroup的构造函数:
/**
* @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
*/
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
       这里,内部线程数DEFAULT_EVENT_LOOP_THREADS的大小是:
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
       MultithreadEventLoopGroup的父类是MultithreadEventExecutorGroup:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}

if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}

children = new EventExecutor[nThreads];

for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}

for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}

chooser = chooserFactory.newChooser(children);

final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};

for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}

Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
       children 是EventExecutor数组对象,其大小是 nThreads, 这样就构成了一个线程池,里面存放的是通过NioEventLoopGroup的newChild方法生成的NioEventLoop对象:
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
       至此,Group和内部的Loop对象以及Executor就创建完毕。

NioEventLoop

       NioEventLoop继承于SingleThreadEventLoop, 而SingleThreadEventLoop又继承于SingleThreadEventExecutor,SingleThreadEventExecutor继承于AbstractScheduledEventExecutor。SingleThreadEventExecutor是Netty中对本地线程的抽象, 它内部有一个Thread thread属性, 存储了一个本地Java线程。因此可以认为,
一个NioEventLoop其实和一个特定的线程绑定, 并且在其生命周期内, 绑定的线程都不会再改变。

       在AbstractScheduledEventExecutor中,Netty 实现了NioEventLoop的 schedule功能, 即可以通过调用一个NioEventLoop 实例的schedule方法来运行一些定时任务. 而在SingleThreadEventLoop中, 又实现了任务队列的功能, 通过它, 可以调用一个NioEventLoop实例的execute方法来向任务队列中添加一个 task,并由 NioEventLoop 进行调度执行。

       通常来说, NioEventLoop 肩负着两种任务, 第一个是作为IO线程, 执行与 Channel 相关的IO操作, 包括调用 select 等待就绪的IO事件、读写数据与数据的处理等; 而第二个任务是作为任务队列, 执行taskQueue中的任务, 例如用户调用eventLoop.schedule提交的定时任务也是这个线程执行的。

       下面是它的具体的继承关系图:



       它实现了EventLoop接口、EventExecutorGroup接口和ScheduledExecutorService接口,正是因为这种设计,导致NioEventLoop和其父类功能实现非常复杂。

       作为NIO框架的Reactor线程,NioEventLoop需要处理网络I/O读写事件,因此它必须聚合一个Selector对象。在NioEventLoop构造时将创建并打开一个新的Selector。Netty对Selector的selectedKeys进行了优化,用户可以通过” io.netty.noKeySetOptimization”开关决定是否启用该优化项。默认不打开selectedKeys优化功能。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: