您的位置:首页 > 其它

Socket通信完整实例(心跳包,客户端断线重连,服务端超时断开客户端连接)

2018-02-25 21:05 1081 查看

Socket通信完整实例(心跳包,客户端断线重连,服务端超时断开客户端连接)

通用工具类

public class SocketUtil {

public static String ADDRESS = "192.168.1.123";
public static int PORT = 10086;

/**
* 读数据
*
* @param bufferedReader
*/
public static String readFromStream(BufferedReader bufferedReader) {
try {
String s;
if ((s = bufferedReader.readLine()) != null) {
return s;
}
} catch (IOException e) {
e.printStackTrace();
}
return null;
}

/**
* 写数据
*
* @param data
* @param printWriter
*/
public static void write2Stream(String data, PrintWriter printWriter) {
if (data == null) {
return;
}
if (printWriter != null) {
printWriter.println(data);
}
}

/**
* 关闭输入流
*
* @param socket
*/
public static void inputStreamShutdown(Socket socket) {
try {
if (!socket.isClosed() && !socket.isInputShutdown()) {
socket.shutdownInput();
}
} catch (IOException e) {
e.printStackTrace();
}
}

/**
* 关闭BufferedReader
*
* @param br
*/
public static void closeBufferedReader(BufferedReader br) {
try {
if (br != null) {
br.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}

/**
* 关闭输出流
*
* @param socket
*/
public static void outputStreamShutdown(Socket socket) {
try {
if (!socket.isClosed() && !socket.isOutputShutdown()) {
socket.shutdownOutput();
}
} catch (IOException e) {
e.printStackTrace();
}
}

/**
* 关闭PrintWriter
*
* @param pw
*/
public static void closePrintWriter(PrintWriter pw) {
if (pw != null) {
pw.close();
}
}

/**
* 获取本机IP地址
*/
public static String getIP() {
String hostIP = null;
try {
Enumeration nis = NetworkInterface.getNetworkInterfaces();
InetAddress ia = null;
while (nis.hasMoreElements()) {
NetworkInterface ni = (NetworkInterface) nis.nextElement();
Enumeration<InetAddress> ias = ni.getInetAddresses();
while (ias.hasMoreElements()) {
ia = ias.nextElement();
if (ia instanceof Inet6Address) {
continue;// skip ipv6
}
String ip = ia.getHostAddress();
if (!"127.0.0.1".equals(ip)) {
hostIP = ia.getHostAddress();
break;
}
}
}
} catch (SocketException e) {
e.printStackTrace();
}
return hostIP;

}

}


客户端

状态回调接口

public interface SocketClientResponseInterface<T> {

/**
* 客户端连接回调
*/
void onSocketConnect();

/**
* 客户端收到服务端消息回调
*
* @param socketResult
* @param code
*/
void onSocketReceive(T socketResult, int code);

/**
* 客户端关闭回调
*
* @param msg
* @param code
*/
void onSocketDisable(String msg, int code);
}


客户端Socket实例代码

SocketClientThread连接线程

/**
* 写数据采用死循环,没有数据时wait,有新消息时notify
* Created by gavinandre on 18-1-8.
*/
public class SocketClientThread extends Thread implements SocketCloseInterface {

private static final String TAG = SocketClientThread.class.getSimpleName();

private volatile String name;

private boolean isLongConnection = true;
private boolean isReConnect = true;
private SocketSendThread mSocketSendThread;
private SocketReceiveThread mSocketReceiveThread;
private SocketHeartBeatThread mSocketHeartBeatThread;
private Socket mSocket;

private boolean isSocketAvailable;

private SocketClientResponseInterface socketClientResponseInterface;

public SocketClientThread(String name, SocketClientResponseInterface socketClientResponseInterface) {
this.name = name;
this.socketClientResponseInterface = socketClientResponseInterface;
}

@Override
public void run() {
final Thread currentThread = Thread.currentThread();
final String oldName = currentThread.getName();
currentThread.setName("Processing-" + name);
try {
initSocket();
Log.i(TAG, "run: SocketClientThread end");
} finally {
currentThread.setName(oldName);
}
}

/**
* 初始化socket客户端
*/
private void initSocket() {
try {
mSocket = SocketFactory.getDefault().createSocket();
SocketAddress socketAddress = new InetSocketAddress(SocketUtil.ADDRESS, SocketUtil.PORT);
mSocket.connect(socketAddress, 10000);

isSocketAvailable = true;

//开启接收线程
mSocketReceiveThread = new SocketReceiveThread("SocketReceiveThread",
new BufferedReader(new InputStreamReader(mSocket.getInputStream(), "UTF-8")),
socketClientResponseInterface, this);
mSocketReceiveThread.start();

//开启发送线程
mSocketSendThread = new SocketSendThread("SocketSendThread",
new PrintWriter(mSocket.getOutputStream(), true));
mSocketSendThread.setCloseSendTask(false);
mSocketSendThread.start();

//开启心跳线程
if (isLongConnection) {
mSocketHeartBeatThread = new SocketHeartBeatThread("SocketHeartBeatThread",
new PrintWriter(mSocket.getOutputStream(), true),
mSocket, this);
mSocketHeartBeatThread.start();
}

if (socketClientResponseInterface != null) {
socketClientResponseInterface.onSocketConnect();
}
} catch (ConnectException e) {
failedMessage("服务器连接异常,请检查网络", SocketUtil.FAILED);
e.printStackTrace();
stopThread();
} catch (IOException e) {
failedMessage("网络发生异常,请稍后重试", SocketUtil.FAILED);
e.printStackTrace();
stopThread();
}
}

/**
* 发送消息
*/
public void sendMsg(String data) {
if (mSocketSendThread != null) {
mSocketSendThread.sendMsg(data);
}
}

/**
* 关闭socket客户端
*/
public synchronized void stopThread() {
//关闭接收线程
closeReceiveTask();
//唤醒发送线程并关闭
wakeSendTask();
//关闭心跳线程
closeHeartBeatTask();
//关闭socket
closeSocket();
//清除数据
clearData();
failedMessage("断开连接", SocketUtil.FAILED);
if (isReConnect) {
SocketUtil.toWait(this, 15000);
initSocket();
Log.i(TAG, "stopThread: " + Thread.currentThread().getName());
}
}

/**
* 唤醒后关闭发送线程
*/
private void wakeSendTask() {
if (mSocketSendThread != null) {
mSocketSendThread.wakeSendTask();
}
}

/**
* 关闭接收线程
*/
private void closeReceiveTask() {
if (mSocketReceiveThread != null) {
mSocketReceiveThread.close();
mSocketReceiveThread = null;
}
}

/**
* 关闭心跳线程
*/
private void closeHeartBeatTask() {
if (mSocketHeartBeatThread != null) {
mSocketHeartBeatThread.close();
}
}

/**
* 关闭socket
*/
private void closeSocket() {
if (mSocket != null) {
if (!mSocket.isClosed() && mSocket.isConnected()) {
try {
mSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
isSocketAvailable = false;
mSocket = null;
}
}

/**
* 清除数据
*/
private void clearData() {
if (mSocketSendThread != null) {
mSocketSendThread.clearData();
}
}

/**
* 连接失败回调
*/
private void failedMessage(String msg, int code) {
if (socketClientResponseInterface != null) {
socketClientResponseInterface.onSocketDisable(msg, code);
}
}

/**
* 判断本地socket连接状态
*/
private boolean isConnected() {
if (mSocket.isClosed() || !mSocket.isConnected()) {
stopThread();
return false;
}
return true;
}

@Override
public void onSocketShutdownInput() {
if (isSocketAvailable) {
SocketUtil.inputStreamShutdown(mSocket);
}
}

@Override
public void onSocketDisconnection() {
isSocketAvailable = false;
stopThread();
}

/**
* 设置是否断线重连
*/
public void setReConnect(boolean reConnect) {
isReConnect = reConnect;
}

}


SocketReceiveThread 接收线程

/**
* Created by gavinandre on 18-3-13.
* 数据接收线程
*/
public class SocketReceiveThread extends Thread {

private static final String TAG = SocketReceiveThread.class.getSimpleName();

private volatile String name;

private volatile boolean isCancel = false;

private BufferedReader bufferedReader;

private SocketCloseInterface socketCloseInterface;

private SocketClientResponseInterface socketClientResponseInterface;

public SocketReceiveThread(String name, BufferedReader bufferedReader,
SocketClientResponseInterface socketClientResponseInterface,
SocketCloseInterface socketCloseInterface) {
this.name = name;
this.bufferedReader = bufferedReader;
this.socketClientResponseInterface = socketClientResponseInterface;
this.socketCloseInterface = socketCloseInterface;
}

@Override
public void run() {
final Thread currentThread = Thread.currentThread();
final String oldName = currentThread.getName();
currentThread.setName("Processing-" + name);
try {
while (!isCancel) {
//if (!isConnected()) {
//    break;
//}

if (bufferedReader != null) {
String receiverData = SocketUtil.readFromStream(bufferedReader);
if (receiverData != null) {
successMessage(receiverData);
} else {
Log.i(TAG, "run: receiverData==null");
break;
}
}
}
} finally {
//循环结束则退出输入流
SocketUtil.closeBufferedReader(bufferedReader);
currentThread.setName(oldName);
}
}

/**
* 接收消息回调
*/
private void successMessage(String data) {
if (socketClientResponseInterface != null) {
socketClientResponseInterface.onSocketReceive(data, SocketUtil.SUCCESS);
}
}

public void close() {
isCancel = true;
this.interrupt();
if (bufferedReader != null) {
if (socketCloseInterface != null) {
socketCloseInterface.onSocketShutdownInput();
}
SocketUtil.closeBufferedReader(bufferedReader);
bufferedReader = null;
}
}

}


SocketSendThread 发送线程

/**
* Created by gavinandre on 18-3-13.
* 数据发送线程,当没有发送数据时让线程等待
*/
public class SocketSendThread extends Thread {

private static final String TAG = SocketSendThread.class.getSimpleName();

private volatile String name;

private volatile boolean isCancel = false;
private boolean closeSendTask;
private PrintWriter printWriter;

private CountDownLatch latch = new CountDownLatch(1);

protected volatile ConcurrentLinkedQueue<String> dataQueue = new ConcurrentLinkedQueue<>();

public SocketSendThread(String name, PrintWriter printWriter) {
this.name = name;
this.printWriter = printWriter;
}

@Override
public void run() {
final Thread currentThread = Thread.currentThread();
final String oldName = currentThread.getName();
currentThread.setName("Processing-" + name);
try {
while (!isCancel) {
//if (!isConnected()) {
//    break;
//}

String dataContent = dataQueue.poll();
if (dataContent == null) {
//没有发送数据则等待
SocketUtil.toWait(dataQueue, 0);
if (closeSendTask) {
//notify()调用后,并不是马上就释放对象锁的,所以在此处中断发送线程
close();
latch.countDown();
}
} else if (printWriter != null) {
synchronized (printWriter) {
Log.i(TAG, "before: write2Stream");
SocketUtil.write2Stream(dataContent, printWriter);
Log.i(TAG, "after: write2Stream");
}
}
}
} finally {
//循环结束则退出输出流
SocketUtil.closePrintWriter(printWriter);
currentThread.setName(oldName);
}
}

/**
* 发送消息
*/
public void sendMsg(String data) {
dataQueue.add(data);
//有新增待发送数据,则唤醒发送线程
SocketUtil.toNotifyAll(dataQueue);
}

/**
* 清除数据
*/
public void clearData() {
dataQueue.clear();
}

public void close() {
isCancel = true;
this.interrupt();
if (printWriter != null) {
//防止写数据时停止,写完再停
synchronized (printWriter) {
SocketUtil.closePrintWriter(printWriter);
printWriter = null;
}
}
}

public void wakeSendTask() {
try {
closeSendTask = true;
SocketUtil.toNotifyAll(dataQueue);
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public void setCloseSendTask(boolean closeSendTask) {
this.closeSendTask = closeSendTask;
}
}


SocketHeartBeatThread 心跳线程

/**
* 心跳实现,频率5秒
* Created by gavinandre on 18-3-13.
*/
public class SocketHeartBeatThread extends Thread {

private static final String TAG = SocketHeartBeatThread.class.getSimpleName();

private volatile String name;

private static final int REPEAT_TIME = 5000;
private boolean isCancel = false;
private PrintWriter printWriter;
private Socket mSocket;

private SocketCloseInterface socketCloseInterface;

public SocketHeartBeatThread(String name, PrintWriter printWriter,
Socket mSocket, SocketCloseInterface socketCloseInterface) {
this.name = name;
this.printWriter = printWriter;
this.mSocket = mSocket;
this.socketCloseInterface = socketCloseInterface;
}

@Override
public void run() {
final Thread currentThread = Thread.currentThread();
final String oldName = currentThread.getName();
currentThread.setName("Processing-" + name);
try {
while (!isCancel) {
//if (!isConnected()) {
//    break;
//}

try {
mSocket.sendUrgentData(0xFF);
} catch (IOException e) {
if (socketCloseInterface != null) {
socketCloseInterface.onSocketDisconnection();
}
break;
}

if (printWriter != null) {
SocketUtil.write2Stream("ping", printWriter);
}

try {
Thread.sleep(REPEAT_TIME);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} finally {
//循环结束则退出输入流
SocketUtil.closePrintWriter(printWriter);
currentThread.setName(oldName);
}
}

public void close() {
isCancel = true;
if (printWriter != null) {
SocketUtil.closePrintWriter(printWriter);
printWriter = null;
}
}

}


客户端开启三个线程:接收线程,发送线程和心跳包线程,接收线程无消息时会堵塞在bufferedReader.readLine()处,发送线程无发送消息时阻塞,有发送消息时唤醒,使用ConcurrentLinkedQueue来作为消息队列,心跳包线程每隔五秒发送一次,并且进行sendUrgentData(0xFF)操作,判断连接是否断开。

在initSocket()的catch块内调用initSocket(),这样就能实现第一次连接超过10秒后重连,在stopThread()内调用initSocket()来实现通讯过程中由于网络环境较差导致的连接断开后的重连操作。

public class SocketClient {

private SocketClientThread socketClientThread;

public SocketClient() {
socketClientThread = new SocketClientThread("socketClientThread", this);
new Thread(socketClientThread).start();
}

@Override
public void onSocketConnect() {
Log.i(TAG, "onSocketConnect: 连接成功");
}

@Override
public void onSocketReceive(Object socketResult, int code) {
Log.i(TAG, "onSocketReceive: 收到消息 ,  data: " + socketResult + " , code: " + code);
}

@Override
public void onSocketDisable(String msg, int code) {
Log.i(TAG, "onSocketDisable: 连接断开 , msg: " + msg + " , code: " + code);
}

public <T> void sendData(T data) {
//convert to string or serialize object
String s = (String) data;
if (TextUtils.isEmpty(s)) {
Log.i(TAG, "sendData: 消息不能为空");
return;
}
if (socketClientThread != null) {
socketClientThread.addRequest(s);
}
}

public void stopSocket() {
//一定要在子线程内执行关闭socket等IO操作
new Thread(() -> {
socketClientThread.setReConnect(false);
socketClientThread.stopThread();
}).start();
}
}


服务端

状态回调接口

public interface SocketServerResponseInterface {

/**
* 客户端断线回调
*/
void clientOffline();

/**
* 客户端上线回调
*
* @param clientIp
*/
void clientOnline(String clientIp);
}


服务端代码

public class ServerResponseThread implements Runnable {

private ReceiveThread receiveThread;
private SendThread sendThread;
private SocketStatusThread socketStatusThread;
private Socket socket;
private SocketServerResponseInterface socketServerResponseInterface;

private volatile ConcurrentLinkedQueue<String> dataQueue = new ConcurrentLinkedQueue<>();
private static ConcurrentHashMap<String, Socket> onLineClient = new ConcurrentHashMap<>();

private long lastReceiveTime = System.currentTimeMillis();

private String userIP;

public String getUserIP() {
return userIP;
}

public ServerResponseThread(Socket socket, SocketServerResponseInterface socketServerResponseInterface) {
this.socket = socket;
this.socketServerResponseInterface = socketServerResponseInterface;
this.userIP = socket.getInetAddress().getHostAddress();
onLineClient.put(userIP, socket);
System.out.println("用户:" + userIP
+ " 加入了聊天室,当前在线人数:" + onLineClient.size());
}

@Override
public void run() {
try {
//开启接收线程
receiveThread = new ReceiveThread();
receiveThread.bufferedReader = new BufferedReader(
new InputStreamReader(socket.getInputStream(), "UTF-8"));
receiveThread.start();

//开启发送线程
sendThread = new SendThread();
sendThread.printWriter = new PrintWriter(socket.getOutputStream(), true);
sendThread.start();

//开启判断心跳包超时线程
socketStatusThread = new SocketStatusThread();
socketStatusThread.start();
} catch (Exception e) {
e.printStackTrace();
}
}

/**
* 断开socket连接
*/
public void stop() {
try {
System.out.println("stop");
if (receiveThread != null) {
receiveThread.isCancel = true;
receiveThread.interrupt();
if (receiveThread.bufferedReader != null) {
SocketUtil.inputStreamShutdown(socket);
System.out.println("before closeBufferedReader");
SocketUtil.closeBufferedReader(receiveThread.bufferedReader);
System.out.println("after closeBufferedReader");
receiveThread.bufferedReader = null;
}
receiveThread = null;
System.out.println("stop receiveThread");
}

if (sendThread != null) {
sendThread.isCancel = true;
toNotifyAll(sendThread);
sendThread.interrupt();
if (sendThread.printWriter != null) {
//防止写数据时停止,写完再停
synchronized (sendThread.printWriter) {
sendThread.printWriter = null;
}
}
sendThread = null;
System.out.println("stop sendThread");
}
if (socketStatusThread != null) {
socketStatusThread.isCancel = true;
toNotifyAll(socketStatusThread);
socketStatusThread.interrupt();
System.out.println("stop socketStatusThread");
}
onLineClient.remove(socket);
System.out.println("用户:" + userIP
+ " 退出,当前在线人数:" + onLineClient.size());
} catch (Exception e) {
e.printStackTrace();
}
}

/**
* 发送消息
*/
public void addMessage(String data) {
if (!isConnected()) {
return;
}

dataQueue.offer(data);
//有新增待发送数据,则唤醒发送线程
toNotifyAll(dataQueue);
}

/**
* 获取已接连的客户端
*/
public Socket getConnectdClient(String clientID) {
return onLineClient.get(clientID);
}

/**
* 打印已经连接的客户端
*/
public static void printAllClient() {
if (onLineClient == null) {
return;
}
Iterator<String> inter = onLineClient.keySet().iterator();
while (inter.hasNext()) {
System.out.println("client:" + inter.next());
}
}

/**
* 阻塞线程,millis为0则永久阻塞,知道调用notify()
*/
public void toWaitAll(Object o) {
synchronized (o) {
try {
o.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

/**
* notify()调用后,并不是马上就释放对象锁的,而是在相应的synchronized(){}语句块执行结束,自动释放锁后
*/
public void toNotifyAll(Object obj) {
synchronized (obj) {
obj.notifyAll();
}
}

/**
* 判断本地socket连接状态
*/
private boolean isConnected() {
if (socket.isClosed() || !socket.isConnected()) {
onLineClient.remove(userIP);
ServerResponseThread.this.stop();
System.out.println("socket closed...");
return false;
}
return true;
}

/**
* 数据接收线程
*/
public class ReceiveThread extends Thread {

private BufferedReader bufferedReader;
private boolean isCancel;

@Override
public void run() {
try {
while (!isCancel) {
if (!isConnected()) {
isCancel = true;
break;
}

String msg = SocketUtil.readFromStream(bufferedReader);
if (msg != null) {
if ("bye".equals(msg)) {
ServerResponseThread.this.stop();
System.out.println("用户" + userIP + " : bye");
socketServerResponseInterface.clientOffline();
break;
} else if ("ping".equals(msg)) {
System.out.println("收到心跳包");
lastReceiveTime = System.currentTimeMillis();
socketServerResponseInterface.clientOnline(userIP);
} else {
msg = "用户" + userIP + " : " + msg;
System.out.println(msg);
addMessage(msg);
socketServerResponseInterface.clientOnline(userIP);
}
} else {
System.out.println("client is offline...");
ServerResponseThread.this.stop();
socketServerResponseInterface.clientOffline();
break;
}
System.out.println("ReceiveThread");
}

SocketUtil.inputStreamShutdown(socket);
SocketUtil.closeBufferedReader(bufferedReader);
System.out.println("ReceiveThread is finish");
} catch (Exception e) {
e.printStackTrace();
}
}
}

/**
* 数据发送线程,当没有发送数据时让线程等待
*/
public class SendThread extends Thread {

private PrintWriter printWriter;
private boolean isCancel;

@Override
public void run() {
try {
while (!isCancel) {
if (!isConnected()) {
isCancel = true;
break;
}

String msg = dataQueue.poll();
if (msg == null) {
toWaitAll(dataQueue);
} else if (printWriter != null) {
synchronized (printWriter) {
SocketUtil.write2Stream(msg, printWriter);
}
}
System.out.println("SendThread");
}

SocketUtil.outputStreamShutdown(socket);
SocketUtil.closePrintWriter(printWriter);
System.out.println("SendThread is finish");
} catch (Exception e) {
e.printStackTrace();
}
}
}

/**
* 客户端状态监控线程,判断客户端心跳包是否发送
*/
class SocketStatusThread extends Thread {

private boolean isCancel;

@Override
public void run() {
while (!isCancel) {
if (!isConnected()) {
isCancel = true;
break;
}
System.out.println("SocketStatusThread");
if (!socket.isClosed()) {
if (System.currentTimeMillis() - lastReceiveTime > 10000) {
System.out.println("timeout");
//关闭输入流后bufferedReader.readLine()会返回null
SocketUtil.inputStreamShutdown(socket);
break;
}
}
try {
synchronized (this) {
this.wait(3000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("SocketStatusThread is finish");
}
}

}


服务端同样开启三个线程:接收线程,发送线程和状态监控线程,接收和发送线程和客户端同理,这里说下状态监控线程,客户端如果在连上服务端的情况下突然断网服务端是不会有任何反应的,因此就要通过客户端的心跳包机制来做超时处理,客户端超过一定的时间后不发心跳包的话就关闭输入流,这样接收线程的bufferedReader.readLine()就会返回null,就会进行断开socket连接操作了。

public class Main {

private static boolean isStart = true;
private static ServerResponseThread serverResponseThread;

public static void main(String[] args) {
ServerSocket serverSocket = null;
ExecutorService executorService = Executors.newCachedThreadPool();
System.out.println("服务端 " + SocketUtil.getIP() + " 运行中...\n");
try {
serverSocket = new ServerSocket(SocketUtil.PORT);
while (isStart) {
Socket socket = serverSocket.accept();
serverResponseThread = new ServerResponseThread(socket,
new SocketServerResponseInterface() {

@Override
public void clientOffline() {// 对方不在线
System.out.println("offline");
}

@Override
public void clientOnline(String clientIp) {
System.out.println(clientIp + " is online");
System.out.println("-----------------------------------------");
}
});

if (socket.isConnected()) {
executorService.execute(serverResponseThread);
}
}

serverSocket.close();

} catch (IOException e) {
e.printStackTrace();
} finally {
if (serverSocket != null) {
try {
isStart = false;
serverSocket.close();
if (serverSocket != null)
serverResponseThread.stop();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}


完整demo:https://github.com/GavinAndre/MVPSocket

参考:

http://blog.csdn.net/u010818425/article/details/53448817
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  socket 通信