java游戏服务器之网络层Netty 之EventLoop
2017-04-27 00:17
441 查看
java游戏服务器网络层越来越流行netty,毕竟版本更新快,支持新的特性,更多的功能支持。相比下来mina慢了好多。现在就开始分析下对应的代码吧。
估计这段时间要花的时间有点长.我看的最新版本是4.1.11
游戏服务器架构设计中经常面临多线程设计的问题。因为单线程处理逻辑对于cpu的要求较高而且对于程序的要求理解也要高。如果做好底层耗时监控貌似是可以的,但是经常面临业务逻辑中需要网络请求IO(数据库操作IO,访问些http请求)
都需要通过其他线程来完成,代码逻辑很难直观,不好维护。所以技术上提高之后还是要回到设计多线程架构上来的。那么Netty是如何处理这种问题的呢?
那么就要看下 EventLoop线程模型:每一个新建立的tcp连接,都会注册到到EventLoop中。说到EventLoop那么就要从初始化创建的时候开始。在
ServerBootstrap构造方法中public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) 传递两个EventLoopGroup参数,parentGroup主要用于服务自身的ServerSocketChannel 读取到新的客户端连接SocketChannel,然后通过
把SocketChannel传递给childGroup分配处理,如下面的代码
所以childGroup是处理socketChannel的消息读取。因为ServerSocketChannel通常只有一个,所以parentGroup值我觉得是可以设置为1的,因为默认是
貌似这样的初始化线程数经常代码在变化,之前看到的代码是处理器核数量乘以2再加1(没有去看之前很早版本的代码求证)。哈哈
代码一直再变,看来还是要经常再看看。对于childGroup来说,因为socketChannel的数量可能在几千或者几万量级所以是有必要设置多个线程的。初始化EventLoopGroup就是初始化一个线程数组。每一个
EventLoop和一个线程绑定。那么socketChannel是如何选择被绑定到哪一个EventLoop中去的呢?
在这段代码中可以看到针对传进来的线程数做了判断处理,如果是2的次方的线程数, 那么就是PowerOfTwoEventExecutorChooser对象否则就是GenericEventExecutorChooser对象, 对于线程的获取的获取没看出来有多大区别,我觉得应该是节省点cpu耗时吧。
然后业务逻辑处理是在EventLoop处理的,因为一个EventLoop是和一个线程绑定的,所以可以看下SingleThreadEventExecutor这个类。
takeTask方法,代码如下
在这个方法中可以看到peekScheduledTask()方法先被调用,里面是一个PriorityQueue
ScheduledFutureTask优先级队列,存放着定时任务。优先处理定时任务,如果有定时任务就先返回定时任务,否则就处理BlockingQueue队列里的任务。
runAllTasks方法是把定时任务添加到BlockingQueue队列中。 runAllTasks(long timeoutNanos)则是在这个时间段内处理任务,防止一直在处理任务的话,就没办法执行定时任务逻辑了。
然后再看看SingleThreadEventLoop这个类
继承的是SingleThreadEventExecutor这个类,也就是上面分析到的那个类。有好多个子类实现。主要看下NioEventLoop
和 EpollEventLoop这两个类。
NioEventLoop循环调用呢Selector中的注册的SelectionKey获取每个连接的数据。然后把获取到的字节数据传送给ChannelPipeline处理。
EpollEventLoop通过epollWait来过去数据事件,循环EpollEventArray的事件处理每个连接的数据。也是传给ChannelPipeline处理。
然后再回到之前的那个问题,netty是如何处理多线程的问题的?
每一个新建立的socket连接 通过eventLoopGroup取余分配到对应的eventLoop,一个eventLoop循环多个连接获取消息。这样保证每个连接的消息是单线程的。这次主要是分析eventLoopGroup,下一篇要分析下ChannelPipeline。
估计这段时间要花的时间有点长.我看的最新版本是4.1.11
游戏服务器架构设计中经常面临多线程设计的问题。因为单线程处理逻辑对于cpu的要求较高而且对于程序的要求理解也要高。如果做好底层耗时监控貌似是可以的,但是经常面临业务逻辑中需要网络请求IO(数据库操作IO,访问些http请求)
都需要通过其他线程来完成,代码逻辑很难直观,不好维护。所以技术上提高之后还是要回到设计多线程架构上来的。那么Netty是如何处理这种问题的呢?
那么就要看下 EventLoop线程模型:每一个新建立的tcp连接,都会注册到到EventLoop中。说到EventLoop那么就要从初始化创建的时候开始。在
ServerBootstrap构造方法中public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) 传递两个EventLoopGroup参数,parentGroup主要用于服务自身的ServerSocketChannel 读取到新的客户端连接SocketChannel,然后通过
assert eventLoop().inEventLoop(); final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.reset(config); boolean closed = false; Throwable exception = null; try { try { do { int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } allocHandle.incMessagesRead(localRead); } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); for (int i = 0; i < size; i ++) { readPending = false; pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (exception != null) { closed = closeOnReadError(exception); pipeline.fireExceptionCaught(exception); }
把SocketChannel传递给childGroup分配处理,如下面的代码
@Override void init(Channel channel) throws Exception { final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { setChannelOptions(channel, options, logger); } final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
所以childGroup是处理socketChannel的消息读取。因为ServerSocketChannel通常只有一个,所以parentGroup值我觉得是可以设置为1的,因为默认是
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt( "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
貌似这样的初始化线程数经常代码在变化,之前看到的代码是处理器核数量乘以2再加1(没有去看之前很早版本的代码求证)。哈哈
代码一直再变,看来还是要经常再看看。对于childGroup来说,因为socketChannel的数量可能在几千或者几万量级所以是有必要设置多个线程的。初始化EventLoopGroup就是初始化一个线程数组。每一个
EventLoop和一个线程绑定。那么socketChannel是如何选择被绑定到哪一个EventLoop中去的呢?
public EventExecutorChooser newChooser(EventExecutor[] executors) { if (isPowerOfTwo(executors.length)) { return new PowerOfTwoEventExecutorChooser(executors); } else { return new GenericEventExecutorChooser(executors); } } private static boolean isPowerOfTwo(int val) { return (val & -val) == val; } private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser { private final AtomicInteger idx = new AtomicInteger(); private final EventExecutor[] executors; PowerOfTwoEventExecutorChooser(EventExecutor[] executors) { this.executors = executors; } @Override public EventExecutor next() { return executors[idx.getAndIncrement() & executors.length - 1]; } } private static final class GenericEventExecutorChooser implements EventExecutorChooser { private final AtomicInteger idx = new AtomicInteger(); private final EventExecutor[] executors; GenericEventExecutorChooser(EventExecutor[] executors) { this.executors = executors; } @Override public EventExecutor next() { return executors[Math.abs(idx.getAndIncrement() % executors.length)]; } }
在这段代码中可以看到针对传进来的线程数做了判断处理,如果是2的次方的线程数, 那么就是PowerOfTwoEventExecutorChooser对象否则就是GenericEventExecutorChooser对象, 对于线程的获取的获取没看出来有多大区别,我觉得应该是节省点cpu耗时吧。
然后业务逻辑处理是在EventLoop处理的,因为一个EventLoop是和一个线程绑定的,所以可以看下SingleThreadEventExecutor这个类。
takeTask方法,代码如下
protected Runnable takeTask() { assert inEventLoop(); if (!(taskQueue instanceof BlockingQueue)) { throw new UnsupportedOperationException(); } BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue; for (;;) { ScheduledFutureTask<?> scheduledTask = peekScheduledTask(); if (scheduledTask == null) { Runnable task = null; try { task = taskQueue.take(); if (task == WAKEUP_TASK) { task = null; } } catch (InterruptedException e) { // Ignore } return task; } else { long delayNanos = scheduledTask.delayNanos(); Runnable task = null; if (delayNanos > 0) { try { task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS); } catch (InterruptedException e) { // Waken up. return null; } } if (task == null) { // We need to fetch the scheduled tasks now as otherwise there may be a chance that // scheduled tasks are never executed if there is always one task in the taskQueue. // This is for example true for the read task of OIO Transport // See https://github.com/netty/netty/issues/1614 fetchFromScheduledTaskQueue(); task = taskQueue.poll(); } if (task != null) { return task; } } } }
在这个方法中可以看到peekScheduledTask()方法先被调用,里面是一个PriorityQueue
ScheduledFutureTask优先级队列,存放着定时任务。优先处理定时任务,如果有定时任务就先返回定时任务,否则就处理BlockingQueue队列里的任务。
runAllTasks方法是把定时任务添加到BlockingQueue队列中。 runAllTasks(long timeoutNanos)则是在这个时间段内处理任务,防止一直在处理任务的话,就没办法执行定时任务逻辑了。
然后再看看SingleThreadEventLoop这个类
继承的是SingleThreadEventExecutor这个类,也就是上面分析到的那个类。有好多个子类实现。主要看下NioEventLoop
和 EpollEventLoop这两个类。
NioEventLoop循环调用呢Selector中的注册的SelectionKey获取每个连接的数据。然后把获取到的字节数据传送给ChannelPipeline处理。
EpollEventLoop通过epollWait来过去数据事件,循环EpollEventArray的事件处理每个连接的数据。也是传给ChannelPipeline处理。
然后再回到之前的那个问题,netty是如何处理多线程的问题的?
每一个新建立的socket连接 通过eventLoopGroup取余分配到对应的eventLoop,一个eventLoop循环多个连接获取消息。这样保证每个连接的消息是单线程的。这次主要是分析eventLoopGroup,下一篇要分析下ChannelPipeline。
相关文章推荐
- java游戏服务器之网络层Netty 之ChannelPipeline
- android netty5.0 编译时 java.lang.NoClassDefFoundError: io.netty.channel.nio.NioEventLoopGroup
- Java Netty游戏架构-服务器命令模型实践
- Java游戏服务器-Netty自动重连与会话管理
- eventloop & actor模式 & Java线程模型演进 & Netty线程模型 总结
- Java基础知识强化之网络编程笔记14:TCP之多个客户端上传到一个服务器的思考(多线程改进)
- 探讨游戏服务器压力的三座大山——数据库、网络以及系统资源(3)
- android studio 启动时候报:java.lang.IllegalStateException: failed to create a child event loop 问题解决
- 一种经典的网络游戏服务器架构
- JHServer网络游戏服务器开发系统说明书
- 【Java.NIO】从Jetty、Tomcat和Mina中提炼NIO构架网络服务器的经典模式(一)
- netty EventLoop write() 源码分析(二)
- 转:一种经典的网络游戏服务器架构
- 网络游戏服务器构架设计(一):前言
- 网络游戏服务器构架设计(一):前言
- 在线的棋牌类网络游戏java服务端实现
- Cordova Android 插件开发(网络端(服务器)调用Android插件(java))
- java网络编程(6)——实现一个服务器把小写转大写
- Netty 权威指南笔记(八):EventLoopGroup 和线程模型
- JAVA游戏服务器关闭操作的监听