您的位置:首页 > 编程语言

基于NIO实现非阻塞Socket编程

2009-05-27 15:21 405 查看
一、

描述

Java

提供的
NIO API

来开发高性能网络服务器,
JDK 1.4
以前
的网络通信程序是基于阻塞式
API

的——即当程序执行输入、输出操作后,在这些操作返回之前会一直阻塞该线程,所以服务器必须为每个客户端都提供一条独立线程进行处理,当服务器需要同时处理大量客户端时,这种做法会导致性能下降。使用
NIO API

则可以让服务器使用一个或有限几个线程来同时处理连接到服务器上的所有客户端。

NIO

使用面向缓冲
(buffer)

的模型。这就是说,
NIO

主要处理大块的数据。这就避免了利用流模型处理所引起的问题,在有可能的情况下,它甚至可以为了得到最大的吞吐量而使用系统级的工具。基本流
InputStream


OutputStream

能够读写字节数据;它们的子类可以读写各种各样的数据。在
NIO

中,所有的数据都通过缓冲读写。从图
1

可以看到两种模型的比较:




1.

流模型使用
Streams


Bytes


NIO

模型使用
Channels


Buffers

使用缓冲的好处:

A.

它可以大块的处理数据。你可以读写大块数据,缓冲的大小只受你所分配的内存数量的限制。

B.

它可以表示系统级的缓冲。多种系统采用统一的内存配置完成
I/O

处理,而不需要将数据从系统内存中拷贝到应用程序的内存空间。
buffer

对象的不同实现可以直接表示这些系统级的缓冲,这就意味着你可以用最少的拷贝次数来完成对数据的读写。

二、

Select

工具

select

提供了一种很好的方法来完成大量的数据源并行处理。它的名字来源于
Unix

系统中提供相同功能的
C

程序系统调用
select()



阻塞式编程特点:


通常,
I/O

属于阻塞式系统调用。当你对输入流调用
read()

方法,直到数据读入完成之前方法一直被阻塞。如果你读入本地文件就不需要等待很长时间。但是如果你从文件服务器或这是
socket

连接读取数据的话,那么你就要等很长时间。但你在等待过程中,你读取数据的线程将不能做任何事。

当然,在
Java

中你很容易为多个流创建多个线程。但是线程需要消耗大量的资源。在很多实现中,每个线程需要占用一块内存,即使它什么也不做。同时太多的线程会对性能造成很大的影响。

Select

编程特点:


select

采用不同的工作方式。通过
selet

你把输入流注册到一个
Selector

对象上。当某个流发生
I/O

活动时,
selector

将会通知你。以这种方式就可以只用一个线程读入多个数据源。尽管
Selector

不能帮你读取数据,但是它可以监听网络连接请求和越过较慢的通道进行写数据。

Java


NIO

为非阻塞式的
Socket

通信提供了如下几个特殊类:

Selector





它是
SelectableChannel

对象的多路复用器,所有希望采用非阻塞方式进行通信的
Channel

都应该注册到
Selector

对象。可通过调用此类的静态
open()

方法来创建
Selector

实例,该方法将使用系统默认的
Selector

来返回新的
Selector


Selector

可以同时监控多个
SelectableChannel


IO

状况,是非阻塞
IO

的核心。

一个
Selector

实例有
3


SelectionKey

的集合:

A.

所有
SelectionKey

集合:代表了注册在该
Selector

上的
Channel

,这个集合可以通过
keys()

方法返回。

B.

被选择的
SelectionKey

集合:代表了所有可通过
select()

方法监测到、需要进行
IO

处理的
Channel

,这个集合可以通过
selectedKeys()

返回。

C.

被取消的
SelectionKey

集合:代表了所有被取消注册关系的
Channel

,在下一次执行
select()

方法时,这些
Channel

对应的
SelectionKey

会被彻底删除,程序通常无须直接访问该集合。

Select

相关的方法:

A.

int select()

:监控所有注册的
Channel

,当它们中间有需要处理的
IO

操作时,该方法返回,并将对应的
SelectionKey

加入被选择的
SelectionKey

集合中,该方法返回这些
Channel

的数量。

B.

int select(long timeout)

:可以设置超时时长的
select()

操作。

C.

int selectNow()

:执行一个立即返回的
select()

操作,相对于无参数的
select()

方法而言,该方法不会阻塞线程。

D.

Selector wakeup()

:使一个还未返回的
select()

方法立刻返回。

SelectableChannel



它代表可以支持非阻塞
IO

操作的
Channel

对象,可以将其注册到
Selector

上,这种注册的关系由
SelectionKey

实例表示。应用程序可调用
SelectableChannel


register()

方法将其注册到指定
Selector

上,当该
Selector

上某些
SelectableChannel

上有需要处理的
IO

操作时,程序可以调用
Selector

实例的
select()

方法获取它们的数量,并可以通过
selectedKeys()

方法返回它们对应的
SelectKey

集合——通过该集合就可以获取所有需要处理
IO

操作的
SelectableChannel

集。

SelectableChannel

对象支持阻塞和非阻塞两种模式(所有
channel

默认都是阻塞模式),必须使用非阻塞式模式才可以利用非阻塞
IO

操作。

SelectableChannel

提供了如下两个方法来设置和返回该
Channel

的模式状态:

SelectableChannel configureBlocking(boolean block)

:设置是否采用阻塞模式。

boolean isBlocking()

:返回该
Channel

是否是阻塞模式。

使用
NIO

实现非阻塞式服务器的示意图:



从图中可以看出,服务器上所有
Channel

(包括
ServerSocketChannel


SocketChannel

)都需要向
Selector

注册,而该
Selector

则负责监视这些
Socket


IO

状态,当其中任意一个或多个
Channel

具有可用的
IO

操作时,该
Selector


select()

方法将会返回大于
0

的整数,该整数值就表示该
Selector

上有多少个
Channel

具有可用的
IO

操作,并提供了
selectedKeys()

方法来返回这些
Channel

对应的
SelectionKey

集合。正是通过
Selector

,使得服务器端只需要不断地调用
Selector

实例的
select()

方法即可知道当前所有
Channel

是否有需要处理的
IO

操作。

三、

应用范例

服务端代码:

import java.io.IOException;

import java.net.InetSocketAddress;

import java.nio.ByteBuffer;

import java.nio.channels.SelectionKey;

import java.nio.channels.Selector;

import java.nio.channels.ServerSocketChannel;

import java.nio.channels.SocketChannel;

import java.util.Iterator;

import java.util.Set;

class AsyncServer implements Runnable {

private ByteBuffer r_buff = ByteBuffer.allocate(1024);

private ByteBuffer w_buff = ByteBuffer.allocate(1024);

private static int port = 8848;

public AsyncServer() {

new Thread(this).start();

}

private void info(String str){

System.out.println(str);

}

public void run() {

try {

Selector s = Selector.open(); //// 生成一个信号监视器

ServerSocketChannel ssc = ServerSocketChannel.open(); // 生成一个侦听端

ssc.configureBlocking(false);// 将侦听端设为异步方式

// 侦听端绑定到一个端口

ssc.socket().bind(new InetSocketAddress(port));

ssc.register(s, SelectionKey.OP_ACCEPT,new NetEventHandler());// 设置侦听端所选的异步信号OP_ACCEPT

info(”开始启动服务器”);

while (true) {

int n = s.select(100);

if (n == 0) {// 没有指定的I/O事件发生

continue;

}

Set<SelectionKey> readys = s.selectedKeys();

if(readys.size() == 0){

continue;

}

Iterator<SelectionKey> i = readys.iterator();

while (i.hasNext()) {

SelectionKey key = i.next();

i.remove();

if (key.isAcceptable()) {// 侦听端信号触发

info(”侦听端信号触发”);

ServerSocketChannel server = (ServerSocketChannel) key.channel();

SocketChannel sc = server.accept();

sc.configureBlocking(false);

sc.register(s, SelectionKey.OP_READ,new NetEventHandler());

}

if (key.isReadable()) {// 某socket可读信号

DealwithData(key);

}

}

}

} catch (Exception e) {

e.printStackTrace();

}

}

public void DealwithData(SelectionKey key) throws IOException {

NetEventHandler eventHandler = (NetEventHandler)key.attachment();

info(”eventHandler:” + eventHandler);

// 由key获取指定socketchannel的引用

SocketChannel sc = (SocketChannel) key.channel();

r_buff.clear();

int count;

while ((count = sc.read(r_buff)) > 0);

// 将r_buff内容拷入w_buff

r_buff.flip();

w_buff.clear();

w_buff.put(r_buff);

w_buff.flip();

// 将数据返回给客户端

EchoToClient(sc);

w_buff.clear();

r_buff.clear();

}

public void EchoToClient(SocketChannel sc) throws IOException {

while (w_buff.hasRemaining())

sc.write(w_buff);

}

public static void main(String args[]) {

if (args.length > 0) {

port = Integer.parseInt(args[0]);

}

new AsyncServer();

}

}

客户端代码:

import java.io.BufferedReader;

import java.io.IOException;

import java.io.InputStreamReader;

import java.net.InetSocketAddress;

import java.nio.ByteBuffer;

import java.nio.channels.SocketChannel;

class AsyncClient {

private SocketChannel sc;

private final int MAX_LENGTH = 1024;

private ByteBuffer r_buff = ByteBuffer.allocate(MAX_LENGTH);

private ByteBuffer w_buff = ByteBuffer.allocate(MAX_LENGTH);

private static String host ;

private static int port = 8848;

public AsyncClient() {

try {

InetSocketAddress addr = new InetSocketAddress(host, port);

// 生成一个socketchannel

sc = SocketChannel.open();

// 连接到server

sc.connect(addr);

while (!sc.finishConnect())

;

System.out.println(”connection has been established!…”);

while (true) {

// 回射消息

String echo;

try {

System.err.println(”Enter msg you’d like to send: “);

BufferedReader br = new BufferedReader(

new InputStreamReader(System.in));

// 输入回射消息

echo = br.readLine();

// 把回射消息放入w_buff中

w_buff.clear();

w_buff.put(echo.getBytes());

w_buff.flip();

} catch (IOException ioe) {

System.err.println(”sth. is wrong with br.readline() “);

}

// 发送消息

while (w_buff.hasRemaining())

sc.write(w_buff);

w_buff.clear();

// 进入接收状态

Rec();

// 间隔1秒

Thread.currentThread().sleep(1000);

}

} catch (IOException ioe) {

ioe.printStackTrace();

} catch (InterruptedException ie) {

ie.printStackTrace();

}

}

public void Rec() throws IOException {

int count;

r_buff.clear();

count = sc.read(r_buff);

r_buff.flip();

byte[] temp = new byte[r_buff.limit()];

r_buff.get(temp);

System.out.println(”reply is ” + count + ” long, and content is: “

+ new String(temp));

}

public static void main(String args[]) {

if (args.length < 1) {// 输入需有主机名或IP地址

try {

System.err.println(”Enter host name: “);

BufferedReader br = new BufferedReader(new InputStreamReader(

System.in));

host = br.readLine();

} catch (IOException ioe) {

System.err.println(”sth. is wrong with br.readline() “);

}

} else if (args.length == 1) {

host = args[0];

} else if (args.length > 1) {

host = args[0];

port = Integer.parseInt(args[1]);

}

new AsyncClient();

}

}
转自:http://hi.baidu.com/chenweifighting/blog/item/38d7760e3d8efc226159f378.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: