非阻塞IO(nonblocking I/O)
2017-10-08 16:24
375 查看
原文地址:https://www.ibm.com/developerworks/java/library/j-javaio/
介绍
IO程序
Reactor模式
通道和选择器
Channel类功能
创建一个非阻塞的通道
Selector类功能
Channel和Selector
注册服务
代码
为了解决这个问题,在最新发布的java平台中引入了一些新的类。其中最重要的是
等待客户端连接
服务客户端请求数据
处理客户端请求
让我们通过代码片段查看
接下来,通过调用accept()等待客户端请求,此处有一点要注意:
调用
也可以通过
下图描绘了服务端的典型工作流程,矩形框的步骤会产生阻塞。
Created with Raphaël 2.1.0Server ReadyAccept ConnectionRead Client RequestSend O/P to ClientClose Connection
JDK1.4之前,使用多线程绕开阻塞。这样的解决方法带来了新的问题–线程的开销,使用多线程影响了服务的性能和可扩展性。通过使用NIO,一切都将迎刃而解。
接下来,我们将介绍一些NIO的基础。然后运用它们去修改上面的server-socket例子。
Reactor模式和Observer模式在这方面很相似:当唯一的主题发生变化时,会通知所有成员。Observer模式和单一的事件源有关,而Reactor模式和多个事件源相关。
Reactor的核心功能:
分成多路处理事件
指派事件给合适的处理程序
我们将要分别介绍
![](http://www.plantuml.com/plantuml/svg/bP0n3i8m34Ntdi8NQ4_010CxGM9sZQ61A9KIgn8ID-4Kp4ZS1d4D4EsGAix0bCZlMRR_refZEE60ZRTzGnrwpow-xu_NxLbPBnmEI0oh4IIL3UCUMyVBYt1ETy5AcHf2-VwjF-fShwvHkgm9soW1IO0SnWY67TDlSb480mVJqvcbfCpWzCBYMjVgqJD1krjf6cKzXgAUIigSFoGroIhBRjfIIQoafj87)
类图代码:
如上图,在
下面的代码展示了非阻塞服务器套接字程序基本的创建方法,注意下面的代码示例和使用
缓冲区作用
Buffer是一个抽象类,包含特定原始类型的数据。通过固定大小的数组包裹数据,提供getter/setter方法访问数据。
只有
在阻塞模式中,在读写操作没有完成之前,线程将一直阻塞。在读期间,数据没有传输完成,线程也会阻塞。
在非阻塞模式中,线程可以读取数据,返回给执行的线程。当
单独使用
使用
其次,在
SelectionKey对象代表通道与选择器的注册。当出现以下情形时,SelectionKey将失效:
通道被关闭
选择器被关闭
SelectionKey调用
调用
服务注册后,通过迭代每一个在选择器上注册的Key,处理事件。当key被执行后,从列表中移除。如下:
如果key可用,并且连接允许,可以在channel上注册进一步的操作,如读或写。如果key是可读或者可写的,可以在连接后,进行读或写操作。
NonBlockingServer.java
介绍
IO程序
Reactor模式
通道和选择器
Channel类功能
创建一个非阻塞的通道
Selector类功能
Channel和Selector
注册服务
代码
介绍
服务器的并发能力取决于它们如何有效的使用IO。同时处理数百个客户端请求需要数百个线程。直到JDK1.4,java平台都不支持非阻塞IO调用。JDK1.4之前一个线程处理一个客户端请求,导致用java编写的服务,在大量用户请求时,需要创建大量的线程,导致巨大的内存开销,同时也使得程序缺乏可扩展性。为了解决这个问题,在最新发布的java平台中引入了一些新的类。其中最重要的是
SelectableChannel和
Selector类。通道(
channel)在客户端和服务端之间建立连接。选择器(
selector)类似于Win的消息循环,从不同的客户端中捕获事件,然后分发到他们各自的处理程序中。本文,我们将用这两种功能创建NIO。
IO程序
我们先来回顾下之前的ServerSocket生命周期,主要功能如下:
等待客户端连接
服务客户端请求数据
处理客户端请求
让我们通过代码片段查看
ServerSocket的生命周期。首先,创建一个
ServerSocket:
ServerSocket s = new ServerSocket();
接下来,通过调用accept()等待客户端请求,此处有一点要注意:
Socket conn = s.accept();
调用
accept(),让服务端在收到客户端套接字连接的请求前被阻塞。一旦建立连接,服务端使用
LineNumberReader读取客户端请求。因为
LineNumberReader在缓冲区满时才去读取数据,所以在调用这个方法时会产生阻塞。下面是使用
LineNumberReader的代码片段:
InputStream in = conn.getInputStream(); InputStreamReader rdr = new InputStreamReader(in); LineNumberReader lnr = new LineNumberReader(rdr); Request req = new Request(); while(!req.isComplete()){ String s = lnr.readLine(); req.addLine(s); }
也可以通过
InputStream.read()读取数据。不幸的是,这个方法读取数据也会造成阻塞。同样,写入也会阻塞。
下图描绘了服务端的典型工作流程,矩形框的步骤会产生阻塞。
Created with Raphaël 2.1.0Server ReadyAccept ConnectionRead Client RequestSend O/P to ClientClose Connection
JDK1.4之前,使用多线程绕开阻塞。这样的解决方法带来了新的问题–线程的开销,使用多线程影响了服务的性能和可扩展性。通过使用NIO,一切都将迎刃而解。
接下来,我们将介绍一些NIO的基础。然后运用它们去修改上面的server-socket例子。
Reactor模式
NIO使用Reactor模式设计。在分布式系统中,服务端需要处理来自不同客户端的请求。每个请求必须经过多路复用,再被分配给相应的处理程序。这正好符合Reactor模式。它使用多路复用技术同时响应来自一个或多个客户端的请求,并指派给对应的事件处理程序。Reactor模式和Observer模式在这方面很相似:当唯一的主题发生变化时,会通知所有成员。Observer模式和单一的事件源有关,而Reactor模式和多个事件源相关。
Reactor的核心功能:
分成多路处理事件
指派事件给合适的处理程序
通道和选择器
非阻塞IO通过通道和选择器实现,Channel类代表客户端和服务端的通信。按照Reactor模式,
Selector类多路复用通道。分成多路处理来自客户端的请求,并且指派这些请求到它们各自的请求处理程序。
我们将要分别介绍
Channel和
Selector类的功能,然后介绍这两个类是如何组合在一起实现NIO的功能的。
Channel类功能
Channel表示与实体(如硬件设备、文件、网络套接字或程序组件)打开的连接,通过
Channel可以对设备进行读写操作。NIO可以被异步关闭或阻塞。因此,如果线程上的IO通道阻塞了,另外的线程可以关闭这个通道。同样,如果IO通道阻塞,另外的线程也可以中断这个通道。
类图代码:
@startuml Title java.nio.channels类图 interface Channel interface ReadableByteChannel interface WritableByteChannel interface ScatteringByteChannel interface ByteChannel interface GatheringByteChannel abstract class SelectableChannel abstract class ServerSocketChannel abstract class SocketChannel Channel <|-- ReadableByteChannel Channel <|-- WritableByteChannel ReadableByteChannel <|-- ScatteringByteChannel ReadableByteChannel <|-- ByteChannel WritableByteChannel <|-- ByteChannel WritableByteChannel <|-- GatheringByteChannel SelectableChannel <|-- ServerSocketChannel SelectableChannel <|-- SocketChannel @enduml
如上图,在
java.nio.channels包下有相当多的接口。我们主要关心
java.nio.channels.SocketChannel和
java.nio.channels.ServerSocketChannel类。这两个类可以分别替换
java.net.Socket和
java.net.ServerSocket。通道在阻塞、非阻塞模式下都可以使用,但是我们将重点放在非阻塞模式下。
创建一个非阻塞的通道
JDK4添加了两个新类,实现非阻塞模式下的读写操作。它们是用于指定连接的java.net.InetSocketAddress和用于操作读写的
java.nio.channels.SocketChannel类。
下面的代码展示了非阻塞服务器套接字程序基本的创建方法,注意下面的代码示例和使用
ServerSocket套接字的差异:
String host = "127.0.0.1"; InetSocketAddress socketAddress = new InetSocketAddress(host, 80); SocketChannel channel = SocketChannel.open(); channel.connet(socketAddress); //将通道设为为阻塞模式,译文中使用的是configureBlockingMethod方法 channel.configureBlocking(false);
缓冲区作用
Buffer是一个抽象类,包含特定原始类型的数据。通过固定大小的数组包裹数据,提供getter/setter方法访问数据。
java.nio.Buffer有一系列的子类,如下:
ByteBuffer
CharBuffer
DoubleBuffer
FloatBuffer
IntBuffer
LongBuffer
ShortBuffer
只有
ByteBuffer支持从其他数据类型中读取数据。建立连接后,数据应当使用
ByteBuffer对象读写。
在阻塞模式中,在读写操作没有完成之前,线程将一直阻塞。在读期间,数据没有传输完成,线程也会阻塞。
在非阻塞模式中,线程可以读取数据,返回给执行的线程。当
configureBlocking()设为true时,通道的读写行为和
Socket完全一样。一个主要的区别是,它可以被其他线程中断。
单独使用
Channel无法实现NIO,
Channel必须和
Slelector类一起,实现NIO。
Selector类功能
Selector在Reactor模式中提供注册功能,
Selector在几个
SelelctableChannels上实现多路复用。同时
Channel在
Selelctor上注册事件。当客户端请求时,
Selector解复用请求,并分派请求给对应的通道处理。
使用
open()创建
Selector是最简单的方式,如下:
Selector selector = Selector.open();
Channel和Selector
Channel为客户端请求提供服务的通道,首先创建连接。下面的代码创建
ServerSocketChannel服务,并绑定了本地端口:
ServerSocketChannel serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); InetAddress ia = InetAddress.getLocalHost(); InetSocketAddress isa = new InetSocketAddress(ia, port); serverChannel.socket().bind(isa);
其次,在
Selector上根据将要处理的事件注册自己。举个例子,接受连接的通带,可以向下面这样注册:
SelectionKey acceptKey = channel.register(selector, SelectionKey.OP_ACCEPT);
SelectionKey对象代表通道与选择器的注册。当出现以下情形时,SelectionKey将失效:
通道被关闭
选择器被关闭
SelectionKey调用
cancel()方法
调用
select()方法会阻塞线程,等待一个新的连接,直到另外的线程唤醒它或者中断原始的阻塞线程。
注册服务
在Selector上注册的
ServerSocketChannel接收全部传入的连接。如下:
SelectionKey acceptKey = serverChannel.register(sel, SelectionKey.OP_ACCEPT); while(acceptKey.selector().select()>0){ ... }
服务注册后,通过迭代每一个在选择器上注册的Key,处理事件。当key被执行后,从列表中移除。如下:
Set readyKeys = sel.selectedKeys(); Iterator it = readyKeys.iterator(); while(it.hasNext()){ SelectionKey key = (SelectionKey)it.next(); it.remove(); ... }
如果key可用,并且连接允许,可以在channel上注册进一步的操作,如读或写。如果key是可读或者可写的,可以在连接后,进行读或写操作。
SocketChannel socket; if(key.isAcceptable()){ System.out.println("Acceptable Key"); ServerSocketChannel ssc = (ServerSocketChannel)key.channel(); socket = (SocketChannel)ssc.accept(); socket.configureBlocking(false); SelectionKey another = socket.register(sel, SelectionKey.OP_READ|SelectionKey.OP_WRITE); } if(key.isReadable()){ System.out.println("Readable Key"); String ret = readMessage(key); if(ret.length()>0){ writeMessage(socket, ret); } } if(key.isWritable()){ System.out.println("Writable Key"); String ret = readMessagae(key); socket = (SocketChannel)key.channel(); if(result.length()>0){ writeMessage(socket, ret); } }
代码
Client.javaimport java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; public class Client { public SocketChannel client = null; public InetSocketAddress isa = null; public RecvThread rt = null; public Client() { } public void makeConnection() { try { client = SocketChannel.open(); isa = new InetSocketAddress("nicholson", 4900); client.connect(isa); client.configureBlocking(false); receiveMessage(); } catch (Exception e) { e.printStackTrace(); } while (sendMessage() != -1) { } try { client.close(); System.exit(0); } catch (IOException e) { e.printStackTrace(); } } public int sendMessage() { System.out.println("Inside SendMessage"); BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); String msg = null; ByteBuffer bytebuf; int nBytes = 0; try { msg = in.readLine(); System.out.println("msg is " + msg); bytebuf = ByteBuffer.wrap(msg.getBytes()); nBytes = client.write(bytebuf); System.out.println("nBytes is " + nBytes); if (msg.equals("quit") || msg.equals("shutdown")) { System.out.println("time to stop the client"); interruptThread(); Thread.sleep(5000); client.close(); return -1; } } catch (Exception e) { e.printStackTrace(); } System.out.println("Wrote " + nBytes + " bytes to the server"); return nBytes; } public void receiveMessage() { rt = new RecvThread("Receive THread", client); rt.start(); } public void interruptThread() { rt.val = false; } public static void main(String args[]) { Client cl = new Client(); cl.makeConnection(); } public class RecvThread extends Thread { public SocketChannel sc = null; public boolean val = true; public RecvThread(String str, SocketChannel client) { super(str); sc = client; } public void run() { System.out.println("Inside receivemsg"); ByteBuffer buf = ByteBuffer.allocate(2048); try { while (val) { while (client.read(buf) > 0) { buf.flip(); Charset charset = Charset.forName("us-ascii"); CharsetDecoder decoder = charset.newDecoder(); CharBuffer charBuffer = decoder.decode(buf); String result = charBuffer.toString(); System.out.println(result); buf.flip(); } } } catch (IOException e) { e.printStackTrace(); } } } }
NonBlockingServer.java
import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.channels.*; import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; import java.util.Iterator; import java.util.Set; public class NonBlockingServer { public Selector sel = null; public ServerSocketChannel server = null; public SocketChannel socket = null; public int port = 4900; String result = null; public NonBlockingServer() { System.out.println("Inside default ctor"); } public void initializeOperations() throws IOException { System.out.println("Inside initialization"); sel = Selector.open(); server = ServerSocketChannel.open(); server.configureBlocking(false); InetAddress ia = InetAddress.getLocalHost(); InetSocketAddress isa = new InetSocketAddress(ia, port); server.socket().bind(isa); } public void startServer() throws IOException { System.out.println("Inside startserver"); initializeOperations(); System.out.println("Abt to block on select()"); SelectionKey acceptKey = server.register(sel, SelectionKey.OP_ACCEPT); while (acceptKey.selector().select() > 0) { Set readyKeys = sel.selectedKeys(); Iterator it = readyKeys.iterator(); while (it.hasNext()) { SelectionKey key = (SelectionKey) it.next(); it.remove(); if (key.isAcceptable()) { System.out.println("Key is Acceptable"); ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); socket = (SocketChannel) ssc.accept(); socket.configureBlocking(false); SelectionKey another = socket.register(sel, SelectionKey.OP_READ | SelectionKey.OP_WRITE); } if (key.isReadable()) { System.out.println("Key is readable"); String ret = readMessage(key); if (ret.length() > 0) { writeMessage(socket, ret); } } if (key.isWritable()) { System.out.println("THe key is writable"); String ret = readMessage(key); socket = (SocketChannel) key.channel(); if (result.length() > 0) { writeMessage(socket, ret); } } } } } public void writeMessage(SocketChannel socket, String ret) { System.out.println("Inside the loop"); if (ret.equals("quit") || ret.equals("shutdown")) { return; } File file = new File(ret); try { RandomAccessFile rdm = new RandomAccessFile(file, "r"); FileChannel fc = rdm.getChannel(); ByteBuffer buffer = ByteBuffer.allocate(1024); fc.read(buffer); buffer.flip(); Charset set = Charset.forName("us-ascii"); CharsetDecoder dec = set.newDecoder(); CharBuffer charBuf = dec.decode(buffer); System.out.println(charBuf.toString()); buffer = ByteBuffer.wrap((charBuf.toString()).getBytes()); int nBytes = socket.write(buffer); System.out.println("nBytes = " + nBytes); result = null; } catch (Exception e) { e.printStackTrace(); } } public String readMessage(SelectionKey key) { socket = (SocketChannel) key.channel(); ByteBuffer buf = ByteBuffer.allocate(1024); try { socket.read(buf); buf.flip(); Charset charset = Charset.forName("us-ascii"); CharsetDecoder decoder = charset.newDecoder(); CharBuffer charBuffer = decoder.decode(buf); result = charBuffer.toString(); } catch (IOException e) { e.printStackTrace(); } return result; } public static void main(String args[]) { NonBlockingServer nb = new NonBlockingServer(); try { nb.startServer(); } catch (IOException e) { e.printStackTrace(); System.exit(-1); } } }
相关文章推荐
- 同步(synchronous) IO和异步(asynchronous) IO,阻塞(blocking) IO和非阻塞(non-blocking)IO
- 同步(synchronous)/异步(asynchronous)与阻塞(blocking)/非阻塞(non-blocking)
- 阻塞与非阻塞(blocking and non-blocking)
- linux select 与 阻塞( blocking ) 及非阻塞 (non blocking)实现io多路复用的示例
- 阻塞(Blocking),非阻塞(Non-Blocking),同步(Sync),异步(Async),重叠(Overlapped)
- Application和四大组件、NIO(non-blocking IO 非阻塞线程的IO) 以及Android Studio恢复(误删除文件或文件夹)
- 异步与非阻塞的区别(Difference between asynchronous and non-blocking)
- nodejs中的阻塞和非阻塞(https://nodejs.org/en/docs/guides/blocking-vs-non-blocking/)
- 将socket设置为非阻塞(non-blocking) - 艾子的日志 - 网易博客
- 非阻塞套接字(Nonblocking Sockets) 概述
- IO通信模型(二)同步非阻塞模式NIO(NonBlocking IO)
- 阻塞(blocking)赋值与非阻塞(non-blocking)赋值
- Blocking vs. Non-Blocking Sockets 阻塞与非阻塞式套接字
- 非阻塞套接字(Nonblocking Sockets) 概述
- 非阻塞套接字(Nonblocking Sockets) 概述
- 将socket设置为非阻塞(non-blocking)(
- 阻塞(Blocking)和非阻塞(Non-Blocking)
- 非阻塞套接字(Nonblocking Sockets) 概述
- [linux] 将socket设置为非阻塞(non-blocking)
- Non blocking IO web server