您的位置:首页 > 其它

socket 通讯检测客户端非正常断线。

2016-09-26 14:11 204 查看
package com.ist.socket;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.Date;

/**
*  Socket收发器 通过Socket发送数据,并使用新线程监听Socket接收到的数据
* @author songqiang
* @createtime 2015-12-15
*/
public abstract class MediaTransceiver implements Runnable{

protected Socket socket;
protected InetAddress addr;
protected String falshId;
protected DataInputStream in;
protected DataOutputStream out;
private boolean runFlag;
//是否在线标识
private boolean onlineFlag;
/**
* 服务器端实例化
* @param socket
*/
public MediaTransceiver(Socket socket) {
this.socket = socket;
this.addr = socket.getInetAddress();
}
/**
* 监听Socket接收的数据(新线程中运行)
*/
@Override
public void run() {
try {
//socket输入流
in = new DataInputStream(this.socket.getInputStream());
//socket输出流
out = new DataOutputStream(this.socket.getOutputStream());
} catch (IOException e) {
e.printStackTrace();
runFlag = false;
}
//无限接收服务端信息,直到连接端口
while(runFlag){
try{
//接受数据
final String s = in.readUTF();
if(s.equals("1")){
onlineFlag = true;
}
this.onReceive(addr, s);
}catch(EOFException e){

}catch (IOException e){
// 连接被断开(被动)
runFlag = false;
}
}
//断开连接
try {
in.close();
out.close();
socket.close();
in = null;
out = null;
socket = null;
} catch (IOException e) {
e.printStackTrace();
}
this.onDisconnect(addr);
}
static void delay() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 开启Socket收发
* 如果开启失败,会断开连接并回调{@code onDisconnect()}
*/
public void start(){
runFlag = true;
new Thread(this).start();
}

/**
* 断开连接(主动)
* 连接断开后,会回调{@code onDisconnect()}
*/
public void stop(){
runFlag = false;
try {
socket.shutdownInput();
in.close();
} catch (Exception e) {
e.printStackTrace();
}
}
//向服务器发送falshId
public void sendFalshId(String falshId){
if(out!=null){
try{
send("falshId:"+falshId);
}catch(Exception e){
e.printStackTrace();
}
}else{
if(runFlag){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
sendFalshId(falshId);
}
}
}
/**
* 向服务端发送信息
* @param s
* @return
*/
public boolean send(String s){
if(out!=null){
try{
out.writeUTF(s);
out.flush();
return true;
}catch(Exception e){
e.printStackTrace();
}
}
return false;
}
/**
* 检查socket是否执行
*/
public void checkOnLine(){
send("0");
onlineFlag = false;
//线程等待onlineFlag标识是否改变
Thread checkThread = new Thread(new Runnable() {
@Override
public void run() {
delay();
//标识没有改变则判断为离线
if(!onlineFlag){
stop();
}
}
});
checkThread.start();
}
/**
* 获取连接到的Socket地址
*
* @return InetAddress对象
*/
public InetAddress getInetAddress() {
return addr;
}

public String getFalshId() {
return falshId;
}

public void setFalshId(String falshId) {
this.falshId = falshId;
}
//-------------------------------实例化时实现---------------------------
/**
* 接收到数据
* 注意:此回调是在新线程中执行的
* @param addr 连接到的Socket地址
* @param s:收到的字符串
*/
public abstract void onReceive(InetAddress addr, String s);

/**
* 连接断开
* 注意:此回调是在新线程中执行的
* @param addr
* 连接到的Socket地址
*/
public abstract void onDisconnect(InetAddress addr);

}

