您的位置:首页 > 业界新闻

针对高并发,可扩展的互联网架构,搭建消息队列(一)

2014-12-18 14:47 211 查看
针对高并发,可扩展的互联网架构,搭建消息队列(一)

  想开发高并发可扩展的互联网架构,消息队列是不可缺少的,目前主流的消息队列,有windows自带的MSMQ,还有跨平台的强大的ZeroMQ,这里我们就选用ZeroMQ.

  ZeroMQ介绍:(也拼写作ØMQ、0MQ或ZMQ)是个非常轻量级的开源消息队列软件。它没有独立的服务器,消息直接从一个应用程序被发送到另一个应用程序。ZeroMQ的学习和应用也非常简单,它只有一个C++编写成的单个库文件libzmq.dll,可以链接到任何应用程序中。如果要在.NET环境中使用,我们需要用到一个C#编写的名为clrzmq.dll包装库。ZeroMQ可以在Windows、OSX和Linux等多种操作系统上运行,C、C++、C#、Java、Python等语言都可以编写ZeroMQ应用程序这使得不同平台上的不同应用程序之间可以相互通讯。

1、环境搭建:

  codeproject专题,下载对应的Downloadbinaries-377.6KB,解压缩到你的指定路径。

  这里我们就不详细介绍,主要说一下C#封装好的版本,NetMQ,是基于ZeroMQ进行封装的。就不需要下载了,直接nuget上获取:

  PM>Install-PackageNetMQ

  为什么不直接用ZeroMQ,而使用NetMQ,运行非托管代码的托管应用程序内可能会出现许多想不到的问题,像内存泄漏和奇怪的没有访问错误。而NetMQ使用原生的C#语言,它更容易调试原生C#代码,你可以下载代码,调试你的系统。你可以在github上贡献。

  待安装好后,系统会自动添加NetMQ的引用。

  


  可以看到,NetMQ是基于zmq进行开发的,其实就是ZeroMQ了,并且已经为我们封装了各种功能的MQ对象,比如REP/REQ,PUB/SUB(主题式订阅),XPUB/XSUB(非主题订阅),Push/Pull,甚至还有路由模式等,从字面意义上,应该能看出个大概,后面我们一个一个进行测试使用。

  先看个简单的demo,初步了解一下:

  

classProgram
{
staticvoidMain(string[]args)
{
using(NetMQContextcontext=NetMQContext.Create())
{
TaskserverTask=Task.Factory.StartNew(()=>Server(context));
TaskclientTask=Task.Factory.StartNew(()=>Client(context));
Task.WaitAll(serverTask,clientTask);
}
}

staticvoidServer(NetMQContextcontext)
{
using(NetMQSocketserverSocket=context.CreateResponseSocket())
{
serverSocket.Bind("tcp://*:5555");

while(true)
{
stringmessage=serverSocket.ReceiveString();

Console.WriteLine("Receivemessage{0}",message);

serverSocket.Send("World");

if(message=="exit")
{
break;
}
}
}
}

staticvoidClient(NetMQContextcontext)
{
using(NetMQSocketclientSocket=context.CreateRequestSocket())
{
clientSocket.Connect("tcp://127.0.0.1:5555");

while(true)
{
Console.WriteLine("Pleaseenteryourmessage:");
stringmessage=Console.ReadLine();
clientSocket.Send(message);

stringanswer=clientSocket.ReceiveString();

Console.WriteLine("Answerfromserver:{0}",answer);

if(message=="exit")
{
break;
}
}
}
}
}


  代码比较简洁的介绍了REP/REQ模式下NetMQ的使用,而且我们可以看到,这个Mq对象是可以在不同的线程间切换使用的,也许你会测试中文,那就先序列化再反序列化吧,因为可能会出现乱码哟。

  这里,我先简单根据NetMQ,封装一个Server端和一个Client端,方便后面使用,当然也可以不封装,直接使用:

  Server:

  

///<summary>
///Mq服务端
///</summary>
publicclassOctMQServer:IDisposable
{
publiceventEventHandler<DataEventArgs<NetMQSocket,NetMQMessage>>OnReceive;

protectedvirtualvoidOnOnReceive(DataEventArgs<NetMQSocket,NetMQMessage>e)
{
EventHandler<DataEventArgs<NetMQSocket,NetMQMessage>>handler=OnReceive;
if(handler!=null)handler(this,e);
}

privateint_port;
privateNetMQSocket_serverSocket;
privateServerType_type;
privateNetMQContext_context;

publicvoidInit(intport,ServerTypetype)
{
_type=type;
_port=port;
_context=NetMQContext.Create();
CreateServer();
}

voidCreateServer()
{
switch(_type)
{
caseServerType.Response:
_serverSocket=_context.CreateResponseSocket();break;
caseServerType.Pub:
_serverSocket=_context.CreatePushSocket();break;
caseServerType.Router:
_serverSocket=_context.CreateRouterSocket();break;
caseServerType.Stream:
_serverSocket=_context.CreateStreamSocket();break;
caseServerType.Push:
_serverSocket=_context.CreatePushSocket();break;
caseServerType.XPub:
_serverSocket=_context.CreateXPublisherSocket();break;
default:
_serverSocket=_context.CreateResponseSocket();break;
}
_serverSocket.Bind("tcp://*:"+_port);
Task.Factory.StartNew(()=>
AsyncRead(_serverSocket),TaskCreationOptions.LongRunning);
}

privatevoidAsyncRead(NetMQSocketserverSocket)
{
while(true)
{
varmsg=serverSocket.ReceiveMessage();
OnOnReceive(newDataEventArgs<NetMQSocket,NetMQMessage>(serverSocket,msg));
}
}

publicNetMQSocketServer
{
get{return_serverSocket;}
}

publicvoidDispose()
{
_serverSocket.Dispose();
_context.Dispose();
}

publicvoidSend(NetMQMessagemsg)
{
_serverSocket.SendMessage(msg);
}

publicNetMQMessageCreateMessage()
{
returnnewNetMQMessage();
}
}


  这样,使用者就可以根据枚举进行服务端的创建,不用纠结到底用哪一种服务端,并且封装了一些消息的异步事件,方便在开发中使用,可以使用多播委托,针对不同的消息进行不同的处理,我这里使用的while循环,当然,在netmq内部提供了循环器和心跳等,都可以在实际的开发中进行扩展和使用:Poller和NetMQTimer。

  Client:

  

///<summary>
///MQ客户端
///</summary>
publicclassOctMQClient:IDisposable
{
publiceventEventHandler<DataEventArgs<NetMQSocket,NetMQMessage>>OnReceive;

protectedvirtualvoidOnOnReceive(DataEventArgs<NetMQSocket,NetMQMessage>e)
{
EventHandler<DataEventArgs<NetMQSocket,NetMQMessage>>handler=OnReceive;
if(handler!=null)handler(this,e);
}

privateint_port;
privateNetMQSocket_clientSocket;
privateClientType_type;
privateNetMQContext_context;
privatestring_ip;
privateTasktask;
publicvoidInit(stringip,intport,ClientTypetype)
{
_type=type;
_ip=ip;
_port=port;
_context=NetMQContext.Create();
CreateClient();
}

voidCreateClient()
{
switch(_type)
{
caseClientType.Request:
_clientSocket=_context.CreateRequestSocket();break;
caseClientType.Sub:
_clientSocket=_context.CreateSubscriberSocket();break;
caseClientType.Dealer:
_clientSocket=_context.CreateDealerSocket();break;
caseClientType.Stream:
_clientSocket=_context.CreateStreamSocket();break;
caseClientType.Pull:
_clientSocket=_context.CreatePullSocket();break;
caseClientType.XSub:
_clientSocket=_context.CreateXSubscriberSocket();break;
default:
_clientSocket=_context.CreateRequestSocket();break;
}
_clientSocket.Connect("tcp://"+_ip+":"+_port);
}

publicvoidStartAsyncReceive()
{
task=Task.Factory.StartNew(()=>
AsyncRead(_clientSocket),TaskCreationOptions.LongRunning);

}

privatevoidAsyncRead(NetMQSocketcSocket)
{
while(true)
{
varmsg=cSocket.ReceiveMessage();
OnOnReceive(newDataEventArgs<NetMQSocket,NetMQMessage>(cSocket,msg));
}
}

publicNetMQSocketClient
{
get{return_clientSocket;}
}

publicTGetClient<T>()whereT:NetMQSocket
{
return(T)_clientSocket;
}

publicvoidSend(NetMQMessagemsg)
{
_clientSocket.SendMessage(msg);
}

publicNetMQMessageCreateMessage()
{
returnnewNetMQMessage();
}

publicNetMQMessageReceiveMessage()
{
return_clientSocket.ReceiveMessage();
}

publicvoidDispose()
{
_clientSocket.Dispose();
_context.Dispose();
if(task!=null)
{
task.Dispose();
}
}
}


  客户端提供了,同步接受消息和异步接收消息两种方式,当启动异步时,就开始循环的读取消息了,当读到消息时抛出事件,并且针对任务等做了资源的释放。并提供创建消息和返回MQ对象等公共方法,可以在开发过程中快速的入手和使用。

  先简单说一下response和request模式,就是响应模式,当mq客户端向mq的服务端发送消息时,需要得到及时的响应,并返回给使用者或者是用户,这就需要及时响应的服务端程序,一般的MQ都会有这种功能,也是使用最广泛的,我们就先写一个这种类型的demo,基于我们前面提供的客户端和服务端。

  ServerConsole

  这里我提供了2种也是最常用的2种服务端方式,并且提供了不同的处理方式。

classProgram
{
privatestaticOctMQServer_server;
staticServerType_type;
staticvoidMain(string[]args)
{
AppDomain.CurrentDomain.UnhandledException+=CurrentDomain_UnhandledException;
CreateCmd();

}

///<summary>
///创建mq对象
///</summary>
staticvoidCreate()
{
_server=newOctMQServer();
_server.OnReceive+=server_OnReceive;
_server.Init(5555,_type);

}

///<summary>
///选择类型
///</summary>
privatestaticvoidCreateCmd()
{
Csl.Wl(ConsoleColor.Red,"请选择您要创建的MQ服务端类型");
Csl.Wl(ConsoleColor.Yellow,"1.PUB2.REP");
varkey=System.Console.ReadLine();
switch(key)
{
case"1":
{
_type=ServerType.Pub;
Create();
Cmd();
}

break;
case"2":
_type=ServerType.Response;
Create();
Cmd();
break;
default:
{
CreateCmd();

}
break;
}
}

staticvoidCurrentDomain_UnhandledException(objectsender,UnhandledExceptionEventArgse)
{
Csl.WlEx((Exception)e.ExceptionObject);
}

///<summary>
///接收消息
///</summary>
privatestaticvoidCmd()
{
if(_type==ServerType.Pub)
{
Csl.Wl(ConsoleColor.Red,"请输入您要发个订阅者的信息主题与信息用空格分开");
}
else
{
Csl.Wl(ConsoleColor.Red,"等待消息");
}
varcmd=System.Console.ReadLine();

switch(cmd)
{
case"exit":
Csl.Wl("正在关闭应用程序。。。等待最后一个心跳执行完成。。。");
_server.Dispose();
break;

default:
{
varstr=cmd.Split('');
varmsg=_server.CreateMessage();
msg.Append(str[0],Encoding.UTF8);
msg.Append(str[1],Encoding.UTF8);
_server.Send(msg);
Cmd();
break;
}
return;
}
}

staticvoidserver_OnReceive(objectsender,DataEventArgs<NetMQ.NetMQSocket,NetMQ.NetMQMessage>e)
{
varmsg=e.Arg2;
varserver=e.Arg1;
Csl.Wl(msg.Pop().ConvertToString(Encoding.UTF8));
server.Send("你好,您的请求已处理,并返回消息及处理结果",Encoding.UTF8);
}
}


  ClientForm

  客户端,我使用winform来处理,并且配合控制台使用,这个用法有些巧妙,不会的同学可以私密我,嘿嘿,先上截图,也是可以同时处理两种方式,给个demo,方便大家在实际项目中使用:

  响应式:

  


  订阅者式:

  


  不会做gif,我就逐步说吧,从订阅者模式中我们可以看到,我的打开顺序1-》2->3,先打开1,订阅了t的主题,发了2个消息,内容1和内容2,第一个程序均收到,这时我启动另外一个程序,同样订阅t这个主题,发现消息是通过轮询的方式分别向两个订阅者发送,这样,我们在处理一些比较耗时的业务逻辑,并且不会因为并发出现问题时,就可以使用多个订阅者,分别处理业务从而大大的提高我们的系统性能。

  然后打开第三个,订阅y这个主题,这时发送y的主题消息,前2个订阅者就无法收到了,这样我们还可以区分业务,进行多进程的处理,更高的提高可用性和可扩展性,并结合高性能的缓存解决方案处理高并发的业务逻辑。

  贴出客户端代码:

  

publicpartialclassForm1:Form
{
publicForm1()
{
InitializeComponent();
Csl.Init();
}

///<summary>
///mq客户端
///</summary>
privateOctMQClient_client;

///<summary>
///订阅者模式连接
///</summary>
///<paramname="sender"></param>
///<paramname="e"></param>
privatevoidbtnConn_Click(objectsender,EventArgse)
{
_client=newOctMQClient();
_client.OnReceive+=_client_OnReceive;

_client.Init(txtip.Text,int.Parse(txtport.Text),ClientType.Sub);
varsub=(SubscriberSocket)_client.Client;
sub.Subscribe(txtTop.Text);
_client.StartAsyncReceive();

}

///<summary>
///订阅者模式受到消息
///</summary>
///<paramname="sender"></param>
///<paramname="e"></param>
void_client_OnReceive(objectsender,Core.Args.DataEventArgs<NetMQ.NetMQSocket,NetMQ.NetMQMessage>e)
{
varmsg=e.Arg2;
Csl.Wl("主题:"+msg.Pop().ConvertToString(Encoding.UTF8));
Csl.Wl("内容:"+msg.Pop().ConvertToString(Encoding.UTF8));
}

///<summary>
///发送响应消息
///</summary>
///<paramname="sender"></param>
///<paramname="e"></param>
privatevoidbtnSend_Click(objectsender,EventArgse)
{
using(_client=newOctMQClient())
{
_client.Init(txtip.Text,int.Parse(txtport.Text),ClientType.Request);
varcontent=txtContent.Text;
varmsg=_client.CreateMessage();
msg.Append(content,Encoding.UTF8);
_client.Send(msg);
varrmsg=_client.ReceiveMessage();
varreqStr=rmsg.Pop().ConvertToString(Encoding.UTF8);
Csl.Wl(reqStr);
}

}

///<summary>
///释放资源
///</summary>
///<paramname="e"></param>
protectedoverridevoidOnClosed(EventArgse)
{
base.OnClosed(e);
if(_client!=null)
{
_client.Dispose();
}
}
}


  好了,大家先消化一下,等系列写完了,我会提交到github上。下一期,会写一些并发情况下的应用。


                                            
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: