您的位置:首页 > 其它

ActiveMQ使用JMS发送消息和接收消息

2015-11-11 18:08 411 查看
1.发送消息

package com.activemq;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Sends text messages to an ActiveMQ queue through the JMS API.
 *
 * <p>The JMS connection, session and producer are created lazily by the first call to
 * {@link #produceMessage(String)} and are reused by later calls until {@link #close()}
 * is invoked. After {@code close()} the next {@code produceMessage} call sets them up again.
 *
 * @author suyunlong
 */
public class ProducerTool
{
    private final String user = ActiveMQConnection.DEFAULT_USER;
    private final String password = ActiveMQConnection.DEFAULT_PASSWORD;
    private final String url = ActiveMQConnection.DEFAULT_BROKER_URL;
    private final String subject = "TOOL.DEFAULT";
    private Destination destination = null;
    private Connection connection = null;
    private Session session = null;
    private MessageProducer producer = null;

    /**
     * Creates the connection, a non-transacted auto-acknowledge session, the queue
     * destination and a non-persistent producer.
     *
     * <p>If any step fails, whatever was already created is released before the
     * failure is propagated, so a failed setup does not leave open resources behind.
     *
     * @throws JMSException if the JMS provider fails to create one of the objects
     * @throws Exception    kept in the signature for backward compatibility
     */
    private void initialize() throws JMSException, Exception
    {
        boolean ready = false;
        try
        {
            ActiveMQConnectionFactory connectionFactory =
                    new ActiveMQConnectionFactory(user, password, url);
            connection = connectionFactory.createConnection();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue(subject);
            producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            ready = true;
        }
        finally
        {
            if (!ready)
            {
                try
                {
                    releaseResources();
                }
                catch (JMSException ignored)
                {
                    // The setup failure already being propagated is the one the caller needs.
                }
            }
        }
    }

    /**
     * Sends the given text as a JMS {@code TextMessage} to the queue.
     *
     * @param message the text to send
     * @throws JMSException if setup or sending fails
     * @throws Exception    kept in the signature for backward compatibility
     */
    public void produceMessage(String message) throws JMSException, Exception
    {
        // Set up only once; re-initializing on every call would overwrite the fields
        // and leak the previously opened connection.
        if (producer == null)
        {
            initialize();
        }
        TextMessage msg = session.createTextMessage(message);
        connection.start();
        System.out.println("Producer:->Sending message: "+message);
        producer.send(msg);
        System.out.println("Producer:->Message sent complete!");
    }

    /**
     * Closes the producer, session and connection. Calling it when nothing is open
     * only prints the closing notice.
     *
     * @throws JMSException if closing one of the resources fails
     */
    public void close() throws JMSException
    {
        System.out.println("Producer:->Closing connection");
        releaseResources();
    }

    /**
     * Closes producer, session and connection in that order and clears the fields.
     * Each close is attempted even if an earlier one throws.
     */
    private void releaseResources() throws JMSException
    {
        try
        {
            if (producer != null)
            {
                producer.close();
            }
        }
        finally
        {
            try
            {
                if (session != null)
                {
                    session.close();
                }
            }
            finally
            {
                try
                {
                    if (connection != null)
                    {
                        connection.close();
                    }
                }
                finally
                {
                    producer = null;
                    session = null;
                    connection = null;
                    destination = null;
                }
            }
        }
    }
}

2.接收消息

package com.activemq;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.MessageListener;
import javax.jms.Message;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
* 接受消息,我用的是基于消息监听的机制,需要实现MessageListener接口,这个接口有个onMessage方法,
* 当接受到消息的时候会自动调用这个函数对消息进行处理。
* 如果想主动的去接受消息,而不用消息监听的话,把consumer.setMessageListener(this)改为
* Message message=consumer.receive(),手动去调用MessageConsumer的receive方法即可。
* @author suyunlong
*
*/
public class ConsumerTool implements MessageListener
{
private String user=ActiveMQConnection.DEFAULT_USER;
private String password=ActiveMQConnection.DEFAULT_PASSWORD;
private String url=ActiveMQConnection.DEFAULT_BROKER_URL;
private String subject="TOOL.DEFAULT";
private Destination destination=null;
private Connection connection=null;
private Session session=null;
private MessageConsumer consumer=null;
// 初始化
private void initialize() throws JMSException, Exception
{
//连接工厂是用户创建连接的对象,这里使用的是ActiveMQ的ActiveMQConnectionFactory根据url,username和password创建连接工厂。
ActiveMQConnectionFactory connectionFactory=new ActiveMQConnectionFactory(
user, password, url);
//连接工厂创建一个jms connection
connection=connectionFactory.createConnection();
//是生产和消费的一个单线程上下文。会话用于创建消息的生产者,消费者和消息。会话提供了一个事务性的上下文。
session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);  //不支持事务
//目的地是客户用来指定他生产消息的目标还有他消费消息的来源的对象,两种消息传递方式:点对点和发布/订阅
destination=session.createQueue(subject);
//会话创建消息的生产者将消息发送到目的地
consumer=session.createConsumer(destination);
}
// 消费消息
public void consumeMessage() throws JMSException, Exception
{
initialize();
connection.start();
System.out.println("Consumer:->Begin listening...");
// 开始监听
consumer.setMessageListener(this);
// Message message = consumer.receive();
}
// 关闭连接
public void close() throws JMSException
{
System.out.println("Consumer:->Closing connection");
if(consumer != null)
{
consumer.close();
}
if(session != null)
{
session.close();
}
if(connection != null)
{
connection.close();
}
}
// 消息处理函数
public void onMessage(Message message)
{
try
{
if(message instanceof TextMessage)
{
TextMessage txtMsg=(TextMessage)message;
String msg=txtMsg.getText();
System.out.println("Consumer:->Received: " + msg);
}
else
{
System.out.println("Consumer:->Received: " + message);
}
}
catch(JMSException e)
{
System.out.println(e.getMessage());
e.printStackTrace();
}
}
}

3.测试

package com.activemq;

import org.apache.activemq.ActiveMQConnection;

/**
 * Demo driver: sends one text message with {@code ProducerTool}, then listens with
 * {@code ConsumerTool} for five seconds.
 */
public class Test
{
    /**
     * Prints the default broker URL, produces one message, then consumes for five
     * seconds. Any failure is printed rather than rethrown.
     *
     * @param args unused
     */
    public static void main(String[] args)
    {
        ConsumerTool consumer = new ConsumerTool();
        ProducerTool producer = new ProducerTool();
        System.out.println(ActiveMQConnection.DEFAULT_BROKER_URL+"------------");
        try
        {
            // Send the message; close the producer even if sending fails.
            try
            {
                producer.produceMessage("Hello, world!");
            }
            finally
            {
                producer.close();
            }

            // Start listening; close the consumer even if listening or the wait fails.
            try
            {
                consumer.consumeMessage();
                // Keep receiving for 5000 milliseconds before shutting down.
                Thread.sleep(5000);
            }
            finally
            {
                consumer.close();
            }
        }
        catch (InterruptedException e)
        {
            // Restore the interrupt flag so the interruption is not lost.
            Thread.currentThread().interrupt();
            System.out.println(e.getMessage());
            e.printStackTrace();
        }
        catch (Exception e)
        {
            System.out.println(e.getMessage());
            e.printStackTrace();
        }
    }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  activemq