您的位置:首页 > Web前端 > React

非阻塞IO(nonblocking I/O)

2017-10-08 16:24 375 查看
原文地址:https://www.ibm.com/developerworks/java/library/j-javaio/

介绍

IO程序

Reactor模式

通道和选择器
Channel类功能

创建一个非阻塞的通道

Selector类功能
Channel和Selector

注册服务

代码

介绍

服务器的并发能力取决于它们如何有效的使用IO。同时处理数百个客户端请求需要数百个线程。直到JDK1.4,java平台都不支持非阻塞IO调用。JDK1.4之前一个线程处理一个客户端请求,导致用java编写的服务,在大量用户请求时,需要创建大量的线程,导致巨大的内存开销,同时也使得程序缺乏可扩展性。

为了解决这个问题,在最新发布的java平台中引入了一些新的类。其中最重要的是
SelectableChannel
Selector
类。通道(
channel
)在客户端和服务端之间建立连接。选择器(
selector
)类似于Win的消息循环,从不同的客户端中捕获事件,然后分发到他们各自的处理程序中。本文,我们将用这两种功能创建NIO。

IO程序

我们先来回顾下之前的
ServerSocket
生命周期,主要功能如下:

等待客户端连接

服务客户端请求数据

处理客户端请求

让我们通过代码片段查看
ServerSocket
的生命周期。首先,创建一个
ServerSocket


ServerSocket s = new ServerSocket();


接下来,通过调用accept()等待客户端请求,此处有一点要注意:

Socket conn = s.accept();


调用
accept()
,让服务端在收到客户端套接字连接的请求前被阻塞。一旦建立连接,服务端使用
LineNumberReader
读取客户端请求。因为
LineNumberReader
在缓冲区满时才去读取数据,所以在调用这个方法时会产生阻塞。下面是使用
LineNumberReader
的代码片段:

InputStream in = conn.getInputStream();
InputStreamReader rdr = new InputStreamReader(in);
LineNumberReader lnr = new LineNumberReader(rdr);
Request req = new Request();
while(!req.isComplete()){
String s = lnr.readLine();
req.addLine(s);
}


也可以通过
InputStream.read()
读取数据。不幸的是,这个方法读取数据也会造成阻塞。同样,写入也会阻塞。

下图描绘了服务端的典型工作流程,矩形框的步骤会产生阻塞。

Created with Raphaël 2.1.0Server ReadyAccept ConnectionRead Client RequestSend O/P to ClientClose Connection

JDK1.4之前,使用多线程绕开阻塞。这样的解决方法带来了新的问题–线程的开销,使用多线程影响了服务的性能和可扩展性。通过使用NIO,一切都将迎刃而解。

接下来,我们将介绍一些NIO的基础。然后运用它们去修改上面的server-socket例子。

Reactor模式

NIO使用Reactor模式设计。在分布式系统中,服务端需要处理来自不同客户端的请求。每个请求必须经过多路复用,再被分配给相应的处理程序。这正好符合Reactor模式。它使用多路复用技术同时响应来自一个或多个客户端的请求,并指派给对应的事件处理程序。

Reactor模式和Observer模式在这方面很相似:当唯一的主题发生变化时,会通知所有成员。Observer模式和单一的事件源有关,而Reactor模式和多个事件源相关。

Reactor的核心功能:

分成多路处理事件

指派事件给合适的处理程序

通道和选择器

非阻塞IO通过通道和选择器实现,
Channel
类代表客户端和服务端的通信。按照Reactor模式,
Selector
类多路复用通道。分成多路处理来自客户端的请求,并且指派这些请求到它们各自的请求处理程序。

我们将要分别介绍
Channel
Selector
类的功能,然后介绍这两个类是如何组合在一起实现NIO的功能的。

Channel类功能

Channel
表示与实体(如硬件设备、文件、网络套接字或程序组件)打开的连接,通过
Channel
可以对设备进行读写操作。NIO可以被异步关闭或阻塞。因此,如果线程上的IO通道阻塞了,另外的线程可以关闭这个通道。同样,如果IO通道阻塞,另外的线程也可以中断这个通道。



类图代码:

@startuml
Title java.nio.channels类图
interface Channel
interface ReadableByteChannel
interface WritableByteChannel
interface ScatteringByteChannel
interface ByteChannel
interface GatheringByteChannel
abstract class SelectableChannel
abstract class ServerSocketChannel
abstract class SocketChannel

Channel <|-- ReadableByteChannel
Channel <|-- WritableByteChannel
ReadableByteChannel <|-- ScatteringByteChannel
ReadableByteChannel <|-- ByteChannel
WritableByteChannel <|-- ByteChannel
WritableByteChannel <|-- GatheringByteChannel

SelectableChannel <|-- ServerSocketChannel
SelectableChannel <|-- SocketChannel
@enduml


如上图,在
java.nio.channels
包下有相当多的接口。我们主要关心
java.nio.channels.SocketChannel
java.nio.channels.ServerSocketChannel
类。这两个类可以分别替换
java.net.Socket
java.net.ServerSocket
。通道在阻塞、非阻塞模式下都可以使用,但是我们将重点放在非阻塞模式下。

创建一个非阻塞的通道

JDK4添加了两个新类,实现非阻塞模式下的读写操作。它们是用于指定连接的
java.net.InetSocketAddress
和用于操作读写的
java.nio.channels.SocketChannel
类。

下面的代码展示了非阻塞服务器套接字程序基本的创建方法,注意下面的代码示例和使用
ServerSocket
套接字的差异:

String host = "127.0.0.1";
InetSocketAddress socketAddress = new InetSocketAddress(host, 80);
SocketChannel channel = SocketChannel.open();
channel.connet(socketAddress);
//将通道设为为阻塞模式,译文中使用的是configureBlockingMethod方法
channel.configureBlocking(false);


缓冲区作用

Buffer是一个抽象类,包含特定原始类型的数据。通过固定大小的数组包裹数据,提供getter/setter方法访问数据。
java.nio.Buffer
有一系列的子类,如下:

ByteBuffer


CharBuffer


DoubleBuffer


FloatBuffer


IntBuffer


LongBuffer


ShortBuffer


只有
ByteBuffer
支持从其他数据类型中读取数据。建立连接后,数据应当使用
ByteBuffer
对象读写。

在阻塞模式中,在读写操作没有完成之前,线程将一直阻塞。在读期间,数据没有传输完成,线程也会阻塞。

在非阻塞模式中,线程可以读取数据,返回给执行的线程。当
configureBlocking()
设为true时,通道的读写行为和
Socket
完全一样。一个主要的区别是,它可以被其他线程中断。

单独使用
Channel
无法实现NIO,
Channel
必须和
Slelector
类一起,实现NIO。

Selector类功能

Selector
在Reactor模式中提供注册功能,
Selector
在几个
SelelctableChannels
上实现多路复用。同时
Channel
Selelctor
上注册事件。当客户端请求时,
Selector
解复用请求,并分派请求给对应的通道处理。

使用
open()
创建
Selector
是最简单的方式,如下:

Selector selector = Selector.open();


Channel和Selector

Channel
为客户端请求提供服务的通道,首先创建连接。下面的代码创建
ServerSocketChannel
服务,并绑定了本地端口:

ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
InetAddress ia = InetAddress.getLocalHost();
InetSocketAddress isa = new InetSocketAddress(ia, port);
serverChannel.socket().bind(isa);


其次,在
Selector
上根据将要处理的事件注册自己。举个例子,接受连接的通带,可以向下面这样注册:

SelectionKey acceptKey = channel.register(selector, SelectionKey.OP_ACCEPT);


SelectionKey对象代表通道与选择器的注册。当出现以下情形时,SelectionKey将失效:

通道被关闭

选择器被关闭

SelectionKey调用
cancel()
方法

调用
select()
方法会阻塞线程,等待一个新的连接,直到另外的线程唤醒它或者中断原始的阻塞线程。

注册服务

Selector
上注册的
ServerSocketChannel
接收全部传入的连接。如下:

SelectionKey acceptKey = serverChannel.register(sel, SelectionKey.OP_ACCEPT);
while(acceptKey.selector().select()>0){
...
}


服务注册后,通过迭代每一个在选择器上注册的Key,处理事件。当key被执行后,从列表中移除。如下:

Set readyKeys = sel.selectedKeys();
Iterator it = readyKeys.iterator();
while(it.hasNext()){
SelectionKey key = (SelectionKey)it.next();
it.remove();
...
}


如果key可用,并且连接允许,可以在channel上注册进一步的操作,如读或写。如果key是可读或者可写的,可以在连接后,进行读或写操作。

SocketChannel socket;
if(key.isAcceptable()){
System.out.println("Acceptable Key");
ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
socket = (SocketChannel)ssc.accept();
socket.configureBlocking(false);
SelectionKey another = socket.register(sel, SelectionKey.OP_READ|SelectionKey.OP_WRITE);
}
if(key.isReadable()){
System.out.println("Readable Key");
String ret = readMessage(key);
if(ret.length()>0){
writeMessage(socket, ret);
}
}
if(key.isWritable()){
System.out.println("Writable Key");
String ret = readMessagae(key);
socket = (SocketChannel)key.channel();
if(result.length()>0){
writeMessage(socket, ret);
}
}


代码

Client.java

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;

public class Client {
public SocketChannel client = null;
public InetSocketAddress isa = null;
public RecvThread rt = null;

public Client() {
}

public void makeConnection() {
try {
client = SocketChannel.open();
isa = new InetSocketAddress("nicholson", 4900);
client.connect(isa);
client.configureBlocking(false);
receiveMessage();
} catch (Exception e) {
e.printStackTrace();
}

while (sendMessage() != -1) {
}

try {
client.close();
System.exit(0);
} catch (IOException e) {
e.printStackTrace();
}
}

public int sendMessage() {
System.out.println("Inside SendMessage");
BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
String msg = null;
ByteBuffer bytebuf;
int nBytes = 0;
try {
msg = in.readLine();
System.out.println("msg is " + msg);
bytebuf = ByteBuffer.wrap(msg.getBytes());
nBytes = client.write(bytebuf);
System.out.println("nBytes is " + nBytes);
if (msg.equals("quit") || msg.equals("shutdown")) {
System.out.println("time to stop the client");
interruptThread();
Thread.sleep(5000);
client.close();
return -1;
}

} catch (Exception e) {
e.printStackTrace();
}
System.out.println("Wrote " + nBytes + " bytes to the server");
return nBytes;
}

public void receiveMessage() {
rt = new RecvThread("Receive THread", client);
rt.start();
}

public void interruptThread() {
rt.val = false;
}

public static void main(String args[]) {
Client cl = new Client();
cl.makeConnection();
}

public class RecvThread extends Thread {
public SocketChannel sc = null;
public boolean val = true;

public RecvThread(String str, SocketChannel client) {
super(str);
sc = client;
}

public void run() {
System.out.println("Inside receivemsg");
ByteBuffer buf = ByteBuffer.allocate(2048);
try {
while (val) {
while (client.read(buf) > 0) {
buf.flip();
Charset charset = Charset.forName("us-ascii");
CharsetDecoder decoder = charset.newDecoder();
CharBuffer charBuffer = decoder.decode(buf);
String result = charBuffer.toString();
System.out.println(result);
buf.flip();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}


NonBlockingServer.java

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.Iterator;
import java.util.Set;

public class NonBlockingServer {
public Selector sel = null;
public ServerSocketChannel server = null;
public SocketChannel socket = null;
public int port = 4900;
String result = null;

public NonBlockingServer() {
System.out.println("Inside default ctor");
}

public void initializeOperations() throws IOException {
System.out.println("Inside initialization");
sel = Selector.open();
server = ServerSocketChannel.open();
server.configureBlocking(false);
InetAddress ia = InetAddress.getLocalHost();
InetSocketAddress isa = new InetSocketAddress(ia, port);
server.socket().bind(isa);
}

public void startServer() throws IOException {
System.out.println("Inside startserver");
initializeOperations();
System.out.println("Abt to block on select()");
SelectionKey acceptKey = server.register(sel, SelectionKey.OP_ACCEPT);

while (acceptKey.selector().select() > 0) {
Set readyKeys = sel.selectedKeys();
Iterator it = readyKeys.iterator();

while (it.hasNext()) {
SelectionKey key = (SelectionKey) it.next();
it.remove();

if (key.isAcceptable()) {
System.out.println("Key is Acceptable");
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
socket = (SocketChannel) ssc.accept();
socket.configureBlocking(false);
SelectionKey another = socket.register(sel, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
}
if (key.isReadable()) {
System.out.println("Key is readable");
String ret = readMessage(key);
if (ret.length() > 0) {
writeMessage(socket, ret);
}
}
if (key.isWritable()) {
System.out.println("THe key is writable");
String ret = readMessage(key);
socket = (SocketChannel) key.channel();
if (result.length() > 0) {
writeMessage(socket, ret);
}
}
}
}
}

public void writeMessage(SocketChannel socket, String ret) {
System.out.println("Inside the loop");
if (ret.equals("quit") || ret.equals("shutdown")) {
return;
}
File file = new File(ret);
try {
RandomAccessFile rdm = new RandomAccessFile(file, "r");
FileChannel fc = rdm.getChannel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
fc.read(buffer);
buffer.flip();

Charset set = Charset.forName("us-ascii");
CharsetDecoder dec = set.newDecoder();
CharBuffer charBuf = dec.decode(buffer);
System.out.println(charBuf.toString());
buffer = ByteBuffer.wrap((charBuf.toString()).getBytes());
int nBytes = socket.write(buffer);
System.out.println("nBytes = " + nBytes);
result = null;
} catch (Exception e) {
e.printStackTrace();
}
}

public String readMessage(SelectionKey key) {
socket = (SocketChannel) key.channel();
ByteBuffer buf = ByteBuffer.allocate(1024);
try {
socket.read(buf);
buf.flip();
Charset charset = Charset.forName("us-ascii");
CharsetDecoder decoder = charset.newDecoder();
CharBuffer charBuffer = decoder.decode(buf);
result = charBuffer.toString();
} catch (IOException e) {
e.printStackTrace();
}
return result;
}

public static void main(String args[]) {
NonBlockingServer nb = new NonBlockingServer();
try {
nb.startServer();
} catch (IOException e) {
e.printStackTrace();
System.exit(-1);
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息