activemq 基本使用案例
2016-05-03 15:59
274 查看
activemq有两种消息模式,一种是p2p,一种是pub。其中p2p是1对1,pub是1对多。这里就以这两种的发布和访问做demo。
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * p2p(point-to-point)的消息发送和接收<br> * 注意事项:<br> * 1.点对点,接收者为一人,发送者也为一人<br> * 2.先启动send或者先启动receive都ok * @author zy * */ public class SimpleMessageSendandReceiveApp { public static final String user = "system"; // conf/credentials.properties下的配置 public static final String password = "manager"; public static final String url = "tcp://localhost:61616";// conf/activemq.xml-transportConnectors节点的openwire子节点 public static final String queueName = "test_queue"; // 在localhost:8161/admin中创建的queue public static final String messageBody = "Hello JMS!";// 发送jms的内容,一般是textMessage或者ObjectMessage public static final boolean transacted = false; // 是否使用事务 public static final boolean persistent = false;//提交方式,true/PERSISTENT 持久保留消息,以保证消息不会因为jms provider的失败而丢失 false/NON_PERSISTENT 不要求保持持久 public static void main(String[] args) { sendMessage(); // receiveMessage(); } /** * 发送消息 */ public static void sendMessage(){ Connection connection = null; Session session = null; try { // create the connection ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url); connection = connectionFactory.createConnection(); connection.start(); // create the session session = connection.createSession(transacted,Session.AUTO_ACKNOWLEDGE);// Session.AUTO_ACKNOWLEDGE-->consumer.receive()会话确认接收 Destination destination = session.createQueue(queueName); // create the producer MessageProducer producer = session.createProducer(destination); if (persistent) { producer.setDeliveryMode(DeliveryMode.PERSISTENT); } else { producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); } // create text message Message message = session.createTextMessage(messageBody); // send the message producer.send(message); System.out.println("Send message: " + ((TextMessage) message).getText()); } catch (Exception e) { e.printStackTrace(); } finally { try { // close session and connection if (session != null) { session.close(); } if (connection != null) { connection.close(); } } catch (Exception e) { e.printStackTrace(); } } } /** * 接收消息 */ public static void receiveMessage(){ Connection connection = null; Session session = null; try { // create the connection ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url); connection = connectionFactory.createConnection(); connection.start(); // create the session session = connection.createSession(transacted,Session.AUTO_ACKNOWLEDGE);// Session.AUTO_ACKNOWLEDGE-->consumer.receive()会话确认接收 Destination destination = session.createQueue(queueName); // create the producer MessageProducer producer = session.createProducer(destination); if (persistent) { producer.setDeliveryMode(DeliveryMode.PERSISTENT); } else { producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); } // create the consumer MessageConsumer consumer = session.createConsumer(destination); // blocking till receive the message Message recvMessage = consumer.receive(); System.out.println("Receive message: " + ((TextMessage) recvMessage).getText()); } catch (Exception e) { e.printStackTrace(); } finally { try { // close session and connection if (session != null) { session.close(); } if (connection != null) { connection.close(); } } catch (Exception e) { e.printStackTrace(); } } } }
import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; /** * 发布模式(pub),一对多<br> * 注意事项:<br> * 1.一对多,接收者为多个<br> * 2.注册接收的时间 >= 消息发送时间,才可接收消息 * @author zy * */ public class TopicMessageSendAndReceiveApp { public static final String user = "system"; // conf/credentials.properties下的配置 public static final String password = "manager"; public static final String url = "tcp://localhost:61616";// conf/activemq.xml-transportConnectors节点的openwire子节点 public static final String queueName = "test_topic"; // 在localhost:8161/admin中创建的queue public static final String messageBody = "Hello JMS!";// 发送jms的内容,一般是textMessage或者ObjectMessage public static final boolean transacted = true; // 是否使用事务 public static final boolean persistent = false;//提交方式,true/PERSISTENT 持久保留消息,以保证消息不会因为jms provider的失败而丢失 false/NON_PERSISTENT 不要求保持持久 public static void main(String[] args) { receive(); receive2(); send(); } public static void send(){ try { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, password,url); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic(queueName); MessageProducer producer = session.createProducer(topic); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); for(int i =0;i<3;i++){ TextMessage message = session.createTextMessage(); message.setText("message_" + System.currentTimeMillis()); producer.send(message); System.out.println("Send message: " + message.getText() + " --->" + i); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } catch (Exception e) { e.printStackTrace(); } } public static void receive(){ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); try { Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic(queueName); MessageConsumer consumer = session.createConsumer(topic); consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("001 Received message: " + tm.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); } catch (JMSException e) { e.printStackTrace(); } } public static void receive2(){ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); try { Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic(queueName); MessageConsumer consumer = session.createConsumer(topic); consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("002 Received message: " + tm.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); } catch (JMSException e) { e.printStackTrace(); } } }
相关文章推荐
- iOS开发 剖析网易新闻标签栏视图切换(addChildViewController属性介绍)
- 记《learning hard C#学习笔记》 书中一个错误
- 234. Palindrome Linked List
- Android Studio 改变 SDK 的源码路径
- Sping 自己学习心得(IOC)
- JavaFX
- java基础-Map
- 使用新的 Android Studio 加速你的开发
- apache主要配置详解
- 讲透Session、Cookie和ServletContext
- Ajax的四步骤使用和Ajax的方法封装回调函数
- 英文
- JAVA生成一次性图片验证码
- java 对象的创建、使用和清除
- js源码封装ajax;
- android stutio创建项目以及运行时遇到的问题总结
- 完美长方体是不可能存在的 第四章
- centos7上安装LAMP
- cocos2d js v3.10 sprite点击事件的区域问题
- JQuery库:(一)简介