您的位置:首页 > 编程语言 > Java开发

Java NIO —— Selector(用于SocketChannel)

2016-12-02 17:15 441 查看

使用Selector的原因

Selector(选择器)是Java NIO中能够检测一到多个NIO通道,并能够知晓通道是否为诸如读写事件做好准备的组件。这样,一个单独的线程可以管理多个channel,从而管理多个网络连接。

仅用单个线程来处理多个Channels的好处是,只需要更少的线程来处理通道。事实上,可以只用一个线程处理所有的通道。对于操作系统来说,线程之间上下文切换的开销很大,而且每个线程都要占用系统的一些资源(如内存)。因此,使用的线程越少越好。

但是,需要记住,现代的操作系统和CPU在多任务方面表现的越来越好,所以多线程的开销随着时间的推移,变得越来越小了。实际上,如果一个CPU有多个内核,不使用多任务可能是在浪费CPU能力。不管怎么说,关于那种设计的讨论应该放在另一篇不同的文章中。在这里,只要知道使用Selector能够处理多个通道就足够了。

下面是单线程使用一个Selector处理3个channel的示例图:



范例

说明一下,实际上,对数据的处理应该放在一个专门的线程中来执行,但是我为了使代码更简洁,看起来更容易理解,因此并没有这么做!!!

BaseSocketChannelSelector.java

package com.demo.channel;

import java.io.IOException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Set;

public abstract class BaseSocketChannelSelector {
private Selector selector;

protected BaseSocketChannelSelector() throws IOException {
selector = Selector.open();
}

protected void registerChannel(SelectableChannel channel, int ops) {
try {
// 与Selector一起使用时,Channel必须处于非阻塞模式下。这意味着不能将FileChannel与Selector一起使用,因为FileChannel不能切换到非阻塞模式。而套接字通道都可以。
channel.configureBlocking(false);
// 可以监听以下四种不同类型的事件 SelectionKey.OP_ACCEPT(server) | SelectionKey.OP_CONNECT | SelectionKey.OP_READ | SelectionKey.OP_WRITE
SelectionKey key = channel.register(selector, ops);
} catch (IOException e) {
e.printStackTrace();
}
}

public void select() {
while (true) {
try {
/*
* 每次使用select时必须判断下selector是否已经被关闭,否则会报如下错误
* Exception in thread "pool-1-thread-1" java.nio.channels.ClosedSelectorException at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:83)
*/
if (!selector.isOpen()) {
System.out.println("selector is closed");
break;
}
// This method performs a blocking selection operation.It returns only after at least one channel is selected, this selector's wakeup method is invoked, or the current thread is interrupted, whichever comes first
int readyChannels = selector.select();
if (readyChannels == 0) {
continue;
}
if (!selector.isOpen()) {
System.out.println("selector is closed");
break;
}
/*
* select()阻塞到至少有一个通道在你注册的事件上就绪了。
* select(long timeout)和select()一样,除了最长会阻塞timeout毫秒(参数)。
* selectNow()不会阻塞,不管什么通道就绪都立刻返回(译者注:此方法执行非阻塞的选择操作。如果自从前一次选择操作后,没有通道变成可选择的,则此方法直接返回零。)。
* select()方法返回的int值表示有多少通道已经就绪。亦即,自上次调用select()方法后有多少通道变成就绪状态。
* 如果调用select()方法,因为有一个通道变成就绪状态,返回了1,若再次调用select()方法,如果另一个通道就绪了,它会再次返回1。
* 如果对第一个就绪的channel没有做任何操作,现在就有两个就绪的通道,但在每次select()方法调用之间,只有一个通道就绪了。
* 调用Selector.wakeup()方法即可。阻塞在select()方法上的线程会立马返回。
*/
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectedKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (key.isValid()) {
doOnSelectionKey(key);
/*
* if (key.isAcceptable()) {
* // a connection was accepted by a ServerSocketChannel.
* } else if (key.isConnectable()) {
* // a connection was established with a remote server.
* } else if (key.isReadable()) {
* // a channel is ready for reading
* } else if (key.isWritable()) {
* // a channel is ready for writing
* }
*/
}
// 处理完该事件就移除该对象
iterator.remove();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

protected abstract void doOnSelectionKey(SelectionKey key);

protected void closeSelector() {
try {
// 闭该Selector,且使注册到该Selector上的所有SelectionKey实例无效。通道本身并不会关闭。
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}


TcpClientChannelSelector.java

package com.demo.channel;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

public class TcpClientChannelSelector extends BaseSocketChannelSelector {

private static final String HOSTNAME = "127.0.0.1";
private static final int PORT = 5555;
private static final int BUFF_SIZE = 255;

private SocketChannel socketChannel;
private ByteBuffer readBuffer;
private ByteBuffer writeBuffer;
private boolean isConnected;

public TcpClientChannelSelector() throws IOException {
super();
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress(HOSTNAME, PORT));
readBuffer = ByteBuffer.allocate(BUFF_SIZE);
writeBuffer = ByteBuffer.allocate(BUFF_SIZE);
isConnected = false;
registerChannel(socketChannel, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
}

public boolean isConnected() {
return isConnected;
}

@Override
public void doOnSelectionKey(SelectionKey key) {
if (key.isAcceptable()) {
System.out.println("client is accepted by server");
} else if (key.isConnectable()) {
SocketChannel channel = (SocketChannel) key.channel();
if (channel.isConnectionPending()) {
try {
/*
* 人为将程序中止,等待连接创建完成,否则会报如下错误
* java.nio.channels.NotYetConnectedException at sun.nio.ch.SocketChannelImpl.ensureWriteOpen(SocketChannelImpl.java:274)
*/
if (channel.finishConnect()) {
System.out.println("client connect server succ");
isConnected = true;
}
} catch (IOException e) {
e.printStackTrace();
}
}
} else if (key.isReadable()) {
try {
int len = socketChannel.read(readBuffer);
if (len == -1) {
System.out.println("client read : len=-1");
// 说明连接已经断开
close();
} else {
readBuffer.flip();
byte[] buffer = new byte[len];
readBuffer.get(buffer);
readBuffer.clear();
System.out.println("client read : len=" + len + ", str=" + new String(buffer));
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

public void write(String str) {
byte[] buffer = str.getBytes();
writeBuffer.put(buffer);
writeBuffer.flip();
try {
System.out.println("client write : len=" + buffer.length + ", str=" + str);
socketChannel.write(writeBuffer);
} catch (IOException e) {
e.printStackTrace();
}
writeBuffer.clear();
}

public void close() {
System.out.println("close client");
isConnected = false;
super.closeSelector();
try {
socketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}


TcpServerChannelSelector.java

package com.demo.channel;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;

public class TcpServerChannelSelector extends BaseSocketChannelSelector {

private static final String HOSTNAME = "127.0.0.1";
private static final int PORT = 5555;
private static final int BUFF_SIZE = 255;

private ServerSocketChannel serverSocketChannel;
private ArrayList<SocketChannel> socketChannelList;
private ByteBuffer readBuffer;
private ByteBuffer writeBuffer;

public TcpServerChannelSelector() throws IOException {
super();
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(HOSTNAME, PORT));
socketChannelList = new ArrayList<SocketChannel>();
readBuffer = ByteBuffer.allocate(BUFF_SIZE);
writeBuffer = ByteBuffer.allocate(BUFF_SIZE);
/*
* 因此这里只能先accept,然后再read,否则会报如下错误
* Exception in thread "main" java.lang.IllegalArgumentException at java.nio.channels.spi.AbstractSelectableChannel.register(AbstractSelectableChannel.java:199)
*/
registerChannel(serverSocketChannel, SelectionKey.OP_ACCEPT);
}

@Override
protected void doOnSelectionKey(SelectionKey key) {
if (key.isAcceptable()) {
try {
SocketChannel socketChannel = ((ServerSocketChannel) key.channel()).accept();
socketChannelList.add(socketChannel);
System.out.println("server find client : " + socketChannel.getRemoteAddress());
registerChannel(socketChannel, SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}
} else if (key.isReadable()) {
try {
SocketChannel socketChannel = (SocketChannel) key.channel();
int len = socketChannel.read(readBuffer);
if (len == -1) {
System.out.println("client read : len=-1");
// 说明连接已经断开
socketChannel.close();
socketChannelList.remove(socketChannel);
} else {
readBuffer.flip();
byte[] buffer = new byte[len];
readBuffer.get(buffer, 0, len);
readBuffer.clear();
String str = new String(buffer);
System.out.println("server read : len=" + len + ", str=" + str);
write(str);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

public void write(String str) {
byte[] buffer = str.getBytes();
writeBuffer.put(buffer);
writeBuffer.flip();
for (SocketChannel channel : socketChannelList) {
try {
System.out.println("server write : len=" + buffer.length + ", str=" + str);
channel.write(writeBuffer);
} catch (IOException e) {
e.printStackTrace();
}
writeBuffer.rewind();
}
writeBuffer.clear();
}

public void close() {
System.out.println("close server");
super.closeSelector();
for (SocketChannel channel : socketChannelList) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
socketChannelList.clear();
try {
serverSocketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}


SelectDemo.java

package com.demo.channel;

import java.io.IOException;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SelectDemo {
private static TcpServerChannelSelector serverChannelSelector;
private static TcpClientChannelSelector clientChannelSelector;

public static void main(String[] args) {
int nThreads = 2;
try {
serverChannelSelector = new TcpServerChannelSelector();
clientChannelSelector = new TcpClientChannelSelector();
ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
executorService.execute(new Runnable() {

@Override
public void run() {
serverChannelSelector.select();
}
});
executorService.execute(new Runnable() {

@Override
public void run() {
clientChannelSelector.select();
}
});
while (!clientChannelSelector.isConnected()) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 5; i++) {
String str = "(" + i + ")" + new Date();
clientChannelSelector.write(str);
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
serverChannelSelector.close();
clientChannelSelector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}


执行结果



client connect server succ
server find client : /127.0.0.1:59651
client write : len=31, str=(0)Mon Dec 05 19:12:46 CST 2016
client write : len=31, str=(1)Mon Dec 05 19:12:46 CST 2016
server read : len=31, str=(0)Mon Dec 05 19:12:46 CST 2016
client write : len=31, str=(2)Mon Dec 05 19:12:46 CST 2016
server write : len=31, str=(0)Mon Dec 05 19:12:46 CST 2016
client write : len=31, str=(3)Mon Dec 05 19:12:46 CST 2016
server read : len=62, str=(1)Mon Dec 05 19:12:46 CST 2016(2)Mon Dec 05 19:12:46 CST 2016
server write : len=62, str=(1)Mon Dec 05 19:12:46 CST 2016(2)Mon Dec 05 19:12:46 CST 2016
client write : len=31, str=(4)Mon Dec 05 19:12:46 CST 2016
server read : len=31, str=(3)Mon Dec 05 19:12:46 CST 2016
server write : len=31, str=(3)Mon Dec 05 19:12:46 CST 2016
server read : len=31, str=(4)Mon Dec 05 19:12:46 CST 2016
server write : len=31, str=(4)Mon Dec 05 19:12:46 CST 2016
client read : len=155, str=(0)Mon Dec 05 19:12:46 CST 2016(1)Mon Dec 05 19:12:46 CST 2016(2)Mon Dec 05 19:12:46 CST 2016(3)Mon Dec 05 19:12:46 CST 2016(4)Mon Dec 05 19:12:46 CST 2016
close server
selector is closed
close client
Exception in thread "pool-1-thread-2" java.nio.channels.ClosedSelectorException
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:83)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:101)
at com.demo.channel.BaseSocketChannelSelector.select(BaseSocketChannelSelector.java:40)
at com.demo.channel.SelectDemo$2.run(SelectDemo.java:29)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


参考

http://ifeve.com/selectors/

http://blog.csdn.net/windsunmoon/article/details/45373457
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  java selector