您的位置:首页 > 编程语言 > Delphi

DELPHI高性能大容量SOCKET并发(四):粘包、分包、解包

2017-10-11 16:50 543 查看
转载连接: http://blog.csdn.net/sqldebug_fan/article/details/7907765

粘包

使用TCP长连接就会引入粘包的问题,粘包是指发送方发送的若干包数据到接收方接收时粘成一包,从接收缓冲区看,后一包数据的头紧接着前一包数据的尾。粘包可能由发送方造成,也可能由接收方造成。TCP为提高传输效率,发送方往往要收集到足够多的数据后才发送一包数据,造成多个数据包的粘连。如果接收进程不及时接收数据,已收到的数据就放在系统接收缓冲区,用户进程读取数据时就可能同时读到多个数据包。

粘包一般的解决办法是制定通讯协议,由协议来规定如何分包解包。

分包

在IOCPDemo例子程序中,我们分包的逻辑是先发一个长度,然后紧接着是数据包内容,这样就可以把每个包分开。

应用层数据包格式如下:

应用层数据包格式 
数据包长度Len:Cardinal(4字节无符号整数)数据包内容,长度为Len
IOCPSocket分包处理主要代码,我们收到的数据都是在TSocketHandle.ProcessIOComplete方法中处理:

[delphi] view
plain copy

procedure TSocketHandle.ProcessIOComplete(AIocpRecord: PIocpRecord;  

  const ACount: Cardinal);  

begin  

  case AIocpRecord.IocpOperate of  

    ioNone: Exit;  

    ioRead: //收到数据  

    begin  

      FActiveTime := Now;  

      ReceiveData(AIocpRecord.WsaBuf.buf, ACount);  

      if FConnected then  

        PreRecv(AIocpRecord); //投递请求  

    end;  

    ioWrite: //发送数据完成,需要释放AIocpRecord的指针  

    begin  

      FActiveTime := Now;  

      FSendOverlapped.Release(AIocpRecord);  

    end;  

    ioStream:  

    begin  

      FActiveTime := Now;  

      FSendOverlapped.Release(AIocpRecord);  

      WriteStream; //继续发送流  

    end;  

  end;  

end;  

如果是收到数据,则调用ReceiveData函数,ReceiveData主要功能是把数据的写入流中,然后调用Process分包。FInputBuf是一个内存流(FInputBuf: TMemoryStream),内存流的每次写入会造成一次内存分配,如果要获得更高的效率,可以替换为内存池等更好的内存管理方式。还有一种更好的解决方案是规定每次发包的大小,如每个包最大不超过64K,哪么缓冲区的最大大小可以设置为128K(缓存两个数据包),这样就可以每次创建对象时一次分配好,减少内存分配次数,提高效率。(内存的分配和释放比内存的读写效率要低)

[delphi] view
plain copy

procedure TSocketHandle.ReceiveData(AData: PAnsiChar; const ALen: Cardinal);  

begin  

  FInputBuf.Write(AData^, ALen);  

  Process;  

end;  

Process则根据收到的数据进行分包逻辑,如果不够一个包,则继续等待接收数据,如果够一个或多个包,则循环调用Execute函数进行处理,代码如下:

[delphi] view
plain copy

procedure TSocketHandle.Process;  

var  

  AData, ALast, NewBuf: PByte;  

  iLenOffset, iOffset, iReserveLen: Integer;  

  

  function ReadLen: Integer;  

  var  

    wLen: Word;  

    cLen: Cardinal;  

  begin  

    FInputBuf.Position := iOffset;  

    if FLenType = ltWord then  

    begin  

      FInputBuf.Read(wLen, SizeOf(wLen));  

      //wLen := ntohs(wLen);  

      Result := wLen;  

    end  

    else  

    begin  

      FInputBuf.Read(cLen, SizeOf(cLen));  

      //cLen := ntohl(cLen);  

      Result := cLen;  

    end;  

  end;  

begin  

  case FLenType of  

    ltWord, ltCardinal:  

    begin  

      if FLenType = ltWord then  

        iLenOffset := 2  

      else  

        iLenOffset := 4;  

      iReserveLen := 0;  

      FPacketLen := 0;  

      iOffset := 0;  

      if FPacketLen <= 0 then  

      begin  

        if FInputBuf.Size < iLenOffset then Exit;  

        FInputBuf.Position := 0; //移动到最前面  

        FPacketLen := ReadLen;  

        iOffset := iLenOffset;  

        iReserveLen := FInputBuf.Size - iOffset;  

        if FPacketLen > iReserveLen then //不够一个包的长度  

        begin  

          FInputBuf.Position := FInputBuf.Size; //移动到最后,以便接收后续数据  

          FPacketLen := 0;  

          Exit;  

        end;  

      end;  

      while (FPacketLen > 0) and (iReserveLen >= FPacketLen) do //如果数据够长,则处理  

      begin //多个包循环处理  

        AData := Pointer(Longint(FInputBuf.Memory) + iOffset); //取得当前的指针  

        Execute(AData, FPacketLen);  

        iOffset := iOffset + FPacketLen; //移到下一个点  

        FPacketLen := 0;  

        iReserveLen := FInputBuf.Size - iOffset;  

        if iReserveLen > iLenOffset then //剩下的数据  

        begin  

          FPacketLen := ReadLen;  

          iOffset := iOffset + iLenOffset;  

          iReserveLen := FInputBuf.Size - iOffset;  

          if FPacketLen > iReserveLen then //不够一个包的长度,需要把长度回退  

          begin  

            iOffset := iOffset - iLenOffset;  

            iReserveLen := FInputBuf.Size - iOffset;  

            FPacketLen := 0;  

          end;  

        end  

        else //不够长度字节数  

          FPacketLen := 0;  

      end;  

      if iReserveLen > 0 then //把剩下的自己缓存起来  

      begin  

        ALast := Pointer(Longint(FInputBuf.Memory) + iOffset);  

        GetMem(NewBuf, iReserveLen);  

        try  

          CopyMemory(NewBuf, ALast, iReserveLen);  

          FInputBuf.Clear;  

          FInputBuf.Write(NewBuf^, iReserveLen);  

        finally  

          FreeMemory(NewBuf);  

        end;  

      end  

      else  

      begin  

        FInputBuf.Clear;  

      end;  

    end;  

  else  

    begin  

      FInputBuf.Position := 0;  

      AData := Pointer(Longint(FInputBuf.Memory)); //取得当前的指针  

      Execute(AData, FInputBuf.Size);  

      FInputBuf.Clear;  

    end;  

  end;  

end;  

解包

由于我们应用层数据包既可以传命令也可以传数据,因而针对每个包我们进行解包,分出命令和数据分别处理,因而每个Socket服务对象都需要解包,我们解包的逻辑是放在TBaseSocket.DecodePacket中,命令和数据的包格式为:

命令长度Len:Cardinal(4字节无符号整数)命令数据
这里和第一版公布的代码不同,这版的代码对命令进行了编码,采用UTF-8编码,代码如下:

[delphi] view
plain copy

function TBaseSocket.DecodePacket(APacketData: PByte;  

  const ALen: Integer): Boolean;  

var  

  CommandLen: Integer;  

  UTF8Command: UTF8String;  

begin  

  if ALen > 4 then //命令长度为4字节,因而长度必须大于4  

  begin  

    CopyMemory(@CommandLen, APacketData, SizeOf(Cardinal)); //获取命令长度  

    Inc(APacketData, SizeOf(Cardinal));  

    SetLength(UTF8Command, CommandLen);  

    CopyMemory(PUTF8String(UTF8Command), APacketData, CommandLen); //读取命令  

    Inc(APacketData, CommandLen);  

    FRequestData := APacketData; //数据  

    FRequestDataLen := ALen - SizeOf(Cardinal) - CommandLen; //数据长度  

    FRequest.Text := Utf8ToAnsi(UTF8Command); //把UTF8转为Ansi  

    Result := True;  

  end  

  else  

    Result := False;   

end;  

具体每个协议可以集成Execute方法,调用DecodePacket进行解包,然后根据命令进行协议逻辑处理,例如TSQLSocket主要代码如下:

[delphi] view
plain copy

{* SQL查询SOCKET基类 *}  

TSQLSocket = class(TBaseSocket)  

private  

  {* 开始事务创建TADOConnection,关闭事务时释放 *}  

  FBeginTrans: Boolean;  

  FADOConn: TADOConnection;  

protected  

  {* 处理数据接口 *}  

  procedure Execute(AData: PByte; const ALen: Cardinal); override;  

  {* 返回SQL语句执行结果 *}  

  procedure DoCmdSQLOpen;  

  {* 执行SQL语句 *}  

  procedure DoCmdSQLExec;  

  {* 开始事务 *}  

  procedure DoCmdBeginTrans;  

  {* 提交事务 *}  

  procedure DoCmdCommitTrans;  

  {* 回滚事务 *}  

  procedure DoCmdRollbackTrans;  

public  

  procedure DoCreate; override;  

  destructor Destroy; override;  

  {* 获取SQL语句 *}  

  function GetSQL: string;  

  property BeginTrans: Boolean read FBeginTrans;  

end;  

Exceute是调用DecodePacket进行解包,然后获取命令分别调用不同的命令处理逻辑,代码如下:

[delphi] view
plain copy

procedure TSQLSocket.Execute(AData: PByte; const ALen: Cardinal);  

var  

  sErr: string;  

begin  

  inherited;  

  FRequest.Clear;  

  FResponse.Clear;  

  try  

    AddResponseHeader;  

    if ALen = 0 then  

    begin  

      DoFailure(CIPackLenError);  

      DoSendResult;  

      Exit;  

    end;  

    if DecodePacket(AData, ALen) then  

    begin  

      FResponse.Clear;  

      AddResponseHeader;  

      case StrToSQLCommand(Command) of  

        scLogin:  

        begin  

          DoCmdLogin;  

          DoSendResult;  

        end;  

        scActive:  

        begin  

          DoSuccess;  

          DoSendResult;  

        end;  

        scSQLOpen:  

        begin  

          DoCmdSQLOpen;  

        end;  

        scSQLExec:  

        begin  

          DoCmdSQLExec;  

          DoSendResult;  

        end;  

        scBeginTrans:  

        begin  

          DoCmdBeginTrans;  

          DoSendResult;  

        end;  

        scCommitTrans:  

        begin  

          DoCmdCommitTrans;  

          DoSendResult;  

        end;  

        scRollbackTrans:  

        begin  

          DoCmdRollbackTrans;  

          DoSendResult;  

        end;  

      else  

        DoFailure(CINoExistCommand, 'Unknow Command');  

        DoSendResult;  

      end;  

    end  

    else  

    begin  

      DoFailure(CIPackFormatError, 'Packet Must Include \r\n\r\n');  

      DoSendResult;  

    end;  

  except  

    on E: Exception do //发生未知错误,断开连接  

    begin  

      sErr := RemoteAddress + ':' + IntToStr(RemotePort) + CSComma + 'Unknow Error: ' + E.Message;  

      WriteLogMsg(ltError, sErr);  

      Disconnect;  

    end;  

  end;  

end;  

更详细代码见示例代码的IOCPSocket单元。

V1版下载地址:http://download.csdn.net/detail/sqldebug_fan/4510076,需要资源10分,有稳定性问题,可以作为研究稳定性用;
V2版下载地址:http://download.csdn.net/detail/sqldebug_fan/5560185,不需要资源分,解决了稳定性问题和提高性能;免责声明:此代码只是为了演示IOCP编程,仅用于学习和研究,切勿用于商业用途。水平有限,错误在所难免,欢迎指正和指导。邮箱地址:fansheng_hx@163.com
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: