您的位置:首页 > 编程语言 > Java开发

【Java8源码分析】NIO包-Selector选择器

2017-06-13 22:09 686 查看
转载请注明出处:http://blog.csdn.net/linxdcn/article/details/73028902

1 概述

Java NIO 由以下几个核心部分组成:

Buffer

Channel

Selectors



相关类的使用方法可以参考Java NIO 系列教程,写的通俗易懂。

本文主要从源码方面分析一下Selector选择器。关于Selector类主要涉及三个重要的方法如下:

Selector.open()
:创建一个Selector实例

selector.select()
:获取已经就绪的通道

channel.register(Selector sel, int ops, Object att)
:注册需要监听的通道

2 open函数

实质上,
Selector.open()
函数会根据操作系统返回Selector实例,Windows下返回一个
WindowsSelectorImpl
实例,Linux下返回
PollSelectorImpl
实例,下面我们以Linux为例说明,
PollSelectorImpl
的构造函数如下:

PollSelectorImpl(SelectorProvider sp) {
super(sp, 1, 1);
// native方法
// 1 创建一个管道pipe,读端fd在高32位,写端fd在低32位
long pipeFds = IOUtil.makePipe(false);
fd0 = (int) (pipeFds >>> 32);
fd1 = (int) pipeFds;

// 2 初始化一个PollArrayWrapper,本质上是pollfd数组
pollWrapper = new PollArrayWrapper(INIT_CAP);

// 3 把读端fd添加到PollArrayWrapper的fd数组中
pollWrapper.initInterrupt(fd0, fd1);

// 4 初始化SelectionKey数组
channelArray = new SelectionKeyImpl[INIT_CAP];
}


初始化的主要步骤如下:

创建一个管道pipe,读端在高32位,写端在低32位

初始化一个PollArrayWrapper,本质上是pollfd数组

把读端fd添加到PollArrayWrapper的fd数组中

初始化SelectionKey数组

2.1 PollArrayWrapper

PollArrayWrapper
初始化时,通过Unsafe类申请一块物理内存,存放注册时的socket句柄fdVal和event的数据结构pollfd,其中pollfd共8位,0~3位保存socket句柄,4~7位保存event。

本质上PollArrayWrapper是一个pollfd数组。并且提供了添加pollfd的函数:

// Adds Windows wakeup socket at a given index.
void addWakeupSocket(int fdVal, int index) {
putDescriptor(index, fdVal);
putEventOps(index, POLLIN);
}


3 select函数

protected int doSelect(long timeout) throws IOException
{
if (channelArray == null)
throw new ClosedSelectorException();

// 处理已经不监听的事件(文件描述符或Channel)
processDeregisterQueue();
try {
// 标志开始一个可能会被中断的IO操作
begin();
// 1 调用native方法epoll获取已经就绪的pollfd
pollWrapper.poll(totalChannels, 0, timeout);
} finally {
end();
}
// 处理已经不监听的事件(文件描述符或Channel)
processDeregisterQueue();
// 2 获取就绪的Ke
c217
y的数目,并且将就绪的Key赋值给selector的selectedKey
int numKeysUpdated = updateSelectedKeys();
if (pollWrapper.getReventOps(0) != 0) {
// 清除wakeup通道
pollWrapper.putReventOps(0, 0);
synchronized (interruptLock) {
IOUtil.drain(fd0);
interruptTriggered = false;
}
}
return numKeysUpdated;
}


select方法最主要的是这一句
pollWrapper.poll(totalChannels, 0, timeout);
,这一方法是一个native方法,本质上是调用了系统的epoll方法,对epoll不了解的,可以参考epoll源码分析一文。该方法会把已经就绪的pollfd添加到pollWrapper里的数组中。

4 register函数

既然Selector是监听哪些Channel已经就绪,那自然有往Selector注册Channel的函数。能被“select“的Channel都继承
SelectableChannel
,注册函数如下:

// SelectionKey保存注册时的channel、selector、event
// 以及保存在pollWrapper的偏移位置index
public final SelectionKey register(Selector sel, int ops, Object att)
throws ClosedChannelException
{
synchronized (regLock) {
if (!isOpen())
throw new ClosedChannelException();
if ((ops & ~validOps()) != 0)
throw new IllegalArgumentException();
if (blocking)
throw new IllegalBlockingModeException();

SelectionKey k = findKey(sel);
// 1 如果该channel和selector已经注册过,则直接添加感兴趣的事件和附件
if (k != null) {
k.interestOps(ops);
k.attach(att);
}
// 2 否则通过selector实现注册过程
if (k == null) {
// New registration
synchronized (keyLock) {
if (!isOpen())
throw new ClosedChannelException();
// 调用select的regist
k = ((AbstractSelector)sel).register(this, ops, att);
addKey(k);
}
}
return k;
}
}


主要步骤如下:

如果该channel和selector已经注册过,则直接添加感兴趣的事件和附件

否则通过selector实现注册过程,调用select的regist,如下

// selector的register方法
protected final SelectionKey register(AbstractSelectableChannel ch,
int ops,
Object attachment)
{
if (!(ch instanceof SelChImpl))
throw new IllegalSelectorException();

// 1 新建SelectionKey,注册感兴趣事件和附件
SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
k.attach(attachment);
synchronized (publicKeys) {
implRegister(k);
}
k.interestOps(ops);
return k;
}

protected void implRegister(SelectionKeyImpl ski) {
synchronized (closeLock) {
if (closed)
throw new ClosedSelectorException();

// 查看pollWrapper中的pollfd数组是否足够大
if (channelArray.length == totalChannels) {
// Make a larger array
int newSize = pollWrapper.totalChannels * 2;
SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize];
// Copy over
for (int i=channelOffset; i<totalChannels; i++)
temp[i] = channelArray[i];
channelArray = temp;
// Grow the NativeObject poll array
pollWrapper.grow(newSize);
}

// 2 把新建的SelectionKey添加到pollWrapper的channel数组
channelArray[totalChannels] = ski;
ski.setIndex(totalChannels);
pollWrapper.addEntry(ski.channel);
totalChannels++;
keys.add(ski);
}
}


主要步骤如下:

新建SelectionKey,注册感兴趣事件和附件

把新建的SelectionKey添加到pollWrapper的channel数组

转载请注明出处:http://blog.csdn.net/linxdcn/article/details/73028902
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: