ActiveMQ的queue和topic两种模式的示例演示
2010-05-06 14:33
363 查看
以前从网上收集到的代码，很实用，可惜忘了地址了。在这里感谢一下原作者的辛勤劳动，改日发现链接的话一定补上。
郑重声明:代码源自网络!
queue模式:
ConsumerTest.java
ConsumerTool.java
ProducerTest.java
ProducerTool.java
topic模式:
ConsumerTest.java
ConsumerTool.java
ProducerTest.java
ProducerTool.java
郑重声明:代码源自网络!
queue模式:
ConsumerTest.java
package queue; import javax.jms.JMSException; public class ConsumerTest implements Runnable { static Thread t1 = null; /** * @param args * @throws InterruptedException * @throws InterruptedException * @throws JMSException * @throws InterruptedException */ public static void main(String[] args) throws InterruptedException { t1 = new Thread(new ConsumerTest()); t1.start(); while (true) { System.out.println(t1.isAlive()); if (!t1.isAlive()) { t1 = new Thread(new ConsumerTest()); t1.start(); System.out.println("重新启动"); } Thread.sleep(5000); } // 延时500毫秒之后停止接受消息 // Thread.sleep(500); // consumer.close(); } public void run() { try { ConsumerTool consumer = new ConsumerTool(); consumer.consumeMessage(); while (ConsumerTool.isconnection) { } } catch (Exception e) { } } }
ConsumerTool.java
package queue; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.MessageListener; import javax.jms.Message; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class ConsumerTool implements MessageListener, ExceptionListener { private String user = ActiveMQConnection.DEFAULT_USER; private String password = ActiveMQConnection.DEFAULT_PASSWORD; private String url = ActiveMQConnection.DEFAULT_BROKER_URL; private String subject = "myqueue"; private Destination destination = null; private Connection connection = null; private Session session = null; private MessageConsumer consumer = null; private ActiveMQConnectionFactory connectionFactory = null; public static Boolean isconnection = false; // 初始化 private void initialize() throws JMSException { connectionFactory = new ActiveMQConnectionFactory(user, password, url); connection = connectionFactory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue(subject); consumer = session.createConsumer(destination); } // 消费消息 public void consumeMessage() throws JMSException { initialize(); connection.start(); consumer.setMessageListener(this); connection.setExceptionListener(this); System.out.println("Consumer:->Begin listening..."); isconnection = true; // 开始监听 // Message message = consumer.receive(); } // 关闭连接 public void close() throws JMSException { System.out.println("Consumer:->Closing connection"); if (consumer != null) consumer.close(); if (session != null) session.close(); if (connection != null) connection.close(); } // 消息处理函数 public void onMessage(Message message) { try { if (message instanceof TextMessage) { TextMessage txtMsg = (TextMessage) message; String msg = txtMsg.getText(); 
System.out.println("Consumer:->Received: " + msg); } else { System.out.println("Consumer:->Received: " + message); } } catch (JMSException e) { e.printStackTrace(); } } public void onException(JMSException arg0) { isconnection = false; } }
ProducerTest.java
package queue;

import javax.jms.JMSException;

/**
 * Demo launcher that sends a single text message to the queue through
 * {@link ProducerTool}.
 */
public class ProducerTest {

    /**
     * Sends {@code "Hello, world!"} and then closes the producer.
     *
     * @param args unused
     * @throws JMSException if sending the message or closing the producer fails
     * @throws Exception if the producer fails for any other reason
     */
    public static void main(String[] args) throws JMSException, Exception {
        ProducerTool producer = new ProducerTool();
        try {
            producer.produceMessage("Hello, world!");
        } finally {
            // Release the connection even when sending fails.
            producer.close();
        }
    }
}
ProducerTool.java
package queue; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class ProducerTool { private String user = ActiveMQConnection.DEFAULT_USER; private String password = ActiveMQConnection.DEFAULT_PASSWORD; private String url = ActiveMQConnection.DEFAULT_BROKER_URL; private String subject = "myqueue"; private Destination destination = null; private Connection connection = null; private Session session = null; private MessageProducer producer = null; // 初始化 private void initialize() throws JMSException, Exception { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( user, password, url); connection = connectionFactory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue(subject); producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); } // 发送消息 public void produceMessage(String message) throws JMSException, Exception { initialize(); TextMessage msg = session.createTextMessage(message); connection.start(); System.out.println("Producer:->Sending message: " + message); producer.send(msg); System.out.println("Producer:->Message sent complete!"); } // 关闭连接 public void close() throws JMSException { System.out.println("Producer:->Closing connection"); if (producer != null) producer.close(); if (session != null) session.close(); if (connection != null) connection.close(); } }
topic模式:
ConsumerTest.java
package topic; import javax.jms.JMSException; public class ConsumerTest implements Runnable { static Thread t1 = null; /** * @param args * @throws InterruptedException * @throws InterruptedException * @throws JMSException * @throws InterruptedException */ public static void main(String[] args) throws InterruptedException { t1 = new Thread(new ConsumerTest()); t1.start(); while (true) { System.out.println(t1.isAlive()); if (!t1.isAlive()) { t1 = new Thread(new ConsumerTest()); t1.start(); System.out.println("重新启动"); } Thread.sleep(5000); } // 延时500毫秒之后停止接受消息 // Thread.sleep(500); // consumer.close(); } public void run() { try { ConsumerTool consumer = new ConsumerTool(); consumer.consumeMessage(); while (ConsumerTool.isconnection) { } } catch (Exception e) { } } }
ConsumerTool.java
package topic; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.MessageListener; import javax.jms.Message; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class ConsumerTool implements MessageListener, ExceptionListener { private String user = ActiveMQConnection.DEFAULT_USER; private String password = ActiveMQConnection.DEFAULT_PASSWORD; private String url = ActiveMQConnection.DEFAULT_BROKER_URL; private String subject = "mytopic"; private Destination destination = null; private Connection connection = null; private Session session = null; private MessageConsumer consumer = null; public static Boolean isconnection = false; // 初始化 private void initialize() throws JMSException, Exception { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( user, password, url); connection = connectionFactory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createTopic(subject); consumer = session.createConsumer(destination); } // 消费消息 public void consumeMessage() throws JMSException, Exception { initialize(); connection.start(); consumer.setMessageListener(this); connection.setExceptionListener(this); isconnection = true; System.out.println("Consumer:->Begin listening..."); // 开始监听 // Message message = consumer.receive(); } // 关闭连接 public void close() throws JMSException { System.out.println("Consumer:->Closing connection"); if (consumer != null) consumer.close(); if (session != null) session.close(); if (connection != null) connection.close(); } // 消息处理函数 public void onMessage(Message message) { try { if (message instanceof TextMessage) { TextMessage txtMsg = (TextMessage) message; String msg = txtMsg.getText(); System.out.println("Consumer:->Received: " + 
msg); } else { System.out.println("Consumer:->Received: " + message); } } catch (JMSException e) { e.printStackTrace(); } } public void onException(JMSException arg0) { isconnection = false; } }
ProducerTest.java
package topic;

import javax.jms.JMSException;

/**
 * Demo launcher that publishes a single text message to the topic through
 * {@link ProducerTool}.
 */
public class ProducerTest {

    /**
     * Publishes {@code "Hello, world!"} and then closes the producer.
     *
     * @param args unused
     * @throws JMSException if sending the message or closing the producer fails
     * @throws Exception if the producer fails for any other reason
     */
    public static void main(String[] args) throws JMSException, Exception {
        ProducerTool producer = new ProducerTool();
        try {
            producer.produceMessage("Hello, world!");
        } finally {
            // Release the connection even when sending fails.
            producer.close();
        }
    }
}
ProducerTool.java
package topic; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class ProducerTool { private String user = ActiveMQConnection.DEFAULT_USER; private String password = ActiveMQConnection.DEFAULT_PASSWORD; private String url = ActiveMQConnection.DEFAULT_BROKER_URL; private String subject = "mytopic"; private Destination destination = null; private Connection connection = null; private Session session = null; private MessageProducer producer = null; // 初始化 private void initialize() throws JMSException, Exception { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( user, password, url); connection = connectionFactory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createTopic(subject); producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); } // 发送消息 public void produceMessage(String message) throws JMSException, Exception { initialize(); TextMessage msg = session.createTextMessage(message); connection.start(); System.out.println("Producer:->Sending message: " + message); producer.send(msg); System.out.println("Producer:->Message sent complete!"); } // 关闭连接 public void close() throws JMSException { System.out.println("Producer:->Closing connection"); if (producer != null) producer.close(); if (session != null) session.close(); if (connection != null) connection.close(); } }
相关文章推荐
- 消息队列:快速上手ActiveMQ消息队列的JMS方式使用(两种模式:Topic和Queue的消息推送和订阅)
- JMS使用ActiveMQ实现Queue和Topic两种模式
- ActiveMQ的queue以及topic两种消息处理机制分析
- ActiveMQ的queue以及topic两种消息处理机制分析
- ActiveMQ学习总结——(二)Queue队列模式示例
- 博文搬家(iteye)--ActiveMQ的queue以及topic两种消息处理机制分析
- 消息队列MQ实践----实现Queue(队列消息)和Topic(主题消息)两种模式
- 消息队列MQ实践----实现Queue(队列消息)和Topic(主题消息)两种模式
- ActiveMQ的queue以及topic两种消息处理机制分析
- ActiveMQ的queue以及topic两种消息处理机制分析
- 利用Spring与ActiveMQ整合发送、接收消息实例(Queue与Topic模式)
- ActiveMQ学习总结——(三)Topic主题模式示例
- ActiveMQ基本配置与示例演示
- Jms两种message传输方式Topic和Queue的比较
- ActiveMq中Queue与Topic的简单分析
- ActiveMQ基本配置与示例演示
- 命令模式详解及示例代码演示
- 【Java消息中间件】Java消息中间件( 第4章 使用activemq - 队列模式、主题模式的消息演示 )
- ActiveMQ 使用Queue或者Topic发送/接受消息
- java 工厂模式的作用,为什么要用工厂模式以及示例演示