您的位置:首页 > 编程语言 > Java开发

Java 非阻塞式的高伸缩性IO通信 学习demo

2017-11-17 00:00 190 查看
本示例是通过Java实现的非阻塞式高性能IO通信的例子。通信的交互协议:

服务端给每个accept到的SocketChannel发送一条欢迎信息(Welcome to Log on!)

客户端连接在建立完成之后生成随机数,若生成的随机数是5,则向服务端发送关闭消息(good-bye.),之后关闭连接释放资源;若生成的不是5,则生成消息发送给服务端

服务端接收客户端发送的消息,判断是否为关闭消息(good-bye.),若是则关闭连接释放资源;若不是则在日志记录接收到的客户端信息,再将消息发送给客户端

客户端在接收到服务端返回的消息之后打印消息,然后再次生成随机数,重复上述第2、3步(即"客户端发送消息"和"服务端处理并回显消息")

消息的封装格式(这里涉及到了拆包和解包):

消息长度(int)+消息内容(为了简便,内容只使用String)

每条消息的总长度:4+消息长度

服务端代码

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import java.util.*;

/**
* Nio非阻塞服务端 + 消息的拆分包
* 通信协议:
* 1.客户端连接时候服务端发送 Welcome to Log on! 消息
* 2.消息格式:消息长度(int)+消息内容(String)
* 3.客户端主动终止连接(发送 good-bye.消息)
*/
/**
 * Non-blocking NIO echo server with length-prefixed message framing.
 *
 * <p>Protocol:
 * <ol>
 *   <li>When a client connects, the server sends the "Welcome to Log on!" message.</li>
 *   <li>Every message is a 4-byte int length followed by that many bytes of UTF-8 text.</li>
 *   <li>The client ends the session by sending the "good-bye." message.</li>
 * </ol>
 *
 * <p>All channels are served by the single thread that runs the selector loop, so the
 * per-connection state kept in {@code keepDataTrack} is not synchronized.
 *
 * <p>Buffer conventions used throughout this class: a connection's read buffer is always left
 * in "fill" mode (ready for {@code channel.read}) between events; its write buffer is either
 * cleared (nothing pending) or flipped (holding bytes still to be sent).
 */
public class NonBlockingEchoServer {

    private static final Logger LOG = LoggerFactory.getLogger(NonBlockingEchoServer.class);

    private static final SelectorProvider selectorProvider = SelectorProvider.provider();

    /** Size in bytes of the int length prefix that precedes every message. */
    private static final int LENGTH_PREFIX_BYTES = 4;
    /** Capacity of each per-connection read and write buffer. */
    private static final int BUFFER_SIZE = 128;
    /** Message text with which a client announces that it is closing the connection. */
    private static final String BYE_MSG = "good-bye.";

    private final Charset defaultCharset = Charset.forName("UTF-8");
    private final CharsetDecoder decoder = defaultCharset.newDecoder();
    private final CharsetEncoder encoder = defaultCharset.newEncoder();

    /** Per-connection buffers: index 0 is the read buffer, index 1 is the write buffer. */
    private final Map<SocketChannel, List<ByteBuffer>> keepDataTrack = new HashMap<>();
    private final String bindIp = "0.0.0.0";
    private final int bindPort = 5555;

    private final String strWelcomeMsg = "Welcome to Log on!";

    public static void main(String[] args) {
        new NonBlockingEchoServer().startEchoServer();
    }

