【Java】NIO中Selector的select方法源码分析
该篇博客的有些内容和在之前介绍过了,在这里再次涉及到的就不详细说了,如果有不理解请看【Java】NIO中Channel的注册源码分析, 【Java】NIO中Selector的创建源码分析
Selector的创建在Windows下默认生成WindowsSelectorImpl对象,那么Selector的select方法使用的就是WindowsSelectorImpl的select方法,而在WindowsSelectorImpl下并没有覆盖这个方法,而是由其基类SelectorImpl实现的:
public int select() throws IOException { return this.select(0L); }
这个方法调用了另一个重载的方法:
public int select(long var1) throws IOException { if (var1 < 0L) { throw new IllegalArgumentException("Negative timeout"); } else { return this.lockAndDoSelect(var1 == 0L ? -1L : var1); } }
首先对var1参数的合法性进行判断,无参传入进来的是0,实则交给lockAndDoSelect方法去完成,并且令参数为-1。
private int lockAndDoSelect(long var1) throws IOException { synchronized(this) { if (!this.isOpen()) { throw new ClosedSelectorException(); } else { Set var4 = this.publicKeys; int var10000; synchronized(this.publicKeys) { Set var5 = this.publicSelectedKeys; synchronized(this.publicSelectedKeys) { var10000 = this.doSelect(var1); } } return var10000; } } }
在方法执行时先使用同步块包裹,使用this作为锁;进入同步块先判断当前的Selector对象是否关闭了,因为在初始化时就是开启状态,只有在关闭后isOpen才是false;isOpen是由AbstractSelector实现的:
private AtomicBoolean selectorOpen = new AtomicBoolean(true); public final boolean isOpen() { return selectorOpen.get(); } public final void close() throws IOException { boolean open = selectorOpen.getAndSet(false); if (!open) return; implCloseSelector(); }
可以看到在AbstractSelector中使用了原子化Boolean值表示开启关闭。
回到SelectorImpl的lockAndDoSelect,若是Selector已经关闭则抛出ClosedSelectorException异常,否则分别以publicKeys以及publicSelectedKeys为锁,最终的实现交给抽象方法doSelect完成;
protected abstract int doSelect(long var1) throws IOException;
其中publicKeys是供外部访问的SelectionKey集合,publicSelectedKeys是供外部访问并且已经就绪的SelectionKey集合。
因为使用的是WindowsSelectorImpl,所以来看看WindowsSelectorImpl的doSelect实现:
protected int doSelect(long var1) throws IOException { if (this.channelArray == null) { throw new ClosedSelectorException(); } else { this.timeout = var1; this.processDeregisterQueue(); if (this.interruptTriggered) { this.resetWakeupSocket(); return 0; } else { this.adjustThreadsCount(); this.finishLock.reset(); this.startLock.startThreads(); try { this.begin(); try { this.subSelector.poll(); } catch (IOException var7) { this.finishLock.setException(var7); } if (this.threads.size() > 0) { this.finishLock.waitForHelperThreads(); } } finally { this.end(); } this.finishLock.checkForException(); this.processDeregisterQueue(); int var3 = this.updateSelectedKeys(); this.resetWakeupSocket(); return var3; } } }
首先判断channelArray是否为空,上一篇博客说了channelArray是一个SelectionKeyImpl数组,SelectionKeyImpl负责记录Channel和SelectionKey状态,channelArray是根据连接的Channel数量动态维持的,初始化大小是8。
private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[8];
SelectionKeyImpl是SelectionKey的子类,只有当Selector调用close方法时,在回调函数中才会令channelArray=null,所以这还是检测Selector是否关闭了。
接着继续,在前面传入的long类型的参数是-1,在这里令超时时间timeout就等于-1,
接着调用processDeregisterQueue方法来取消准备撤销的集合
所谓的准备撤销的集合是因为SelectionKey对象在调用cancel方法时,会使Selector将其加入cancelledKeys,仅仅如此,真真的取消是在Selector调用selector方法时执行
SelectionKey的cancel方法是在AbstractSelectionKey中实现的:
public final void cancel() { // Synchronizing "this" to prevent this key from getting canceled // multiple times by different threads, which might cause race // condition between selector's select() and channel's close(). synchronized (this) { if (valid) { valid = false; ((AbstractSelector)selector()).cancel(this); } } }
这个方法在上一篇讲过,可以看到基本上什么都没做,仅仅时调用了与它关联的Selector对象(AbstractSelector)的cancel方法:
AbstractSelector的cancel方法:
private final Set<SelectionKey> cancelledKeys = new HashSet<SelectionKey>(); void cancel(SelectionKey k) { synchronized (cancelledKeys) { cancelledKeys.add(k); } }
cancelledKeys就是所谓的准备撤销的集合,可以看到AbstractSelector的cancel方法仅仅是把此时请求取消的SelectionKey对象加入到cancelledKeys集合中,并没有多余的操作。
回到doSelect方法,processDeregisterQueue这个方法的实现是在SelectorImpl中:
void processDeregisterQueue() throws IOException { Set var1 = this.cancelledKeys(); synchronized(var1) { if (!var1.isEmpty()) { Iterator var3 = var1.iterator(); while(var3.hasNext()) { SelectionKeyImpl var4 = (SelectionKeyImpl)var3.next(); try { this.implDereg(var4); } catch (SocketException var11) { throw new IOException("Error deregistering key", var11); } finally { var3.remove(); } } } } }
这个方法的逻辑比较简单,首先得到准备撤销的集合cancelledKeys,判断是否有请求取消的,若有那么就进行遍历,实际的取消操作主要逻辑交给了抽象方法implDereg执行,最后再从集合中删除这个SelectionKeyImpl对象。
implDereg方法的实现是在WindowsSelectorImpl中:
protected void implDereg(SelectionKeyImpl var1) throws IOException { int var2 = var1.getIndex(); assert var2 >= 0; Object var3 = this.closeLock; synchronized(this.closeLock) { if (var2 != this.totalChannels - 1) { SelectionKeyImpl var4 = this.channelArray[this.totalChannels - 1]; this.channelArray[var2] = var4; var4.setIndex(var2); this.pollWrapper.replaceEntry(this.pollWrapper, this.totalChannels - 1, this.pollWrapper, var2); } var1.setIndex(-1); } this.channelArray[this.totalChannels - 1] = null; --this.totalChannels; if (this.totalChannels != 1 && this.totalChannels % 1024 == 1) { --this.totalChannels; --this.threadsCount; } this.fdMap.remove(var1); this.keys.remove(var1); this.selectedKeys.remove(var1); this.deregister(var1); SelectableChannel var7 = var1.channel(); if (!var7.isOpen() && !var7.isRegistered()) { ((SelChImpl)var7).kill(); } }
首先获取SelectionKeyImpl的下标Index,这个下标就是其在channelArray中的下标,检验下标的合法性;
在同步块内,首先检验这个SelectionKeyImpl对象是否是数组的最后一个元素,若不是那么就直接用最后一个元素覆盖当前位置的SelectionKeyImpl对象,同时还需要将pollWrapper中最后一个元素对应的Channel描述符和事件响应覆盖到相应位置。无论该SelectionKeyImpl对象是否是最后一个,都将其下标置为-1,防止再次访问。
再完成上述操作后,channelArray中的最后一个元素必然是不需要的,直接置为null,再totalChannels再自减。
接着根据totalChannels的数量来判断是否需要减少轮询线程的个数,这和注册时同理,就不再多说。
然后在fdMap中移除掉该SelectionKeyImpl和Channel的描述符映射(fdMap保存的是Channel的描述符和SelectionKeyImpl的映射关系,在上一篇提到过),keys和selectedKeys中同样也需要移除(keys所有注册了的SelectionKey集合,selectedKeys是所有有事件就绪的SelectionKey集合)。
这些操作仅仅是删除了其在Selector中的映射关系,而真正的Channel的(虽说是SelectionKey的cancel方法,实则是Channel要取消对某一事件的响应)取消操作是在deregister中执行:
deregister方法在AbstractSelector中实现:
protected final void deregister(AbstractSelectionKey key) { ((AbstractSelectableChannel)key.channel()).removeKey(key); }
可以看到直接获取SelectionKey对应的channel对象,然后调用AbstractSelectableChannel的removeKey方法:
void removeKey(SelectionKey k) { synchronized (keyLock) { for (int i = 0; i < keys.length; i++) if (keys[i] == k) { keys[i] = null; keyCount--; } ((AbstractSelectionKey)k).invalidate(); } }
前面的遍历很简单,通过遍历Channel的所有绑定的SelectionKey,即keys,直接将要取消的置为null,keyCount再自减,最后调用SelectionKey(AbstractSelectionKey)的invalidate方法:
void invalidate() { valid = false; }
直接设置valid属性为false,表明不可用。
回到implDereg中,最后一步操作,检查Channel的活跃性,若是Channel既没有打开且当且也没有注册了的SelectionKey,那么直接“杀死”该Channel。
而这个kill方法,在不同的Channel中有不同的实现,
SocketChannelImpl中:
public void kill() throws IOException { Object var1 = this.stateLock; synchronized(this.stateLock) { if (this.state != 4) { if (this.state == -1) { this.state = 4; } else { assert !this.isOpen() && !this.isRegistered(); if (this.readerThread == 0L && this.writerThread == 0L) { nd.close(this.fd); this.state = 4; } else { this.state = 3; } } } } }
其中state表示SocketChannelImpl的状态,一共有六种:
private static final int ST_UNINITIALIZED = -1; // 尚未初始化 private static final int ST_UNCONNECTED = 0; // 尚未建立连接 private static final int ST_PENDING = 1; // 未决状态 private static final int ST_CONNECTED = 2; // 连接状态 private static final int ST_KILLPENDING = 3; // KILL的未决状态 private static final int ST_KILLED = 4; // KILL状态 private int state = -1;
这样就很清晰,若是SocketChannelImpl尚未初始化直接变为KILL状态,否则检查再次检查Channel的活跃性,若是不活跃就断言为false,直接结束,否则“杀死”。
接下来的判断中的readerThread和writerThread,我在看完SocketChannelImpl后,发现一直都是赋值的0,并不知道会在何时发生修改,而且这两个成员的赋值都是在有数据读、写操作后,若是有知道的朋友想请教一下!
这个就先不讨论了,但是通过它们的赋值都是发生在有数据读、写操作后,那么就可以明白,若是完成了读、写,那么直接变为KILL状态,否则,等待读、写完成,就变为KILL的未决状态。
其中 nd.close(this.fd),nd是Socket描述符,fd是文件描述符,这就是由操作系统来关闭Socket描述符对应的文件描述符。
ServerSocketChannelImpl中kill:
private static final int ST_UNINITIALIZED = -1; // 尚未初始化 private static final int ST_INUSE = 0; // 使用中 private static final int ST_KILLED = 1; // KILL状态 private int state = -1; public void kill() throws IOException { Object var1 = this.stateLock; synchronized(this.stateLock) { if (this.state != 1) { if (this.state == -1) { this.state = 1; } else { assert !this.isOpen() && !this.isRegistered(); nd.close(this.fd); this.state = 1; } } } }
ServerSocketChannelImpl就要简单一点,基本上一样,由于ServerSocketChannel只能注册ACCEPT事件响应,所以就没有判断读、写。
implDereg方法结束,processDeregisterQueue也彻底结束,再回到doSelect方法
接着检验interruptTriggered,表示是否触发中断。
interruptTriggered初始化时就是false,表示未触发中断,而在调用close或者wakeup方法时会触发中断,赋值true;
先看wakeup方法:
public Selector wakeup() { Object var1 = this.interruptLock; synchronized(this.interruptLock) { if (!this.interruptTriggered) { this.setWakeupSocket(); this.interruptTriggered = true; } return this; } }
可以看到核心是setWakeupSocket方法,当目前没有触发中断调用setWakeupSocket:
private void setWakeupSocket() { this.setWakeupSocket0(this.wakeupSinkFd); } private native void setWakeupSocket0(int var1);
在讲Selector的创建时说过,在Selector创建时会产生一对SocketChannel,分别是SourceChannelImpl和SinkChannelImpl,wakeupSinkFd是SinkChannelImpl的描述符。
再来看看setWakeupSocket0的实现:
Java_sun_nio_ch_WindowsSelectorImpl_setWakeupSocket0(JNIEnv *env, jclass this, jint scoutFd) { /* Write one byte into the pipe */ const char byte = 1; send(scoutFd, &byte, 1, 0); }
虽然是用C写的,但是依旧很清晰,就是通过这个双向通道的sink端向source发送一个字节的数据,这样source端描述符就进入就绪状态,就能被select感知到,Selector便被唤醒。
再来看下close方法,在AbstractSelector中实现的:
public final void close() throws IOException { boolean open = selectorOpen.getAndSet(false); if (!open) return; implCloseSelector(); }
核心是implCloseSelector,在SelectorImpl中实现:
public void implCloseSelector() throws IOException { this.wakeup(); synchronized(this) { Set var2 = this.publicKeys; synchronized(this.publicKeys) { Set var3 = this.publicSelectedKeys; synchronized(this.publicSelectedKeys) { this.implClose(); } } } }
一开始就直接调用wakeup方法唤醒,然后调用implClose方法:
implClose是在WindowsSelectorImpl中实现的:
protected void implClose() throws IOException { Object var1 = this.closeLock; synchronized(this.closeLock) { if (this.channelArray != null && this.pollWrapper != null) { Object var2 = this.interruptLock; synchronized(this.interruptLock) { this.interruptTriggered = true; } this.wakeupPipe.sink().close(); this.wakeupPipe.source().close(); for(int var7 = 1; var7 < this.totalChannels; ++var7) { if (var7 % 1024 != 0) { this.deregister(this.channelArray[var7]); SelectableChannel var3 = this.channelArray[var7].channel(); if (!var3.isOpen() && !var3.isRegistered()) { ((SelChImpl)var3).kill(); } } } this.pollWrapper.free(); this.pollWrapper = null; this.selectedKeys = null; this.channelArray = null; Iterator var8 = this.threads.iterator(); while(var8.hasNext()) { WindowsSelectorImpl.SelectThread var9 = (WindowsSelectorImpl.SelectThread)var8.next(); var9.makeZombie(); } this.startLock.startThreads(); } } }
根据channelArray和pollWrapper是否为null来检验是否有必要关闭资源,后面就是对一些资源的关闭,可以看到关闭了我们一开始建立的双向通道,取消了所有注册事件,顺便“杀死”不活跃的Channel,删除所有映射关系,将所有轮询线程从阻塞中唤醒,关于makeZombie和startLock后面给出。
再次回到doSelect上,若是发生了中断,调用resetWakeupSocket方法恢复中断:
private void resetWakeupSocket() { Object var1 = this.interruptLock; synchronized(this.interruptLock) { if (this.interruptTriggered) { this.resetWakeupSocket0(this.wakeupSourceFd); this.interruptTriggered = false; } } }
resetWakeupSocket0也是一个native方法,和setWakeupSocket0正好互补,用来读取setWakeupSocket0中发送的数据,再将interruptTriggered设置为false,最后doSelect将会立即返回0,而不会调用poll操作。
在doSelect判断没有触发中断后,首先调用adjustThreadsCount调整轮询线程数量:
private void adjustThreadsCount() { int var1; if (this.threadsCount > this.threads.size()) { for(var1 = this.threads.size(); var1 < this.threadsCount; ++var1) { WindowsSelectorImpl.SelectThread var2 = new WindowsSelectorImpl.SelectThread(var1); this.threads.add(var2); var2.setDaemon(true); var2.start(); } } else if (this.threadsCount < this.threads.size()) { for(var1 = this.threads.size() - 1; var1 >= this.threadsCount; --var1) { ((WindowsSelectorImpl.SelectThread)this.threads.remove(var1)).makeZombie(); } } }
threads是用ArrayList存放的:
private final List<WindowsSelectorImpl.SelectThread> threads = new ArrayList();
逻辑比较简单,通过检查threadsCount的数量和threads的大小比较,若是threadsCount大于threads,则产生一个新的轮询线程SelectThread,将其加入threads,并且设置轮询线程是守护线程,直接启动;若是threadsCount小于threads,则移除并唤醒多余的轮询线程;若是threadsCount等于threads什么都不做。
来看一下SelectThread这个轮询线程具体是怎么工作的:
private final class SelectThread extends Thread { private final int index; final WindowsSelectorImpl.SubSelector subSelector; private long lastRun; private volatile boolean zombie; private SelectThread(int var2) { this.lastRun = 0L; this.index = var2; this.subSelector = WindowsSelectorImpl.this.new SubSelector(var2); this.lastRun = WindowsSelectorImpl.this.startLock.runsCounter; } void makeZombie() { this.zombie = true; } boolean isZombie() { return this.zombie; } public void run() { for(; !WindowsSelectorImpl.this.startLock.waitForStart(this); WindowsSelectorImpl.this.finishLock.threadFinished()) { try { this.subSelector.poll(this.index); } catch (IOException var2) { WindowsSelectorImpl.this.finishLock.setException(var2); } } } }
在构造方法中对几个成员完成初始化,index对应的是其在ArrayList中的下标,lastRun 和startLock有关等会再说,subSelector是真正执行轮询的对象;zombie是一个标志,在startLock中会使用到。
再来看run方法,核心就是调用subSelector的poll方法,而何时调用该方法由startLock来决定。
StartLock的定义:
private final class StartLock { private long runsCounter; private StartLock() { } private synchronized void startThreads() { ++this.runsCounter; this.notifyAll(); } private synchronized boolean waitForStart(WindowsSelectorImpl.SelectThread var1) { while(this.runsCounter == var1.lastRun) { try { WindowsSelectorImpl.this.startLock.wait(); } catch (InterruptedException var3) { Thread.currentThread().interrupt(); } } if (var1.isZombie()) { return true; } else { var1.lastRun = this.runsCounter; return false; } } }
在startThreads方法中,仅仅是通过synchronized 包裹,使runsCounter自增,然后notifyAll唤醒所有持有StartLock对象锁的阻塞。
在WindowsSelectorImpl中StartLock对象有且只有一份,对于所有SelectThread来说StartLock是公共的
waitForStart方法需要结合SelectThread的run方法来看,首先先检验SelectThread的lastRun成员是否和runsCounter相等,若是相等直接阻塞,等待startThreads方法将其唤醒;若是不相等,说明它的run是在startThreads之后运行的,需要将lastRun更新后再执行。
回到SelectThread中,我们再来看看SubSelector的定义:
private final class SubSelector { private final int pollArrayIndex; private final int[] readFds; private final int[] writeFds; private final int[] exceptFds; private SubSelector() { this.readFds = new int[1025]; this.writeFds = new int[1025]; this.exceptFds = new int[1025]; this.pollArrayIndex = 0; } private SubSelector(int var2) { this.readFds = new int[1025]; this.writeFds = new int[1025]; this.exceptFds = new int[1025]; this.pollArrayIndex = (var2 + 1) * 1024; } ...... }
其中无参构造是WindowsSelectorImpl使用的,单参构造由SelectThread使用。
之前在讲Channel的注册时说过,每1024个注册了的Channel会开启一个SelectThread轮询,如果是1024个以内,那么直接由WindowsSelectorImpl轮询,不交给SelectThread处理,超过1024则WindowsSelectorImpl和SelectThread一起轮询。
readFds 、writeFds、exceptFds 分别对应读、写、异常描述符 ,在SubSelector构造中初始化大小都是1025,多出来的一个就是前面说过的wakeupSourceFd描述符,用于唤醒,所以是1025。pollArrayIndex 对应其在pollWrapper中的wakeupSourceFd描述符的起始位置。
再来看看poll方法:
private int poll() throws IOException { return this.poll0(WindowsSelectorImpl.this.pollWrapper.pollArrayAddress, Math.min(WindowsSelectorImpl.this.totalChannels, 1024), this.readFds, this.writeFds, this.exceptFds, WindowsSelectorImpl.this.timeout); } private int poll(int var1) throws IOException { return this.poll0(WindowsSelectorImpl.this.pollWrapper.pollArrayAddress + (long)(this.pollArrayIndex * PollArrayWrapper.SIZE_POLLFD), Math.min(1024, WindowsSelectorImpl.this.totalChannels - (var1 + 1) * 1024), this.readFds, this.writeFds, this.exceptFds, WindowsSelectorImpl.this.timeout); } private native int poll0(long var1, int var3, int[] var4, int[] var5, int[] var6, long var7);
无参poll方法是WindowsSelectorImpl执行的,单参poll是由SelectThread执行;
最后都调用poll0这个native方法,这个方法是真正的轮询核心,交由操作系统来完成。
其中pollArrayAddress是pollArray在内存空间的起始位置,在poll()中直接定位到最开始,而在poll(int var1)中通过加上pollArrayIndex * PollArrayWrapper.SIZE_POLLFD这个偏移量定位。
PollArrayWrapper.SIZE_POLLFD是8,表示pollWrapper中存放的一对Channel描述符和事件响应共8位,0-3位保存Channel描述符fdVal,4-7位保存事件响应events。
第二个参数表明需要底层轮询的描述符fd个数,最后一个是超时时间,若是底层超时是会结束的。
还是回到doSelect方法,在adjustThreadsCount调整完轮询线程后,调用finishLock的reset方法
finishLock定义如下:
private final class FinishLock { private int threadsToFinish; IOException exception; private FinishLock() { this.exception = null; } private void reset() { this.threadsToFinish = WindowsSelectorImpl.this.threads.size(); } private synchronized void threadFinished() { if (this.threadsToFinish == WindowsSelectorImpl.this.threads.size()) { WindowsSelectorImpl.this.wakeup(); } --this.threadsToFinish; if (this.threadsToFinish == 0) { this.notify(); } } ...... }
这个和startLock很相似,也是WindowsSelectorImpl持有,有且仅有一份,所有SelectThread共享,reset方法用来记录在当前select方法执行时需要的轮询线程个数,在SelectThread的run方法中执行完poll方法后,会执行threadFinished,首先this.threadsToFinish == WindowsSelectorImpl.this.threads.size()的判断是为帮助唤醒所有处于poll阻塞的轮询。SelectThread执行完毕,就需要让threadsToFinish自减,至于notify的唤醒和后面有关系。
doSelect中执行完finishLock的reset后,就需要调用startLock的startThreads唤醒所有轮询线程工作。接着调用begin方法:
begin方法在AbstractSelector中实现:
private Interruptible interruptor = null; protected final void begin() { if (interruptor == null) { interruptor = new Interruptible() { public void interrupt(Thread ignore) { AbstractSelector.this.wakeup(); }}; } AbstractInterruptibleChannel.blockedOn(interruptor); Thread me = Thread.currentThread(); if (me.isInterrupted()) interruptor.interrupt(me); }
若是中断器interruptor=null,就创建一个,当当前线程阻塞在I/O操作上并且发生了线程级别的中断时,就会调用wakeup方法唤醒Selector。
doSelect中begin完毕后,调用subSelector的poll方法轮询;若是poll上有事件就绪,那么就不会阻塞,继续往下进行;若poll上没有事件就绪就会等待SelectThread上的事件就绪,通过threadFinished将其唤醒;若是SelectThread上也没有事件就绪就会一直阻塞,除非被外部唤醒,或者调用的是select的单参方法,会阻塞到超时结束。
接着判断是否有轮询线程的工作,调用waitForHelperThreads等待轮询线程的结束:
private synchronized void waitForHelperThreads() { if (this.threadsToFinish == WindowsSelectorImpl.this.threads.size() { WindowsSelectorImpl.this.wakeup(); } while(this.threadsToFinish != 0) { try { WindowsSelectorImpl.this.finishLock.wait(); } catch (InterruptedException var2) { Thread.currentThread().interrupt(); } } }
waitForHelperThreads方法就呼应了threadFinished方法,若是threadsToFinish != 0说明还有轮询线程没有结束,就wait阻塞,一直等到threadsToFinish == 0时再将其唤醒。
当所有轮询结束后,调用end方法:
protected final void end() { AbstractInterruptibleChannel.blockedOn(null); }
这个方法是处理发生中断,具体就不详细介绍了。
然后调用finishLock的checkForException方法检查异常,这个没啥好说的,然后又调用processDeregisterQueue来取消可能在select轮询时发生的SelectionKeyl的撤销。
接着调用updateSelectedKeys方法:
private long updateCount = 0L; private int updateSelectedKeys() { ++this.updateCount; byte var1 = 0; int var4 = var1 + this.subSelector.processSelectedKeys(this.updateCount); WindowsSelectorImpl.SelectThread var3; for(Iterator var2 = this.threads.iterator(); var2.hasNext(); var4 += var3.subSelector.processSelectedKeys(this.updateCount)) { var3 = (WindowsSelectorImpl.SelectThread)var2.next(); } return var4; }
updateCount记录更新次数,即select调用次数;然后调用subSelector的processSelectedKeys方法,得到poll返回的就绪的Channel描述符,也就是得到事件就绪的Channel个数,同理也就需要得到所有SelectThread中的。
其中processSelectedKeys方法如下:
private int processSelectedKeys(long var1) { byte var3 = 0; int var4 = var3 + this.processFDSet(var1, this.readFds, Net.POLLIN, false); var4 += this.processFDSet(var1, this.writeFds, Net.POLLCONN | Net.POLLOUT, false); var4 += this.processFDSet(var1, this.exceptFds, Net.POLLIN | Net.POLLCONN | Net.POLLOUT, true); return var4; }
分别对读、写、异常都处理了,主要还是调用processFDSet方法:
private int processFDSet(long var1, int[] var3, int var4, boolean var5) { int var6 = 0; for(int var7 = 1; var7 <= var3[0]; ++var7) { int var8 = var3[var7]; if (var8 == WindowsSelectorImpl.this.wakeupSourceFd) { synchronized(WindowsSelectorImpl.this.interruptLock) { WindowsSelectorImpl.this.interruptTriggered = true; } } else { WindowsSelectorImpl.MapEntry var9 = WindowsSelectorImpl.this.fdMap.get(var8); if (var9 != null) { SelectionKeyImpl var10 = var9.ski; if (!var5 || !(var10.channel() instanceof SocketChannelImpl) || !WindowsSelectorImpl.this.discardUrgentData(var8)) { if (WindowsSelectorImpl.this.selectedKeys.contains(var10)) { if (var9.clearedCount != var1) { if (var10.channel.translateAndSetReadyOps(var4, var10) && var9.updateCount != var1) { var9.updateCount = var1; ++var6; } } else if (var10.channel.translateAndUpdateReadyOps(var4, var10) && var9.updateCount != var1) { var9.updateCount = var1; ++var6; } var9.clearedCount = var1; } else { if (var9.clearedCount != var1) { var10.channel.translateAndSetReadyOps(var4, var10); if ((var10.nioReadyOps() & var10.nioInterestOps()) != 0) { WindowsSelectorImpl.this.selectedKeys.add(var10); var9.updateCount = var1; ++var6; } } else { var10.channel.translateAndUpdateReadyOps(var4, var10); if ((var10.nioReadyOps() & var10.nioInterestOps()) != 0) { WindowsSelectorImpl.this.selectedKeys.add(var10); var9.updateCount = var1; ++var6; } } var9.clearedCount = var1; } } } } } return var6; }
这个方法其实就是把poll0方法轮询的描述符结果放入传入的数组中,然后通过遍历这个数组,得到相应的Channel描述符,因为之前通过fdMap保存了Channel的描述符和SelectionKeyImpl的映射关系,那么就可以根据Channel描述符找到对应的SelectionKeyImpl对象,再根据传入的状态值var4来更新Channel的状态,最后将其保存在selectedKeys集合中供外部访问。
Selector的select方法到此全部结束。
- 《Java 源码分析》:Java NIO 之 Selector(第二部分selector.select())
- Java NIO——Selector机制解析三(源码分析)
- Java NIO 学习笔记 selector 行为机制分析(select操作 cancel操作)
- Java NIO——Selector机制源码分析---转
- 【Java】NIO中Selector的创建源码分析
- java学习之旅56--数组_StringBuilder和StringBuffer的使用_常用方法_方法链的实现_JDK源码分析
- 源码分析netty服务器创建过程vs java nio服务器创建
- 【Java】HashMap源码分析—put、get、resize方法详解
- JAVA源码分析之---Object类(二)---hashCode,equals,clone方法的使用
- 《Java 源码分析》:Java NIO 之 ServerSocketChannel
- 详解java并发包源码之AQS独占方法源码分析
- Java之类Object方法源码分析
- java 源码分析Arrays.asList方法详解
- 【Java8源码分析】NIO包-Selector选择器
- 源码分析-java-ArrayList-基本方法及实现
- jQuery原型属性constructor,selector,length,jquery和原型方法size,get,toArray源码分析
- java的Arrays.sort(Object[] a)方法源码分析
- 《Java 源码分析》:Java NIO 之 SelectionKey
- 【Java8源码分析】NIO包-Buffer类:ByteBuffer与HeapByteBuffer(一)
- 《Java 源码分析》:Java NIO 之 ServerSocketChannel