您的位置:首页 > 其它

NIO学习笔记——选择器(selectors)

2017-06-26 22:26 513 查看
选择器(Selector)

选择器类管理着一个被注册的通道集合的信息和它们的就绪状态。通道是和选择器一起被注册的,并且使用选择器来更新通道的就绪状态。当这么做的时候,可以选择将被激发的线程挂起,直到有就绪的的通道。

可选择通道(SelectableChannel)

这个抽象类提供了实现通道的可选择性所需要的公共方法。它是所有支持就绪检查的通道类的父类。 FileChannel 对象不是可选择的,因为它们没有继承 SelectableChannel。所有 socket 通道都是可选择的,包括从管道(Pipe)对象的中获得的通道。 SelectableChannel 可以被注册到 Selector 对象上,同时可以指定对

那个选择器而言,那种操作是感兴趣的。一个通道可以被注册到多个选择器上,但对每个选择器而言只能被注册一次

选择键(SelectionKey)

选择键封装了特定的通道与特定的选择器的注册关系。选择键对象被SelectableChannel.register()返回并提供一个表示这种注册关系的标记。选择键包含了两个比特集(以整数的形式进行编码),指示了该注册关系所关心的通道操作,以及通道已经准备好的操作。

通道在被注册到一个选择器上之前,必须先设置为非阻塞模式(通过调用 configureBlocking(false))。

尽管SelectableChannel类上定义了register()方法,还是应该将通道注册到选择器上,而不是另一种方式。选择器维护了一个需要监控的通道的集合。一个给定的通道可以被注册到多于一个的选择器上,而且不需要知道它被注册了那个Selector对象上。将register()放在SelectableChannel上而不是Selector上,这种做法看起来有点随意。它将返回一个封装了两个对象的关系的选择键对象。重要的是要记住选择器对象控制了被注册到它之上的通道的选择过程。

建立选择器

下面看一个建立监控三个 Socket 通道的选择器的代码实例

Selector selector = Selector.open();
channel1.register(selector,SelectionKey.OP_READ);
channel2.register(selector,SelectionKey.OP_WRITE);
channel3.register (selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
// Wait up to 10 seconds for a channel to become ready
readyCount = selector.select (10000);


这些代码创建了一个新的选择器,然后将这三个(已经存在的)socket 通道注册到选择器上,而且感兴趣的操作各不相同。select()方法在将线程置于睡眠状态,直到这些感兴趣的事情中的操作中的一个发生或者 10 秒钟的时间过去。

Selector 对象是通过调用静态工厂方法 open( )来实例化的。选择器不是像通道或流(stream)那样的基本 I/O 对象:数据从来没有通过它们进行传递。类方法 open( )向 SPI 发出请求,通过默认的 SelectorProvider 对象获取一个新的实例。通过调用一个自定义的 SelectorProvider对象的 openSelector( )方法来创建一个 Selector 实例也是可行的。您可以通过调用 provider( )方法来决定由哪个 SelectorProvider 对象来创建给定的 Selector 实例。大多数情况下,您不需要关心 SPI;只需要调用 open( )方法来创建新的 Selector 对象。

下面我们来看看如何将通道注册到选择器上,SelectableChannel的register()方法接手一个Selector对象作为参数。第二个参数表示所关心的通道操作。这是一个表示选择器在检查通道就绪状态时要关心的操作的比特掩码。

一个单独的通道对象可以被注册到多个选择器上。可以调用 isRegistered( )方法来检查一个通道是否被注册到任何一个选择器上

任何一个通道和选择器的注册关系都被封装在一个 SelectionKey 对象中。 keyFor( )方法将返回与该通道和指定的选择器相关的键。如果通道被注册到指定的选择器上,那么相关的键将被返回。如果它们之间没有注册关系,那么将返回 null

使用选择键

键对象表示了一种特定的注册关系。当应该终结这种关系的时候,可以调用 SelectionKey对象的 cancel( )方法。可以通过调用 isValid( )方法来检查它是否仍然表示一种有效的关系。当键被取消时,它将被放在相关的选择器的已取消的键的集合里。注册不会立即被取消,但键会立即失效。当再次调用 select( )方法时(或者一个正在进行的 select()调用结束时),已取消的键的集合中的被取消的键将被清理掉,并且相应的注销也将完成。通道会被注销,而新的SelectionKey 将被返回。

可以通过调用键的 readyOps( )方法来获取相关的通道的已经就绪的操作。 ready 集合是 interest集合的子集,并且表示了 interest 集合中从上次调用 select( )以来已经就绪的那些操作。例如,下面的代码测试了与键关联的通道是否就绪。如果就绪,就将数据读取出来,写入一个缓冲区,并将它送到一个 consumer(消费者)方法中

if ((key.readyOps( ) & SelectionKey.OP_READ) != 0)
{
myBuffer.clear( );
key.channel( ).read (myBuffer);
doSomethingWithBuffer (myBuffer.flip( ));
}


attach( )方法将在键对象中保存所提供的对象的引用。 SelectionKey 类除了保存它之外,不会将它用于任何其他用途。任何一个之前保存在键中的附件引用都会被替换。可以使用 null 值来清除附件。可以通过调用 attachment( )方法来获取与键关联的附件句柄。如果没有附件,或者显式地SelectionKey 对象包含的 ready 集合与最近一次选择器对所注册的通道所作的检查相同。而每个单独的通道的就绪状态会同时改变通过 null 方法进行过设置,这个方法将返回 null。

如果选择键的存续时间很长,但您附加的对象不应该存在那么长时间,请记得在完成后清理附件。否则,您附加的对象将不能被垃圾回收,您将会面临内存泄漏问题

SelectableChannel 类的一个 register( )方法的重载版本接受一个 Object 类型的参数。这是一个方便您在注册时附加一个对象到新生成的键上的方法。以下代码:

SelectionKey key =
channel.register (selector, SelectionKey.OP_READ, myObject);
// 等价于:
SelectionKey key = channel.register (selector, SelectionKey.OP_READ);
key.attach (myObject);


SelectionKey 对象是线程安全的,但知道修改 interest 集合的操作是通过 Selector 对象进行同步的是很重要的。这可能会导致 interestOps( )方法的调用会阻塞不确定长的一段时间。选择器所使用的锁策略(例如是否在整个选择过程中保持这些锁)是依赖于具体实现的。

使用选择器

选择过程

已注册的键的集合(Registered key set)

已选择的键的集合(Selected key set)

已取消的键的集合(Cancelled key set)

选择操作是当三种形式的 select( )中的任意一种被调用时,由选择器执行的。不管是哪一种形式的调用,下面步骤将被执行

1.已取消的键的集合将会被检查。如果它是非空的,每个已取消的键的集合中的键将从另外两个集合中移除,并且相关的通道将被注销。这个步骤结束后,已取消的键的集合将是空的。

2.已注册的键的集合中的键的 interest 集合将被检查。在这个步骤中的检查执行过后,对interest 集合的改动不会影响剩余的检查过程。

一旦就绪条件被定下来,底层操作系统将会进行查询,以确定每个通道所关心的操作的真实就绪状态。依赖于特定的 select( )方法调用,如果没有通道已经准备好,线程可能会在这时阻塞,通常会有一个超时值。直到系统调用完成为止,这个过程可能会使得调用线程睡眠一段时间,然后当前每个通道的就绪状态将确定下来。对于那些还没准备好的通道将不会执行任何的操作。对于那些操作系统指示至少已经准备好 interest集合中的一种操作的通道,将执行以下两种操作中的一种:

a.如果通道的键还没有处于已选择的键的集合中,那么键的 ready 集合将被清空,然后表示操作系统发现的当前通道已经准备好的操作的比特掩码将被设置。

b.否则,也就是键在已选择的键的集合中。键的 ready 集合将被表示操作系统发现的当前已经准备好的操作的比特掩码更新。所有之前的已经不再是就绪状态的操作不会被清除。事实上,所有的比特位都不会被清理。由操作系统决定的 ready 集合是与之前的 ready 集合按位分离的,一旦键被放置于选择器的已选择的键的集合中,它的 ready 集合将是累积的。比特位只会被设置,不会被清理。

3.步骤 2 可能会花费很长时间,特别是所激发的线程处于休眠状态时。与该选择器相关的键可能会同时被取消。当步骤 2 结束时,步骤 1将重新执行,以完成任意一个在选择进行的过程中,键已经被取消的通道的注销。

4.select 操作返回的值是 ready 集合在步骤 2 中被修改的键的数量,而不是已选择的键的集合中的通道的总数。返回值不是已准备好的通道的总数,而是从上一个select( )调用之后进入就绪状态

的通道的数量。之前的调用中就绪的,并且在本次调用中仍然就绪的通道不会被计入,而那些在前一次调用中已经就绪但已经不再处于就绪状态的通道也不会被计入。这些通道可能仍然在已选择的键的集合中,但不会被计入返回值中。返回值可能是 0

停止选择过程

调用 wakeup( )

调用 Selector 对象的 wakeup( )方法将使得选择器上的第一个还没有返回的选择操作立即返回.

调用 close( )

如果选择器的 close( )方法被调用,那么任何一个在选择操作中阻塞的线程都将被唤醒,就像wakeup( )方法被调用了一样。与选择器相关的通道将被注销, 而键将被取消。

调用 interrupt( )

如果睡眠中的线程的 interrupt( )方法被调用,它的返回状态将被设置

管理选择键

选择是累积的。一旦一个选择器将一个键添加到它的已选择的键的集合中,它就不会移除这个键。并且,一旦一个键处于已选择的键的集合中,这个键的 ready 集合将只会被设置,而不会被清理。乍一看,这好像会引起麻烦,因为选择操作可能无法表现出已注册的通道的正确状态。它提供了极大的灵活性,但把合理地管理键以确保它们表示的状态信息不会变得陈旧的任务交给了程序员。

清理一个 SelectKey 的 ready 集合的方式是将这个键从已选择的键的集合中移除。选择键的就绪状态只有在选择器对象在选择操作过程中才会修改。处理思想是只有在已选择的键的集合中的键才被认为是包含了合法的就绪信息的。这些信息将在键中长久地存在,直到键从已选择的键的集合中移除,以通知选择器您已经看到并对它进行了处理。如果下一次通道的一些感兴趣的操作发生时,键将被重新设置以反映当时通道的状态并再次被添加到已选择的键的集合中

我们来看个使用 select( )来为多个通道提供服务的例子

package com.swk.nio;

import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

/**
* Simple echo-back server which listens for incoming stream connections and
* echoes back whatever it reads. A single Selector object is used to listen to
* the server socket (to accept new connections) and all the active socket
* channels.
*
*/
public class SelectSockets {
public static int PORT_NUMBER = 1234;

public static void main(String[] argv) throws Exception {
new SelectSockets().go(argv);
}

public void go(String[] argv) throws Exception {
int port = PORT_NUMBER;
if (argv.length > 0) { // Override default listen port
port = Integer.parseInt(argv[0]);
}
System.out.println("Listening on port " + port);
// Allocate an unbound server socket channel
ServerSocketChannel serverChannel = ServerSocketChannel.open();
// Get the associated ServerSocket to bind it with
ServerSocket serverSocket = serverChannel.socket();
// Create a new Selector for use below
Selector selector = Selector.open();
// Set the port the server channel will listen to
serverSocket.bind(new InetSocketAddress(port));
// Set nonblocking mode for the listening socket
serverChannel.configureBlocking(false);
// Register the ServerSocketChannel with the Selector
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// This may block for a long time. Upon returning, the
// selected set contains keys of the ready channels.
int n = selector.select();
if (n == 0) {
continue; // nothing to do
}
// Get an iterator over the set of selected keys
Iterator it = selector.selectedKeys().iterator();
// Look at each key in the selected set
while (it.hasNext()) {
SelectionKey key = (SelectionKey) it.next();
// Is a new connection coming in?
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key
.channel();
SocketChannel channel = server.accept();
registerChannel(selector, channel, SelectionKey.OP_READ);
sayHello(channel);
}
// Is there data to read on this channel?
if (key.isReadable()) {
readDataFromSocket(key);
}
// Remove key from selected set; it's been handled
it.remove();
}
}
}

// ----------------------------------------------------------
/**
* Register the given channel with the given selector for the given
* operations of interest
*/
protected void registerChannel(Selector selector,
SelectableChannel channel, int ops) throws Exception {
if (channel == null) {
return; // could happen
}
// Set the new channel nonblocking
channel.configureBlocking(false);
// Register it with the selector
channel.register(selector, ops);
}

// ----------------------------------------------------------
// Use the same byte buffer for all channels. A single thread is
// servicing all the channels, so no danger of concurrent acccess.
private ByteBuffer buffer = ByteBuffer.allocateDirect(1024);

/**
* Sample data handler method for a channel with data ready to read.
*
* @param key
*            A SelectionKey object associated with a channel determined by
*            the selector to be ready for reading. If the channel
*            returns142 an EOF condition, it is closed here, which
*            automatically invalidates the associated key. The selector
*            will then de-register the channel on the next select call.
*/
protected void readDataFromSocket(SelectionKey key) throws Exception {
SocketChannel socketChannel = (SocketChannel) key.channel();
int count;
buffer.clear(); // Empty buffer
// Loop while data is available; channel is nonblocking
while ((count = socketChannel.read(buffer)) > 0) {
buffer.flip(); // Make buffer readable
// Send the data; don't assume it goes all at once
while (buffer.hasRemaining()) {
socketChannel.write(buffer);
}
// WARNING: the above loop is evil. Because
// it's writing back to the same nonblocking
// channel it read the data from, this code can
// potentially spin in a busy loop. In real life
// you'd do something more useful than this.
buffer.clear(); // Empty buffer
}
if (count < 0) {
// Close channel on EOF, invalidates the key
socketChannel.close();
}
}

// ----------------------------------------------------------
/**
* Spew a greeting to the incoming client connection.
*
* @param channel
*            The newly connected SocketChannel to say hello to.
*/
private void sayHello(SocketChannel channel) throws Exception {
buffer.clear();
buffer.put("Hi there!\r\n".getBytes());
buffer.flip();
channel.write(buffer);
}
}


并发性

选择器对象是线程安全的,但它们包含的键集合不是。通过 keys( )和 selectKeys( )返回的键的集合是 Selector 对象内部的私有的 Set 对象集合的直接引用。这些集合可能在任意时间被改变。

如果在多个线程并发地访问一个选择器的键的集合的时候存在任何问题,您可以采取一些步骤来合理地同步访问。在执行选择操作时,选择器在 Selector 对象上进行同步,然后是已注册的键的集合,最后是已选择的键的集合,按照这样的顺序

Selector 类的 close( )方法与 slect( )方法的同步方式是一样的,因此也有一直阻塞的可能性。在选择过程还在进行的过程中,所有对 close( )的调用都会被阻塞,直到选择过程结束,或者执行选择的线程进入睡眠。在后面的情况下,执行选择的线程将会在执行关闭的线程获得锁是立即被唤醒,并关闭选择器。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: