您的位置:首页 > 运维架构

Netty 2 线程模型和EventLoop源码

2018-01-01 19:07 267 查看
Netty 2 线程模型和EventLoop源码

Reactor线程模型
Reactor是一种经典的线程模型,Reactor线程模型分为单线程模型、多线程模型以及主从多线程模型。下面分别分析一下各个Reactor线程模型的优缺点。



Reactor单线程模型仅使用一个线程来处理所有的事情,包括客户端的连接和到服务器的连接,以及所有连接产生的读写事件,这种线程模型需要使用异步非阻塞I/O,使得每一个操作都不会发生阻塞,Handler为具体的处理事件的处理器,而Acceptor为连接的接收者,作为服务端接收来自客户端的链接请求。这样的线程模型理论上可以仅仅使用一个线程就完成所有的事件处理,显得线程的利用率非常高,而且因为只有一个线程在工作,所有不会产生在多线程环境下会发生的各种多线程之间的并发问题,架构简单明了,线程模型的简单性决定了线程管理工作的简单性。



线程模型下,接收链接和处理请求作为两部分分离了,而Acceptor使用单独的线程来接收请求,做好准备后就交给事件处理的handler来处理,而handler使用了一个线程池来实现,这个线程池可以使用Executor框架实现的线程池来实现,所以,一个连接会交给一个handler线程来复杂其上面的所有事件,需要注意,一个连接只会由一个线程来处理,而多个连接可能会由一个handler线程来处理,关键在于一个连接上的所有事件都只会由一个线程来处理,这样的好处就是消除了不必要的并发同步的麻烦。Reactor多线程模型似乎已经可以很好的工作在我们的项目中了,但是还有一个问题没有解决,那就是,多线程模型下任然只有一个线程来处理客户端的连接请求,那如果这个线程挂了,那整个系统任然会变为不可用,而且,因为仅仅由一个线程来负责客户端的连接请求,如果连接之后要做一些验证之类复杂耗时操作再提交给handler线程来处理的话,就会出现性能问题。



主从Reactor线程模型的特点是:服务端用于接收客户端连接的不再是个1个单独的NIO线程,而是一个独立的NIO线程池。Acceptor接收到客户端TCP连接请求处理完成后(可能包含接入认证等),将新创建的SocketChannel注册到IO线程池(sub reactor线程池)的某个IO线程上,由它负责SocketChannel的读写和编解码工作。Acceptor线程池仅仅只用于客户端的登陆、握手和安全认证,一旦链路建立成功,就将链路注册到后端subReactor线程池的IO线程上,由IO线程负责后续的IO操作。

上述引在网络资源

Netty服务端搭建与介绍

public void start(int port) throws Exception {
     EventLoopGroup bossGroup = new NioEventLoopGroup(1);
     EventLoopGroup workerGroup = new NioEventLoopGroup(5);
     try {
           ServerBootstrap b = new ServerBootstrap();
           b.group(bossGroup, workerGroup)
           .channel(NioServerSocketChannel.class)//
(3)
           .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {                       
                        ch.pipeline().addLast(new HttpResponseEncoder());//server端发送的是httpResponse,要使用HttpResponseEncoder进行编码
                        ch.pipeline().addLast(new HttpRequestDecoder());// server端接收到的是httpRequest,要使用HttpRequestDecoder进行解码
                        ch.pipeline().addLast(new HttpObjectAggregator(nettyLength));
                        ch.pipeline().addLast(new HttpServerInboundHandler());
                    }
                }).option(ChannelOption.SO_BACKLOG, 128) // (5)
        .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)

           ChannelFuture f = b.bind(port).sync();
           f.channel().closeFuture().sync();
     } finally {
           workerGroup.shutdownGracefully();
           bossGroup.shutdownGracefully();
     }
}

上述中主要的的关于线程模型的对象。EventLoopGroup,EventLoop,ServerBootstrap,Channel。采用了服务端监听线程和IO线程分离,类似于Reactor的多线程模型。

在创建服务端实例的过程中,需要实例化两个EventLoopGroup。
bossGroup线程组其实就是Acceptor线程组,负责监听客户端TCP连接,一般只需要将bossGroup的线程数设置为1。获取客户端连接后,会创建一个Channel,然后交给workerGroup中的一个EventLoop来处理这个Channel的所有读写操作或者其他事件。
workerGroup线程组则是真正服务IO读写操作的线程组,会为每个新创建的Channel分配一个EventLoop,在Channel的整个生命周期中,所有的操作都由EventLoop对应的Thread去完成。

EventLoop接口
通过运行任务的方式来处理连接生命周期内发生的事件,是任何网络通信框架的基本功能。与之相应的编程上的构造成为--事件循环。Netty使用EventLoop来匹配。

EventLoop的构建是基于两个基础API:网络编程和并发。通过JDK 中的java.util.concurrent 包提供线程执行器(提供Thread、taskQueue等)。通过io.netty.channel中的类来实现与Channel的事件进行交互。

一个EventLoop对应一个线程,这个线程可以是监听线程、也可以是工作线程。因为EventLoop继承了concurrent包中的ScheduleExecutorService,所以EventLoop将被分配到一个Thread----永远不会改变的Thread。同时任务(Runnable、Callable)可以直接提交给EventLoop的实现,以立即执行或者调度执行。
一般会创建多个EventLoop实例用于优化资源使用,并且单个EventLoop可能会被指派用于多个Channel。
因为EventLoop会被指派给多个Channel,所有请确保不要加入长时间运行的请求,因为它会阻塞其他需要在同一线程的其他任务。这边指的任务可以理解是对应Channel的事件。

EventLoop的线程管理



EventLoop将负责处理一个Channel的整个生命周期内的所有事件。(IO读写,Channel业务逻辑等)
1.出现一个需要在EventLoop执行的Channel Task
2.在把任务传递给execute方法之后,执行检查确定当前可调用的线程是否是分配给EventLoop的那个线程。(EventLoop并不是工作线程本身,只是给EventLoop分配了一个Thread。当前可用的线程并不一定是Channel 对应的EventLoop中分配的线程,所有需要对线程进行检查。)
3.如果检查是相同的线程,那在EventLoop中可以直接执行任务。
4.如果检查不是EventLoop的线程,就需要将任务放进队列中,以便EventLoop下次处理他的时间时执行。

再次重申:永远不要讲长时间运行的任务放到执行队列中,它将会导致其他需要在同一线程运行的任务被阻塞。如果必须进行阻塞调用或者执行长时间运行任务,建议使用一个专门的EventExecutor。

EventLoopGroup和线程分配

用于执行Channel任务的EventLoop被EventLoopGroup管理,根据不同的传输情况,EventLoop的创建方式和分配方式也也不同。
EventLoopGroup workerGroup = new NioEventLoopGroup(5);
上述代码中的就是创建了一个NioEventLoopGroup,分配了五个EventLoop,每个EventLoop都有一个Thread支撑。在创建EventLoopGroup的时候就直接分配了EventLoop,这样可以确保在需要的时候时可用的。
注意:这时候虽然创建了EventLoop,但是Thread还没有分配的,只有在第一次将Channel注册到EventLoop中,并分配线程。例如,bossGroup中的EventLoop的线程是在ChannelFuture f = b.bind(port).sync();时候注册,然后创建监听线程。但是workGroup的EventLoop因为还没有连接建立,仍然没有分配线程。需要有请求的连接的时候,ServerBootstrap就会创建Channel,然后注册到EventLoop,这时才会分配对应的线程。当然在EventLoop中存在一个state,用于记录thread是否已经start,只有为ST_NOT_START的时候才会去创建线程。创建完以后就不会再次创建。





以下是异步传输中的EventLoop的分配方式。

异步传输中可以使用少量的线程(EventLoop相关的线程),为大量的Channel进行服务。这样的线程模型,EventLoop会被多个Channel所共享,这样可以实现通过尽可能少的Thread来支撑大量的Channel,极大的优化了线程资源,不需要一个Channel分配一个线程。
EventLoopGroup负责为每个新创建的Channel分配一个EventLoop,Channel创建的过程由ServerChannel完成。Channel的分配方式是循环的方式,依次分配给所有的EventLoop,确保Channel均衡的分布。
一个EventLoop可能会被分配多个Channel
一个Channel已经分配给一个EventLoop,这个Channel整个生命中都使用这个EventLoop(关联的Thread)。这样可以避免线程安全和同步问题
EventLoop的分配对ThreadLocal使用的影响。

阻塞传输和异步传输的设计是不一样的。每个Channel都会分配给一个EventLoop。这样的线程模型也能够确保一个Channel在他的生命周期内的所有事件都被一个Thread处理。



接下来会是一些EventLoopGroup、EventLoop源码,主要是EventLoop、EventLoop分配Channel中的细节。比较枯燥无趣,不想看的可以直接跳过,影响不大。

EventLoop创建流程
new NioEventLoopGroup();  or new NioEventLoopGroup(int nThread);

创建NioEventLoopGroup
在创建NioEventLoopGroup的过程中,可以指定EventLoop的个数。默认构造函数的值是0。不过会在后续的构造过程中去判断,如果nThread为0,则指定为可用的CPU内核数 × 2。

在初始化过程中,会加入Executor。SelectorProvider、SelectStrategyFactory、RejectedExecutionHandler 参数,放到args中。后续会通过args[0]、args[1]、args[2]的方式调用。
executor--------io.netty.util.concurrent.ThreadPerTaskExecutor
SelectorProvider----SelectorProvider.provider()------sun.nio.ch.WindowsSelectorProvider
SelectStrategyFactory ----DefaultSelectStrategyFactory.INSTANCE----io.netty.channel.DefaultSelectStrategyFactory
RejectedExecutionHandler-----RejectedExecutionHandlers.reject()---io.netty.util.concurrent.RejectedExecutionHandlers
1.创建了EventLoop需要的Executor实例。
2.在父类MultithreadEventLoopGroup中定义一个 EventExecutor数组(EventExecutor[]  children )。用于保存EventLoop。在初始的话过程中会调用newChild(Executor,Object...)方法,去创建EventLoop。 

NioEventLoopGroup的父类
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
     super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS
: nThreads, executor, args);

}
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }
        if (executor == null) {
           executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
       children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
               children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
            } finally {
            }
        }
        chooser = chooserFactory.newChooser(children);
        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };
        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }
        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }

创建NioEventLoop

在EventLoop的初始化过程中,
1.将Executor进行注册到对应的字段上,this.executor
2.创建EventLoop对应的任务队列,用于存放Channel事件的任务

private final Queue<Runnable> tailTasks;   SingleThreadEventLoop
private final Queue<Runnable> taskQueue;   SingleThreadEventExecutor
3.将Selector相关的实例注册到对应的字段上。SelectorProvider、Selector、SelectStrategy
在openSelector方法中,通过SelectorProvider(由EventLoopGroup创建是传入)提供一个Selector。SelectorProvider会根据操作系统返回不同的Selector。在window平台下的Selector是sun.nio.ch.WindowsSelectorImpl。会提供一个maybeSelectorImplClass = sun.nio.ch.SelectorImpl对创建Selector进行验证,是否会抛出异常。可以想到WindowsSelectorImpl就是SelectorImpl的子类。最后完成selector的创建工作。

注意:这时候还没有对EventLoop分配对应的Thread,分配线程在EventLoop第一次执行Channel任务的时候 调用startThread()方法中进行。后续会讲到。

通过上述在EventLoop中注入的一些字段executor、taskqueu、Selector等,可以帮助EventLoop完成Thread管理、Runnable任务调度,与Channel、ServerChannel的交互。这也是最开始所说的,EventLoop的协同设计的一部分,基于了两个基本API:并发和网络编程。

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
        SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
     super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
     if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
     }
     if (strategy == null) {
        throw new NullPointerException("selectStrategy");
     }
     provider = selectorProvider;
     final SelectorTuple selectorTuple = openSelector();
     selector = selectorTuple.selector;
     unwrappedSelector = selectorTuple.unwrappedSelector;
     selectStrategy = strategy;
}
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,boolean
addTaskWakesUp, int maxPendingTasks,
                                    RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
       tailTasks = newTaskQueue(maxPendingTasks);
}
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,boolean
addTaskWakesUp, int maxPendingTasks,
                                        RejectedExecutionHandler rejectedHandler) {
        super(parent);
        this.addTaskWakesUp = addTaskWakesUp;
       this.maxPendingTasks = Math.max(16, maxPendingTasks);
       this.executor = ObjectUtil.checkNotNull(executor,
"executor");

        taskQueue = newTaskQueue(this.maxPendingTasks);
        rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    }



EventLoop线程初始化,执行Channel任务

因为我们创建的EventLoopGroup有bossGroup和workGroup。所以在创建Channel、执行Channel任务的时候会存在部分差异,这部分差异主要有两点:第一是谁创建的Channel,创建的什么类型的Channel。对于EventLoop的线程初始化时没有影响的。
在已有一个Channel的情况下(可能是ServerChannel,也可能是Channel)。ServerBootStrap会根据这个Channel注册到对应的EventLoopGroup中,并注册到一个EventLoop中,通过EventLoop去执行。
bossGroup的EventLoop是在ServerBootStrap绑定端口(调用bind方法)的时候注册Channel,并在这个时候去创建线程,bind时期创建的Channel就是.channel()方法设置的NioServerSocketChannel。
workGroup是在bossGroup监听到TCP连接后,ServerChannel创建出Channel后,再去创建线程。这时候的Channel就是NioSocketChannel。

下图是bind方法时,注册ServerChannel的断点流程图。

NioEventLoop.register

public ChannelFuture register(final ChannelPromise promise) {
     ObjectUtil.checkNotNull(promise, "promise");
     promise.channel().unsafe().register(this, promise);//进入AbstractChannel.register
     return promise;
}

AbstractChannel.register
舍弃了部分代码。主要是实现了保存了EventLoop的引用。并且将任务提交到EventLoop中。

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
     ......
   AbstractChannel.this.eventLoop = eventLoop;
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
           eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            ......
        }
    }
}

在执行任务的时候首先会进入startThread判断是否Thread已经被分配。如果state为ST_NOT_STRATED则表示线程还没有分配。将state的更新为ST_STARTED后,开始分配Thread的工作。因为state已经被更新,所以下一次有Channel调用eventLoop.execute方法时就不会进入doStartThread方法。
在doStartThread方法中,可以看到使用断言对thread == null进行判断。然后通过EventLoop内部的executor,执行一个Runnable的任务。其中对Thread进行了分配,并调用了SingleThreadEventExecutor.this.run();。然可以明确是这个run方式定义在NioEventLoop中,并且这是一个死循环,确保这个任务线程一直存活着。

NioEventLoop
public void execute(Runnable task) {
    .....
    boolean inEventLoop = inEventLoop();
    if (inEventLoop) {
        addTask(task);
    } else {
       startThread();
        addTask(task);
        if (isShutdown() && removeTask(task)) {
            reject();
        }
    }
    .....
    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}

SingleThreadEventExecutor
private void startThread() {
    if (state == ST_NOT_STARTED)
{
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED,
ST_STARTED)
) {
           doStartThread();
        }
    }
}
private void doStartThread() {
   assert thread == null;
    executor.execute(new Runnable() {
        @Override
        public void run() {
           thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }

            boolean success = false;
            updateLastExecutionTime();
            try {
               SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                .......
            }
        }
    });
}

NioEventLoop

protected void run() {
    for (;;) {
        try {
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    select(wakenUp.getAndSet(false));
                    // 'wakenUp.compareAndSet(false, true)' is always evaluated
                    // before calling 'selector.wakeup()' to reduce the wake-up
                    // overhead. (Selector.wakeup() is an expensive operation.)
                    //
                    // However, there is a race condition in this approach.
                    // The race condition is triggered when 'wakenUp' is set to
                    // true too early.
                    //
                    // 'wakenUp' is set to true too early if:
                    // 1) Selector is waken up between 'wakenUp.set(false)' and
                    //    'selector.select(...)'. (BAD)
                    // 2) Selector is waken up between 'selector.select(...)' and
                    //    'if (wakenUp.get()) { ... }'. (OK)
                    //
                    // In the first case, 'wakenUp' is set to true and the
                    // following 'selector.select(...)' will wake up immediately.
                    // Until 'wakenUp' is set to false again in the next round,
                    // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                    // any attempt to wake up the Selector will fail, too, causing
                    // the following 'selector.select(...)' call to block
                    // unnecessarily.
                    //
                    // To fix this problem, we wake up the selector again if wakenUp
                    // is true immediately after selector.select(...).
                    // It is inefficient in that it wakes up the selector for both
                    // the first case (BAD - wake-up required) and the second case
                    // (OK - no wake-up required).
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                    // fall through
                default:
            }
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    runAllTasks();
                }
            } else {
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        // Always handle shutdown even if the loop processing threw an exception.
        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

下图是bossGroup监听线程获取到TCP连接,创建Channel,注册Channel的断点流程图。
1.从前文讲到的NioEventLoop的死循环run方法中监听到TCP连接
2.通过Selector操作创建Channel信息
3.通过ServerBootstrap将Channel注册到工作线程中去
4.workerGroup执行Channel任务,在最开始调用startThread,分配线程并将线程启动。和前文中的调用了SingleThreadEventExecutor.this.run();一样。通过死循环监听任务。
注意:bossGroup中的EventLoop使用SingleThreadEventExecutor.this.run();是为了循环监听TCP连接。workerGroup中的是为了循环监听Channel任务。

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息