您的位置:首页 > 理论基础 > 计算机网络

[分布式java]基于JavaAPI实现消息方式的系统间通信:TCP/IP+NIO

2015-08-27 12:37 1026 查看
TCP/IP+BIO的限制:
1、线程池大小限制。
2、为每个连接建立线程,增加系统开销。
3、线程很难保证某些连接的优先级。
4、线程很难保证数据的一致性和高效性。

Java中的TCP/IP+BIO是阻塞式,而Java的TCP/IP+NIO是非阻塞,是为了解决TCP/IP+BIO的阻塞与上面提到的限制。

JavaNIO原理:JavaNIO使用反应器(Reactor)模式,与观察者模式类似,反应器模式与多事件源关联,观察者模式只与单个事件源关联。JavaNIO将最耗时的 I/O 操作(即填充和提取缓冲区)转移回操作系统,因而可以极大地提高速度。java的TCP/IP+NIO能同时监听多个端口,处理来自所有这些端口的连接与连接的关闭,并且它只在单个线程中完成所有这些工作。只有读、写事件使用线程池实现非阻塞。其中的重要概念:
1、Channel是一个通道,是连接到如硬件设备、文件、网络Socket等的一个连接,可以读取数据块也可以写入数据块。
2、Selector的作用是监听Channel发生的事件,对注册的事件做操作。
3、Channel使用Buffer传递数据(无论是发送还是保存),Buffer的子类可以容纳不同的数据类型:FLoatBuffer、IntBuffer、ByteBuffer,并且缓冲区固定,内部状态记录有多少数据放入或取出。

一、客户端代码实现
public void createTcpIpNioJavaMethodClient(){
try {
SocketChannel channel=SocketChannel.open();//获取一个Channel实例
channel.configureBlocking(false);//设置为非阻塞
//对于非阻塞模式,返回false,表示连接正在建立中,调用channel.finishConnect()才能完成连接
channel.
connect(new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), 1234));
Selector selector= Selector.open();
channel.register(selector, SelectionKey.OP_CONNECT);//将selector注册到Channel
while(true) {
int nKeys = selector.select(10000);//阻塞直到感兴趣的IO事件发生或者达到超时时间
//int nKeys=selector.select();//一直阻塞,直到感兴趣的IO事件发生
//int nKeys_SelectNow=selector.selectNow();//不阻塞直接返回目前是否有感兴趣的事件
//nKeys大于0说明有感兴趣的IO事件,返回可进行I/O操作的信道数量
if (nKeys > 0) {
Set<SelectionKey> keys = selector.selectedKeys();
for (SelectionKey key : keys) {
if (key.isConnectable()) { //如果发生连接
if (channel.isConnectionPending()) {//如果正在连接
channel.finishConnect();//完成连接
}
SocketChannel sc = (SocketChannel) key.channel();
sc.configureBlocking(false);
//注册Write事件,selector的key中会保存WRITE事件,则下一个循环会执行WRITE事件
sc.register(selector, SelectionKey.OP_WRITE);
sc.finishConnect();//完成连接
} else if (key.isWritable()) {
//取消对OP_WRITE事件的注册
key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
SocketChannel sc = (SocketChannel) key.channel();
sc.configureBlocking(false);
byte[] data = "This is Client!".getBytes();
ByteBuffer writeBuffer = ByteBuffer.wrap(data);//将byte[]放到ByteBuffer中
sc.write(writeBuffer);
sc.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));
ByteBuffer buffer = ByteBuffer.allocate(1024);
SocketChannel sc = (SocketChannel) key.channel();
sc.configureBlocking(false);
sc.read(buffer);
String msg = new String(buffer.array());
System.out.println("Client receive msg:" + msg);
return;
}
}
selector.selectedKeys().clear();//清空keys的key。
}
}
} catch (IOException e) {
e.printStackTrace();
}
}


重要方法说明:
Channel和Selector的open()方法,用于创建各自实例。
Channel的regist()方法用于注册accept、connect、write、read事件,当regist有新事件注册时,会改变selectKeys里面集合的key值。
Selector的select()是一个阻塞事件。

