一个非常完善的基于Socket的多服务器通信框架
2015-06-01 11:41
435 查看
一共4个文件
XServerReceiver : 通过Socket接收并处理XServer返回消息的工具类,XServerReceiver充当服务器
XServerSender : 通过Socket连接XServer服务器并发送消息的工具类,XServerSender充当客户端
ClientTest : 客户端测试程序
ServerTest : 服务端测试程序
XServerReceiver.java
XServerSender.java
ClientTest.java
ServerTest.java
测试结果
![](http://img.blog.csdn.net/20150601114133278)
XServerReceiver : 通过Socket接收并处理XServer返回消息的工具类,XServerReceiver充当服务器
XServerSender : 通过Socket连接XServer服务器并发送消息的工具类,XServerSender充当客户端
ClientTest : 客户端测试程序
ServerTest : 服务端测试程序
XServerReceiver.java
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.BindException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Serves one accepted connection: decodes the socket stream as UTF-8, splits it
 * into events on the terminator character and pushes every complete event
 * (terminator included) onto the shared queue of {@link XServerReceiver}.
 */
class XServerReceiverThread extends Thread {
    /** Character that ends an event on the wire; '\1' is the value assumed by this demo. */
    private static final char EVENT_TERMINATOR = '\1';

    private Socket m_socket = null;
    private InputStream m_input = null;
    private OutputStream m_output = null;
    private BufferedReader m_br = null;
    /** Remote socket address of the client, as "ip:port". */
    private String m_clientHost = "";

    /**
     * Wraps the accepted socket and sets a 3 second read timeout on it.
     * If setup fails, all handles are closed and {@link #run()} becomes a no-op.
     *
     * @param socket the connection returned by {@code ServerSocket.accept()}
     */
    public XServerReceiverThread(Socket socket) {
        try {
            this.m_socket = socket;
            this.m_input = socket.getInputStream();
            this.m_output = socket.getOutputStream();
            this.m_br = new BufferedReader(new InputStreamReader(this.m_input, "UTF-8"));
            // The address prints as "/192.168.0.83:52177" or "host/192.168.0.83:52177";
            // keep only the part after the slash instead of blindly dropping one character.
            String address = m_socket.getRemoteSocketAddress().toString();
            int slash = address.indexOf('/');
            this.m_clientHost = slash >= 0 ? address.substring(slash + 1) : address;
            this.m_socket.setSoTimeout(3000);
            System.out.println("connection established from " + m_clientHost);
        } catch (Exception ex) {
            ex.printStackTrace();
            closeHandles();
        }
    }

    /**
     * Reads characters until the peer disconnects or an error (including the
     * read timeout) occurs. An unterminated trailing fragment is discarded.
     */
    @Override
    public void run() {
        if (m_br == null) {
            // Construction failed and the handles are already closed.
            return;
        }
        try {
            StringBuilder event = new StringBuilder();
            while (true) {
                int one = m_br.read();
                if (one == -1) {
                    // Peer closed the connection: stop here instead of looping
                    // again on a reader that has just been closed.
                    closeHandles();
                    return;
                }
                event.append((char) one);
                if (one == EVENT_TERMINATOR) {
                    XServerReceiver.PushEvent(event.toString());
                    event.setLength(0);
                }
            }
        } catch (Exception ex) {
            ex.printStackTrace();
            closeHandles();
        }
    }

    /** Closes reader, streams and socket; a failure on one does not skip the others. */
    private void closeHandles() {
        System.out.println("connection closed from " + m_clientHost);
        closeQuietly(m_br);
        m_br = null;
        closeQuietly(m_input);
        m_input = null;
        closeQuietly(m_output);
        m_output = null;
        closeQuietly(m_socket);
        m_socket = null;
    }

    /** Closes one handle if it is non-null, printing (not propagating) any failure. */
    private static void closeQuietly(AutoCloseable handle) {
        if (handle == null) {
            return;
        }
        try {
            handle.close();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}

/**
 * Owns the listening socket: binds the port, accepts connections and starts one
 * {@link XServerReceiverThread} per client. A failed bind is retried every 5 seconds.
 */
class XServerListenThread extends Thread {
    /** Listening port (XServer.TCP_PORT_FOR_OTHER_SERVER in the real system; 5600 assumed here). */
    private static final int LISTEN_PORT = 5600;

    private ServerSocket server_socket = null;
    /** True until the port is bound. Volatile because Wait() polls it from another thread. */
    private volatile boolean m_pleaseWait = true;

    /** Accept loop; runs until the thread is interrupted. */
    @Override
    public void run() {
        while (!isInterrupted()) {
            try {
                if (server_socket == null || server_socket.isClosed() || !server_socket.isBound()) {
                    this.server_socket = new ServerSocket(LISTEN_PORT);
                    System.out.println("TCP Processor is listening on " + LISTEN_PORT);
                    m_pleaseWait = false;
                }
                Socket socket = this.server_socket.accept();
                new XServerReceiverThread(socket).start();
            } catch (BindException bex) {
                bex.printStackTrace();
                try {
                    sleep(5000);
                } catch (InterruptedException ex) {
                    // Restore the flag so the loop condition ends the thread.
                    Thread.currentThread().interrupt();
                }
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    }

    /**
     * Blocks the caller until the listening port has been bound, polling every
     * 10 ms. Returns early, with the interrupt flag set, if the caller is interrupted.
     */
    public void Wait() {
        try {
            while (m_pleaseWait) {
                Thread.sleep(10);
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }
}

/**
 * Consumer side of the event queue: takes events from {@link XServerReceiver}
 * and handles them one by one, sleeping one second whenever the queue is empty.
 */
class XServerHandlerThread extends Thread {
    @Override
    public void run() {
        try {
            while (true) {
                String event = XServerReceiver.PopEvent();
                if (event == null) {
                    Thread.sleep(1000);
                    continue;
                }
                EventHandler(event);
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    /** Processes one received event; this demo only prints it. */
    private void EventHandler(String event) {
        System.out.println("Received event: " + event);
    }
}

/**
 * Socket utility that receives and processes messages sent by XServer peers.
 * XServerReceiver plays the server role.
 *
 * @author vhreal
 */
public class XServerReceiver {
    /** Events received from all connections, waiting for the handler thread. */
    private static final ConcurrentLinkedQueue<String> m_eventQueue = new ConcurrentLinkedQueue<String>();
    /** Listening thread. */
    private static XServerListenThread m_XServerListenThread = null;
    /** Event handling thread. */
    private static XServerHandlerThread m_XServerHandlerThread = null;

    /**
     * Appends an event to the queue. Called as {@code XServerReceiver.PushEvent(event)}.
     *
     * @param event the complete event text; must not be null
     */
    public static void PushEvent(String event) {
        m_eventQueue.add(event);
    }

    /**
     * Removes and returns the oldest queued event. Called as {@code XServerReceiver.PopEvent()}.
     *
     * @return the next event, or {@code null} when the queue is empty
     */
    public static String PopEvent() {
        // poll() is a single atomic step; the former size()-then-remove() pair
        // traversed the whole queue on every call and could race with another consumer.
        return m_eventQueue.poll();
    }

    /**
     * Starts the listening thread, waits until the port is bound, then starts
     * the handler thread. Called as {@code XServerReceiver.Start()}.
     */
    public static void Start() {
        m_XServerListenThread = new XServerListenThread();
        m_XServerListenThread.start();
        m_XServerListenThread.Wait();
        m_XServerHandlerThread = new XServerHandlerThread();
        m_XServerHandlerThread.start();
    }
}
XServerSender.java
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Background worker that keeps one connection to the XServer open, sends a
 * periodic keep-alive and forwards queued events. The connection is re-opened
 * after any failure.
 */
class XServerSenderThread extends Thread {
    /** Encoding used on the wire; the receiving side decodes the stream as UTF-8. */
    private static final String WIRE_CHARSET = "UTF-8";
    /** Character that ends every message on the wire; '\1' is the value assumed by this demo. */
    private static final char EVENT_TERMINATOR = '\1';
    /** Connect and read timeout in milliseconds. */
    private static final int NETWORK_TIMEOUT_MILLIS = 3000;
    /** Seconds between two keep-alive messages. */
    private static final long KEEPALIVE_INTERVAL_SECS = 2;

    private Socket m_socket = null;
    private OutputStream m_output = null;
    private InputStream m_input = null;
    private InputStreamReader m_isr = null;
    private BufferedReader m_br = null;
    /** Events waiting to be sent. */
    private final ConcurrentLinkedQueue<String> m_Events = new ConcurrentLinkedQueue<String>();
    private String m_ServerHost = "192.168.0.83";
    private int m_ServerPort = 5600;

    /** Connect / keep-alive / send loop; runs until the thread is interrupted. */
    @Override
    public void run() {
        long lastkeepalive = 0;
        while (!isInterrupted()) {
            try {
                if (m_socket == null || m_socket.isClosed()) {
                    Connect2XServer();
                    continue;
                }
                long now = System.currentTimeMillis() / 1000;
                // ">=" on whole seconds gives the intended 2 s heartbeat; "> 2"
                // only fired every 3 s, which is exactly the peer's read timeout.
                if (now - lastkeepalive >= KEEPALIVE_INTERVAL_SECS) {
                    if (!RequestKeepAlive()) {
                        // The connection was just closed; reconnect before taking
                        // an event off the queue, otherwise that event is lost.
                        continue;
                    }
                    lastkeepalive = now;
                }
                String event = PopEvent();
                if (event == null) {
                    Thread.sleep(1000);
                    continue;
                }
                SendEvent2XServer(event);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    }

    /**
     * Queues an event for sending.
     *
     * @param event the complete event text including its terminator; must not be null
     */
    public void PushEvent(String event) {
        m_Events.add(event);
    }

    /** Returns the oldest queued event, or {@code null} when the queue is empty. */
    private String PopEvent() {
        // poll() is atomic and does not traverse the queue the way size() does.
        return m_Events.poll();
    }

    /**
     * Writes one event to the server.
     *
     * @return {@code true} on success; {@code false} after the connection was closed because of an error
     */
    private boolean SendEvent2XServer(String event) {
        try {
            m_output.write(event.getBytes(WIRE_CHARSET));
            m_output.flush();
            return true;
        } catch (Exception ex) {
            ex.printStackTrace();
            closeHandles();
            return false;
        }
    }

    /**
     * Sends the keep-alive message, terminated like every other message so the
     * receiver does not merge it with the event that follows.
     *
     * @return {@code true} on success; {@code false} after the connection was closed because of an error
     */
    private boolean RequestKeepAlive() {
        try {
            String json = String.format(
                    "{\"AttributeTimeinSecs\":\"%d\",\"AttributeEventName\":\"EventKeepAlive\"}",
                    System.currentTimeMillis() / 1000) + EVENT_TERMINATOR;
            m_output.write(json.getBytes(WIRE_CHARSET));
            m_output.flush();
            return true;
        } catch (Exception ex) {
            ex.printStackTrace();
            closeHandles();
            return false;
        }
    }

    /**
     * Opens the connection to the configured XServer host and port.
     *
     * @return {@code true} when connected; {@code false} after a failed attempt (handles closed)
     */
    private boolean Connect2XServer() {
        try {
            System.out.println("connecting to XServer");
            SocketAddress endpoint = new InetSocketAddress(m_ServerHost, m_ServerPort);
            m_socket = new Socket();
            m_socket.setSoTimeout(NETWORK_TIMEOUT_MILLIS);
            // Bound the connect attempt too; setSoTimeout only covers reads.
            m_socket.connect(endpoint, NETWORK_TIMEOUT_MILLIS);
            m_output = m_socket.getOutputStream();
            m_input = m_socket.getInputStream();
            m_isr = new InputStreamReader(m_input, WIRE_CHARSET);
            m_br = new BufferedReader(m_isr);
            System.out.println("connected to XServer");
            return true;
        } catch (Exception ex) {
            ex.printStackTrace();
            closeHandles();
        }
        return false;
    }

    /**
     * Closes readers, streams and socket, then pauses one second so that a
     * failing server is not hammered with reconnect attempts.
     */
    private void closeHandles() {
        closeQuietly(m_isr);
        m_isr = null;
        closeQuietly(m_br);
        m_br = null;
        closeQuietly(m_input);
        m_input = null;
        closeQuietly(m_output);
        m_output = null;
        closeQuietly(m_socket);
        m_socket = null;
        try {
            Thread.sleep(1000);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    /** Closes one handle if it is non-null, printing (not propagating) any failure. */
    private static void closeQuietly(AutoCloseable handle) {
        if (handle == null) {
            return;
        }
        try {
            handle.close();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}

/**
 * Socket utility that connects to the XServer and sends events to it.
 * XServerSender plays the client role.
 *
 * @author vhreal
 */
public class XServerSender {
    private static XServerSenderThread m_SenderThread = null;

    /** Starts the background sender thread. Called as {@code XServerSender.Start()}. */
    public static void Start() {
        try {
            m_SenderThread = new XServerSenderThread();
            m_SenderThread.start();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    /**
     * Queues an event for delivery. Called as {@code XServerSender.SendEvent2XServer(event)}.
     * The call is ignored when {@link #Start()} has not been called yet.
     *
     * @param event the complete event text including its terminator
     */
    public static void SendEvent2XServer(String event) {
        if (m_SenderThread != null) {
            m_SenderThread.PushEvent(event);
        }
    }
}
ClientTest.java
/**
 * Client-side demo: starts the sender and a producer thread that keeps
 * submitting a test event.
 */
public class ClientTest {
    public static void main(String[] args) {
        XServerSender.Start();
        XClientThread xc = new XClientThread();
        xc.start();
    }
}

/** Producer thread of the demo: submits the same test event over and over. */
class XClientThread extends Thread {
    /**
     * Pause between two submissions. Without any pacing the loop fills the
     * sender's unbounded queue far faster than the socket can drain it.
     */
    private static final long SEND_INTERVAL_MILLIS = 100;

    @Override
    public void run() {
        try {
            while (true) {
                // '\1' is the message terminator the receiver splits on.
                XServerSender.SendEvent2XServer("hello,Xsever!\1");
                Thread.sleep(SEND_INTERVAL_MILLIS);
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}
ServerTest.java
/**
 * Server-side demo: its only action is to call {@code XServerReceiver.Start()}.
 */
public class ServerTest {

    /**
     * Program entry point.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(final String[] args) {
        XServerReceiver.Start();
    }
}
测试结果
相关文章推荐
- JAVA -- 线程synchronized关键字
- JAVA -- 线程synchronized关键字
- JAVA -- 线程synchronized关键字
- ActiveMQ之消息持久化方式
- nfs固定启动端口
- 第十章 会话管理——《跟我学Shiro》
- 33.画线,三角形,四边形
- Redisson操作Redis之基础篇
- meta属性在HTML
- 简单Profibus/DP实验系统的组建
- 离散LQR:原理,求解与拓展
- 第八章 拦截器机制——《跟我学Shiro》2
- 第六章 Realm及相关对象——《跟我学Shiro》
- java mail使用qq邮箱发邮件的配置方法
- 黑马程序员——继承,抽象,接口
- JAVA -- 线程wait()、notify()、notifyAll()
- JAVA -- 线程wait()、notify()、notifyAll()
- JAVA -- 线程wait()、notify()、notifyAll()
- 09、Spring_web.xml_DispatcherServlet
- Shiro入门(2)Shiro提供的验证模块