您的位置:首页 > 编程语言 > Java开发

NIO(JDK1.4)--通道

2015-08-08 13:03 387 查看
基础
I/O 可以分为广义的两大类别: File I/O 和 Stream
I/O。相应地有两种类型的通道,它们是文件(file)通道和套接字(socket)通道。
文件通道有一个 FileChannel类;套接字通道有三个类:SocketChannel、ServerSocketChannel和 DatagramChannel。
FileChannel:从文件中读写数据。
DatagramChannel:能通过UDP读写网络中的数据。
SocketChannel:能通过TCP读写网络中的数据。
ServerSocketChannel:可以监听新进来的TCP连接,像Web服务器那样。对每一个新进来的连接都会创建一个SocketChannel。
通道可以以多种方式创建。Socket 通道有可以直接创建新 socket 通道的工厂方法。但是一个 FileChannel 对象却只能通过在一个打开的 RandomAccessFile、FileInputStream 或 FileOutputStream 对象上调用 getChannel(
)方法来获取。不能直接创建一个 FileChannel 对象。
1、下面例子使用FileChannel读取数据到Buffer中:
package
com;

import
java.io.FileInputStream;

import
java.io.IOException;

import
java.nio.ByteBuffer;

import
java.nio.channels.FileChannel;

public
class
Test{

public
static
void
main (String[]argv)
throws
IOException{

FileInputStream aFile =
new
FileInputStream(
"/Users/wudiyong/xxx"
);

FileChannel inChannel = aFile.getChannel();

ByteBuffer buf = ByteBuffer.allocate(
48
);

int
bytesRead = inChannel.read(buf);

while
(bytesRead != -
1
) {

System.out.println(
"Read "
+ bytesRead);

buf.flip();

while
(buf.hasRemaining()){

System.out.print((
char
) buf.get());

}

buf.clear();

bytesRead = inChannel.read(buf);

}

aFile.close();

}

}
FileInputStream会当打开文件不存在时自动创建。2、下面例子从一个通道复制数据到另一个通道 :
import
java.io.IOException;
import
java.nio.ByteBuffer;
import
java.nio.channels.Channels;
import
java.nio.channels.ReadableByteChannel;
import
java.nio.channels.WritableByteChannel;
public
class
Test {
public
static
void
main(String[] agrs)
throws
IOException {
ReadableByteChannel source = Channels.newChannel(System.in);
WritableByteChannel dest = Channels.newChannel(System.out);
//  channelCopy1(source,dest);//方式一
channelCopy2(source,dest);
//方式二
source.close();
dest.close();
}
private
static
void
channelCopy1(ReadableByteChannel src,WritableByteChannel dest)
throws
IOException {
ByteBuffer buffer = ByteBuffer.allocateDirect(
16
*
10
);
while
(src.read(buffer) != -
1
) {
buffer.flip();
dest.write(buffer);
/*
 
*因为可能不能够一次把buffer全部输出,此时buffer里还有剩余的数据,需要用compact()把
 
*这些数据往前移,新的数据才能从后面写入,如果一次就能完全输出,则compact的作用相当于clear
 
*/
buffer.compact();
}
//可能buffer里还有数据,把剩余的数据全部输出
buffer.flip();
while
(buffer.hasRemaining()) {
dest.write(buffer);
}
}
private
static
void
channelCopy2(ReadableByteChannel src,WritableByteChannel dest)
throws
IOException{
ByteBuffer buffer = ByteBuffer.allocateDirect (
16
*
10
);
while
(src.read (buffer) != -
1
) {
buffer.flip();
//循环把buffer里的所有数据都输出,再接收新的数据
while
(buffer.hasRemaining()){
dest.write (buffer);
}
buffer.clear( );
}
}
}
Scatter/Gather
通道提供了一种被称为Scatter/Gather的重要新功能(有时也被称为量 I/O)。它是指在多个缓冲区上实现一个简单的I/O操作。对于一个 write 操作而言,数据是从几个缓冲区按顺序抽取(称为gather)并沿着通道发送的。缓冲区本身并不需要具备这种gather 的能力(通常它们也没有此能力)。该gather 过程的效果就好比全部缓冲区的内容被连结起来,并在发送数据前存放到一个大的缓冲区中。对于
read操作而言,从通道读取的数据会按顺序被散布(称为scatter)到多个缓冲区,将每个缓冲区填满直至通道中的数据或者缓冲区的最大空间被消耗完。这是一个很大的进步,因为减少或避免了缓冲区拷贝和系统调用。Scatter/Gather应该使用直接的ByteBuffers以从本地I/O 获取最大性能优势。例子:以上代码只是gather的使用,scatter使用刚好相反,可以把一个缓冲区的数据读出到多个缓冲区中,填充规则是,一个填满再填另一个,直至全部读出位置。文件通道
FileChannel 对象是线程安全的,多个进程可以在同一个实例上并发调用方法而不会引起任何问题,不过并非所有的操作都是多线程的。影响通道位置或者影响文件大小的操作都是单线程的。如果有一个线程已经在执行会影响通道位置或文
件大小的操作,那么其他尝试进行此类操作之一的线程必须等待。并发行为也会受到底层的操作系 统或文件系统影响。Java NIO中的FileChannel是一个连接到文件的通道。可以通过文件通道读写文件。FileChannel无法设置为非阻塞模式,它总是运行在阻塞模式下。


打开FileChannel

在使用FileChannel之前,必须先打开它。但是,我们无法直接打开一个FileChannel,需要通过使用一个InputStream、OutputStream或RandomAccessFile来获取一个FileChannel实例。下面是通过RandomAccessFile打开FileChannel的示例:
1
RandomAccessFile
aFile =
new
RandomAccessFile(
"data/nio-data.txt"
,
"rw"
);
2
FileChannel
inChannel = aFile.getChannel();


从FileChannel读取数据

调用多个read()方法之一从FileChannel中读取数据。如:
1
ByteBuffer
buf = ByteBuffer.allocate(48);
2
int
bytesRead = inChannel.read(buf);
首先,分配一个Buffer。从FileChannel中读取的数据将被读到Buffer中。然后,调用FileChannel.read()方法。该方法将数据从FileChannel读取到Buffer中。read()方法返回的int值表示了有多少字节被读到了Buffer中。如果返回-1,表示到了文件末尾。


向FileChannel写数据

使用FileChannel.write()方法向FileChannel写数据,该方法的参数是一个Buffer。如:
01
String
newData = "New String to write to file..." + System.currentTimeMillis();
02
03
ByteBuffer
buf = ByteBuffer.allocate(48);
04
buf.clear();
05
buf.put(newData.getBytes());
06
07
buf.flip();
08
09
while(buf.hasRemaining())
{
10
channel.write(buf);
11
}
注意FileChannel.write()是在while循环中调用的。因为无法保证write()方法一次能向FileChannel写入多少字节,因此需要重复调用write()方法,直到Buffer中已经没有尚未写入通道的字节。


关闭FileChannel

用完FileChannel后必须将其关闭。如:
1
channel.close();


FileChannel的position方法

有时可能需要在FileChannel的某个特定位置进行数据的读/写操作。可以通过调用position()方法获取FileChannel的当前位置。也可以通过调用position(long pos)方法设置FileChannel的当前位置。
1
long
pos
= channel.position();
2
channel.position(pos
+
123
);
如果将位置设置在文件结束符之后,然后试图从文件通道中读取数据,读方法将返回-1 —— 文件结束标志。如果将位置设置在文件结束符之后,然后向通道中写数据,文件将撑大到当前位置并写入数据。这可能导致“文件空洞”,磁盘上物理文件中写入的数据间有空隙。


FileChannel的size方法

FileChannel实例的size()方法将返回该实例所关联文件的大小。如:
1
long
fileSize
= channel.size();


FileChannel的truncate方法

可以使用FileChannel.truncate()方法截取一个文件。截取文件时,文件将指定长度后面的部分删除。如:
channel.truncate(
1024
);
这个例子截取文件的前1024个字节。


FileChannel的force方法

FileChannel.force()方法将通道里尚未写入磁盘的数据强制写到磁盘上。出于性能方面的考虑,操作系统会将数据缓存在内存中,所以无法保证写入到FileChannel里的数据一定会即时写到磁盘上。要保证这一点,需要调用force()方法。force()方法有一个boolean类型的参数,指明是否同时将文件元数据(权限信息等)写到磁盘上。下面的例子同时将文件数据和元数据强制写到磁盘上:
channel.force(
true
);
内存映射文件下面例子诠释了内存映射缓冲区如何同scatter/gather结合使用:
使用映射文件和 gathering写操作来编写 HTTP 回复:
public
class
MappedHttp {
private
static
final
String OUTPUT_FILE =
"MappedHttp.out"
;
private
static
final
String LINE_SEP =
"\r\n"
;
private
static
final
String SERVER_ID =
"Server: Ronsoft Dummy Server"
;
private
static
final
String HTTP_HDR =
"HTTP/1.0 200 OK"
+ LINE_SEP
+ SERVER_ID + LINE_SEP;
private
static
final
String HTTP_404_HDR =
"HTTP/1.0 404 Not Found"
+ LINE_SEP + SERVER_ID + LINE_SEP;
private
static
final
String MSG_404 =
"Could not open file: "
;
public
static
void
main(String[] argv)
throws
Exception {
String file =
"C:\\Users\\Administrator\\Desktop\\aa.TXT"
;
ByteBuffer header = ByteBuffer.wrap(HTTP_HDR.getBytes(
"US-ASCII"
));
ByteBuffer dynhdrs = ByteBuffer.allocate(
128
);
ByteBuffer[] gather = { header,dynhdrs,
null
};
String contentType =
"unknown/unknown"
;
long
contentLength = -
1
;
try
{
FileInputStream fis =
new
FileInputStream(file);
FileChannel fc = fis.getChannel();
MappedByteBuffer filedata = fc.map(MapMode.READ_ONLY,
0
,fc.size());
gather[
2
] = filedata;
contentLength = fc.size();
contentType =URLConnection.guessContentTypeFromName(file);
}
catch
(IOException e) {
ByteBuffer buf = ByteBuffer.allocate(
128
);
String msg = MSG_404 + e + LINE_SEP;
buf.put(msg.getBytes(
"US-ASCII"
));
buf.flip();
gather[
0
] = ByteBuffer.wrap(HTTP_404_HDR.getBytes(
"US-ASCII"
));
gather[
2
] = buf;
contentLength = msg.length();
contentType =
"text/plain"
;
}
StringBuffer sb =
new
StringBuffer();
sb.append(
"Content-Length: "
+ contentLength);
sb.append(LINE_SEP);
sb.append(
"Content-Type: "
).append(contentType);
sb.append(LINE_SEP).append(LINE_SEP);
dynhdrs.put(sb.toString().getBytes(
"US-ASCII"
));
dynhdrs.flip();
FileOutputStream fos =
new
FileOutputStream(OUTPUT_FILE);
FileChannel out = fos.getChannel();
while
(out.write(gather) >
0
);
out.close();
}
}
Socket通道DatagramChannel和SocketChannel实现定义读和写功能的接口而ServerSocketChannel 不实现。ServerSocketChannel负责监听传入的连接和创建新的SocketChannel对象,它本身从不传输数据。
全部socket通道类(DatagramChannel、SocketChannel和 ServerSocketChannel)在被实例化时 都会创建一个对等socket对象。这些是我们所熟悉的来自java.net的类(DatagramSocket、Socket和ServerSocket),它们已经被更新以识别通道。可以通过调用socket(
)方法从一个通道上获取对等socket。此外,这三个java.net类现在都有getChannel( )方法。
虽然每个socket通道(在java.nio.channels包中)都有一个关联的java.net socket对 象,却并非所有的socket都有一个关联的通道。如果您用传统方式(直接实例化)创建了一个Socket对象,它就不会有关联的SocketChannel并且它的getChannel(
)方法将总是返回null。
1、ServerSocketChannel
ServerSocketChannel 是一个基于通道的 socket 监听器。它同我们所熟的 java.net.ServerSocket 执行相同的基本任务,不过它增加了通道语义,因此能够在非阻模式下运行。
用静态的 open( )方法创建一个新的 ServerSocketChannel 对象,将会返回与一个未绑定的 java.net.ServerSocket 关联的通道。该对等 ServerSocket 可以通过在返回的 ServerSocketChannel 上调
用 socket( )方法来获取。作为 ServerSocketChannel 的对等体被创建的 ServerSocket 对象依赖通道实
现。这些 socket 关联的 SocketImpl 能识别通道。通道不能被封装在随意的 socket 对象外面。
由于 ServerSocketChannel 没有 bind(
)方法,因此有必要取出对等的 socket 并使用它来绑定到一 个端口以开始监听连接。我们也是使用对等 ServerSocket 的 API 来根据需要设置其他的 socket 选
项。例如:同它的对等体 java.net.ServerSocket 一样,ServerSocketChannel 也有 accept( )方法。一旦您创建 了一个 ServerSocketChannel 并用对等 socket 绑定了它,然后您就可以在其中一个上调用 accept( )。 如果您选择在 ServerSocket 上调用 accept( )方法,那么它会同任何其他的
ServerSocket 表现一样的行为:总是阻塞并返回一个 java.net.Socket 对象。如果您选择在 ServerSocketChannel 上调用 accept( ) 方法则会返回 SocketChannel 类型的对象,返回的对象能够在非阻塞模式下运行。
如果以非阻塞模式被调用,当没有传入连接在等待时,ServerSocketChannel.accept( )会立即返 回 null。正是这种检查连接而不阻塞的能力实现了可伸缩性并低了复杂性。可选择性也因此得到实现。我们可以使用一个选择器实例来注册一个 ServerSocketChannel 对象以实现新连接到达时自 动通知的功能。例子:上面例子使用ServerSocketChannel的非阻塞accept方法监听连接。实际上我们使用选择器来负责监听连接事件。
2、SocketChannel每个 SocketChannel 对象创建时都是同一个对等的 java.net.Socket 对象串联的。open( )方 法可以创建一个新的 SocketChannel 对象,而在新创建的 SocketChannel 上调用 socket( )方法能返回 它对等的 Socket 对象;在该 Socket 上调用 getChannel( )方法则能返回最初的那个
SocketChannel。虽然每个 SocketChannel 对象都会创建一个对等的 Socket 对象,反过来却不成 立。直接创建的 Socket 对象不会关联 SocketChannel 对象,它们的 getChannel( )方法只返回 null。新创建的 SocketChannel 虽已打开却是未连接的。在一个未连接的 SocketChannel 对象上尝试一 个 I/O 操作会导致 NotYetConnectedException 异常。我们可以通过在通道上直接调用 connect( )方法 或在通道关联的 Socket 对象上调用 connect( )来将该 socket 通道连接。一旦一个 socket
通道被连 接,它将保持连接状态直到被关闭。您可以通过调用布尔型的 isConnected( )方法来测试某个 SocketChannel 当前是否已连接。带 InetSocketAddress 参数形式的 open( )是在返回之前进行连接的便捷方法。下面两段代码功能是一样的:代码一:代码二:如果您选择使用传统方式进行连接,即通过在对等 Socket 对象上调用 connect( )方法,那么,线程在连接建立好或超时过期之前都将保持阻塞。如果您选择通过在通道上直接调用 connect( )方法来建立连接并且通道处于阻塞模式(默认模式),那么连接过程实际上是一样的。在 SocketChannel 上并没有一种 connect( )方法可以让您指定超时(timeout)值,当 connect( )方 法在非阻模式下被调用时, SocketChannel提供并发连接:它发起对请求地址的连接并且立即返回值。如果返回值是
true,说明连接立即建立了(这可能是本地环回连接);如果连接不能立即建 立,connect( )方法会返回false且并发地继续连接建立过程。当通道处于中间的连接等待(connection-pending)状态时,您只可以调用 finishConnect( )、 isConnectPending( )或 isConnected( )方法。一旦连接建立过程成功完成,isConnected(
)将返回 true 值。例子:上面例子是非阻塞连接,例子只发起一个连接,可以修改使其同时发起多个连接。
如果尝试异步连接失败,那么下次调用 finishConnect( )方法会产生一个适当的经检查的异常以 指出问题的性质。通道然后就会被关闭并将不能被连接或再次使用。
与连接相关的方法使得我们可以对一个通道进行轮询并在连接进行过程中判断通道所处的状 态。我们可以使用选择器来避免轮询并在异步连接建立之后得到通知。3、DatagramChannel正如 SocketChannel 对应 Socket,ServerSocketChannel 对应 ServerSocket,每一个 DatagramChannel 对象也有一个关联的 DatagramSocket 对象。DatagramChannel 是无连接的。每个数据报(datagram)都是一个自包含的实体,拥有它自己 的目的地址及不依赖其他数据报的数据。与面向流的 socket 不同,DatagramChannel 可以发 送单独的数据报给不同的目的地址。同样,DatagramChannel 对象也可以接收来自任意地址的数据 包。每个到达的数据报都含有关于它来自何处的信息(源地址)。一个未绑定的 DatagramChannel 仍能接收数据包。当一个底层 socket 被创建时,一个动态生成 的端口号就会分配给它。绑定行为要求通道关联的端口被设置为一个特定的值(此过程可能涉及安 全检查或其他验证)。不论通道是否绑定,所有发送的包都含有 DatagramChannel 的源地址 (带端口号)。未绑定的 DatagramChannel 可以接收发送给它的端口的包,通常是该通道之前发出的一个包的响应(否则别人不知道它的地址)。已绑定的通道接收发送给它们所绑定的熟知端口(wellknown
port)的包。数据 的实际发送或接收是通过send( )和receive( )方法来实现的。DatagramChannel也有一个connect( )方法:DatagramChannel 对数据报 socket 的连接语义不同于对流 socket 的连接语义。有时候,将 数据报对话限制为两方是很可取的。将 DatagramChannel 置于已连接的状态可以使除了它所连 接到的地址之外的任何其他源地址的数据报被忽略。这是很有帮助的,因为不想要的包都已经被 网络层丢弃了,从而避免了使用代码来接收、检查然后弃包的麻烦。同样,不可以发送包到除了指定给
connect( )方 法的目的地址以外的任何其他地址。不同于流 socket,数据报 socket 的无状态性质不需要同远程系统进行对话来建立连接状态。没 有实际的连接,只有用来指定允许的远程地址的本地状态信息。由于此原因,DatagramChannel 上 也就没有单独的 finishConnect( )方法。我们可以使用 isConnected( )方法来测试一个数据报通道的连 接状态。不同于 SocketChannel(必须连接了才有用并且只能连接一次),DatagramChannel 对象可以任 意次数地进行连接或断开连接。每次连接都可以到一个不同的远程地址。调用 disconnect( )方法可以配置通道,以便它能再次接收来自任意远程地址的数据或发 送数据到这些地址上。当一个 DatagramChannel 处于已连接状态时,发送数据将不用提供目的地址而且接收时的源地 址也是已知的。这意味着 DatagramChannel 已连接时可以使用常规的 read( )和 write( )方法。例子(服务器端):
public
class
TimeServer {
private
static
final
int
DEFAULT_TIME_PORT =
12345
;
private
static
final
long
DIFF_1900 = 2208988800L;
protected
DatagramChannel channel;
public
TimeServer()
throws
Exception {
this
.channel = DatagramChannel.open();
this
.channel.socket().bind(
new
InetSocketAddress(DEFAULT_TIME_PORT));
System.out.println(
"Listening on port "
+ DEFAULT_TIME_PORT +
" for time requests"
);
}
public
void
listen()
throws
Exception {
// Allocate a buffer to hold a long value
ByteBuffer longBuffer = ByteBuffer.allocate(
8
);
// Assure big-endian (network) byte order
longBuffer.order(ByteOrder.BIG_ENDIAN);
// Zero the whole buffer to be sure
longBuffer.putLong(
0
,
0
);
// Position to first byte of the low-order 32 bits
longBuffer.position(
4
);
// Slice the buffer; gives view of the low-order 32 bits
ByteBuffer buffer = longBuffer.slice();
while
(
true
) {
buffer.clear();
SocketAddress sa =
this
.channel.receive(buffer);
if
(sa ==
null
) {
continue
;
}
System.out.println(
"Time request from "
+ sa);
buffer.clear();
// Set 64-bit value; slice buffer sees low 32 bits
longBuffer.putLong(
0
,(System.currentTimeMillis() / 
1000
) + DIFF_1900);
this
.channel.send(buffer,sa);
// 只把低32位返回
}
}
public
static
void
main(String[] argv)
throws
Exception {
try
{
TimeServer server =
new
TimeServer();
server.listen();
}
catch
(SocketException e) {
System.out.println(
"Can't bind to port "
+ DEFAULT_TIME_PORT +
",try a different one"
);
}
}
}
例子(客户端):
public
class
TimeClient {
private
static
final
int
DEFAULT_TIME_PORT =
12345
;
private
static
final
long
DIFF_1900 = 2208988800L;
protected
List remoteHosts =
new
ArrayList();
protected
DatagramChannel channel;
public
TimeClient()
throws
Exception {
setServers();
this
.channel = DatagramChannel.open();
}
protected
InetSocketAddress receivePacket(DatagramChannel channel,ByteBuffer buffer)
throws
Exception {
buffer.clear();
// Receive an unsigned 32-bit,big-endian value
return
((InetSocketAddress) channel.receive(buffer));
}
// Send time requests to all the supplied hosts
protected
void
sendRequests()
throws
Exception {
ByteBuffer buffer = ByteBuffer.allocate(
1
);
Iterator it = remoteHosts.iterator();
while
(it.hasNext()) {
InetSocketAddress sa =(InetSocketAddress) it.next();
System.out.println(
"Requesting time from "
+ sa.getHostName() +
":"
+ sa.getPort());
buffer.clear().flip();
channel.send(buffer,sa);
}
}
// Receive any replies that arrive
public
void
getReplies()
throws
Exception {
// Allocate a buffer to hold a long value
ByteBuffer longBuffer = ByteBuffer.allocate(
8
);
// Assure big-endian (network) byte order
longBuffer.order(ByteOrder.BIG_ENDIAN);
// Zero the whole buffer to be sure
longBuffer.putLong(
0
,
0
);
// Position to first byte of the low-order 32 bits
longBuffer.position(
4
);
// Slice the buffer; gives view of the low-order 32 bits
ByteBuffer buffer = longBuffer.slice();
int
expect = remoteHosts.size();
int
replies =
0
;
System.out.println(
""
);
System.out.println(
"Waiting for replies..."
);
while
(
true
) {
InetSocketAddress sa;
sa = receivePacket(channel,buffer);
buffer.flip();
replies++;
printTime(longBuffer.getLong(
0
),sa);
if
(replies == expect) {
System.out.println(
"All packets answered"
);
break
;
}
// Some replies haven't shown up yet
System.out.println(
"Received "
+ replies +
" of "
+ expect +
" replies"
);
}
}
// Print info about a received time reply
protected
void
printTime(
long
remote1900,InetSocketAddress sa) {
// local time as seconds since Jan 1,1970
long
local = System.currentTimeMillis() /
1000
;
// remote time as seconds since Jan 1,1970
long
remote = remote1900 - DIFF_1900;
Date remoteDate =
new
Date(remote *
1000
);
Date localDate =
new
Date(local *
1000
);
long
skew = remote - local;
System.out.println(
"Reply from "
+ sa.getHostName() +
":"
+ sa.getPort());
System.out.println(
" there: "
+ remoteDate);
System.out.println(
" here: "
+ localDate);
System.out.print(
" skew: "
);
if
(skew ==
0
) {
System.out.println(
"none"
);
}
else
if
(skew >
0
) {
System.out.println(skew +
" seconds ahead"
);
}
else
{
System.out.println((-skew) +
" seconds behind"
);
}
}
protected
void
setServers() {
/*
 
*设置把请求发给哪些服务器,这里指定的地址和端口号是服务器的地址和端口号,
 
*如果要指定自己的端口号,则应该在DatagramChannel对象中调用bind绑定。
 
*可以指定多个,这里只指定两个,而且这两个指向的还是同一个服务器。
 
*/
InetSocketAddress sa1 =
new
InetSocketAddress(
"localhost"
,DEFAULT_TIME_PORT);
InetSocketAddress sa2 =
new
InetSocketAddress(
"localhost"
,DEFAULT_TIME_PORT);
remoteHosts.add(sa1);
remoteHosts.add(sa2);
}
public
static
void
main(String[] argv)
throws
Exception {
TimeClient client =
new
TimeClient();
client.sendRequests();
client.getReplies();
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: