activeMQ消息队列的使用总结
2018-01-20 01:04
1031 查看
工作原理:
Created with Raphaël 2.1.2用户注册用户注册ActiveMQ消息队列平台ActiveMQ消息队列平台短信邮件服务平台短信邮件服务平台帮我发个短信/邮件吧!activeMQ:马上就发,别急。快帮我弄完这堆玩意儿好了,你要干的事我帮你干完了
作用:
1. 调用异步化,增加系统并发处理量,提高服务器性能
2. 利用分布式消息队列可以降低系统耦合性
应用场景分析:
电商促销、秒杀相关活动的大量高并发用户请求处理,有效削减高并发
如阿里旅行、去哪儿类似的订票系统,下单支付完成后,并不是立即返回出票成功,而是等铁路、航空公司处理完成才收到短信提示出票成功
短信验证码、邮件激活等实时性要求不高(验证码一般是60/120秒的等待时间,邮件激活一般是24小时)的功能都可以用MQ,大不了超时收不到,让用户点击重新发送
搜索平台、缓存平台
查询数据,建立缓存、索引 ,不从数据库查询,从缓存或者索引库查询
当增加、修改、删除数据时,发送消息给 MQ, 缓存平台、索引平台 从 MQ 获取
到这个信息,更新缓存或者索引
官网: http://activemq.apache.org/
activemq的使用:直接解压 \bin\win64 目录下 启动active.bat文件
访问:http://localhost:8161/ 用户名和密码都是 admin
先来activeMQ的demo
maven坐标导入
编写MQ生产者
编写MQ消费者代码
接下来结合spring完成ActiveMQ
导入avtiveMQ_spring相关jar包,maven坐标如下:
ActiveMQ
Spring开发测试
Spring 整合 activeMQ
最后配置spring mq 的connectionFactory
首先编写配置生产者 producer
以上声明配置在生产者(producer)和消费者(consumer)中相同,接下来来看区别
生产者(producer)分为 Quene和 Topic ,如果是1生产对1消费,则使用Quene,1对多消费 则使用 ,具体配置如下:
Quene
Topic
完成生产者的代码部分
Queue
Topic 编程与Quene 类似
测试用例:
写完生产者,接下来该说消费者
关于工厂配置部分与生产者相同
消费者编码
Queue
Topic
以上为activeMQ的基本使用
Created with Raphaël 2.1.2用户注册用户注册ActiveMQ消息队列平台ActiveMQ消息队列平台短信邮件服务平台短信邮件服务平台帮我发个短信/邮件吧!activeMQ:马上就发,别急。快帮我弄完这堆玩意儿好了,你要干的事我帮你干完了
作用:
1. 调用异步化,增加系统并发处理量,提高服务器性能
2. 利用分布式消息队列可以降低系统耦合性
应用场景分析:
电商促销、秒杀相关活动的大量高并发用户请求处理,有效削减高并发
如阿里旅行、去哪儿类似的订票系统,下单支付完成后,并不是立即返回出票成功,而是等铁路、航空公司处理完成才收到短信提示出票成功
短信验证码、邮件激活等实时性要求不高(验证码一般是60/120秒的等待时间,邮件激活一般是24小时)的功能都可以用MQ,大不了超时收不到,让用户点击重新发送
搜索平台、缓存平台
查询数据,建立缓存、索引 ,不从数据库查询,从缓存或者索引库查询
当增加、修改、删除数据时,发送消息给 MQ, 缓存平台、索引平台 从 MQ 获取
到这个信息,更新缓存或者索引
官网: http://activemq.apache.org/
activemq的使用:直接解压 \bin\win64 目录下 启动active.bat文件
访问:http://localhost:8161/ 用户名和密码都是 admin
先来activeMQ的demo
maven坐标导入
<groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>${activemq.version}</version>
编写MQ生产者
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import org.junit.Test; public class ActiveMQProducer { @Test public void testProduceMQ() throws Exception { // 连接工厂 // 使用默认用户名、密码、路径 // 路径 tcp://host:61616 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); // 获取一个连接 Connection connection = connectionFactory.createConnection(); // 建立会话 Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // 创建队列或者话题对象 Queue queue = session.createQueue("HelloWorld"); // 创建生产者 或者 消费者 MessageProducer producer = session.createProducer(queue); // 发送消息 for (int i = 0; i < 10; i++) { producer.send(session.createTextMessage("你好,activeMQ:" + i)); } // 提交操作 session.commit(); } }
编写MQ消费者代码
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.junit.Test; public class ActiveMQConsumer { @Test // 直接消费 public void testCosumeMQ() throws Exception { // 连接工厂 // 使用默认用户名、密码、路径 // 路径 tcp://host:61616 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); // 获取一个连接 Connection connection = connectionFactory.createConnection(); // 开启连接 connection.start(); // 建立会话 // 第一个参数,是否使用事务,如果设置true,操作消息队列后,必须使用 session.commit(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建队列或者话题对象 Queue queue = session.createQueue("HelloWorld"); // 创建消费者 MessageConsumer messageConsumer = session.createConsumer(queue); while (true) { TextMessage message = (TextMessage) messageConsumer.receive(10000); if (message != null) { System.out.println(message.getText()); } else { break; } } } @Test // 使用监听器消费 public void testCosumeMQ2() throws Exception { // 连接工厂 // 使用默认用户名、密码、路径 // 路径 tcp://host:61616 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); // 获取一个连接 Connection connection = connectionFactory.createConnection(); // 开启连接 connection.start(); // 建立会话 // 第一个参数,是否使用事务,如果设置true,操作消息队列后,必须使用 session.commit(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建队列或者话题对象 Queue queue = session.createQueue("HelloWorld"); // 创建消费者 MessageConsumer messageConsumer = session.createConsumer(queue); messageConsumer.setMessageListener(new MessageListener() { // 每次接收消息,自动调用 onMessage public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println(textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); while (true) { // 不能让junit线程死掉 } } }
接下来结合spring完成ActiveMQ
导入avtiveMQ_spring相关jar包,maven坐标如下:
ActiveMQ
<groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>${activemq.version}</version>
Spring开发测试
<groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>${spring.version}</version> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>${spring.version}</version> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>${junit.version}</version> <scope>test</scope>
Spring 整合 activeMQ
<groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>${spring.version}</version>
最后配置spring mq 的connectionFactory
首先编写配置生产者 producer
<!-- ActiveMQ 连接工厂 --> <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供--> <bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616"/> <property name="userName" value="admin"/> <property name="password" value="admin"/> </bean> <!-- Spring Caching连接工厂 --> <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> <bean id="mqConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="amqConnectionFactory"></property> <!-- <constructor-arg ref="amqConnectionFactory" /> --> <!-- Session缓存数量 --> <pro c076 perty name="sessionCacheSize" value="100" /> </bean>
以上声明配置在生产者(producer)和消费者(consumer)中相同,接下来来看区别
生产者(producer)分为 Quene和 Topic ,如果是1生产对1消费,则使用Quene,1对多消费 则使用 ,具体配置如下:
Quene
<!-- Spring JmsTemplate 的消息生产者 start--> <!-- 定义JmsTemplate的Queue类型 --> <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 这个connectionFactory对应的是自己定义的Spring提供的那个ConnectionFactory对象 --> <constructor-arg ref="mqConnectionFactory" /> <!-- 非pub/sub模型(发布/订阅),即队列模式 --> <property name="pubSubDomain" value="false" /> </bean>
Topic
<!-- 定义JmsTemplate的Topic类型 --> <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 --> <constructor-arg ref="mqConnectionFactory" /> <!-- pub/sub模型(发布/订阅) --> <property name="pubSubDomain" value="true" /> </bean>
完成生产者的代码部分
Queue
import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Service; @Service public class QueueSender { // 注入jmsTemplate @Autowired @Qualifier("jmsQueueTemplate") private JmsTemplate jmsTemplate; public void send(String queueName, final String message) { jmsTemplate.send(queueName, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message); } }); } }
Topic 编程与Quene 类似
@Service public class TopicSender { // 注入jmsTemplate @Autowired @Qualifier("jmsTopicTemplate") private JmsTemplate jmsTemplate; public void send(String topicName, final String message) { jmsTemplate.send(topicName, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message); } }); } }
测试用例:
import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import cn.itcast.activemq.producer.queue.QueueSender; import cn.itcast.activemq.producer.topic.TopicSender; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:applicationContext-mq.xml") public class ProducerTest { @Autowired private QueueSender queueSender; @Autowired private TopicSender topicSender; @Test public void testSendMessage() { queueSender.send("spring_queue", "byebye,ActiveMQ"); topicSender.send("spring_topic", "再见,消息队列"); } }
写完生产者,接下来该说消费者
关于工厂配置部分与生产者相同
<!-- 定义Queue监听器 --> <jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto"> <!-- 默认注册bean名称,应该是类名首字母小写 --> <jms:listener destination="spring_queue" ref="queueConsumer"/> </jms:listener-container> <!-- 定义Topic监听器 --> <jms:listener-container destination-type="topic" container-type="default" connection-factory="connectionFactory" acknowledge="auto"> <jms:listener destination="spring_topic" ref="topicConsumer"/> </jms:listener-container>
消费者编码
Queue
@Service public class QueueConsumer implements MessageListener { public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out .println("消费者QueueConsumer获得消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }
Topic
@Service public class TopicConsumer implements MessageListener { public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out .println("消费者TopicConsumer获取消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }
以上为activeMQ的基本使用
相关文章推荐
- msmq消息队列使用及测试总结
- ActiveMQ(二):使用队列Queue方式发送消息
- 使用消息队列的理由总结
- ActiveMQ学习总结(8)——消息队列设计精要
- ActiveMQ学习总结(8)——消息队列设计精要
- 开源消息队列ActiveMQ使用 .net window
- ActiveMQ学习总结(4)——业界消息队列简介
- PHP中使用ActiveMQ实现消息队列
- 进程间通信使用消息队列的试验代码与总结
- Windows平台下的ActiveMQ消息队列的简单使用
- 【ActiveMq】ActiveMQ消息队列的使用及应用
- activemq 支持mysql持久化 消息队列使用
- ActiveMQ消息队列的单机使用及应用(一)
- Redis使用总结(3):实现简单的消息队列
- ActiveMQ消息队列的使用及应用
- 关于消息队列的使用----ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ
- ActiveMQ学习总结(4)——业界消息队列简介
- 关于消息队列的使用----ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ
- ActiveMQ学习总结(4)——业界消息队列简介
- ActiveMQ学习总结(8)——消息队列设计精要