您的位置:首页 > 其它

SharpStreaming项目开发纪实:构建基本的服务器及客户端应用(六)——服务器通信部分初步实现

2010-07-21 21:44 1021 查看
本篇文章将介绍服务器通信部分的核心代码实现。

首先从整体来看,服务器在启动时要创建套接字并开始不断地进行侦听,一旦有新的客户端连接,将会创建一个客户端会话实例并通过该会话实例管理自己的套接字,同时采用异步方式来实现数据的接收与发送。在这里需要说明的是,为了使各自的职责更清晰,笔者特别对每一个客户端采用两个套接字,一个套接字专门用于与服务器的普通通信(如各种消息的收发等),另一个套接字专门用于发送或接收流数据。当然,也许这样设计可能会存在一定的缺陷,但姑且先这样去实现,若有可能,待日后有更好的思路时再做进一步改进。

根据上面所述的基本思路,要编码实现服务器的通信部分,需要完成以下工作:

1、新建一个RtspServer类来负责服务器的启动、停止、同时维护客户端会话列表等;

2、新建一个ServerSocket类来创建服务器套接字、侦听连接请求、关闭服务器套接字和客户端套接字等;

3、新建一个ClientSession类来负责处理每一个客户端会话的请求等,该类继承于ClientSessionInfo类(专门用于维护客户端会话基本信息的类);

4、新建一个ClientSocket类来进行异步接收和发送数据;

5、采用事件来处理RtspServer与UI的交互、ServerSocket与RtspServer的交互、ClientSession与RtspServer的交互、ClientSocket与ClientSession的交互等。

6、RtspCommon类用来处理RTSP的请求数据(如解析URL等),Utils为公共使用的工具类,Constants为常量类。

根据上面所述,初步的类关系图如下图所示:



除了主线程之外,使用了线程池来管理侦听和定时检查每一个客户端会话的状态的工作。这两个任务在启动服务器时加入线程池,代码示例如下:
/// <summary>
/// Starts the server.
/// </summary>
/// <returns>success or failed</returns>
public bool StartServer()
{
if (!m_IsServerClosed)
{
return true;
}

if (m_ServerSocket == null)
{
return false;
}

m_IsServerClosed = true;

this.ClearCountValues();

try
{
if (!m_ServerSocket.CreateServerSocket())
{
return false;
}

if (!ThreadPool.QueueUserWorkItem(this.StartServerListening))
{
return false;
}

if (!ThreadPool.QueueUserWorkItem(this.CheckClientSession))
{
return false;
}

m_IsServerClosed = false;

this.OnServerStarted();
}
catch (System.Exception e)
{
this.OnExceptionOccurred(e);
throw;
}

return !m_IsServerClosed;
}


/// <summary>
/// Starts the server listening, this is a loop until the current server is closed.
/// </summary>
/// <param name="state"></param>
private void StartServerListening(object state)
{
m_CheckSocketAcceptResetEvent.Reset();

Socket clientSocket = null;
while (!m_IsServerClosed)
{
if (m_ServerSocket != null)
{
m_ServerSocket.StartListening(clientSocket);
}
}

m_CheckSocketAcceptResetEvent.Set();
}

/// <summary>
/// Checks the rtsp client session.
/// In this method, we do lost of things such as check the client session
/// is time out or not, check the client session is tear down or not.
/// </summary>
/// <param name="state"></param>
private void CheckClientSession(object state)
{
m_CheckClientSessionResetEvent.Reset();

while (!m_IsServerClosed)
{
lock (m_ClientSessionTable)
{
List<int> clientSessionIdList = new List<int>();

foreach (ClientSession session in m_ClientSessionTable.Values)
{
if (m_IsServerClosed)
{
break;
}

if (session.SessionState == ClientSessionState.Teardown)
{
m_ServerSocket.CloseClientSocket(session.Socket);
clientSessionIdList.Add(session.ClientSessionId);
this.OnClientSessionDisconnected(session);
}
else
{
if (session.CheckTimeout(m_MaxSessionTimeout))
{
this.OnClientSessionTimeout(session);
}
}
}

foreach (int id in clientSessionIdList)
{
m_ClientSessionTable.Remove(id);
}

clientSessionIdList.Clear();
}

// Sleep the current thread, and then continue to check the client session.
Thread.Sleep(100);
}

m_CheckClientSessionResetEvent.Set();
}


上述代码中,m_ServerSocket为侦听类的一个对象,m_ServerSocket.CreateServerSocket()则创建了一个基于TCP协议的套接字,通过while循环来进行不断侦听,侦听的代码片段如下:
/// <summary>
/// Starts the listening.
/// </summary>
/// <param name="clientSocket">The client socket(null).</param>
public void StartListening(Socket clientSocket)
{
if (m_Socket == null)
{
this.CreateServerSocket();
}

try
{
if (m_Socket.Poll(m_AcceptListenTimeInterval, SelectMode.SelectRead))
{
clientSocket = m_Socket.Accept();
if (clientSocket != null && clientSocket.Connected)
{
this.OnClientSocketAccepted(clientSocket);
}
else
{
// The client socket is null or it was not connected, so we
// try to close it.
this.CloseClientSocket(clientSocket);
}
}
}
catch (System.Exception e)
{
this.CloseClientSocket(clientSocket);
this.OnExceptionOccurred(e);
throw;
}
}


每当有新的连接进来时,通过OnClientSocketAccepted(Socket socket)事件将socket带回到RtspServer类中,并在RtspServer类中做相应的处理,代码片段如下:
/// <summary>
/// Called when [client socket accepted].
/// </summary>
/// <param name="sender">The sender.</param>
/// <param name="e">The <see cref="Simon.SharpStreamingServer.Core.TSocketEventArgs"/> instance containing the event data.</param>
protected virtual void OnClientSocketAccepted(object sender, TSocketEventArgs e)
{
// Is the current connection count greater than the maximum
// connection count? If so, we close the client socket, or
// else, we add the client session as a new client session.
if (m_CurConnectionCount > m_MaxConnectionCount)
{
if (m_ServerSocket != null)
{
m_ServerSocket.CloseClientSocket(e.ClientSocket);
this.OnClientSessionRejected();
}
}
else
{
this.AddClientSession(e.ClientSocket);
}
}


在上述这个方法中,首先判断连接数,如果当前连接数大于最大允许的连接数,则拒绝连接请求,否则将新增客户端会话(创建新的客户端会话对象)。

再回过头来看CheckClientSession(object state)方法,该方法同样是通过一个while循环来不断地检查每一个客户端会话的状态,如果状态为Teardown的,则关闭其socket并最终将其从会话列表中移除,同时亦会检查每一个客户端会话是否超时(如果超时,则将其状态标记为Teardown)。m_ClientSessionTable用于维护客户端会话列表,为Dictionary<int, ClientSession>类型。

此外,使用ManualResetEvent对象来管理多个线程的同步,这主要是在停止服务器时发挥作用。停止服务器的代码片段如下:
/// <summary>
/// Stops the server.
/// </summary>
/// <returns>success or failed</returns>
public bool StopServer()
{
if (m_IsServerClosed)
{
return true;
}

m_IsServerClosed = true;

// First, waits for two threads.
m_CheckSocketAcceptResetEvent.WaitOne();
m_CheckClientSessionResetEvent.WaitOne();

// And then, closes each client socket.
if (m_ClientSessionTable != null && m_ServerSocket != null)
{
lock (m_ClientSessionTable)
{
foreach (ClientSession session in m_ClientSessionTable.Values)
{
m_ServerSocket.CloseClientSocket(session.Socket);
}
}
}

// Then, closes the server socket.
if (m_ServerSocket != null)
{
m_ServerSocket.CloseServerSocket();
}

// At last, clears the client session table.
if (m_ClientSessionTable != null)
{
lock (m_ClientSessionTable)
{
m_ClientSessionTable.Clear();
}
}

this.OnServerStopped();

return true;
}


每当一个新客户端会话对象被创建后,即开始异步接收数据。代码片段如下:
/// <summary>
/// Adds a new client session
/// </summary>
/// <param name="socket">client socket</param>
private void AddClientSession(Socket socket)
{
Interlocked.Increment(ref m_ClientSessionSeqNo);

ClientSession clientSession = new ClientSession(m_ClientSessionSeqNo, socket);
clientSession.ExceptionOccurred += new EventHandler<TExceptionEventArgs>(this.OnExceptionOccurred);

lock(m_ClientSessionTable)
{
m_ClientSessionTable.Add(m_ClientSessionSeqNo, clientSession);
}

// Starts handling the rtsp client request.
clientSession.ReceiveClientRequest();

this.OnClientSessionConnected(clientSession);
}


ReceiveClientRequest()的主要工作就是调用ClientSocket的ReceiveData()方法开始进行数据的异步接收。每当ClientSocket接收到数据时,通过OnDatagramReceived(byte[] revBuffer, int revBufferSize)事件将数据传回给对应的ClientSession对象,并由ClientSession做相应的处理。
综上所述的是服务器通信部分的核心实现思路及代码片段,鉴于篇幅关系,在这里将不再贴出全部的代码。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