二、服务端代码实现
与客户端写法一样,只不过我这次将accept、read、write分离出单独的类中
1、Protocol接口
public interface Protocol {

public void handleAccept(SelectionKey key)throws IOException;
public void handleRead(SelectionKey key)throws IOException;
public void handleWrite(SelectionKey key)throws IOException;

}
2、Protocol实现类
public class EchoProtocol implements Protocol {
private int bufferSize;
public EchoProtocol(int inBufferSize){
this.bufferSize=inBufferSize;
}
@Override
public void handleAccept(SelectionKey key) thro
4000
ws IOException {
SocketChannel channel=((ServerSocketChannel)key.channel()).accept();
//设置非阻塞
channel.configureBlocking(false);
//注册该selector
System.out.println("连接完毕,注册Read事件");
channel.
register(key.selector(),SelectionKey.OP_READ, ByteBuffer.allocateDirect(bufferSize));
}

@Override
public void handleRead(SelectionKey key) throws IOException {
//取消对OP_READ事件的注册
key.interestOps(key.interestOps()&(~SelectionKey.OP_READ));
SocketChannel channel= (SocketChannel) key.channel();
ByteBuffer buffer=ByteBuffer.allocate(200);
channel.read(buffer);
String msg=new String(buffer.array());
System.out.println("Server receive msg:"+msg);
channel.register(key.selector(),SelectionKey.OP_WRITE);
}

@Override
public void handleWrite(SelectionKey key) throws IOException {
//取消对OP_READ事件的注册
//key.interestOps(key.interestOps()&(~SelectionKey.OP_WRITE));
System.out.println("start send msg to client…");
ByteBuffer buffer=ByteBuffer.wrap(new String("Hello,I am Server!").getBytes());
SocketChannel channel= (SocketChannel) key.channel();
long successedNum=channel.write(buffer);//向信道中写数据
System.out.println("endoff send msg to client,send successed char :"+successedNum);
}
}
3、服务端主要类实现
public class TcpIpNioJavaMethod_Server {

public static void main(String[] args) {
new TcpIpNioJavaMethod_Server().createNioJavaMethodServer();
}

public void createNioJavaMethodServer(){
//打开一个Selector
Selector selector=null;
try {
selector= Selector.open();
} catch (IOException e) {
e.printStackTrace();
}
//打开一个Channel
ServerSocketChannel listnChannel=null;
try {
listnChannel=ServerSocketChannel.open();
} catch (IOException e) {
e.printStackTrace();
}
//绑定服务端端口
try {
listnChannel.socket().bind(new InetSocketAddress(1234));
} catch (IOException e) {
e.printStackTrace();
}
//设置channel为非阻塞才可以注册选择器
try {
listnChannel.configureBlocking(false);
} catch (IOException e) {
e.printStackTrace();
}
//注册该selector,channel可以进行accept操作
try {
listnChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (ClosedChannelException e) {
e.printStackTrace();
}

Protocol protocol=new EchoProtocol(256);

while(true){
try {
System.out.println("阻塞等待感兴趣的事~");
if(selector.select(6000)==0){//等于0表示没有Channel操作
continue;
}
} catch (IOException e) {
e.printStackTrace();
}
Iterator<SelectionKey> keyIter=selector.selectedKeys().iterator();
while(keyIter.hasNext()){
SelectionKey key=keyIter.next();
keyIter.remove();//
//处理accept事件
if(key.isAcceptable()){
try {
System.out.println("handleAccept~");
protocol.handleAccept(key);
} catch (IOException e) {
e.printStackTrace();
}
}
//处理读事件
if(key.isReadable()){
try {
System.out.println("handleRead~");
protocol.handleRead(key);
} catch (IOException e) {
e.printStackTrace();
}
}
//处理写事件
if(key.isWritable()&&key.isValid()){
try {
System.out.println("handleWrite~");
protocol.handleWrite(key);
} catch (IOException e) {
e.printStackTrace();
}
}
//keyIter.remove();
//由于select()操作只是向Selector所关联的键集合中添加元素
//因此,如果不移除每个处理过的键,
//它就会在下次调用select()方法时仍然保留在集合中
//而且可能会有无用的操作来调用它。
}
}
}
}


以上为简单的个人实现,以下为详细的说明:

JavaNIO入门参考:http://www.ibm.com/developerworks/cn/education/java/j-nio/j-nio.html
JavaNIO实现非阻塞原理:http://www.ibm.com/developerworks/cn/java/l-niosvr/
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  java