您的位置:首页 > 编程语言 > Java开发

ddpush 学习之路 7 UDPClientBase.java

2015-03-22 13:54 351 查看
上一篇,我们学习了TCPClientBase这个类。这是个以TCP通信方式的一个客户端类。我们下面来学习另一种方式通信的UDP客户端类。

这个UDP客户端类。和TCP客户端类基本上是相同的。在上一篇TCP里的大部分内容。UDP里也都有。

下面我们就来看看这个 我理解的添加注释的UDPClientBase.java这个类代码。

//UDP客户端基类
public abstract class UDPClientBase implements Runnable {

//UDP套接字
protected DatagramSocket ds;
//最后发包时间
protected long lastSent = 0;
//最后收包时间
protected long lastReceived = 0;
//Socket端口
protected int remotePort = 9966;
//应用ID
protected int appid = 1;
//uuid
protected byte[] uuid;
//服务器地址
protected String remoteAddress = null;
//并发的 消息队列
protected ConcurrentLinkedQueue<Message> mq = new ConcurrentLinkedQueue<Message>();
//向消息队列里添加消息的计数器
protected AtomicLong queueIn = new AtomicLong(0);
//从消息队列里取出消息的计数器
protected AtomicLong queueOut = new AtomicLong(0);
//消息缓冲区长度
protected int bufferSize = 1024;
//心跳时间间隔
protected int heartbeatInterval = 50;
//存放消息数据的byte[]
protected byte[] bufferArray;
//消息数据包缓冲区
protected ByteBuffer buffer;
//是否需要充值客户端和服务器端连接标志位
protected boolean needReset = true;
//客户端 运行状态标识
protected boolean started = false;
//客户端 停止状态标识
protected boolean stoped = false;
//当前UDPClient线程
protected Thread receiverT;
//消息处理类
protected Worker worker;
//消息处理线程
protected Thread workerT;
//发送数据包的个数
private long sentPackets;
//接收数据包的个数
private long receivedPackets;

//UDP客户端构造
public UDPClientBase(byte[] uuid, int appid, String serverAddr, int serverPort) throws Exception {
if (uuid == null || uuid.length != 16) {
throw new java.lang.IllegalArgumentException("uuid byte array must be not null and length of 16 bytes");
}
if (appid < 1 || appid > 255) {
throw new java.lang.IllegalArgumentException("appid must be from 1 to 255");
}
if (serverAddr == null || serverAddr.trim().length() == 0) {
throw new java.lang.IllegalArgumentException("server address illegal: " + serverAddr);
}

this.uuid = uuid;
this.appid = appid;
this.remoteAddress = serverAddr;
this.remotePort = serverPort;
}

//入队 将消息包添加到消息队列
protected boolean enqueue(Message message) {
boolean result = mq.add(message);
if (result == true) {
queueIn.addAndGet(1);
}
return result;
}

//出队 从消息队列取出消息包
protected Message dequeue() {
Message m = mq.poll();
if (m != null) {
queueOut.addAndGet(1);
}
return m;
}

//初始化消息数据存放数组和缓冲区
private synchronized void init() {
bufferArray = new byte[bufferSize];
buffer = ByteBuffer.wrap(bufferArray);
}

//客户端重置 如果连接被关闭,就重新连接
protected synchronized void reset() throws Exception {
//如果连接是正常的。就不重新连接
if (needReset == false) {
return;
}

//销毁当前UDP连接
if (ds != null) {
try {
ds.close();
} catch (Exception e) {
}
}
//如果当前网络状态正常 就从新创建UDP连接
if (hasNetworkConnection() == true) {
ds = new DatagramSocket();
ds.connect(new InetSocketAddress(remoteAddress, remotePort));
needReset = false;
} else {
try {
Thread.sleep(1000);
} catch (Exception e) {
}
}
}

//启动UDP客户端
public synchronized void start() throws Exception {
//如果UDP客户端启动标识为true 就说明UDP客户端已经被启动了,就不需要再次启动
if (this.started == true) {
return;
}
//初始化缓冲区以及存放数据包数据的数组
this.init();

//创建当前UDPClient线程 用来发送心跳包 以及接收服务器发送的消息包
receiverT = new Thread(this, "udp-client-receiver");
//守护线程
receiverT.setDaemon(true);
synchronized (receiverT) {
receiverT.start();
receiverT.wait();
}

//创建工作线程,用来处理接收到的消息包
worker = new Worker();
workerT = new Thread(worker, "udp-client-worker");
workerT.setDaemon(true);
synchronized (workerT) {
workerT.start();
workerT.wait();
}

this.started = true;
}

//停止UDP客户端
public void stop() throws Exception {
//修改UDP客户端 停止状态标志
stoped = true;
//销毁UDP连接
if (ds != null) {
try {
ds.close();
} catch (Exception e) {
}
ds = null;
}
//关闭 中断 当前UDP客户端线程
if (receiverT != null) {
try {
receiverT.interrupt();
} catch (Exception e) {
}
}
//关闭 中断 消息队列处理线程
if (workerT != null) {
try {
workerT.interrupt();
} catch (Exception e) {
}
}
}

//UDP客户端线程体
public void run() {

synchronized (receiverT) {
receiverT.notifyAll();
}

//如果UDP客户端没有被stop就一直运行
while (stoped == false) {
try {
//跟TCPClient一样。检测网络连接直到网络连接正常
if (hasNetworkConnection() == false) {
try {
trySystemSleep();
Thread.sleep(1000);
} catch (Exception e) {
}
continue;
}
//检测是否需要重新连接服务器
reset();
//发送心跳包
heartbeat();
//接收服务器发送过来的数据
receiveData();
} catch (java.net.SocketTimeoutException e) {

} catch (Exception e) {
e.printStackTrace();
//从服务器接收数据出错。连接被关闭。就需要重新连接
this.needReset = true;
} catch (Throwable t) {
t.printStackTrace();
this.needReset = true;
} finally {
//如果连接出问题了。就等一会儿。
if (needReset == true) {
try {
trySystemSleep();
Thread.sleep(1000);
} catch (Exception e) {
}
}
if (mq.isEmpty() == true || hasNetworkConnection() == false) {
try {
trySystemSleep();
Thread.sleep(1000);
} catch (Exception e) {
}
}
}
}
//释放UDP连接
if (ds != null) {
try {
ds.close();
} catch (Exception e) {
}
ds = null;
}
}

//向服务器发送心跳包
private void heartbeat() throws Exception {
//心跳时间控制
if (System.currentTimeMillis() - lastSent < heartbeatInterval * 1000) {
return;
}
//构造心跳包。并发送
byte[] buffer = new byte[Message.CLIENT_MESSAGE_MIN_LENGTH];
ByteBuffer.wrap(buffer).put((byte) Message.version).put((byte) appid).put((byte) Message.CMD_0x00).put(uuid).putChar((char) 0);
send(buffer);
}

//接收服务器发送的消息
private void receiveData() throws Exception {
//标准的UDP收包过程
DatagramPacket dp = new DatagramPacket(bufferArray, bufferArray.length);
ds.setSoTimeout(5 * 1000);
ds.receive(dp);
//如果收到的数据包内容出错就丢西这个UDP数据包
if (dp.getLength() <= 0 || dp.getData() == null || dp.getData().length == 0) {
return;
}
//根据收到的数据长度创建一个临时的数据存储数组
byte[] data = new byte[dp.getLength()];
//将数据拷贝到临时数组 用来创建Message消息包
System.arraycopy(dp.getData(), 0, data, 0, dp.getLength());
//用收到的数据创建Message消息包
Message m = new Message(dp.getSocketAddress(), data);
//检测这个消息格式是否正确,格式不正确就丢弃
if (m.checkFormat() == false) {
return;
}
//记录收到数据包个数以及最后一次收到数据包的时间
this.receivedPackets++;
this.lastReceived = System.currentTimeMillis();
//告诉服务器 我收到了一个 xxx样子的数据包
this.ackServer(m);
//如果是心跳包,直接丢弃,不用处理
if (m.getCmd() == Message.CMD_0x00) {
return;
}
//将收到的消息包添加到消息队列中
this.enqueue(m);
//唤醒消息队列处理线程,开始处理消息
worker.wakeup();
}

//告诉服务器,我收到了一个什么样子的数据包
private void ackServer(Message m) throws Exception {
if (m.getCmd() == Message.CMD_0x10) {
byte[] buffer = new byte[Message.CLIENT_MESSAGE_MIN_LENGTH];
ByteBuffer.wrap(buffer).put((byte) Message.version).put((byte) appid).put((byte) Message.CMD_0x10).put(uuid).putChar((char) 0);
send(buffer);
}
if (m.getCmd() == Message.CMD_0x11) {
byte[] buffer = new byte[Message.CLIENT_MESSAGE_MIN_LENGTH + 8];
byte[] data = m.getData();
ByteBuffer.wrap(buffer).put((byte) Message.version).put((byte) appid).put((byte) Message.CMD_0x11).put(uuid).putChar((char) 8)
.put(data, Message.SERVER_MESSAGE_MIN_LENGTH, 8);
send(buffer);
}
if (m.getCmd() == Message.CMD_0x20) {
byte[] buffer = new byte[Message.CLIENT_MESSAGE_MIN_LENGTH];
ByteBuffer.wrap(buffer).put((byte) Message.version).put((byte) appid).put((byte) Message.CMD_0x20).put(uuid).putChar((char) 0);
send(buffer);
}
}

//向服务器发送数据
private void send(byte[] data) throws Exception {
//数据 以及 UDP连接常规验证
if (data == null) {
return;
}
if (ds == null) {
return;
}
//创建UDP数据包 并发送
DatagramPacket dp = new DatagramPacket(data, data.length);
dp.setSocketAddress(ds.getRemoteSocketAddress());
ds.send(dp);
//记录本次发送数据包的时间
lastSent = System.currentTimeMillis();
//增加发送数据包数量
this.sentPackets++;
}

//获取发送数据包个数
public long getSentPackets() {
return this.sentPackets;
}

//获取接收到数据包的个数
public long getReceivedPackets() {
return this.receivedPackets;
}

//设置要连接的服务器的端口号
public void setServerPort(int port) {
this.remotePort = port;
}

//获取服务器端口号
public int getServerPort() {
return this.remotePort;
}

//设置服务器地址
public void setServerAddress(String addr) {
this.remoteAddress = addr;
}

//取得服务器地址
public String getServerAddress() {
return this.remoteAddress;
}

//设置存放消息数据的数组大小
public void setBufferSize(int bytes) {
this.bufferSize = bytes;
}

//获取存放消息数据的数组大小
public int getBufferSize() {
return this.bufferSize;
}

//获取最后心跳时间
public long getLastHeartbeatTime() {
return lastSent;
}

//获取最后一次收到数据包得时间
public long getLastReceivedTime() {
return lastReceived;
}

//设置心跳包发送的时间间隔
public void setHeartbeatInterval(int second) {
if (second <= 0) {
return;
}
this.heartbeatInterval = second;
}

//获取心跳包的发送时间间隔
public int getHeartbeatInterval() {
return this.heartbeatInterval;
}

//验证网络连接状态
public abstract boolean hasNetworkConnection();

//休眠
public abstract void trySystemSleep();

//收到消息数据包的具体处理回调
public abstract void onPushMessage(Message message);

//消息队列处理线程类
class Worker implements Runnable {
public void run() {
synchronized (workerT) {
workerT.notifyAll();
}
//如果UDP客户端没有被stop 就一直处理消息
while (stoped == false) {
try {
handleEvent();
} catch (Exception e) {
e.printStackTrace();
} finally {
waitMsg();
}
}
}

private void waitMsg() {
synchronized (this) {
try {
this.wait(1000);
} catch (java.lang.InterruptedException e) {

} catch (Exception e) {
e.printStackTrace();
}
}
}

private void wakeup() {
synchronized (this) {
this.notifyAll();
}
}
//消息队列处理
private void handleEvent() throws Exception {
Message m = null;
//处理所有的消息队列中得消息包
while (true) {
//取出一个消息包,如果取出的消息包为空就返回(消息队列没消息了)
m = dequeue();
if (m == null) {
return;
}
//如果取出的消息格式不对。就丢弃,继续处理下一个数据包
if (m.checkFormat() == false) {
continue;
}

// 消息包处理回调 real work here
onPushMessage(m);
}
// finish work here, such as release wake lock
}

}
}


以上,就是UDPClientBase.java被我添加了注释之后的代码。

在上一篇。我们学习了TCPClientBase没看过的。先看看我上一篇讲得把。这一篇我不准备讲太多了。因为上一篇这个客户端的整体处理流程都说了一遍。这里不再做详细的讲解。

我这里要出得就是UDPClientBase 和 TCPClientBase之间的区别。

相同:

整体流程、工作内容都一样的。

不同:

这里重点说一下 UDPClientBase 和 TCPClientBase之间的不同之处。

第一个不同之处:

TCP和服务器通信使用的时 SocketCannel 来和服务器通信。UDP使用的是DatagramSocket 当然这是两种通信方式,这个肯定是不一样的。这也没啥好说的。我们只是说明不同之处。

第二个不同之处:

TCP是在Worker线程中发送心跳包。UDP是在receiver也就是UDPClietBase这个线程类的线程体内调用心跳包的发送。这个区别。我暂时没有看出来有什么不同的作用。大家知道的可以回复告诉我。谢谢。

第三个不同之处:

这个不同之处是接收数据的时候。

TCP是先接收数据包头,然后从数据包头里找这个消息数据的消息内容数据长度。如果有消息数据内容,再去读取消息数据内容。

UDP是直接一次性接受完数据包的全部内容。

当然这区别也是和TCP UDP在消息的接收和发送的不同有关。

最后其他的一些不同之处就是:

UDP多了一些获取和设置属性的一些函数。这里 就不多说了。

好了。这就是UDPClientBase的学习。当然最好是先看 ddpush 学习之路 6 TCPClientBase.java 这个篇文章。也就是上一篇文章。看完之后。再来学习本篇文章。

by brok1n 20150322
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  ddpush udp