您的位置:首页 > 理论基础 > 计算机网络

Netty网络编程二[BIO,线程池BIO,NIO,AIO](代码案例)

2018-03-19 17:56 579 查看

BIO通信

通常由一个独立的Acceptor线程负责监听客户端的连接,收到客户连接请求之后为每个客户端创建一个新的线程进行链路处理,处理完成通过输出流返应答给客户端缺点:却反弹性伸缩能力,当客户端并发量增加,服务端的线程个数和客户端并发访问呈1:1的正比关系,由于java线程是非常宝贵的,膨胀之后性能急剧下降,随着并发访问增大,系统会发生线程堆栈溢出,创建线程失败等问题,不能正常服务了TimeClient
public class TimeClient {

/**
* @param args
*/
public static void main(String[] args) {

int port = 8080;
if (args != null && args.length > 0) {

try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 采用默认值
}

}
Socket socket = null;
BufferedReader in = null;
PrintWriter out = null;
try {
socket = new Socket("127.0.0.1", port);
in = new BufferedReader(new InputStreamReader(
socket.getInputStream()));
out = new PrintWriter(socket.getOutputStream(), true);
out.println("QUERY TIME ORDER");
System.out.println("Send order 2 server succeed.");
String resp = in.readLine();
System.out.println("Now is : " + resp);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (out != null) {
out.close();
out = null;
}

if (in != null) {
try {
in.close();
} catch (IOException e) {
e.printStackTrace();
}
in = null;
}

if (socket != null) {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
socket = null;
}
}
}
}
TimeServer
public class TimeServer {

/**
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
int port = 8080;
if (args != null && args.length > 0) {

try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 采用默认值
}

}
ServerSocket server = null;
try {
server = new ServerSocket(port);
System.out.println("The time server is start in port : " + port);
Socket socket = null;
while (true) {
socket = server.accept();
new Thread(new TimeServerHandler(socket)).start();
}
} finally {
if (server != null) {
System.out.println("The time server close");
server.close();
server = null;
}
}
}
}
TimeServerHandler
public class TimeServerHandler implements Runnable {

private Socket socket;

public TimeServerHandler(Socket socket) {
this.socket = socket;
}

/*
* (non-Javadoc)
*
* @see java.lang.Runnable#run()
*/
@Override
public void run() {
BufferedReader in = null;
PrintWriter out = null;
try {
in = new BufferedReader(new InputStreamReader(
this.socket.getInputStream()));
out = new PrintWriter(this.socket.getOutputStream(), true);
String currentTime = null;
String body = null;
while (true) {
body = in.readLine();
if (body == null)
break;
System.out.println("The time server receive order : " + body);
currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(
System.currentTimeMillis()).toString() : "BAD ORDER";
out.println(currentTime);
}

} catch (Exception e) {
if (in != null) {
try {
in.close();
} catch (IOException e1) {
e1.printStackTrace();
}
}
if (out != null) {
out.close();
out = null;
}
if (this.socket != null) {
try {
this.socket.close();
} catch (IOException e1) {
e1.printStackTrace();
}
this.socket = null;
}
}
}
}
  

线程池BIO

采用线程池和任务队列可以实现伪异步IO通信框架当新客户端接入时,将客户端的socket封装成一个Task(该任务实现Runnable接口)投递到线程池处理,线程池维护一个队列,资源可控,无论多少个客户端并发都不会造成耗尽和宕机弊端:服务端处理缓慢,返回应答消息耗时60s,平均只需要10ms
采用线程池IO的线程正在读取故障服务节点的响应,由于读取输入流是阻塞的,他会将同步阻塞60s
加入所有可用线程都阻塞住了,后续的IO都将在队列中排队
由于线程池采用阻塞队列实现,当队列积满之后,后续入队列将阻塞
由于前端只有一个accptor线程接受客户端接入,它如果被阻塞在线程池的同步阻塞队列之后,新的请求将被拒绝
由于几乎所有的连接都超时,故认为系统已经崩溃了
TimeServerHandlerExecutePool
public class TimeServerHandlerExecutePool {

private ExecutorService executor;

public TimeServerHandlerExecutePool(int maxPoolSize, int queueSize) {
executor = new ThreadPoolExecutor(Runtime.getRuntime()
.availableProcessors(), maxPoolSize, 120L, TimeUnit.SECONDS,
new ArrayBlockingQueue<java.lang.Runnable>(queueSize));
}

public void execute(java.lang.Runnable task) {
executor.execute(task);
}
}
TimeServer
public class TimeServer {

/**
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
int port = 8080;
if (args != null && args.length > 0) {

try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 采用默认值
}

}
ServerSocket server = null;
try {
server = new ServerSocket(port);
System.out.println("The time server is start in port : " + port);
Socket socket = null;
TimeServerHandlerExecutePool singleExecutor = new TimeServerHandlerExecutePool(
50, 10000);// 创建IO任务线程池
while (true) {
socket = server.accept();
singleExecutor.execute(new TimeServerHandler(socket));
}
} finally {
if (server != null) {
System.out.println("The time server close");
server.close();
server = null;
}
}
}
}
 

NIO

New IO目标让Java支持非阻塞IO,Non-block I/OSocket->SocketChannelServerSocket->ServerSocketChannelNIO于JDK1.4引入的,弥补了原来同步阻塞IO的不足,在标准Java代码中提供了高速,面向块的IO

缓冲区Buffer

Buffer是一个对象,包含写入或者读出的数据,缓冲区实际上是个数组,通常是一个字节数组,可以使用其他种类的数组,但是一个缓冲区不仅仅是个数组,缓冲区提供了对数据的结构化访问以及维护读写位置等信息ByteBuffer:字节缓冲区
CharBuffer:字符缓冲区
ShortBuffer:短整型缓冲区
IntBuffer:整形缓冲区
LongBuffer:长整形缓冲区
FloatBuffer:浮点型缓冲区
DoubleBuffer:双精度浮点型缓冲区
 

通道Channel

Channel是一个通道,类似自来水管,网络数据通过读取和写入,与流不同之处在于通道是双向,流只是在一个方向,通道可以进行读写或者二者同时进行

  

多路复用器Selector

Selector会不断地轮询注册在其上的channel,如果某个channel上面发生读或者写时间,这个Channel就处于就绪状态,会被selector轮询出来,然后通过SelectionKey可以获取就绪的Channel的集合,然后进行后续的IO操作由于JDK提供了epoll来替代传统的select实现,所以它并没有最大连接句柄1024、2048的限制,也就是意味着需要一个线程负责selector的轮询,就可以接入成千上万的客户端打开ServerSocketChannel
绑定监听地址InetSocketAddress
创建Selector,启动线程
将ServerSocketChannel注册到Selector,监听
Selector轮询就绪的key
handleAccept()处置新的客户端接入
设置新建客户端连接的Socket参数
向Selector注册监听读操作SelectionKey.OP_READ
handleRead()异步读请求消息到ByteBuffer
decode请求信息
异步写ByteBuffer到SocketChannel
 TimeServer
public class TimeServer {

/**
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 采用默认值
}
}
MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);
new Thread(timeServer, "NIO-MultiplexerTimeServer-001").start();
}
}
MultiplexerTimeServer
public class MultiplexerTimeServer implements Runnable {

private Selector selector;

private ServerSocketChannel servChannel;

private volatile boolean stop;

/**
* 初始化多路复用器、绑定监听端口
*
* @param port
*/
public MultiplexerTimeServer(int port) {
try {
selector = Selector.open();
servChannel = ServerSocketChannel.open();
servChannel.configureBlocking(false);
servChannel.socket().bind(new InetSocketAddress(port), 1024);
servChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("The time server is start in port : " + port);
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}

public void stop() {
this.stop = true;
}

/*
* (non-Javadoc)
*
* @see java.lang.Runnable#run()
*/
@Override
public void run() {
while (!stop) {
try {
selector.select(1000);
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
SelectionKey key = null;
while (it.hasNext()) {
key = it.next();
it.remove();
try {
handleInput(key);
} catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null)
key.channel().close();
}
}
}
} catch (Throwable t) {
t.printStackTrace();
}
}

// 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
if (selector != null)
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}

private void handleInput(SelectionKey key) throws IOException {

if (key.isValid()) {
// 处理新接入的请求消息
if (key.isAcceptable()) {
// Accept the new connection
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
// Add the new connection to the selector
sc.register(selector, SelectionKey.OP_READ);
}
if (key.isReadable()) {
// Read the data
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
if (readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes, "UTF-8");
System.out.println("The time server receive order : "
+ body);
String currentTime = "QUERY TIME ORDER"
.equalsIgnoreCase(body) ? new java.util.Date(
System.currentTimeMillis()).toString()
: "BAD ORDER";
doWrite(sc, currentTime);
} else if (readBytes < 0) {
// 对端链路关闭
key.cancel();
sc.close();
} else
; // 读到0字节,忽略
}
}
}

private void doWrite(SocketChannel channel, String response)
throws IOException {
if (response != null && response.trim().length() > 0) {
byte[] bytes = response.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
channel.write(writeBuffer);
}
}
}
 TimeClient与之前不同之处在于TimeClientHandler线程来处理异步连接和读写操作
public class TimeClient {

/**
* @param args
*/
public static void main(String[] args) {

int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 采用默认值
}
}
new Thread(new TimeClientHandle("127.0.0.1", port), "TimeClient-001")
.start();
}
}
TimeClientHandle
public class TimeClientHandle implements Runnable {

private String host;
private int port;

private Selector selector;
private SocketChannel socketChannel;

private volatile boolean stop;

public TimeClientHandle(String host, int port) {
this.host = host == null ? "127.0.0.1" : host;
this.port = port;
try {
selector = Selector.open();
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}

/*
* (non-Javadoc)
*
* @see java.lang.Runnable#run()
*/
@Override
public void run() {
try {
doConnect();
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
while (!stop) {
try {
selector.select(1000);
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
SelectionKey key = null;
while (it.hasNext()) {
key = it.next();
it.remove();
try {
handleInput(key);
} catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null)
key.channel().close();
}
}
}
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}

// 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
if (selector != null)
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}

}

private void handleInput(SelectionKey key) throws IOException {

if (key.isValid()) {
// 判断是否连接成功
SocketChannel sc = (SocketChannel) key.channel();
if (key.isConnectable()) {
if (sc.finishConnect()) {
sc.register(selector, SelectionKey.OP_READ);
doWrite(sc);
} else
System.exit(1);// 连接失败,进程退出
}
if (key.isReadable()) {
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
if (readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes, "UTF-8");
System.out.println("Now is : " + body);
this.stop = true;
} else if (readBytes < 0) {
// 对端链路关闭
key.cancel();
sc.close();
} else
; // 读到0字节,忽略
}
}

}

private void doConnect() throws IOException {
// 如果直接连接成功,则注册到多路复用器上,发送请求消息,读应答
if (socketChannel.connect(new InetSocketAddress(host, port))) {
socketChannel.register(selector, SelectionKey.OP_READ);
doWrite(socketChannel);
} else
socketChannel.register(selector, SelectionKey.OP_CONNECT);
}

private void doWrite(SocketChannel sc) throws IOException {
byte[] req = "QUERY TIME ORDER".getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
writeBuffer.put(req);
writeBuffer.flip();
sc.write(writeBuffer);
if (!writeBuffer.hasRemaining())
System.out.println("Send order 2 server succeed.");
}

}
  

AIO异步编程

NIO2.0异步套接字通道是真正的异步非阻塞IO,对应Unix网络编程中的事件驱动IO(AIO)。它不需要通过多路复用器(Selector)对注册的通道进行轮询操作即可实现异步读写异步通道有以下两种方式获取结果通过java.util.concurrent.Future类来表示异步操作的结果
在执行异步操作的时候传入一个java.nio.channels
 TimeServer
public class TimeServer {

/**
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 采用默认值
}
}
AsyncTimeServerHandler timeServer = new AsyncTimeServerHandler(port);
new Thread(timeServer, "AIO-AsyncTimeServerHandler-001").start();
}
}
 TimeClient
public class TimeClient {

/**
* @param args
*/
public static void main(String[] args) {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 采用默认值
}

}
new Thread(new AsyncTimeClientHandler("127.0.0.1", port),
"AIO-AsyncTimeClientHandler-001").start();

}
}
 AcceptCompletionHandler
public class AcceptCompletionHandler implements
CompletionHandler<AsynchronousSocketChannel, AsyncTimeServerHandler> {

@Override
public void completed(AsynchronousSocketChannel result,
AsyncTimeServerHandler attachment) {
attachment.asynchronousServerSocketChannel.accept(attachment, this);
ByteBuffer buffer = ByteBuffer.allocate(1024);
result.read(buffer, buffer, new ReadCompletionHandler(result));
}

@Override
public void failed(Throwable exc, AsyncTimeServerHandler attachment) {
exc.printStackTrace();
attachment.latch.countDown();
}

}
 AsyncTimeClientHandler
public class AsyncTimeClientHandler implements
CompletionHandler<Void, AsyncTimeClientHandler>, Runnable {

private AsynchronousSocketChannel client;
private String host;
private int port;
private CountDownLatch latch;

public AsyncTimeClientHandler(String host, int port) {
this.host = host;
this.port = port;
try {
client = AsynchronousSocketChannel.open();
} catch (IOException e) {
e.printStackTrace();
}
}

@Override
public void run() {

latch = new CountDownLatch(1);
client.connect(new InetSocketAddress(host, port), this, this);
try {
latch.await();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}

@Override
public void completed(Void result, AsyncTimeClientHandler attachment) {
byte[] req = "QUERY TIME ORDER".getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
writeBuffer.put(req);
writeBuffer.flip();
client.write(writeBuffer, writeBuffer,
new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
if (buffer.hasRemaining()) {
client.write(buffer, buffer, this);
} else {
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
client.read(
readBuffer,
readBuffer,
new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result,
ByteBuffer buffer) {
buffer.flip();
byte[] bytes = new byte[buffer
.remaining()];
buffer.get(bytes);
String body;
try {
body = new String(bytes,
"UTF-8");
System.out.println("Now is : "
+ body);
latch.countDown();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}

@Override
public void failed(Throwable exc,
ByteBuffer attachment) {
try {
client.close();
latch.countDown();
} catch (IOException e) {
// ingnore on close
}
}
});
}
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
client.close();
latch.countDown();
} catch (IOException e) {
// ingnore on close
}
}
});
}

@Override
public void failed(Throwable exc, AsyncTimeClientHandler attachment) {
exc.printStackTrace();
try {
client.close();
latch.countDown();
} catch (IOException e) {
e.printStackTrace();
}
}

}
 AsyncTimeServerHandler
public class AsyncTimeServerHandler implements Runnable {

private int port;

CountDownLatch latch;
AsynchronousServerSocketChannel asynchronousServerSocketChannel;

public AsyncTimeServerHandler(int port) {
this.port = port;
try {
asynchronousServerSocketChannel = AsynchronousServerSocketChannel
.open();
asynchronousServerSocketChannel.bind(new InetSocketAddress(port));
System.out.println("The time server is start in port : " + port);
} catch (IOException e) {
e.printStackTrace();
}
}

/*
* (non-Javadoc)
*
* @see java.lang.Runnable#run()
*/
@Override
public void run() {

latch = new CountDownLatch(1);
doAccept();
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public void doAccept() {
asynchronousServerSocketChannel.accept(this,
new AcceptCompletionHandler());
}

}
 ReadCompletionHandler
public class ReadCompletionHandler implements
CompletionHandler<Integer, ByteBuffer> {

private AsynchronousSocketChannel channel;

public ReadCompletionHandler(AsynchronousSocketChannel channel) {
if (this.channel == null)
this.channel = channel;
}

@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.flip();
byte[] body = new byte[attachment.remaining()];
attachment.get(body);
try {
String req = new String(body, "UTF-8");
System.out.println("The time server receive order : " + req);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(req) ? new java.util.Date(
System.currentTimeMillis()).toString() : "BAD ORDER";
doWrite(currentTime);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}

private void doWrite(String currentTime) {
if (currentTime != null && currentTime.trim().length() > 0) {
byte[] bytes = (currentTime).getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
channel.write(writeBuffer, writeBuffer,
new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
// 如果没有发送完成,继续发送
if (buffer.hasRemaining())
channel.write(buffer, buffer, this);
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
channel.close();
} catch (IOException e) {
// ingnore on close
}
}
});
}
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
this.channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}

}
 转载于:http://www.coderss.cn/?p=2634
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: