您的位置:首页 > 理论基础 > 计算机网络

Java网络编程——11.非阻塞I/O

2017-05-03 00:00 337 查看
要允许CPU速度高于网络,传统的Java解决方案是缓冲和多线程。对于相对简单的服务器和客户端,如果不需要非常高的性能,这种方法效果很好。但是在一个一秒钟处理上千个请求的大服务器上,你可能不会为每个连接都分配一个线程(生成多个线程以及在线程之间切换的开销很大)。如果一个线程可以负责多个连接,可以选取一个准备好接收数据的连接,尽快填充这个连接所能管理的尽可能多的数据,然后转向下一个准备好的连接,这样速度就会更快。

1、一个示例客户端

虽然新的I/O API并不是专门为客户端设计,但的确可以用于客户端,我将以一个使用新I/O API的客户端程序开始介绍,因为它比较简单。

传统的客户端,你会获取该Socket的输入或输出流。但这不是传统的客户端,利用通道(channel),你可以直接写入通道本身,不是写入字节数组,而是要写入ByteBuffer对象。

try {
SocketAddress rama = new InetSocketAddress("121.40.47.132", 13);
SocketChannel client = SocketChannel.open(rama);

ByteBuffer buffer = ByteBuffer.allocate(1);
WritableByteChannel out = Channels.newChannel(System.out);

while (client.read(buffer) != -1) {
buffer.flip();
out.write(buffer);
buffer.clear();
}

} catch (IOException e) {
e.printStackTrace();
}

你可以在阻塞或非阻塞模式下运行这个连接,在非阻塞模式下,即时没有任何可用的数据,read()也会立即返回,这就允许程序在试图读取前做其他操作。client.configureBlocking()方法传入true(阻塞)或false(非阻塞)。当程序处理多个连接时,这种做法会使代码在快速连接上运行得很快,而在慢速连接上运行慢一些。

2、一个示例服务器

客户端使用通道和缓冲区是可以的,不过实际上通道和缓冲区主要用于需要高效处理很多并发连接的服务器系统。要处理服务器,除了用于客户端的缓冲区和通道外,还需要有一些选择器,允许服务器查找所有准备好接收输出或发送输入的连接。

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class ChargenServer {

public static void main(String[] args) {
byte[] rotation = new byte[95 * 2];
for (byte i = ' '; i <= '~'; i++) {
rotation[i - ' '] = i;
rotation[i + 95 - ' '] = i;
}

ServerSocketChannel serverChannel;
Selector selector;
try {
serverChannel = ServerSocketChannel.open();
ServerSocket ss = serverChannel.socket();
InetSocketAddress address = new InetSocketAddress(19);
ss.bind(address);// 将通道绑定到端口19的服务器socket,JDK7及以后的版本可以直接用serverChannel绑定
serverChannel.configureBlocking(false);// 非阻塞模式
selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
e.printStackTrace();
return;
}

while (true) {
try {
selector.select();
} catch (IOException e) {
e.printStackTrace();
break;
}

Set<SelectionKey> readyKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = readyKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
try {
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel client = server.accept();
System.out.println("接收连接来自:" + client);
client.configureBlocking(false);
SelectionKey key2 = client.register(selector, SelectionKey.OP_WRITE);
ByteBuffer buffer = ByteBuffer.allocate(74);
buffer.put(rotation, 0, 72);
buffer.put((byte) '\r');
buffer.put((byte) '\n');
buffer.flip();
key2.attach(buffer);
} else if (key.isWritable()) {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer) key.attachment();
if (!buffer.hasRemaining()) {
buffer.rewind();// 用下一行重新填充缓冲区
int first = buffer.get();// 得到上一次的首字符
buffer.rewind();// 准备改变缓冲区中的数据
int position = first - ' ' + 1;// 寻找rotation中新的首字符位置
buffer.put(rotation, position, 72);// 将数据从rotation复制到缓冲区
buffer.put((byte) '\r');
buffer.put((byte) '\n');
buffer.flip();// 准备缓冲区进行写入
}
client.write(buffer);
}
} catch (IOException e) {
key.channel();
try {
key.channel().close();
} catch (IOException e2) {
}
}
}

}

}
}

这个例子只使用了一个线程,还有一些情况下仍要使用多个线程,特别是当不同操作有不同优先级时。例如,要在一个高优先级线程中接受新连接,而在一个低优先级的线程中对现有的连接提供服务。线程和连接之间不再需要1:1的比例,这样可以极大地提升用Java编写的服务器的可扩展性。

还有一点很重要,可以使用多线程来达到最高性能,多线程允许服务器利用多个CPU。接受连接的线程可以将接受的连接放在队列中,由池中的线程进行处理。与不用选择器的做法相比,这扔要快得多,因为select()可以确保如果连接没有准备好接收数据,就绝对不会在这些连接上浪费时间。

3、缓冲区

在新的I/O模型中,所有I/O都要缓冲,缓冲区已经成为这个API的基础部分。在新的I/O模型中,不再向输出流写入数据和从输出流读取数据,而是要从缓冲区中读写数据。像在缓冲流中一样,缓冲区可能就是字节数组。原始实现可以将缓冲区直接与硬件或内存连接,或者使用其他非常高效的实现。

从编程角度看,流和通道之间的关键区别在于流是基于字节的,而通道是基于块的。流设计为按顺序一个字节接一个字节地传送数据,也可以传送字节数组。而通道会传送缓冲区的数据块。可以读写通道的字节之前,这些字节必须已经存储在缓冲区,而且一次会读/写一个缓冲区的数据。流和通道/缓冲区之间的第二个关键区别是,通道和缓冲区支持同一对象的读/写。

可以把缓冲区看作是固定大小的元素列表,网络程序几乎只会用ByteBuffer。除了数据列表外,每个缓冲区都记录了信息的4个关键部分:

位置(position):缓冲区中将读取或写入的下一个位置。这个位置值从0开始计,最大值等于缓冲区的大小。

容量(capacity):缓冲区可以保存的元素的最大数目。容量值在创建缓冲区时设置,此后不能改变。

限度(limit):缓冲区中可访问数据的末尾位置。只要不改变限度,就无法读/写超过这个位置的数据,即时缓冲区有更大的容量也没有用。

标记(mark):缓冲区中客户端指定的索引。通过调用mark()可以将标记设置为当前位置。调用rest()可以将当前位置设置为所标记的位置。

与读取InputStream不同,读取缓冲区实际上不会以任何方式改变缓冲区中的数据。只可能向前或向后设置位置,从而可以从缓冲区中某个特定位置开始读取。程序可以调整限度,从而控制将要读取的数据的末尾,只有容量的固定的。

公共的Buffer超类还提供了另外几个方法,可以通过这些公共属性的引用来进行操作。clear()方法将位置设置为0,并将限度设置为容量,从而将缓冲区“清空”。不过clear()方法没有删除缓冲区中的老数据,可以使用绝对get方法或者再改变限度和位置进行读取。rewind()将位置设置为0,但不改变限度,这允许重新读取缓冲区。flip()方法将限度设置为当前位置,位置设置为0,希望排空刚刚填充的缓冲区时可以调用这个方法。最后,remaining()方法返回缓冲区中当前位置与限度之间的元素数,如果剩余元素大于0,hasRemaining()方法返回true。

空的缓冲区一般由分配(allocate)方法创建,预填充数据的缓冲区由包装(wrap)方法创建。分配方法通常用于输入,而包装方法一般用于输出。

缓冲区是为顺序访问而设计的,从缓冲区读取或向其写入一个元素时,缓冲区的位置将增1。例如:

CharBuffer buffer = CharBuffer.allocate(12);
buffer.put('H');
buffer.put('e');
buffer.put('l');
buffer.put('l');
buffer.put('o');

容量为12的CharBuffer ,在其中放置5个字符。缓冲区的位置现在为5,这称为填充缓冲区。现在试图get()从缓冲区获取数据,会得到null字符,所以再次读取写入的数据之前,需要回绕缓冲区:buffer.flip()。如果填充时指定位置,读出前就不需要回绕。

即时是使用缓冲区,操作数据块也比一次填充和排空一个元素要快,缓冲区类有批量方法来填充和排空响应元素类型的数组,如ByteBuffer有put()和get()方法,可以用现有的字节数组或子数组填充和排空一个ByteBuffer。

Java中的所有数据最终都解析为字节,任何适当长度的字节序列都可以解释为基本类型数据。ByteBuffer类提供了put方法,可以用简单类型(boolean除外)参数的相应字节填充缓冲区,还提供了get方法,可以读取适当数量的字节来形成一个新的基本类型数据。

从SocketChannel读取的ByteBuffer只包含一种特定基本数据类型的元素,就要创建一个视图缓冲区(view buffer),这个是一个适当类型(如DoubleBuffer、IntBuffer等)的新的Buffer对象,它从当前位置开始由底层ByteBuffer提取数据。视图缓存区可以用ByteBuffer的6个方法之一来创建,如asIntBuffer()。

大多数可写缓冲区都支持压缩(compact),压缩时将缓冲区中所有剩余的数据移到缓冲区的开头,为元素释放更多空间。如果网络准备好可以立即输出但没有输入(或者反之),程序就可以利用缓冲区压缩。

经常需要建立缓存区的副本,从而将相同的信息分发到两个或多个通道,缓冲区类都提供了duplicate()方法来完成这项工作。复制的缓冲区共享相同的数据,如果原来是一个间接缓冲区,那么复制缓冲区会包含相同的后备数组。修改一个缓冲区中的数据会反映到另一个缓冲区中。所有这个方法主要用在只准备读取缓存区时,否则要想跟踪数据哪里有修改很困难。

分片(slicing)缓冲区是复制的一个变形。分片也会创建一个新缓冲区,与原缓冲区共享数据。不过分片的起始位置是原缓冲区的当前位置,而且其容量最大不超过原缓冲区的限度。也就是说,分片是原缓冲区的一个字序列,只包含从当前位置到限度的所有元素。如果你有一个很长的数据缓冲区,很容易地分为多个部分,此时分片就很有用。

与输入流类似,如果希望重新读取某些数据,可以标记和重置缓冲区。与输入流不同的是,这将应用于所有缓冲区,而不只是部分缓冲区。

4、通道

通道将缓冲区的数据块移入或移出到各种I/O源,如文件、socket、数据报等。对于网络编程来说,实际上只有3个重要的通道类:SocketChannel、ServerSocketChannel和DatagramChannel。对于目前为止谈到的TCP连接,只需要前两个通道类。

SocketChannel类可以读写TCP Socket。数据必须编码到ByteBuffer对象中来完成读/写。每个SocketChannel都与一个对等端(peer)Socket对象相关联。

连接:open(SocketAddress remote)方法将阻塞(在连接建立或抛出异常之前,这个方法不会返回),无参数open()方法不立即连接,之后必须用connet()方法进行连接,如果以无阻塞方式打开通道,就要使用这个方法:

SocketChannel channel = SocketChannel.open();
SocketAddress address = new InetSocketAddress("www.innovatelife.net", 80);
channel .configureBlocking(false);// 非阻塞模式
channel.connet();

使用非阻塞通道时,connet()方法会立即返回,在等待操作吸引建立连接时,程序可以做其他的操作,不过在实际使用连接之前,必须调用finishConnect(),如果连接还没用建立,这个方法返回false,无法建立连接则抛出异常。

读取:首先要创建一个ByteBuffer,通道可以在其中存储数据,然后将这个ByteBuffer传给read()方法。如果通道是阻塞的,这个方法将至少读取一个字节,或者返回-1,也可能抛出一个异常。如果通道是非阻塞的,这个方法可能返回0。

写入:基本的write()方法接收一个缓冲区作为参数,如果通道是非阻塞的,这个方法不能保证会写入缓冲区的全部内容。由于缓冲区基于游标的特性,你可以很容易地反复调用这个方法,直到缓冲区完全排空,而且数据已完全写入。

关闭:就像正常Socket一样,在用完通道后应当将其关闭,释放它可能使用的端口和其他任何资源,调用close()方法。

ServerSocketChannel类只有一个目的:接受入站连接,你无法读取、写入或连接ServerSocketChannel。这个类只声明了4个方法,其中accept()最重要,还有就是向Selector注册来得到入站连接通知有关的方法,最后有个close()方法,用于关闭服务器Socket。

创建服务器Socket通道:ServerSocketChannel.open()创建一个新的ServerSocketChannel对象,在使用之前需要调用socket()方法来获得相应的对等端(peer)ServerSocket,然后你可以使用ServerSocket的各种设置方法随意配置任何服务器选项,如接收缓冲区大小或Socket超时值,然后将ServerSocket连接到对应该端口的SocketAddress。

接受连接:一旦打开并绑定了ServerSocketChannel对象,accept()方法就可以监听入站连接了。在阻塞模式下,accept()方法等待入站连接,然后它接受一个连接并返回连接到远程客户端的一个SocketChannel对象,这种策略适用于立即响应每一个请求的简单服务器。在非阻塞模式下,如果没有入站连接,accept()方法会返回null,非阻塞模式更适合于需要为每个连接完成大量工作的服务器,这样就可以并行地处理多个请求。非阻塞模式一般与Seletor结合使用。

Channels类是一个简单的工具类,可以将传统的基于I/O的流、阅读器和书写器包装在通道中,也可以从通道转换为基于I/O的流、阅读器和书写器。例如,如果编写一个HTTP服务器,用于处理SOAP请求,可能会出于性能原因使用通道读取HTTP请求主体,而使用SAX解析XML,在这种情况下,就需要将通道转换为流,在传给XMLReader的parse()方法:

SocketChannel channel = server.accept();
processHTTPHeader(channel);
XMLReader parser = XMLReaderFactory.createXMLReader();
parser .setContentHandler(someContentHandlerObject);
InputStream in  = Channels.newInputStream(channel );
parser.parse(in);

Java 7引入AsynchronousSocketChannel和AsynchronousServerSocketChannel类。与SocketChannel和ServerSocketChannel不同的是,读/写异步通道会立即返回,甚至在I/O完成之前就会返回,所读/写的数据会由一个Future或CompletionHandler进一步处理,connect()和accept()方法也会异步执行,并返回Future。这个方法的好处是,网络连接在并行运行,同时程序可以做其他事情,准备好处理来自网络的数据时会停下来。利用线程池和callable也可以达到同样的效果,不过这种方法可能稍微简单一些,特别是如果你的应用恰好非常适合使用缓冲区,就可以采用这个方法。

4、就绪选择

对于网络编程,新I/O API的第二部分是就绪选择,即能够选择读写时不阻塞的Socket。为了完成就绪选择,要将不同的通道注册到一个Selector对象,每个通道分配一个SelectionKey。然后程序可以询问这个Selector对象,哪些通道已经准备就绪可以无阻塞地完成你希望完成的操作,可以请求Selector对象返回相应的键集合。

调用Seletor.open()创建新的选择器,下一步是向选择器增加通道,通过选择器传递给通道的一个注册方法(register),就可以向选择器注册这个通道。SelectionKey定义了4个命名为常量,用于选择操作类型:

SelectionKey.OP_ACCEPT

SelectionKey.OP_CONNECT

SelectionKey.OP_READ

SelectionKey.OP_WRITE

有三个方法可以选择就绪的通道,它们的区别在于寻找就绪通道等待的时间:

selectNow() :该方法会完成非阻塞选择,如果当前没有准备好要处理的连接,它会立即返回。

select():在返回前会等待,直到至少有一个注册的通道准备好可以进行处理。

select(long timeout):在返回前只等待不超过timeout毫秒。

当直到有通道已经准备好处理时,可以使用selectedKeys()方法获取就绪通道,迭代处理返回的集合时,要依次处理各个SelectionKey,还可以从迭代器中删除键,告诉选择器这个键已经得到处理。

SelectionKey对象相当于通道的指针,它们还可以保存一个对象附件,一般会存储这个通道上的连接的状态。一旦了解了与键相关联的通道准备好完成何种操作(isAcceptable()、isConnectable()、isReadable()、isWritable()),就可以用channel()方法来获取这个通道。如果在保存状态信息的SelectionKey存储了一个对象,就可以用attachment()方法获取该对象。最后,如果结束使用连接,就要撤销其SelectionKey对象的注册,调用这个键的cancel()方法来撤销注册,不过,只有在未关闭通道时这个步骤才有必要,如果关闭通道,会自动在所有选择器中撤销对应这个通道的所有键的注册。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  非阻塞I/O