您的位置:首页 > 理论基础 > 计算机网络

Java NIO —— TCP套接字(ServerSocketChannel & SocketChannel)

2016-12-02 15:13 651 查看
package com.demo.test;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SocketChannelDemo {

private static final int PORT = 9999;
private static final int ACCEPT_INTERVAL = 300;
private static final int BUFF_SIZE = 255;
private static final int WAIT_CONNECT_INTERVAL = 300;

private static void openServerSocketChannel(boolean isBlocking) {
try {
// The new channel's socket is initially unbound; it must be bound to a specific address via one of its socket's bind methods before connections can be accepted.
final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(PORT));
serverSocketChannel.configureBlocking(isBlocking);
while (true) {
final SocketChannel socketChannel = serverSocketChannel.accept();
// 在非阻塞模式下,accept() 方法会立刻返回,如果还没有新进来的连接,返回的将是null
if (!isBlocking && socketChannel == null) {
System.out.println("serverSocketChannel.accept() = null");
try {
Thread.sleep(ACCEPT_INTERVAL);
} catch (InterruptedException e) {
e.printStackTrace();
}
continue;
}
System.out.println("serverSocketChannel.accept() != null");
new Thread(new Runnable() {

@Override
public void run() {
// do something with socketChannel...
ByteBuffer readBuffer = ByteBuffer.allocate(BUFF_SIZE);
byte[] buffer = new byte[BUFF_SIZE];
int len = -1;
do {
try {
len = socketChannel.read(readBuffer);
} catch (IOException e) {
e.printStackTrace();
try {
socketChannel.close();
} catch (IOException e1) {
e1.printStackTrace();
}
break;
}
if (len != -1) {
//切换读写模式,socketChannel.read是向ByteBuffer里面写,而get是将数据从ByteBuffer里读出来
readBuffer.flip();
//get也会改变position
readBuffer.get(buffer, 0, len);
//rewind会将position的位置设置为0
readBuffer.rewind();
try {
socketChannel.write(readBuffer);
} catch (IOException e) {
e.printStackTrace();
}
//将当前指针移到数组首位,相当于清空数据
readBuffer.clear();
System.out.println("service recv : len=" + len + ", data=" + new String(buffer, 0, len));
}
} while (len != -1);
}
}).start();
break;
}
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
serverSocketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}

private static void openSocketChannel(boolean isBlocking) {
try {
final SocketChannel socketChannel = SocketChannel.open();
// true if a connection was established, false if this channel is in non-blocking mode and the connection operation is in progress
boolean isEstablished = socketChannel.connect(new InetSocketAddress("127.0.0.1", PORT));
System.out.println("socketChannel.connect() = " + isEstablished);
if (!isBlocking && !isEstablished) {
while (!socketChannel.finishConnect()) {
// wait, or do something else...
System.out.println("socketChannel.finishConnect() = false");
try {
Thread.sleep(WAIT_CONNECT_INTERVAL);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("socketChannel.finishConnect() = true");
}
new Thread(new Runnable() {

@Override
public void run() {
int len = -1;
do {
ByteBuffer readBuffer = ByteBuffer.allocate(BUFF_SIZE);
byte[] buffer = new byte[BUFF_SIZE];
// The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream
try {
len = socketChannel.read(readBuffer);
if (len != -1) {
readBuffer.flip();
readBuffer.get(buffer, 0, len);
readBuffer.compact();
System.out.println("client recv : len=" + len + ", data=" + new String(buffer, 0, len));
}
} catch (IOException e) {
e.printStackTrace();
break;
}
} while (len != -1);
}
}).start();
ByteBuffer writeBuffer = ByteBuffer.allocate(BUFF_SIZE);
for (int i = 0; i < 5; i++) {
String str = "(" + i + ")" + new Date().toString();
byte[] buffer = str.getBytes();
//将数组写入到ByteBuffer中
writeBuffer.put(buffer);
// 这里要交换读写模式
writeBuffer.flip();
//将数据从ByteBuffer中读出,写入到流中
socketChannel.write(writeBuffer);
// 压缩数据,即将数据向前移动已使用的长度, 如果没有这句会报 java.nio.BufferOverflowException
writeBuffer.compact();
System.out.println("client send : len=" + buffer.length + ", data=" + str);
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
socketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}

public static void main(String[] args) {
int nThreads = 2;
ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
final boolean isBlocking = false;
executorService.execute(new Runnable() {

@Override
public void run() {
openServerSocketChannel(isBlocking);
}
});
executorService.execute(new Runnable() {

@Override
public void run() {
openSocketChannel(isBlocking);
}
});
}
}


“isBlocking = true”的情况

serverSocketChannel.accept() != null
socketChannel.connect() = true
client send : len=31, data=(0)Mon Dec 05 09:25:48 CST 2016
client send : len=31, data=(1)Mon Dec 05 09:25:48 CST 2016
client send : len=31, data=(2)Mon Dec 05 09:25:48 CST 2016
client send : len=31, data=(3)Mon Dec 05 09:25:48 CST 2016
client send : len=31, data=(4)Mon Dec 05 09:25:48 CST 2016
service recv : len=31, data=(0)Mon Dec 05 09:25:48 CST 2016
service recv : len=124, data=(1)Mon Dec 05 09:25:48 CST 2016(2)Mon Dec 05 09:25:48 CST 2016(3)Mon Dec 05 09:25:48 CST 2016(4)Mon Dec 05 09:25:48 CST 2016
client recv : len=31, data=(0)Mon Dec 05 09:25:48 CST 2016
client recv : len=124, data=(1)Mon Dec 05 09:25:48 CST 2016(2)Mon Dec 05 09:25:48 CST 2016(3)Mon Dec 05 09:25:48 CST 2016(4)Mon Dec 05 09:25:48 CST 2016
java.nio.channels.AsynchronousCloseException
at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:205)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:407)
at com.demo.test.SocketChannelDemo$2.run(SocketChannelDemo.java:118)
at java.lang.Thread.run(Thread.java:745)


“isBlocking = false”的情况

socketChannel.connect() = true
serverSocketChannel.accept() != null
service recv : len=31, data=(0)Mon Dec 05 09:26:39 CST 2016
client recv : len=31, data=(0)Mon Dec 05 09:26:39 CST 2016
client send : len=31, data=(0)Mon Dec 05 09:26:39 CST 2016
client send : len=31, data=(1)Mon Dec 05 09:26:39 CST 2016
client recv : len=31, data=(1)Mon Dec 05 09:26:39 CST 2016
service recv : len=31, data=(1)Mon Dec 05 09:26:39 CST 2016
client send : len=31, data=(2)Mon Dec 05 09:26:39 CST 2016
client recv : len=31, data=(2)Mon Dec 05 09:26:39 CST 2016
service recv : len=31, data=(2)Mon Dec 05 09:26:39 CST 2016
client send : len=31, data=(3)Mon Dec 05 09:26:39 CST 2016
service recv : len=31, data=(3)Mon Dec 05 09:26:39 CST 2016
client recv : len=31, data=(3)Mon Dec 05 09:26:39 CST 2016
service recv : len=31, data=(4)Mon Dec 05 09:26:39 CST 2016
client recv : len=31, data=(4)Mon Dec 05 09:26:39 CST 2016
client send : len=31, data=(4)Mon Dec 05 09:26:39 CST 2016
java.nio.channels.AsynchronousCloseException
at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:205)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:407)
at com.demo.test.SocketChannelDemo$2.run(SocketChannelDemo.java:118)
at java.lang.Thread.run(Thread.java:745)


注意:并没有要求客户端和服务端必须同时阻塞或同时非阻塞,只是还有2种情况的打印这里省略了而已。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  java channel