public abstract class MediaServer implements Runnable{
private static MediaServer mediaServer=null;
//监听端口
private int port;
//服务端运行标识
private boolean runFlag;
//客户端响应集合(ConcurrentHashMap线程安全解决并发修改问题)
private Map results = new ConcurrentHashMap();
//客户端集合
private Map clinets = new ConcurrentHashMap();
/**
* 实例化服务端
* @param port
*/
private MediaServer(int port) {
this.port = port;
}
//加同步锁(线程安全)
private static synchronized void syncInit(){
if(mediaServer==null){
mediaServer = new MediaServer(port) {
//服务器停止
@Override
public void onServerStop() {

}
//接收方法
@Override
public void onReceive(MediaTransceiver client, String s) {
this.getResults().put(client.falshId, s);
delay(1);
//System.out.println("接收结果:"+this.toString()+":"+this.getResults().get(client.falshId));
}
//断开连接
@Override
public void onDisconnect(MediaTransceiver client) {
if(null!=client){
this.getClinets().remove(client.falshId);
this.getResults().remove(client.falshId);
updateLogoutTime(client.falshId);
}
}
//连接失败
@Override
public void onConnectFailed() {
System.out.println("连接失败!");
}
//连接上
@Override
public void onConnect(MediaTransceiver client) {

}
};
mediaServer.start();
}
}
/**
* 获取服务器实例与创建实例分开
* 如果在构造函数中抛出异常,实例将永远得不到创建
* @return
*/
public static synchronized MediaServer getInstance(){
if(mediaServer == null){
syncInit();
mediaServer.updateAllOffLine();
}
mediaServer.checkOnLine();
return mediaServer;
}
/**
* 服务器启动
* 如果启动失败,会回调onServerStop()
*/
public void start(){
runFlag = true;
new Thread(this).start();
}
/**
* 服务器停止
* 服务器停止后,会回调{@code onServerStop()}
*/
public void stop(){
runFlag = false;
}
/**
* 监听端口,接受客户端连接(新线程中运行)
*/
@Override
public void run() {
try{
final ServerSocket server = new ServerSocket(port);
//无限等待启动客户端,直到服务器关闭
while(runFlag){
try{
final Socket socket = server.accept();
//socket.setSoTimeout(10000);//十秒连接超时
//与客户端建立连接
startClinet(socket);
} catch (IOException e) {
// ServerSocket对象创建出错,服务器启动失败
this.onConnectFailed();
}
}
//停止服务器
try{
for(String key:clinets.keySet()){
clinets.get(key).stop();
}
clinets.clear();
results.clear();
server.close();
mediaServer=null;
}catch (IOException e) {
// ServerSocket对象创建出错,服务器启动失败
e.printStackTrace();
}

} catch (IOException e) {
// ServerSocket对象创建出错,服务器启动失败
e.printStackTrace();
}
this.onServerStop();
}

/**
*  启动客户端收发
* @param socket
*/
public void startClinet(Socket socket){
//服务器端接收对象
MediaTransceiver clinet = new MediaTransceiver(socket) {
//接收信息
@Override
public void onReceive(InetAddress addr, String s) {
if("0".equals(s)){
this.send("1");
}else{
System.out.println("接收:"+s);
if(null!=s && s.startsWith("falshId:")){//添加客户端
s=s.replace("falshId:", "");
if(s!=null && !"null".equals(s.trim()) && hasFalshId(s)){
this.falshId=s;
clinets.put(this.falshId,this);
updateOnline(falshId);
}
}else if(this.falshId!=null){//接受发送消息
System.out.println(this.falshId+":"+s);
MediaServer.this.onReceive(this, s);
}else{
//System.out.println("返回值"+s);
}
}
}
//服务器断开连接
@Override
public void onDisconnect(InetAddress addr) {
if(null!=this && null!= this.falshId){

4000
MediaServer.this.onDisconnect(this);
}
}
};
clinet.start();
this.onConnect(clinet);
}
//向客户端发送命令
public boolean send(String falshId,String s){
for(String key:clinets.keySet()){
MediaTransceiver mt = clinets.get(key);
if(null!=mt && mt.falshId!=null && mt.falshId.equals(falshId)){
//连接不为空且soket是连接着的
if(null!=mt.socket && mt.socket.isConnected()){
mt.send(s);
return true;
}
}
}
return false;
}
//等待客户端响应
public String getResult(String falshId){
String resultstr = this.getResults().get(falshId);
if(null == resultstr){
return "";
}else{//去掉已返回命令
this.getResults().remove(falshId);
}
return resultstr;
}
//等待客户端响应
public Map getResults(){
return results;
}
public Map getClinets() {
return clinets;
}

//修改离线时间
public void updateLogoutTime(String falshId){
具体实现
}
//修改在线终端
public void updateOnline(String falshId){
具体实现
}
public boolean hasFalshId(String falshId){
具体实现
}
//修改所有终端为离线
public void updateAllOffLine(){
具体实现
}

/**
* 检查是否在线
*/
public void checkOnLine(){
for(String clinetId:clinets.keySet()){
MediaTransceiver clinet = clinets.get(clinetId);
clinet.checkOnLine();
}
}

static void delay(int count) {
try {
Thread.sleep(count*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//---------------------------具体实现交给子类,或是实例化时实现-----------------------
/**
* 客户端:连接建立
* 注意:此回调是在新线程中执行的
* @param client 客户端对象
*/
public abstract void onConnect(MediaTransceiver client);

/**
* 客户端:连接建立失败
* 注意:此回调是在新线程中执行的
*/
public abstract void onConnectFailed();

/**
* 客户端:收到字符串
* 注意:此回调是在新线程中执行的
* @param client 客户端对象
* @param s 字符串
*/
public abstract void onReceive(MediaTransceiver client, String s);

/**
* 客户端:连接断开
* 注意:此回调是在新线程中执行的
* @param client MediaCline客户端对象
*/
public abstract void onDisconnect(MediaTransceiver client);

/**
* 服务器停止
* 注意:此回调是在新线程中执行的
*/
public abstract void onServerStop();

package com.ist.socket;

import java.net.InetAddress;
import java.net.Socket;

public abstract class MediaClinet implements Runnable{

private int port;
private String hostIP;
private boolean mBlnNetIsOk = false;
private MediaTransceiver transceiver;
//终端标识码
private String falshId;

public MediaClinet(String falshId) {
this.falshId = falshId;
}
/**
* 建立连接
* 连接的建立将在新线程中进行
* 连接建立成功,回调{@code onConnect()}
* 连接建立失败,回调{@code onConnectFailed()}
* @param hostIP 服务器主机IP
* @param port 端口
*/
public void connect(String hostIP, int port) {
this.hostIP = hostIP;
this.port = port;
new Thread(this).start();
}

private void connect(){
try{
Socket socket = new Socket(hostIP, port);
transceiver = new MediaTransceiver(socket) {

@Override
public void onReceive(InetAddress addr, String s) {
if(s.equals("0")){
this.send("1");
}
MediaClinet.this.onReceive(transceiver, s);

}

@Override
public void onDisconnect(InetAddress addr) {
mBlnNetIsOk = false;
MediaClinet.this.onDisconnect(transceiver,falshId);
}
};
transceiver.start();
mBlnNetIsOk = true;
transceiver.sendFalshId(falshId);
this.onConnect(transceiver,falshId);

} catch (Exception e) {
e.printStackTrace();
this.onConnectFailed();
}
}
@Override
public void run(){
connect();
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
while (true) {
if (mBlnNetIsOk == false) {
connect();
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
private boolean sendPacket(String string) {
// TODO Auto-generated method stub
if(transceiver!=null){
return transceiver.send(string);
}
return false;
}
/**
* 断开连接
* 连接断开,回调{@code onDisconnect()}
*/
public void disconnect() {
if (transceiver != null) {
transceiver.stop();
transceiver = null;
}
}
/**
* 获取Socket收发器
*
* @return 未连接则返回null
*/
public MediaTransceiver getTransceiver() {
return isConnected() ? transceiver : null;
}

/**
* 判断是否连接
*
* @return 当前处于连接状态,则返回true
*/
public boolean isConnected() {
return mBlnNetIsOk;
}
/**
* 连接建立
* @param transceiver SocketTransceiver对象
*/
public abstract void onConnect(MediaTransceiver transceiver,String falshId);

/**
* 连接建立失败
*/
public abstract void onConnectFailed();

/**
* 接收到数据
* 注意:此回调是在新线程中执行的
* @param transceiver Socket收发对象
* @param s 字符串
*/
public abstract void onReceive(MediaTransceiver transceiver, String s);

/**
* 连接断开
* 注意:此回调是在新线程中执行的
* @param transceiver Socket收发对象
*/
public abstract void onDisconnect(MediaTransceiver transceiver,String falshId);

}

这个socket 通讯类大体框架是借鉴某个高手的博客里面的,具体我就不记得了(我只能说对不起了)。我大概说一下我代码里面的功能:

1.MediaServer 采用的是单例模式,没当有一个clinet 连接的时候就会创建一个MediaTransceiver(接收者)对象。

2.MediaClinet 创建一个MediaTransceiver(接收者)对象并发送一个标识自己的falshid。(因为ip是会变的,不会做为唯一键)

3.也是socket比较难处理的一个问题:非正常断线,就是你拔掉网线的时候socket是检查不到断开的。在网上看了很多帖子:(1)设置超时时间

(2)设置keepAlive (3)设置心跳包。第一个超时时间是read的阻塞时间,并不是说socket的运行了这么久然后就超时断开。所以超时和断开没关系,

第二个keepAlive是十二分钟检测一次服务器的活动状态,个人觉得有点久。第三个正常断开还行,非正常断开(拔网线)就检测不出了。

所以我通过发送0给客户端等待客户端响应1回来,没有则判断为离线。等待响应用的是线程,不然如果断线则会卡在read方法那里。

 

最后java是开源,代码共享,互相进步,延续这种精神。如果有疑问可以加qq群124795403 交流。我不是大牛,我只是代码的搬运工。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  socket 非正常断开