通用通信模型的TCP服务端实现完善
2014-10-17 10:34
211 查看
上一篇中的实现没有利用好通信工厂进行整合处理,这一篇中重新对通信工厂进行了处理,增加了应答数据的泛型处理,这样所有的应答数据,更加灵活可靠。
通过对通信端口参数的解耦,避免了通信参数与具体的启动类型的二义性。
增加了输入数据处理消息构造器注入和请求数据注入,修改了异步应答的回调处理。
上干货,主要代码实现:
通信工厂:
通信接口类:
通信接口抽象实现类,还是实现各种调用的业务整合:
具体的通信实现类:
应答消息拦截器接口:
输入消息构造器接口:
请求数据接口:
应答数据接口:
会话构造类:
通过对通信端口参数的解耦,避免了通信参数与具体的启动类型的二义性。
增加了输入数据处理消息构造器注入和请求数据注入,修改了异步应答的回调处理。
上干货,主要代码实现:
通信工厂:
/// <summary> /// 通信工厂构建 /// </summary> /// <typeparam name="TInputMessageFilter">输入消息构造器类型</typeparam> /// <typeparam name="TOutputMessageFilter">应答消息过滤器类型</typeparam> /// <typeparam name="TRequest">请求数据</typeparam> /// <typeparam name="TResponse">应答数据</typeparam> public class CommunicationFactory<TInputMessageFilter, TOutputMessageFilter, TRequest, TResponse> where TInputMessageFilter : BaseInPutMessageFilter<TRequest> where TOutputMessageFilter : BaseOutPutMessageFilter<TRequest, TResponse> where TRequest : IRequestData where TResponse : IResponseData { /// <summary> /// 通信工厂构建 /// </summary> /// <param name="paramerters">通信启动设置参数</param> /// <returns> /// 返回通信对象 /// </returns> public static ICommunication<TRequest, TResponse> Create(BaseCommunicationParamerters paramerters) { ICommunication<TRequest, TResponse> iCommunication = null; if (paramerters is SerialParamertercs) { SerialParamertercs tempParamerters = (SerialParamertercs)paramerters; iCommunication = new SerialPortCommunication<TInputMessageFilter, TOutputMessageFilter, TRequest, TResponse>(tempParamerters.PortName, tempParamerters.BaudRate); } else if (paramerters is TCPClientParamertercs) { TCPClientParamertercs tempParamerters = (TCPClientParamertercs)paramerters; iCommunication = new TCPClientCommunication<TInputMessageFilter, TOutputMessageFilter, TRequest, TResponse>(tempParamerters.Ip, tempParamerters.PortNO); } else if (paramerters is TCPServerParamertercs) { TCPServerParamertercs tempParamerters = (TCPServerParamertercs)paramerters; iCommunication = new TCPServerCommunication<TInputMessageFilter, TOutputMessageFilter, TRequest, TResponse>(tempParamerters.Ip, tempParamerters.PortNO); } else if (paramerters is TCPHostServerParamertercs) { TCPHostServerParamertercs tempParamerters = (TCPHostServerParamertercs)paramerters; iCommunication = new TCPHostSingleServerCommunication<TInputMessageFilter, TOutputMessageFilter, TRequest, TResponse>(tempParamerters.Ip, tempParamerters.PortNO); } else if (paramerters is TCPAsyncServerParamertercs) { TCPAsyncServerParamertercs tempParamerters = (TCPAsyncServerParamertercs)paramerters; iCommunication = new TCPAsyncServerCommunication<TInputMessageFilter, TOutputMessageFilter, TRequest, TResponse>(tempParamerters.Ip, tempParamerters.PortNO); } if (iCommunication != null) { IOutPutMessageFilter<TRequest, TResponse> messageFilter = (IOutPutMessageFilter<TRequest, TResponse>)Activator.CreateInstance<TOutputMessageFilter>(); iCommunication.InitCommunication(paramerters); } return iCommunication; } }
通信接口类:
/// <summary> /// 通信接口 /// </summary> /// <typeparam name="TRequest">请求数据类型</typeparam> /// <typeparam name="TResponse">应答数据类型</typeparam> public interface ICommunication<TRequest,TResponse> where TRequest:IRequestData where TResponse : IResponseData { /// <summary> /// 通信端口初始化 /// </summary> /// <param name="communicationParamertes">通信端口初始化</param> void InitCommunication(BaseCommunicationParamerters communicationParamerters); /// <summary> /// 启动 /// 如果打开失败,会自动尝试关闭掉 /// </summary> void Open(); /// <summary> /// 是否开启 /// </summary> bool IsOpen { get; } /// <summary> /// 请求发送 /// </summary> /// <param name="request">请求发送的数据</param> void Request(TRequest request); /// <summary> /// 异步请求数据,在其他线程上 /// </summary> /// <param name="request">请求发送的数据</param> void ASyncRequest(TRequest request); /// <summary> /// 请求数据,根据会话ID发送, /// 一般用于并发处理场合,根据ID来确认具体的发送会话, /// 用于服务监听应答场合 /// </summary> /// <param name="sessionID">会话ID</param> /// <param name="request">请求发送的数据</param> void Request(Guid sessionID,TRequest request); /// <summary> /// 请求等待应答数据 /// </summary> /// <param name="request">请求数据</param> /// <param name="timeOut">请求超时时间,单位是毫秒</param> /// <returns> /// 返回响应数据 /// </returns> TResponse RequestWait(TRequest request, int timeOut = 500); /// <summary> /// 异步请求等待应答数据 /// 异步线程上消息队列响应 /// </summary> /// <param name="request">异步请求数据</param> void AsyncRequestWait(AsyncRequestWaitHandler<TRequest, TResponse> asyncRequestWait, TRequest requestData); /// <summary> /// 数据应答 /// 异步线程上消息队列响应 /// </summary> event ResponseHandler<TResponse> OnResponseBuffer; /// <summary> /// 异常信息响应 /// 异步线程上消息队列响应 /// </summary> event ExceptionHandler<Exception> OnException; /// <summary> /// 普通信息通知 /// 异步线程上消息队列响应 /// </summary> event ExceptionHandler<string> OnInfo; /// <summary> /// 打开前触发 /// </summary> event ResponseHandler<bool> OnOpening; /// <summary> /// 打开后触发 /// </summary> event ResponseHandler<bool> OnOpened; /// <summary> /// 会话构建后触发 /// </summary> event SessionHandler<TRequest,TResponse> OnSessionAdded; /// <summary> /// 关闭前触发 /// </summary> event ResponseHandler<bool> OnClosing; /// <summary> /// 关闭后触发 /// </summary> event ResponseHandler<bool> OnClosed; /// <summary> /// 关闭 /// </summary> void Close(); }
通信接口抽象实现类,还是实现各种调用的业务整合:
/// <summary> /// 通信基类 /// </summary> /// <typeparam name="TRequest">请求数据类型</typeparam> /// <typeparam name="TResponse">应答数据类型</typeparam> internal abstract class BaseCommunication<TRequest,TResponse> : ICommunication<TRequest,TResponse> where TRequest : IRequestData where TResponse : IResponseData { /// <summary> /// 响应处理消息队列,用于应答排队 /// </summary> private IoCompletionMessageFIFO responseMessageFIFO = null; /// <summary> /// 启动 /// 如果打开失败,会自动尝试关闭掉 /// </summary> public abstract void Open(); /// <summary> /// 是否开启 /// </summary> public bool IsOpen { get { if (responseMessageFIFO == null) { return false; } return responseMessageFIFO.IsOpened; } } /// <summary> /// 关闭 /// </summary> public abstract void Close(); /// <summary> /// 初始化设备通信端口参数 /// </summary> /// <param name="communicationParamertes">设备通信端口初始化参数</param> public abstract void InitCommunication(BaseCommunicationParamerters communicationParamerters); #region 请求写入数据 /// <summary> /// 请求发送的数据 /// </summary> /// <param name="data">请求数据</param> public virtual void Request(TRequest data) { throw new Exception(string.Format("{0}不支持Request的调用", this.GetType().FullName)); } /// <summary> /// 请求数据,根据会话ID发送, /// 一般用于并发处理场合,根据ID来确认具体的发送会话, /// 用于服务监听应答场合 /// </summary> /// <param name="sessionID">会话ID</param> /// <param name="request">请求发送的数据</param> public virtual void Request(Guid sessionID, TRequest request) { throw new Exception(string.Format("{0}不支持Request的调用", this.GetType().FullName)); } /// <summary> /// 异步请求数据,在其他线程上 /// </summary> /// <param name="data"></param> public virtual void ASyncRequest(TRequest data) { throw new Exception(string.Format("{0}不支持ASyncRequest的调用", this.GetType().FullName)); } /// <summary> /// 请求等待应答数据 /// </summary> /// <param name="data">请求数据</param> /// <param name="timeOut">请求超时时间,单位是毫秒</param> /// <returns> /// 返回响应数据 /// </returns> public virtual TResponse RequestWait(TRequest data, int timeOut = 500) { throw new Exception(string.Format("{0}不支持RequestWait的调用", this.GetType().FullName)); } /// <summary> /// 异步请求等待应答数据,一般用于批量请求 /// 在回调中执行处理 /// </summary> /// <param name="data">异步请求数据</param> public virtual void AsyncRequestWait(AsyncRequestWaitHandler<TRequest, TResponse> asyncRequestWait, TRequest requestData) { throw new Exception(string.Format("{0}不支持AsyncRequestWait的调用", this.GetType().FullName)); } #endregion /// <summary> /// 压入数据 /// </summary> /// <param name="obj"></param> protected virtual void Post(object obj) { if (responseMessageFIFO != null && responseMessageFIFO.IsOpened == true) { responseMessageFIFO.Post(obj); } else { if (obj is Exception) { if (OnException != null) { OnException(this, (Exception)obj); } } else if (obj is string) { if (OnInfo != null) { OnInfo(this, (string)obj); } } } } /// <summary> /// 并发式处理 /// </summary> protected virtual void ParallelPost(object obj) { System.Threading.ThreadPool.QueueUserWorkItem(new System.Threading.WaitCallback(responseMessageFIFO_Exceute), obj); } /// <summary> /// 回调处理数据 /// </summary> /// <param name="obj"></param> void responseMessageFIFO_Exceute(object obj) { if (obj is Exception) { if (OnException != null) { OnException(this, (Exception)obj); } } else if (obj is TResponse) { if (OnResponseBuffer != null) { OnResponseBuffer(this, (TResponse)obj); } } else if (obj is string) { if (OnInfo != null) { OnInfo(this, (string)obj); } } } /// <summary> /// 响应消息队列启动 /// </summary> protected virtual void OpenRespnoseMessageFIFO() { if (responseMessageFIFO == null) { responseMessageFIFO = new IoCompletionMessageFIFO(); responseMessageFIFO.Exceute += responseMessageFIFO_Exceute; } if (responseMessageFIFO != null && responseMessageFIFO.IsOpened == false) { responseMessageFIFO.Start(); } } /// <summary> /// 响应消息队列关闭 /// </summary> protected virtual void CloseResponseMessageFIFO() { if (responseMessageFIFO != null && responseMessageFIFO.IsOpened == true) { responseMessageFIFO.Stop(); } } /// <summary> /// 启动前触发 /// </summary> /// <param name="status"></param> protected virtual void RaiseOpening(bool status) { if (OnOpening != null) { OnOpening(this, true); } } /// <summary> /// 启动后触发 /// </summary> /// <param name="status"></param> protected virtual void RaiseOpened(bool status) { if (OnOpened != null) { OnOpened(this, true); } } /// <summary> /// 关闭前触发 /// </summary> /// <param name="status"></param> protected virtual void RaiseClosing(bool status) { if (OnClosing != null) { OnClosing(this, true); } } /// <summary> /// 关闭后触发 /// </summary> /// <param name="status"></param> protected virtual void RaiseClosed(bool status) { if (OnClosed != null) { OnClosed(this, true); } } /// <summary> /// 连接会话建立后触发 /// </summary> /// <param name="session">连接会话</param> /// <returns></returns> protected virtual ConnectSession<TRequest,TResponse> RaiseConnectSession(ConnectSession<TRequest,TResponse> session) { if (OnSessionAdded != null) { OnSessionAdded(session); } return session; } /// <summary> /// 应答数据事件 /// </summary> public event ResponseHandler<TResponse> OnResponseBuffer; /// <summary> /// 应答异常 /// </summary> public event ExceptionHandler<Exception> OnException; /// <summary> /// 应答普通消息 /// </summary> public event ExceptionHandler<string> OnInfo; /// <summary> /// 打开前触发 /// </summary> public event ResponseHandler<bool> OnOpening; /// <summary> /// 打开后触发 /// </summary> public event ResponseHandler<bool> OnOpened; /// <summary> /// 关闭前触发 /// </summary> public event ResponseHandler<bool> OnClosing; /// <summary> /// 关闭后触发 /// </summary> public event ResponseHandler<bool> OnClosed; /// <summary> /// 会话构建后触发 /// </summary> public event SessionHandler<TRequest,TResponse> OnSessionAdded; }
具体的通信实现类:
/// <summary> /// TCP主动监听模式,读取应答交互方式 /// 此模式应答数据统一到一个线程上处理,排队进行数据应答 /// </summary> /// <typeparam name="TOutputMessageFilter">传入的应答消息过滤器类型</typeparam> /// <typeparam name="TResponse">传入的应答数据类型</typeparam> internal class TCPHostSingleServerCommunication<TInputMessageFilter, TOutputMessageFilter, TRequest, TResponse> : BaseCommunication<TRequest, TResponse> where TInputMessageFilter : BaseInPutMessageFilter<TRequest> where TOutputMessageFilter : BaseOutPutMessageFilter<TRequest,TResponse> where TRequest:IRequestData where TResponse : IResponseData { /// <summary> /// TCP连接地址 /// </summary> IPEndPoint end; /// <summary> /// 网络连接的IP地址和端口号 /// </summary> string endPort; /// <summary> /// 已建立的会话 /// </summary> private List<ConnectSession<TRequest, TResponse>> connectSessionList = new List<ConnectSession<TRequest, TResponse>>(); /// <summary> /// 等待中的连接 /// </summary> private List<IAsyncResult> waitAcceptSocketList = new List<IAsyncResult>(); /// <summary> /// 连接处理锁 /// </summary> private object acceptLock = new object(); /// <summary> /// 用于Socket监听的托管队列 /// </summary> private IoCompletionMessageFIFO acceptFIFO = new IoCompletionMessageFIFO(); /// <summary> /// 监听信号量 /// </summary> private ManualResetEvent acceptWaitResetEvent = new ManualResetEvent(false); /// <summary> /// 通信端口初始化参数 /// </summary> protected TCPHostServerParamertercs communicationParamerters; /// <summary> /// 连接模式构造 /// </summary> /// <param name="ip">监听的IP地址</param> /// <param name="port">端口号</param> public TCPHostSingleServerCommunication(string ip, int port) { this.end = new IPEndPoint(IPAddress.Parse(ip), port); this.endPort = string.Format("{0}:{1}", ip, port); acceptFIFO.Exceute += acceptFIFO_Exceute; } /// <summary> /// 监听托管队列处理回调 /// </summary> /// <param name="obj"></param> void acceptFIFO_Exceute(object obj) { if (obj is SocketCommand) { SocketCommand socketCommand = (SocketCommand)obj; if (socketCommand == SocketCommand.Start) { this.StartConnectNetWork(); } } } /// <summary> /// 打开 /// 如果打开失败,会自动尝试关闭掉 /// </summary> public override void Open() { if (communicationParamerters == null) { throw new Exception("设备端口参数未初始化"); } try { RaiseOpening(true); acceptFIFO.Start(); System.Threading.Thread.Sleep(200); if (acceptFIFO != null && acceptFIFO.IsOpened == true) { acceptFIFO.Post(SocketCommand.Start); } this.OpenRespnoseMessageFIFO(); } catch (Exception ex) { Exception portExcp = new Exception(string.Format("网口【{0}】打开失败,失败原因:{1}", endPort, ex.Message)); this.Post(portExcp); throw portExcp; } } /// <summary> /// 通信端口初始化 /// </summary> /// <param name="communicationParamerters"></param> public override void InitCommunication(Trace.CMS.Communication.BaseCommunicationParamerters communicationParamerters) { this.communicationParamerters = (TCPHostServerParamertercs)communicationParamerters; } /// <summary> /// 开始网络监听 /// </summary> private void StartConnectNetWork() { try { acceptWaitResetEvent.Reset(); Socket socket = new Socket(end.AddressFamily, SocketType.Stream, ProtocolType.Tcp); socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.DontLinger, true); socket.Bind(end); socket.Listen(0xFFFF); AsyncCallback acceptCallback = new AsyncCallback(CallbackAccept); lock (acceptLock) { IAsyncResult beginAr = socket.BeginAccept(acceptCallback, socket); waitAcceptSocketList.Add(beginAr); } RaiseOpened(true); acceptWaitResetEvent.WaitOne(); this.StopConnectNetWork(); System.Threading.Thread.Sleep(200); if (socket != null) { socket.Close(); } RaiseClosed(true); } catch (Exception ex) { this.Post(new Exception(string.Format("连接托管异常,异常原因:{0}", ex.Message))); } } /// <summary> /// 监听到一个异步连接 /// 此处用于处理已监听到的连接,并发起新的监听连接,剔除掉死链接 /// </summary> /// <param name="ar"></param> private void CallbackAccept(IAsyncResult ar) { if (ar.IsCompleted == false) { return; } if (IsOpen == true) { try { byte[] inOptionValues = GetOptionValues(); Socket tempSocket = (Socket)ar.AsyncState; Socket endAcceptSocket = tempSocket.EndAccept(ar); ConnectSession<TRequest, TResponse> connectSession = ConnectSession<TRequest, TResponse>.Create<TInputMessageFilter, TOutputMessageFilter>(); connectSession.ReciveSocket = endAcceptSocket; connectSession.ReciveSocket.IOControl(IOControlCode.KeepAliveValues, inOptionValues, null);//IO控制防止死连接 connectSession = RaiseConnectSession(connectSession); connectSession.ReciveSocket.BeginReceive(connectSession.ReciveBuffer, 0, connectSession.ReciveBuffer.Length, SocketFlags.None, new AsyncCallback(OnDataRead), connectSession); lock (acceptLock) { connectSessionList.Add(connectSession); IAsyncResult tempAr = waitAcceptSocketList.Find(delegate(IAsyncResult delAr) { return delAr.AsyncWaitHandle.SafeWaitHandle.Equals(ar.AsyncWaitHandle.SafeWaitHandle); }); if (tempAr != null) { waitAcceptSocketList.Remove(tempAr); } } IPAddress ip = ((System.Net.IPEndPoint)endAcceptSocket.RemoteEndPoint).Address; int remotePort = ((System.Net.IPEndPoint)endAcceptSocket.RemoteEndPoint).Port; this.Post(string.Format("远程客户端:【{0}:{1}】连接成功", ip.ToString(), remotePort)); } catch (Exception ex) { this.Post(new Exception(string.Format("新的全连接添加失败,失败原因:{0}", ex.Message))); } try { AsyncCallback acceptCallback = new AsyncCallback(CallbackAccept); lock (acceptLock) { Socket tempSocket = (Socket)ar.AsyncState; IAsyncResult beginAr = tempSocket.BeginAccept(acceptCallback, tempSocket); waitAcceptSocketList.Add(beginAr); } } catch (Exception ex) { this.Post(new Exception(string.Format("新的半连接添加失败,失败原因:{0}", ex.Message))); } } else { try { Socket tempSocket = (Socket)ar.AsyncState; if (tempSocket == null) { return; } Socket endAcceptSocket = tempSocket.EndAccept(ar); IPAddress ip = null; int remotePort = 0; if (endAcceptSocket != null) { ip = ((System.Net.IPEndPoint)endAcceptSocket.RemoteEndPoint).Address; remotePort = ((System.Net.IPEndPoint)endAcceptSocket.RemoteEndPoint).Port; } if (endAcceptSocket != null) { endAcceptSocket.Close(); } if (ip != null) { this.Post((string.Format("尝试关闭系统时,客户端发起的新的连接【{0}:{1}】强制关闭成功", ip.ToString(), remotePort))); } } catch (Exception ex) { this.Post(string.Format("关闭系统后发起的监听连接关闭失败,失败原因:{0}", ex.Message)); } } } #region 请求写入数据 /// <summary> /// 请求数据,根据会话ID发送, /// 一般用于并发处理场合,根据ID来确认具体的发送会话, /// 用于服务监听应答场合 /// </summary> /// <param name="sessionID">会话ID</param> /// <param name="data">数据</param> public override void Request(Guid sessionID, TRequest data) { ConnectSession<TRequest, TResponse> tempConnectSession; lock (acceptLock) { tempConnectSession = connectSessionList.Find(delegate(ConnectSession<TRequest, TResponse> obj) { return obj.SessionID == sessionID; }); } if (tempConnectSession != null) { this.SendDataEx(tempConnectSession, data); } } /// <summary> /// 发送数据 /// </summary> private void SendDataEx(ConnectSession<TRequest, TResponse> tempConnection, TRequest data) { IInPutMessageFilter<TRequest> inputMessageFilter = tempConnection.InputMessageFilter; byte[] sendBuffer = inputMessageFilter.Process(data); tempConnection.ReciveSocket.BeginSend(sendBuffer, 0, sendBuffer.Length, SocketFlags.None, new AsyncCallback(OnDataSend), tempConnection); } /// <summary> /// 异步发送数据 /// </summary> /// <param name="ar"></param> protected virtual void OnDataSend(IAsyncResult ar) { ConnectSession<TRequest, TResponse> tempSession = ar.AsyncState as ConnectSession<TRequest, TResponse>; SocketError socketError; Int32 bufferSize = tempSession.ReciveSocket.EndSend(ar, out socketError); if (socketError != SocketError.Success) { this.Post(new Exception(SocketErrorInfo.GetErrorMessage(socketError))); } } #endregion #region 应答处理 /// <summary> /// 接收数据 /// </summary> /// <param name="ar"></param> protected virtual void OnDataRead(IAsyncResult ar) { if (IsOpen == false)//关闭连接时不再处理各种可能的数据 { return; } try { ConnectSession<TRequest, TResponse> tempSession = ar.AsyncState as ConnectSession<TRequest, TResponse>; SocketError socketError; Int32 bufferSize = tempSession.ReciveSocket.EndReceive(ar, out socketError); if (socketError != SocketError.Success) { ErrorForceClose(tempSession, socketError); } if (bufferSize == 0) { if (IsOpen == true) { FoceClose(tempSession); } return; } tempSession.MemoryStream.Write(tempSession.ReciveBuffer, 0, bufferSize); if (tempSession.ReciveSocket.Available <= 0) { byte[] buffer = tempSession.MemoryStream.ToArray(); tempSession.MemoryStream.Position = 0; tempSession.MemoryStream.Close(); tempSession.MemoryStream = new MemoryStream(); TResponse resopnseData = tempSession.OutputMessageFilter.Filter(buffer); if (resopnseData != null) { this.ParallelPost(resopnseData); } } tempSession.ReciveSocket.BeginReceive(tempSession.ReciveBuffer, 0, tempSession.ReciveBuffer.Length, SocketFlags.None, new AsyncCallback(OnDataRead), tempSession); } catch (Exception ex) { if (ex is TCPConnectException) { this.Post(string.Format("网口连接发生了一个故障,等待客户端重新建立连接,故障原因:{0}", ex.Message)); } else { this.Post(new TCPConnectException(ex.Message)); } } } #endregion /// <summary> /// 关闭网路连接 /// </summary> public override void Close() { try { RaiseClosing(true); this.CloseResponseMessageFIFO(); acceptWaitResetEvent.Set(); acceptFIFO.Stop(); } catch (Exception ex) { Exception portException = new Exception(string.Format("网口【{0}】关闭失败,失败原因:{1}", endPort, ex.Message)); this.Post(portException); throw portException; } } /// <summary> /// 错误强制关闭连接 /// </summary> /// <param name="tempSession"></param> /// <param name="socketError"></param> private void ErrorForceClose(ConnectSession<TRequest, TResponse> tempSession, SocketError socketError) { IPAddress ip = ((System.Net.IPEndPoint)tempSession.ReciveSocket.RemoteEndPoint).Address; int remotePort = ((System.Net.IPEndPoint)tempSession.ReciveSocket.RemoteEndPoint).Port; try { if (tempSession.ReciveSocket != null && tempSession.ReciveSocket.Connected == true) { SocketAsyncEventArgs e = new SocketAsyncEventArgs(); e.AcceptSocket = tempSession.ReciveSocket; tempSession.ReciveSocket.DisconnectAsync(e);//异常连接直接强制关闭 } lock (acceptLock) { if (connectSessionList.Contains(tempSession)) { connectSessionList.Remove(tempSession); } } } catch { } finally { string errorMessage = SocketErrorInfo.GetErrorMessage(socketError); if (!string.IsNullOrEmpty(errorMessage)) { throw new TCPConnectException(string.Format("端口【{0}】监听的客户端【{1}:{2}】接收数据异常,异常原因:{3}", endPort, ip.ToString(), remotePort, errorMessage)); } else { throw new TCPConnectException(string.Format("端口【{0}】监听的客户端【{1}:{2}】接收数据异常", endPort, ip.ToString(), remotePort)); } } } /// <summary> /// 强制关闭连接 /// </summary> /// <param name="tempSession"></param> protected virtual void FoceClose(ConnectSession<TRequest, TResponse> tempSession) { IPAddress ip = ((System.Net.IPEndPoint)tempSession.ReciveSocket.RemoteEndPoint).Address; int remotePort = ((System.Net.IPEndPoint)tempSession.ReciveSocket.RemoteEndPoint).Port; try { if (tempSession.ReciveSocket != null && tempSession.ReciveSocket.Connected == true) { SocketAsyncEventArgs e = new SocketAsyncEventArgs(); e.AcceptSocket = tempSession.ReciveSocket; tempSession.ReciveSocket.DisconnectAsync(e);//异常连接直接强制关闭 } lock (acceptLock) { if (connectSessionList.Contains(tempSession)) { connectSessionList.Remove(tempSession); } } } catch { } finally { this.Post(string.Format("端口【{0}】监听的客户端【{1}:{2}】主动关闭连接,等待客户端建立新的连接", endPort, ip.ToString(), remotePort)); } } /// <summary> /// 停止网络连接,逐步释放资源 /// </summary> private void StopConnectNetWork() { StringBuilder connectInfo = new StringBuilder(); StringBuilder connectExp = new StringBuilder(); lock (acceptLock) { if (waitAcceptSocketList.Count > 0) { for (int i = 0; i < waitAcceptSocketList.Count; i++) { try { IAsyncResult waitAr = waitAcceptSocketList[i]; waitAr.AsyncWaitHandle.SafeWaitHandle.Close(); connectInfo.AppendLine("半连接关闭成功"); } catch (Exception ex) { connectExp.AppendLine(string.Format("WaitAr释放异常,异常原因:{0}", ex.Message)); } } } if (connectSessionList.Count > 0) { for (int i = 0; i < connectSessionList.Count; i++) { try { Socket tempSocket = connectSessionList[i].ReciveSocket; IPAddress ip = null; int remotePort = 0; if (tempSocket != null) { ip = ((System.Net.IPEndPoint)tempSocket.RemoteEndPoint).Address; remotePort = ((System.Net.IPEndPoint)tempSocket.RemoteEndPoint).Port; } if (tempSocket != null) { tempSocket.Close(); } if (ip != null) { connectInfo.AppendLine(string.Format("全连接对应的远程客户端【{0}:{1}】关闭成功", ip.ToString(), remotePort)); } } catch (Exception ex) { connectExp.AppendLine(string.Format("AcceptSocket释放异常,异常原因:{0}", ex.Message)); } } } if (connectInfo.Length > 0) { this.Post(connectInfo.ToString()); } if (connectExp.Length > 0) { this.Post(new Exception(connectExp.ToString())); } } } /// <summary> /// 获取心跳包参数 /// </summary> /// <returns></returns> private byte[] GetOptionValues() { uint dummy = 0; byte[] inOptionValues = new byte[Marshal.SizeOf(dummy) * 3]; BitConverter.GetBytes((uint)1).CopyTo(inOptionValues, 0); BitConverter.GetBytes((uint)communicationParamerters.HeartRateTimeOut * 100).CopyTo(inOptionValues, Marshal.SizeOf(dummy)); BitConverter.GetBytes((uint)communicationParamerters.HeartRateTimeOut * 100).CopyTo(inOptionValues, Marshal.SizeOf(dummy) * 2); return inOptionValues; } }
应答消息拦截器接口:
/// <summary> /// 消息过滤接口 /// 消息过滤出来后,转化成对应的应答数据 /// </summary> public interface IOutPutMessageFilter<TRequest,TResponse> where TRequest : IRequestData where TResponse : IResponseData { /// <summary> /// 会话ID /// </summary> Guid SessionID { get; } /// <summary> /// 应答数据过滤 /// </summary> /// <returns> /// 返回对应类型的应答数据 /// </returns> TResponse Filter(byte[] buffer); /// <summary> /// 请求命令队列 /// 应答式的请求处理时,可以用于获取对应的请求命令 /// </summary> void RequestCommandQueue(RequestWaitCommand<TRequest,TResponse> command); /// <summary> /// 等待应答数据 /// </summary> /// <returns></returns> TResponse WaitResponseData(RequestWaitCommand<TRequest,TResponse> command); }
输入消息构造器接口:
/// <summary> /// 输入消息构建器接口 /// </summary> public interface IInPutMessageFilter<TRequestData> where TRequestData:IRequestData { /// <summary> /// 会话ID /// </summary> Guid SessionID { get; } /// <summary> /// 处理请求数据到字节流 /// </summary> /// <param name="requestData">请求数据</param> /// <returns> /// 返回请求数据对应的字节流 /// </returns> byte[] Process(TRequestData requestData); }
请求数据接口:
/// <summary> /// 请求数据 /// </summary> public interface IRequestData { /// <summary> /// 请求数据描述 /// </summary> string Describe { get; } /// <summary> /// 是否是异步等待应答数据 /// </summary> bool IsAsyncWaitData { get; } /// <summary> /// 异步请求等待超时 /// </summary> int AsyncRequestWaitTimeOut { get; } /// <summary> /// 异步请求执行失败,是否再次执行 /// </summary> bool AsyncIsExcuteAgain { get; set; } }
应答数据接口:
/// <summary> /// 响应数据接口 /// </summary> public interface IResponseData { /// <summary> /// 响应会话ID /// </summary> Guid SessionID { get; } /// <summary> /// 应答异常 /// </summary> Exception ResponseException { get; set; } }
会话构造类:
/// <summary> /// 连接会话 /// </summary> /// <typeparam name="TRequest">请求数据</typeparam> /// <typeparam name="TResponse">应答数据</typeparam> public class ConnectSession<TRequest,TResponse> where TRequest:IRequestData where TResponse : IResponseData { private ConnectSession() { } /// <summary> /// 构建会话 /// </summary> /// <typeparam name="TInputMessageFilter">请求消息过滤器</typeparam> /// <typeparam name="TOutputMessageFilter">应答消息过滤器</typeparam> /// <returns></returns> public static ConnectSession<TRequest, TResponse> Create<TInputMessageFilter, TOutputMessageFilter>() where TInputMessageFilter : class,IInPutMessageFilter<TRequest> where TOutputMessageFilter : class,IOutPutMessageFilter<TRequest,TResponse> { ConnectSession<TRequest, TResponse> connection = new ConnectSession<TRequest, TResponse>(); connection.sessionID = Guid.NewGuid(); connection.createTime = DateTime.Now; connection.outputMessageFilter = Activator.CreateInstance<TOutputMessageFilter>(); connection.inputMessageFilter = Activator.CreateInstance<TInputMessageFilter>(); return connection; } private Guid sessionID; /// <summary> /// 会话ID /// </summary> public Guid SessionID { get { return sessionID; } } private DateTime createTime; /// <summary> /// 创建时间 /// </summary> public DateTime CreateTime { get { return createTime; } } private IOutPutMessageFilter<TRequest,TResponse> outputMessageFilter; /// <summary> /// 输出消息过滤器 /// </summary> public IOutPutMessageFilter<TRequest,TResponse> OutputMessageFilter { get { return outputMessageFilter; } } private IInPutMessageFilter<TRequest> inputMessageFilter; /// <summary> /// 输入消息过滤器 /// </summary> public IInPutMessageFilter<TRequest> InputMessageFilter { get { return inputMessageFilter; } }
相关文章推荐
- Socket通用TCP通信协议设计及实现(防止粘包,可移植,可靠)
- Java 网络编程中 TCP 通信服务端的多线程模型
- linux网络编程之用socket实现简单客户端和服务端的通信(基于TCP)
- socket编程 -- epoll模型服务端/客户端通信的实现
- 基于 IOCP 的通用异步 Windows Socket TCP 高性能服务端组件的设计与实现
- linux网络编程之用socket实现简单客户端和服务端的通信(基于TCP)
- C语言实现服务端和客户端进行TCP通信实例
- [原][osg][osgEarth]基于qt代码实现:TCP|UDP与飞行模拟软件JSBSim的通信,现实模型飞行!
- 基于 IOCP 的通用异步 Windows Socket TCP 高性能服务端组件的设计与实现
- Socket通用TCP通信协议设计及实现(防止粘包,可移植,可靠)
- socket编程 -- epoll模型服务端/客户端通信的实现
- socket编程 -- epoll模型服务端/客户端通信的实现
- 利用Tcp和socket实现的客户端与服务端的简单通信
- [通信] C# TCP实现多个客户端与服务端 数据 与 文件的传输
- 基于 IOCP 的通用异步 Windows Socket TCP 高性能服务端组件的设计与实现
- 基于 IOCP 的通用异步 Windows Socket TCP 高性能服务端组件的设计与实现
- linux epoll机制对TCP 客户端和服务端的监听C代码通用框架实现
- C语言实现服务端和客户端进行TCP通信实例
- 基于 IOCP 的通用异步 Windows Socket TCP 高性能服务端组件的设计与实现
- 用Java socket (TCP通信模型)实现一个简单的web 服务器