您的位置:首页 > 理论基础 > 计算机网络

TCP.Receiver C# 版本TCP协议接收器 V1.0.03

2015-08-29 13:01 369 查看
1.0.0.3版本,增加了对像实例的可得复用性

修改了大上次数据出错时,对像拆包出错的问题.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// 数据同步协义数据接收器
/// </summary>
/// <remarks>
/// 主要功能有
/// 1.将一个TCPSocket的所有数据全部接收
/// 2.解析协义
/// 3.解析完成后的协义调用 Handler通知外部处理
/// 4.定义一个协义解析线程不停的解析协义
/// Ver 1.0.0.2
/// 修改了协议检查处理逻辑
/// Ver 1.0.0.3
/// 修改了StopParseProtocol方法, 在最终加上了对协议头状态,位置索引的初始化
/// </remarks>
public class TCPReceiver : IDisposable
{
#region 构造函数
/// <summary>
/// 数据同步协义数据接收器 实例
/// </summary>
public TCPReceiver()
{
}

/// <summary>
/// 数据同步协义数据接收器 实例
/// </summary>
/// <param name="protocolhead">协议头</param>
/// <param name="protocolfoot">协议尾</param>
public TCPReceiver(byte[] protocolhead, byte[] protocolfoot = null)
{
//邦定包头,与包体
PackageHead = protocolhead;
PackageFoot = protocolfoot;
}

#endregion

/// <summary>
/// 最大单个协义体数据长度,默认10MB
/// </summary>
private int _maxProtocolBinary = 1024 * 1024 * 10;
/// <summary>
/// 最大单个协义体数据长度
/// </summary>
public int MaxProtocolBinary
{
get { return _maxProtocolBinary; }
set { _maxProtocolBinary = value; }
}

/// <summary>
/// 是否正在运行,是否可以运行
/// </summary>
internal bool IsRuning = false;

private Task _task = null;
/// <summary>
/// 当前处理解析协义的线程
/// </summary>
public Task PraseProtocolTask
{
get { return _task; }
}

/// <summary>
/// 接收数据处理事件
/// </summary>
public Action<Protocol, Socket> ProtocolReceivedHandler
{
get;
set;
}

/// <summary>
/// 是从哪个节点接收的数据
/// </summary>
public Socket TargetSocket
{
get;
set;
}

/// <summary>
/// 拆分协议数据包的委托,不指定将调用自身处理方式
/// </summary>
public Func<byte[], List<byte[]>> SplictProtocolPackeg { get; set; }

#region 接收数据添加到队列
/// <summary>
/// 默认开放500空间,100万次单纯添加用时95毫秒
/// </summary>
private ConcurrentQueue<byte[]> receiveByteArrayQueue = new ConcurrentQueue<byte[]>();
/// <summary>
/// 接入队列处理器
/// </summary>
public ConcurrentQueue<byte[]> ReceiveByteArrayQueue
{
get { return receiveByteArrayQueue; }
}

/// <summary>
/// 接收数据
/// </summary>
public void Receive(byte[] buff)
{
lock (receiveByteArrayQueue)
{
//添加对像数据
receiveByteArrayQueue.Enqueue(buff);
}
}
#endregion

#region 线程控制

/// <summary>
/// 线程通知应答
/// </summary>
private CancellationTokenSource tasksource = null;

/// <summary>
/// 开始解析数据指令
/// </summary>
/// <param name="_maxProtocolBinary">最大接收单个数据</param>
/// <returns>如果成功返回true</returns>
public bool StartParseProtocol(int _maxProtocolBinary = 1024*1024*10)
{
//多线程处理
if (_task == null || _task.IsCanceled || _task.IsCompleted || _task.IsFaulted)
{
IsRuning = true;
this._maxProtocolBinary = _maxProtocolBinary;
//设置单
// ByteBuff = new byte[maxProtocolBinary];

tasksource = new CancellationTokenSource();
CancellationToken token = tasksource.Token;
_task = new Task(SplictProtocol, token);
_task.Start();

//解析协议线程
Task taskParse = new Task(ParsePortocol, token);
taskParse.Start();

return true;
}
else
{
// "数据同步节点接收器当前正经进行开始解析指令,请不要重复调用");
}
return false;
}

/// <summary>
/// 停止解析协义
/// </summary>
public void StopParseProtocol()
{
IsRuning = false;
if (tasksource != null)
{
//线程取消继续运行
tasksource.Cancel();
}
try
{
//在线程停止后需要将缓存队列中的数据全部处理完成
for (; receiveByteArrayQueue.Count > 0; )
{
//处理数据
ProcessBytes();
}
//全部解析
for (; ProtocolEntityQueue.Count > 0; )
{
//解析  数据
ParsePortocolItem();
}
}
finally
{
//修改了协议接收器中存在滞留数据的问题
bytes.Clear();
// 找到分包用包头
FindPackageHead = false;
// 找包头的当着序号
findHeadindex = 0;
// 找包尾
findFootIndex = 0;
}
}
#endregion

#region 解析协义数据
/// <summary>
/// 分包用包头
/// </summary>
private byte[] packageHead = new byte[] { 0x7e };

/// <summary>
/// 分包用包头
/// </summary>
public byte[] PackageHead
{
get { return packageHead; }
set
{
if (value != null)
{
packageHead = value;
}
}
}
/// <summary>
/// 分包用包尾
/// </summary>
private byte[] packageFoot = new byte[] { 0x7e };
/// <summary>
/// 分包用包尾
/// </summary>
public byte[] PackageFoot
{
get { return packageFoot; }
set
{
if (value != null)
{
packageFoot = value;
}
}
}
/// <summary>
/// 用于处理数据协义的功能
/// </summary>
List<byte> bytes = new List<byte>();

/// <summary>
/// 协义数据实体队列,已经进行拆包后的协义数据
/// </summary>
internal ConcurrentQueue<byte[]> ProtocolEntityQueue = new ConcurrentQueue<byte[]>();

/// <summary>
/// 找到分包用包头
/// </summary>
bool FindPackageHead = false;
/// <summary>
/// 找包头的当着序号
/// </summary>
int findHeadindex = 0;
/// <summary>
/// 找包尾
/// </summary>
int findFootIndex = 0;

/// <summary>
/// 拆包方法
/// 之所以先全部放到一个query里是进行快速的接收
///
/// </summary>
protected void SplictProtocol()
{
IsRuning = true;
while (IsRuning)
{
ProcessBytes();
}
}
/// <summary>
/// 处理队列中的数据
/// </summary>
/// <param name="canSleep">是否可休眠</param>
public bool ProcessBytes(bool canSleep = true)
{
byte[] arr = null;
//开始解析数据
//1.取出数据
lock (receiveByteArrayQueue)
{
receiveByteArrayQueue.TryDequeue(out arr);
}
try
{
if (arr != null)
{
//调用自定议数据解析.
if (SplictProtocolPackeg != null)
{
var packetBytes = SplictProtocolPackeg(arr);
if (packetBytes != null)
{
lock (ProtocolEntityQueue)
{
packetBytes.ForEach(o => ProtocolEntityQueue.Enqueue(o));
return true;
}
}
else
{
return false;
}
}

//锁处理
lock (bytes)
{
//此协义数据中的协义数据索引
// List<int> ints = new List<int>();

//2.将数据进行包查找
//开始从队列中取数据
for (int k = 0; k < arr.Length; k++)
{
//队列有数据
byte b = arr[k];
//如果超过最大接收字节数
if (_maxProtocolBinary <= bytes.Count)
{
bytes.Clear();
}
//添加到对像集合
bytes.Add(b);
//3.从集合的前面开始取数据.找包头,进行拆包
#region 找包头
//等于包数据
if (packageHead.Length > 0 && b == packageHead[findHeadindex] && !FindPackageHead)
{
//包头找完
if (findHeadindex == packageHead.Length - 1)
{
//ints.Add(k);
Interlocked.Exchange(ref findHeadindex, 0);
if (!FindPackageHead)
{
FindPackageHead = true;
}
//这里取一个完整包
byte[] byteFarm = bytes.Take(bytes.Count - packageHead.Length).ToArray();
//如果是有效的数据
if (byteFarm.Length > packageHead.Length)
{
lock (ProtocolEntityQueue)
{
ProtocolEntityQueue.Enqueue(byteFarm);
}
//开始从 bytes 中移除数据
bytes.Clear();
//添加包头
bytes.AddRange(packageHead);
}
//包头找完则找下一字节
continue;
}
else
{
Interlocked.Increment(ref findHeadindex);
}
}
else
{
Interlocked.Exchange(ref findHeadindex, 0);
//findHeadindex = 0;
if (!FindPackageHead && packageHead.Length == 0)
{
FindPackageHead = true;
}
}
#endregion

#region 找包尾

if (packageFoot != null && packageFoot.Length > 0 && FindPackageHead)
{
if (b == packageFoot[findFootIndex])
{
//包尾找完
if (findFootIndex == packageFoot.Length - 1)
{
//删除包尾字节,可能会包含包头字节
//byte[] byteFarm = bytes.Take(bytes.Count - packageFoot.Length).ToArray();
var byteFarm = bytes.ToArray();
//跳过包头字节,包尾字节
//byte[] byteFarm = bytes.Skip(packageHead.Length).Take(bytes.Count - (packageFoot.Length + packageHead.Length)).ToArray();
//如果是有效的数据
if (byteFarm.Length >= packageFoot.Length)
{
lock (ProtocolEntityQueue)
{
ProtocolEntityQueue.Enqueue(byteFarm);
}
//开始从 bytes 中移除数据
bytes.Clear();
}
FindPackageHead = false;
//包尾找完则找下一字节
continue;
}
else
{
System.Threading.Interlocked.Increment(ref findFootIndex);
}
}
else
{
System.Threading.Interlocked.Exchange(ref findFootIndex, 0);
}
}
#endregion
}
}
}
else
{
if (canSleep)
{
Thread.Sleep(5);
}
return false;
}
}
catch (Exception ex)
{
IsRuning = false;
//"处理协议线程出现问题,未处理数据量{0},稍后将自动处理完末处理数据,出错原因:{1}", (receiveByteArrayQueue.Count + ProtocolEntityQueue.Count), ex));
Dispose();
}
return true;
}

#region 解析协议
/// <summary>
/// 解析协议
/// </summary>
protected void ParsePortocol()
{
while (IsRuning)
{
ParsePortocolItem();
}
}

/// <summary>
/// 解析单个协议
/// </summary>
/// <param name="canSleep">是否要休眠</param>
public bool ParsePortocolItem(bool canSleep = true)
{
try
{
//4.重新组成一个byte[] 进行数据解析
//没解析过的数据协议
byte[] sourceByteArray = null;
if (ProtocolEntityQueue.Count > 0)
{
ProtocolEntityQueue.TryDequeue(out sourceByteArray);
}
//没有数据则进行停止后返回
if (sourceByteArray == null)
{
if (canSleep)
{
Thread.Sleep(100);
}
return false;
}
//解析协议数据,转议消息还原
var bytearr = sourceByteArray;
//数据要大于分包的长度
if (bytearr.Length > packageFoot.Length && bytearr.Length > packageHead.Length)
{
//检查包头包尾
if (packageHead.Length > 0 && packageFoot.Length > 0)
{
if (!(bytearr.Length > 4 && bytearr[0] == packageHead[0] && bytearr[bytearr.Length - 1] == packageFoot[packageFoot.Length - 1]))
{
//"接收到终端TCP验证不通过数据:{0}", bytearr));
return true;
}
}
//接收数据日志,从协议解析器中接收数据
if (RecordLogHandler != null)
{
RecordLogHandler(bytearr, TargetSocket);
}
//进行条件判断
if (ParsePackegHandler != null && ProtocolReceivedHandler != null)
{
var receivebytes = ParsePackegHandler(bytearr, TargetSocket);
//这里代码没有完成请后续完成
ProtocolReceivedHandler.Invoke(receivebytes, TargetSocket);
}
}
}
catch (Exception ex)
{
//string.Format("解析协议异常:{0}", ex));
}
return true;
}

#endregion

#endregion

/// <summary>
/// 接收一包数据
/// </summary>
public Func<byte[], Socket, Protocol> ParsePackegHandler { get; set; }

/// <summary>
/// 记录日志
/// </summary>
public Action<byte[], Socket> RecordLogHandler { get; set; }

public void Dispose(bool can)
{
if (can)
{
Dispose();
}
}

/// <summary>
/// 析构方法
/// </summary>
public void Dispose()
{
StopParseProtocol();
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: