Java NIO —— Selector(用于SocketChannel)
2016-12-02 17:15
441 查看
使用Selector的原因
Selector(选择器)是Java NIO中能够检测一到多个NIO通道,并能够知晓通道是否为诸如读写事件做好准备的组件。这样,一个单独的线程可以管理多个channel,从而管理多个网络连接。仅用单个线程来处理多个Channels的好处是,只需要更少的线程来处理通道。事实上,可以只用一个线程处理所有的通道。对于操作系统来说,线程之间上下文切换的开销很大,而且每个线程都要占用系统的一些资源(如内存)。因此,使用的线程越少越好。
但是,需要记住,现代的操作系统和CPU在多任务方面表现的越来越好,所以多线程的开销随着时间的推移,变得越来越小了。实际上,如果一个CPU有多个内核,不使用多任务可能是在浪费CPU能力。不管怎么说,关于那种设计的讨论应该放在另一篇不同的文章中。在这里,只要知道使用Selector能够处理多个通道就足够了。
下面是单线程使用一个Selector处理3个channel的示例图:
范例
说明一下,实际上,对数据的处理应该放在一个专门的线程中来执行,但是我为了使代码更简洁,看起来更容易理解,因此并没有这么做!!!BaseSocketChannelSelector.java
package com.demo.channel; import java.io.IOException; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.Iterator; import java.util.Set; public abstract class BaseSocketChannelSelector { private Selector selector; protected BaseSocketChannelSelector() throws IOException { selector = Selector.open(); } protected void registerChannel(SelectableChannel channel, int ops) { try { // 与Selector一起使用时,Channel必须处于非阻塞模式下。这意味着不能将FileChannel与Selector一起使用,因为FileChannel不能切换到非阻塞模式。而套接字通道都可以。 channel.configureBlocking(false); // 可以监听以下四种不同类型的事件 SelectionKey.OP_ACCEPT(server) | SelectionKey.OP_CONNECT | SelectionKey.OP_READ | SelectionKey.OP_WRITE SelectionKey key = channel.register(selector, ops); } catch (IOException e) { e.printStackTrace(); } } public void select() { while (true) { try { /* * 每次使用select时必须判断下selector是否已经被关闭,否则会报如下错误 * Exception in thread "pool-1-thread-1" java.nio.channels.ClosedSelectorException at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:83) */ if (!selector.isOpen()) { System.out.println("selector is closed"); break; } // This method performs a blocking selection operation.It returns only after at least one channel is selected, this selector's wakeup method is invoked, or the current thread is interrupted, whichever comes first int readyChannels = selector.select(); if (readyChannels == 0) { continue; } if (!selector.isOpen()) { System.out.println("selector is closed"); break; } /* * select()阻塞到至少有一个通道在你注册的事件上就绪了。 * select(long timeout)和select()一样,除了最长会阻塞timeout毫秒(参数)。 * selectNow()不会阻塞,不管什么通道就绪都立刻返回(译者注:此方法执行非阻塞的选择操作。如果自从前一次选择操作后,没有通道变成可选择的,则此方法直接返回零。)。 * select()方法返回的int值表示有多少通道已经就绪。亦即,自上次调用select()方法后有多少通道变成就绪状态。 * 如果调用select()方法,因为有一个通道变成就绪状态,返回了1,若再次调用select()方法,如果另一个通道就绪了,它会再次返回1。 * 如果对第一个就绪的channel没有做任何操作,现在就有两个就绪的通道,但在每次select()方法调用之间,只有一个通道就绪了。 * 调用Selector.wakeup()方法即可。阻塞在select()方法上的线程会立马返回。 */ Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectedKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); if (key.isValid()) { doOnSelectionKey(key); /* * if (key.isAcceptable()) { * // a connection was accepted by a ServerSocketChannel. * } else if (key.isConnectable()) { * // a connection was established with a remote server. * } else if (key.isReadable()) { * // a channel is ready for reading * } else if (key.isWritable()) { * // a channel is ready for writing * } */ } // 处理完该事件就移除该对象 iterator.remove(); } } catch (IOException e) { e.printStackTrace(); } } } protected abstract void doOnSelectionKey(SelectionKey key); protected void closeSelector() { try { // 闭该Selector,且使注册到该Selector上的所有SelectionKey实例无效。通道本身并不会关闭。 selector.close(); } catch (IOException e) { e.printStackTrace(); } } }
TcpClientChannelSelector.java
package com.demo.channel; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; public class TcpClientChannelSelector extends BaseSocketChannelSelector { private static final String HOSTNAME = "127.0.0.1"; private static final int PORT = 5555; private static final int BUFF_SIZE = 255; private SocketChannel socketChannel; private ByteBuffer readBuffer; private ByteBuffer writeBuffer; private boolean isConnected; public TcpClientChannelSelector() throws IOException { super(); socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); socketChannel.connect(new InetSocketAddress(HOSTNAME, PORT)); readBuffer = ByteBuffer.allocate(BUFF_SIZE); writeBuffer = ByteBuffer.allocate(BUFF_SIZE); isConnected = false; registerChannel(socketChannel, SelectionKey.OP_CONNECT | SelectionKey.OP_READ); } public boolean isConnected() { return isConnected; } @Override public void doOnSelectionKey(SelectionKey key) { if (key.isAcceptable()) { System.out.println("client is accepted by server"); } else if (key.isConnectable()) { SocketChannel channel = (SocketChannel) key.channel(); if (channel.isConnectionPending()) { try { /* * 人为将程序中止,等待连接创建完成,否则会报如下错误 * java.nio.channels.NotYetConnectedException at sun.nio.ch.SocketChannelImpl.ensureWriteOpen(SocketChannelImpl.java:274) */ if (channel.finishConnect()) { System.out.println("client connect server succ"); isConnected = true; } } catch (IOException e) { e.printStackTrace(); } } } else if (key.isReadable()) { try { int len = socketChannel.read(readBuffer); if (len == -1) { System.out.println("client read : len=-1"); // 说明连接已经断开 close(); } else { readBuffer.flip(); byte[] buffer = new byte[len]; readBuffer.get(buffer); readBuffer.clear(); System.out.println("client read : len=" + len + ", str=" + new String(buffer)); } } catch (IOException e) { e.printStackTrace(); } } } public void write(String str) { byte[] buffer = str.getBytes(); writeBuffer.put(buffer); writeBuffer.flip(); try { System.out.println("client write : len=" + buffer.length + ", str=" + str); socketChannel.write(writeBuffer); } catch (IOException e) { e.printStackTrace(); } writeBuffer.clear(); } public void close() { System.out.println("close client"); isConnected = false; super.closeSelector(); try { socketChannel.close(); } catch (IOException e) { e.printStackTrace(); } } }
TcpServerChannelSelector.java
package com.demo.channel; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.ArrayList; public class TcpServerChannelSelector extends BaseSocketChannelSelector { private static final String HOSTNAME = "127.0.0.1"; private static final int PORT = 5555; private static final int BUFF_SIZE = 255; private ServerSocketChannel serverSocketChannel; private ArrayList<SocketChannel> socketChannelList; private ByteBuffer readBuffer; private ByteBuffer writeBuffer; public TcpServerChannelSelector() throws IOException { super(); serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.bind(new InetSocketAddress(HOSTNAME, PORT)); socketChannelList = new ArrayList<SocketChannel>(); readBuffer = ByteBuffer.allocate(BUFF_SIZE); writeBuffer = ByteBuffer.allocate(BUFF_SIZE); /* * 因此这里只能先accept,然后再read,否则会报如下错误 * Exception in thread "main" java.lang.IllegalArgumentException at java.nio.channels.spi.AbstractSelectableChannel.register(AbstractSelectableChannel.java:199) */ registerChannel(serverSocketChannel, SelectionKey.OP_ACCEPT); } @Override protected void doOnSelectionKey(SelectionKey key) { if (key.isAcceptable()) { try { SocketChannel socketChannel = ((ServerSocketChannel) key.channel()).accept(); socketChannelList.add(socketChannel); System.out.println("server find client : " + socketChannel.getRemoteAddress()); registerChannel(socketChannel, SelectionKey.OP_READ); } catch (IOException e) { e.printStackTrace(); } } else if (key.isReadable()) { try { SocketChannel socketChannel = (SocketChannel) key.channel(); int len = socketChannel.read(readBuffer); if (len == -1) { System.out.println("client read : len=-1"); // 说明连接已经断开 socketChannel.close(); socketChannelList.remove(socketChannel); } else { readBuffer.flip(); byte[] buffer = new byte[len]; readBuffer.get(buffer, 0, len); readBuffer.clear(); String str = new String(buffer); System.out.println("server read : len=" + len + ", str=" + str); write(str); } } catch (IOException e) { e.printStackTrace(); } } } public void write(String str) { byte[] buffer = str.getBytes(); writeBuffer.put(buffer); writeBuffer.flip(); for (SocketChannel channel : socketChannelList) { try { System.out.println("server write : len=" + buffer.length + ", str=" + str); channel.write(writeBuffer); } catch (IOException e) { e.printStackTrace(); } writeBuffer.rewind(); } writeBuffer.clear(); } public void close() { System.out.println("close server"); super.closeSelector(); for (SocketChannel channel : socketChannelList) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } } socketChannelList.clear(); try { serverSocketChannel.close(); } catch (IOException e) { e.printStackTrace(); } } }
SelectDemo.java
package com.demo.channel; import java.io.IOException; import java.util.Date; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class SelectDemo { private static TcpServerChannelSelector serverChannelSelector; private static TcpClientChannelSelector clientChannelSelector; public static void main(String[] args) { int nThreads = 2; try { serverChannelSelector = new TcpServerChannelSelector(); clientChannelSelector = new TcpClientChannelSelector(); ExecutorService executorService = Executors.newFixedThreadPool(nThreads); executorService.execute(new Runnable() { @Override public void run() { serverChannelSelector.select(); } }); executorService.execute(new Runnable() { @Override public void run() { clientChannelSelector.select(); } }); while (!clientChannelSelector.isConnected()) { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } for (int i = 0; i < 5; i++) { String str = "(" + i + ")" + new Date(); clientChannelSelector.write(str); } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } serverChannelSelector.close(); clientChannelSelector.close(); } catch (IOException e) { e.printStackTrace(); } } }
执行结果
client connect server succ server find client : /127.0.0.1:59651 client write : len=31, str=(0)Mon Dec 05 19:12:46 CST 2016 client write : len=31, str=(1)Mon Dec 05 19:12:46 CST 2016 server read : len=31, str=(0)Mon Dec 05 19:12:46 CST 2016 client write : len=31, str=(2)Mon Dec 05 19:12:46 CST 2016 server write : len=31, str=(0)Mon Dec 05 19:12:46 CST 2016 client write : len=31, str=(3)Mon Dec 05 19:12:46 CST 2016 server read : len=62, str=(1)Mon Dec 05 19:12:46 CST 2016(2)Mon Dec 05 19:12:46 CST 2016 server write : len=62, str=(1)Mon Dec 05 19:12:46 CST 2016(2)Mon Dec 05 19:12:46 CST 2016 client write : len=31, str=(4)Mon Dec 05 19:12:46 CST 2016 server read : len=31, str=(3)Mon Dec 05 19:12:46 CST 2016 server write : len=31, str=(3)Mon Dec 05 19:12:46 CST 2016 server read : len=31, str=(4)Mon Dec 05 19:12:46 CST 2016 server write : len=31, str=(4)Mon Dec 05 19:12:46 CST 2016 client read : len=155, str=(0)Mon Dec 05 19:12:46 CST 2016(1)Mon Dec 05 19:12:46 CST 2016(2)Mon Dec 05 19:12:46 CST 2016(3)Mon Dec 05 19:12:46 CST 2016(4)Mon Dec 05 19:12:46 CST 2016 close server selector is closed close client Exception in thread "pool-1-thread-2" java.nio.channels.ClosedSelectorException at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:83) at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97) at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:101) at com.demo.channel.BaseSocketChannelSelector.select(BaseSocketChannelSelector.java:40) at com.demo.channel.SelectDemo$2.run(SelectDemo.java:29) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
参考
http://ifeve.com/selectors/http://blog.csdn.net/windsunmoon/article/details/45373457
相关文章推荐
- Java NIO SocketChannel+Buffer+Selector 详解(含多人聊天室实例)
- NIO理解通道 selector SeverSocketChannel serverSocket selectionKey
- java NIO之ServerSocketChannel的使用
- Java NIO ServerSocketChannel
- [java]NIO服务器(ServerSocketChannel)开发的两种实现方式
- Java NIO ServerSocketChannel
- java nio socketChannel read
- 【JAVA】【NIO】9、Java NIO SocketChannel
- Java IO:SocketChannel和Selector在ZooKeeper中应用
- java_nio基础篇之Channel、Selector、Path、Paths、Pipe、Files
- Java Nio 十、Java NIO ServerSocketChannel
- Java Socket:Java-NIO-ServerSocketChannel
- Java NIO ServerSocketChannel
- Java Nio 九、Java NIO SocketChannel
- java netty之NioSocketChannel
- Java NIO(10-ServerSocketChannel)
- Java NIO SocketChannel客户端例子(支持连接失败后自动重连)
- Java NIO SocketChannel
- Java NIO Socket Channel
- Java NIO SocketChannel