针对高并发,可扩展的互联网架构,搭建消息队列(一)
2014-12-18 14:47
211 查看
针对高并发,可扩展的互联网架构,搭建消息队列(一)
想开发高并发可扩展的互联网架构,消息队列是不可缺少的,目前主流的消息队列,有windows自带的MSMQ,还有跨平台的强大的ZeroMQ,这里我们就选用ZeroMQ.
ZeroMQ介绍:(也拼写作ØMQ、0MQ或ZMQ)是个非常轻量级的开源消息队列软件。它没有独立的服务器,消息直接从一个应用程序被发送到另一个应用程序。ZeroMQ的学习和应用也非常简单,它只有一个C++编写成的单个库文件libzmq.dll,可以链接到任何应用程序中。如果要在.NET环境中使用,我们需要用到一个C#编写的名为clrzmq.dll包装库。ZeroMQ可以在Windows、OSX和Linux等多种操作系统上运行,C、C++、C#、Java、Python等语言都可以编写ZeroMQ应用程序这使得不同平台上的不同应用程序之间可以相互通讯。
1、环境搭建:
codeproject专题,下载对应的Downloadbinaries-377.6KB,解压缩到你的指定路径。
这里我们就不详细介绍,主要说一下C#封装好的版本,NetMQ,是基于ZeroMQ进行封装的。就不需要下载了,直接nuget上获取:
PM>Install-PackageNetMQ
为什么不直接用ZeroMQ,而使用NetMQ,运行非托管代码的托管应用程序内可能会出现许多想不到的问题,像内存泄漏和奇怪的没有访问错误。而NetMQ使用原生的C#语言,它更容易调试原生C#代码,你可以下载代码,调试你的系统。你可以在github上贡献。
待安装好后,系统会自动添加NetMQ的引用。
可以看到,NetMQ是基于zmq进行开发的,其实就是ZeroMQ了,并且已经为我们封装了各种功能的MQ对象,比如REP/REQ,PUB/SUB(主题式订阅),XPUB/XSUB(非主题订阅),Push/Pull,甚至还有路由模式等,从字面意义上,应该能看出个大概,后面我们一个一个进行测试使用。
先看个简单的demo,初步了解一下:
代码比较简洁的介绍了REP/REQ模式下NetMQ的使用,而且我们可以看到,这个Mq对象是可以在不同的线程间切换使用的,也许你会测试中文,那就先序列化再反序列化吧,因为可能会出现乱码哟。
这里,我先简单根据NetMQ,封装一个Server端和一个Client端,方便后面使用,当然也可以不封装,直接使用:
Server:
这样,使用者就可以根据枚举进行服务端的创建,不用纠结到底用哪一种服务端,并且封装了一些消息的异步事件,方便在开发中使用,可以使用多播委托,针对不同的消息进行不同的处理,我这里使用的while循环,当然,在netmq内部提供了循环器和心跳等,都可以在实际的开发中进行扩展和使用:Poller和NetMQTimer。
Client:
客户端提供了,同步接受消息和异步接收消息两种方式,当启动异步时,就开始循环的读取消息了,当读到消息时抛出事件,并且针对任务等做了资源的释放。并提供创建消息和返回MQ对象等公共方法,可以在开发过程中快速的入手和使用。
先简单说一下response和request模式,就是响应模式,当mq客户端向mq的服务端发送消息时,需要得到及时的响应,并返回给使用者或者是用户,这就需要及时响应的服务端程序,一般的MQ都会有这种功能,也是使用最广泛的,我们就先写一个这种类型的demo,基于我们前面提供的客户端和服务端。
ServerConsole
这里我提供了2种也是最常用的2种服务端方式,并且提供了不同的处理方式。
ClientForm
客户端,我使用winform来处理,并且配合控制台使用,这个用法有些巧妙,不会的同学可以私密我,嘿嘿,先上截图,也是可以同时处理两种方式,给个demo,方便大家在实际项目中使用:
响应式:
订阅者式:
不会做gif,我就逐步说吧,从订阅者模式中我们可以看到,我的打开顺序1-》2->3,先打开1,订阅了t的主题,发了2个消息,内容1和内容2,第一个程序均收到,这时我启动另外一个程序,同样订阅t这个主题,发现消息是通过轮询的方式分别向两个订阅者发送,这样,我们在处理一些比较耗时的业务逻辑,并且不会因为并发出现问题时,就可以使用多个订阅者,分别处理业务从而大大的提高我们的系统性能。
然后打开第三个,订阅y这个主题,这时发送y的主题消息,前2个订阅者就无法收到了,这样我们还可以区分业务,进行多进程的处理,更高的提高可用性和可扩展性,并结合高性能的缓存解决方案处理高并发的业务逻辑。
贴出客户端代码:
好了,大家先消化一下,等系列写完了,我会提交到github上。下一期,会写一些并发情况下的应用。
想开发高并发可扩展的互联网架构,消息队列是不可缺少的,目前主流的消息队列,有windows自带的MSMQ,还有跨平台的强大的ZeroMQ,这里我们就选用ZeroMQ.
ZeroMQ介绍:(也拼写作ØMQ、0MQ或ZMQ)是个非常轻量级的开源消息队列软件。它没有独立的服务器,消息直接从一个应用程序被发送到另一个应用程序。ZeroMQ的学习和应用也非常简单,它只有一个C++编写成的单个库文件libzmq.dll,可以链接到任何应用程序中。如果要在.NET环境中使用,我们需要用到一个C#编写的名为clrzmq.dll包装库。ZeroMQ可以在Windows、OSX和Linux等多种操作系统上运行,C、C++、C#、Java、Python等语言都可以编写ZeroMQ应用程序这使得不同平台上的不同应用程序之间可以相互通讯。
1、环境搭建:
这里我们就不详细介绍,主要说一下C#封装好的版本,NetMQ,是基于ZeroMQ进行封装的。就不需要下载了,直接nuget上获取:
PM>Install-PackageNetMQ
为什么不直接用ZeroMQ,而使用NetMQ,运行非托管代码的托管应用程序内可能会出现许多想不到的问题,像内存泄漏和奇怪的没有访问错误。而NetMQ使用原生的C#语言,它更容易调试原生C#代码,你可以下载代码,调试你的系统。你可以在github上贡献。
待安装好后,系统会自动添加NetMQ的引用。
可以看到,NetMQ是基于zmq进行开发的,其实就是ZeroMQ了,并且已经为我们封装了各种功能的MQ对象,比如REP/REQ,PUB/SUB(主题式订阅),XPUB/XSUB(非主题订阅),Push/Pull,甚至还有路由模式等,从字面意义上,应该能看出个大概,后面我们一个一个进行测试使用。
先看个简单的demo,初步了解一下:
classProgram { staticvoidMain(string[]args) { using(NetMQContextcontext=NetMQContext.Create()) { TaskserverTask=Task.Factory.StartNew(()=>Server(context)); TaskclientTask=Task.Factory.StartNew(()=>Client(context)); Task.WaitAll(serverTask,clientTask); } } staticvoidServer(NetMQContextcontext) { using(NetMQSocketserverSocket=context.CreateResponseSocket()) { serverSocket.Bind("tcp://*:5555"); while(true) { stringmessage=serverSocket.ReceiveString(); Console.WriteLine("Receivemessage{0}",message); serverSocket.Send("World"); if(message=="exit") { break; } } } } staticvoidClient(NetMQContextcontext) { using(NetMQSocketclientSocket=context.CreateRequestSocket()) { clientSocket.Connect("tcp://127.0.0.1:5555"); while(true) { Console.WriteLine("Pleaseenteryourmessage:"); stringmessage=Console.ReadLine(); clientSocket.Send(message); stringanswer=clientSocket.ReceiveString(); Console.WriteLine("Answerfromserver:{0}",answer); if(message=="exit") { break; } } } } }
代码比较简洁的介绍了REP/REQ模式下NetMQ的使用,而且我们可以看到,这个Mq对象是可以在不同的线程间切换使用的,也许你会测试中文,那就先序列化再反序列化吧,因为可能会出现乱码哟。
这里,我先简单根据NetMQ,封装一个Server端和一个Client端,方便后面使用,当然也可以不封装,直接使用:
Server:
///<summary> ///Mq服务端 ///</summary> publicclassOctMQServer:IDisposable { publiceventEventHandler<DataEventArgs<NetMQSocket,NetMQMessage>>OnReceive; protectedvirtualvoidOnOnReceive(DataEventArgs<NetMQSocket,NetMQMessage>e) { EventHandler<DataEventArgs<NetMQSocket,NetMQMessage>>handler=OnReceive; if(handler!=null)handler(this,e); } privateint_port; privateNetMQSocket_serverSocket; privateServerType_type; privateNetMQContext_context; publicvoidInit(intport,ServerTypetype) { _type=type; _port=port; _context=NetMQContext.Create(); CreateServer(); } voidCreateServer() { switch(_type) { caseServerType.Response: _serverSocket=_context.CreateResponseSocket();break; caseServerType.Pub: _serverSocket=_context.CreatePushSocket();break; caseServerType.Router: _serverSocket=_context.CreateRouterSocket();break; caseServerType.Stream: _serverSocket=_context.CreateStreamSocket();break; caseServerType.Push: _serverSocket=_context.CreatePushSocket();break; caseServerType.XPub: _serverSocket=_context.CreateXPublisherSocket();break; default: _serverSocket=_context.CreateResponseSocket();break; } _serverSocket.Bind("tcp://*:"+_port); Task.Factory.StartNew(()=> AsyncRead(_serverSocket),TaskCreationOptions.LongRunning); } privatevoidAsyncRead(NetMQSocketserverSocket) { while(true) { varmsg=serverSocket.ReceiveMessage(); OnOnReceive(newDataEventArgs<NetMQSocket,NetMQMessage>(serverSocket,msg)); } } publicNetMQSocketServer { get{return_serverSocket;} } publicvoidDispose() { _serverSocket.Dispose(); _context.Dispose(); } publicvoidSend(NetMQMessagemsg) { _serverSocket.SendMessage(msg); } publicNetMQMessageCreateMessage() { returnnewNetMQMessage(); } }
这样,使用者就可以根据枚举进行服务端的创建,不用纠结到底用哪一种服务端,并且封装了一些消息的异步事件,方便在开发中使用,可以使用多播委托,针对不同的消息进行不同的处理,我这里使用的while循环,当然,在netmq内部提供了循环器和心跳等,都可以在实际的开发中进行扩展和使用:Poller和NetMQTimer。
Client:
///<summary> ///MQ客户端 ///</summary> publicclassOctMQClient:IDisposable { publiceventEventHandler<DataEventArgs<NetMQSocket,NetMQMessage>>OnReceive; protectedvirtualvoidOnOnReceive(DataEventArgs<NetMQSocket,NetMQMessage>e) { EventHandler<DataEventArgs<NetMQSocket,NetMQMessage>>handler=OnReceive; if(handler!=null)handler(this,e); } privateint_port; privateNetMQSocket_clientSocket; privateClientType_type; privateNetMQContext_context; privatestring_ip; privateTasktask; publicvoidInit(stringip,intport,ClientTypetype) { _type=type; _ip=ip; _port=port; _context=NetMQContext.Create(); CreateClient(); } voidCreateClient() { switch(_type) { caseClientType.Request: _clientSocket=_context.CreateRequestSocket();break; caseClientType.Sub: _clientSocket=_context.CreateSubscriberSocket();break; caseClientType.Dealer: _clientSocket=_context.CreateDealerSocket();break; caseClientType.Stream: _clientSocket=_context.CreateStreamSocket();break; caseClientType.Pull: _clientSocket=_context.CreatePullSocket();break; caseClientType.XSub: _clientSocket=_context.CreateXSubscriberSocket();break; default: _clientSocket=_context.CreateRequestSocket();break; } _clientSocket.Connect("tcp://"+_ip+":"+_port); } publicvoidStartAsyncReceive() { task=Task.Factory.StartNew(()=> AsyncRead(_clientSocket),TaskCreationOptions.LongRunning); } privatevoidAsyncRead(NetMQSocketcSocket) { while(true) { varmsg=cSocket.ReceiveMessage(); OnOnReceive(newDataEventArgs<NetMQSocket,NetMQMessage>(cSocket,msg)); } } publicNetMQSocketClient { get{return_clientSocket;} } publicTGetClient<T>()whereT:NetMQSocket { return(T)_clientSocket; } publicvoidSend(NetMQMessagemsg) { _clientSocket.SendMessage(msg); } publicNetMQMessageCreateMessage() { returnnewNetMQMessage(); } publicNetMQMessageReceiveMessage() { return_clientSocket.ReceiveMessage(); } publicvoidDispose() { _clientSocket.Dispose(); _context.Dispose(); if(task!=null) { task.Dispose(); } } }
客户端提供了,同步接受消息和异步接收消息两种方式,当启动异步时,就开始循环的读取消息了,当读到消息时抛出事件,并且针对任务等做了资源的释放。并提供创建消息和返回MQ对象等公共方法,可以在开发过程中快速的入手和使用。
先简单说一下response和request模式,就是响应模式,当mq客户端向mq的服务端发送消息时,需要得到及时的响应,并返回给使用者或者是用户,这就需要及时响应的服务端程序,一般的MQ都会有这种功能,也是使用最广泛的,我们就先写一个这种类型的demo,基于我们前面提供的客户端和服务端。
ServerConsole
这里我提供了2种也是最常用的2种服务端方式,并且提供了不同的处理方式。
classProgram { privatestaticOctMQServer_server; staticServerType_type; staticvoidMain(string[]args) { AppDomain.CurrentDomain.UnhandledException+=CurrentDomain_UnhandledException; CreateCmd(); } ///<summary> ///创建mq对象 ///</summary> staticvoidCreate() { _server=newOctMQServer(); _server.OnReceive+=server_OnReceive; _server.Init(5555,_type); } ///<summary> ///选择类型 ///</summary> privatestaticvoidCreateCmd() { Csl.Wl(ConsoleColor.Red,"请选择您要创建的MQ服务端类型"); Csl.Wl(ConsoleColor.Yellow,"1.PUB2.REP"); varkey=System.Console.ReadLine(); switch(key) { case"1": { _type=ServerType.Pub; Create(); Cmd(); } break; case"2": _type=ServerType.Response; Create(); Cmd(); break; default: { CreateCmd(); } break; } } staticvoidCurrentDomain_UnhandledException(objectsender,UnhandledExceptionEventArgse) { Csl.WlEx((Exception)e.ExceptionObject); } ///<summary> ///接收消息 ///</summary> privatestaticvoidCmd() { if(_type==ServerType.Pub) { Csl.Wl(ConsoleColor.Red,"请输入您要发个订阅者的信息主题与信息用空格分开"); } else { Csl.Wl(ConsoleColor.Red,"等待消息"); } varcmd=System.Console.ReadLine(); switch(cmd) { case"exit": Csl.Wl("正在关闭应用程序。。。等待最后一个心跳执行完成。。。"); _server.Dispose(); break; default: { varstr=cmd.Split(''); varmsg=_server.CreateMessage(); msg.Append(str[0],Encoding.UTF8); msg.Append(str[1],Encoding.UTF8); _server.Send(msg); Cmd(); break; } return; } } staticvoidserver_OnReceive(objectsender,DataEventArgs<NetMQ.NetMQSocket,NetMQ.NetMQMessage>e) { varmsg=e.Arg2; varserver=e.Arg1; Csl.Wl(msg.Pop().ConvertToString(Encoding.UTF8)); server.Send("你好,您的请求已处理,并返回消息及处理结果",Encoding.UTF8); } }
ClientForm
客户端,我使用winform来处理,并且配合控制台使用,这个用法有些巧妙,不会的同学可以私密我,嘿嘿,先上截图,也是可以同时处理两种方式,给个demo,方便大家在实际项目中使用:
响应式:
订阅者式:
不会做gif,我就逐步说吧,从订阅者模式中我们可以看到,我的打开顺序1-》2->3,先打开1,订阅了t的主题,发了2个消息,内容1和内容2,第一个程序均收到,这时我启动另外一个程序,同样订阅t这个主题,发现消息是通过轮询的方式分别向两个订阅者发送,这样,我们在处理一些比较耗时的业务逻辑,并且不会因为并发出现问题时,就可以使用多个订阅者,分别处理业务从而大大的提高我们的系统性能。
然后打开第三个,订阅y这个主题,这时发送y的主题消息,前2个订阅者就无法收到了,这样我们还可以区分业务,进行多进程的处理,更高的提高可用性和可扩展性,并结合高性能的缓存解决方案处理高并发的业务逻辑。
贴出客户端代码:
publicpartialclassForm1:Form { publicForm1() { InitializeComponent(); Csl.Init(); } ///<summary> ///mq客户端 ///</summary> privateOctMQClient_client; ///<summary> ///订阅者模式连接 ///</summary> ///<paramname="sender"></param> ///<paramname="e"></param> privatevoidbtnConn_Click(objectsender,EventArgse) { _client=newOctMQClient(); _client.OnReceive+=_client_OnReceive; _client.Init(txtip.Text,int.Parse(txtport.Text),ClientType.Sub); varsub=(SubscriberSocket)_client.Client; sub.Subscribe(txtTop.Text); _client.StartAsyncReceive(); } ///<summary> ///订阅者模式受到消息 ///</summary> ///<paramname="sender"></param> ///<paramname="e"></param> void_client_OnReceive(objectsender,Core.Args.DataEventArgs<NetMQ.NetMQSocket,NetMQ.NetMQMessage>e) { varmsg=e.Arg2; Csl.Wl("主题:"+msg.Pop().ConvertToString(Encoding.UTF8)); Csl.Wl("内容:"+msg.Pop().ConvertToString(Encoding.UTF8)); } ///<summary> ///发送响应消息 ///</summary> ///<paramname="sender"></param> ///<paramname="e"></param> privatevoidbtnSend_Click(objectsender,EventArgse) { using(_client=newOctMQClient()) { _client.Init(txtip.Text,int.Parse(txtport.Text),ClientType.Request); varcontent=txtContent.Text; varmsg=_client.CreateMessage(); msg.Append(content,Encoding.UTF8); _client.Send(msg); varrmsg=_client.ReceiveMessage(); varreqStr=rmsg.Pop().ConvertToString(Encoding.UTF8); Csl.Wl(reqStr); } } ///<summary> ///释放资源 ///</summary> ///<paramname="e"></param> protectedoverridevoidOnClosed(EventArgse) { base.OnClosed(e); if(_client!=null) { _client.Dispose(); } } }
好了,大家先消化一下,等系列写完了,我会提交到github上。下一期,会写一些并发情况下的应用。
相关文章推荐
- 互联网通用架构技术----消息队列消息顺序控制
- 互联网业务场景下消息队列架构
- 使用.NET Core搭建分布式音频效果处理服务(五)利用消息队列提升水平扩展灵活性
- 互联网业务场景下消息队列架构
- 互联网业务场景下消息队列架构
- Java互联网架构-高并发分布式消息中间件技术ActiveMQ事务
- 互联网业务场景下消息队列架构
- 柯南君:看大数据时代下的IT架构(7)消息队列之RabbitMQ--案例(routing 起航)
- 对任何架构或应用使用消息队列 的十个理由
- (总结)高并发消息队列常用通知机制
- java后端系统架构之消息队列篇:kafka的实验
- 柯南君:看大数据时代下的IT架构(6)消息队列之RabbitMQ--案例(Publish/Subscribe起航)
- 互联网公司高并发图片存储服务架构设计一
- CentOS下的rabbitMQ集群安装,高并发消息队列中间件(何志雄)
- 适于互联网的SEDA高并发架构
- 适于互联网的SEDA高并发架构
- 柯南君:看大数据时代下的IT架构(9)消息队列之RabbitMQ--案例(RPC起航)
- F5扩展Synthesis架构实现可靠的应用与互联网访问
- 柯南君:看大数据时代下的IT架构(8)消息队列之RabbitMQ--案例(topic起航)
- 柯南君:看大数据时代下的IT架构(3)消息队列之RabbitMQ-安装、配置与监控