基于NIO实现非阻塞Socket编程
2009-05-27 15:21
405 查看
一、
描述
Java
提供的
NIO API
来开发高性能网络服务器,
JDK 1.4
以前
的网络通信程序是基于阻塞式
API
的——即当程序执行输入、输出操作后,在这些操作返回之前会一直阻塞该线程,所以服务器必须为每个客户端都提供一条独立线程进行处理,当服务器需要同时处理大量客户端时,这种做法会导致性能下降。使用
NIO API
则可以让服务器使用一个或有限几个线程来同时处理连接到服务器上的所有客户端。
NIO
使用面向缓冲
(buffer)
的模型。这就是说,
NIO
主要处理大块的数据。这就避免了利用流模型处理所引起的问题,在有可能的情况下,它甚至可以为了得到最大的吞吐量而使用系统级的工具。基本流
InputStream
和
OutputStream
能够读写字节数据;它们的子类可以读写各种各样的数据。在
NIO
中,所有的数据都通过缓冲读写。从图
1
可以看到两种模型的比较:
图
1.
流模型使用
Streams
和
Bytes
;
NIO
模型使用
Channels
和
Buffers
使用缓冲的好处:
A.
它可以大块的处理数据。你可以读写大块数据,缓冲的大小只受你所分配的内存数量的限制。
B.
它可以表示系统级的缓冲。多种系统采用统一的内存配置完成
I/O
处理,而不需要将数据从系统内存中拷贝到应用程序的内存空间。
buffer
对象的不同实现可以直接表示这些系统级的缓冲,这就意味着你可以用最少的拷贝次数来完成对数据的读写。
二、
Select
工具
select
提供了一种很好的方法来完成大量的数据源并行处理。它的名字来源于
Unix
系统中提供相同功能的
C
程序系统调用
select()
。
阻塞式编程特点:
通常,
I/O
属于阻塞式系统调用。当你对输入流调用
read()
方法,直到数据读入完成之前方法一直被阻塞。如果你读入本地文件就不需要等待很长时间。但是如果你从文件服务器或这是
socket
连接读取数据的话,那么你就要等很长时间。但你在等待过程中,你读取数据的线程将不能做任何事。
当然,在
Java
中你很容易为多个流创建多个线程。但是线程需要消耗大量的资源。在很多实现中,每个线程需要占用一块内存,即使它什么也不做。同时太多的线程会对性能造成很大的影响。
Select
编程特点:
select
采用不同的工作方式。通过
selet
你把输入流注册到一个
Selector
对象上。当某个流发生
I/O
活动时,
selector
将会通知你。以这种方式就可以只用一个线程读入多个数据源。尽管
Selector
不能帮你读取数据,但是它可以监听网络连接请求和越过较慢的通道进行写数据。
Java
的
NIO
为非阻塞式的
Socket
通信提供了如下几个特殊类:
Selector
:
它是
SelectableChannel
对象的多路复用器,所有希望采用非阻塞方式进行通信的
Channel
都应该注册到
Selector
对象。可通过调用此类的静态
open()
方法来创建
Selector
实例,该方法将使用系统默认的
Selector
来返回新的
Selector
。
Selector
可以同时监控多个
SelectableChannel
的
IO
状况,是非阻塞
IO
的核心。
一个
Selector
实例有
3
个
SelectionKey
的集合:
A.
所有
SelectionKey
集合:代表了注册在该
Selector
上的
Channel
,这个集合可以通过
keys()
方法返回。
B.
被选择的
SelectionKey
集合:代表了所有可通过
select()
方法监测到、需要进行
IO
处理的
Channel
,这个集合可以通过
selectedKeys()
返回。
C.
被取消的
SelectionKey
集合:代表了所有被取消注册关系的
Channel
,在下一次执行
select()
方法时,这些
Channel
对应的
SelectionKey
会被彻底删除,程序通常无须直接访问该集合。
Select
相关的方法:
A.
int select()
:监控所有注册的
Channel
,当它们中间有需要处理的
IO
操作时,该方法返回,并将对应的
SelectionKey
加入被选择的
SelectionKey
集合中,该方法返回这些
Channel
的数量。
B.
int select(long timeout)
:可以设置超时时长的
select()
操作。
C.
int selectNow()
:执行一个立即返回的
select()
操作,相对于无参数的
select()
方法而言,该方法不会阻塞线程。
D.
Selector wakeup()
:使一个还未返回的
select()
方法立刻返回。
SelectableChannel
:
它代表可以支持非阻塞
IO
操作的
Channel
对象,可以将其注册到
Selector
上,这种注册的关系由
SelectionKey
实例表示。应用程序可调用
SelectableChannel
的
register()
方法将其注册到指定
Selector
上,当该
Selector
上某些
SelectableChannel
上有需要处理的
IO
操作时,程序可以调用
Selector
实例的
select()
方法获取它们的数量,并可以通过
selectedKeys()
方法返回它们对应的
SelectKey
集合——通过该集合就可以获取所有需要处理
IO
操作的
SelectableChannel
集。
SelectableChannel
对象支持阻塞和非阻塞两种模式(所有
channel
默认都是阻塞模式),必须使用非阻塞式模式才可以利用非阻塞
IO
操作。
SelectableChannel
提供了如下两个方法来设置和返回该
Channel
的模式状态:
SelectableChannel configureBlocking(boolean block)
:设置是否采用阻塞模式。
boolean isBlocking()
:返回该
Channel
是否是阻塞模式。
使用
NIO
实现非阻塞式服务器的示意图:
从图中可以看出,服务器上所有
Channel
(包括
ServerSocketChannel
和
SocketChannel
)都需要向
Selector
注册,而该
Selector
则负责监视这些
Socket
的
IO
状态,当其中任意一个或多个
Channel
具有可用的
IO
操作时,该
Selector
的
select()
方法将会返回大于
0
的整数,该整数值就表示该
Selector
上有多少个
Channel
具有可用的
IO
操作,并提供了
selectedKeys()
方法来返回这些
Channel
对应的
SelectionKey
集合。正是通过
Selector
,使得服务器端只需要不断地调用
Selector
实例的
select()
方法即可知道当前所有
Channel
是否有需要处理的
IO
操作。
三、
应用范例
服务端代码:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
class AsyncServer implements Runnable {
private ByteBuffer r_buff = ByteBuffer.allocate(1024);
private ByteBuffer w_buff = ByteBuffer.allocate(1024);
private static int port = 8848;
public AsyncServer() {
new Thread(this).start();
}
private void info(String str){
System.out.println(str);
}
public void run() {
try {
Selector s = Selector.open(); //// 生成一个信号监视器
ServerSocketChannel ssc = ServerSocketChannel.open(); // 生成一个侦听端
ssc.configureBlocking(false);// 将侦听端设为异步方式
// 侦听端绑定到一个端口
ssc.socket().bind(new InetSocketAddress(port));
ssc.register(s, SelectionKey.OP_ACCEPT,new NetEventHandler());// 设置侦听端所选的异步信号OP_ACCEPT
info(”开始启动服务器”);
while (true) {
int n = s.select(100);
if (n == 0) {// 没有指定的I/O事件发生
continue;
}
Set<SelectionKey> readys = s.selectedKeys();
if(readys.size() == 0){
continue;
}
Iterator<SelectionKey> i = readys.iterator();
while (i.hasNext()) {
SelectionKey key = i.next();
i.remove();
if (key.isAcceptable()) {// 侦听端信号触发
info(”侦听端信号触发”);
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel sc = server.accept();
sc.configureBlocking(false);
sc.register(s, SelectionKey.OP_READ,new NetEventHandler());
}
if (key.isReadable()) {// 某socket可读信号
DealwithData(key);
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
public void DealwithData(SelectionKey key) throws IOException {
NetEventHandler eventHandler = (NetEventHandler)key.attachment();
info(”eventHandler:” + eventHandler);
// 由key获取指定socketchannel的引用
SocketChannel sc = (SocketChannel) key.channel();
r_buff.clear();
int count;
while ((count = sc.read(r_buff)) > 0);
// 将r_buff内容拷入w_buff
r_buff.flip();
w_buff.clear();
w_buff.put(r_buff);
w_buff.flip();
// 将数据返回给客户端
EchoToClient(sc);
w_buff.clear();
r_buff.clear();
}
public void EchoToClient(SocketChannel sc) throws IOException {
while (w_buff.hasRemaining())
sc.write(w_buff);
}
public static void main(String args[]) {
if (args.length > 0) {
port = Integer.parseInt(args[0]);
}
new AsyncServer();
}
}
客户端代码:
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
class AsyncClient {
private SocketChannel sc;
private final int MAX_LENGTH = 1024;
private ByteBuffer r_buff = ByteBuffer.allocate(MAX_LENGTH);
private ByteBuffer w_buff = ByteBuffer.allocate(MAX_LENGTH);
private static String host ;
private static int port = 8848;
public AsyncClient() {
try {
InetSocketAddress addr = new InetSocketAddress(host, port);
// 生成一个socketchannel
sc = SocketChannel.open();
// 连接到server
sc.connect(addr);
while (!sc.finishConnect())
;
System.out.println(”connection has been established!…”);
while (true) {
// 回射消息
String echo;
try {
System.err.println(”Enter msg you’d like to send: “);
BufferedReader br = new BufferedReader(
new InputStreamReader(System.in));
// 输入回射消息
echo = br.readLine();
// 把回射消息放入w_buff中
w_buff.clear();
w_buff.put(echo.getBytes());
w_buff.flip();
} catch (IOException ioe) {
System.err.println(”sth. is wrong with br.readline() “);
}
// 发送消息
while (w_buff.hasRemaining())
sc.write(w_buff);
w_buff.clear();
// 进入接收状态
Rec();
// 间隔1秒
Thread.currentThread().sleep(1000);
}
} catch (IOException ioe) {
ioe.printStackTrace();
} catch (InterruptedException ie) {
ie.printStackTrace();
}
}
public void Rec() throws IOException {
int count;
r_buff.clear();
count = sc.read(r_buff);
r_buff.flip();
byte[] temp = new byte[r_buff.limit()];
r_buff.get(temp);
System.out.println(”reply is ” + count + ” long, and content is: “
+ new String(temp));
}
public static void main(String args[]) {
if (args.length < 1) {// 输入需有主机名或IP地址
try {
System.err.println(”Enter host name: “);
BufferedReader br = new BufferedReader(new InputStreamReader(
System.in));
host = br.readLine();
} catch (IOException ioe) {
System.err.println(”sth. is wrong with br.readline() “);
}
} else if (args.length == 1) {
host = args[0];
} else if (args.length > 1) {
host = args[0];
port = Integer.parseInt(args[1]);
}
new AsyncClient();
}
}
转自:http://hi.baidu.com/chenweifighting/blog/item/38d7760e3d8efc226159f378.html
描述
Java
提供的
NIO API
来开发高性能网络服务器,
JDK 1.4
以前
的网络通信程序是基于阻塞式
API
的——即当程序执行输入、输出操作后,在这些操作返回之前会一直阻塞该线程,所以服务器必须为每个客户端都提供一条独立线程进行处理,当服务器需要同时处理大量客户端时,这种做法会导致性能下降。使用
NIO API
则可以让服务器使用一个或有限几个线程来同时处理连接到服务器上的所有客户端。
NIO
使用面向缓冲
(buffer)
的模型。这就是说,
NIO
主要处理大块的数据。这就避免了利用流模型处理所引起的问题,在有可能的情况下,它甚至可以为了得到最大的吞吐量而使用系统级的工具。基本流
InputStream
和
OutputStream
能够读写字节数据;它们的子类可以读写各种各样的数据。在
NIO
中,所有的数据都通过缓冲读写。从图
1
可以看到两种模型的比较:
图
1.
流模型使用
Streams
和
Bytes
;
NIO
模型使用
Channels
和
Buffers
使用缓冲的好处:
A.
它可以大块的处理数据。你可以读写大块数据,缓冲的大小只受你所分配的内存数量的限制。
B.
它可以表示系统级的缓冲。多种系统采用统一的内存配置完成
I/O
处理,而不需要将数据从系统内存中拷贝到应用程序的内存空间。
buffer
对象的不同实现可以直接表示这些系统级的缓冲,这就意味着你可以用最少的拷贝次数来完成对数据的读写。
二、
Select
工具
select
提供了一种很好的方法来完成大量的数据源并行处理。它的名字来源于
Unix
系统中提供相同功能的
C
程序系统调用
select()
。
阻塞式编程特点:
通常,
I/O
属于阻塞式系统调用。当你对输入流调用
read()
方法,直到数据读入完成之前方法一直被阻塞。如果你读入本地文件就不需要等待很长时间。但是如果你从文件服务器或这是
socket
连接读取数据的话,那么你就要等很长时间。但你在等待过程中,你读取数据的线程将不能做任何事。
当然,在
Java
中你很容易为多个流创建多个线程。但是线程需要消耗大量的资源。在很多实现中,每个线程需要占用一块内存,即使它什么也不做。同时太多的线程会对性能造成很大的影响。
Select
编程特点:
select
采用不同的工作方式。通过
selet
你把输入流注册到一个
Selector
对象上。当某个流发生
I/O
活动时,
selector
将会通知你。以这种方式就可以只用一个线程读入多个数据源。尽管
Selector
不能帮你读取数据,但是它可以监听网络连接请求和越过较慢的通道进行写数据。
Java
的
NIO
为非阻塞式的
Socket
通信提供了如下几个特殊类:
Selector
:
它是
SelectableChannel
对象的多路复用器,所有希望采用非阻塞方式进行通信的
Channel
都应该注册到
Selector
对象。可通过调用此类的静态
open()
方法来创建
Selector
实例,该方法将使用系统默认的
Selector
来返回新的
Selector
。
Selector
可以同时监控多个
SelectableChannel
的
IO
状况,是非阻塞
IO
的核心。
一个
Selector
实例有
3
个
SelectionKey
的集合:
A.
所有
SelectionKey
集合:代表了注册在该
Selector
上的
Channel
,这个集合可以通过
keys()
方法返回。
B.
被选择的
SelectionKey
集合:代表了所有可通过
select()
方法监测到、需要进行
IO
处理的
Channel
,这个集合可以通过
selectedKeys()
返回。
C.
被取消的
SelectionKey
集合:代表了所有被取消注册关系的
Channel
,在下一次执行
select()
方法时,这些
Channel
对应的
SelectionKey
会被彻底删除,程序通常无须直接访问该集合。
Select
相关的方法:
A.
int select()
:监控所有注册的
Channel
,当它们中间有需要处理的
IO
操作时,该方法返回,并将对应的
SelectionKey
加入被选择的
SelectionKey
集合中,该方法返回这些
Channel
的数量。
B.
int select(long timeout)
:可以设置超时时长的
select()
操作。
C.
int selectNow()
:执行一个立即返回的
select()
操作,相对于无参数的
select()
方法而言,该方法不会阻塞线程。
D.
Selector wakeup()
:使一个还未返回的
select()
方法立刻返回。
SelectableChannel
:
它代表可以支持非阻塞
IO
操作的
Channel
对象,可以将其注册到
Selector
上,这种注册的关系由
SelectionKey
实例表示。应用程序可调用
SelectableChannel
的
register()
方法将其注册到指定
Selector
上,当该
Selector
上某些
SelectableChannel
上有需要处理的
IO
操作时,程序可以调用
Selector
实例的
select()
方法获取它们的数量,并可以通过
selectedKeys()
方法返回它们对应的
SelectKey
集合——通过该集合就可以获取所有需要处理
IO
操作的
SelectableChannel
集。
SelectableChannel
对象支持阻塞和非阻塞两种模式(所有
channel
默认都是阻塞模式),必须使用非阻塞式模式才可以利用非阻塞
IO
操作。
SelectableChannel
提供了如下两个方法来设置和返回该
Channel
的模式状态:
SelectableChannel configureBlocking(boolean block)
:设置是否采用阻塞模式。
boolean isBlocking()
:返回该
Channel
是否是阻塞模式。
使用
NIO
实现非阻塞式服务器的示意图:
从图中可以看出,服务器上所有
Channel
(包括
ServerSocketChannel
和
SocketChannel
)都需要向
Selector
注册,而该
Selector
则负责监视这些
Socket
的
IO
状态,当其中任意一个或多个
Channel
具有可用的
IO
操作时,该
Selector
的
select()
方法将会返回大于
0
的整数,该整数值就表示该
Selector
上有多少个
Channel
具有可用的
IO
操作,并提供了
selectedKeys()
方法来返回这些
Channel
对应的
SelectionKey
集合。正是通过
Selector
,使得服务器端只需要不断地调用
Selector
实例的
select()
方法即可知道当前所有
Channel
是否有需要处理的
IO
操作。
三、
应用范例
服务端代码:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
class AsyncServer implements Runnable {
private ByteBuffer r_buff = ByteBuffer.allocate(1024);
private ByteBuffer w_buff = ByteBuffer.allocate(1024);
private static int port = 8848;
public AsyncServer() {
new Thread(this).start();
}
private void info(String str){
System.out.println(str);
}
public void run() {
try {
Selector s = Selector.open(); //// 生成一个信号监视器
ServerSocketChannel ssc = ServerSocketChannel.open(); // 生成一个侦听端
ssc.configureBlocking(false);// 将侦听端设为异步方式
// 侦听端绑定到一个端口
ssc.socket().bind(new InetSocketAddress(port));
ssc.register(s, SelectionKey.OP_ACCEPT,new NetEventHandler());// 设置侦听端所选的异步信号OP_ACCEPT
info(”开始启动服务器”);
while (true) {
int n = s.select(100);
if (n == 0) {// 没有指定的I/O事件发生
continue;
}
Set<SelectionKey> readys = s.selectedKeys();
if(readys.size() == 0){
continue;
}
Iterator<SelectionKey> i = readys.iterator();
while (i.hasNext()) {
SelectionKey key = i.next();
i.remove();
if (key.isAcceptable()) {// 侦听端信号触发
info(”侦听端信号触发”);
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel sc = server.accept();
sc.configureBlocking(false);
sc.register(s, SelectionKey.OP_READ,new NetEventHandler());
}
if (key.isReadable()) {// 某socket可读信号
DealwithData(key);
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
public void DealwithData(SelectionKey key) throws IOException {
NetEventHandler eventHandler = (NetEventHandler)key.attachment();
info(”eventHandler:” + eventHandler);
// 由key获取指定socketchannel的引用
SocketChannel sc = (SocketChannel) key.channel();
r_buff.clear();
int count;
while ((count = sc.read(r_buff)) > 0);
// 将r_buff内容拷入w_buff
r_buff.flip();
w_buff.clear();
w_buff.put(r_buff);
w_buff.flip();
// 将数据返回给客户端
EchoToClient(sc);
w_buff.clear();
r_buff.clear();
}
public void EchoToClient(SocketChannel sc) throws IOException {
while (w_buff.hasRemaining())
sc.write(w_buff);
}
public static void main(String args[]) {
if (args.length > 0) {
port = Integer.parseInt(args[0]);
}
new AsyncServer();
}
}
客户端代码:
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
class AsyncClient {
private SocketChannel sc;
private final int MAX_LENGTH = 1024;
private ByteBuffer r_buff = ByteBuffer.allocate(MAX_LENGTH);
private ByteBuffer w_buff = ByteBuffer.allocate(MAX_LENGTH);
private static String host ;
private static int port = 8848;
public AsyncClient() {
try {
InetSocketAddress addr = new InetSocketAddress(host, port);
// 生成一个socketchannel
sc = SocketChannel.open();
// 连接到server
sc.connect(addr);
while (!sc.finishConnect())
;
System.out.println(”connection has been established!…”);
while (true) {
// 回射消息
String echo;
try {
System.err.println(”Enter msg you’d like to send: “);
BufferedReader br = new BufferedReader(
new InputStreamReader(System.in));
// 输入回射消息
echo = br.readLine();
// 把回射消息放入w_buff中
w_buff.clear();
w_buff.put(echo.getBytes());
w_buff.flip();
} catch (IOException ioe) {
System.err.println(”sth. is wrong with br.readline() “);
}
// 发送消息
while (w_buff.hasRemaining())
sc.write(w_buff);
w_buff.clear();
// 进入接收状态
Rec();
// 间隔1秒
Thread.currentThread().sleep(1000);
}
} catch (IOException ioe) {
ioe.printStackTrace();
} catch (InterruptedException ie) {
ie.printStackTrace();
}
}
public void Rec() throws IOException {
int count;
r_buff.clear();
count = sc.read(r_buff);
r_buff.flip();
byte[] temp = new byte[r_buff.limit()];
r_buff.get(temp);
System.out.println(”reply is ” + count + ” long, and content is: “
+ new String(temp));
}
public static void main(String args[]) {
if (args.length < 1) {// 输入需有主机名或IP地址
try {
System.err.println(”Enter host name: “);
BufferedReader br = new BufferedReader(new InputStreamReader(
System.in));
host = br.readLine();
} catch (IOException ioe) {
System.err.println(”sth. is wrong with br.readline() “);
}
} else if (args.length == 1) {
host = args[0];
} else if (args.length > 1) {
host = args[0];
port = Integer.parseInt(args[1]);
}
new AsyncClient();
}
}
转自:http://hi.baidu.com/chenweifighting/blog/item/38d7760e3d8efc226159f378.html
相关文章推荐
- 基于NIO实现非阻塞Socket编程
- Java网络编程——使用NIO实现非阻塞Socket通信
- Java网络编程——使用NIO实现非阻塞Socket通信
- java分布式开发TCP/IP NIO无阻塞 Socket((基于消息方式实现系统间的通信) )(转)
- NIO实现非阻塞Socket编程
- 基于C#的socket编程的TCP同步实现
- 网络编程 基于Socket的多文件传输程序实现(一)
- Linux下基于socket多线程并发通信的实现 阻塞
- 基于NIO实现非阻塞Socket编程
- 基于C#的socket编程的TCP异步实现
- Java网络编程实践和总结 --- 基于TCP的Socket编程之实现文件上传和下载服务
- Java Socket应用(五)——编程实现基于 TCP 的 Socket 通信
- Socket编程——基于TCP实现自己的通信协议
- 基于Socket编程实现平行主机之间网络通讯
- Python网络编程基础笔记-poll实现非阻塞socket
- Java网络编程之(三): TCP协议使用NIO实现非阻塞Soket通信
- 基于Socket的java网络编程(实现类似于QQ两人聊天的交互)
- Java基于Socket实现网络编程实例详解
- Linux Socket 摘要(二)(基于TCP的C/S基本实现,相关基础知识,非阻塞select)
- Java Socket编程 - 基于Socket实现HTTP下载客户端