您的位置:首页 > 编程语言 > Java开发

Java NIO完整总结

2017-03-27 20:41 141 查看

一、概述

Java NIO从JDK1.4开始出现,它只有一个目标:提高IO效率。老的IO包其实也都用NIO重新实现过,也可以利用NIO的一些特性提高效率,即使不写NIO的代码,平时使用IO也都用到了NIO的技术,只是隐藏到了底层的实现中。

NIO的高效率来自于类似操作系统IO处理的结构方式,也是NIO的核心部分:

channel(通道)

buffer(缓冲区)

selector(选择器)

IO与NIO

IO的流(stream)是阻塞的,当读或写的时候,线程被阻塞,这样就什么也干不了了。通常需要使用多线程来处理IO操作,当线程多了之后,阻塞的时候系统会频繁的上下文切换,那这些切换所消耗的资源都是没意义的,浪费了系统的资源。而NIO可以实现非阻塞IO模式,解决上面所导致的问题,这也是在大并发下常使用NIO来处理程序的原因。比如Tomcat的NIO模式,能让web服务器的效率提升很多。

Channel和Buffer

平时数据读写都是以流(Stream)的形式,而在NIO中是以缓冲区(Buffer)的形式来装载数据。读取数据传输的过程中数据存在Buffer中,而Buffer是通过Channel来传输的。 可以用经典的挖矿例子来理解:假设我们的数据是矿石,矿石怎么运输呢?需要通过轨道(Channel)和矿车(Buffer)。现在我们有一堆矿石要从A地点运到B地点,那么首先要在A点往矿车里装好矿石,矿车通过轨道到达B点,在B点将矿石从矿车上卸下。NIO的数据传输就是这么一个流程。

Channel(通道)又分为四种:

FileChannel——文件通道

DatagramChannel——UDP数据报通道

SocketChannel——TCP套接字通道

ServerSocketChannel——监听TCP连接的通道

而Buffer有以下几种实现:

ByteBuffer

CharBuffer

DoubleBuffer

FloatBuffer

IntBuffer

LongBuffer

ShortBuffer

MappedByteBuffer

Selector

官方文档中解释为Selector是SelectableChannel的多路复用器(FileChannel不是SelectableChannel,它是阻塞的),简单点理解就是
channel管理器


在使用的时候,可以将N个channel以及相应的事件(如读、写、连接等)注册到selector上。通过selector来控制判断应该用哪个channel,做哪个操作。 Selector能监控和检测N个Channel,知晓哪些通道准备好了读或写的操作,这样我们就能直接让Selector来管理Channel。使用Selector的好处是可以让一个线程管理多个通道,如大并发导致线程过多的问题就能得到很好的解决。

二、Buffer

Buffer是缓冲区类,在官方文档中是这么描述的:
一个用于特定基本类型数据的容器
。也可以理解为特定类型的缓冲区。Buffer用于channel(通道)之间数据交互,所以需要支持读和写的操作。

使用Buffer一般分为以下几步:

将数据写入Buffer缓存中

调用flip()方法,从写模式切换到读模式

从Buffer中读取数据

调用clear()或compact()方法清除缓存中数据

下面是一个使用ByteBuffer的例子:

ByteBuffer buffer = ByteBuffer.allocate(1024);
//从channel读取数据,写入到Buffer中(channel放到后面再说)
int bytesRead = channel.read(buffer);
while(bytesRead != -1) {
buffer.flip();//从写模式切换到读模式
while(buffer.hasRemaining()) {
System.out.print((char)buffer.get());
}
buffer.clear();//清空缓存,回到可写模式
bytesRead = channel.read(buffer);
}


缓冲区就是一块内存,可以写入数据,然后在读取数据。Java使用Buffer对象包装这块内存,来访问这块缓冲区。

Buffer是一个抽象类,实现的子类有ByteBuffer、CharBuffer、DoubleBuffer、FloatBuffer、IntBuffer、LongBuffer、LongBuffer、ShortBuffer以及MappedByteBuffer,即除了boolean类型外的所有原始类型的缓冲区实现。

