您的位置:首页 > 编程语言 > Qt开发

mqtt 协议之 PINGREQ, PINGRESP

2015-08-17 11:56 483 查看
  mqtt 协议里最简单的是 ping 协议吧 (心跳包), ping 协议是已连接的客户端发往服务端, 告诉服务端,我还"活着"

PINGREQ - PING request

fixed header format.

bit     | 7 6 5 4           | 3        | 2 1       | 0
byte 1  | Message Type (12) | DUP flag | QoS level | RETAIN
        | 1 1 0 0           | x        | x x       | x
byte 2  | Remaining Length (0)
        | 0 0 0 0 0 0 0 0
no variable header

no payload

response: The response to a PINGREQ message is a PINGRESP message.

PINGRESP - PING response

fixed header

bit     | 7 6 5 4           | 3        | 2 1       | 0
byte 1  | Message Type (13) | DUP flag | QoS level | RETAIN
        | 1 1 0 1           | x        | x x       | x
byte 2  | Remaining Length (0)
        | 0 0 0 0 0 0 0 0
no variable header

no payload

------------------------------------------------------------------------ 华丽的分界线 ---------------------------------------

客户端会在一个心跳周期内发送一条PINGREQ消息到服务器端。两个字节,固定值。

服务器收到PINGREQ请求之后,会立即响应一个两个字节固定格式的PINGRESP消息。

心跳周期在 CONNECT(连接包)可变头部的“Keep Alive timer”字段中定义,单位为秒,用无符号 16 位整数(short)表示。

ok ,上代码 :

固定头部 FixedHeader

/// <summary>
/// MQTT fixed header: one byte holding the message type and the DUP/QoS/RETAIN flags,
/// followed by the variable-length "Remaining Length" field (one to four bytes).
/// </summary>
internal class FixedHeader
{
    /// <summary>
    /// Largest value four length bytes can carry (0xFF, 0xFF, 0xFF, 0x7F).
    /// </summary>
    private const int MaxRemainingLength = 268435455;

    /// <summary>
    /// Maximum number of bytes the Remaining Length field may occupy.
    /// </summary>
    private const int MaxLengthBytes = 4;

    /// <summary>
    /// Message type (upper four bits of byte 1).
    /// </summary>
    public MessageType MessageType { get; set; }

    /// <summary>
    /// DUP flag (bit 3 of byte 1).
    /// </summary>
    public bool Dup { get; set; }

    /// <summary>
    /// QoS level (bits 2-1 of byte 1).
    /// </summary>
    public Qos Qos { get; set; }

    /// <summary>
    /// RETAIN flag (bit 0 of byte 1).
    /// </summary>
    public bool Retain { get; set; }

    /// <summary>
    /// Remaining Length.
    /// Each encoded byte carries seven bits of the value (at most 0x7F, decimal 127);
    /// when the eighth (highest) bit is set, another length byte follows.
    /// At most four bytes are used, so the largest encoding is 0xFF, 0xFF, 0xFF, 0x7F,
    /// which represents 268435455.
    /// </summary>
    public int RemaingLength { get; set; }

    /// <summary>
    /// Creates a header with default values.
    /// </summary>
    public FixedHeader() { }

    /// <summary>
    /// Parses a fixed header from the current position of <paramref name="stream"/>.
    /// </summary>
    /// <param name="stream">Stream positioned at the first header byte.</param>
    /// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
    /// <exception cref="InvalidDataException">A seekable stream has fewer than two bytes left.</exception>
    /// <exception cref="EndOfStreamException">The stream ends inside the header.</exception>
    public FixedHeader(Stream stream)
    {
        if (stream == null)
        {
            throw new ArgumentNullException("stream");
        }

        // Length/Position are only usable on seekable streams, and only the bytes after the
        // current position can still be parsed. Non-seekable streams are validated read by read.
        if (stream.CanSeek && stream.Length - stream.Position < 2)
        {
            throw new InvalidDataException("The supplied header is invalid. Header must be at least 2 bytes long.");
        }

        int byte1 = stream.ReadByte();
        if (byte1 < 0)
        {
            throw new EndOfStreamException("The stream ended before the fixed header could be read.");
        }

        MessageType = (MessageType)((byte1 & 0xf0) >> 4);
        Dup = (byte1 & 0x08) != 0;
        Qos = (Qos)((byte1 & 0x06) >> 1);
        Retain = (byte1 & 0x01) != 0;

        RemaingLength = CalculateLength(ReadLengthBytes(stream));
    }

    /// <summary>
    /// Writes the header to <paramref name="stream"/>: the flags byte followed by the encoded
    /// Remaining Length. The header itself is left unchanged, so it can be written repeatedly.
    /// </summary>
    /// <param name="stream">Destination stream.</param>
    /// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
    /// <exception cref="InvalidOperationException">
    /// <see cref="RemaingLength"/> is negative or does not fit in four length bytes.
    /// </exception>
    public void WriteTo(Stream stream)
    {
        if (stream == null)
        {
            throw new ArgumentNullException("stream");
        }

        if (RemaingLength < 0 || RemaingLength > MaxRemainingLength)
        {
            throw new InvalidOperationException("Remaining Length must be between 0 and 268435455.");
        }

        var flags = (byte)MessageType << 4;
        flags |= (Dup ? 1 : 0) << 3;
        flags |= (byte)Qos << 1;
        flags |= Retain ? 1 : 0;
        stream.WriteByte((byte)flags);

        // Encode from a local copy so that serialising does not zero the property.
        int remaining = RemaingLength;
        do
        {
            int digit = remaining & 0x7f;
            remaining >>= 7;
            if (remaining > 0)
            {
                digit |= 0x80; // continuation bit: more length bytes follow
            }

            stream.WriteByte((byte)digit);
        } while (remaining > 0);
    }

    /// <summary>
    /// Reads the raw Remaining Length bytes: bytes are consumed until one without the
    /// continuation bit (0x80) is seen or four bytes have been read, whichever comes first.
    /// </summary>
    /// <param name="stream">Stream positioned at the first length byte.</param>
    /// <returns>The one to four bytes that were read, in stream order.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
    /// <exception cref="EndOfStreamException">The stream ends before the field is complete.</exception>
    internal static byte[] ReadLengthBytes(Stream stream)
    {
        if (stream == null)
        {
            throw new ArgumentNullException("stream");
        }

        var lengthBytes = new List<byte>(MaxLengthBytes);
        int sizeByte;
        do
        {
            sizeByte = stream.ReadByte();
            if (sizeByte < 0)
            {
                // ReadByte reports end of stream as -1; casting it to byte would yield 0xFF
                // and look like a continuation byte.
                throw new EndOfStreamException("The stream ended before the Remaining Length field was complete.");
            }

            lengthBytes.Add((byte)sizeByte);
        } while (lengthBytes.Count < MaxLengthBytes && (sizeByte & 0x80) == 0x80);

        return lengthBytes.ToArray();
    }

    /// <summary>
    /// Decodes Remaining Length bytes: the low seven bits of each byte are combined,
    /// least significant group first.
    /// </summary>
    /// <param name="lengthBytes">Encoded length bytes, in stream order.</param>
    /// <returns>The decoded length.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="lengthBytes"/> is null.</exception>
    internal static int CalculateLength(byte[] lengthBytes)
    {
        if (lengthBytes == null)
        {
            throw new ArgumentNullException("lengthBytes");
        }

        var remainingLength = 0;
        var multiplier = 1;

        foreach (var currentByte in lengthBytes)
        {
            remainingLength += (currentByte & 0x7f) * multiplier;
            multiplier *= 0x80;
        }

        return remainingLength;
    }
}


消息父类: Message

/// <summary>
/// Base class for MQTT messages and factory that builds message objects from raw input.
/// </summary>
internal class Message
{
    /// <summary>
    /// Fixed header of this message.
    /// </summary>
    public FixedHeader FixedHeader { get; protected set; }

    /// <summary>
    /// Creates a message without a fixed header; derived classes assign it themselves.
    /// </summary>
    public Message()
    {
    }

    /// <summary>
    /// Creates a message whose fixed header carries <paramref name="messageType"/>.
    /// </summary>
    /// <param name="messageType">Type to store in the new fixed header.</param>
    public Message(MessageType messageType)
    {
        FixedHeader = new FixedHeader
        {
            MessageType = messageType
        };
    }

    /// <summary>
    /// Serialises the message to <paramref name="stream"/>. The base implementation writes nothing.
    /// </summary>
    /// <param name="stream">Destination stream.</param>
    public virtual void WriteTo(Stream stream)
    {
    }

    /// <summary>
    /// Parses a message from a byte buffer.
    /// </summary>
    /// <param name="buffer">Bytes starting with the fixed header.</param>
    /// <returns>The parsed message, or null for DISCONNECT.</returns>
    public static Message CreateFrom(byte[] buffer)
    {
        using (var stream = new MemoryStream(buffer))
        {
            return CreateFrom(stream);
        }
    }

    /// <summary>
    /// Reads a fixed header from <paramref name="stream"/> and builds the matching message.
    /// </summary>
    /// <param name="stream">Stream positioned at the first header byte.</param>
    /// <returns>The parsed message, or null for DISCONNECT.</returns>
    public static Message CreateFrom(Stream stream)
    {
        var header = new FixedHeader(stream);
        return CreateMessage(header, stream);
    }

    /// <summary>
    /// Builds the message object that corresponds to <paramref name="header"/>.
    /// </summary>
    /// <param name="header">Fixed header that has already been read.</param>
    /// <param name="stream">Stream handed to the message constructors that take one.</param>
    /// <returns>The created message; null when the header type is DISCONNECT.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="header"/> is null.</exception>
    /// <exception cref="NotSupportedException">No message class is wired up for the header's type.</exception>
    public static Message CreateMessage(FixedHeader header, Stream stream)
    {
        if (header == null)
        {
            throw new ArgumentNullException("header");
        }

        switch (header.MessageType)
        {
            case MessageType.CONNACK:
                return new ConnAckMessage(header, stream);
            case MessageType.DISCONNECT:
                return null;
            case MessageType.PINGREQ:
                return new PingReqMessage();
            case MessageType.PUBACK:
                return new PublishAckMessage(header, stream);
            case MessageType.PINGRESP:
                return new PingRespMessage(header, stream);

            // No message class is wired up for these types yet. They must not share the
            // PINGRESP branch: doing so would hand back a ping response for an unrelated packet.
            case MessageType.PUBCOMP:
            case MessageType.PUBLISH:
            case MessageType.PUBREC:
            case MessageType.PUBREL:
            case MessageType.SUBACK:
            case MessageType.UNSUBACK:
            case MessageType.UNSUBSCRIBE:
            case MessageType.CONNECT:
            case MessageType.SUBSCRIBE:
            default:
                throw new NotSupportedException("Unsupported Message Type");
        }
    }
}


两个枚举:

MessageType (消息类型)

Qos (服务质量等级)

/// <summary>
/// MQTT control packet types, numbered as they appear in the upper four bits of the
/// first fixed-header byte.
/// </summary>
/// <remarks>
/// The values are sequential identifiers, not bit flags, so the enum is deliberately not
/// marked with FlagsAttribute: with that attribute an undefined value such as 15 would be
/// formatted as the misleading combination "CONNECT, DISCONNECT".
/// </remarks>
public enum MessageType : byte
{
    CONNECT     = 1,
    CONNACK     = 2,
    PUBLISH     = 3,
    PUBACK      = 4,
    PUBREC      = 5,
    PUBREL      = 6,
    PUBCOMP     = 7,
    SUBSCRIBE   = 8,
    SUBACK      = 9,
    UNSUBSCRIBE = 10,
    UNSUBACK    = 11,
    PINGREQ     = 12,
    PINGRESP    = 13,
    DISCONNECT  = 14
}

/// <summary>
/// Quality of service level.
/// </summary>
/// <remarks>
/// The levels are mutually exclusive values, not bit flags, so the enum is deliberately not
/// marked with FlagsAttribute: with that attribute the undefined value 3 would be formatted
/// as "AtLeastOnce, ExactlyOnce".
/// </remarks>
public enum Qos : byte
{
    /// <summary>
    ///     QOS Level 0 - Message is not guaranteed delivery. No retries are made to ensure delivery is successful.
    /// </summary>
    AtMostOnce = 0,

    /// <summary>
    ///     QOS Level 1 - Message is guaranteed delivery. It will be delivered at least one time, but may be delivered
    ///     more than once if network errors occur.
    /// </summary>
    AtLeastOnce = 1,

    /// <summary>
    ///     QOS Level 2 - Message will be delivered once, and only once. Message will be retried until
    ///     it is successfully sent.
    /// </summary>
    ExactlyOnce = 2,
}


ping 请求包: PingReqMessage

响应包: PingRespMessage

/// <summary>
/// PINGREQ message. Serialising it writes only its fixed header.
/// </summary>
internal sealed class PingReqMessage : Message
{
    /// <summary>
    /// Creates a ping request whose fixed header carries the PINGREQ type.
    /// </summary>
    public PingReqMessage() : base(MessageType.PINGREQ) { }

    /// <summary>
    /// Writes this message to <paramref name="stream"/> by emitting its fixed header.
    /// </summary>
    /// <param name="stream">Destination stream.</param>
    public override void WriteTo(Stream stream)
    {
        FixedHeader header = this.FixedHeader;
        header.WriteTo(stream);
    }
}

/// <summary>
/// PINGRESP message, the reply to a PINGREQ. Serialising it writes only its fixed header.
/// </summary>
internal class PingRespMessage : Message
{
    /// <summary>
    /// Creates a ping response whose fixed header carries the PINGRESP type.
    /// </summary>
    public PingRespMessage()
        : base(MessageType.PINGRESP)
    {
    }

    /// <summary>
    /// Creates a ping response around a fixed header that has already been read.
    /// </summary>
    /// <param name="header">Fixed header to attach to the message.</param>
    /// <param name="stream">Not read by this constructor.</param>
    public PingRespMessage(FixedHeader header, Stream stream)
    {
        FixedHeader = header;
    }

    /// <summary>
    /// Writes this message to <paramref name="stream"/> by emitting its fixed header.
    /// Without this override the inherited no-op would be used and nothing would be written.
    /// </summary>
    /// <param name="stream">Destination stream.</param>
    public override void WriteTo(Stream stream)
    {
        FixedHeader.WriteTo(stream);
    }
}


OK.
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: