您的位置:首页 > 其它

ActiveMQ使用笔记(三)ActiveMQ消息发送与接收

2012-09-24 19:43 459 查看
配置完了持久化之后,我们就可以使用代码来发送和接收ActiveMQ中的消息了,我这里配置的持久化是KahaDB。

需要导入的jar包:



一段发送消息的代码:

Java

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

    publicstatic
voidsend(){
   try
{
           
// 创建一个连接工厂
           String
url=
"tcp://localhost:61616";
           
ActiveMQConnectionFactory connectionFactory=
newActiveMQConnectionFactory(url);
           // 设置用户名和密码,这个用户名和密码在conf目录下的credentials.properties文件中,也可以在activemq.xml中配置
           
connectionFactory.setUserName("system");
           connectionFactory.setPassword("manager");
           
// 创建连接
           Connection
connection=
connectionFactory.createConnection();
           connection.start();
           // 创建Session,参数解释:
           // 第一个参数是否使用事务:当消息发送者向消息提供者(即消息代理)发送消息时,消息发送者等待消息代理的确认,没有回应则抛出异常,消息发送程序负责处理这个错误。
           // 第二个参数消息的确认模式:
           // AUTO_ACKNOWLEDGE : 指定消息提供者在每次收到消息时自动发送确认。消息只向目标发送一次,但传输过程中可能因为错误而丢失消息。
           // CLIENT_ACKNOWLEDGE : 由消息接收者确认收到消息,通过调用消息的acknowledge()方法(会通知消息提供者收到了消息)
           // DUPS_OK_ACKNOWLEDGE : 指定消息提供者在消息接收者没有确认发送时重新发送消息(这种确认模式不在乎接收者收到重复的消息)。
           Session
session=
connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
           // 创建目标,就创建主题也可以创建队列
           Destination
destination=
session.createQueue("test");
           // 创建消息生产者
           MessageProducer
producer=
session.createProducer(destination);
           // 设置持久化,DeliveryMode.PERSISTENT和DeliveryMode.NON_PERSISTENT
           producer.setDeliveryMode(DeliveryMode.PERSISTENT);
           // 创建消息
           String
text=
"Hello ActiveMQ!";
           TextMessage
message=
session.createTextMessage(text);
           // 发送消息到ActiveMQ
           producer.send(message);
           System.out.println("Message
is sent!");
           // 关闭资源
           session.close();
           connection.close();
       }
       
catch (Exceptione)
{
           e.printStackTrace();
       
}
   }

执行了上面的发送方法之后,在ActiveMQ的监视控制可以看到有一个test队列,并且有一条消息,如图:



点击队列名test,然后点击消息ID即可查看消息内容,如图:



如果DeliveryMode没有设置或者设置为NON_PERSISTENT,那么重启MQ之后消息就会丢失。

一段接收消息的代码:

Java

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

publicstatic
voidget(){
   try{
       
String url=
"tcp://localhost:61616";
       ActiveMQConnectionFactory
connectionFactory=
newActiveMQConnectionFactory(url);
       
// 设置用户名和密码,这个用户名和密码在conf目录下的credentials.properties文件中,也可以在activemq.xml中配置
       connectionFactory.setUserName("system");
       
connectionFactory.setPassword("manager");
       // 创建连接
       
Connection connection=
connectionFactory.createConnection();
       connection.start();
       
// 创建Session
       Session
session=
connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
       
// 创建目标,就创建主题也可以创建队列
       Destination
destination=
session.createQueue("test");
       
// 创建消息消费者
       MessageConsumer
consumer=
session.createConsumer(destination);
       
// 接收消息,参数:接收消息的超时时间,为0的话则不超时,receive返回下一个消息,但是超时了或者消费者被关闭,返回null
       Message
message=
consumer.receive(1000);
       
if (messageinstanceof
TextMessage){
           TextMessage
textMessage=
(TextMessage)message;
           String
text=
textMessage.getText();
           System.out.println("Received:
"+ text);
       
} else{
           System.out.println("Received:
"+ message);
       
}
       consumer.close();
       
session.close();
       connection.close();
   
} catch(Exception
e) {
       e.printStackTrace();
   
}
}

执行了上面的接收方法之后,在ActiveMQ的监视控制可以看到test队列的消息已经被消费了,如图:



这里的代码只是测试用,在正式开发中一般与Spring结合使用jmsTemplate来发送消息,现实JMS的MessageListener来监听消息。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息