您的位置:首页 > 其它

NIO学习笔记——通道(channel)详解

2017-06-25 00:50 453 查看
通道可以形象地比喻为银行出纳窗口使用的气动导管。您的薪水支票就是您要传送的信息,载体(Carrier)就好比一个缓冲区。您先填充缓冲区(将您的支票放到载体上),接着将缓冲“写”到通道中(将载体丢进导管中),然后信息负载就被传递到通道另一侧的 I/O 服务(银行出纳员)。该过程的回应是:出纳员填充缓冲区(将您的收据放到载体上),接着开始一个反方向的通道传输(将载体丢回到导管中)。载体就到了通道的您这一侧(一个填满了的缓冲区正等待您的查验),然后您就会 flip缓冲区(打开盖子)并将它清空(移除您的收据)。现在您可以开车走了,下一个对象(银行客户)将使用同样的载体(Buffer)和导管(Channel)对象来重复上述过程。

从 Channel 接口引申出的其他接口都是面向字节的子接口,包括 Writable ByteChannel和ReadableByteChannel,并且通道只能在字节缓冲区上操作,操作系统都是以字节的形式实现底层I/O接口的。

打开通道

I/O可以分为广义的两大类别:File I/O 和 Stream I/O。那么相应地有两种类型的通道也就不足为怪了,它们是文件(file)通道和套接字(socket)通道,如下图所示



通道可以以多种方式创建。Socket通道有可以直接创建新socket通道的工厂方法。但是一个FileChannel 对象却只能通过在一个打开的 RandomAccessFile、FileInputStream 或 FileOutputStream对象上调用 getChannel( )方法来获取。您不能直接创建一个 FileChannel 对象。

三种打开通道方式代码如下:

/**
* 演示打开通道的三种方式
* fuyuwei
* 2017年6月22日 下午9:38:00
*/
public void openSocket(){
try {
// 1、打开一个套接字通道
SocketChannel sc = SocketChannel.open();
// 根据主机名和端口号创建套接字地址
InetSocketAddress socketAddress = new InetSocketAddress("192.168.1.102",8080);
// 连接套接字
sc.connect(socketAddress);

// 2、打开一个server-socket通道
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress(8080));

// 3、打开一个datagram通道
DatagramChannel dc = DatagramChannel.open();
RandomAccessFile raf = new RandomAccessFile("/usr/local/swk/dump.txt", "r");
FileChannel fc = raf.getChannel();

} catch (IOException e) {
e.printStackTrace();
}

}


使用通道

通道将数据传输给 ByteBuffer 对象或者从 ByteBuffer 对象获取数据进行传输

通道可以是单向( unidirectional)或者双向的( bidirectional)。一个 channel 类可能实现定义read( )方法的 ReadableByteChannel 接口,而另一个 channel 类也许实现 WritableByteChannel 接口以提供 write( )方法。实现这两种接口其中之一的类都是单向的,只能在一个方向上传输数据。如果一个类同时实现这两个接口,那么它是双向的,可以双向传输数据。

我们知道,一个文件可以在不同的时候以不同的权限打开。从 FileInputStream 对象的getChannel( )方法获取的 FileChannel 对象是只读的,不过从接口声明的角度来看却是双向的,因为FileChannel 实现 ByteChannel 接口。在这样一个通道上调用 write( )方法将抛出未经检查的NonWritableChannelException 异常,因为 FileInputStream 对象总是以 read-only 的权限打开文件。

通道会连接一个特定 I/O 服务且通道实例( channel instance)的性能受它所连接的 I/O 服务的特征限制,记住这很重要。一个连接到只读文件的 Channel 实例不能进行写操作,即使该实例所属的类可能有 write( )方法。

ByteChannel 的 read( ) 和 write( )方法使用 ByteBuffer 对象作为参数。两种方法均返回已传输的字节数,可能比缓冲区的字节数少甚至可能为零。缓冲区的位置也会发生与已传输字节相同数量的前移。如果只进行了部分传输,缓冲区可以被重新提交给通道并从上次中断的地方继续传输。该过程重复进行直到缓冲区的 hasRemaining( )方法返回 false 值。如下代码我们演示如何从一个通道复制数据到另一个通道。

public void copyChannel(){
ReadableByteChannel source = Channels.newChannel(System.in);
WritableByteChannel dest = Channels.newChannel(System.out);
channelCopy1(source,dest);
try {
source.close();
dest.close();
} catch (IOException e) {
e.printStackTrace();
}

}

private void channelCopy1(ReadableByteChannel src,
WritableByteChannel dest) {
// 分配一个新的直接字节缓冲区
ByteBuffer buffer = ByteBuffer.allocateDirect(16*1024);
try {
while(src.read(buffer) != -1){
// 读转变成写模式
buffer.flip();
dest.write(buffer);
buffer.compact();
}
// 确保缓冲区完全排干
while (buffer.hasRemaining( )) {
dest.write (buffer);
}
} catch (IOException e) {
e.printStackTrace();
}
}

public void channelCopy2(ReadableByteChannel src,
WritableByteChannel dest) {
ByteBuffer buffer = ByteBuffer.allocateDirect (16 * 1024);
try {
while (src.read(buffer) != -1) {
// Prepare the buffer to be drained
buffer.flip();
// Make sure that the buffer was fully drained
while (buffer.hasRemaining()) {
dest.write(buffer);
}
// Make the buffer empty, ready for filling
buffer.clear();
}
} catch (IOException e) {
e.printStackTrace();
}
}


通道可以以阻塞( blocking)或非阻塞( nonblocking)模式运行。非阻塞模式的通道永远不会让调用的线程休眠。请求的操作要么立即完成,要么返回一个结果表明未进行任何操作。只有面向流的( stream-oriented)的通道,如 sockets 和 pipes 才能使用非阻塞模式。

关闭通道

与缓冲区不同,通道不能被重复使用。一个打开的通道即代表与一个特定 I/O 服务的特定连接并封装该连接的状态。当通道关闭时,那个连接会丢失,然后通道将不再连接任何东西。

调用通道的close( )方法时,可能会导致在通道关闭底层I/O服务的过程中线程暂时阻塞 7,哪怕该通道处于非阻塞模式。通道关闭时的阻塞行为(如果有的话)是高度取决于操作系统或者文件系统的。在一个通道上多次调用close( )方法是没有坏处的,但是如果第一个线程在close( )方法中阻塞,那么在它完成关闭通道之前,任何其他调用close( )方法都会阻塞。后续在该已关闭的通道上调用close( )不会产生任何操作,只会立即返回。

可以通过 isOpen( )方法来测试通道的开放状态。如果返回 true 值,那么该通道可以使用。如果返回 false 值,那么该通道已关闭,不能再被使用。尝试进行任何需要通道处于开放状态作为前提的操作,如读、写等都会导致 ClosedChannelException 异常。

如果一个通道实现 InterruptibleChannel 接口它的行为以下述语义为准:如果一个线程在一个通道上被阻塞并且同时被中断,那么该通道将被关闭,该被阻塞线程也会产生一个 ClosedByInterruptException 异常。此外,假如一个线程的 interrupt status 被设置并且该线程试图访问一个通道,那么这个通道将立即被关闭,同时将抛出相同的 ClosedByInterruptException 异常。线程的 interrupt status 在线程的interrupt( )方法被调用时会被设置。我们可以使用 isInterrupted( )来测试某个线程当前的 interruptstatus。当前线程的 interrupt status 可以通过调用静态的 Thread.interrupted( )方法清除。

Scatter/Gather(分散/收集)

scatter:对于 read 操作而言,从通道读取的数据会按顺序被散布(称为 scatter)到多个缓冲区,将每个缓冲区填满直至通道中的数63据或者缓冲区的最大空间被消耗完。

gather:对于一个 write 操作而言,数据是从几个缓冲区按顺序抽取(称为 gather)并沿着通道发送的。缓冲区本身并不需要具备这种 gather 的能力(通常它们也没有此能力)。该 gather 过程的效果就好比全部缓冲区的内容被连结起来,并在发送数据前存放到一个大的缓冲区中。

scatter / gather经常用于需要将传输的数据分开处理的场合,例如传输一个由消息头和消息体组成的消息,你可能会将消息体和消息头分散到不同的buffer中,这样你可以方便的处理消息头和消息体。

gather代码例如:

ByteBuffer header = ByteBuffer.allocateDirect (10);
ByteBuffer body = ByteBuffer.allocateDirect (80);
ByteBuffer [] buffers = { header, body };
int bytesRead = channel.read (buffers);


scatter代码例如:

ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body = ByteBuffer.allocate(1024);
//write data into buffers
ByteBuffer[] bufferArray = {header, body};
channel.write(bufferArray);


使用得当的话, Scatter/Gather 会是一个极其强大的工具。它允许您委托操作系统来完成辛苦活:将读取到的数据分开存放到多个存储桶( bucket)或者将不同的数据区块合并成一个整体。这是一个巨大的成就,因为操作系统已经被高度优化来完成此类工作了。它节省了您来回移动数据的工作,也就避免了缓冲区拷贝和减少了您需要编写、调试的代码数量。既然您基本上通过提供数据容器引用来组合数据,那么按照不同的组合构建多个缓冲区阵列引用,各种数据区块就可以以不同的方式来组合了

文件通道

FileChannel 类可以实现常用的 read, write 以及 scatter/gather 操作,同时它也提供了很多专用于文件的新方法。

FileChannel对象不能直接创建。一个FileChannel实例只能通过在一个打开的file对象( RandomAccessFile、 FileInputStream或 FileOutputStream)上调用getChannel( )方法获取。调用getChannel( )方法会返回一个连接到相同文件的FileChannel对象且该FileChannel对象具有与file对象相同的访问权限,然后您就可以使用该通道对象来利用强大的FileChannel API了

看个例子,如下:

public static void main(String[] argv) throws Exception {
RandomAccessFile raf = new RandomAccessFile("/usr/local/swk/dump.txt","rw");
FileChannel fc = raf.getChannel();
ByteBuffer bf = ByteBuffer.allocate(4);
int byteReads = fc.read(bf);
while(byteReads !=-1){
// 转为读模式
bf.flip();
while(bf.hasRemaining()){
System.out.println(bf.getChar());
}
bf.clear();
byteReads = fc.read(bf);
}
raf.close();
}


FileChannel 对象是线程安全( thread-safe)的。如果有一个线程已经在执行会影响通道位置或文件大小的操作,那么其他尝试进行此类操作之一的线程必须等待。

文件锁定

如果一个线程在某个文件上获得了一个独占锁,然后第二个线程利用一个单独打开的通道来请求该文件的独占锁,那么第二个线程的请求会被批准。但如果这两个线程运行在不同的 Java 虚拟机上,那么第二个线程会阻塞,因为锁最终是由操作系统或文件系统来判优的并且几乎总是在进程级而非线程级上判优。锁都是与一个文件关联的,而不是与单个的文件句柄或通道关联。

一个 FileLock 对象创建之后即有效,直到它的 release( )方法被调用或它所关联的通道被关闭或Java 虚拟机关闭时才会失效,可以通过调用 isShared( )方法来测试一个锁以判断它是共享的还是独占的。

我们来看一个writer和两个或更多的 readers 开始,我们来看下不同类型的锁是如何交互的代码实例

public class LockTest {

private static final int SIZEOF_INT = 4;
private static final int INDEX_START = 0;
private static final int INDEX_COUNT = 10;
private static final int INDEX_SIZE = INDEX_COUNT * SIZEOF_INT;
private ByteBuffer buffer = ByteBuffer.allocate (INDEX_SIZE);
private IntBuffer indexBuffer = buffer.asIntBuffer( );
private Random rand = new Random( );
public static void main(String[] args) throws Exception{
boolean writer = false;
String fileName;
if (args.length != 2) {
System.out.println ("Usage: [ -r | -w ] filename");
return;
}
writer = args[0].equals("-w");
fileName = args[1];
@SuppressWarnings("resource")
RandomAccessFile raf = new RandomAccessFile (fileName,
(writer) ? "rw" : "r");
FileChannel fc = raf.getChannel();
LockTest lockTest = new LockTest();
if (writer) {
lockTest.doUpdates (fc);
} else {
lockTest.doQueries (fc);
}
}

private void doQueries(FileChannel fc) throws Exception {

while (true) {
println("trying for shared lock...");
FileLock lock = fc.lock(INDEX_START, INDEX_SIZE, true);
int reps = rand.nextInt(60) + 20;
for (int i = 0; i < reps; i++) {
int n = rand.nextInt(INDEX_COUNT);
int position = INDEX_START + (n * SIZEOF_INT);
buffer.clear();
fc.read(buffer, position);
int value = indexBuffer.get(n);
System.out.println("Index entry " + n + "=" + value);
// Pretend to be doing some work
Thread.sleep(100);
}
lock.release();
println("<sleeping>");
Thread.sleep(rand.nextInt(3000) + 500);
}

}

private void doUpdates(FileChannel fc) throws Exception {
while (true) {
println("trying for exclusive lock...");
FileLock lock = fc.lock(INDEX_START, INDEX_SIZE, false);
updateIndex(fc);
lock.release();
System.out.println("<sleeping>");
Thread.sleep(rand.nextInt(2000) + 500);
}
}

private int idxval = 1;

private void updateIndex(FileChannel fc) throws Exception {
// "indexBuffer" is an int view of "buffer"
indexBuffer.clear();
for (int i = 0; i < INDEX_COUNT; i++) {
idxval++;
println("Updating index " + i + "=" + idxval);
indexBuffer.put(idxval);
// Pretend that this is really hard work
Thread.sleep(500);
}
// leaves position and limit correct for whole buffer
buffer.clear();
fc.write(buffer, INDEX_START);
}

private int lastLineLen = 0;

// Specialized println that repaints the current line
private void println(String msg) {
System.out.print("\r ");
System.out.print(msg);
for (int i = msg.length(); i < lastLineLen; i++) {
System.out.print(" ");
}
System.out.print("\r");
System.out.flush();
lastLineLen = msg.length();
}

}


