您的位置:首页 > 编程语言 > Java开发

Java UDP Server的轻量级实现

2016-05-26 14:03 501 查看
Java UDP Server的轻量级实现

实现方法

接收线程:只处理收包,收完后之后放入工作线程

发送线程:负责发送udp包到其它的server

工作线程:解析包体,实现业务逻辑

工作线程消息处理:在工作线程中解析出协议包体后,根据messageId实现消息处理

主要的java类

ServerManager.java

SenderThread.java

ReceiverThread.java

WorkThread.java

 

ServerManager.java 相关代码

public static void
startListener()
throws UDPException{

        try {

            if(tdUdpServerIsStart){

                throw new UDPException("服务已启动");

            }

            hasStartSet =
false;

            tdUdpServerIsStart =
true;

            new Thread(new
WorkThread()).start();

            new Thread(new
ReceiverThread()).start();

            new Thread(new
SenderThread()).start();

           

            datagramSocket =
new DatagramSocket(PORT);

            datagramSocket.setSoTimeout(TIME_OUT);

            datagramSocket.setReceiveBufferSize(UDP_RECEIVER_BUFF_SIZE);

        } catch
(SocketException e) {

            log.error("",e);

            throw new UDPException("UDPServer
启动异常,端口:"+PORT,e);

        }

    }
 

 

 

SenderThread.java 相关代码:

逻辑:读取队列中的消息。并实现发送

@Override
    public void run() {
        log.info("UDP Sender thread started");
        while(ServerManager.tdUdpServerIsStart) try {
            UDPMessage reqBody = senderQueue.take();//队列中没有时会一直等待
            byte[] bodyBit = reqBody.getMsgBits();

            if(reqBody.getHeader() == null){
                log.warn("Header is null");
                continue;
            }
            if(reqBody.getHeader().getMessageId() == null){
                log.warn("MessageId is null!");
                continue;
            }
//            log.debug("Send head:"+reqBody.getHeader().toString());
//            log.info(String.format("Preprocess messageId:%s,len:%s,sessionId:%s to %s", reqBody.getHeader().getMessageId(),bodyBit.length, reqBody.getHeader().getSessionId(), reqBody.getHeader().getSocketAddress()));
//            byte[] body = ProtMsgData.makeMsgStream(reqBody.getSessionId(), reqBody.getMessageId().getMessageId(), bodyBit);
            byte[] body = reqBody.getMsgBits();

            DatagramPacket dataPacket = new DatagramPacket(body, body.length,
                        reqBody.getHeader().getSocketAddress());
            //测试CODE
//            DatagramPacket dataPacket = new DatagramPacket(body, body.length,
//                    InetAddress.getByName("10.10.10.100"), 6060);

//            DatagramSocket dataSocket = new DatagramSocket(reqBody.getHeader().getInetSocketAddress().getPort());
//            DatagramSocket dataSocket = new DatagramSocket(45677);
            DatagramSocket dataSocket = getSocket(reqBody.getHeader().getInetSocketAddress().getPort());
            if(dataSocket == null){
                log.warn(String.format("Send error!Not found socket by port:%s",reqBody.getHeader().getInetSocketAddress().getPort()));
                continue;
            }

            log.info(String.format("Send message to %s,id:%s,len:%s,data:%s",reqBody.getHeader().getSocketAddress().toString(),reqBody.getHeader().getMessageId(),body.length,ProtUtils.getHexByString(body)));
            dataSocket.send(dataPacket);

            //如果需要接收消息,先将记录保存起来
//            if (reqBody.isNeedReceive()) {
//                UDPSessionManager.add(reqBody);
//            }
        } catch (Exception e) {
            log.error("Send message error", e);
        }
    }

 

 

 

ReceiverThread.java 相关代码

封装数据,给工作线程使用。也可以在这里解析协议的头,方便后续使用

@Override
public void run() {
    Init();
}

public void Init() {

    while(UCSIServerManager.tdUdpServerIsStart){

        try {

            byte[] receiveBody = new byte[BUFFER_SIZE];
            DatagramPacket dataPacketBody = new DatagramPacket(receiveBody, receiveBody.length);
            if(dataPacketBody == null) continue;

            ServerManager.receive(dataPacketBody);
            if(dataPacketBody == null) continue;

            //添加到工作线程队列
            ProtRespInfo respInfo = new ProtRespInfo(null,dataPacketBody);
            UDPWorkThread.push(respInfo);

        } catch (Exception e) {
        }
    }
}

 

 

 

WorkThread.java 相关代码

@Override
public void run() {
    while(UCSIServerManager.tdUdpServerIsStart){
        try {

            ProtRespInfo packet = workQueue.take();
            UDPReceiverHandler handler = new UDPReceiverHandler(packet);

            handler.receive();

        } catch (InterruptedException e) {
            e.printStackTrace();
            continue;
        }
    }
}

public static void push(ProtRespInfo respInfo){
    try {
        workQueue.put(respInfo);
    } catch (Exception e) {
        e.printStackTrace();
    }
}

 

 

UDPReceiverHandler.java

实现在解析协议头和协议体。并根据messageId来调用对应的消息实现,我这里使用的注解的方式,程序启时扫描相关注解,如果新加协议只需要实现对应的接口,并加上注解。

public void receive(){
        log.debug("UDPReceiver receive...");
        UDPHeader header = getUDPHeader(respInfo.getDeviceType());
        if(header == null){
            log.warn("Not found head by port:"+respInfo.getDataPacket().getPort());
            return;
        }

        header.setSocketAddress(respInfo.getDataPacket().getSocketAddress());
        header.data = respInfo.getDataPacket().getData();
        header.len = header.getHeaderSize();
        header.decode();
        header.setDeviceType(respInfo.getDeviceType());
        respInfo.setHeader(header);

        if(respInfo.getHeader() == null || respInfo.getHeader().getMessageId() == null){
            log.warn("消息头错误,来自IP:"+respInfo.getHeader().getRemoteAddr()+",端口:"+respInfo.getHeader().getRemotePort());
            return;
        }
        try {
            IReceiverHandler handler = ReceiverHandlerManager.get(respInfo.getHeader().getMessageId().getMessageId());
            if(handler == null){
                log.error("没有找到对应的消息Handler,ID:"+respInfo.getHeader().getMessageId());
                return;
            }
            handler.handler(header,respInfo);
        } catch (Exception e) {
            log.error("",e);
        }

    }

 

 

小结

丢包严重:可以尝试增加DatagramSocket.setReceiveBufferSize来增加缓冲区大小

数据解析:定义好协议后,将协议头和协议体抽象出来。我这里抽象了一个UDPCoder(一些数据的公用read方法和write方法),UDPHeader,UDPBody。

 

PS:有空之后一定把源码整理出来
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  udp java udp