TCP.Receiver C# 版本TCP协议接收器 V1.0.03
2015-08-29 13:01
369 查看
1.0.0.3版本,增加了对像实例的可得复用性
修改了大上次数据出错时,对像拆包出错的问题.
修改了大上次数据出错时,对像拆包出错的问题.
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Net.Sockets; using System.Threading; using System.Threading.Tasks; /// <summary> /// 数据同步协义数据接收器 /// </summary> /// <remarks> /// 主要功能有 /// 1.将一个TCPSocket的所有数据全部接收 /// 2.解析协义 /// 3.解析完成后的协义调用 Handler通知外部处理 /// 4.定义一个协义解析线程不停的解析协义 /// Ver 1.0.0.2 /// 修改了协议检查处理逻辑 /// Ver 1.0.0.3 /// 修改了StopParseProtocol方法, 在最终加上了对协议头状态,位置索引的初始化 /// </remarks> public class TCPReceiver : IDisposable { #region 构造函数 /// <summary> /// 数据同步协义数据接收器 实例 /// </summary> public TCPReceiver() { } /// <summary> /// 数据同步协义数据接收器 实例 /// </summary> /// <param name="protocolhead">协议头</param> /// <param name="protocolfoot">协议尾</param> public TCPReceiver(byte[] protocolhead, byte[] protocolfoot = null) { //邦定包头,与包体 PackageHead = protocolhead; PackageFoot = protocolfoot; } #endregion /// <summary> /// 最大单个协义体数据长度,默认10MB /// </summary> private int _maxProtocolBinary = 1024 * 1024 * 10; /// <summary> /// 最大单个协义体数据长度 /// </summary> public int MaxProtocolBinary { get { return _maxProtocolBinary; } set { _maxProtocolBinary = value; } } /// <summary> /// 是否正在运行,是否可以运行 /// </summary> internal bool IsRuning = false; private Task _task = null; /// <summary> /// 当前处理解析协义的线程 /// </summary> public Task PraseProtocolTask { get { return _task; } } /// <summary> /// 接收数据处理事件 /// </summary> public Action<Protocol, Socket> ProtocolReceivedHandler { get; set; } /// <summary> /// 是从哪个节点接收的数据 /// </summary> public Socket TargetSocket { get; set; } /// <summary> /// 拆分协议数据包的委托,不指定将调用自身处理方式 /// </summary> public Func<byte[], List<byte[]>> SplictProtocolPackeg { get; set; } #region 接收数据添加到队列 /// <summary> /// 默认开放500空间,100万次单纯添加用时95毫秒 /// </summary> private ConcurrentQueue<byte[]> receiveByteArrayQueue = new ConcurrentQueue<byte[]>(); /// <summary> /// 接入队列处理器 /// </summary> public ConcurrentQueue<byte[]> ReceiveByteArrayQueue { get { return receiveByteArrayQueue; } } /// <summary> /// 接收数据 /// </summary> public void Receive(byte[] buff) { lock (receiveByteArrayQueue) { //添加对像数据 receiveByteArrayQueue.Enqueue(buff); } } #endregion #region 线程控制 /// <summary> /// 线程通知应答 /// </summary> private CancellationTokenSource tasksource = null; /// <summary> /// 开始解析数据指令 /// </summary> /// <param name="_maxProtocolBinary">最大接收单个数据</param> /// <returns>如果成功返回true</returns> public bool StartParseProtocol(int _maxProtocolBinary = 1024*1024*10) { //多线程处理 if (_task == null || _task.IsCanceled || _task.IsCompleted || _task.IsFaulted) { IsRuning = true; this._maxProtocolBinary = _maxProtocolBinary; //设置单 // ByteBuff = new byte[maxProtocolBinary]; tasksource = new CancellationTokenSource(); CancellationToken token = tasksource.Token; _task = new Task(SplictProtocol, token); _task.Start(); //解析协议线程 Task taskParse = new Task(ParsePortocol, token); taskParse.Start(); return true; } else { // "数据同步节点接收器当前正经进行开始解析指令,请不要重复调用"); } return false; } /// <summary> /// 停止解析协义 /// </summary> public void StopParseProtocol() { IsRuning = false; if (tasksource != null) { //线程取消继续运行 tasksource.Cancel(); } try { //在线程停止后需要将缓存队列中的数据全部处理完成 for (; receiveByteArrayQueue.Count > 0; ) { //处理数据 ProcessBytes(); } //全部解析 for (; ProtocolEntityQueue.Count > 0; ) { //解析 数据 ParsePortocolItem(); } } finally { //修改了协议接收器中存在滞留数据的问题 bytes.Clear(); // 找到分包用包头 FindPackageHead = false; // 找包头的当着序号 findHeadindex = 0; // 找包尾 findFootIndex = 0; } } #endregion #region 解析协义数据 /// <summary> /// 分包用包头 /// </summary> private byte[] packageHead = new byte[] { 0x7e }; /// <summary> /// 分包用包头 /// </summary> public byte[] PackageHead { get { return packageHead; } set { if (value != null) { packageHead = value; } } } /// <summary> /// 分包用包尾 /// </summary> private byte[] packageFoot = new byte[] { 0x7e }; /// <summary> /// 分包用包尾 /// </summary> public byte[] PackageFoot { get { return packageFoot; } set { if (value != null) { packageFoot = value; } } } /// <summary> /// 用于处理数据协义的功能 /// </summary> List<byte> bytes = new List<byte>(); /// <summary> /// 协义数据实体队列,已经进行拆包后的协义数据 /// </summary> internal ConcurrentQueue<byte[]> ProtocolEntityQueue = new ConcurrentQueue<byte[]>(); /// <summary> /// 找到分包用包头 /// </summary> bool FindPackageHead = false; /// <summary> /// 找包头的当着序号 /// </summary> int findHeadindex = 0; /// <summary> /// 找包尾 /// </summary> int findFootIndex = 0; /// <summary> /// 拆包方法 /// 之所以先全部放到一个query里是进行快速的接收 /// /// </summary> protected void SplictProtocol() { IsRuning = true; while (IsRuning) { ProcessBytes(); } } /// <summary> /// 处理队列中的数据 /// </summary> /// <param name="canSleep">是否可休眠</param> public bool ProcessBytes(bool canSleep = true) { byte[] arr = null; //开始解析数据 //1.取出数据 lock (receiveByteArrayQueue) { receiveByteArrayQueue.TryDequeue(out arr); } try { if (arr != null) { //调用自定议数据解析. if (SplictProtocolPackeg != null) { var packetBytes = SplictProtocolPackeg(arr); if (packetBytes != null) { lock (ProtocolEntityQueue) { packetBytes.ForEach(o => ProtocolEntityQueue.Enqueue(o)); return true; } } else { return false; } } //锁处理 lock (bytes) { //此协义数据中的协义数据索引 // List<int> ints = new List<int>(); //2.将数据进行包查找 //开始从队列中取数据 for (int k = 0; k < arr.Length; k++) { //队列有数据 byte b = arr[k]; //如果超过最大接收字节数 if (_maxProtocolBinary <= bytes.Count) { bytes.Clear(); } //添加到对像集合 bytes.Add(b); //3.从集合的前面开始取数据.找包头,进行拆包 #region 找包头 //等于包数据 if (packageHead.Length > 0 && b == packageHead[findHeadindex] && !FindPackageHead) { //包头找完 if (findHeadindex == packageHead.Length - 1) { //ints.Add(k); Interlocked.Exchange(ref findHeadindex, 0); if (!FindPackageHead) { FindPackageHead = true; } //这里取一个完整包 byte[] byteFarm = bytes.Take(bytes.Count - packageHead.Length).ToArray(); //如果是有效的数据 if (byteFarm.Length > packageHead.Length) { lock (ProtocolEntityQueue) { ProtocolEntityQueue.Enqueue(byteFarm); } //开始从 bytes 中移除数据 bytes.Clear(); //添加包头 bytes.AddRange(packageHead); } //包头找完则找下一字节 continue; } else { Interlocked.Increment(ref findHeadindex); } } else { Interlocked.Exchange(ref findHeadindex, 0); //findHeadindex = 0; if (!FindPackageHead && packageHead.Length == 0) { FindPackageHead = true; } } #endregion #region 找包尾 if (packageFoot != null && packageFoot.Length > 0 && FindPackageHead) { if (b == packageFoot[findFootIndex]) { //包尾找完 if (findFootIndex == packageFoot.Length - 1) { //删除包尾字节,可能会包含包头字节 //byte[] byteFarm = bytes.Take(bytes.Count - packageFoot.Length).ToArray(); var byteFarm = bytes.ToArray(); //跳过包头字节,包尾字节 //byte[] byteFarm = bytes.Skip(packageHead.Length).Take(bytes.Count - (packageFoot.Length + packageHead.Length)).ToArray(); //如果是有效的数据 if (byteFarm.Length >= packageFoot.Length) { lock (ProtocolEntityQueue) { ProtocolEntityQueue.Enqueue(byteFarm); } //开始从 bytes 中移除数据 bytes.Clear(); } FindPackageHead = false; //包尾找完则找下一字节 continue; } else { System.Threading.Interlocked.Increment(ref findFootIndex); } } else { System.Threading.Interlocked.Exchange(ref findFootIndex, 0); } } #endregion } } } else { if (canSleep) { Thread.Sleep(5); } return false; } } catch (Exception ex) { IsRuning = false; //"处理协议线程出现问题,未处理数据量{0},稍后将自动处理完末处理数据,出错原因:{1}", (receiveByteArrayQueue.Count + ProtocolEntityQueue.Count), ex)); Dispose(); } return true; } #region 解析协议 /// <summary> /// 解析协议 /// </summary> protected void ParsePortocol() { while (IsRuning) { ParsePortocolItem(); } } /// <summary> /// 解析单个协议 /// </summary> /// <param name="canSleep">是否要休眠</param> public bool ParsePortocolItem(bool canSleep = true) { try { //4.重新组成一个byte[] 进行数据解析 //没解析过的数据协议 byte[] sourceByteArray = null; if (ProtocolEntityQueue.Count > 0) { ProtocolEntityQueue.TryDequeue(out sourceByteArray); } //没有数据则进行停止后返回 if (sourceByteArray == null) { if (canSleep) { Thread.Sleep(100); } return false; } //解析协议数据,转议消息还原 var bytearr = sourceByteArray; //数据要大于分包的长度 if (bytearr.Length > packageFoot.Length && bytearr.Length > packageHead.Length) { //检查包头包尾 if (packageHead.Length > 0 && packageFoot.Length > 0) { if (!(bytearr.Length > 4 && bytearr[0] == packageHead[0] && bytearr[bytearr.Length - 1] == packageFoot[packageFoot.Length - 1])) { //"接收到终端TCP验证不通过数据:{0}", bytearr)); return true; } } //接收数据日志,从协议解析器中接收数据 if (RecordLogHandler != null) { RecordLogHandler(bytearr, TargetSocket); } //进行条件判断 if (ParsePackegHandler != null && ProtocolReceivedHandler != null) { var receivebytes = ParsePackegHandler(bytearr, TargetSocket); //这里代码没有完成请后续完成 ProtocolReceivedHandler.Invoke(receivebytes, TargetSocket); } } } catch (Exception ex) { //string.Format("解析协议异常:{0}", ex)); } return true; } #endregion #endregion /// <summary> /// 接收一包数据 /// </summary> public Func<byte[], Socket, Protocol> ParsePackegHandler { get; set; } /// <summary> /// 记录日志 /// </summary> public Action<byte[], Socket> RecordLogHandler { get; set; } public void Dispose(bool can) { if (can) { Dispose(); } } /// <summary> /// 析构方法 /// </summary> public void Dispose() { StopParseProtocol(); } }
相关文章推荐
- HTTP协议
- NS2网络仿真环境的搭建和使用
- c# http get post 用法
- python爬虫 - python requests网络请求简洁之道
- python爬虫 - python requests网络请求简洁之道
- 网络抓包以及进行简单数据分析
- C#异步SOCKET发送帮肋类,支持UDP,TCP
- Idiot的间谍网络
- [Q&A] 初次yum安装httpd 遇到的问题
- 新版XMLHttpRequest支持跨域请求
- POJ1273-Drainage Ditches-网络流-最大流(模板题)
- muduo网络库源码学习————互斥锁
- iOS开发 网络请求——HTTP协议
- 《TCP/IP详解卷1:协议》——第3章 IP:网际协议(转载)
- TCP/UDP相关知识总汇
- Android网络请求,请求参数是中文导致的乱码问题
- linux网络编程中阻塞和非阻塞socket的区别
- httpclient请求方法
- TCP/IP网络常用名词缩写
- 使用HttpURLConnection下载图片