4个属性:mark <= position <= limit <= capacity

缓冲区的公共特性实现都在Buffer中提供。缓冲区有下面4个基本属性,也是理解缓冲区数据操作的关键:

public abstract class Buffer {
//各变量位置: mark <= position <= limit <= capacity
private int mark = -1;//标记位置
private int position = 0;//位置:下一个要读取或写入的元素索引
private int limit;//限制:第一个不应该读取或写入的元素索引
private int capacity;//容量:元素数量
}


各个数据类型的实现类使用数组存储相应类型的数据,如ByteBuffer:

public abstract class ByteBuffer extends Buffer {
final byte[] bb;//堆缓冲数据
}


缓冲区的数据都是以数组形式存储的,读写的时候根据上面说的几个属性来确定当前的读写位置索引。



mark:用于标记位置,在移动索引的时候能快速恢复到之前标记的位置上。

position:当前索引位置,最大值为capacity-1。

limit:写模式下表示最多能写入多少数据,等于capacity;读模式下表示最多能读到多少数据,即从写模式切换到读模式时的position值。

capacity:容量,即这块内存块的容量

常用的几个关键方法实现如下:

//在当前位置作标记
public final Buffer mark() {
mark = position;
return this;
}

//复位到之前标记位置,如果未标记则抛出异常
public final Buffer reset() {
int m = mark;
if (m < 0) throw new InvalidMarkException();
position = m;
return this;
}

//清空缓冲区,可以看出只是让各个标志属性重新初始化,缓冲区中的数据没动
//下次读写直接覆盖,即简单又高效
public final Buffer clear() {
position = 0;
limit = capacity;
mark = -1;
return this;
}

//最常用的方法,当buffer要从写模式切换到读模式时,就需要执行这个方法
//限制退到当前位置(读的时候只到限制位),位置及标记重置
public final Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}

//位置和标记重置,进行重新读或写
public final Buffer rewind() {
position = 0;
mark = -1;
return this;
}

//是否已到限制位置
public final boolean hasRemaining() {
return position < limit;
}


数据分配

分配一个1024个字符的缓冲区,这个缓存区的容量(capacity)就是1024。

CharBuffer buf = CharBuffer.allocate(1024);


向Buffer中写数据的两种方式:

从channel写到Buffer:
channel.read(buffer)


通过put()方法写入数据:
buffer.put(110)


从Buffer读取数据的两种方式:

从Buffer读取数据到channel:
channel.write(buffer)


使用get()方法从Buffer读取数据:
buffer.get()


三、channel

之前已经简单介绍过,Channel就是传输数据的通道,可以从Channel把数据读到Buffer中,也可以将数据从Buffer写入到Channel中。

FileChannel(文件通道)

FileChannel作为文件通道,通过FileChannel向文件进行读写。FileChannel无法设置成非阻塞的,无法和selector配合使用,下面介绍selector时会说。

//FileChannel通过InputStream、OutputStream或RandomAccessFile来获取channel实例
void writeDemo() throws IOException {
//写文件
FileChannel fc = new FileOutputStream("/test.txt").getChannel();
fc.write(ByteBuffer.wrap("demo string".getBytes()));
fc.close();

//同时读写的文件
fc = new RandomAccessFile("/raf_test.txt", "rw").getChannel();
fc.position(fc.size()); //定位到文件末尾
fc.write(ByteBuffer.wrap("more".getBytes()));
channel.force(true);//操作系统写文件数据会先缓存在内存中,调用该方法将数据强制写到磁盘上
fc.close();
}

void readDemo() throws IOException {
//读文件
FileChannel fc = new FileInputStream(file).getChannel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int bytesRead = fc.read(buffer);
while (bytesRead != -1) {   //返回-1表示到文件末尾
//todo read from Buffer
//...
bytesRead = fc.read(buffer);
}
fc.close();
}


DatagramChannel(UDP数据报文通道)

