[分布式java]基于JavaAPI实现消息方式的系统间通信:TCP/IP+NIO
2015-08-27 12:37
1026 查看
TCP/IP+BIO的限制:
1、线程池大小限制。
2、为每个连接建立线程,增加系统开销。
3、线程很难保证某些连接的优先级。
4、线程很难保证数据的一致性和高效性。
Java中的TCP/IP+BIO是阻塞式,而Java的TCP/IP+NIO是非阻塞,是为了解决TCP/IP+BIO的阻塞与上面提到的限制。
JavaNIO原理:JavaNIO使用反应器(Reactor)模式,与观察者模式类似,反应器模式与多事件源关联,观察者模式只与单个事件源关联。JavaNIO将最耗时的 I/O 操作(即填充和提取缓冲区)转移回操作系统,因而可以极大地提高速度。java的TCP/IP+NIO能同时监听多个端口,处理来自所有这些端口的连接与连接的关闭,并且它只在单个线程中完成所有这些工作。只有读、写事件使用线程池实现非阻塞。其中的重要概念:
1、Channel是一个通道,是连接到如硬件设备、文件、网络Socket等的一个连接,可以读取数据块也可以写入数据块。
2、Selector的作用是监听Channel发生的事件,对注册的事件做操作。
3、Channel使用Buffer传递数据(无论是发送还是保存),Buffer的子类可以容纳不同的数据类型:FLoatBuffer、IntBuffer、ByteBuffer,并且缓冲区固定,内部状态记录有多少数据放入或取出。
一、客户端代码实现
重要方法说明:
Channel和Selector的open()方法,用于创建各自实例。
Channel的regist()方法用于注册accept、connect、write、read事件,当regist有新事件注册时,会改变selectKeys里面集合的key值。
Selector的select()是一个阻塞事件。
二、服务端代码实现
与客户端写法一样,只不过我这次将accept、read、write分离出单独的类中
1、Protocol接口
以上为简单的个人实现,以下为详细的说明:
JavaNIO入门参考:http://www.ibm.com/developerworks/cn/education/java/j-nio/j-nio.html
JavaNIO实现非阻塞原理:http://www.ibm.com/developerworks/cn/java/l-niosvr/
1、线程池大小限制。
2、为每个连接建立线程,增加系统开销。
3、线程很难保证某些连接的优先级。
4、线程很难保证数据的一致性和高效性。
Java中的TCP/IP+BIO是阻塞式,而Java的TCP/IP+NIO是非阻塞,是为了解决TCP/IP+BIO的阻塞与上面提到的限制。
JavaNIO原理:JavaNIO使用反应器(Reactor)模式,与观察者模式类似,反应器模式与多事件源关联,观察者模式只与单个事件源关联。JavaNIO将最耗时的 I/O 操作(即填充和提取缓冲区)转移回操作系统,因而可以极大地提高速度。java的TCP/IP+NIO能同时监听多个端口,处理来自所有这些端口的连接与连接的关闭,并且它只在单个线程中完成所有这些工作。只有读、写事件使用线程池实现非阻塞。其中的重要概念:
1、Channel是一个通道,是连接到如硬件设备、文件、网络Socket等的一个连接,可以读取数据块也可以写入数据块。
2、Selector的作用是监听Channel发生的事件,对注册的事件做操作。
3、Channel使用Buffer传递数据(无论是发送还是保存),Buffer的子类可以容纳不同的数据类型:FLoatBuffer、IntBuffer、ByteBuffer,并且缓冲区固定,内部状态记录有多少数据放入或取出。
一、客户端代码实现
public void createTcpIpNioJavaMethodClient(){ try { SocketChannel channel=SocketChannel.open();//获取一个Channel实例 channel.configureBlocking(false);//设置为非阻塞 //对于非阻塞模式,返回false,表示连接正在建立中,调用channel.finishConnect()才能完成连接 channel. connect(new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), 1234)); Selector selector= Selector.open(); channel.register(selector, SelectionKey.OP_CONNECT);//将selector注册到Channel while(true) { int nKeys = selector.select(10000);//阻塞直到感兴趣的IO事件发生或者达到超时时间 //int nKeys=selector.select();//一直阻塞,直到感兴趣的IO事件发生 //int nKeys_SelectNow=selector.selectNow();//不阻塞直接返回目前是否有感兴趣的事件 //nKeys大于0说明有感兴趣的IO事件,返回可进行I/O操作的信道数量 if (nKeys > 0) { Set<SelectionKey> keys = selector.selectedKeys(); for (SelectionKey key : keys) { if (key.isConnectable()) { //如果发生连接 if (channel.isConnectionPending()) {//如果正在连接 channel.finishConnect();//完成连接 } SocketChannel sc = (SocketChannel) key.channel(); sc.configureBlocking(false); //注册Write事件,selector的key中会保存WRITE事件,则下一个循环会执行WRITE事件 sc.register(selector, SelectionKey.OP_WRITE); sc.finishConnect();//完成连接 } else if (key.isWritable()) { //取消对OP_WRITE事件的注册 key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); SocketChannel sc = (SocketChannel) key.channel(); sc.configureBlocking(false); byte[] data = "This is Client!".getBytes(); ByteBuffer writeBuffer = ByteBuffer.wrap(data);//将byte[]放到ByteBuffer中 sc.write(writeBuffer); sc.register(selector, SelectionKey.OP_READ); } else if (key.isReadable()) { key.interestOps(key.interestOps() & (~SelectionKey.OP_READ)); ByteBuffer buffer = ByteBuffer.allocate(1024); SocketChannel sc = (SocketChannel) key.channel(); sc.configureBlocking(false); sc.read(buffer); String msg = new String(buffer.array()); System.out.println("Client receive msg:" + msg); return; } } selector.selectedKeys().clear();//清空keys的key。 } } } catch (IOException e) { e.printStackTrace(); } }
重要方法说明:
Channel和Selector的open()方法,用于创建各自实例。
Channel的regist()方法用于注册accept、connect、write、read事件,当regist有新事件注册时,会改变selectKeys里面集合的key值。
Selector的select()是一个阻塞事件。
二、服务端代码实现
与客户端写法一样,只不过我这次将accept、read、write分离出单独的类中
1、Protocol接口
public interface Protocol { public void handleAccept(SelectionKey key)throws IOException; public void handleRead(SelectionKey key)throws IOException; public void handleWrite(SelectionKey key)throws IOException; }2、Protocol实现类
public class EchoProtocol implements Protocol { private int bufferSize; public EchoProtocol(int inBufferSize){ this.bufferSize=inBufferSize; } @Override public void handleAccept(SelectionKey key) thro 4000 ws IOException { SocketChannel channel=((ServerSocketChannel)key.channel()).accept(); //设置非阻塞 channel.configureBlocking(false); //注册该selector System.out.println("连接完毕,注册Read事件"); channel. register(key.selector(),SelectionKey.OP_READ, ByteBuffer.allocateDirect(bufferSize)); } @Override public void handleRead(SelectionKey key) throws IOException { //取消对OP_READ事件的注册 key.interestOps(key.interestOps()&(~SelectionKey.OP_READ)); SocketChannel channel= (SocketChannel) key.channel(); ByteBuffer buffer=ByteBuffer.allocate(200); channel.read(buffer); String msg=new String(buffer.array()); System.out.println("Server receive msg:"+msg); channel.register(key.selector(),SelectionKey.OP_WRITE); } @Override public void handleWrite(SelectionKey key) throws IOException { //取消对OP_READ事件的注册 //key.interestOps(key.interestOps()&(~SelectionKey.OP_WRITE)); System.out.println("start send msg to client…"); ByteBuffer buffer=ByteBuffer.wrap(new String("Hello,I am Server!").getBytes()); SocketChannel channel= (SocketChannel) key.channel(); long successedNum=channel.write(buffer);//向信道中写数据 System.out.println("endoff send msg to client,send successed char :"+successedNum); } }3、服务端主要类实现
public class TcpIpNioJavaMethod_Server { public static void main(String[] args) { new TcpIpNioJavaMethod_Server().createNioJavaMethodServer(); } public void createNioJavaMethodServer(){ //打开一个Selector Selector selector=null; try { selector= Selector.open(); } catch (IOException e) { e.printStackTrace(); } //打开一个Channel ServerSocketChannel listnChannel=null; try { listnChannel=ServerSocketChannel.open(); } catch (IOException e) { e.printStackTrace(); } //绑定服务端端口 try { listnChannel.socket().bind(new InetSocketAddress(1234)); } catch (IOException e) { e.printStackTrace(); } //设置channel为非阻塞才可以注册选择器 try { listnChannel.configureBlocking(false); } catch (IOException e) { e.printStackTrace(); } //注册该selector,channel可以进行accept操作 try { listnChannel.register(selector, SelectionKey.OP_ACCEPT); } catch (ClosedChannelException e) { e.printStackTrace(); } Protocol protocol=new EchoProtocol(256); while(true){ try { System.out.println("阻塞等待感兴趣的事~"); if(selector.select(6000)==0){//等于0表示没有Channel操作 continue; } } catch (IOException e) { e.printStackTrace(); } Iterator<SelectionKey> keyIter=selector.selectedKeys().iterator(); while(keyIter.hasNext()){ SelectionKey key=keyIter.next(); keyIter.remove();// //处理accept事件 if(key.isAcceptable()){ try { System.out.println("handleAccept~"); protocol.handleAccept(key); } catch (IOException e) { e.printStackTrace(); } } //处理读事件 if(key.isReadable()){ try { System.out.println("handleRead~"); protocol.handleRead(key); } catch (IOException e) { e.printStackTrace(); } } //处理写事件 if(key.isWritable()&&key.isValid()){ try { System.out.println("handleWrite~"); protocol.handleWrite(key); } catch (IOException e) { e.printStackTrace(); } } //keyIter.remove(); //由于select()操作只是向Selector所关联的键集合中添加元素 //因此,如果不移除每个处理过的键, //它就会在下次调用select()方法时仍然保留在集合中 //而且可能会有无用的操作来调用它。 } } } }
以上为简单的个人实现,以下为详细的说明:
JavaNIO入门参考:http://www.ibm.com/developerworks/cn/education/java/j-nio/j-nio.html
JavaNIO实现非阻塞原理:http://www.ibm.com/developerworks/cn/java/l-niosvr/
相关文章推荐
- java对世界各个时区(TimeZone)的通用转换处理方法(转载)
- java-注解annotation
- java-模拟tomcat服务器
- java-用HttpURLConnection发送Http请求.
- java-WEB中的监听器Lisener
- Android IPC进程间通讯机制
- Android Native 绘图方法
- Android java 与 javascript互访(相互调用)的方法例子
- 介绍一款信息管理系统的开源框架---jeecg
- 聚类算法之kmeans算法java版本
- java实现 PageRank算法
- PropertyChangeListener简单理解
- 插入排序
- 冒泡排序
- 堆排序
- 快速排序
- 二叉查找树
- [原创]java局域网聊天系统