您的位置:首页 > 理论基础 > 计算机网络

Java的TCP/IP编程学习--构建和解析协议消息

2019-03-20 21:38 288 查看

一、投票信息类–VoteMsg

package vote;

/**
* @ClassName VoteMsg
* 在实现一个协议时,定义一个专门类存放消息
* 该类提供了操作消息中字段的方法
* @Description TODO
* @Author Cays
* @Date 2019/3/17 9:04
* @Version 1.0
**/
public class VoteMsg {
//true时表示查询请求
private boolean isInquiry;
//true时表示是响应消息
private boolean isResponse;
//候选人ID 0--1000
private int candidateID;
//候选人获得的选票总数 非负 响应消息中非零
private long voteCount;
public static final int MAX_CANDIDATE_ID=1000;

public VoteMsg(boolean isInquiry, boolean isResponse, int candidateID, long voteCount)
throws IllegalArgumentException{
if (voteCount!=0&&!isResponse){
throw new IllegalArgumentException("Request vote count must be zero");
}
if (candidateID<0||candidateID>MAX_CANDIDATE_ID){
throw new IllegalArgumentException("Bad candidateID:"+candidateID);
}
if (voteCount<0){
throw new IllegalArgumentException("Total must be  >= zero");
}
this.isInquiry = isInquiry;
this.isResponse = isResponse;
this.candidateID = candidateID;
this.voteCount = voteCount;
}

public boolean isInquiry() {
return isInquiry;
}

public void setInquiry(boolean inquiry) {
isInquiry = inquiry;
}

public boolean isResponse() {
return isResponse;
}

public void setResponse(boolean response) {
isResponse = response;
}

public int getCandidateID() {
return candidateID;
}

public void setCandidateID(int candidateID) {
if (candidateID<0||candidateID>MAX_CANDIDATE_ID){
throw new IllegalArgumentException("Bad candidateID:"+candidateID);
}
this.candidateID = candidateID;
}

public long getVoteCount() {
return voteCount;
}

public void setVoteCount(long voteCount) {
if ((voteCount!=0&&!isResponse)||voteCount<0){
throw new IllegalArgumentException("bad vote count");
}
this.voteCount = voteCount;
}

@Override
public String toString() {
String res=(isInquiry?"inquiry":"vote")+" for candidate "+candidateID;
if (isResponse){
res="response to "+res+" who new has "+voteCount+" vote(s)";
}
return res;
}
}

二、序列化和反序列化接口–VoteMsgCoder

VoteMsgCoder接口提供对信息进行序列化和反序列化的方法

package vote;

import java.io.IOException;

/**
* @ClassName VoteMsgCoder
* @Description TODO
* 对投票消息进行序列化和反序列化
* @Author Cays
* @Date 2019/3/17 9:15
* @Version 1.0
**/
public interface VoteMsgCoder {
/**
* toMire()方法用于根据一个特定的协议,将投票消息转换成一个字节序列,
* @param msg
* @return
* @throws IOException
*/
byte[] toWire(VoteMsg msg) throws IOException;

/**
* fromMire()方法则根据相同的协议,对给定的字节序列进行解析,
* 并根据信息的内容构造出消息类的一个实例。
* @param input
* @return
* @throws IOException
*/
VoteMsg fromWire(byte[] input) throws IOException;
}

三、基于文本的表示方法–VoteMsgTextCoder

首先,我们介绍一个用文本方式对消息进行编码的版本。该协议指定使用ASCIl字符集对文本进行编码。消息的开头是一个所谓的“魔术字符串”,即一个字符序列,用于接收者快速将投票协议的消息和网络中随机到来的垃圾消息区分开。投票/查询布尔值被编码成字符形式,‘v’表示投票消息,‘i’表示查询消息。消息的状态,即是否为服务器的响应,由字符‘R’指示。状态标记后面是候选人ID,其后跟的是选票总数,它们都编码成十进制字符串。VoteMsgTextCoder类提供了一种基于文本的Votelsg编码方法。

package vote;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Scanner;

/**
* @ClassName VoteMsgTextCoder
* @Description TODO
* @Author Cays
* @Date 2019/3/17 9:20
* @Version 1.0
**/
public class VoteMsgTextCoder implements VoteMsgCoder {
public static final String MAGIC="Voting";
public static final String VOTESTR="v";
public static final String INQSTR="i";
public static final String RESPONSESTR="R";
public static final String CHARSRTNAME="US-ASCII";
public static final String DELIMSTR=" ";
public static final int MAX_WIRE_LENGTH=2000;
@Override
public byte[] toWire(VoteMsg msg) throws IOException {
//将文本转换为字符串
String msgString=MAGIC+DELIMSTR+(msg.isInquiry()?INQSTR:VOTESTR)
+DELIMSTR+(msg.isResponse()?RESPONSESTR+DELIMSTR:"")
+Integer.toString(msg.getCandidateID())+DELIMSTR
+Long.toString(msg.getVoteCount());
byte[] data=msgString.getBytes();
return data;
}

@Override
public VoteMsg fromWire(byte[] input) throws IOException {
//根据字符串规则将信息提取出来
ByteArrayInputStream msgStream=new ByteArrayInputStream(input);
Scanner s=new Scanner(new InputStreamReader(msgStream,CHARSRTNAME));
boolean isInquiry,isResponse;
int candidateID;long voteCount;
String token;
try {
//提取MAGIC
token=s.next();
if (!token.equals(MAGIC)){
throw new IOException("bad magic string:"+token);
}
//提取请求类型
token=s.next();
if (token.equals(VOTESTR)){
isInquiry=false;
}else if(!token.equals(INQSTR)){
throw new IOException("bad vote/inq:"+token);
}else {
isInquiry=true;
}
//提取响应消息与否
token=s.next();
if (token.equals(RESPONSESTR)){
isResponse=true;
token=s.next();
}else {
isResponse=false;
}
//提取选票人ID
candidateID=Integer.parseInt(token);
if (isResponse){
//如果是响应信息提取票数
token=s.next();
voteCount=Long.parseLong(token);
}else {
//如果不是响应信息票数为0
voteCount=0;
}
}catch (IOException e){
throw new IOException("Parse error......");
}
//生成选票信息类返回
return new VoteMsg(isInquiry,isResponse,candidateID,voteCount);
}
}

四、二进制的表示方法–VoteMsgBinCoder

package vote;

import java.io.*;

/**
* @ClassName VoteMsgBinCoder
* @Description TODO
* @Author Cays
* @Date 2019/3/17 9:41
* @Version 1.0
**/
public class VoteMsgBinCoder implements VoteMsgCoder {
public static final int MIN_WIRE_LENGTH=4;
public static final int MAX_WIRE_LENGTH=18;
public static final int MAGIC=0x5400;//01010100 00000000
public static final int MAGIC_MASK=0xfc00;//11111100 00000000
public static final int MAGIC_SHIFT=8;
public static final int RESPONSE_FLAG=0x0200;
public static final int INQUIRE_FLAG=0x0100;

@Override
public byte[] toWire(VoteMsg msg) throws IOException {
ByteArrayOutputStream byteStream=new ByteArrayOutputStream();
DataOutputStream out=new DataOutputStream(byteStream);
short magicAndFlag=MAGIC;
if (msg.isInquiry()){
magicAndFlag|=INQUIRE_FLAG;
}
if (msg.isResponse()){
magicAndFlag|=RESPONSE_FLAG;
}
out.write(magicAndFlag);
out.writeShort((short)msg.getCandidateID());
if (msg.isResponse()){
out.writeLong(msg.getVoteCount());
}
out.flush();
byte[] data=byteStream.toByteArray();
return data;
}

@Override
public VoteMsg fromWire(byte[] input) throws IOException {
if (input.length<MIN_WIRE_LENGTH){
throw new IOException("Runt message");
}
ByteArrayInputStream bs=new ByteArrayInputStream(input);
DataInputStream in=new DataInputStream(bs);
int magic=in.readShort();
if ((magic&MAGIC_MASK)!=MAGIC){
throw new IOException("bad magic:"+((magic&MAGIC_MASK)>>MAGIC_SHIFT));
}
boolean resp=((magic&RESPONSE_FLAG)!=0);
boolean inq=((magic&INQUIRE_FLAG)!=0);
int candidateID=in.readShort();
if (candidateID<0||candidateID>1000){
throw new IOException("bad candidateID:"+candidateID);
}
long voteCount=0;
if (resp){
voteCount=in.readLong();
if (voteCount<0){
throw new IOException("bad vote count:"+voteCount);
}
}
return new VoteMsg(inq,resp,candidateID,voteCount);
}
}

五、发送和接收

  1. VoteService处理Request
package vote;

import java.util.HashMap;
import java.util.Map;

/**
* @ClassName VoteService
* @Description TODO
* 1. 维护一个候选人ID与其获得选票数的映射
* 2. 记录提交的选票
* 3. 根据其获得的选票数,查询,响应消息
* @Author Cays
* @Date 2019/3/18 12:52
* @Version 1.0
**/
public class VoteService {
//查询选票信息
private Map<Integer,Long> results=new HashMap<>();
public VoteMsg handleRequest(VoteMsg msg){
if (msg.isResponse()){
//如果消息是一个响应消息,不修改
return msg;
}
msg.setResponse(true);
int candidateID=msg.getCandidateID();
Long count=results.get(candidateID);
//如果候选人ID不存在,票数为零
if (count==null){
count=0L;
}
if (!msg.isInquiry()){
//将增加后的选票数存回映射
results.put(candidateID,++count);
}
msg.setVoteCount(count);
return msg;
}

}
  1. VoteClientTCP:TCP投票客户端
package vote;

import framer.Framer;
import framer.LengthFramer;

import java.io.OutputStream;
import java.net.Socket;

/**
* @ClassName VoteClientTCP
* @Description TODO
* @Author Cays
* @Date 2019/3/18 15:12
* @Version 1.0
**/
public class VoteClientTCP {
public static final int CANDIDATEID=888;

public static void main(String[] args) throws Exception {
if (args.length!=2){
throw new IllegalArgumentException("Parameter(s):<Port>");
}
String destAddr=args[0];
int destPort=Integer.parseInt(args[1]);
//创建套接字,获取输出流
Socket sock=new Socket(destAddr,destPort);
OutputStream out=sock.getOutputStream();
//创建二进制编码器和基于长度的成帧器
VoteMsgCoder coder=new VoteMsgBinCoder();
Framer framer=new LengthFramer(sock.getInputStream());
//创建,编码,成帧和发送请求
VoteMsg msg=new VoteMsg(false,true,CANDIDATEID,0);
//将选票信息转换成二进制信息
byte[] encodedMsg=coder.toWire(msg);
//Send request
System.out.println("Sending Inquiry ("+encodedMsg.length+" bytes):");
System.out.println(msg);
//out中增加/写入长度前缀
framer.frameMsg(encodedMsg,out);
//now send a vote
msg.setInquiry(false);
encodedMsg=coder.toWire(msg);
System.out.println("Sending Vote ("+encodedMsg.length+" bytes):");
framer.frameMsg(encodedMsg,out);
//receive inquiry response
//获取out下一条编码后的消息,并通过fromWire解析/编码 m
encodedMsg=framer.nextMsg();
msg=coder.fromWire(encodedMsg);
System.out.println("Received Response("+encodedMsg.length+
" bytes):");
System.out.println(msg);
//receive vote response
msg=coder.fromWire(framer.nextMsg());
System.out.println("Received Response("+encodedMsg.length+
" bytes):");
System.out.println(msg);
sock.close();
}
}
  1. TCP投票服务器:对客户端消息进行响应
package vote;

import framer.Framer;
import framer.LengthFramer;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

/**
* @ClassName VoteServerTCP
* @Description TODO
* @Author Cays
* @Date 2019/3/20 21:32
* @Version 1.0
**/
public class VoteServerTCP {
public static void main(String[] args) throws IOException {
int port=8888;
ServerSocket serverSocket=new ServerSocket(port);
VoteMsgCoder coder=new VoteMsgBinCoder();
VoteService service=new VoteService();
while (true){
Socket clntSocket=serverSocket.accept();
System.out.println("Handling client at "+clntSocket.getRemoteSocketAddress());
Framer framer=new LengthFramer(clntSocket.getInputStream());
try {
byte[] req;
while ((req=framer.nextMsg())!=null){
System.out.println("Received message ("+req.length+" bytes)");
VoteMsg responseMsg=service.handleRequest(coder.fromWire(req));
framer.frameMsg(coder.toWire(responseMsg),clntSocket.getOutputStream());
}
}catch (IOException ioe){
System.out.println("Error handling client:"+ioe.getMessage());
}finally {
System.out.println("Closing connection");
clntSocket.close();
}
}
}
}
  1. UDP投票服务器
package vote;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.SocketException;
import java.util.Arrays;

/**
* @ClassName VoteServerUDP
* @Description TODO
* @Author Cays
* @Date 2019/3/18 16:53
* @Version 1.0
**/
public class VoteServerUDP {
public static void main(String[] args) throws IOException {
if (args.length!=1){
throw new IllegalArgumentException("Pa:<Port>");
}
int port=Integer.parseInt(args[0]);
DatagramSocket socket=new DatagramSocket(port);
//为服务器创建接收缓存区,
byte[] inBuffer=new byte[VoteMsgTextCoder.MAX_WIRE_LENGTH];
//编码器
VoteMsgCoder coder=new VoteMsgTextCoder();
//投票服务
VoteService service=new VoteService();
while (true){
DatagramPacket packet=new DatagramPacket(inBuffer,inBuffer.length);
//接收数据报文,抽取数据,UDP完成了成帧工作
socket.receive(packet);
byte[] encodedMsg= Arrays.copyOfRange(packet.getData(),0,packet.getLength());
System.out.println("Handling request from "+packet.getSocketAddress()+
" ("+encodedMsg.length+" bytes)");
try {
//解码和处理请求
VoteMsg msg=coder.fromWire(encodedMsg);
msg=service.handleRequest(msg);
//编码并传递
packet.setData(coder.toWire(msg));
System.out.println("Sending response ("+packet.getLength()+" bytes):");
System.out.println(msg);
//发送响应消息
socket.send(packet);
}catch (IOException e){
System.err.println("Parse error in message:"+e.getMessage());
}
}
}
}
  1. UDP投票客户端
package vote;

import java.io.IOException;
import java.net.*;
import java.util.Arrays;

/**
* @ClassName VoteClientUDP
* @Description TODO
* @Author Cays
* @Date 2019/3/18 16:40
* @Version 1.0
**/
public class VoteClientUDP {
public static void main(String[] args) throws IOException {
if (args.length!=3){
throw new IllegalArgumentException("Pa:<Port>");
}
InetAddress destAddr=InetAddress.getByName(args[0]);
int destPort=Integer.parseInt(args[1]);
int candidate=Integer.parseInt(args[2]);
DatagramSocket sock=new DatagramSocket();
//通过connet方法连接远程ip/端口
sock.connect(destAddr,destPort);
//创建vote
VoteMsg vote=new VoteMsg(false,false,candidate,0);
VoteMsgCoder coder=new VoteMsgTextCoder();
//将vote转换成文本消息,返回文本的二进制流
byte[] encodedVote=coder.toWire(vote);
System.out.println("Sending Text-Encoded request("+encodedVote.length+
" bytes):");
System.out.println(vote);
//不需要边界,UDP协议为我们保留了边界信息
DatagramPacket message=new DatagramPacket(encodedVote,encodedVote.length);
sock.send(message);
message=new DatagramPacket(new byte[VoteMsgTextCoder.MAX_WIRE_LENGTH],VoteMsgTextCoder.MAX_WIRE_LENGTH);
sock.receive(message);
encodedVote= Arrays.copyOfRange(message.getData(),0,message.getLength());
System.out.println("Received Text-Encoded response ("+encodedVote.length+
" bytes):");
vote=coder.fromWire(encodedVote);
System.out.println(vote);
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: