您的位置:首页 > 理论基础 > 计算机网络

因项目需要,最近在着手开发一个网站队列处理系统中的网络模块

2011-01-24 16:06 1016 查看
其中使用了 Socket 的异步(非阻塞)方法来接收数据,然后交由另一个线程进行命令的解析(只是将整条命令提取出来,放入队列或者列表中)。客户端测试程序有空再放出;测试时用网络调试软件按如下格式发送 ASCII 字符即可:
[6]abcd12
方括号中的 6 为命令字符串的长度(字符数)
abcd12 为命令字符串(即被提取到队列或列表中的部分)
using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Data;
using System.Collections.Generic;
using System.Text.RegularExpressions;

// Per-connection state shared between the asynchronous receive callback
// and the thread that parses the received text.
public class StateObject
{
    // Size, in bytes, of the receive buffer.
    public const int BufferSize = 1024;

    // Socket connected to the client; assigned by whoever accepts the connection.
    public Socket workSocket;

    // Raw bytes filled in by each receive operation.
    public byte[] buffer = new byte[BufferSize];

    // Text received so far that has not yet been consumed.
    public StringBuilder sb = new StringBuilder();

    // Offset counter; starts at zero.
    public int counter;

    // Flag recording that the client connection has gone away; starts false.
    public bool isDisconnected;

    // Thread that parses the contents of sb for this connection.
    public Thread dataParseThread;

    // Event the receiving side waits on before it posts the next read.
    public ManualResetEvent readDone = new ManualResetEvent(false);

    // Event the parsing side waits on before it inspects sb.
    public ManualResetEvent ProcessDone = new ManualResetEvent(false);
}

// Hosts the program entry point.
public class AsynchronousSocketListener
{
    // Creates the server and runs its accept loop on the calling thread.
    // Returns 0 once StartListening returns.
    public static int Main(String[] args)
    {
        new AsyncSocketServer().StartListening();
        return 0;
    }
}

// Asynchronous TCP server. Accepts connections on port 11000 and, for each
// client, starts a DataProcessor thread and an asynchronous receive chain
// that appends incoming ASCII text to the connection's StateObject.
public class AsyncSocketServer
{
    // Signaled by AcceptCallback so the accept loop can post the next BeginAccept.
    private ManualResetEvent allDone = new ManualResetEvent(false);

    public AsyncSocketServer()
    {
    }

    // Binds the listening socket and accepts connections until an error occurs.
    // Any exception is written to the console; the listener is always closed
    // before the method prompts for a key press and returns.
    public void StartListening()
    {
        // Create a TCP/IP (IPv4) socket.
        Socket listener = new Socket(AddressFamily.InterNetwork,
            SocketType.Stream, ProtocolType.Tcp);

        try
        {
            // Establish the local endpoint for the socket.
            IPHostEntry ipHostInfo = Dns.GetHostEntry("127.0.0.1");
            IPAddress ipAddress = SelectIPv4Address(ipHostInfo.AddressList);
            IPEndPoint localEndPoint = new IPEndPoint(ipAddress, 11000);

            // Bind the socket to the local endpoint and listen for incoming connections.
            listener.Bind(localEndPoint);
            listener.Listen(10);

            while (true)
            {
                // Set the event to nonsignaled state.
                allDone.Reset();

                // Start an asynchronous accept.
                Console.WriteLine("Waiting for a connection...");
                listener.BeginAccept(new AsyncCallback(AcceptCallback), listener);

                // Wait until a connection is made before continuing.
                allDone.WaitOne();
            }
        }
        catch (Exception e)
        {
            Console.WriteLine(e.ToString());
        }
        finally
        {
            // Release the listening port; a still-pending accept then completes
            // with ObjectDisposedException, which AcceptCallback tolerates.
            listener.Close();
        }

        Console.WriteLine("\nPress ENTER to continue...");
        Console.Read();
    }

    // Returns the first IPv4 address in candidates, or the IPv4 loopback
    // address when there is none. The listener is an InterNetwork socket,
    // so binding it to an IPv6 address would throw.
    private static IPAddress SelectIPv4Address(IPAddress[] candidates)
    {
        if (candidates != null)
        {
            foreach (IPAddress candidate in candidates)
            {
                if (candidate.AddressFamily == AddressFamily.InterNetwork)
                {
                    return candidate;
                }
            }
        }

        return IPAddress.Loopback;
    }

    // Completion callback for BeginAccept. Builds the per-connection state,
    // starts its parser thread and posts the first receive.
    public void AcceptCallback(IAsyncResult ar)
    {
        // Signal the accept loop to continue.
        allDone.Set();

        // Get the socket that handles the client request.
        Socket listener = (Socket)ar.AsyncState;
        Socket handler;
        try
        {
            handler = listener.EndAccept(ar);
        }
        catch (ObjectDisposedException)
        {
            // The listener was closed while this accept was pending.
            return;
        }
        catch (SocketException e)
        {
            Console.WriteLine(e.ToString());
            return;
        }

        // Create the state object.
        StateObject state = new StateObject();
        state.workSocket = handler;

        DataProcessor dataProcessor = new DataProcessor(state);

        Thread parserThread = new Thread(dataProcessor.ParseRowData);
        // Background thread: a parser blocked on its event must not keep the
        // process alive once the main thread has finished.
        parserThread.IsBackground = true;
        state.dataParseThread = parserThread;
        parserThread.Start();

        try
        {
            handler.BeginReceive(state.buffer, 0, StateObject.BufferSize, SocketFlags.None,
                new AsyncCallback(ReadCallback), state);
        }
        catch (Exception e)
        {
            // The client may already have reset the connection.
            Console.WriteLine(e.ToString());
            CloseConnection(state);
        }
    }

    // Completion callback for BeginReceive. Appends the received bytes (decoded
    // as ASCII) to state.sb, signals ProcessDone, waits for readDone, and then
    // posts the next receive. A zero-length read or any error closes the connection.
    public void ReadCallback(IAsyncResult ar)
    {
        // Retrieve the state object and the handler socket
        // from the asynchronous state object.
        StateObject state = (StateObject)ar.AsyncState;
        Socket handler = state.workSocket;

        try
        {
            // Read data from the client socket.
            int bytesRead = handler.EndReceive(ar);

            if (bytesRead <= 0)
            {
                Console.WriteLine("Server can't handle because received data length is zero...");
                CloseConnection(state);
                return;
            }

            // Reset before signaling so the wait below cannot be satisfied
            // by a Set left over from the previous round.
            state.readDone.Reset();

            state.sb.Append(Encoding.ASCII.GetString(state.buffer, 0, bytesRead));

            Console.WriteLine(state.sb.ToString());

            state.ProcessDone.Set();
            state.readDone.WaitOne();

            handler.BeginReceive(state.buffer, 0, StateObject.BufferSize, SocketFlags.None,
                new AsyncCallback(ReadCallback), state);
        }
        catch (Exception ex)
        {
            // Report the failure and release the socket instead of silently
            // dropping the connection's receive chain.
            Console.WriteLine(ex.ToString());
            CloseConnection(state);
        }
    }

    // Marks the connection as disconnected, closes its socket, and signals
    // ProcessDone so a thread waiting on it can observe isDisconnected.
    private static void CloseConnection(StateObject state)
    {
        state.isDisconnected = true;

        Socket handler = state.workSocket;
        if (handler != null)
        {
            try
            {
                handler.Shutdown(SocketShutdown.Both);
            }
            catch (SocketException)
            {
                // Peer already gone; Close below still releases the handle.
            }
            catch (ObjectDisposedException)
            {
                // Socket was already closed.
            }

            handler.Close();
        }

        state.ProcessDone.Set();
    }

    // Sends data to the client as ASCII bytes; SendCallback completes the send.
    private void Send(Socket handler, String data)
    {
        // Convert the string data to byte data using ASCII encoding.
        byte[] byteData = Encoding.ASCII.GetBytes(data);

        // Begin sending the data to the remote device.
        handler.BeginSend(byteData, 0, byteData.Length, SocketFlags.None,
            new AsyncCallback(SendCallback), handler);
    }

    // Completion callback for BeginSend. Logs the byte count, then shuts down
    // and closes the socket.
    private void SendCallback(IAsyncResult ar)
    {
        try
        {
            // Retrieve the socket from the state object.
            Socket handler = (Socket)ar.AsyncState;

            // Complete sending the data to the remote device.
            int bytesSent = handler.EndSend(ar);
            Console.WriteLine("Sent {0} bytes to client.", bytesSent);

            handler.Shutdown(SocketShutdown.Both);
            handler.Close();
        }
        catch (Exception e)
        {
            Console.WriteLine(e.ToString());
        }
    }
}

// Extracts length-prefixed commands from a connection's received text.
// A frame is "[N]" followed by N characters, e.g. "[6]abcd12" yields "abcd12".
// Extracted commands are appended to a list shared by all DataProcessor instances.
public class DataProcessor
{
    // Connection state whose sb buffer this instance parses.
    private StateObject st = null;

    // Extracted command texts, shared by all connections.
    private static List<string> commandQueue = new List<string>();

    // Guards every access to commandQueue; it is touched from one thread per connection.
    private static readonly object commandQueueLock = new object();

    // Matches the "[N]" length header; group 1 captures the decimal digits.
    private static readonly Regex lengthHeader = new Regex(@"\[([0-9]+)\]");

    public DataProcessor(StateObject st)
    {
        this.st = st;
    }

    // Thread body. Each time ProcessDone is signaled, pulls every complete frame
    // out of st.sb, then resets ProcessDone and signals readDone. Text that
    // precedes a header is discarded together with the frame; an incomplete
    // frame stays in the buffer for the next round. Returns once
    // st.isDisconnected is observed after a wake-up.
    public void ParseRowData()
    {
        while (true)
        {
            st.ProcessDone.WaitOne();

            if (st.isDisconnected)
            {
                break;
            }

            if (st.sb.Length > 0)
            {
                Console.WriteLine("Start to parse data...");

                string tmp = st.sb.ToString();
                Match match = lengthHeader.Match(tmp);

                if (!match.Success)
                {
                    Console.WriteLine("Not matched...");
                }

                while (match.Success)
                {
                    int payloadStart = match.Index + match.Length;
                    int len;

                    if (!int.TryParse(match.Groups[1].Value, out len))
                    {
                        // The digits do not fit in an int. Drop the header so
                        // one bad frame cannot take down the thread.
                        Console.WriteLine("Invalid command length, header discarded: " + match.Value);
                        st.sb.Remove(0, payloadStart);
                    }
                    else if (tmp.Length - payloadStart < len)
                    {
                        Console.WriteLine("The string is not completed, waiting the reset data...");
                        break;
                    }
                    else
                    {
                        string strToAdd = tmp.Substring(payloadStart, len);
                        Console.WriteLine("Adding command: " + strToAdd);
                        AddToCommandQueue(strToAdd);
                        st.sb.Remove(0, payloadStart + len);

                        PrintCommandQueue();
                    }

                    tmp = st.sb.ToString();
                    match = lengthHeader.Match(tmp);
                }

                Console.WriteLine("End parsing data...");
            }
            else
            {
                Console.WriteLine("No data will be processed...");
            }

            // Reset ProcessDone BEFORE releasing the reader: once readDone is
            // set the reader may signal ProcessDone again at any moment, and a
            // later Reset would erase that signal and leave both sides waiting.
            st.ProcessDone.Reset();
            st.readDone.Set();
        }
    }

    // Appends one command to the shared list under the lock.
    private void AddToCommandQueue(string str)
    {
        lock (commandQueueLock)
        {
            commandQueue.Add(str);
        }
    }

    // Writes every queued command to the console, one per line.
    public static void PrintCommandQueue()
    {
        // Copy under the lock so another connection's Add cannot invalidate
        // the enumeration, and so console I/O happens outside the lock.
        string[] snapshot;
        lock (commandQueueLock)
        {
            snapshot = commandQueue.ToArray();
        }

        foreach (string item in snapshot)
        {
            Console.WriteLine(item);
        }
    }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