您的位置:首页 > 理论基础 > 计算机网络

java游戏服务器之网络层Netty 之EventLoop

2017-04-27 00:17 441 查看
java游戏服务器网络层越来越流行netty,毕竟版本更新快,支持新的特性,更多的功能支持。相比下来mina慢了好多。现在就开始分析下对应的代码吧。

估计这段时间要花的时间有点长.我看的最新版本是4.1.11

游戏服务器架构设计中经常面临多线程设计的问题。因为单线程处理逻辑对于cpu的要求较高而且对于程序的要求理解也要高。如果做好底层耗时监控貌似是可以的,但是经常面临业务逻辑中需要网络请求IO(数据库操作IO,访问些http请求)

都需要通过其他线程来完成,代码逻辑很难直观,不好维护。所以技术上提高之后还是要回到设计多线程架构上来的。那么Netty是如何处理这种问题的呢?

那么就要看下 EventLoop线程模型:每一个新建立的tcp连接,都会注册到到EventLoop中。说到EventLoop那么就要从初始化创建的时候开始。在

ServerBootstrap构造方法中public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) 传递两个EventLoopGroup参数,parentGroup主要用于服务自身的ServerSocketChannel 读取到新的客户端连接SocketChannel,然后通过

assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);

boolean closed = false;
Throwable exception = null;
try {
try {
do {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}

allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}

int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();

if (exception != null) {
closed = closeOnReadError(exception);

pipeline.fireExceptionCaught(exception);
}


把SocketChannel传递给childGroup分配处理,如下面的代码

@Override
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}

final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}

ChannelPipeline p = channel.pipeline();

final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}

p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}

ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}


所以childGroup是处理socketChannel的消息读取。因为ServerSocketChannel通常只有一个,所以parentGroup值我觉得是可以设置为1的,因为默认是

DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));


貌似这样的初始化线程数经常代码在变化,之前看到的代码是处理器核数量乘以2再加1(没有去看之前很早版本的代码求证)。哈哈

代码一直再变,看来还是要经常再看看。对于childGroup来说,因为socketChannel的数量可能在几千或者几万量级所以是有必要设置多个线程的。初始化EventLoopGroup就是初始化一个线程数组。每一个

EventLoop和一个线程绑定。那么socketChannel是如何选择被绑定到哪一个EventLoop中去的呢?

public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}

private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
}

private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;

PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}

@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}

private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;

GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}

@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}


在这段代码中可以看到针对传进来的线程数做了判断处理,如果是2的次方的线程数, 那么就是PowerOfTwoEventExecutorChooser对象否则就是GenericEventExecutorChooser对象, 对于线程的获取的获取没看出来有多大区别,我觉得应该是节省点cpu耗时吧。

然后业务逻辑处理是在EventLoop处理的,因为一个EventLoop是和一个线程绑定的,所以可以看下SingleThreadEventExecutor这个类。

takeTask方法,代码如下

protected Runnable takeTask() {
assert inEventLoop();
if (!(taskQueue instanceof BlockingQueue)) {
throw new UnsupportedOperationException();
}

BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
for (;;) {
ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
if (scheduledTask == null) {
Runnable task = null;
try {
task = taskQueue.take();
if (task == WAKEUP_TASK) {
task = null;
}
} catch (InterruptedException e) {
// Ignore
}
return task;
} else {
long delayNanos = scheduledTask.delayNanos();
Runnable task = null;
if (delayNanos > 0) {
try {
task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
// Waken up.
return null;
}
}
if (task == null) {
// We need to fetch the scheduled tasks now as otherwise there may be a chance that
// scheduled tasks are never executed if there is always one task in the taskQueue.
// This is for example true for the read task of OIO Transport
// See https://github.com/netty/netty/issues/1614 fetchFromScheduledTaskQueue();
task = taskQueue.poll();
}

if (task != null) {
return task;
}
}
}
}


在这个方法中可以看到peekScheduledTask()方法先被调用,里面是一个PriorityQueue

ScheduledFutureTask优先级队列,存放着定时任务。优先处理定时任务,如果有定时任务就先返回定时任务,否则就处理BlockingQueue队列里的任务。

runAllTasks方法是把定时任务添加到BlockingQueue队列中。 runAllTasks(long timeoutNanos)则是在这个时间段内处理任务,防止一直在处理任务的话,就没办法执行定时任务逻辑了。

然后再看看SingleThreadEventLoop这个类

继承的是SingleThreadEventExecutor这个类,也就是上面分析到的那个类。有好多个子类实现。主要看下NioEventLoop

和 EpollEventLoop这两个类。

NioEventLoop循环调用呢Selector中的注册的SelectionKey获取每个连接的数据。然后把获取到的字节数据传送给ChannelPipeline处理。

EpollEventLoop通过epollWait来过去数据事件,循环EpollEventArray的事件处理每个连接的数据。也是传给ChannelPipeline处理。

然后再回到之前的那个问题,netty是如何处理多线程的问题的?

每一个新建立的socket连接 通过eventLoopGroup取余分配到对应的eventLoop,一个eventLoop循环多个连接获取消息。这样保证每个连接的消息是单线程的。这次主要是分析eventLoopGroup,下一篇要分析下ChannelPipeline。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  java 服务器 游戏 网络