JAVA多进程服务器与客户机实现收发异步
2016-09-24 18:44
344 查看
核心: 非阻塞NIO的使用,发送和接收使用不同的Buffer
icesongqiang
java
icesongqiang
java
客户端类
/** ************************************************************* * 1. 客户端采用阻塞模式接入服务器 * 2. 接入后使用NIO事件机制一直查询可以读或者写的事件---主进程 * 3. 写入的数据 = 要传数据,也跟随主线程一直运行,但是写的内容可以由用户自己决定: * *3.1. 使用send_data(String data)将用户数据写入sendBuffer中 * *3.2. 在主线程检测可以写的时候,调用send(SelectionKey key)函数 * 把sendBuffer中的内容写入该key关联的SocketChannel中 * 4. 读数据 : 跟随主线程一直运行,可以读就直接读 *******************参考java网络编程精解 例程4-4 *************** ************************************** -- [icesongqiang] ***** */ import java.io.*; import java.nio.*; import java.nio.channels.*; import java.nio.charset.*; import java.net.*; import java.util.*; // for Set class EchoClient{ /** * Selector类, 只要ServerSocketChannel及SocketChannel向 * Selector 注册了特定的事件,Selector类就会监控这些事件是否发生 * register()方法,返回SelectionKey对象 */ private Selector selector = null; private SocketChannel socketChannel = null; private int s_port = 8000; private String s_IP = "192.168.100.141"; private Charset charset = Charset.forName("GBK"); private ByteBuffer sendBuffer = ByteBuffer.allocate(1024); private ByteBuffer receiveBuffer = ByteBuffer.allocate(1024); public EchoClient(String s_IP, int s_port) throws IOException{ this.s_IP = s_IP; this.s_port = s_port; socketChannel = SocketChannel.open(); //InetAddress ia = InetAddress.getLocalHost(); InetSocketAddress isa = new InetSocketAddress(this.s_IP, this.s_port); // 采用阻塞模式连接服务器 socketChannel.connect(isa); // 设置为非阻塞模式 socketChannel.configureBlocking(false); System.out.println("与服务器的连接建立成功"); selector = Selector.open(); } /*// 接收用户从控制台输入的数据,把它放到sendBuffer中 public void receiveFromUser(){ try{ BufferedReader localReader=new BufferedReader(new InputStreamReader(System.in)); String msg=null; while((msg=localReader.readLine())!=null){ synchronized(sendBuffer){ sendBuffer.put(encode(msg + "\r\n")); } if(msg.equals("bye")) break; } }catch(IOException e){ e.printStackTrace(); } }*/ // 接收指定的数据,把它放到sendBuffer中 public void send_data(String msg){ if(msg!= null){ synchronized(sendBuffer){ System.out.println("write to sendBuffer..."); sendBuffer.clear(); // 把极限设为容量,再把位置设为0 sendBuffer.put(encode(msg+"\n")); } } } // 接收和发送数据 public void talk() throws IOException{ System.out.println(socketChannel.toString()); socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); while(selector.select()>0){ Set<SelectionKey> readyKeys = selector.selectedKeys(); Iterator<SelectionKey> it = readyKeys.iterator(); while(it.hasNext()){ SelectionKey key = null; try{ key = it.next(); it.remove(); if(key.isReadable()){receive(key);} if(key.isWritable()){send(key);} }catch(IOException e){ e.printStackTrace(); try{ if(key!=null){ key.cancel(); key.channel().close(); } }catch(IOException ex){ex.printStackTrace();} } } } } private void send(SelectionKey key)throws IOException{ // 发送sendBuffer中的数据 //System.out.println("sending to server..."); SocketChannel socketChannel = (SocketChannel) key.channel(); synchronized(sendBuffer){ sendBuffer.flip(); // 把极限设为位置,再把位置设为0 socketChannel.write(sendBuffer); sendBuffer.compact(); // 删除已经发送的数据 } } private void receive(SelectionKey key)throws IOException{ // 接收EchoServer发送的数据,把它放到receiveBuffer中 // 如果receiveBuffer中有一行数据,就打印这行数据,然后把它从recviveBuffer中删除 SocketChannel socketChannel = (SocketChannel) key.channel(); socketChannel.read(receiveBuffer); receiveBuffer.flip(); String receiveData = decode(receiveBuffer); if(receiveData.indexOf("\n")==-1) return; String outputData = receiveData.substring(0, receiveData.indexOf("\n")+1); System.out.print(outputData); if(outputData.equals("echo:bye\r\n")){ key.cancel(); socketChannel.close(); System.out.println("关闭与服务器的连接"); selector.close(); System.exit(0); } ByteBuffer temp = encode(outputData); receiveBuffer.position(temp.limit()); receiveBuffer.compact(); // 删除已经打印的数据 } /**解码*/ private String decode(ByteBuffer buffer){ CharBuffer charbuff = charset.decode(buffer); return charbuff.toString(); } /**编码*/ private ByteBuffer encode(String str){ return charset.encode(str); } /*public void main(String[] argv)throws IOException{ try { echoclient = new EchoClient(m_ServerIP, m_ServerPort); Thread c_talk = new Thread(){ public void run(){ try { echoclient.talk(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }; c_talk.start(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } }*/ }
服务器
服务器类
/** * 1. 参考EchoClient类的实现,只是现在服务器为了接收多个客户端, * 2. 继承了Thread类,其run()方法就是负责阻塞的accept()SocketChannel, * 3. 然后根据accept到的每个SocketChannel的不同,建立doEchoServer的类, * 专门用于处理其中的一个连接,每一个连接就和CLient是相同的用法 * ************************************* --[icesongqiang] */ import java.io.*; import java.nio.*; import java.nio.channels.*; import java.nio.charset.*; import java.net.*; import java.util.*; // for Set class EchoServer extends Thread{ /** * Selector类, 只要ServerSocketChannel及SocketChannel向 * Selector 注册了特定的事件,Selector类就会监控这些事件是否发生 * register()方法,返回SelectionKey对象 */ private ServerSocketChannel serverSocketChannel = null; private int port = 8000; private int count =0; private List<DoEchoServer> list_DoEchoServer = new ArrayList<DoEchoServer>(); public EchoServer() throws IOException{ // 静态方法:返回一个ServerSocketChannel对象, 没有与任何端口绑定,并且处于阻塞模式 s 10535 erverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().setReuseAddress(true); // 复用端口 serverSocketChannel.configureBlocking(true); // 等待接入。默认阻塞模式,false 非阻塞 serverSocketChannel.socket().bind(new InetSocketAddress(port)); // 绑定端口 System.out.println("服务器启动"); } public void run(){ for(;;){ try{ SocketChannel socketChannel = serverSocketChannel.accept(); System.out.println("接收到客户连接,来自:" + socketChannel.socket().getInetAddress() + ":" + socketChannel.socket().getPort()); // 连接了一个SocketChannel, 开启一个相应的进程 ++ count; DoEchoServer doEchoServer = new DoEchoServer(socketChannel); list_DoEchoServer.add(doEchoServer); Thread s_talk = new Thread(){ public void run(){ try { doEchoServer.talk(); } catch (IOException ex) { // TODO Auto-generated catch block ex.printStackTrace(); } } }; s_talk.start(); try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } Iterator<DoEchoServer> it = list_DoEchoServer.iterator(); while(it.hasNext()){ DoEchoServer ds = it.next(); if(ds.get_iswork()){ds.send_data("welcome to connect, now " + count + " clients");} else{ it.remove(); --count; } } }catch(IOException e){e.printStackTrace();} } } public void broadcast(){ while(true){ try { Thread.sleep(1000*10); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } Iterator<DoEchoServer> it = list_DoEchoServer.iterator(); while(it.hasNext()){ DoEchoServer ds = it.next(); if(ds.get_iswork()){ds.send_data("BroadCast: now " + count + " clients");} else{ it.remove(); --count; } } } } }
服务器分支进程
/** * 客户相应的执行方法, 主线程利用为每个用户都开了这样的一个进程*/ class DoEchoServer { private SocketChannel socketChannel = null; private String clientIP= null; private Selector selector = null; private Charset charset = Charset.forName("GBK"); private ByteBuffer sendBuffer = ByteBuffer.allocate(1024); private ByteBuffer receiveBuffer = ByteBuffer.allocate(1024); private boolean iswork = true; // 监测当前连接是否正常工作(发送&&接收) public DoEchoServer(SocketChannel socketChannel) throws IOException { // TODO Auto-generated constructor stub this.socketChannel = socketChannel; clientIP = socketChannel.socket().getInetAddress().toString(); clientIP = clientIP.substring(1); //System.out.println("DoEchoServer: " + socketChannel.toString()); // 为SocketChannel监控接收连接就绪事件 selector = Selector.open(); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ |SelectionKey.OP_WRITE); } public SocketChannel get_SocketChannel(){ return this.socketChannel;} public boolean get_iswork(){return iswork;} // 发送和接收数据 public void talk() throws IOException{ while(selector.select()>0){ Set<SelectionKey> readyKeys = selector.selectedKeys(); Iterator<SelectionKey> it = readyKeys.iterator(); while(it.hasNext()){ SelectionKey key = null; try{ key = it.next(); it.remove(); if(key.isWritable()){send(key);} if(key.isReadable()){receive(key);} }catch(IOException e){ iswork = false; // 出现异常,表示该SocketChannel已经断开 e.printStackTrace(); try{ if(key!=null){ key.cancel(); key.channel().close(); } }catch(IOException ex){ex.printStackTrace();} } } } } // 接收指定的数据,把它放到sendBuffer中 public void send_data(String msg){ if(msg!= null){ synchronized(sendBuffer){ sendBuffer.clear(); // 把极限设为容量,再把位置设为0 sendBuffer.put(encode(msg+"\r\n")); } } } // 将sendBuffer中的内容发送到与key绑定的信道上 private void send(SelectionKey key)throws IOException{ // 发送sendBuffer中的数据 //System.out.println("sending to server..."); SocketChannel socketChannel = (SocketChannel) key.channel(); synchronized(sendBuffer){ sendBuffer.flip(); // 把极限设为位置,再把位置设为0 socketChannel.write(sendBuffer); sendBuffer.compact(); // 删除已经发送的数据 } } private void receive(SelectionKey key)throws IOException{ // 接收EchoClient发送的数据,把它放到receiveBuffer中 // 如果receiveBuffer中有一行数据,就打印这行数据,然后把它从recviveBuffer中删除 SocketChannel socketChannel = (SocketChannel) key.channel(); //System.out.println(socketChannel.toString()); socketChannel.read(receiveBuffer); receiveBuffer.flip(); String receiveData = decode(receiveBuffer); if(receiveData.indexOf("\n")==-1) return; String outputData = receiveData.substring(0, receiveData.indexOf("\n")+1); dosomething(outputData); /* if(outputData.equals("echo:bye\r\n")){ key.cancel(); socketChannel.close(); System.out.println("关闭与客户端的连接"); selector.close(); System.exit(0); }*/ ByteBuffer temp = encode(outputData); receiveBuffer.position(temp.limit()); receiveBuffer.compact(); // 删除已经打印的数据 } /**解码*/ private String decode(ByteBuffer buffer){ CharBuffer charbuff = charset.decode(buffer); return charbuff.toString(); } /**编码*/ private ByteBuffer encode(String str){ return charset.encode(str); } /**wrete your own code there */ private void dosomething(String str){ // wrete your own code there // 这里还可以配合进程处理进行其他进程的开关,具体可以参考[java 多进程] } }
服务器启动
import java.io.IOException; public class MyServer { public static void main(String[] args) { // TODO Auto-generated method stub try { EchoServer echoServer = new EchoServer(); Thread s_accept = new Thread(){ public void run(){ echoServer.run(); } }; s_accept.start(); echoServer.broadcast(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
相关文章推荐
- java对世界各个时区(TimeZone)的通用转换处理方法(转载)
- java-注解annotation
- java-模拟tomcat服务器
- java-用HttpURLConnection发送Http请求.
- java-WEB中的监听器Lisener
- 小心服务器内存居高不下的元凶--WebAPI服务
- Android IPC进程间通讯机制
- Android之使用Http协议实现文件上传功能
- Android Native 绘图方法
- Android java 与 javascript互访(相互调用)的方法例子
- Python3写爬虫(四)多线程实现数据爬取
- 介绍一款信息管理系统的开源框架---jeecg
- 聚类算法之kmeans算法java版本
- java实现 PageRank算法
- 运维入门
- PropertyChangeListener简单理解
- c++11 + SDL2 + ffmpeg +OpenAL + java = Android播放器