您的位置:首页 > 其它

使用ActiveMQ发送和接收消息(1)

2013-03-07 11:34 295 查看
第一步:搭建ActiveMQ环境

a.安装jdk与tomcat,设置环境变量%JAVA_HOME% = C:\Program Files\Java\jdk1.7.0_11

b.解压下载得到的apache-activemq-5.7.0-bin.zip,输出路径:E:\ActiveMQ\apache-activemq-5.7.0

下载地址:http://activemq.apache.org/download-archives.html

c.修改E:\ActiveMQ\apache-activemq-5.7.0\conf\activemq.xml

Line129: <storeUsage limit="10 gb"/>

Line132: <tempUsage limit="1 gb"/>

若磁盘可用空间小于上面配置的限制值,启动ActiveMQ时会得到WARN信息。

d.双击运行apache-activemq-5.7.0\bin\activemq.bat,一切顺利的话会得到信息“INFO | Started SelectChannelConnector@0.0.0.0:8161”

打开http://localhost:8161/,效果如下:



第二步:使用C#实现本地和ActiveMQ之间发送/接收消息

a.下载Apache.NMS.ActiveMQ.dll和Apache.NMS.dll

下载地址:http://activemq.apache.org/nms/download.html

Apache.NMS.ActiveMQ-1.5.6-bin\build\net-2.0\release\Apache.NMS.ActiveMQ.dll

Apache.NMS-1.5.1-bin\net-2.0\release\Apache.NMS.dll

b.新建C#项目,添加引用Apache.NMS.ActiveMQ.dll和Apache.NMS.dll

c.编写生产者类,实现向服务器发送消息

//#define TOPIC //使用topic模式
#define QUEUE  // 使用queue模式

using System;
using System.Threading;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Apache.NMS.ActiveMQ.Commands;

// Producer
namespace Producer
{
    /// <summary>
    /// Console program that connects to the broker at tcp://localhost:61616 and
    /// sends a text message carrying an increasing counter every two seconds
    /// until a key is pressed.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Entry point: opens a connection and a session, creates a producer for
        /// the topic or queue selected by the TOPIC / QUEUE compilation symbol,
        /// and sends messages in a loop.
        /// </summary>
        /// <param name="args">Command-line arguments (not used).</param>
        static void Main(string[] args)
        {
            try
            {
                IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616");

                // Both resources are disposed (session first, then connection)
                // when the loop below ends or an exception escapes.
                using (IConnection connection = factory.CreateConnection())
                using (ISession session = connection.CreateSession())
                {
                    // Exactly one of TOPIC / QUEUE is expected to be defined at
                    // the top of the file; it decides the destination type.
#if TOPIC
                    IDestination target = new ActiveMQTopic("myTopicName");
#endif
#if QUEUE
                    IDestination target = new ActiveMQQueue("myQueryName");
#endif
                    IMessageProducer sender = session.CreateProducer(target);

                    // Keep sending until the user presses any key.
                    for (int counter = 0; !Console.KeyAvailable; counter++)
                    {
                        ITextMessage textMessage = sender.CreateTextMessage();
                        textMessage.Text = counter.ToString();

                        // Attach the counter as a message property as well.
                        textMessage.Properties.SetInt("ITEM_TYPE", counter);

                        string logLine = string.Format("Sending: {0}", counter);
                        Console.WriteLine(logLine);

                        sender.Send(
                            textMessage,
                            MsgDeliveryMode.NonPersistent,
                            MsgPriority.Normal,
                            TimeSpan.MinValue);

                        // Pause two seconds between messages.
                        Thread.Sleep(2000);
                    }
                }

                // Keep the console window open after the connection is released.
                Console.ReadLine();
            }
            catch (Exception ex)
            {
                // Show the failure and wait for input so the text can be read.
                Console.WriteLine(ex.Message);
                Console.Read();
            }
        }
    }
}


d.编写消费者类,实现从服务器接收消息

//#define TOPIC
#define QUEUE

using System;
using System.Threading;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Apache.NMS.ActiveMQ.Commands;

// Consumer
namespace Consumer
{
    /// <summary>
    /// Console program that connects to the broker at tcp://localhost:61616 with
    /// client id "myListenser" and prints every text message it receives from
    /// the topic or queue selected by the TOPIC / QUEUE compilation symbol.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Entry point: opens and starts the connection, registers
        /// <see cref="consumer_Listener"/> on a consumer, and keeps running until
        /// the user presses Enter.
        /// </summary>
        /// <param name="args">Command-line arguments (not used).</param>
        static void Main(string[] args)
        {
            try
            {
                IConnectionFactory pFactory = new ConnectionFactory("tcp://localhost:61616/");
                using (IConnection conn = pFactory.CreateConnection())
                {
                    // The client id is assigned before the connection is started;
                    // the TOPIC branch reuses the same string as subscription name.
                    conn.ClientId = "myListenser";
                    conn.Start();

                    using (ISession pSession = conn.CreateSession())
                    {
#if TOPIC
                        ITopic pTopic = new ActiveMQTopic("myTopicName");
                        IMessageConsumer pConsumer = pSession.CreateDurableConsumer(pTopic, "myListenser", null, false);
#endif

#if QUEUE
                        IDestination pDestination = new ActiveMQQueue("myQueryName");
                        // Alternative: pass a selector on the ITEM_TYPE property to receive only matching messages.
                        //IMessageConsumer pConsumer = pSession.CreateConsumer(pDestination, "ITEM_TYPE%3=0");
                        IMessageConsumer pConsumer = pSession.CreateConsumer(pDestination);
#endif
                        pConsumer.Listener += new MessageListener(consumer_Listener);

                        // Messages arrive through the listener; block here until Enter is pressed.
                        Console.ReadLine();
                    }

                    conn.Stop();
                    conn.Close();
                }
            }
            catch (Exception ex)
            {
                // Show the failure and wait for a key so the text can be read.
                Console.WriteLine(ex.Message);
                Console.ReadKey();
            }
        }

        /// <summary>
        /// Listener callback: waits three seconds, then prints the text of the
        /// received message. Messages that are not <see cref="ITextMessage"/>
        /// are reported and skipped.
        /// </summary>
        /// <param name="message">The message delivered to the consumer.</param>
        static void consumer_Listener(IMessage message)
        {
            try
            {
                ITextMessage msg = message as ITextMessage;
                if (msg == null)
                {
                    // "as" yields null for any other message type; skip it instead
                    // of failing with a NullReferenceException on msg.Text below.
                    Console.WriteLine("Received a non-text message; ignored.");
                    return;
                }

                Thread.Sleep(3000);
                Console.WriteLine(string.Format("Receive 3s ago: {0}", msg.Text));
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
                Console.ReadKey();
            }
        }
    }
}


//#define TOPIC
#define QUEUE

using System;
using System.Threading;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Apache.NMS.ActiveMQ.Commands;

// Consumer 2
namespace Consumer2
{
    /// <summary>
    /// Second console consumer: connects to the broker at tcp://localhost:61616
    /// with client id "myListenser2" and prints every text message it receives
    /// from the topic or queue selected by the TOPIC / QUEUE compilation symbol.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Entry point: opens and starts the connection, registers
        /// <see cref="consumer_Listener"/> on a consumer, and keeps running until
        /// the user presses Enter.
        /// </summary>
        /// <param name="args">Command-line arguments (not used).</param>
        static void Main(string[] args)
        {
            try
            {
                IConnectionFactory pFactory = new ConnectionFactory("tcp://localhost:61616/");
                using (IConnection conn = pFactory.CreateConnection())
                {
                    // The client id is assigned before the connection is started;
                    // the TOPIC branch reuses the same string as subscription name.
                    conn.ClientId = "myListenser2";
                    conn.Start();

                    using (ISession pSession = conn.CreateSession())
                    {
#if TOPIC
                        ITopic pTopic = new ActiveMQTopic("myTopicName");
                        IMessageConsumer pConsumer = pSession.CreateDurableConsumer(pTopic, "myListenser2", null, false);
#endif

#if QUEUE
                        IDestination pDestination = new ActiveMQQueue("myQueryName");
                        IMessageConsumer pConsumer = pSession.CreateConsumer(pDestination);
#endif

                        pConsumer.Listener += new MessageListener(consumer_Listener);

                        // Messages arrive through the listener; block here until Enter is pressed.
                        Console.ReadLine();
                    }

                    conn.Stop();
                    conn.Close();
                }
            }
            catch (Exception ex)
            {
                // Show the failure and wait for a key so the text can be read.
                Console.WriteLine(ex.Message);
                Console.ReadKey();
            }
        }

        /// <summary>
        /// Listener callback: prints the text of the received message and then
        /// waits three seconds before returning. Messages that are not
        /// <see cref="ITextMessage"/> are reported and skipped.
        /// </summary>
        /// <param name="message">The message delivered to the consumer.</param>
        static void consumer_Listener(IMessage message)
        {
            try
            {
                ITextMessage msg = message as ITextMessage;
                if (msg == null)
                {
                    // "as" yields null for any other message type; skip it instead
                    // of failing with a NullReferenceException on msg.Text below.
                    Console.WriteLine("Received a non-text message; ignored.");
                    return;
                }

                Console.WriteLine(string.Format("Receive: {0}", msg.Text));

                Thread.Sleep(3000);
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
                Console.ReadKey();
            }
        }
    }
}


e.效果如下



参考资料:

ActiveMQ在C#中的应用
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: