您的位置:首页 > 其它

nio的一些相关知识。

2009-04-15 12:05 387 查看
本来打算自己写,但网上的资料太全了,就转过来顺便当作自用了。

通过自己在网上搜索各方面的资料来了解一个知识,真的是很有难度,现在在想想上课,有老师在那讲课,这两者之间的效果真的相差甚远,所以更加要花时间去掌握好课堂内容。

一、nio基于TCP的通信方式:(转自:http://bat0906.javaeye.com/blog/192677)
对相关类的简单介绍

java.nio.*, 据说它提供了一些更加底层的一些功能,如:类似windows环境下的

AsyncSocket类的异步操作的功能,能显著降低server端程序的线程管理开销。

因为大多数应用是建立在TCP之上,所以在此只说说SocketChannel,

ServerSocketChannel,
Selector和ByteBuffer这几个类.前三个最终都源自channel类。而channel
类,可以理解为在具体I/O或文件对象之上抽象的一个操作对象,我们通过操作channel的读写达到对其对应的文件或I/O对象(包括socket)读
写的目的。读写的内容在内存中放在ByteBuffer类提供的缓冲区。总而言之,channel作为一个桥梁,连接了I/O对象和内存中的
ByteBuffer,实现了I/O的更高效的存取。

一个基于TCP的服务器端程序,必然有个侦听端和若干个通信端,它们在nio中由对应的ServerSocketChannel 和SocketChannel类来实现。为了达到异步I/O操作的目的,需要Selector类,它能检测到I/O对象的状态。

SocketChannel类是抽象类,通过调用它的静态函数open(),可生成一个

SocketChannel对象,该对象对应一个java.net.Socket,可通过SocketChannel.socket()获得,而其对应的Socket也可通过调用函数getChannel()得到已建立的相应SocketChannel。

SocketChannel与它的socket是一一对应的。SocketChannel的操作与Socket也很相似.

ServerSocketChannel也是通过调用它的静态函数open()生成的,只是它不能

直接调用bind()函数来绑定一个地址,需要它对应的ServerSocket来完成绑定工作,一般可按如下步骤做:

ServerSocketChannel ssc = new ServerSocketChannel.open();

ssc.socket().bind(InetSocketAddress(host,port));

罗嗦了半天,还是看看最简单的C/S实现吧,服务器提供了基本的回射(echo)功

能,其中提供了较详细的注释。

源码分析

1.服务器端:

////////////////////////

//AsyncServer.java

// by zztudou@163.com

////////////////////////

import java.nio.channels.SocketChannel;

import java.nio.channels.ServerSocketChannel;

import java.nio.channels.SelectionKey;

import java.nio.channels.Selector;

import java.nio.ByteBuffer;

import java.nio.channels.SelectableChannel;

import java.nio.channels.spi.SelectorProvider;

import java.net.ServerSocket;

import java.net.Socket;

import java.net.InetSocketAddress;

import java.net.SocketAddress;

import java.util.Iterator;

import java.util.LinkedList;

import java.io.IOException;

class AsyncServer implements Runnable{

private ByteBuffer r_buff = ByteBuffer.allocate(1024);

private ByteBuffer w_buff = ByteBuffer.allocate(1024);

private static int port = 8848;

public AsyncServer(){

new Thread(this).start();

}

public void run(){

try{

//生成一个侦听端

ServerSocketChannel ssc = ServerSocketChannel.open();

//将侦听端设为异步方式

ssc.configureBlocking(false);

//生成一个信号监视器

Selector s = Selector.open();

//侦听端绑定到一个端口

ssc.socket().bind(new InetSocketAddress(port));

//设置侦听端所选的异步信号OP_ACCEPT

ssc.register(s,SelectionKey.OP_ACCEPT);

System.out.println("echo server has been set up ......");

while(true){

int n = s.select();

if (n == 0) {//没有指定的I/O事件发生

continue;

}

Iterator it = s.selectedKeys().iterator();

while (it.hasNext()) {

SelectionKey key = (SelectionKey) it.next();

if (key.isAcceptable()) {//侦听端信号触发

ServerSocketChannel server = (ServerSocketChannel) key.channel();

//接受一个新的连接

SocketChannel sc = server.accept();

sc.configureBlocking(false);

//设置该socket的异步信号OP_READ:当socket可读时,

//触发函数DealwithData();

sc.register(s,SelectionKey.OP_READ);

}

if (key.isReadable()) {//某socket可读信号

DealwithData(key);

}

it.remove();

}

}

}

catch(Exception e){

e.printStackTrace();

}

}

public void DealwithData(SelectionKey key) throws IOException{

int count;

//由key获取指定socketchannel的引用

SocketChannel sc = (SocketChannel)key.channel();

r_buff.clear();

//读取数据到r_buff

while((count = sc.read(r_buff))> 0)

;

//确保r_buff可读

r_buff.flip();

w_buff.clear();

//将r_buff内容拷入w_buff

w_buff.put(r_buff);

w_buff.flip();

//将数据返回给客户端

EchoToClient(sc);

w_buff.clear();

r_buff.clear();

}

public void EchoToClient(SocketChannel sc) throws IOException{

while(w_buff.hasRemaining())

sc.write(w_buff);

}

public static void main(String args[]){

if(args.length > 0){

port = Integer.parseInt(args[0]);

}

new AsyncServer();

}

}

在当前目录下运行:

javac AsynServer.java

后,若无编译出错,接下来可运行:

java AsynServer 或 java AsynServer ×××(端口号)

上述服务程序在运行时,可指定其侦听端口,否则程序会取8848为默认端口。

2.客户端的简单示例:

////////////////////////

//AsyncClient.java

// by zztudou@163.com

////////////////////////

import java.nio.channels.SocketChannel;

import java.net.InetSocketAddress;

import java.nio.ByteBuffer;

import java.nio.channels.Selector;

import java.nio.channels.SelectionKey;

import java.io.IOException;

import java.io.BufferedReader;

import java.io.InputStreamReader;

class AsyncClient{

private SocketChannel sc;

private final int MAX_LENGTH = 1024;

private ByteBuffer r_buff = ByteBuffer.allocate(MAX_LENGTH);

private ByteBuffer w_buff = ByteBuffer.allocate(MAX_LENGTH);

private static String host ;

private static int port = 8848;

public AsyncClient(){

try {

InetSocketAddress addr = new InetSocketAddress(host,port);

//生成一个socketchannel

sc = SocketChannel.open();

//连接到server

sc.connect(addr);

while(!sc.finishConnect())

;

System.out.println("connection has been established!...");

while(true){

//回射消息

String echo;

try{

System.err.println("Enter msg you'd like to send: ");

BufferedReader br = new BufferedReader(new InputStreamReader(System.in));

//输入回射消息

echo = br.readLine();

//把回射消息放入w_buff中

w_buff.clear();

w_buff.put(echo.getBytes());

w_buff.flip();

}catch(IOException ioe){

System.err.println("sth. is wrong with br.readline() ");

}

//发送消息

while(w_buff.hasRemaining())

sc.write(w_buff);

w_buff.clear();

//进入接收状态

Rec();

//间隔1秒

Thread.currentThread().sleep(1000);

}

}catch(IOException ioe){

ioe.printStackTrace();

}

catch(InterruptedException ie){

ie.printStackTrace();

}

}

////////////

//读取server端发回的数据,并显示

public void Rec() throws IOException{

int count;

r_buff.clear();

count=sc.read(r_buff);

r_buff.flip();

byte[] temp = new byte[r_buff.limit()];

r_buff.get(temp);

System.out.println("reply is " + count +" long, and content is: " + new String(temp));

}

public static void main(String args[]){

if(args.length < 1){//输入需有主机名或IP地址

try{

System.err.println("Enter host name: ");

BufferedReader br = new BufferedReader(new InputStreamReader(System.in));

host = br.readLine();

}catch(IOException ioe){

System.err.println("sth. is wrong with br.readline() ");

}

}

else if(args.length == 1){

host = args[0];

}

else if(args.length > 1){

host = args[0];

port = Integer.parseInt(args[1]);

}

new AsyncClient();

}

}

在当前目录下运行:

javac AsynClient.java

后,若无编译出错,确认AsyncServer已经运行的情况下,接下来可运行:

java AsynClient hostname 或 java AsynClient hostname ×××(端口号)

并按提示进行操作即可。

总结

总的来说,用nio进行网络编程还是很有新意的,服务器端软件能在一个线程中维护与众多客户端的通信连接。笔者在本文中试图用一个典型的回射例子说明如何用nio建立最基本的C/S应用。希望大家能试着用用它。

另外,笔者在实践中也发现nio在应用中存在的一些难题,比如如何应用SocketChannel的继承类,以及如何在socketchannel之上应
用SSL(Secure Socket Layer)等等,因而希望这篇文章只是抛砖引玉,引起大家对nio作进一步的讨论。

在当前目录下运行:

javac AsynServer.java

后,若无编译出错,接下来可运行:

java AsynServer 或 java AsynServer ×××(端口号)

上述服务程序在运行时,可指定其侦听端口,否则程序会取8848为默认端口。

二、nio基于UDP的通信方式:(转自:http://book.51cto.com/art/200902/109761.htm)

5.7 数据报(UDP)信道

Java的NIO包通过DatagramChannel类实现了数据报(UDP)信道。与我们之前看到的其他形式的
SelectableChannel一样,DatagramChannel在DatagramSocket上添加了选择和非阻塞行为,以及基于缓冲区的
I/O操作能力。

DatagramChannel: 创建,连接和关闭

static DatagramChannel open()
boolean isOpen()
DatagramSocket socket() void close()

需要调用DatagramChannel的open()工厂方法来创建一个DatagramChannel实例,该实例是未绑定的。
DatagramChannel只是对基本DatagramSocket的一个包装器(wrapper)。使用其socket()方法可以直接访问内部的
DatagramSocket实例。这就允许通过调用基本的DatagramSocket方法进行绑定、设置套接字选项等操作。用完
DatagramChannel后,要调用它的close()方法将其关闭。

只要创建了一个DatagramChannel实例,就可以非常直接地发送和接收数据。

DatagramChannel: 发送和接收

int send(ByteBuffer src, SocketAddress target)
SocketAddress receive(ByteBuffer dst)

send()方法用于创建一个包含了给定ByteBuffer中的数据的数据报文,并将其发送到目的地址指定的SocketAddress上。
receive()方法用于将接收到的数据报文存入指定缓冲区并返回发送者的地址。重要提示:如果缓冲区的剩余空间小于数据报文中的数据大小,多余的数据
将毫无提示地丢弃。

以下代码段用于创建一个DatagramChannel实例,并将UTF-16编码的字符串"Hello"发送到运行在同一主机的5000端口上的UDP服务器上。

DatagramChannel channel = DatagramChannel.open();
ByteBuffer buffer = ByteBuffer.wrap("Hello".getBytes("UTF-16"));
channel.send(buffer, new InetSocketAddress("localhost", 5000));

以下代码段用于创建一个DatagramChannel实例,将底层的套接字绑定到5000端口,接收最长为20字节的数据报文,并将字节转换成使用UTF-16编码的字符串。

DatagramChannel channel = DatagramChannel.open();
channel.socket().bind(new InetSocketAddress(5000));
ByteBuffer buffer = ByteBuffer.allocateDirect(20);
SocketAddress address = channel.receive(buffer);
buffer.flip();
String received = Charset.forName("UTF-16").
newDecoder().decode(buffer).toString();

在上面的send()实例中,调用send()方法时并没有显式地绑定本地端口,因此将随机选择一个可用端口。相应的receive()方法用于返回一个SocketAddress,其中包含了端口号。

如果总是向同一个远程终端发送或接收数据,我们可以选择调用connect()方法,并使用 SocketAddress指定远程终端的地址。

DatagramChannel: 连接DatagramChannel

DatagramChannel connect(SocketAddress remote)
DatagramChannel disconnect()
boolean isConnected()
int read(ByteBuffer dst)
long read(ByteBuffer[] dsts)
long read(ByteBuffer[] dsts, int offset, int length)
int write(ByteBuffer src)
long write(ByteBuffer[] srcs)
long write(ByteBuffer[] srcs, int offset, int length)

这些方法限制我们只能通过指定的地址发送和接收数据。为什么要这样做呢?原因之一是调用connect()方法后,可以使用read()和
write()方法来代替receive()和send()方法,并且不需要处理远程地址。read()和write()方法分别用于接收和发送一个数据
报文。分散式读操作以一个ByteBuffer数组为参数,只接收一个数据报文,并按顺序将其填入缓冲区中。聚集式写操作将缓冲区数组中的所有字节连接起
来创建一个要传输的数据报文。重要提示:现在能够发送的最大数据报文可以包含65507个字节,试图发送更多的数据将被无提示地截断。

使用connect()方法的另一个好处是,已建立连接的数据报文信道可能只接收从指定终端发送来的数据,因此我们不需要测试接收端的有效性。注
意,DatagramChannel的connect()方法只起到限制发送和接收终端的作用,连接时并没有数据包在SocketChannel上进行交
换,而且也不需要像SocketChannel那样等待或测试连接是否完成。(见第6章)

到目前为止DatagramChannel看起来与DatagramSocket非常相似。数据报文信道和套接字的主要区别是,信道可以进行非阻塞
I/O操作和使用选择器。DatagramChannel中选择器的创建,信道的注册、选择等,与SocketChannel几乎一模一样。有一个区别是
DatagramChannel不能注册连接I/O操作,不过也不需要这样做,因为DatagramChannel的connect()方法永远不会阻
塞。

DatagramChannel: 设置阻塞行为和使用选择器

SelectableChannel configureBlocking(boolean block)
boolean isBlocking()
SelectionKey register(Selector sel, int ops)
SelectionKey register(Selector sel, int ops, Object attachment)
boolean isRegistered()
int validOps()
SelectionKey keyFor(Selector sel)

这些方法的功能与SocketChannel和ServerSocketChannel中的相应方法一样。

下面使用DatagramChannel对第4章中的DatagramSocket
UDP回显服务器进行重写。服务器侦听指定的端口,并将接收到的数据报文简单地回发给客户端。重写后的服务器与原来版本的主要区别是它不会在send()
和receive()方法上阻塞等待。

UDPEchoServerSelector.java

0 import java.io.IOException;
1 import java.net.InetSocketAddress;
2 import java.net.SocketAddress;
3 import java.nio.ByteBuffer;
4 import java.nio.channels.DatagramChannel;
5 import java.nio.channels.SelectionKey;
6 import java.nio.channels.Selector;
7 import java.util.Iterator;
8
9 public class UDPEchoServerSelector {
10
11 private static final int TIMEOUT = 3000; // Wait timeout (milliseconds)
12
13 private static final int ECHOMAX = 255; // Maximum size of echo datagram
14
15 public static void main(String[] args) throws IOException {
16
17 if (args.length != 1) // Test for correct argument list
18 throw new IllegalArgumentException("Parameter(s): <Port>");
19
20 int servPort = Integer.parseInt(args[0]);
21
22 // Create a selector to multiplex client connections.
23 Selector selector = Selector.open();
24
25 DatagramChannel channel = DatagramChannel.open();
26 channel.configureBlocking(false);
27 channel.socket().bind(new InetSocketAddress(servPort));
28 channel.register(selector, SelectionKey.OP_READ, new ClientRecord());
29
30 while (true) { // Run forever, receiving and echoing datagrams
31 // Wait for task or until timeout expires
32 if (selector.select(TIMEOUT) == 0) {
33 System.out.print(".");
34 continue;
35 }
36
37 // Get iterator on set of keys with I/O to process
38 Iterator<SelectionKey> keyIter = selector.selectedKeys().iterator();
39 while (keyIter.hasNext()) {
40 SelectionKey key = keyIter.next(); // Key is bit mask
41
42 // Client socket channel has pending data?
43 if (key.isReadable())
44 handleRead(key);
45
46 // Client socket channel is available for writing and
47 // key is valid (i.e., channel not closed).
48 if (key.isValid() && key.isWritable())
49 handleWrite(key);
50
51 keyIter.remove();
52 }
53 }
54 }
55
56 public static void handleRead(SelectionKey key) throws IOException {
57 DatagramChannel channel = (DatagramChannel) key.channel();
58 ClientRecord clntRec = (ClientRecord) key.attachment();
59 clntRec.buffer.clear(); // Prepare buffer for receiving
60 clntRec.clientAddress = channel.receive(clntRec.buffer);
61 if (clntRec.clientAddress != null) { // Did we receive something?
62 // Register write with the selector
63 key.interestOps(SelectionKey.OP_WRITE);
64 }
65 }
66
67 public static void handleWrite(SelectionKey key) throws IOException {
68 DatagramChannel channel = (DatagramChannel) key.channel();
69 ClientRecord clntRec = (ClientRecord) key.attachment();
70 clntRec.buffer.flip(); // Prepare buffer for sending
71 int bytesSent = channel.send(clntRec.buffer, clntRec.clientAddress);
72 if (bytesSent != 0) { // Buffer completely written?
73 // No longer interested in writes
74 key.interestOps(SelectionKey.OP_READ);
75 }
76 }
77
78 static class ClientRecord {
79 public SocketAddress clientAddress;
80 public ByteBuffer buffer = ByteBuffer.allocate(ECHOMAX);
81 }
82 }

UDPEchoServerSelector.java
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: