您的位置:首页 > 编程语言 > Java开发

Jetty NIO模型

2016-07-25 09:01 323 查看
http://blog.csdn.net/aesop_wubo/article/details/11537577

jetty NIO是典型reactor模型,如下图所示:



即:mainReactor负责监听server socket,接受新连接,并将建立的socket分派给subReactor。subReactor负责多路分离已连接的socket,读写网络数据,扔给worker线程池来处理。本文主要是讲解jetty中mainReactor、subReactor、线程池的实现。

mainReactor

jetty中的server就相当于一个容器,一个jetty容器包含多个连接器和一个线程池,连接器实现了LifeCycle接口,随容器启动而启动,下图是连接器启动后,监听server socket,建立连接的过程:



可见,jetty利用了线程池来建立连接,每一个连接任务被当成一个job被放到了job队列里面,负责连接的线程会从队列中取出任务来执行,将得到的ServerSocket交给subReactor,下面来看subReactor的实现。

subReactor

这里需要提一下jetty nio很重要的一个类SelectorManager,它负责channel注册,select,wakeup等操作。在SelectorManager中有SelectSet数组,可以把SelectSet理解为SelectorManager的代理,因为真正做事的是SelectSet,这里面SelectSet设计为一个数组,应该也是分而治之的思想,让一个selector监听更少的selectionkey。

SelectSet中有一个非常重要的成员changes,changes中存放了所有有变化的channel、endpoint、attachement。分别在以下情况触发addChannel方法:当有新的通道加入时,当有新的事件到来时,当有数据到来时。

subReactor的执行流程如下图:



在这里导致addChange除了selectorManager.register之外,还有endpoint.updatekey()以及selectionkey数据有变化时等等。

ThreadPool

jetty的线程池相当简单,其实mainReactor与subReactor共用同一个线程池,线程池的实现类是QueuedThreadPool,当然在jetty.xml中可以设置自己的线程池类。简单看下线程池的run方法

[java] view
plain copy

 print?

private Runnable _runnable = new Runnable()  

  {  

      public void run()  

      {  

          boolean shrink=false;  

          try  

          {  

              Runnable job=_jobs.poll();  

              while (isRunning())  

              {  

                  // Job loop  

                  while (job!=null && isRunning())  

                  {  

                      runJob(job);  

                      job=_jobs.poll();  

                  }  

  

                  // Idle loop  

                  try  

                  {  

                      _threadsIdle.incrementAndGet();  

  

                      while (isRunning() && job==null)  

                      {  

                          if (_maxIdleTimeMs<=0)  

                              job=_jobs.take();  

                          else  

                          {  

                              // maybe we should shrink?  

                              final int size=_threadsStarted.get();  

                              if (size>_minThreads)  

                              {  

                                  long last=_lastShrink.get();  

                                  long now=System.currentTimeMillis();  

                                  if (last==0 || (now-last)>_maxIdleTimeMs)  

                                  {  

                                      shrink=_lastShrink.compareAndSet(last,now) &&  

                                      _threadsStarted.compareAndSet(size,size-1);  

                                      if (shrink)  

                                          return;  

                                  }  

                              }  

                              job=idleJobPoll();  

                          }  

                      }  

                  }  

                  finally  

                  {  

                      _threadsIdle.decrementAndGet();  

                  }  

              }  

          }  

          catch(InterruptedException e)  

          {  

                ...  

          }  

      }  

  };  

1、线程池有个最小线程数_minThreads=8,当线程池启动时会创建_minThreads个线程,并启动它们。第12行,线程从任务队列中取出一个任务,并执行。这里使用了while循环表示这里会阻塞等待任务执行完,当任务队列中没有任务时,才会退出while循环;

2、退出while循环后,这个线程就空闲了,在这里需要有个回收策略,在等待_maxIdleTimeMs时间后,如果当前线程数大于_minThreads时,就会回收这个线程。

那么线程数什么时候会大于_minThreads?来看看dispatch()方法中的核心代码

[java] view
plain copy

 print?

// If we had no idle threads or the jobQ is greater than the idle threads  

               if (idle==0 || jobQ>idle)  

               {  

                   int threads=_threadsStarted.get();  

                   if (threads<_maxThreads)  

                       startThread(threads);  

               }  

如果没有空闲的线程或者空闲线程数太少,在保证线程数没有超过_maxThreads时会新建线程。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  java