ddpush 学习之路 7 UDPClientBase.java
2015-03-22 13:54
351 查看
上一篇,我们学习了TCPClientBase这个类。这是个以TCP通信方式的一个客户端类。我们下面来学习另一种方式通信的UDP客户端类。
这个UDP客户端类。和TCP客户端类基本上是相同的。在上一篇TCP里的大部分内容。UDP里也都有。
下面我们就来看看这个 我理解的添加注释的UDPClientBase.java这个类代码。
以上,就是UDPClientBase.java被我添加了注释之后的代码。
在上一篇。我们学习了TCPClientBase没看过的。先看看我上一篇讲得把。这一篇我不准备讲太多了。因为上一篇这个客户端的整体处理流程都说了一遍。这里不再做详细的讲解。
我这里要出得就是UDPClientBase 和 TCPClientBase之间的区别。
相同:
整体流程、工作内容都一样的。
不同:
这里重点说一下 UDPClientBase 和 TCPClientBase之间的不同之处。
第一个不同之处:
TCP和服务器通信使用的时 SocketCannel 来和服务器通信。UDP使用的是DatagramSocket 当然这是两种通信方式,这个肯定是不一样的。这也没啥好说的。我们只是说明不同之处。
第二个不同之处:
TCP是在Worker线程中发送心跳包。UDP是在receiver也就是UDPClietBase这个线程类的线程体内调用心跳包的发送。这个区别。我暂时没有看出来有什么不同的作用。大家知道的可以回复告诉我。谢谢。
第三个不同之处:
这个不同之处是接收数据的时候。
TCP是先接收数据包头,然后从数据包头里找这个消息数据的消息内容数据长度。如果有消息数据内容,再去读取消息数据内容。
UDP是直接一次性接受完数据包的全部内容。
当然这区别也是和TCP UDP在消息的接收和发送的不同有关。
最后其他的一些不同之处就是:
UDP多了一些获取和设置属性的一些函数。这里 就不多说了。
好了。这就是UDPClientBase的学习。当然最好是先看 ddpush 学习之路 6 TCPClientBase.java 这个篇文章。也就是上一篇文章。看完之后。再来学习本篇文章。
by brok1n 20150322
这个UDP客户端类。和TCP客户端类基本上是相同的。在上一篇TCP里的大部分内容。UDP里也都有。
下面我们就来看看这个 我理解的添加注释的UDPClientBase.java这个类代码。
//UDP客户端基类 public abstract class UDPClientBase implements Runnable { //UDP套接字 protected DatagramSocket ds; //最后发包时间 protected long lastSent = 0; //最后收包时间 protected long lastReceived = 0; //Socket端口 protected int remotePort = 9966; //应用ID protected int appid = 1; //uuid protected byte[] uuid; //服务器地址 protected String remoteAddress = null; //并发的 消息队列 protected ConcurrentLinkedQueue<Message> mq = new ConcurrentLinkedQueue<Message>(); //向消息队列里添加消息的计数器 protected AtomicLong queueIn = new AtomicLong(0); //从消息队列里取出消息的计数器 protected AtomicLong queueOut = new AtomicLong(0); //消息缓冲区长度 protected int bufferSize = 1024; //心跳时间间隔 protected int heartbeatInterval = 50; //存放消息数据的byte[] protected byte[] bufferArray; //消息数据包缓冲区 protected ByteBuffer buffer; //是否需要充值客户端和服务器端连接标志位 protected boolean needReset = true; //客户端 运行状态标识 protected boolean started = false; //客户端 停止状态标识 protected boolean stoped = false; //当前UDPClient线程 protected Thread receiverT; //消息处理类 protected Worker worker; //消息处理线程 protected Thread workerT; //发送数据包的个数 private long sentPackets; //接收数据包的个数 private long receivedPackets; //UDP客户端构造 public UDPClientBase(byte[] uuid, int appid, String serverAddr, int serverPort) throws Exception { if (uuid == null || uuid.length != 16) { throw new java.lang.IllegalArgumentException("uuid byte array must be not null and length of 16 bytes"); } if (appid < 1 || appid > 255) { throw new java.lang.IllegalArgumentException("appid must be from 1 to 255"); } if (serverAddr == null || serverAddr.trim().length() == 0) { throw new java.lang.IllegalArgumentException("server address illegal: " + serverAddr); } this.uuid = uuid; this.appid = appid; this.remoteAddress = serverAddr; this.remotePort = serverPort; } //入队 将消息包添加到消息队列 protected boolean enqueue(Message message) { boolean result = mq.add(message); if (result == true) { queueIn.addAndGet(1); } return result; } //出队 从消息队列取出消息包 protected Message dequeue() { Message m = mq.poll(); if (m != null) { queueOut.addAndGet(1); } return m; } //初始化消息数据存放数组和缓冲区 private synchronized void init() { bufferArray = new byte[bufferSize]; buffer = ByteBuffer.wrap(bufferArray); } //客户端重置 如果连接被关闭,就重新连接 protected synchronized void reset() throws Exception { //如果连接是正常的。就不重新连接 if (needReset == false) { return; } //销毁当前UDP连接 if (ds != null) { try { ds.close(); } catch (Exception e) { } } //如果当前网络状态正常 就从新创建UDP连接 if (hasNetworkConnection() == true) { ds = new DatagramSocket(); ds.connect(new InetSocketAddress(remoteAddress, remotePort)); needReset = false; } else { try { Thread.sleep(1000); } catch (Exception e) { } } } //启动UDP客户端 public synchronized void start() throws Exception { //如果UDP客户端启动标识为true 就说明UDP客户端已经被启动了,就不需要再次启动 if (this.started == true) { return; } //初始化缓冲区以及存放数据包数据的数组 this.init(); //创建当前UDPClient线程 用来发送心跳包 以及接收服务器发送的消息包 receiverT = new Thread(this, "udp-client-receiver"); //守护线程 receiverT.setDaemon(true); synchronized (receiverT) { receiverT.start(); receiverT.wait(); } //创建工作线程,用来处理接收到的消息包 worker = new Worker(); workerT = new Thread(worker, "udp-client-worker"); workerT.setDaemon(true); synchronized (workerT) { workerT.start(); workerT.wait(); } this.started = true; } //停止UDP客户端 public void stop() throws Exception { //修改UDP客户端 停止状态标志 stoped = true; //销毁UDP连接 if (ds != null) { try { ds.close(); } catch (Exception e) { } ds = null; } //关闭 中断 当前UDP客户端线程 if (receiverT != null) { try { receiverT.interrupt(); } catch (Exception e) { } } //关闭 中断 消息队列处理线程 if (workerT != null) { try { workerT.interrupt(); } catch (Exception e) { } } } //UDP客户端线程体 public void run() { synchronized (receiverT) { receiverT.notifyAll(); } //如果UDP客户端没有被stop就一直运行 while (stoped == false) { try { //跟TCPClient一样。检测网络连接直到网络连接正常 if (hasNetworkConnection() == false) { try { trySystemSleep(); Thread.sleep(1000); } catch (Exception e) { } continue; } //检测是否需要重新连接服务器 reset(); //发送心跳包 heartbeat(); //接收服务器发送过来的数据 receiveData(); } catch (java.net.SocketTimeoutException e) { } catch (Exception e) { e.printStackTrace(); //从服务器接收数据出错。连接被关闭。就需要重新连接 this.needReset = true; } catch (Throwable t) { t.printStackTrace(); this.needReset = true; } finally { //如果连接出问题了。就等一会儿。 if (needReset == true) { try { trySystemSleep(); Thread.sleep(1000); } catch (Exception e) { } } if (mq.isEmpty() == true || hasNetworkConnection() == false) { try { trySystemSleep(); Thread.sleep(1000); } catch (Exception e) { } } } } //释放UDP连接 if (ds != null) { try { ds.close(); } catch (Exception e) { } ds = null; } } //向服务器发送心跳包 private void heartbeat() throws Exception { //心跳时间控制 if (System.currentTimeMillis() - lastSent < heartbeatInterval * 1000) { return; } //构造心跳包。并发送 byte[] buffer = new byte[Message.CLIENT_MESSAGE_MIN_LENGTH]; ByteBuffer.wrap(buffer).put((byte) Message.version).put((byte) appid).put((byte) Message.CMD_0x00).put(uuid).putChar((char) 0); send(buffer); } //接收服务器发送的消息 private void receiveData() throws Exception { //标准的UDP收包过程 DatagramPacket dp = new DatagramPacket(bufferArray, bufferArray.length); ds.setSoTimeout(5 * 1000); ds.receive(dp); //如果收到的数据包内容出错就丢西这个UDP数据包 if (dp.getLength() <= 0 || dp.getData() == null || dp.getData().length == 0) { return; } //根据收到的数据长度创建一个临时的数据存储数组 byte[] data = new byte[dp.getLength()]; //将数据拷贝到临时数组 用来创建Message消息包 System.arraycopy(dp.getData(), 0, data, 0, dp.getLength()); //用收到的数据创建Message消息包 Message m = new Message(dp.getSocketAddress(), data); //检测这个消息格式是否正确,格式不正确就丢弃 if (m.checkFormat() == false) { return; } //记录收到数据包个数以及最后一次收到数据包的时间 this.receivedPackets++; this.lastReceived = System.currentTimeMillis(); //告诉服务器 我收到了一个 xxx样子的数据包 this.ackServer(m); //如果是心跳包,直接丢弃,不用处理 if (m.getCmd() == Message.CMD_0x00) { return; } //将收到的消息包添加到消息队列中 this.enqueue(m); //唤醒消息队列处理线程,开始处理消息 worker.wakeup(); } //告诉服务器,我收到了一个什么样子的数据包 private void ackServer(Message m) throws Exception { if (m.getCmd() == Message.CMD_0x10) { byte[] buffer = new byte[Message.CLIENT_MESSAGE_MIN_LENGTH]; ByteBuffer.wrap(buffer).put((byte) Message.version).put((byte) appid).put((byte) Message.CMD_0x10).put(uuid).putChar((char) 0); send(buffer); } if (m.getCmd() == Message.CMD_0x11) { byte[] buffer = new byte[Message.CLIENT_MESSAGE_MIN_LENGTH + 8]; byte[] data = m.getData(); ByteBuffer.wrap(buffer).put((byte) Message.version).put((byte) appid).put((byte) Message.CMD_0x11).put(uuid).putChar((char) 8) .put(data, Message.SERVER_MESSAGE_MIN_LENGTH, 8); send(buffer); } if (m.getCmd() == Message.CMD_0x20) { byte[] buffer = new byte[Message.CLIENT_MESSAGE_MIN_LENGTH]; ByteBuffer.wrap(buffer).put((byte) Message.version).put((byte) appid).put((byte) Message.CMD_0x20).put(uuid).putChar((char) 0); send(buffer); } } //向服务器发送数据 private void send(byte[] data) throws Exception { //数据 以及 UDP连接常规验证 if (data == null) { return; } if (ds == null) { return; } //创建UDP数据包 并发送 DatagramPacket dp = new DatagramPacket(data, data.length); dp.setSocketAddress(ds.getRemoteSocketAddress()); ds.send(dp); //记录本次发送数据包的时间 lastSent = System.currentTimeMillis(); //增加发送数据包数量 this.sentPackets++; } //获取发送数据包个数 public long getSentPackets() { return this.sentPackets; } //获取接收到数据包的个数 public long getReceivedPackets() { return this.receivedPackets; } //设置要连接的服务器的端口号 public void setServerPort(int port) { this.remotePort = port; } //获取服务器端口号 public int getServerPort() { return this.remotePort; } //设置服务器地址 public void setServerAddress(String addr) { this.remoteAddress = addr; } //取得服务器地址 public String getServerAddress() { return this.remoteAddress; } //设置存放消息数据的数组大小 public void setBufferSize(int bytes) { this.bufferSize = bytes; } //获取存放消息数据的数组大小 public int getBufferSize() { return this.bufferSize; } //获取最后心跳时间 public long getLastHeartbeatTime() { return lastSent; } //获取最后一次收到数据包得时间 public long getLastReceivedTime() { return lastReceived; } //设置心跳包发送的时间间隔 public void setHeartbeatInterval(int second) { if (second <= 0) { return; } this.heartbeatInterval = second; } //获取心跳包的发送时间间隔 public int getHeartbeatInterval() { return this.heartbeatInterval; } //验证网络连接状态 public abstract boolean hasNetworkConnection(); //休眠 public abstract void trySystemSleep(); //收到消息数据包的具体处理回调 public abstract void onPushMessage(Message message); //消息队列处理线程类 class Worker implements Runnable { public void run() { synchronized (workerT) { workerT.notifyAll(); } //如果UDP客户端没有被stop 就一直处理消息 while (stoped == false) { try { handleEvent(); } catch (Exception e) { e.printStackTrace(); } finally { waitMsg(); } } } private void waitMsg() { synchronized (this) { try { this.wait(1000); } catch (java.lang.InterruptedException e) { } catch (Exception e) { e.printStackTrace(); } } } private void wakeup() { synchronized (this) { this.notifyAll(); } } //消息队列处理 private void handleEvent() throws Exception { Message m = null; //处理所有的消息队列中得消息包 while (true) { //取出一个消息包,如果取出的消息包为空就返回(消息队列没消息了) m = dequeue(); if (m == null) { return; } //如果取出的消息格式不对。就丢弃,继续处理下一个数据包 if (m.checkFormat() == false) { continue; } // 消息包处理回调 real work here onPushMessage(m); } // finish work here, such as release wake lock } } }
以上,就是UDPClientBase.java被我添加了注释之后的代码。
在上一篇。我们学习了TCPClientBase没看过的。先看看我上一篇讲得把。这一篇我不准备讲太多了。因为上一篇这个客户端的整体处理流程都说了一遍。这里不再做详细的讲解。
我这里要出得就是UDPClientBase 和 TCPClientBase之间的区别。
相同:
整体流程、工作内容都一样的。
不同:
这里重点说一下 UDPClientBase 和 TCPClientBase之间的不同之处。
第一个不同之处:
TCP和服务器通信使用的时 SocketCannel 来和服务器通信。UDP使用的是DatagramSocket 当然这是两种通信方式,这个肯定是不一样的。这也没啥好说的。我们只是说明不同之处。
第二个不同之处:
TCP是在Worker线程中发送心跳包。UDP是在receiver也就是UDPClietBase这个线程类的线程体内调用心跳包的发送。这个区别。我暂时没有看出来有什么不同的作用。大家知道的可以回复告诉我。谢谢。
第三个不同之处:
这个不同之处是接收数据的时候。
TCP是先接收数据包头,然后从数据包头里找这个消息数据的消息内容数据长度。如果有消息数据内容,再去读取消息数据内容。
UDP是直接一次性接受完数据包的全部内容。
当然这区别也是和TCP UDP在消息的接收和发送的不同有关。
最后其他的一些不同之处就是:
UDP多了一些获取和设置属性的一些函数。这里 就不多说了。
好了。这就是UDPClientBase的学习。当然最好是先看 ddpush 学习之路 6 TCPClientBase.java 这个篇文章。也就是上一篇文章。看完之后。再来学习本篇文章。
by brok1n 20150322
相关文章推荐
- ddpush 学习之路 6 TCPClientBase.java
- ddpush 学习之路 11 MyUdpClient.java
- ddpush 学习之路 14 UdpConnector.java
- ddpush 学习之路 9 ClientMessage.java
- ddpush 学习之路 10 MyTcpClient.java
- 黑马程序员 【】java学习之路——网络编程 UDP 键盘录入传输
- ddpush 学习之路 5 Message.java
- 黑马程序员 【】java学习之路——UDP之模拟聊天
- ddpush 学习之路 3 StringUtil.java
- ddpush 学习之路 12 Sender.java
- ddpush 学习之路 8 Constant.java
- Java学习之路7——UDP通信
- ddpush 学习之路 4 PropertyUtil.java
- ddpush 学习之路 2 DateTimeUtil.java
- 黑马程序员 【】java学习之路——网络编程之UDP传输简析
- java学习之路——基于UDP的Socket网络通信实例
- ddpush 学习之路 13 Receiver.java
- Java学习之路:少走弯路,就是捷径
- Java学习之路:不走弯路,就是捷径【转载】
- Java学习之路:不走弯路,就是捷径