Java NIO完整总结
2017-03-27 20:41
141 查看
一、概述
Java NIO从JDK1.4开始出现,它只有一个目标:提高IO效率。老的IO包其实也都用NIO重新实现过,也可以利用NIO的一些特性提高效率,即使不写NIO的代码,平时使用IO也都用到了NIO的技术,只是隐藏到了底层的实现中。NIO的高效率来自于类似操作系统IO处理的结构方式,也是NIO的核心部分:
channel(通道)
buffer(缓冲区)
selector(选择器)
IO与NIO
IO的流(stream)是阻塞的,当读或写的时候,线程被阻塞,这样就什么也干不了了。通常需要使用多线程来处理IO操作,当线程多了之后,阻塞的时候系统会频繁的上下文切换,那这些切换所消耗的资源都是没意义的,浪费了系统的资源。而NIO可以实现非阻塞IO模式,解决上面所导致的问题,这也是在大并发下常使用NIO来处理程序的原因。比如Tomcat的NIO模式,能让web服务器的效率提升很多。Channel和Buffer
平时数据读写都是以流(Stream)的形式,而在NIO中是以缓冲区(Buffer)的形式来装载数据。读取数据传输的过程中数据存在Buffer中,而Buffer是通过Channel来传输的。 可以用经典的挖矿例子来理解:假设我们的数据是矿石,矿石怎么运输呢?需要通过轨道(Channel)和矿车(Buffer)。现在我们有一堆矿石要从A地点运到B地点,那么首先要在A点往矿车里装好矿石,矿车通过轨道到达B点,在B点将矿石从矿车上卸下。NIO的数据传输就是这么一个流程。Channel(通道)又分为四种:
FileChannel——文件通道
DatagramChannel——UDP数据报通道
SocketChannel——TCP套接字通道
ServerSocketChannel——监听TCP连接的通道
而Buffer有以下几种实现:
ByteBuffer
CharBuffer
DoubleBuffer
FloatBuffer
IntBuffer
LongBuffer
ShortBuffer
MappedByteBuffer
Selector
官方文档中解释为Selector是SelectableChannel的多路复用器(FileChannel不是SelectableChannel,它是阻塞的),简单点理解就是channel管理器。
在使用的时候,可以将N个channel以及相应的事件(如读、写、连接等)注册到selector上。通过selector来控制判断应该用哪个channel,做哪个操作。 Selector能监控和检测N个Channel,知晓哪些通道准备好了读或写的操作,这样我们就能直接让Selector来管理Channel。使用Selector的好处是可以让一个线程管理多个通道,如大并发导致线程过多的问题就能得到很好的解决。
二、Buffer
Buffer是缓冲区类,在官方文档中是这么描述的:一个用于特定基本类型数据的容器。也可以理解为特定类型的缓冲区。Buffer用于channel(通道)之间数据交互,所以需要支持读和写的操作。
使用Buffer一般分为以下几步:
将数据写入Buffer缓存中
调用flip()方法,从写模式切换到读模式
从Buffer中读取数据
调用clear()或compact()方法清除缓存中数据
下面是一个使用ByteBuffer的例子:
ByteBuffer buffer = ByteBuffer.allocate(1024); //从channel读取数据,写入到Buffer中(channel放到后面再说) int bytesRead = channel.read(buffer); while(bytesRead != -1) { buffer.flip();//从写模式切换到读模式 while(buffer.hasRemaining()) { System.out.print((char)buffer.get()); } buffer.clear();//清空缓存,回到可写模式 bytesRead = channel.read(buffer); }
缓冲区就是一块内存,可以写入数据,然后在读取数据。Java使用Buffer对象包装这块内存,来访问这块缓冲区。
Buffer是一个抽象类,实现的子类有ByteBuffer、CharBuffer、DoubleBuffer、FloatBuffer、IntBuffer、LongBuffer、LongBuffer、ShortBuffer以及MappedByteBuffer,即除了boolean类型外的所有原始类型的缓冲区实现。
4个属性:mark <= position <= limit <= capacity
缓冲区的公共特性实现都在Buffer中提供。缓冲区有下面4个基本属性,也是理解缓冲区数据操作的关键:public abstract class Buffer { //各变量位置: mark <= position <= limit <= capacity private int mark = -1;//标记位置 private int position = 0;//位置:下一个要读取或写入的元素索引 private int limit;//限制:第一个不应该读取或写入的元素索引 private int capacity;//容量:元素数量 }
各个数据类型的实现类使用数组存储相应类型的数据,如ByteBuffer:
public abstract class ByteBuffer extends Buffer { final byte[] bb;//堆缓冲数据 }
缓冲区的数据都是以数组形式存储的,读写的时候根据上面说的几个属性来确定当前的读写位置索引。
mark:用于标记位置,在移动索引的时候能快速恢复到之前标记的位置上。
position:当前索引位置,最大值为capacity-1。
limit:写模式下表示最多能写入多少数据,等于capacity;读模式下表示最多能读到多少数据,即从写模式切换到读模式时的position值。
capacity:容量,即这块内存块的容量
常用的几个关键方法实现如下:
//在当前位置作标记 public final Buffer mark() { mark = position; return this; } //复位到之前标记位置,如果未标记则抛出异常 public final Buffer reset() { int m = mark; if (m < 0) throw new InvalidMarkException(); position = m; return this; } //清空缓冲区,可以看出只是让各个标志属性重新初始化,缓冲区中的数据没动 //下次读写直接覆盖,即简单又高效 public final Buffer clear() { position = 0; limit = capacity; mark = -1; return this; } //最常用的方法,当buffer要从写模式切换到读模式时,就需要执行这个方法 //限制退到当前位置(读的时候只到限制位),位置及标记重置 public final Buffer flip() { limit = position; position = 0; mark = -1; return this; } //位置和标记重置,进行重新读或写 public final Buffer rewind() { position = 0; mark = -1; return this; } //是否已到限制位置 public final boolean hasRemaining() { return position < limit; }
数据分配
分配一个1024个字符的缓冲区,这个缓存区的容量(capacity)就是1024。CharBuffer buf = CharBuffer.allocate(1024);
向Buffer中写数据的两种方式:
从channel写到Buffer:
channel.read(buffer)
通过put()方法写入数据:
buffer.put(110)
从Buffer读取数据的两种方式:
从Buffer读取数据到channel:
channel.write(buffer)
使用get()方法从Buffer读取数据:
buffer.get()
三、channel
之前已经简单介绍过,Channel就是传输数据的通道,可以从Channel把数据读到Buffer中,也可以将数据从Buffer写入到Channel中。FileChannel(文件通道)
FileChannel作为文件通道,通过FileChannel向文件进行读写。FileChannel无法设置成非阻塞的,无法和selector配合使用,下面介绍selector时会说。//FileChannel通过InputStream、OutputStream或RandomAccessFile来获取channel实例 void writeDemo() throws IOException { //写文件 FileChannel fc = new FileOutputStream("/test.txt").getChannel(); fc.write(ByteBuffer.wrap("demo string".getBytes())); fc.close(); //同时读写的文件 fc = new RandomAccessFile("/raf_test.txt", "rw").getChannel(); fc.position(fc.size()); //定位到文件末尾 fc.write(ByteBuffer.wrap("more".getBytes())); channel.force(true);//操作系统写文件数据会先缓存在内存中,调用该方法将数据强制写到磁盘上 fc.close(); } void readDemo() throws IOException { //读文件 FileChannel fc = new FileInputStream(file).getChannel(); ByteBuffer buffer = ByteBuffer.allocate(1024); int bytesRead = fc.read(buffer); while (bytesRead != -1) { //返回-1表示到文件末尾 //todo read from Buffer //... bytesRead = fc.read(buffer); } fc.close(); }
DatagramChannel(UDP数据报文通道)
UDP的传输分为服务端和客户端,直接给出相应的例子代码服务端
DatagramChannel channel = DatagramChannel.open(); channel.socket().bind(new InetSocketAddress(8888)); //receive ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.clear(); System.out.println("receiving"); SocketAddress clientAddress = channel.receive(buffer);//如果数据超过buffer容量,多出的将被丢弃 buffer.flip(); while (buffer.hasRemaining()) { System.out.print((char) buffer.get()); } //send buffer.clear(); buffer.put("I'm server".getBytes()); buffer.flip(); channel.send(buffer, clientAddress); channel.close();
客户端
DatagramChannel channel = DatagramChannel.open(); channel.connect(new InetSocketAddress("localhost", 8888)); //send ByteBuffer buffer = ByteBuffer.wrap("hello I'm client".getBytes()); channel.write(buffer); channel.close();
SocketChannel和ServerSocketChannel
SocketChannel是TCP套接字通道,ServerSocketChannel是监听TCP连接的通道。服务端两个需要一起使用。服务端
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().bind(new InetSocketAddress(8888)); SocketChannel socketChannel = serverSocketChannel.accept(); if (socketChannel != null) { ByteBuffer buffer = ByteBuffer.allocate(30); //receive int ret = socketChannel.read(buffer); if (ret == -1) { socketChannel.close(); System.out.println("no data"); } else { buffer.flip(); while (buffer.hasRemaining()) { System.out.print((char) buffer.get()); } System.out.println(); } //send buffer.clear(); buffer.put("i'm server".getBytes()); buffer.flip(); socketChannel.write(buffer);//注意!!!如果在非阻塞模式下,write方法可能在尚未写出任何内容时就返回了,需要在循环中调用write方法。 socketChannel.close(); }
客户端
SocketChannel channel = SocketChannel.open(); channel.connect(new InetSocketAddress("localhost", 8888)); ByteBuffer buffer = ByteBuffer.allocate(30); buffer.put("hello I'm client".getBytes()); buffer.flip(); channel.write(buffer); buffer.clear(); int ret = channel.read(buffer); if (ret == -1) { //到流的末尾 System.out.println("no data"); } else { buffer.flip(); while (buffer.hasRemaining()) { System.out.print((char) buffer.get()); } System.out.println(); } channel.close();
Channel支持Scatter/Gather
就是多个Buffer对同一个Channel的读取和写入,比如消息由消息头和消息体组成,使用这种方便就比较适合。Scatter(分散):通过Channel读取数据同时写入到多个Buffer中。
ByteBuffer header = ByteBuffer.allocate(128); ByteBuffer body = ByteBuffer.allocate(1024); ByteBuffer[] bufferArray = { header, body }; channel.read(bufferArray); //注意,在写入下一个Buffer前,前一个Buffer必须填满,也就是这种方式需要知道每个Buffer的长度
Gather(聚集):将多个Buffer同时写入Channel。
ByteBuffer header = ByteBuffer.allocate(128); ByteBuffer body = ByteBuffer.allocate(1024); ByteBuffer[] bufferArray = { header, body }; channel.write(bufferArray); //注意,这里和读取不一样,Buffer中的position和limit之间的数据都会被写入
Channel之间的数据传输
如果两个Channel其中有一个是FileChannel,那么就可以使用FileChannel的方法来实现两个Channel之间的数据传输FileChannel有两个API方法:
transferFrom
将目标channel的数据传输到FileChannel中。根据参数count传输指定的字节数,如果channel准备好的数据不足count只传输准备好的数据。
transferTo
将FileChannel中的数据传输到目标channel中。
Pipe
在Unix操作系统上,管道用来连接一个进程的输出和另一个进程的输入。Pipe和管道类似,可以连接一个地方的输出和另一个地方的输入。比如可以在多线程中线程之间的数据传输,使用Pipe可以拥有更好的封装性。Pipe有两个通道:
sink通道:数据写入sink通道
source通道:数据从source通道读取
Pipe pipe = Pipe.open(); //打开管道 //sink通道 Pipe.SinkChannel sinkChannel = pipe.sink(); sinkChannel.write(ByteBuffer.wrap("test string".getBytes())); //source通道 Pipe.SourceChannel sourceChannel = pipe.source(); ByteBuffer buf = ByteBuffer.allocate(1024); int bytesRead = sourceChannel.read(buf);
四、Selector
在上面已经介绍过selector了,主要用途就是让单线程就能管理多个通道,从而管理多个网络连接。这样就可以减少线程数,节省在线程间上下文切换的开销了。上述的4种channel中,FileChannel不能切换到非阻塞模式,因此不能与Selector一起使用,Selector的使用
Selector selector = Selector.open(); //Selector的创建 channel.configureBlocking(flase); //channel必须是非阻塞的才能用Selector channel.register(selector, Selectionkey.OP_ACCEPT); //向selector注册channel和事件
如果不止对一种事件感兴趣,可以用“或”将事件连起来注册:
int interest = SelectionKey.OP_READ | SelectionKey.OP_WRITE; //共有4种事件,可以看SelectionKey枚举中的值
选择通道
Selector的3种选择通道方式:select.select(); //阻塞到至少有一个通道在注册的事件上就绪 select.select(long timeout); //和上面一样,加上了最长阻塞时间 select.selectNow(); //不阻塞,直接返回 //返回的int值表示自上一次调用select()后又有多少通道变成就绪状态
遍历已选择集
选择操作后发现有通道变成就绪状态,就可以获取”已选择键集”,并进行遍历获取channel里的数据:Iterator keyIterator = selector.selectedKeys().iterator(); while (keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); if(key.isAcceptable()) { //ServerSocketChannel通道接收就绪 } else if (key.isConnectable()) { //与远端服务器的连接就绪 } else if (key.isReadable()) { //通道的读就绪 //获取通道,并转化成需要的类型。如: SocketChannel client = (SocketChannel) key.channel(); } else if (key.isWritable()) { //通道的写就绪 } //需要手动remove,下次通道就绪时,会再次放入已选择健集中 keyIterator.remove(); }
wakeup
上面说过使用select()方法会使线程进入阻塞,如果没有通道就绪也想让
select()返回怎么办呢。这时可以让其他线程调用这个select对象的
wakeup()方法,就可以让阻塞的线程立即返回了。
如果调用了
wakeup()方法,但没有线程阻塞在
select()方法上,那下个调用select()方法的线程就会立即返回。
selector在TCP和UDP上的完整例子
TCP
服务端ServerSocketChannel channel = ServerSocketChannel.open();//获取ServerSocket通道 channel.socket().bind(new InetSocketAddress(8888)); channel.configureBlocking(false);//设置为非阻塞 Selector selector = Selector.open();//获得通道的多路复用器(通道管理器) channel.register(selector, SelectionKey.OP_ACCEPT);//将channel和selector绑定,并为该channel注册事件 while (true) { int readChannels = selector.select();//注册的通道事件准备就绪,阻塞 if (readChannels == 0) { continue; } Iterator keyIterator = selector.selectedKeys().iterator(); while (keyIterator.hasNext()) { SelectionKey key = (SelectionKey) keyIterator.next(); keyIterator.remove(); if (key.isAcceptable()) { //Accept SocketChannel client = channel.accept();//获得客户端连接的通道 client.configureBlocking(false); client.register(key.selector(), SelectionKey.OP_READ);//为了接收客户端的信息,将通道注册为读事件 } else if (key.isConnectable()) { System.out.println("connect"); } else if (key.isReadable()) { //Read SocketChannel client = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); //receive int ret = client.read(buffer); if (ret == -1) { System.out.println("no data"); client.close(); } else { System.out.print("receive client message:"); buffer.flip(); while (buffer.hasRemaining()) System.out.print((char) buffer.get()); System.out.println(); //send buffer.clear(); buffer.put("i'm server".getBytes()); buffer.flip(); client.write(buffer); } client.close(); } else if (key.isWritable()) { System.out.println("write"); } } }
客户端
SocketChannel channel = SocketChannel.open();//获取Socket通道 channel.configureBlocking(false);//设为非阻塞 channel.connect(new InetSocketAddress("localhost", 8888));//在获取注册的事件后,channel.finishConnect()才能完成连接 Selector selector = Selector.open();//获得通道的多路复用器(通道管理器) channel.register(selector, SelectionKey.OP_CONNECT); boolean done = false, write = false; while (!done) { int readChannels = selector.select(200);//非阻塞,通过上面的while轮询 if (readChannels == 0) { System.out.println("continue"); continue; } Iterator keyIterator = selector.selectedKeys().iterator(); while (keyIterator.hasNext()) { SelectionKey key = (SelectionKey) keyIterator.next(); keyIterator.remove(); if (key.isConnectable() && !channel.isConnected()) { SocketChannel c = (SocketChannel) key.channel(); if (c.isConnectionPending()) c.finishConnect(); c.register(selector, SelectionKey.OP_WRITE);//注册为写事件,准备向服务端发送数据 } else if (key.isWritable() && !write) { SocketChannel c = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.put("Hi, I'm client".getBytes()); buffer.flip(); c.write(buffer); write = true; c.register(selector, SelectionKey.OP_READ);//注册为读事件,准备从服务端接收数据 } else if (key.isReadable() && write) { SocketChannel c = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); int ret = c.read(buffer); if (ret == -1) { System.out.println("no data"); } else { buffer.flip(); while (buffer.hasRemaining()) System.out.print((char) buffer.get()); System.out.println(); } c.close(); done = true; } } }
UDP
服务端Selector selector = Selector.open(); DatagramChannel channel = DatagramChannel.open(); channel.socket().bind(new InetSocketAddress(8888)); channel.configureBlocking(false); channel.register(selector, SelectionKey.OP_READ); while (true) { int count = selector.select(); if (count == 0) { System.out.println("select zero"); continue; } Iterator iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = (SelectionKey) iterator.next(); iterator.remove(); if (key.isReadable()) { channel = (DatagramChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); SocketAddress clientAddress = channel.receive(buffer); buffer.flip(); while (buffer.hasRemaining()) System.out.print((char) buffer.get()); System.out.println(); buffer.clear(); buffer.put("i'm server".getBytes()); buffer.flip(); channel.send(buffer, clientAddress); } } }
客户端
Selector selector = Selector.open(); DatagramChannel channel = DatagramChannel.open(); channel.connect(new InetSocketAddress("localhost", 8888)); channel.configureBlocking(false); channel.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ); boolean done = false, write = false; while (!done) { int count = selector.select(200); if (count == 0) { System.out.println("select zero"); return; } Iterator iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = (SelectionKey) iterator.next(); iterator.remove(); if (key.isWritable() && !write) { channel = (DatagramChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.put("I'm client".getBytes()); buffer.flip(); channel.write(buffer); write = true; } if (key.isReadable() && write) { ByteBuffer buffer = ByteBuffer.allocate(1024); int ret = channel.read(buffer); if (ret == -1) { System.out.println("server no data"); } else { buffer.flip(); while (buffer.hasRemaining()) System.out.print((char) buffer.get()); System.out.println(); } channel.close(); done = true; } } }
总结
NIO是面向缓存区的,而老的IO是面向流的。NIO使用选择器的方式在高并发的场景下能很大的提高性能。但并不是所有场景都适合使用NIO,在非高并发的场景下,并不一定适合使用NIO。如:客户端应用
连接数小于1000
并发程度不高
局域网环境
常见的NIO的网络框架有:Netty、Mina、Cindy等。
相关文章推荐
- UBB完整实例(使用后的总结!!!)
- 对基于qmail的smtp用户验证的总结和完整安装设置方法
- [zz]完整的ubuntu镜像源/本地源/更新源/离线升级包!制作总结!
- vim常见使用命令总结完整分享(一)
- document.all的一个比较完整的总结及案例
- vim常见使用命令总结完整分享(二)
- 取偏移地址指令总结(不完整版)0
- JAVA NIO总结(四)—网络和异步IO
- 说说java NIO的一些个人总结
- 对基于qmail的smtp用户验证的总结和完整安装设置方法
- [推荐] 新手搭建PHP环境必备知识:windows下PHP5+APACHE+MYSQ完整配置(个人总结)
- Jsp+Dao+Jdbc+Servelet 用户完整登录总结
- 呜,本科基础知识点总结。[完整了]
- linux系列服务总结之四:SAMBA共享设置完整介绍
- 理解杀毒软件的意义(完整---另加补充和总结)
- seo下49个影响网站排名的因素完整总结版
- printf的格式控制的完整格式总结
- 第一个完整Andorid项目总结
- jQuery学习大总结(二)jQuery选择器完整介绍
- document.all的一个比较完整的总结及案例