UDP的传输分为服务端和客户端,直接给出相应的例子代码

服务端

DatagramChannel channel = DatagramChannel.open();
channel.socket().bind(new InetSocketAddress(8888));

//receive
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.clear();
System.out.println("receiving");
SocketAddress clientAddress = channel.receive(buffer);//如果数据超过buffer容量,多出的将被丢弃

buffer.flip();
while (buffer.hasRemaining()) {
System.out.print((char) buffer.get());
}

//send
buffer.clear();
buffer.put("I'm server".getBytes());
buffer.flip();
channel.send(buffer, clientAddress);

channel.close();


客户端

DatagramChannel channel = DatagramChannel.open();
channel.connect(new InetSocketAddress("localhost", 8888));

//send
ByteBuffer buffer = ByteBuffer.wrap("hello I'm client".getBytes());
channel.write(buffer);
channel.close();


SocketChannel和ServerSocketChannel

SocketChannel是TCP套接字通道,ServerSocketChannel是监听TCP连接的通道。服务端两个需要一起使用。

服务端

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(8888));

SocketChannel socketChannel = serverSocketChannel.accept();
if (socketChannel != null) {
ByteBuffer buffer = ByteBuffer.allocate(30);
//receive
int ret = socketChannel.read(buffer);

if (ret == -1) {
socketChannel.close();
System.out.println("no data");
} else {
buffer.flip();
while (buffer.hasRemaining()) {
System.out.print((char) buffer.get());
}
System.out.println();
}

//send
buffer.clear();
buffer.put("i'm server".getBytes());
buffer.flip();
socketChannel.write(buffer);//注意!!!如果在非阻塞模式下,write方法可能在尚未写出任何内容时就返回了,需要在循环中调用write方法。

socketChannel.close();
}


客户端

SocketChannel channel = SocketChannel.open();
channel.connect(new InetSocketAddress("localhost", 8888));

ByteBuffer buffer = ByteBuffer.allocate(30);
buffer.put("hello I'm client".getBytes());

buffer.flip();
channel.write(buffer);

buffer.clear();
int ret = channel.read(buffer);
if (ret == -1) {    //到流的末尾
System.out.println("no data");
} else {
buffer.flip();
while (buffer.hasRemaining()) {
System.out.print((char) buffer.get());
}
System.out.println();
}

channel.close();


Channel支持Scatter/Gather

就是多个Buffer对同一个Channel的读取和写入,比如消息由消息头和消息体组成,使用这种方便就比较适合。

Scatter(分散):通过Channel读取数据同时写入到多个Buffer中。

ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body   = ByteBuffer.allocate(1024);

ByteBuffer[] bufferArray = { header, body };
channel.read(bufferArray);
//注意,在写入下一个Buffer前,前一个Buffer必须填满,也就是这种方式需要知道每个Buffer的长度


Gather(聚集):将多个Buffer同时写入Channel。

ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body   = ByteBuffer.allocate(1024);

ByteBuffer[] bufferArray = { header, body };
channel.write(bufferArray);
//注意,这里和读取不一样,Buffer中的position和limit之间的数据都会被写入


Channel之间的数据传输

如果两个Channel其中有一个是FileChannel,那么就可以使用FileChannel的方法来实现两个Channel之间的数据传输

FileChannel有两个API方法:

transferFrom

将目标channel的数据传输到FileChannel中。根据参数count传输指定的字节数,如果channel准备好的数据不足count只传输准备好的数据。

transferTo

将FileChannel中的数据传输到目标channel中。

Pipe

在Unix操作系统上,管道用来连接一个进程的输出和另一个进程的输入。Pipe和管道类似,可以连接一个地方的输出和另一个地方的输入。比如可以在多线程中线程之间的数据传输,使用Pipe可以拥有更好的封装性。

Pipe有两个通道:

sink通道:数据写入sink通道

source通道:数据从source通道读取

Pipe pipe = Pipe.open();    //打开管道

//sink通道
Pipe.SinkChannel sinkChannel = pipe.sink();
sinkChannel.write(ByteBuffer.wrap("test string".getBytes()));

//source通道
Pipe.SourceChannel sourceChannel = pipe.source();
ByteBuffer buf = ByteBuffer.allocate(1024);
int bytesRead = sourceChannel.read(buf);


四、Selector

在上面已经介绍过selector了,主要用途就是让单线程就能管理多个通道,从而管理多个网络连接。这样就可以减少线程数,节省在线程间上下文切换的开销了。上述的4种channel中,FileChannel不能切换到非阻塞模式,因此不能与Selector一起使用,

Selector的使用

Selector selector = Selector.open();    //Selector的创建
channel.configureBlocking(flase);       //channel必须是非阻塞的才能用Selector
channel.register(selector, Selectionkey.OP_ACCEPT); //向selector注册channel和事件


如果不止对一种事件感兴趣,可以用“或”将事件连起来注册:

int interest = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
//共有4种事件,可以看SelectionKey枚举中的值


选择通道

Selector的3种选择通道方式:

select.select();        //阻塞到至少有一个通道在注册的事件上就绪
select.select(long timeout);    //和上面一样,加上了最长阻塞时间
select.selectNow();     //不阻塞,直接返回
//返回的int值表示自上一次调用select()后又有多少通道变成就绪状态


遍历已选择集

选择操作后发现有通道变成就绪状态,就可以获取”已选择键集”,并进行遍历获取channel里的数据:

Iterator keyIterator = selector.selectedKeys().iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if(key.isAcceptable()) {
//ServerSocketChannel通道接收就绪
} else if (key.isConnectable()) {
//与远端服务器的连接就绪
} else if (key.isReadable()) {
//通道的读就绪
//获取通道,并转化成需要的类型。如:
SocketChannel client = (SocketChannel) key.channel();
} else if (key.isWritable()) {
//通道的写就绪
}
//需要手动remove,下次通道就绪时,会再次放入已选择健集中
keyIterator.remove();
}


wakeup

上面说过使用
select()
方法会使线程进入阻塞,如果没有通道就绪也想让
select()
返回怎么办呢。这时可以让其他线程调用这个select对象的
wakeup()
方法,就可以让阻塞的线程立即返回了。

如果调用了
wakeup()
方法,但没有线程阻塞在
select()
方法上,那下个调用select()方法的线程就会立即返回。

selector在TCP和UDP上的完整例子

TCP

服务端

ServerSocketChannel channel = ServerSocketChannel.open();//获取ServerSocket通道
channel.socket().bind(new InetSocketAddress(8888));
channel.configureBlocking(false);//设置为非阻塞

Selector selector = Selector.open();//获得通道的多路复用器(通道管理器)

channel.register(selector, SelectionKey.OP_ACCEPT);//将channel和selector绑定,并为该channel注册事件
while (true) {
int readChannels = selector.select();//注册的通道事件准备就绪,阻塞
if (readChannels == 0) {
continue;
}

Iterator keyIterator = selector.selectedKeys().iterator();
while (keyIterator.hasNext()) {
SelectionKey key = (SelectionKey) keyIterator.next();
keyIterator.remove();

if (key.isAcceptable()) {   //Accept
SocketChannel client = channel.accept();//获得客户端连接的通道
client.configureBlocking(false);
client.register(key.selector(), SelectionKey.OP_READ);//为了接收客户端的信息,将通道注册为读事件
} else if (key.isConnectable()) {
System.out.println("connect");
} else if (key.isReadable()) {  //Read
SocketChannel client = (SocketChannel) key.channel();

ByteBuffer buffer = ByteBuffer.allocate(1024);
//receive
int ret = client.read(buffer);
if (ret == -1) {
System.out.println("no data");
client.close();
} else {
System.out.print("receive client message:");
buffer.flip();
while (buffer.hasRemaining())
System.out.print((char) buffer.get());
System.out.println();

//send
buffer.clear();
buffer.put("i'm server".getBytes());
buffer.flip();
client.write(buffer);
}
client.close();
} else if (key.isWritable()) {
System.out.println("write");
}
}
}


客户端

SocketChannel channel = SocketChannel.open();//获取Socket通道
channel.configureBlocking(false);//设为非阻塞
channel.connect(new InetSocketAddress("localhost", 8888));//在获取注册的事件后,channel.finishConnect()才能完成连接

Selector selector = Selector.open();//获得通道的多路复用器(通道管理器)
channel.register(selector, SelectionKey.OP_CONNECT);

boolean done = false, write = false;
while (!done) {
int readChannels = selector.select(200);//非阻塞,通过上面的while轮询
if (readChannels == 0) {
System.out.println("continue");
continue;
}

Iterator keyIterator = selector.selectedKeys().iterator();
while (keyIterator.hasNext()) {
SelectionKey key = (SelectionKey) keyIterator.next();
keyIterator.remove();

if (key.isConnectable() && !channel.isConnected()) {
SocketChannel c = (SocketChannel) key.channel();
if (c.isConnectionPending()) c.finishConnect();

c.register(selector, SelectionKey.OP_WRITE);//注册为写事件,准备向服务端发送数据
} else if (key.isWritable() && !write) {
SocketChannel c = (SocketChannel) key.channel();

ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put("Hi, I'm client".getBytes());
buffer.flip();
c.write(buffer);
write = true;

c.register(selector, SelectionKey.OP_READ);//注册为读事件,准备从服务端接收数据
} else if (key.isReadable() && write) {
SocketChannel c = (SocketChannel) key.channel();

ByteBuffer buffer = ByteBuffer.allocate(1024);
int ret = c.read(buffer);
if (ret == -1) {
System.out.println("no data");
} else {
buffer.flip();
while (buffer.hasRemaining())
System.out.print((char) buffer.get());
System.out.println();
}
c.close();
done = true;
}
}
}


UDP

服务端

Selector selector = Selector.open();

DatagramChannel channel = DatagramChannel.open();
channel.socket().bind(new InetSocketAddress(8888));
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_READ);

while (true) {
int count = selector.select();
if (count == 0) {
System.out.println("select zero");
continue;
}

Iterator iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = (SelectionKey) iterator.next();
iterator.remove();

if (key.isReadable()) {
channel = (DatagramChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
SocketAddress clientAddress = channel.receive(buffer);
buffer.flip();

while (buffer.hasRemaining())
System.out.print((char) buffer.get());
System.out.println();

buffer.clear();
buffer.put("i'm server".getBytes());
buffer.flip();
channel.send(buffer, clientAddress);
}
}
}


客户端

Selector selector = Selector.open();

DatagramChannel channel = DatagramChannel.open();
channel.connect(new InetSocketAddress("localhost", 8888));
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ);

boolean done = false, write = false;
while (!done) {
int count = selector.select(200);
if (count == 0) {
System.out.println("select zero");
return;
}

Iterator iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = (SelectionKey) iterator.next();
iterator.remove();

if (key.isWritable() && !write) {
channel = (DatagramChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put("I'm client".getBytes());
buffer.flip();
channel.write(buffer);
write = true;
}
if (key.isReadable() && write) {
ByteBuffer buffer = ByteBuffer.allocate(1024);
int ret = channel.read(buffer);
if (ret == -1) {
System.out.println("server no data");
} else {
buffer.flip();
while (buffer.hasRemaining())
System.out.print((char) buffer.get());
System.out.println();
}
channel.close();
done = true;
}
}
}


总结

NIO是面向缓存区的,而老的IO是面向流的。NIO使用选择器的方式在高并发的场景下能很大的提高性能。但并不是所有场景都适合使用NIO,在非高并发的场景下,并不一定适合使用NIO。如:

客户端应用

连接数小于1000

并发程度不高

局域网环境

常见的NIO的网络框架有:Netty、Mina、Cindy等。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: