使用ActiveMQ发送和接受消息(1)
2013-03-07 11:34
295 查看
第一步:搭建ActiveMQ环境
a.安装jdk与tomcat,设置环境变量%JAVVA_HOME% = C:\Program Files\Java\jdk1.7.0_11
b.解压下载得到的apache-activemq-5.7.0-bin.zip,输出路径:E:\ActiveMQ\apache-activemq-5.7.0
下载地址:http://activemq.apache.org/download-archives.html
c.修改E:\ActiveMQ\apache-activemq-5.7.0\conf\activemq.xml
Line129: <storeUsage limit="10 gb"/>
Line132: <tempUsage limit="1 gb"/>
若磁盘空间不够,在安装过程中会得到WARN信息。
d.双击运行apache-activemq-5.7.0\bin\activemq.bat,一切顺利得信息“INFO | Started SelectChannelConnector@0.0.0.0:8161”
打开http://localhost:8161/,效果如下:
第二步:使用C#实现本地和ActiveMQ之间发送/接受消息
a.下载Apache.NMS.ActiveMQ.dll和Apache.NMS.dll
下载地址:http://activemq.apache.org/nms/download.html
Apache.NMS.ActiveMQ-1.5.6-bin\build\net-2.0\release\Apache.NMS.ActiveMQ.dll
Apache.NMS-1.5.1-bin\net-2.0\release\Apache.NMS.dll
b.新建C#项目,添加引用Apache.NMS.ActiveMQ.dll和Apache.NMS.dll
c.编写生产者类,实现向服务器发送消息
d.编写消费者类,实现从服务器接受消息
e.效果如下
参考资料:
ActiveMQ在C#中的应用
a.安装jdk与tomcat,设置环境变量%JAVVA_HOME% = C:\Program Files\Java\jdk1.7.0_11
b.解压下载得到的apache-activemq-5.7.0-bin.zip,输出路径:E:\ActiveMQ\apache-activemq-5.7.0
下载地址:http://activemq.apache.org/download-archives.html
c.修改E:\ActiveMQ\apache-activemq-5.7.0\conf\activemq.xml
Line129: <storeUsage limit="10 gb"/>
Line132: <tempUsage limit="1 gb"/>
若磁盘空间不够,在安装过程中会得到WARN信息。
d.双击运行apache-activemq-5.7.0\bin\activemq.bat,一切顺利得信息“INFO | Started SelectChannelConnector@0.0.0.0:8161”
打开http://localhost:8161/,效果如下:
第二步:使用C#实现本地和ActiveMQ之间发送/接受消息
a.下载Apache.NMS.ActiveMQ.dll和Apache.NMS.dll
下载地址:http://activemq.apache.org/nms/download.html
Apache.NMS.ActiveMQ-1.5.6-bin\build\net-2.0\release\Apache.NMS.ActiveMQ.dll
Apache.NMS-1.5.1-bin\net-2.0\release\Apache.NMS.dll
b.新建C#项目,添加引用Apache.NMS.ActiveMQ.dll和Apache.NMS.dll
c.编写生产者类,实现向服务器发送消息
//#define TOPIC //使用topic模式 #define QUEUE // 使用queue模式 using System; using System.Threading; using Apache.NMS; using Apache.NMS.ActiveMQ; using Apache.NMS.ActiveMQ.Commands; //生产者 namespace Producer { class Program { static void Main(string[] args) { try { IConnectionFactory pConnFactory = new ConnectionFactory("tcp://localhost:61616"); using (IConnection pConn = pConnFactory.CreateConnection()) { using (ISession pSession = pConn.CreateSession()) { #if TOPIC IDestination pDestination = new ActiveMQTopic("myTopicName"); #endif #if QUEUE IDestination pDestination = new ActiveMQQueue("myQueryName"); #endif IMessageProducer pProducer = pSession.CreateProducer(pDestination); int i = 0; while (!Console.KeyAvailable) { ITextMessage msg = pProducer.CreateTextMessage(); msg.Text = i.ToString(); msg.Properties.SetInt("ITEM_TYPE", i); // 为消息添加属性 Console.WriteLine(string.Format("Sending: {0}", i)); pProducer.Send(msg, MsgDeliveryMode.NonPersistent, MsgPriority.Normal, TimeSpan.MinValue); Thread.Sleep(2000); i++; } } } Console.ReadLine(); } catch (Exception ex) { Console.WriteLine(ex.Message); Console.Read(); } } } }
d.编写消费者类,实现从服务器接受消息
//#define TOPIC #define QUEUE using System; using System.Threading; using Apache.NMS; using Apache.NMS.ActiveMQ; using Apache.NMS.ActiveMQ.Commands; //消费者 namespace Consumer { class Program { static void Main(string[] args) { try { IConnectionFactory pFactory = new ConnectionFactory("tcp://localhost:61616/"); using (IConnection conn = pFactory.CreateConnection()) { conn.ClientId = "myListenser"; conn.Start(); using (ISession pSession = conn.CreateSession()) { #if TOPIC ITopic pTopic = new ActiveMQTopic("myTopicName"); IMessageConsumer pConsumer = pSession.CreateDurableConsumer(pTopic, "myListenser", null, false); #endif #if QUEUE IDestination pDestination = new ActiveMQQueue("myQueryName"); //IMessageConsumer pConsumer = pSession.CreateConsumer(pDestination, "ITEM_TYPE%3=0"); IMessageConsumer pConsumer = pSession.CreateConsumer(pDestination); #endif pConsumer.Listener += new MessageListener(consumer_Listener); Console.ReadLine(); } conn.Stop(); conn.Close(); } } catch (Exception ex) { Console.WriteLine(ex.Message); Console.ReadKey(); } } static void consumer_Listener(IMessage message) { try { ITextMessage msg = message as ITextMessage; Thread.Sleep(3000); Console.WriteLine(string.Format("Receive 3s ago: {0}", msg.Text)); } catch (Exception ex) { Console.WriteLine(ex.Message); Console.ReadKey(); } } } }
//#define TOPIC #define QUEUE using System; using System.Threading; using Apache.NMS; using Apache.NMS.ActiveMQ; using Apache.NMS.ActiveMQ.Commands; //消费者2 namespace Consumer2 { class Program { static void Main(string[] args) { try { IConnectionFactory pFactory = new ConnectionFactory("tcp://localhost:61616/"); using (IConnection conn = pFactory.CreateConnection()) { conn.ClientId = "myListenser2"; conn.Start(); using (ISession pSession = conn.CreateSession()) { #if TOPIC ITopic pTopic = new ActiveMQTopic("myTopicName"); IMessageConsumer pConsumer = pSession.CreateDurableConsumer(pTopic, "myListenser2", null, false); #endif #if QUEUE IDestination pDestination = new ActiveMQQueue("myQueryName"); IMessageConsumer pConsumer = pSession.CreateConsumer(pDestination); #endif pConsumer.Listener += new MessageListener(consumer_Listener); Console.ReadLine(); } conn.Stop(); conn.Close(); } } catch (Exception ex) { Console.WriteLine(ex.Message); Console.ReadKey(); } } static void consumer_Listener(IMessage message) { try { ITextMessage msg = message as ITextMessage; Console.WriteLine(string.Format("Receive: {0}", msg.Text)); Thread.Sleep(3000); } catch (Exception ex) { Console.WriteLine(ex.Message); Console.ReadKey(); } } } }
e.效果如下
参考资料:
ActiveMQ在C#中的应用
相关文章推荐
- ActiveMQ (二) 使用Queue或者Topic发送/接受消息
- ActiveMQ (二) 使用Queue或者Topic发送/接受消息
- ActiveMQ 使用Queue或者Topic发送/接受消息
- 如何使用activemq jms发送和接受消息
- ActiveMQ(二)———使用Quene来发送消息
- spring-jms(activemq实现)使用queue发送消息简单例子
- ActiveMQ使用spring JmsTemplate发送消息(一)
- ActiveMQ发送消息,接受消息。点对点连接
- ActiveMQ教程(消息发送和接受)
- activemq使用JMS发送消息和接收消息
- ActiveMQ使用spring JmsTemplate发送消息(一)
- ActiveMQ使用spring JmsTemplate发送消息(一)
- ActiveMQ使用spring JmsTemplate发送消息(一)
- ActiveMQ(二):使用队列Queue方式发送消息
- weblogic中使用jms发送和接受消息
- rabbitmq学习9:使用spring-amqp发送消息及同步接受消息
- ActiveMQ使用spring JmsTemplate发送消息(一)
- ActiveMQ使用spring JmsTemplate发送消息(一)
- rabbitmq学习10:使用spring-ampq发送消息及异步接受消息
- ActiveMQ(二)———使用Topic来发送消息