【Java】NIO中Selector的创建源码分析
在使用Selector时首先需要通过静态方法open创建Selector对象
public static Selector open() throws IOException { return SelectorProvider.provider().openSelector(); }
可以看到首先是调用SelectorProvider的静态方法provider,得到一个Selector的提供者
public static SelectorProvider provider() { synchronized (lock) { if (provider != null) return provider; return AccessController.doPrivileged( new PrivilegedAction<SelectorProvider>() { public SelectorProvider run() { if (loadProviderFromProperty()) return provider; if (loadProviderAsService()) return provider; provider = sun.nio.ch.DefaultSelectorProvider.create(); return provider; } }); } }
这段代码的逻辑也比较简单,首先判断provider是否已经产生,若已经产生,则直接返回现有的;若没有,则需要调用AccessController的静态方法doPrivileged,该方法是一个native方法,就不说了;可以看到在实现的PrivilegedAction接口中的run方法,做了三次判断:
第一次是根据是系统属性,使用ClassLoader类加载:
private static boolean loadProviderFromProperty() { String cn = System.getProperty("java.nio.channels.spi.SelectorProvider"); if (cn == null) return false; try { Class<?> c = Class.forName(cn, true, ClassLoader.getSystemClassLoader()); provider = (SelectorProvider)c.newInstance(); return true; } catch (ClassNotFoundException x) { throw new ServiceConfigurationError(null, x); } catch (IllegalAccessException x) { throw new ServiceConfigurationError(null, x); } catch (InstantiationException x) { throw new ServiceConfigurationError(null, x); } catch (SecurityException x) { throw new ServiceConfigurationError(null, x); } }
先获取键值为"java.nio.channels.spi.SelectorProvider"的属性,若没有,则直接返回false;若设置了,则需要使用加载器直接加载系统属性设置的java.nio.channels.spi.SelectorProvider的实现类,再通过反射机制直接产生实例对象并赋值给静态成员provider,最后返回true。
第二次使用ServiceLoader加载:
private static boolean loadProviderAsService() { ServiceLoader<SelectorProvider> sl = ServiceLoader.load(SelectorProvider.class, ClassLoader.getSystemClassLoader()); Iterator<SelectorProvider> i = sl.iterator(); for (;;) { try { if (!i.hasNext()) return false; provider = i.next(); return true; } catch (ServiceConfigurationError sce) { if (sce.getCause() instanceof SecurityException) { // Ignore the security exception, try the next provider continue; } throw sce; } } }
有关ServiceLoader的加载过程可以看我的上一篇博客【Java】ServiceLoader源码分析,在这里我就不累赘了。
该方法调用ServiceLoader的load加载在"META-INF/services/"路径下指明的SelectorProvider.class的实现类(其实是懒加载,在迭代时才真正加载)得到ServiceLoader对象,通过该对象的带迭代器,遍历这个迭代器;可以看到若是迭代器不为空,则直接返回迭代器保存的第一个元素,即第一个被加载的类的对象,并赋值给provider,返回true;否则返回false;
第三次是使用的默认的SelectorProvider(windows环境为例):
public class DefaultSelectorProvider { private DefaultSelectorProvider() { } public static SelectorProvider create() { return new WindowsSelectorProvider(); } }
可以看到直接返回了WindowsSelectorProvider赋值给provider ;
此时provider无论如何都已经有了,接下来就是调用provider的openSelector方法。
WindowsSelectorProvider的openSelector方法:
public class WindowsSelectorProvider extends SelectorProviderImpl { public WindowsSelectorProvider() { } public AbstractSelector openSelector() throws IOException { return new WindowsSelectorImpl(this); } }
可以看到仅仅是产生了WindowsSelectorImpl:
WindowsSelectorImpl(SelectorProvider var1) throws IOException { super(var1); this.wakeupSourceFd = ((SelChImpl)this.wakeupPipe.source()).getFDVal(); SinkChannelImpl var2 = (SinkChannelImpl)this.wakeupPipe.sink(); var2.sc.socket().setTcpNoDelay(true); this.wakeupSinkFd = var2.getFDVal(); this.pollWrapper.addWakeupSocket(this.wakeupSourceFd, 0); }
WindowsSelectorImpl首先调用父类SelectorImpl的构造方法:
protected Set<SelectionKey> selectedKeys = new HashSet(); protected HashSet<SelectionKey> keys = new HashSet(); private Set<SelectionKey> publicKeys; private Set<SelectionKey> publicSelectedKeys; protected SelectorImpl(SelectorProvider var1) { super(var1); if (Util.atBugLevel("1.4")) { this.publicKeys = this.keys; this.publicSelectedKeys = this.selectedKeys; } else { this.publicKeys = Collections.unmodifiableSet(this.keys); this.publicSelectedKeys = Util.ungrowableSet(this.selectedKeys); } }
SelectorImpl同样调用父类AbstractSelector的构造:
protected AbstractSelector(SelectorProvider provider) { this.provider = provider; }
此时的provider就是刚才产生的WindowsSelectorProvider对象;
在SelectorImpl中还会对其成员有一系列的赋值操作;
上述都完成后才继续完成WindowsSelectorImpl的构造。
WindowsSelectorImpl在进行this.wakeupSourceFd = ((SelChImpl)this.wakeupPipe.source()).getFDVal()之前,其wakeupPipe成员如下:
private final Pipe wakeupPipe = Pipe.open();
wakeupPipe管道通过Pipe.open()赋值:
public static Pipe open() throws IOException { return SelectorProvider.provider().openPipe(); }
可以看到实际上 SelectorProvider.provider()的provider的openPipe方法,而这个provider就是WindowsSelectorProvider,而WindowsSelectorProvider继承自SelectorProviderImpl,openPipe方法是在SelectorProviderImpl里实现的:
public Pipe openPipe() throws IOException { return new PipeImpl(this); }
该方法直接产生了PipeImpl对象,并将WindowsSelectorProvider对象传入进去:
PipeImpl(SelectorProvider var1) throws IOException { try { AccessController.doPrivileged(new PipeImpl.Initializer(var1)); } catch (PrivilegedActionException var3) { throw (IOException)var3.getCause(); } }
可以看到这个构造方法实际上是以特权模式运行的PipeImpl的内部类Initializer的run方法(doPrivileged需要的参数是PrivilegedExceptionAction接口的实现类,该接口只有run方法):
Initializer 的初始化:
private class Initializer implements PrivilegedExceptionAction<Void> { private final SelectorProvider sp; private IOException ioe; private Initializer(SelectorProvider var2) { this.ioe = null; this.sp = var2; } ...... }
该构造方法给sp赋值为传入进来的WindowsSelectorProvider对象,令ioe=null;
其所实现的run方法如下:
public Void run() throws IOException { PipeImpl.Initializer.LoopbackConnector var1 = new PipeImpl.Initializer.LoopbackConnector(); var1.run(); if (this.ioe instanceof ClosedByInterruptException) { this.ioe = null; Thread var2 = new Thread(var1) { public void interrupt() { } }; var2.start(); while(true) { try { var2.join(); break; } catch (InterruptedException var4) { ; } } Thread.currentThread().interrupt(); } if (this.ioe != null) { throw new IOException("Unable to establish loopback connection", this.ioe); } else { return null; } }
首先产生LoopbackConnector 对象,是Initializer的内部类,而且实现了Runnable接口:
private class LoopbackConnector implements Runnable { private LoopbackConnector() { } }
其实现的run方法如下:
public void run() { ServerSocketChannel var1 = null; SocketChannel var2 = null; SocketChannel var3 = null; try { ByteBuffer var4 = ByteBuffer.allocate(16); ByteBuffer var5 = ByteBuffer.allocate(16); InetAddress var6 = InetAddress.getByName("127.0.0.1"); assert var6.isLoopbackAddress(); InetSocketAddress var7 = null; while(true) { if (var1 == null || !var1.isOpen()) { var1 = ServerSocketChannel.open(); var1.socket().bind(new InetSocketAddress(var6, 0)); var7 = new InetSocketAddress(var6, var1.socket().getLocalPort()); } var2 = SocketChannel.open(var7); PipeImpl.RANDOM_NUMBER_GENERATOR.nextBytes(var4.array()); do { var2.write(var4); } while(var4.hasRemaining()); var4.rewind(); var3 = var1.accept(); do { var3.read(var5); } while(var5.hasRemaining()); var5.rewind(); if (var5.equals(var4)) { PipeImpl.this.source = new SourceChannelImpl(Initializer.this.sp, var2); PipeImpl.this.sink = new SinkChannelImpl(Initializer.this.sp, var3); break; } var3.close(); var2.close(); } } catch (IOException var18) { try { if (var2 != null) { var2.close(); } if (var3 != null) { var3.close(); } } catch (IOException var17) { ; } Initializer.this.ioe = var18; } finally { try { if (var1 != null) { var1.close(); } } catch (IOException var16) { ; } } }
在这个run方法中首先定义了三个Channel一个ServerSocketChannel和两个SocketChannel,然后申请了两个十六字节的ByteBuffer缓冲区,定义了一个回送地址var6;在while循环中先检查ServerSocketChannel是否开启了,若没有则需要调用open方法开启并赋值给var1,绑定地址为var6即回送地址,端口为0,令var7这个InetSocketAddress对象的地址是var6,端口是ServerSocketChannel的端口;ServerSocketChannel初始化完毕,初始化一个SocketChannel即var2,通过刚才的var7这个InetSocketAddress对象和ServerSocketChannel建立连接;
在PipeImpl里有一个静态成员:
private static final Random RANDOM_NUMBER_GENERATOR = new SecureRandom();
RANDOM_NUMBER_GENERATOR 听名字就知道它是用来生成随机数;
通过RANDOM_NUMBER_GENERATOR将从生成的随机数存放在其中一个缓冲区ByteBuffer(var4)中,然后通过刚才连接好的SocketChannel即var2的write方法写入缓冲区中的所有可用数据发送给ServerSocketChannel;令var4缓冲区标志置0;接着ServerSocketChannel调用accept方法侦听刚才的连接产生一个SocketChannel对象var3,从var3中读取数据存放在缓冲区var5中,令var5缓冲区标志置0;然后比较var4和var5中的内容是否一致,若是一致则给PipeImpl的成员source和sink分别初始化保存起来,若不一致就继续循环,不断地重复上述过程,直至Pipe通道成功建立;至此结束LoopbackConnector的run方法。
其在连接建立的过程中若是出现了异常会通过Initializer的ioe成员保存异常。
再回到Initializer的run方法,在完成LoopbackConnector的run方法后,再根据ioe判读是否在刚才的连接建立中出现了ClosedByInterruptException异常,若是出现还需要通过线程启动LoopbackConnector的run方法直至其结束;若不是ClosedByInterruptException异常则直接抛出IOException。
至此PipeImpl的构造结束,再回到WindowsSelectorImpl的构造,通过上述的操作产生的PipeImpl对象就赋值给了wakeupPipe成员;wakeupPipe的source就是刚才产生的SourceChannelImpl对象,wakeupPipe的sink就是刚才产生的SinkChannelImpl对象,再使用wakeupSourceFd保存source的fdVal值和wakeupSinkFd保存sink的fdVal值;并且开启Nagle算法,最后使用pollWrpper成员保存source的fdVal值。
上述建立的这个连接通道的主要目的不是为了确保能建立连接,而是为了解决Selector的select方法的阻塞问题,调用select方法时只有注册在Selector上的channel有事件就绪时才会被唤醒,而Selector提供的wakeup方法就利用了上述建立好的通道,通过SinkChannel给SourceChannel发送信号量,使得select被唤醒,具体实现会在后续的博客给出。
Selector到此创建完毕。
- Java NIO——Selector机制解析三(源码分析)
- Java NIO——Selector机制源码分析---转
- 源码分析netty服务器创建过程vs java nio服务器创建
- 《Java 源码分析》:Java NIO 之 Selector(第二部分selector.select())
- 源码分析:Java堆的创建
- 《Java 源码分析》:Java NIO 之 ServerSocketChannel
- 【深入Java开发】JVM源码分析之一个Java进程究竟能创建多少线程
- JVM源码分析之一个Java进程究竟能创建多少线程
- 【简记】Java Web 内幕——Spring源码(组件分析,BeanFactory源码,Bean创建之前)
- Java NIO 学习笔记 selector 行为机制分析(select操作 cancel操作)
- JVM源码分析之Java对象的创建过程
- 源码分析:Java堆的创建
- netty(四) NIO创建的TimerServer源码分析之服务端
- 【Java8源码分析】NIO包-Selector选择器
- 【Java】NIO中Channel的注册源码分析
- 【Java8源码分析】NIO包-FileChannel
- 源码分析:Java堆的创建
- Java并发编程:线程池创建及源码分析
- 《Java 源码分析》:Java NIO 之 ServerSocketChannel
- 《Java 源码分析》:Java NIO 之 SelectionKey