Java NIO —— TCP套接字(ServerSocketChannel & SocketChannel)
2016-12-02 15:13
651 查看
package com.demo.test; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Date; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class SocketChannelDemo { private static final int PORT = 9999; private static final int ACCEPT_INTERVAL = 300; private static final int BUFF_SIZE = 255; private static final int WAIT_CONNECT_INTERVAL = 300; private static void openServerSocketChannel(boolean isBlocking) { try { // The new channel's socket is initially unbound; it must be bound to a specific address via one of its socket's bind methods before connections can be accepted. final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().bind(new InetSocketAddress(PORT)); serverSocketChannel.configureBlocking(isBlocking); while (true) { final SocketChannel socketChannel = serverSocketChannel.accept(); // 在非阻塞模式下,accept() 方法会立刻返回,如果还没有新进来的连接,返回的将是null if (!isBlocking && socketChannel == null) { System.out.println("serverSocketChannel.accept() = null"); try { Thread.sleep(ACCEPT_INTERVAL); } catch (InterruptedException e) { e.printStackTrace(); } continue; } System.out.println("serverSocketChannel.accept() != null"); new Thread(new Runnable() { @Override public void run() { // do something with socketChannel... ByteBuffer readBuffer = ByteBuffer.allocate(BUFF_SIZE); byte[] buffer = new byte[BUFF_SIZE]; int len = -1; do { try { len = socketChannel.read(readBuffer); } catch (IOException e) { e.printStackTrace(); try { socketChannel.close(); } catch (IOException e1) { e1.printStackTrace(); } break; } if (len != -1) { //切换读写模式,socketChannel.read是向ByteBuffer里面写,而get是将数据从ByteBuffer里读出来 readBuffer.flip(); //get也会改变position readBuffer.get(buffer, 0, len); //rewind会将position的位置设置为0 readBuffer.rewind(); try { socketChannel.write(readBuffer); } catch (IOException e) { e.printStackTrace(); } //将当前指针移到数组首位,相当于清空数据 readBuffer.clear(); System.out.println("service recv : len=" + len + ", data=" + new String(buffer, 0, len)); } } while (len != -1); } }).start(); break; } try { Thread.sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); } serverSocketChannel.close(); } catch (IOException e) { e.printStackTrace(); } } private static void openSocketChannel(boolean isBlocking) { try { final SocketChannel socketChannel = SocketChannel.open(); // true if a connection was established, false if this channel is in non-blocking mode and the connection operation is in progress boolean isEstablished = socketChannel.connect(new InetSocketAddress("127.0.0.1", PORT)); System.out.println("socketChannel.connect() = " + isEstablished); if (!isBlocking && !isEstablished) { while (!socketChannel.finishConnect()) { // wait, or do something else... System.out.println("socketChannel.finishConnect() = false"); try { Thread.sleep(WAIT_CONNECT_INTERVAL); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("socketChannel.finishConnect() = true"); } new Thread(new Runnable() { @Override public void run() { int len = -1; do { ByteBuffer readBuffer = ByteBuffer.allocate(BUFF_SIZE); byte[] buffer = new byte[BUFF_SIZE]; // The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream try { len = socketChannel.read(readBuffer); if (len != -1) { readBuffer.flip(); readBuffer.get(buffer, 0, len); readBuffer.compact(); System.out.println("client recv : len=" + len + ", data=" + new String(buffer, 0, len)); } } catch (IOException e) { e.printStackTrace(); break; } } while (len != -1); } }).start(); ByteBuffer writeBuffer = ByteBuffer.allocate(BUFF_SIZE); for (int i = 0; i < 5; i++) { String str = "(" + i + ")" + new Date().toString(); byte[] buffer = str.getBytes(); //将数组写入到ByteBuffer中 writeBuffer.put(buffer); // 这里要交换读写模式 writeBuffer.flip(); //将数据从ByteBuffer中读出,写入到流中 socketChannel.write(writeBuffer); // 压缩数据,即将数据向前移动已使用的长度, 如果没有这句会报 java.nio.BufferOverflowException writeBuffer.compact(); System.out.println("client send : len=" + buffer.length + ", data=" + str); } try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } socketChannel.close(); } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) { int nThreads = 2; ExecutorService executorService = Executors.newFixedThreadPool(nThreads); final boolean isBlocking = false; executorService.execute(new Runnable() { @Override public void run() { openServerSocketChannel(isBlocking); } }); executorService.execute(new Runnable() { @Override public void run() { openSocketChannel(isBlocking); } }); } }
“isBlocking = true”的情况
serverSocketChannel.accept() != null socketChannel.connect() = true client send : len=31, data=(0)Mon Dec 05 09:25:48 CST 2016 client send : len=31, data=(1)Mon Dec 05 09:25:48 CST 2016 client send : len=31, data=(2)Mon Dec 05 09:25:48 CST 2016 client send : len=31, data=(3)Mon Dec 05 09:25:48 CST 2016 client send : len=31, data=(4)Mon Dec 05 09:25:48 CST 2016 service recv : len=31, data=(0)Mon Dec 05 09:25:48 CST 2016 service recv : len=124, data=(1)Mon Dec 05 09:25:48 CST 2016(2)Mon Dec 05 09:25:48 CST 2016(3)Mon Dec 05 09:25:48 CST 2016(4)Mon Dec 05 09:25:48 CST 2016 client recv : len=31, data=(0)Mon Dec 05 09:25:48 CST 2016 client recv : len=124, data=(1)Mon Dec 05 09:25:48 CST 2016(2)Mon Dec 05 09:25:48 CST 2016(3)Mon Dec 05 09:25:48 CST 2016(4)Mon Dec 05 09:25:48 CST 2016 java.nio.channels.AsynchronousCloseException at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:205) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:407) at com.demo.test.SocketChannelDemo$2.run(SocketChannelDemo.java:118) at java.lang.Thread.run(Thread.java:745)
“isBlocking = false”的情况
socketChannel.connect() = true serverSocketChannel.accept() != null service recv : len=31, data=(0)Mon Dec 05 09:26:39 CST 2016 client recv : len=31, data=(0)Mon Dec 05 09:26:39 CST 2016 client send : len=31, data=(0)Mon Dec 05 09:26:39 CST 2016 client send : len=31, data=(1)Mon Dec 05 09:26:39 CST 2016 client recv : len=31, data=(1)Mon Dec 05 09:26:39 CST 2016 service recv : len=31, data=(1)Mon Dec 05 09:26:39 CST 2016 client send : len=31, data=(2)Mon Dec 05 09:26:39 CST 2016 client recv : len=31, data=(2)Mon Dec 05 09:26:39 CST 2016 service recv : len=31, data=(2)Mon Dec 05 09:26:39 CST 2016 client send : len=31, data=(3)Mon Dec 05 09:26:39 CST 2016 service recv : len=31, data=(3)Mon Dec 05 09:26:39 CST 2016 client recv : len=31, data=(3)Mon Dec 05 09:26:39 CST 2016 service recv : len=31, data=(4)Mon Dec 05 09:26:39 CST 2016 client recv : len=31, data=(4)Mon Dec 05 09:26:39 CST 2016 client send : len=31, data=(4)Mon Dec 05 09:26:39 CST 2016 java.nio.channels.AsynchronousCloseException at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:205) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:407) at com.demo.test.SocketChannelDemo$2.run(SocketChannelDemo.java:118) at java.lang.Thread.run(Thread.java:745)
注意:并没有要求客户端和服务端必须同时阻塞或同时非阻塞,只是还有2种情况的打印这里省略了而已。
相关文章推荐
- 10. Java NIO ServerSocketChannel 服务端套接字通道
- Java Nio 十、Java NIO ServerSocketChannel
- Java NIO 之 ServerSocketChannel SocketChannel
- Java Socket:Java-NIO-ServerSocketChannel
- java NIO之ServerSocketChannel的使用
- 【JAVA】【NIO】10、Java NIO ServerSocketChannel
- [java]NIO服务器(ServerSocketChannel)开发的两种实现方式
- 【Java NIO的深入研究】 ServerSocketChannel
- Java NIO(10-ServerSocketChannel)
- 【Java NIO的深入研究】 ServerSocketChannel
- 【Java NIO的深入研究】 ServerSocketChannel
- Java NIO ServerSocketChannel
- Java netty之NioServerSocketChannel
- Java NIO ServerSocketChannel
- 【Java NIO的深入研究】 ServerSocketChannel
- Java NIO ServerSocketChannel
- Java NIO 学习(四)--ServerSocketChannel与SocketChannel
- 《Java 源码分析》:Java NIO 之 ServerSocketChannel
- Java基础知识强化之IO流笔记80:NIO之 ServerSocketChannel
- 【Java NIO的深入研究】 ServerSocketChannel