jdk的Selector(3)select的过程
2018-01-15 23:54
399 查看
当调用了SelectorImpl的select()方法的时候,同时会将所带的参数,也就是给select()所设置的timeout,之后会调用lockAndDoSelect(),在这个方法中,主要还是调用了doSelect()方法,参数与传进来的一致。以WindowsSelectorImpl为例子,实现的deSelect()方法。
首先,会调用processDeregisterQueue()方法,来将已经准备解除注册的channel进行解除注册。
在这里,会取得所有需要取消注册的SelectionKey,并且依次调用implDereg()进行解除绑定。
如果需要解除注册的channel已经是selector当中最后一个了,那么直接从数组中移走就行,但如果没有,则需要与数组最后一个索引位置上的交换位置,保证数组中间位置的连续,再将其移除,后面的操作与注册的操作相似,但都是反向操作。
在完成取消注册的步骤后,将会调用adjustThreadCount()方法来调整线程的数量,具体看下面的方法。
在Selector中,每有1024条channel,就需要重新开一个线程加入完成监听的操作,这里是从新根据当前应该有的线程数量与此时现存的线程数量进行比较,动态调整。
在这之后,调用了begin()方法,准备开始正式进行select操作。
在这里的begin()方法判断了这里的interruptor是否为空,如果为空,则会在这里重新生成一个,这里的Interruptor保证了当线程阻塞在了Io操作上,并且被interruptor时,保证selecor能够被唤醒。
在begin()方法执行完毕之后,将会调用其subSelector的poll()方法,正式开始select操作。
这里的poll0()还是原生方法的实现。主要是为了监听pollWrapper中所保存的fd是否有数据进出,如果没有进出,则会在在此处在timeout的时间里一直保持阻塞状态。
当完成数据监听,取得相应的数据的时候,在这之后,将会重新检验并取消一边已经被取消的channel之后调用updateSelectedKeys()方法。private int processSelectedKeys(long var1) {
byte var3 = 0;
int var4 = var3 + this.processFDSet(var1, this.readFds, 1, false);
var4 += this.processFDSet(var1, this.writeFds, 6, false);
var4 += this.processFDSet(var1, this.exceptFds, 7, true);
return var4;
}
private int processFDSet(long var1, int[] var3, int var4, boolean var5) {
int var6 = 0;
for(int var7 = 1; var7 <= var3[0]; ++var7) {
int var8 = var3[var7];
if(var8 == WindowsSelectorImpl.this.wakeupSourceFd) {
synchronized(WindowsSelectorImpl.this.interruptLock) {
WindowsSelectorImpl.this.interruptTriggered = true;
}
} else {
WindowsSelectorImpl.MapEntry var9 = WindowsSelectorImpl.this.fdMap.get(var8);
if(var9 != null) {
SelectionKeyImpl var10 = var9.ski;
if(!var5 || !(var10.channel() instanceof SocketChannelImpl) || !WindowsSelectorImpl.this.discardUrgentData(var8)) {
if(WindowsSelectorImpl.this.selectedKeys.contains(var10)) {
if(var9.clearedCount != var1) {
if(var10.channel.translateAndSetReadyOps(var4, var10) && var9.updateCount != var1) {
var9.updateCount = var1;
++var6;
}
} else if(var10.channel.translateAndUpdateReadyOps(var4, var10) && var9.updateCount != var1) {
var9.updateCount = var1;
++var6;
}
var9.clearedCount = var1;
} else {
if(var9.clearedCount != var1) {
var10.channel.translateAndSetReadyOps(var4, var10);
if((var10.nioReadyOps() & var10.nioInterestOps()) != 0) {
WindowsSelectorImpl.this.selectedKeys.add(var10);
var9.updateCount = var1;
++var6;
}
} else {
var10.channel.translateAndUpdateReadyOps(var4, var10);
if((var10.nioReadyOps() & var10.nioInterestOps()) != 0) {
WindowsSelectorImpl.this.selectedKeys.add(var10);
var9.updateCount = var1;
++var6;
}
}
var9.clearedCount = var1;
}
}
}
}
}
return var6;
}
这里,将会在所有线程中调用processSelectedKeys()来对所有线程在poll过程中取得的结果进行处理,并返回所有线程中处理的channel的数量。private int processSelectedKeys(long var1) {
byte var3 = 0;
int var4 = var3 + this.processFDSet(var1, this.readFds, 1, false);
var4 += this.processFDSet(var1, this.writeFds, 6, false);
var4 += this.processFDSet(var1, this.exceptFds, 7, true);
return var4;
}
private int processFDSet(long var1, int[] var3, int var4, boolean var5) {
int var6 = 0;
for(int var7 = 1; var7 <= var3[0]; ++var7) {
int var8 = var3[var7];
if(var8 == WindowsSelectorImpl.this.wakeupSourceFd) {
synchronized(WindowsSelectorImpl.this.interruptLock) {
WindowsSelectorImpl.this.interruptTriggered = true;
}
} else {
WindowsSelectorImpl.MapEntry var9 = WindowsSelectorImpl.this.fdMap.get(var8);
if(var9 != null) {
SelectionKeyImpl var10 = var9.ski;
if(!var5 || !(var10.channel() instanceof SocketChannelImpl) || !WindowsSelectorImpl.this.discardUrgentData(var8)) {
if(WindowsSelectorImpl.this.selectedKeys.contains(var10)) {
if(var9.clearedCount != var1) {
if(var10.channel.translateAndSetReadyOps(var4, var10) && var9.updateCount != var1) {
var9.updateCount = var1;
++var6;
}
} else if(var10.channel.translateAndUpdateReadyOps(var4, var10) && var9.updateCount != var1) {
var9.updateCount = var1;
++var6;
}
var9.clearedCount = var1;
} else {
if(var9.clearedCount != var1) {
var10.channel.translateAndSetReadyOps(var4, var10);
if((var10.nioReadyOps() & var10.nioInterestOps()) != 0) {
WindowsSelectorImpl.this.selectedKeys.add(var10);
var9.updateCount = var1;
++var6;
}
} else {
var10.channel.translateAndUpdateReadyOps(var4, var10);
if((var10.nioReadyOps() & var10.nioInterestOps()) != 0) {
WindowsSelectorImpl.this.selectedKeys.add(var10);
var9.updateCount = var1;
++var6;
}
}
var9.clearedCount = var1;
}
}
}
}
}
return var6;
}
这里实则是对在之前的监听到发生了io时间需要处理的fd与对应的channel进行操作,根据读到的fd取得selector下注册了的相应的channel,根据监听到其所发生的时间类型(读,写,异常)更新channel应有的状态,这是其主要功能,在完成这些操作之后,相应的slector的select也相应完成。
protected int doSelect(long var1) throws IOException { if(this.channelArray == null) { throw new ClosedSelectorException(); } else { this.timeout = var1; this.processDeregisterQueue(); if(this.interruptTriggered) { this.resetWakeupSocket(); return 0; } else { this.adjustThreadsCount(); this.finishLock.reset(); this.startLock.startThreads(); try { this.begin(); try { this.subSelector.poll(); } catch (IOException var7) { this.finishLock.setException(var7); } if(this.threads.size() > 0) { this.finishLock.waitForHelperThreads(); } } finally { this.end(); } this.finishLock.checkForException(); this.processDeregisterQueue(); int var3 = this.updateSelectedKeys(); this.resetWakeupSocket(); return var3; } } }
首先,会调用processDeregisterQueue()方法,来将已经准备解除注册的channel进行解除注册。
void processDeregisterQueue() throws IOException { Set var1 = this.cancelledKeys(); synchronized(var1) { if(!var1.isEmpty()) { Iterator var3 = var1.iterator(); while(var3.hasNext()) { SelectionKeyImpl var4 = (SelectionKeyImpl)var3.next(); try { this.implDereg(var4); } catch (SocketException var12) { IOException var6 = new IOException("Error deregistering key"); var6.initCause(var12); throw var6; } finally { var3.remove(); } } } } }
在这里,会取得所有需要取消注册的SelectionKey,并且依次调用implDereg()进行解除绑定。
protected void implDereg(SelectionKeyImpl var1) throws IOException { int var2 = var1.getIndex(); assert var2 >= 0; Object var3 = this.closeLock; synchronized(this.closeLock) { if(var2 != this.totalChannels - 1) { SelectionKeyImpl var4 = this.channelArray[this.totalChannels - 1]; this.channelArray[var2] = var4; var4.setIndex(var2); this.pollWrapper.replaceEntry(this.pollWrapper, this.totalChannels - 1, this.pollWrapper, var2); } var1.setIndex(-1); } this.channelArray[this.totalChannels - 1] = null; --this.totalChannels; if(this.totalChannels != 1 && this.totalChannels % 1024 == 1) { --this.totalChannels; --this.threadsCount; } this.fdMap.remove(var1); this.keys.remove(var1); this.selectedKeys.remove(var1); this.deregister(var1); SelectableChannel var7 = var1.channel(); if(!var7.isOpen() && !var7.isRegistered()) { ((SelChImpl)var7).kill(); } }
如果需要解除注册的channel已经是selector当中最后一个了,那么直接从数组中移走就行,但如果没有,则需要与数组最后一个索引位置上的交换位置,保证数组中间位置的连续,再将其移除,后面的操作与注册的操作相似,但都是反向操作。
在完成取消注册的步骤后,将会调用adjustThreadCount()方法来调整线程的数量,具体看下面的方法。
private void adjustThreadsCount() { int var1; if(this.threadsCount > this.threads.size()) { for(var1 = this.threads.size(); var1 < this.threadsCount; ++var1) { WindowsSelectorImpl.SelectThread var2 = new WindowsSelectorImpl.SelectThread(var1); this.threads.add(var2); var2.setDaemon(true); var2.start(); } } else if(this.threadsCount < this.threads.size()) { for(var1 = this.threads.size() - 1; var1 >= this.threadsCount; --var1) { ((WindowsSelectorImpl.SelectThread)this.threads.remove(var1)).makeZombie(); } } }
在Selector中,每有1024条channel,就需要重新开一个线程加入完成监听的操作,这里是从新根据当前应该有的线程数量与此时现存的线程数量进行比较,动态调整。
在这之后,调用了begin()方法,准备开始正式进行select操作。
protected final void begin() { if (interruptor == null) { interruptor = new Interruptible() { public void interrupt(Thread ignore) { AbstractSelector.this.wakeup(); }}; } AbstractInterruptibleChannel.blockedOn(interruptor); Thread me = Thread.currentThread(); if (me.isInterrupted()) interruptor.interrupt(me); }
在这里的begin()方法判断了这里的interruptor是否为空,如果为空,则会在这里重新生成一个,这里的Interruptor保证了当线程阻塞在了Io操作上,并且被interruptor时,保证selecor能够被唤醒。
在begin()方法执行完毕之后,将会调用其subSelector的poll()方法,正式开始select操作。
private int poll() throws IOException { return this.poll0(WindowsSelectorImpl.this.pollWrapper.pollArrayAddress, Math.min(WindowsSelectorImpl.this.totalChannels, 1024), this.readFds, this.writeFds, this.exceptFds, WindowsSelectorImpl.this.timeout); } private int poll(int var1) throws IOException { return this.poll0(WindowsSelectorImpl.this.pollWrapper.pollArrayAddress + (long)(this.pollArrayIndex * PollArrayWrapper.SIZE_POLLFD), Math.min(1024, WindowsSelectorImpl.this.totalChannels - (var1 + 1) * 1024), this.readFds, this.writeFds, this.exceptFds, WindowsSelectorImpl.this.timeout); } private nati aa41 ve int poll0(long var1, int var3, int[] var4, int[] var5, int[] var6, long var7);
这里的poll0()还是原生方法的实现。主要是为了监听pollWrapper中所保存的fd是否有数据进出,如果没有进出,则会在在此处在timeout的时间里一直保持阻塞状态。
当完成数据监听,取得相应的数据的时候,在这之后,将会重新检验并取消一边已经被取消的channel之后调用updateSelectedKeys()方法。private int processSelectedKeys(long var1) {
byte var3 = 0;
int var4 = var3 + this.processFDSet(var1, this.readFds, 1, false);
var4 += this.processFDSet(var1, this.writeFds, 6, false);
var4 += this.processFDSet(var1, this.exceptFds, 7, true);
return var4;
}
private int processFDSet(long var1, int[] var3, int var4, boolean var5) {
int var6 = 0;
for(int var7 = 1; var7 <= var3[0]; ++var7) {
int var8 = var3[var7];
if(var8 == WindowsSelectorImpl.this.wakeupSourceFd) {
synchronized(WindowsSelectorImpl.this.interruptLock) {
WindowsSelectorImpl.this.interruptTriggered = true;
}
} else {
WindowsSelectorImpl.MapEntry var9 = WindowsSelectorImpl.this.fdMap.get(var8);
if(var9 != null) {
SelectionKeyImpl var10 = var9.ski;
if(!var5 || !(var10.channel() instanceof SocketChannelImpl) || !WindowsSelectorImpl.this.discardUrgentData(var8)) {
if(WindowsSelectorImpl.this.selectedKeys.contains(var10)) {
if(var9.clearedCount != var1) {
if(var10.channel.translateAndSetReadyOps(var4, var10) && var9.updateCount != var1) {
var9.updateCount = var1;
++var6;
}
} else if(var10.channel.translateAndUpdateReadyOps(var4, var10) && var9.updateCount != var1) {
var9.updateCount = var1;
++var6;
}
var9.clearedCount = var1;
} else {
if(var9.clearedCount != var1) {
var10.channel.translateAndSetReadyOps(var4, var10);
if((var10.nioReadyOps() & var10.nioInterestOps()) != 0) {
WindowsSelectorImpl.this.selectedKeys.add(var10);
var9.updateCount = var1;
++var6;
}
} else {
var10.channel.translateAndUpdateReadyOps(var4, var10);
if((var10.nioReadyOps() & var10.nioInterestOps()) != 0) {
WindowsSelectorImpl.this.selectedKeys.add(var10);
var9.updateCount = var1;
++var6;
}
}
var9.clearedCount = var1;
}
}
}
}
}
return var6;
}
private int updateSelectedKeys() { ++this.updateCount; byte var1 = 0; int var4 = var1 + this.subSelector.processSelectedKeys(this.updateCount); WindowsSelectorImpl.SelectThread var3; for(Iterator var2 = this.threads.iterator(); var2.hasNext(); var4 += var3.subSelector.processSelectedKeys(this.updateCount)) { var3 = (WindowsSelectorImpl.SelectThread)var2.next(); } return var4; }
这里,将会在所有线程中调用processSelectedKeys()来对所有线程在poll过程中取得的结果进行处理,并返回所有线程中处理的channel的数量。private int processSelectedKeys(long var1) {
byte var3 = 0;
int var4 = var3 + this.processFDSet(var1, this.readFds, 1, false);
var4 += this.processFDSet(var1, this.writeFds, 6, false);
var4 += this.processFDSet(var1, this.exceptFds, 7, true);
return var4;
}
private int processFDSet(long var1, int[] var3, int var4, boolean var5) {
int var6 = 0;
for(int var7 = 1; var7 <= var3[0]; ++var7) {
int var8 = var3[var7];
if(var8 == WindowsSelectorImpl.this.wakeupSourceFd) {
synchronized(WindowsSelectorImpl.this.interruptLock) {
WindowsSelectorImpl.this.interruptTriggered = true;
}
} else {
WindowsSelectorImpl.MapEntry var9 = WindowsSelectorImpl.this.fdMap.get(var8);
if(var9 != null) {
SelectionKeyImpl var10 = var9.ski;
if(!var5 || !(var10.channel() instanceof SocketChannelImpl) || !WindowsSelectorImpl.this.discardUrgentData(var8)) {
if(WindowsSelectorImpl.this.selectedKeys.contains(var10)) {
if(var9.clearedCount != var1) {
if(var10.channel.translateAndSetReadyOps(var4, var10) && var9.updateCount != var1) {
var9.updateCount = var1;
++var6;
}
} else if(var10.channel.translateAndUpdateReadyOps(var4, var10) && var9.updateCount != var1) {
var9.updateCount = var1;
++var6;
}
var9.clearedCount = var1;
} else {
if(var9.clearedCount != var1) {
var10.channel.translateAndSetReadyOps(var4, var10);
if((var10.nioReadyOps() & var10.nioInterestOps()) != 0) {
WindowsSelectorImpl.this.selectedKeys.add(var10);
var9.updateCount = var1;
++var6;
}
} else {
var10.channel.translateAndUpdateReadyOps(var4, var10);
if((var10.nioReadyOps() & var10.nioInterestOps()) != 0) {
WindowsSelectorImpl.this.selectedKeys.add(var10);
var9.updateCount = var1;
++var6;
}
}
var9.clearedCount = var1;
}
}
}
}
}
return var6;
}
这里实则是对在之前的监听到发生了io时间需要处理的fd与对应的channel进行操作,根据读到的fd取得selector下注册了的相应的channel,根据监听到其所发生的时间类型(读,写,异常)更新channel应有的状态,这是其主要功能,在完成这些操作之后,相应的slector的select也相应完成。
相关文章推荐
- Oracle学习笔记:oracle和serverver在过程sql中通过select对变量进行赋值的区别
- 在阿里云上配置服务器,安装jdk+tomcat+mysql的过程
- 存储过程结果进行查询 select 存过过程
- jdk的安装过程 注册表中的信息
- Oracle中select语句执行过程
- 【JAVA初学者】配置JDK的详细过程,以及第一个JAVA程序的编写。
- Linux下安装jdk过程
- select sql 语句执行过程 顺序
- java安装过程易出现问题(jdk,jre)
- ubuntu安装jdk全过程
- Java SE的简单介绍及JDK的初步安装过程
- SQL 查询语言 (1、 Select语句之过程)
- Oracle的存储过程能返回一个select查询结果集吗
- rails 日期选择gem calendar_date_select 安装全过程
- SELECT top n 储存过程
- LINUX下安装jdk过程及其环境变量配置
- ubuntu安装jdk过程
- 存储过程中的select字符串排列顺序要与表中字段的排序顺序一致 ,否则在调用有些字段值无法读出
- JSP环境的配置过程!(JDK+TOMCAT+MYSQL)
- JDK和Hadoop的安装过程