socket 通讯检测客户端非正常断线。
2016-09-26 14:11
204 查看
package com.ist.socket; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.EOFException; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.net.InetAddress; import java.net.Socket; import java.net.UnknownHostException; import java.util.Date; /** * Socket收发器 通过Socket发送数据,并使用新线程监听Socket接收到的数据 * @author songqiang * @createtime 2015-12-15 */ public abstract class MediaTransceiver implements Runnable{ protected Socket socket; protected InetAddress addr; protected String falshId; protected DataInputStream in; protected DataOutputStream out; private boolean runFlag; //是否在线标识 private boolean onlineFlag; /** * 服务器端实例化 * @param socket */ public MediaTransceiver(Socket socket) { this.socket = socket; this.addr = socket.getInetAddress(); } /** * 监听Socket接收的数据(新线程中运行) */ @Override public void run() { try { //socket输入流 in = new DataInputStream(this.socket.getInputStream()); //socket输出流 out = new DataOutputStream(this.socket.getOutputStream()); } catch (IOException e) { e.printStackTrace(); runFlag = false; } //无限接收服务端信息,直到连接端口 while(runFlag){ try{ //接受数据 final String s = in.readUTF(); if(s.equals("1")){ onlineFlag = true; } this.onReceive(addr, s); }catch(EOFException e){ }catch (IOException e){ // 连接被断开(被动) runFlag = false; } } //断开连接 try { in.close(); out.close(); socket.close(); in = null; out = null; socket = null; } catch (IOException e) { e.printStackTrace(); } this.onDisconnect(addr); } static void delay() { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } /** * 开启Socket收发 * 如果开启失败,会断开连接并回调{@code onDisconnect()} */ public void start(){ runFlag = true; new Thread(this).start(); } /** * 断开连接(主动) * 连接断开后,会回调{@code onDisconnect()} */ public void stop(){ runFlag = false; try { socket.shutdownInput(); in.close(); } catch (Exception e) { e.printStackTrace(); } } //向服务器发送falshId public void sendFalshId(String falshId){ if(out!=null){ try{ send("falshId:"+falshId); }catch(Exception e){ e.printStackTrace(); } }else{ if(runFlag){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } sendFalshId(falshId); } } } /** * 向服务端发送信息 * @param s * @return */ public boolean send(String s){ if(out!=null){ try{ out.writeUTF(s); out.flush(); return true; }catch(Exception e){ e.printStackTrace(); } } return false; } /** * 检查socket是否执行 */ public void checkOnLine(){ send("0"); onlineFlag = false; //线程等待onlineFlag标识是否改变 Thread checkThread = new Thread(new Runnable() { @Override public void run() { delay(); //标识没有改变则判断为离线 if(!onlineFlag){ stop(); } } }); checkThread.start(); } /** * 获取连接到的Socket地址 * * @return InetAddress对象 */ public InetAddress getInetAddress() { return addr; } public String getFalshId() { return falshId; } public void setFalshId(String falshId) { this.falshId = falshId; } //-------------------------------实例化时实现--------------------------- /** * 接收到数据 * 注意:此回调是在新线程中执行的 * @param addr 连接到的Socket地址 * @param s:收到的字符串 */ public abstract void onReceive(InetAddress addr, String s); /** * 连接断开 * 注意:此回调是在新线程中执行的 * @param addr * 连接到的Socket地址 */ public abstract void onDisconnect(InetAddress addr); }
public abstract class MediaServer implements Runnable{ private static MediaServer mediaServer=null; //监听端口 private int port; //服务端运行标识 private boolean runFlag; //客户端响应集合(ConcurrentHashMap线程安全解决并发修改问题) private Map results = new ConcurrentHashMap(); //客户端集合 private Map clinets = new ConcurrentHashMap(); /** * 实例化服务端 * @param port */ private MediaServer(int port) { this.port = port; } //加同步锁(线程安全) private static synchronized void syncInit(){ if(mediaServer==null){ mediaServer = new MediaServer(port) { //服务器停止 @Override public void onServerStop() { } //接收方法 @Override public void onReceive(MediaTransceiver client, String s) { this.getResults().put(client.falshId, s); delay(1); //System.out.println("接收结果:"+this.toString()+":"+this.getResults().get(client.falshId)); } //断开连接 @Override public void onDisconnect(MediaTransceiver client) { if(null!=client){ this.getClinets().remove(client.falshId); this.getResults().remove(client.falshId); updateLogoutTime(client.falshId); } } //连接失败 @Override public void onConnectFailed() { System.out.println("连接失败!"); } //连接上 @Override public void onConnect(MediaTransceiver client) { } }; mediaServer.start(); } } /** * 获取服务器实例与创建实例分开 * 如果在构造函数中抛出异常,实例将永远得不到创建 * @return */ public static synchronized MediaServer getInstance(){ if(mediaServer == null){ syncInit(); mediaServer.updateAllOffLine(); } mediaServer.checkOnLine(); return mediaServer; } /** * 服务器启动 * 如果启动失败,会回调onServerStop() */ public void start(){ runFlag = true; new Thread(this).start(); } /** * 服务器停止 * 服务器停止后,会回调{@code onServerStop()} */ public void stop(){ runFlag = false; } /** * 监听端口,接受客户端连接(新线程中运行) */ @Override public void run() { try{ final ServerSocket server = new ServerSocket(port); //无限等待启动客户端,直到服务器关闭 while(runFlag){ try{ final Socket socket = server.accept(); //socket.setSoTimeout(10000);//十秒连接超时 //与客户端建立连接 startClinet(socket); } catch (IOException e) { // ServerSocket对象创建出错,服务器启动失败 this.onConnectFailed(); } } //停止服务器 try{ for(String key:clinets.keySet()){ clinets.get(key).stop(); } clinets.clear(); results.clear(); server.close(); mediaServer=null; }catch (IOException e) { // ServerSocket对象创建出错,服务器启动失败 e.printStackTrace(); } } catch (IOException e) { // ServerSocket对象创建出错,服务器启动失败 e.printStackTrace(); } this.onServerStop(); } /** * 启动客户端收发 * @param socket */ public void startClinet(Socket socket){ //服务器端接收对象 MediaTransceiver clinet = new MediaTransceiver(socket) { //接收信息 @Override public void onReceive(InetAddress addr, String s) { if("0".equals(s)){ this.send("1"); }else{ System.out.println("接收:"+s); if(null!=s && s.startsWith("falshId:")){//添加客户端 s=s.replace("falshId:", ""); if(s!=null && !"null".equals(s.trim()) && hasFalshId(s)){ this.falshId=s; clinets.put(this.falshId,this); updateOnline(falshId); } }else if(this.falshId!=null){//接受发送消息 System.out.println(this.falshId+":"+s); MediaServer.this.onReceive(this, s); }else{ //System.out.println("返回值"+s); } } } //服务器断开连接 @Override public void onDisconnect(InetAddress addr) { if(null!=this && null!= this.falshId){ 4000 MediaServer.this.onDisconnect(this); } } }; clinet.start(); this.onConnect(clinet); } //向客户端发送命令 public boolean send(String falshId,String s){ for(String key:clinets.keySet()){ MediaTransceiver mt = clinets.get(key); if(null!=mt && mt.falshId!=null && mt.falshId.equals(falshId)){ //连接不为空且soket是连接着的 if(null!=mt.socket && mt.socket.isConnected()){ mt.send(s); return true; } } } return false; } //等待客户端响应 public String getResult(String falshId){ String resultstr = this.getResults().get(falshId); if(null == resultstr){ return ""; }else{//去掉已返回命令 this.getResults().remove(falshId); } return resultstr; } //等待客户端响应 public Map getResults(){ return results; } public Map getClinets() { return clinets; } //修改离线时间 public void updateLogoutTime(String falshId){ 具体实现 } //修改在线终端 public void updateOnline(String falshId){ 具体实现 } public boolean hasFalshId(String falshId){ 具体实现 } //修改所有终端为离线 public void updateAllOffLine(){ 具体实现 } /** * 检查是否在线 */ public void checkOnLine(){ for(String clinetId:clinets.keySet()){ MediaTransceiver clinet = clinets.get(clinetId); clinet.checkOnLine(); } } static void delay(int count) { try { Thread.sleep(count*1000); } catch (InterruptedException e) { e.printStackTrace(); } } //---------------------------具体实现交给子类,或是实例化时实现----------------------- /** * 客户端:连接建立 * 注意:此回调是在新线程中执行的 * @param client 客户端对象 */ public abstract void onConnect(MediaTransceiver client); /** * 客户端:连接建立失败 * 注意:此回调是在新线程中执行的 */ public abstract void onConnectFailed(); /** * 客户端:收到字符串 * 注意:此回调是在新线程中执行的 * @param client 客户端对象 * @param s 字符串 */ public abstract void onReceive(MediaTransceiver client, String s); /** * 客户端:连接断开 * 注意:此回调是在新线程中执行的 * @param client MediaCline客户端对象 */ public abstract void onDisconnect(MediaTransceiver client); /** * 服务器停止 * 注意:此回调是在新线程中执行的 */ public abstract void onServerStop();
package com.ist.socket; import java.net.InetAddress; import java.net.Socket; public abstract class MediaClinet implements Runnable{ private int port; private String hostIP; private boolean mBlnNetIsOk = false; private MediaTransceiver transceiver; //终端标识码 private String falshId; public MediaClinet(String falshId) { this.falshId = falshId; } /** * 建立连接 * 连接的建立将在新线程中进行 * 连接建立成功,回调{@code onConnect()} * 连接建立失败,回调{@code onConnectFailed()} * @param hostIP 服务器主机IP * @param port 端口 */ public void connect(String hostIP, int port) { this.hostIP = hostIP; this.port = port; new Thread(this).start(); } private void connect(){ try{ Socket socket = new Socket(hostIP, port); transceiver = new MediaTransceiver(socket) { @Override public void onReceive(InetAddress addr, String s) { if(s.equals("0")){ this.send("1"); } MediaClinet.this.onReceive(transceiver, s); } @Override public void onDisconnect(InetAddress addr) { mBlnNetIsOk = false; MediaClinet.this.onDisconnect(transceiver,falshId); } }; transceiver.start(); mBlnNetIsOk = true; transceiver.sendFalshId(falshId); this.onConnect(transceiver,falshId); } catch (Exception e) { e.printStackTrace(); this.onConnectFailed(); } } @Override public void run(){ connect(); try { Thread.sleep(4000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } while (true) { if (mBlnNetIsOk == false) { connect(); } try { Thread.sleep(3000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } private boolean sendPacket(String string) { // TODO Auto-generated method stub if(transceiver!=null){ return transceiver.send(string); } return false; } /** * 断开连接 * 连接断开,回调{@code onDisconnect()} */ public void disconnect() { if (transceiver != null) { transceiver.stop(); transceiver = null; } } /** * 获取Socket收发器 * * @return 未连接则返回null */ public MediaTransceiver getTransceiver() { return isConnected() ? transceiver : null; } /** * 判断是否连接 * * @return 当前处于连接状态,则返回true */ public boolean isConnected() { return mBlnNetIsOk; } /** * 连接建立 * @param transceiver SocketTransceiver对象 */ public abstract void onConnect(MediaTransceiver transceiver,String falshId); /** * 连接建立失败 */ public abstract void onConnectFailed(); /** * 接收到数据 * 注意:此回调是在新线程中执行的 * @param transceiver Socket收发对象 * @param s 字符串 */ public abstract void onReceive(MediaTransceiver transceiver, String s); /** * 连接断开 * 注意:此回调是在新线程中执行的 * @param transceiver Socket收发对象 */ public abstract void onDisconnect(MediaTransceiver transceiver,String falshId); }
这个socket 通讯类大体框架是借鉴某个高手的博客里面的,具体我就不记得了(我只能说对不起了)。我大概说一下我代码里面的功能:
1.MediaServer 采用的是单例模式,没当有一个clinet 连接的时候就会创建一个MediaTransceiver(接收者)对象。
2.MediaClinet 创建一个MediaTransceiver(接收者)对象并发送一个标识自己的falshid。(因为ip是会变的,不会做为唯一键)
3.也是socket比较难处理的一个问题:非正常断线,就是你拔掉网线的时候socket是检查不到断开的。在网上看了很多帖子:(1)设置超时时间
(2)设置keepAlive (3)设置心跳包。第一个超时时间是read的阻塞时间,并不是说socket的运行了这么久然后就超时断开。所以超时和断开没关系,
第二个keepAlive是十二分钟检测一次服务器的活动状态,个人觉得有点久。第三个正常断开还行,非正常断开(拔网线)就检测不出了。
所以我通过发送0给客户端等待客户端响应1回来,没有则判断为离线。等待响应用的是线程,不然如果断线则会卡在read方法那里。
最后java是开源,代码共享,互相进步,延续这种精神。如果有疑问可以加qq群124795403 交流。我不是大牛,我只是代码的搬运工。
相关文章推荐
- C#Socket通讯之超时检测
- python服务器和客户端网络通讯socketserver
- Socket的通讯收发文件数据(含服务端和客户端)
- C#线程池多线程Socket通讯 服务器端和客户端示例
- PHP基于socket实现客户端和服务端通讯功能
- Socket客户端和服务端通讯
- 利用线程池实现多客户端和单服务器端Socket通讯(一):同步方式
- TMG自动发现功能配置正常,但客户端却检测不到TMG服务器。
- Android基于客户端和服务器的Socket编程例子之Socket基础通讯--socket相关知识
- Java实现简单的Socket服务器与客户端字符串通讯(适合初学者阅读)
- SOCKET 检测链接是否断线的三种方法(转载)
- 客户端技术的一点思考(数据存储用SQLite, XMPP通讯用Gloox, Web交互用LibCurl, 数据打包用Protocol Buffer, socket通讯用boost asio)
- linux socket unix socket 断线检测 心跳检测
- 即时通讯检测网络是否正常
- 利用线程池实现多客户端和单服务器端Socket通讯(二):异步编程模型实现
- Socket 服务端使用多线程实现多客户端通讯实例
- Linux下使用socktop来检测socket的通讯状况
- socket 服务器端与客户端简单通讯
- 安卓客户端通过socket与服务器进行通讯
- socket的客户端判断连接是否正常