    /**
     * Opens the selector and the server socket channel, binds to {@code bindIp:bindPort}
     * and runs the selector loop. Setup or accept-level I/O failures are logged and end
     * the method.
     */
    private void startEchoServer() {
        try (Selector selector = selectorProvider.openSelector();
             ServerSocketChannel serverSocketChannel = selectorProvider.openServerSocketChannel()) {
            if (serverSocketChannel.isOpen() && selector.isOpen()) {
                serverSocketChannel.configureBlocking(false);
                serverSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, 256 * 1024);
                serverSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
                serverSocketChannel.bind(new InetSocketAddress(bindIp, bindPort));
                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, serverSocketChannel);
                LOG.info("Waiting for connections ...");
                this.handler(selector);
            } else {
                LOG.info("The server socket channel or selector cannot be opened!");
            }
        } catch (IOException ex) {
            LOG.error("Creation Selector or ServerSocketChannel Error!", ex);
        }
    }

    /**
     * Selector loop: waits for ready keys and dispatches them to the accept/read/write
     * handlers.
     *
     * @param selector the selector all channels are registered with
     * @throws IOException if selecting or accepting fails
     */
    private void handler(Selector selector) throws IOException {
        while (true) {
            selector.select();
            Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
            while (keys.hasNext()) {
                SelectionKey key = keys.next();
                // Remove the key so it is not processed again on the next select().
                keys.remove();
                if (!key.isValid()) {
                    if (key.channel() instanceof ServerSocketChannel) {
                        LOG.error("服务端异常,退出服务.或者升级服务重新启动服务!");
                        System.exit(1);
                    }
                    // Log the channel itself: asking a closed channel for its remote
                    // address would throw and take the whole loop down.
                    LOG.warn("Client connect exception. remote address is {}", key.channel());
                    if (key.channel() instanceof SocketChannel) {
                        this.closeConnection(key, (SocketChannel) key.channel());
                    }
                    // Never query readiness on an invalid key (CancelledKeyException).
                    continue;
                }
                if (key.isAcceptable()) {
                    this.handleAccept(key, selector);
                } else if (key.isReadable()) {
                    this.handleRead(key);
                } else if (key.isWritable()) {
                    this.handleWrite(key);
                }
            }
        }
    }

    /**
     * Accepts a pending connection, allocates its buffers and sends the welcome message.
     * If the welcome message cannot be written in one go, the remainder is flushed by
     * {@link #handleWrite(SelectionKey)} before the connection starts reading.
     *
     * @param key      the key of the server socket channel
     * @param selector the selector to register the new connection with
     * @throws IOException if accepting the connection itself fails
     */
    private void handleAccept(SelectionKey key, Selector selector) throws IOException {
        ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
        SocketChannel socketChannel = serverChannel.accept();
        if (socketChannel == null) {
            // Non-blocking accept: no connection is actually pending.
            return;
        }
        try {
            socketChannel.configureBlocking(false);
            LOG.info("Incoming connection from: {}", socketChannel.getRemoteAddress());

            ByteBuffer writeBuffer = ByteBuffer.allocate(BUFFER_SIZE);
            List<ByteBuffer> list = new ArrayList<>(2);
            list.add(ByteBuffer.allocate(BUFFER_SIZE)); // read buffer
            list.add(writeBuffer);                      // write buffer
            keepDataTrack.put(socketChannel, list);

            writeBuffer.put(this.wrapWelcomeMsg());
            writeBuffer.flip();
            socketChannel.write(writeBuffer);

            int interest;
            if (writeBuffer.hasRemaining()) {
                interest = SelectionKey.OP_WRITE;
            } else {
                writeBuffer.clear();
                interest = SelectionKey.OP_READ;
            }
            socketChannel.register(selector, interest, socketChannel);
        } catch (IOException e) {
            // A failure on one new connection must not stop the server.
            LOG.warn("Accept connection {} exception.", socketChannel, e);
            this.closeConnection(null, socketChannel);
        }
    }

    /**
     * Builds the framed welcome message.
     *
     * @return a buffer, ready to be read from, holding the length prefix and the
     *         encoded welcome text
     * @throws CharacterCodingException if the welcome text cannot be encoded
     */
    private ByteBuffer wrapWelcomeMsg() throws CharacterCodingException {
        ByteBuffer buffer = encoder.encode(CharBuffer.wrap(strWelcomeMsg));
        ByteBuffer welcomeMsg = ByteBuffer.allocate(buffer.remaining() + LENGTH_PREFIX_BYTES);
        welcomeMsg.putInt(buffer.remaining());
        welcomeMsg.put(buffer);
        welcomeMsg.flip();
        return welcomeMsg;
    }

    /**
     * Reads available bytes and, once a complete frame has arrived, either closes the
     * connection (good-bye message) or queues the echo. Incomplete frames stay in the
     * read buffer until more data arrives.
     *
     * @param key the key of the readable client channel
     */
    private void handleRead(SelectionKey key) {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        List<ByteBuffer> buffers = keepDataTrack.get(socketChannel);
        if (buffers == null) {
            this.closeConnection(key, socketChannel);
            return;
        }
        ByteBuffer buffer = buffers.get(0);
        try {
            if (socketChannel.read(buffer) < 0) {
                // Peer closed the connection without sending the good-bye message.
                LOG.warn("Connection closed by: {}", socketChannel.getRemoteAddress());
                this.closeConnection(key, socketChannel);
                return;
            }
            buffer.flip();
            if (buffer.remaining() < LENGTH_PREFIX_BYTES) {
                // Not even the length prefix is complete; switch back to fill mode.
                buffer.compact();
                LOG.debug("The received data is less than one pack");
                return;
            }
            // Peek at the length without consuming it, so an incomplete frame is kept intact.
            int msgLen = buffer.getInt(buffer.position());
            if (msgLen < 0 || msgLen > buffer.capacity() - LENGTH_PREFIX_BYTES) {
                LOG.warn("Invalid message length {} from {}", msgLen, socketChannel.getRemoteAddress());
                this.closeConnection(key, socketChannel);
                return;
            }
            if (buffer.remaining() - LENGTH_PREFIX_BYTES < msgLen) {
                buffer.compact();
                LOG.debug("The received data is less than one pack");
                return;
            }
            buffer.position(buffer.position() + LENGTH_PREFIX_BYTES);
            byte[] buf = new byte[msgLen];
            buffer.get(buf);
            // Keeps any bytes of a following frame and returns the buffer to fill mode.
            buffer.compact();

            String receive = decoder.decode(ByteBuffer.wrap(buf)).toString().trim();
            if (BYE_MSG.equals(receive)) {
                LOG.warn("Connection closed by: {}", socketChannel.getRemoteAddress());
                this.closeConnection(key, socketChannel);
                return;
            }
            LOG.debug("[Server] ({}) from {}", receive, socketChannel.getRemoteAddress());
            this.doEchoJob(key, encoder.encode(CharBuffer.wrap(receive)));
        } catch (IOException e) {
            LOG.warn("Read data from {} exception.", socketChannel, e);
            this.closeConnection(key, socketChannel);
        }
    }

    /**
     * Sends the pending bytes of the write buffer. The key stays interested in writes
     * until the buffer is drained, then switches back to reads.
     *
     * @param key the key of the writable client channel
     */
    private void handleWrite(SelectionKey key) {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        List<ByteBuffer> buffers = keepDataTrack.get(socketChannel);
        if (buffers == null) {
            this.closeConnection(key, socketChannel);
            return;
        }
        ByteBuffer buffer = buffers.get(1);
        try {
            socketChannel.write(buffer);
            if (buffer.hasRemaining()) {
                // Partial write: keep OP_WRITE and leave the buffer flipped for the next round.
                return;
            }
            buffer.clear();
            key.interestOps(SelectionKey.OP_READ);
        } catch (IOException e) {
            LOG.warn("Write data to {} exception.", socketChannel, e);
            this.closeConnection(key, socketChannel);
        }
    }

    /**
     * Frames the data read from the client and queues it to be sent back.
     *
     * @param key  links the input side to the output side of the connection
     * @param data the processed input that should be sent to the client
     */
    private void doEchoJob(SelectionKey key, ByteBuffer data) {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        ByteBuffer buffer = keepDataTrack.get(socketChannel).get(1);
        // Reads only happen after the previous write was fully drained, so nothing is pending.
        buffer.clear();
        buffer.putInt(data.remaining());
        buffer.put(data);
        buffer.flip();
        key.interestOps(SelectionKey.OP_WRITE);
    }

    /**
     * Drops the connection's buffers, cancels its key and closes the channel.
     *
     * @param key           the connection's selection key, or {@code null} if not registered yet
     * @param socketChannel the channel to close
     */
    private void closeConnection(SelectionKey key, SocketChannel socketChannel) {
        keepDataTrack.remove(socketChannel);
        if (key != null) {
            key.cancel();
        }
        try {
            socketChannel.close();
        } catch (IOException e) {
            LOG.debug("Close {} exception.", socketChannel, e);
        }
    }
}

客户端代码

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;
import java.nio.channels.spi.SelectorProvider;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

/**
* Nio非阻塞客户端+ 消息的拆分包
* 通信协议:
* 1.客户端连接时候服务端发送 Welcome to Log on! 消息
* 2.消息格式:消息长度(int)+消息内容(String)
* 3.客户端主动终止连接(发送 good-bye. 消息)
*/
/**
 * Non-blocking NIO echo client with length-prefixed message framing.
 *
 * <p>Protocol:
 * <ol>
 *   <li>When the client connects, the server sends the "Welcome to Log on!" message.</li>
 *   <li>Every message is a 4-byte int length followed by that many bytes of UTF-8 text.</li>
 *   <li>The client ends the session by sending the "good-bye." message.</li>
 * </ol>
 *
 * <p>After each message received from the server the client draws a random number in
 * [0, 10): on 5 it sends the good-bye message and stops, otherwise it sends a message
 * containing the number.
 *
 * <p>Buffer conventions: {@code readBuffer} is always left in "fill" mode between events;
 * {@code writeBuffer} is flipped and holds unsent bytes while {@code writePending} is true.
 */
public class NonBlockingEchoClient implements Runnable {

    private static final Logger LOG = LoggerFactory.getLogger(NonBlockingEchoClient.class);
    private static final SelectorProvider selectorProvider = SelectorProvider.provider();

    /** Size in bytes of the int length prefix that precedes every message. */
    private static final int LENGTH_PREFIX_BYTES = 4;
    private static final int DEFAULT_PORT = 5555;
    private static final String IP = "127.0.0.1";

    private final Charset charset = Charset.forName("UTF-8");
    private final CharsetDecoder decoder = charset.newDecoder();
    private final CharsetEncoder encoder = charset.newEncoder();
    private final ByteBuffer readBuffer = ByteBuffer.allocateDirect(128);
    private final ByteBuffer writeBuffer = ByteBuffer.allocateDirect(128);

    private final SocketChannel socketChannel;
    private final Selector selector;

    private final ByteBuffer byeMsg = encoder.encode(CharBuffer.wrap("good-bye."));

    /** Set when the selector loop should stop. */
    private boolean close = false;
    /** True while {@code writeBuffer} holds a framed message that is not fully sent yet. */
    private boolean writePending = false;
    /** True once the good-bye message has been queued for sending. */
    private boolean byeQueued = false;

    /**
     * Opens the selector and the socket channel and starts a non-blocking connect to
     * {@code IP:DEFAULT_PORT}.
     *
     * @throws IOException if the selector or channel cannot be opened, the address cannot
     *                     be resolved, or the connect attempt fails immediately
     */
    public NonBlockingEchoClient() throws IOException {
        this.selector = selectorProvider.openSelector();
        try {
            this.socketChannel = selectorProvider.openSocketChannel();
        } catch (IOException e) {
            // Do not leak the selector when the channel cannot be opened.
            this.selector.close();
            throw e;
        }
        try {
            socketChannel.configureBlocking(false);
            socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, 128);
            socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, 128);
            socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);

            InetSocketAddress address = new InetSocketAddress(IP, DEFAULT_PORT);
            boolean connected;
            try {
                connected = socketChannel.connect(address);
            } catch (UnresolvedAddressException e) {
                throw new IOException("Can't resolve address: " + address, e);
            }
            // If the connect completed immediately, wait for the welcome message;
            // otherwise wait for the connect to finish.
            int interest = connected ? SelectionKey.OP_READ : SelectionKey.OP_CONNECT;
            this.socketChannel.register(this.selector, interest);
        } catch (IOException ex) {
            this.releaseResource();
            throw ex;
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            try {
                new Thread(new NonBlockingEchoClient()).start();
            } catch (IOException e) {
                LOG.error("Instantiation Selector or SocketChannel Error!", e);
            }
        }
    }

    /**
     * Selector loop: finishes the connect, then alternates between reading a message from
     * the server and writing the next one, until the good-bye message has been sent or the
     * connection fails. Resources are always released on exit.
     */
    @Override
    public void run() {
        try {
            while (!close) {
                if (this.selector.select() <= 0) {
                    continue;
                }
                Iterator<SelectionKey> its = selector.selectedKeys().iterator();
                while (its.hasNext() && !close) {
                    SelectionKey key = its.next();
                    its.remove();

                    if (!key.isValid()) {
                        // The only registered channel is gone; nothing left to do.
                        close = true;
                        break;
                    }
                    if (key.isConnectable()) {
                        if (this.socketChannel.finishConnect()) {
                            key.interestOps(SelectionKey.OP_READ);
                        }
                        continue;
                    }
                    if (key.isWritable()) {
                        this.writeDateToChannel(key);
                    }
                    // Re-check state: the write handler may have ended the session.
                    if (!close && key.isValid() && key.isReadable()) {
                        this.readDataFromChannel(key);
                    }
                }
            }
        } catch (IOException e) {
            LOG.warn("Connection exception, closing the client.", e);
        } finally {
            this.releaseResource();
        }
    }

    /**
     * Sends the next message. A new message is framed only when no earlier one is still
     * pending; after a partial write the key stays interested in writes so the remainder
     * is sent on the next round. Once the good-bye message is fully written the loop is
     * told to stop.
     *
     * @param key the key of the writable channel
     */
    private void writeDateToChannel(SelectionKey key) {
        SocketChannel keySocketChannel = (SocketChannel) key.channel();
        try {
            if (!writePending) {
                this.queueNextMessage();
            }
            keySocketChannel.write(writeBuffer);
            if (writeBuffer.hasRemaining()) {
                return;
            }
            writeBuffer.clear();
            writePending = false;
            if (byeQueued) {
                this.close = true;
            } else {
                key.interestOps(SelectionKey.OP_READ);
            }
        } catch (IOException e) {
            LOG.warn("Write data to channel exception.", e);
            // The channel is unusable after a failed write; stop instead of spinning.
            this.close = true;
        }
    }

    /**
     * Draws the random number and frames either the good-bye message (on 5) or a message
     * containing the number into {@code writeBuffer}, leaving it flipped for sending.
     *
     * @throws IOException if the message text cannot be encoded
     */
    private void queueNextMessage() throws IOException {
        int r = ThreadLocalRandom.current().nextInt(10);
        ByteBuffer payload;
        if (r == 5) {
            LOG.debug("5 was generated! Close the socket channel!");
            // Duplicate so the shared good-bye buffer's position is left untouched.
            payload = byeMsg.duplicate();
            byeQueued = true;
        } else {
            String text = Thread.currentThread().getName() + " [Client]Random number: " + r;
            payload = encoder.encode(CharBuffer.wrap(text));
        }
        writeBuffer.clear();
        writeBuffer.putInt(payload.remaining());
        writeBuffer.put(payload);
        writeBuffer.flip();
        writePending = true;
    }

    /**
     * Reads available bytes and, once a complete frame has arrived, logs the message and
     * switches the key to writing. Incomplete frames stay in the read buffer until more
     * data arrives.
     *
     * @param key the key of the readable channel
     */
    private void readDataFromChannel(SelectionKey key) {
        SocketChannel keySocketChannel = (SocketChannel) key.channel();
        try {
            if (keySocketChannel.read(readBuffer) < 0) {
                LOG.warn("Connection closed by the server.");
                this.close = true;
                return;
            }
            readBuffer.flip();
            if (readBuffer.remaining() < LENGTH_PREFIX_BYTES) {
                // Not even the length prefix is complete; switch back to fill mode.
                readBuffer.compact();
                LOG.debug("The received data is less than one pack");
                return;
            }
            // Peek at the length without consuming it, so an incomplete frame is kept intact.
            int msgLen = readBuffer.getInt(readBuffer.position());
            if (msgLen < 0 || msgLen > readBuffer.capacity() - LENGTH_PREFIX_BYTES) {
                LOG.warn("Invalid message length {} received, closing the client.", msgLen);
                this.close = true;
                return;
            }
            if (readBuffer.remaining() - LENGTH_PREFIX_BYTES < msgLen) {
                readBuffer.compact();
                LOG.debug("The received data is less than one pack");
                return;
            }
            readBuffer.position(readBuffer.position() + LENGTH_PREFIX_BYTES);
            byte[] buf = new byte[msgLen];
            readBuffer.get(buf);
            // Keeps any bytes of a following frame and returns the buffer to fill mode.
            readBuffer.compact();

            String receive = decoder.decode(ByteBuffer.wrap(buf)).toString().trim();
            LOG.debug("{}[[client]] {}", Thread.currentThread().getName(), receive);
            key.interestOps(SelectionKey.OP_WRITE);
        } catch (IOException e) {
            LOG.warn("Read data from channel exception.", e);
            this.close = true;
        }
    }

    /**
     * Stops the selector loop and releases the channel and the selector. Close failures
     * are logged rather than thrown so that they cannot mask the error that triggered
     * the release.
     */
    private void releaseResource() {
        this.close = true;
        // shutdownOutput() is only legal on a connected, open channel.
        if (this.socketChannel.isOpen() && this.socketChannel.isConnected()) {
            try {
                this.socketChannel.shutdownOutput();
            } catch (IOException e) {
                LOG.debug("Shutdown output exception.", e);
            }
        }
        try {
            this.socketChannel.close();
        } catch (IOException e) {
            LOG.warn("Close connect exception.", e);
        }
        try {
            this.selector.close();
        } catch (IOException e) {
            LOG.warn("Close connect exception.", e);
        }
    }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息