activemq使用JMS发送消息和接收消息
2015-11-11 18:08
411 查看
1.发送消息
2.接收消息
3.测试
package com.activemq;
import org.apache.activemq.ActiveMQConnection;
public class Test
{
public static void main(String[] args)
{
ConsumerTool consumer=new ConsumerTool();
ProducerTool producer=new ProducerTool();
System.out.println(ActiveMQConnection.DEFAULT_BROKER_URL+"------------");
try
{
// 开始监听
producer.produceMessage("Hello, world!");
producer.close();
consumer.consumeMessage();
// 延时500毫秒之后停止接受消息
Thread.sleep(5000);
consumer.close();
}
catch(Exception e)
{
System.out.println(e.getMessage());
e.printStackTrace();
}
}
}
package com.activemq; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * 发送消息 * @author suyunlong * */ public class ProducerTool { private String user=ActiveMQConnection.DEFAULT_USER; private String password=ActiveMQConnection.DEFAULT_PASSWORD; private String url=ActiveMQConnection.DEFAULT_BROKER_URL; private String subject="TOOL.DEFAULT"; private Destination destination=null; private Connection connection=null; private Session session=null; private MessageProducer producer=null; // 初始化 private void initialize() throws JMSException, Exception { ActiveMQConnectionFactory connectionFactory=new ActiveMQConnectionFactory( user, password, url); connection=connectionFactory.createConnection(); session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE); destination=session.createQueue(subject); producer=session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); } // 发送消息 public void produceMessage(String message) throws JMSException, Exception { initialize(); TextMessage msg=session.createTextMessage(message); connection.start(); System.out.println("Producer:->Sending message: "+message); producer.send(msg); System.out.println("Producer:->Message sent complete!"); } // 关闭连接 public void close() throws JMSException { System.out.println("Producer:->Closing connection"); if(producer != null) { producer.close(); } if(session != null) { session.close(); } if(connection != null) { connection.close(); } } }
2.接收消息
package com.activemq; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.MessageListener; import javax.jms.Message; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * 接受消息,我用的是基于消息监听的机制,需要实现MessageListener接口,这个接口有个onMessage方法, * 当接受到消息的时候会自动调用这个函数对消息进行处理。 * 如果想主动的去接受消息,而不用消息监听的话,把consumer.setMessageListener(this)改为 * Message message=consumer.receive(),手动去调用MessageConsumer的receive方法即可。 * @author suyunlong * */ public class ConsumerTool implements MessageListener { private String user=ActiveMQConnection.DEFAULT_USER; private String password=ActiveMQConnection.DEFAULT_PASSWORD; private String url=ActiveMQConnection.DEFAULT_BROKER_URL; private String subject="TOOL.DEFAULT"; private Destination destination=null; private Connection connection=null; private Session session=null; private MessageConsumer consumer=null; // 初始化 private void initialize() throws JMSException, Exception { //连接工厂是用户创建连接的对象,这里使用的是ActiveMQ的ActiveMQConnectionFactory根据url,username和password创建连接工厂。 ActiveMQConnectionFactory connectionFactory=new ActiveMQConnectionFactory( user, password, url); //连接工厂创建一个jms connection connection=connectionFactory.createConnection(); //是生产和消费的一个单线程上下文。会话用于创建消息的生产者,消费者和消息。会话提供了一个事务性的上下文。 session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE); //不支持事务 //目的地是客户用来指定他生产消息的目标还有他消费消息的来源的对象,两种消息传递方式:点对点和发布/订阅 destination=session.createQueue(subject); //会话创建消息的生产者将消息发送到目的地 consumer=session.createConsumer(destination); } // 消费消息 public void consumeMessage() throws JMSException, Exception { initialize(); connection.start(); System.out.println("Consumer:->Begin listening..."); // 开始监听 consumer.setMessageListener(this); // Message message = consumer.receive(); } // 关闭连接 public void close() throws JMSException { System.out.println("Consumer:->Closing connection"); if(consumer != null) { consumer.close(); } if(session != null) { session.close(); } if(connection != null) { connection.close(); } } // 消息处理函数 public void onMessage(Message message) { try { if(message instanceof TextMessage) { TextMessage txtMsg=(TextMessage)message; String msg=txtMsg.getText(); System.out.println("Consumer:->Received: " + msg); } else { System.out.println("Consumer:->Received: " + message); } } catch(JMSException e) { System.out.println(e.getMessage()); e.printStackTrace(); } } }
3.测试
package com.activemq;
import org.apache.activemq.ActiveMQConnection;
public class Test
{
public static void main(String[] args)
{
ConsumerTool consumer=new ConsumerTool();
ProducerTool producer=new ProducerTool();
System.out.println(ActiveMQConnection.DEFAULT_BROKER_URL+"------------");
try
{
// 开始监听
producer.produceMessage("Hello, world!");
producer.close();
consumer.consumeMessage();
// 延时500毫秒之后停止接受消息
Thread.sleep(5000);
consumer.close();
}
catch(Exception e)
{
System.out.println(e.getMessage());
e.printStackTrace();
}
}
}
相关文章推荐
- 解析ActiveMQ的使用说明总结
- ActiveMQ发消息、收消息、持久化,查询队列剩余消息数、出队数的实现
- activemq报EOFExceptionjvm错误
- ActiveMQ 消息服务(一)
- ActiveMQ 消息服务(二)
- ActiveMQ 消息服务(三)
- 基于zookeeper+leveldb搭建activemq集群
- ActiveMQ 实例
- Ubuntu 14.04.1 安装 activemq 5.11.1
- 在Spring中使用ActiveMQ发送邮件
- 多个地市连接MQ,如果较长时间没有消息发送,ActiveMQ的消费端会自动断开连接(topic端)
- 通过Java操作ActiveMQ的代码记录
- JMS和消息驱动Bean(MDB)
- JMS_使用ActiveMQ实现消息的发送和接收
- ActiveMQ 消息游标
- ActiveMQ 生产者流量控制
- ActiveMQ 全排序(Total Ordering)
- ActiveMQ 使用笔记
- ActiveMq 不能正确获取有过期时间的消息的原因
- Spring JMS笔记