socket通道

全部 socket 通道类( DatagramChannel、 SocketChannel 和ServerSocketChannel)都是由位于 java.nio.channels.spi 包中的 AbstractSelectableChannel 引申而来。这意味着我们可以用一个 Selector 对象来执行 socket 通道的有条件的选择( readiness selection)。全部 socket 通道类在被实例化时都会创建一个对等 socket 对象。这些是我们所熟悉的来自 java.net 的类( Socket、 ServerSocket和 DatagramSocket),它们已经被更新以识别通道。对等 socket 可以通过调用 socket( )方法从一个通道上获取。

Socket 通道可以在非阻塞模式下运行,要把一个 socket 通道置于非阻塞模式,我们要依靠所有 socket 通道类的公有超级类:SelectableChannel。下面的方法就是关于通道的阻塞模式的:

public abstract class SelectableChannel extends AbstractChannel implements Channel
{
// This is a partial API listing
public abstract void configureBlocking (boolean block)
throws IOException;
public abstract boolean isBlocking( );
public abstract Object blockingLock( );
}


socketChannel

Socket 和 SocketChannel 类封装点对点、有序的网络连接,类似于我们所熟知并喜爱的 TCP/IP网络连接。 SocketChannel 扮演客户端发起同一个监听服务器的连接。直到连接成功,它才能收到数据并且只会从连接到的地址接收。每个 SocketChannel 对象创建时都是同一个对等的 java.net.Socket 对象串联的。静态的 open( )方法可以创建一个新的 SocketChannel 对象,而在新创建的 SocketChannel 上调用 socket( )方法能返回它对等的 Socket 对象;在该 Socket 上调用 getChannel( )方法则能返回最初的那个 SocketChannel。

虽然每个 SocketChannel 对象都会创建一个对等的 Socket 对象,反过来却不成立。直接创建的 Socket 对象不会关联 SocketChannel 对象,它们的getChannel( )方法只返回 null

新创建的 SocketChannel 虽已打开却是未连接的。在一个未连接的 SocketChannel 对象上尝试一个 I/O 操作会导致 NotYetConnectedException 异常。我们可以通过在通道上直接调用 connect( )方法或在通道关联的 Socket 对象上调用 connect( )来将该 socket 通道连接。一旦一个 socket 通道被连接,它将保持连接状态直到被关闭。您可以通过调用布尔型的 isConnected( )方法来测试某个SocketChannel 当前是否已连接。

SocketChannel socketChannel =SocketChannel.open (new InetSocketAddress ("somehost", somePort));


等价于

SocketChannel socketChannel = SocketChannel.open( );
socketChannel.connect (new InetSocketAddress ("somehost", somePort));


在 SocketChannel 上并没有一种 connect( )方法可以让您指定超时( timeout)值,当 connect( )方法在非阻塞模式下被调用时 SocketChannel 提供并发连接:它发起对请求地址的连接并且立即返回值。如果返回值是 true,说明连接立即建立了(这可能是本地环回连接);如果连接不能立即建立, connect( )方法会返回 false 且并发地继续连接建立过程。

当通道处于中间的连接等待( connection-pending)状态时,您只可以调用 finishConnect( )、isConnectPending( )或 isConnected( )方法。一旦连接建立过程成功完成, isConnected( )将返回 true值

InetSocketAddress addr = new InetSocketAddress (host, port);
SocketChannel sc = SocketChannel.open( );
sc.configureBlocking (false);
sc.connect (addr);
while ( ! sc.finishConnect( )) {
doSomethingElse( );
}
doSomethingWithChannel (sc);
sc.close( );


下面我们看一个管理异步连接的可用代码实例(建立并发连接)

package com.swk.nio;

import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;

/**
* 建立并发连接
*
* @author fuyuwei 2017年6月25日 上午00:29:12
*/
public class ConnectAsync {
public static void main(String[] argv) throws Exception {
String host = "localhost";
int port = 80;
if (argv.length == 2) {
host = argv[0];
port = Integer.parseInt(argv[1]);
}
InetSocketAddress addr = new InetSocketAddress(host, port);
SocketChannel sc = SocketChannel.open();
sc.configureBlocking(false);
System.out.println("initiating connection");
sc.connect(addr);
while (!sc.finishConnect()) {
doSomethingUseful();
}
System.out.println("connection established");
// Do something with the connected socket
// The SocketChannel is still nonblocking
sc.close();
}

private static void doSomethingUseful() {
System.out.println("doing something useless");
}
}


如果尝试异步连接失败,那么下次调用 finishConnect( )方法会产生一个适当的经检查的异常以指出问题的性质。通道然后就会被关闭并将不能被连接或再次使用

connect( )和 finishConnect( )方法是互相同步的,并且只要其中一个操作正在进行,任何读或写的方法调用都会阻塞,即使是在非阻塞模式下。如果此情形下您有疑问或不能承受一个读或写操作在某个通道上阻塞,请用 isConnected( )方法测试一下连接状态。

管道

java.nio.channels 包中含有一个名为 Pipe(管道)的类。广义上讲,管道就是一个用来在两个实体之间单向传输数据的导管。Pipe 类创建一对提供环回机制的 Channel 对象。这两个通道的远端是连接起来的,以便任何写在 SinkChannel 对象上的数据都能出现在 SourceChannel 对象上。

Pipe 实例是通过调用不带参数的 Pipe.open( )工厂方法来创建的。 Pipe 类定义了两个嵌套的通道类来实现管路。这两个类是 Pipe.SourceChannel(管道负责读的一端)和Pipe.SinkChannel(管道负责写的一端)。这两个通道实例是在 Pipe 对象创建的同时被创建的,可以通过在 Pipe 对象上分别调用 source( )和 sink( )方法来取回。管道可以被用来仅在同一个 Java 虚拟机内部传输数据。虽然有更加有效率的方式来在线程之间传输数据,但是使用管道的好处在于封装性。生产者线程和用户线程都能被写道通用的 ChannelAPI 中。根据给定的通道类型,相同的代码可以被用来写数据到一个文件、 socket 或管道。选择器可以被用来检查管道上的数据可用性,如同在 socket 通道上使用那样地简单。这样就可以允许单个用户线程使用一个 Selector 来从多个通道有效地收集数据,并可任意结合网络连接或本地工作线程使用。因此,这些对于可伸缩性、冗余度以及可复用性来说无疑都是意义重大的。

下面看个管道的代码实例

package com.swk.nio;

import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.Random;

/**
* 工作线程对一个管道进行写操作
*
* @author fuyuwei 2017年6月25日 上午12:41:52
*/
public class PipeTest {

public static void main(String[] argv) throws Exception {
// Wrap a channel around stdout
WritableByteChannel out = Channels.newChannel(System.out);
// Start worker and get read end of channel
ReadableByteChannel workerChannel = startWorker(10);
ByteBuffer buffer = ByteBuffer.allocate(100);
while (workerChannel.read(buffer) >= 0) {
buffer.flip();
out.write(buffer);
buffer.clear();
}
}

// This method could return a SocketChannel or
// FileChannel instance just as easily
private static ReadableByteChannel startWorker(int reps) throws Exception {
Pipe pipe = Pipe.open();
Worker worker = new Worker(pipe.sink(), reps);
worker.start();
return (pipe.source());
}

// -----------------------------------------------------------------
/**
* A worker thread object which writes data down a channel. Note: this
* object knows nothing about Pipe, uses only a generic WritableByteChannel.
*/
private static class Worker extends Thread {
WritableByteChannel channel;
private int reps;

Worker(WritableByteChannel channel, int reps) {
this.channel = channel;
this.reps = reps;
}

// Thread execution begins here
public void run() {
ByteBuffer buffer = ByteBuffer.allocate(100);
try {
for (int i = 0; i < this.reps; i++) {
doSomeWork(buffer);
// channel may not take it all at once
while (channel.write(buffer) > 0) {
// empty
}
}
this.channel.close();
} catch (Exception e) {
// easy way out; this is demo code
e.printStackTrace();
}
}

private String[] products = { "No good deed goes unpunished",
"To be, or what?", "No matter where you go, there you are",
"Just say \"Yo\"", "My karma ran over my dogma" };
private Random rand = new Random();

private void doSomeWork(ByteBuffer buffer) {
int product = rand.nextInt(products.length);
buffer.clear();
buffer.put(products[product].getBytes());
buffer.put("\r\n".getBytes());
buffer.flip();
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: