您的位置:首页 > 其它

activeMQ消息队列的使用总结

2018-01-20 01:04 1031 查看
工作原理

Created with Raphaël 2.1.2用户注册用户注册ActiveMQ消息队列平台ActiveMQ消息队列平台短信邮件服务平台短信邮件服务平台帮我发个短信/邮件吧!activeMQ:马上就发,别急。快帮我弄完这堆玩意儿好了,你要干的事我帮你干完了



作用:

1. 调用异步化,增加系统并发处理量,提高服务器性能

2. 利用分布式消息队列可以降低系统耦合性

应用场景分析:

电商促销、秒杀相关活动的大量高并发用户请求处理,有效削减高并发

如阿里旅行、去哪儿类似的订票系统,下单支付完成后,并不是立即返回出票成功,而是等铁路、航空公司处理完成才收到短信提示出票成功

短信验证码、邮件激活等实时性要求不高(验证码一般是60/120秒的等待时间,邮件激活一般是24小时)的功能都可以用MQ,大不了超时收不到,让用户点击重新发送

搜索平台、缓存平台

查询数据,建立缓存、索引 ,不从数据库查询,从缓存或者索引库查询

当增加、修改、删除数据时,发送消息给 MQ, 缓存平台、索引平台 从 MQ 获取

到这个信息,更新缓存或者索引

官网: http://activemq.apache.org/

activemq的使用:直接解压 \bin\win64 目录下 启动active.bat文件

访问:http://localhost:8161/ 用户名和密码都是 admin

先来activeMQ的demo

maven坐标导入

<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>${activemq.version}</version>


编写MQ生产者

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;

public class ActiveMQProducer {
@Test
public void testProduceMQ() throws Exception {
// 连接工厂
// 使用默认用户名、密码、路径
// 路径 tcp://host:61616
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
// 获取一个连接
Connection connection = connectionFactory.createConnection();
// 建立会话
Session session = connection.createSession(true,
Session.AUTO_ACKNOWLEDGE);
// 创建队列或者话题对象
Queue queue = session.createQueue("HelloWorld");
// 创建生产者 或者 消费者
MessageProducer producer = session.createProducer(queue);

// 发送消息
for (int i = 0; i < 10; i++) {
producer.send(session.createTextMessage("你好,activeMQ:" + i));
}
// 提交操作
session.commit();

}
}


编写MQ消费者代码

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;

public class ActiveMQConsumer {

@Test
// 直接消费
public void testCosumeMQ() throws Exception {
// 连接工厂
// 使用默认用户名、密码、路径
// 路径 tcp://host:61616
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
// 获取一个连接
Connection connection = connectionFactory.createConnection();
// 开启连接
connection.start();
// 建立会话
// 第一个参数,是否使用事务,如果设置true,操作消息队列后,必须使用 session.commit();
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
// 创建队列或者话题对象
Queue queue = session.createQueue("HelloWorld");
// 创建消费者
MessageConsumer messageConsumer = session.createConsumer(queue);

while (true) {
TextMessage message = (TextMessage) messageConsumer.receive(10000);
if (message != null) {
System.out.println(message.getText());
} else {
break;
}
}
}

@Test
// 使用监听器消费
public void testCosumeMQ2() throws Exception {
// 连接工厂
// 使用默认用户名、密码、路径
// 路径 tcp://host:61616
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
// 获取一个连接
Connection connection = connectionFactory.createConnection();
// 开启连接
connection.start();
// 建立会话
// 第一个参数,是否使用事务,如果设置true,操作消息队列后,必须使用 session.commit();
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
// 创建队列或者话题对象
Queue queue = session.createQueue("HelloWorld");
// 创建消费者
MessageConsumer messageConsumer = session.createConsumer(queue);

messageConsumer.setMessageListener(new MessageListener() {
// 每次接收消息,自动调用 onMessage
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println(textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});

while (true) {
// 不能让junit线程死掉
}
}
}


接下来结合spring完成ActiveMQ

导入avtiveMQ_spring相关jar包,maven坐标如下:

ActiveMQ

<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>${activemq.version}</version>


Spring开发测试

<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>

<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${spring.version}</version>

<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>


Spring 整合 activeMQ

<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring.version}</version>


最后配置spring mq 的connectionFactory

首先编写配置生产者 producer

<!-- ActiveMQ 连接工厂 -->
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
<bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
<property name="userName" value="admin"/>
<property name="password" value="admin"/>
</bean>
<!-- Spring Caching连接工厂 -->
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="mqConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
<!-- <constructor-arg ref="amqConnectionFactory" /> -->
<!-- Session缓存数量 -->
<pro
c076
perty name="sessionCacheSize" value="100" />
</bean>


以上声明配置在生产者(producer)和消费者(consumer)中相同,接下来来看区别

生产者(producer)分为 Quene和 Topic ,如果是1生产对1消费,则使用Quene,1对多消费 则使用 ,具体配置如下:

Quene

<!-- Spring JmsTemplate 的消息生产者 start-->

<!-- 定义JmsTemplate的Queue类型 -->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 这个connectionFactory对应的是自己定义的Spring提供的那个ConnectionFactory对象 -->
<constructor-arg ref="mqConnectionFactory" />
<!-- 非pub/sub模型(发布/订阅),即队列模式 -->
<property name="pubSubDomain" value="false" />
</bean>


Topic

<!-- 定义JmsTemplate的Topic类型 -->
<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
<constructor-arg ref="mqConnectionFactory" />
<!-- pub/sub模型(发布/订阅) -->
<property name="pubSubDomain" value="true" />
</bean>


完成生产者的代码部分

Queue

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

@Service
public class QueueSender {
// 注入jmsTemplate
@Autowired
@Qualifier("jmsQueueTemplate")
private JmsTemplate jmsTemplate;

public void send(String queueName, final String message) {
jmsTemplate.send(queueName, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message);
}
});
}
}


Topic 编程与Quene 类似

@Service
public class TopicSender {
// 注入jmsTemplate
@Autowired
@Qualifier("jmsTopicTemplate")
private JmsTemplate jmsTemplate;

public void send(String topicName, final String message) {
jmsTemplate.send(topicName, new MessageCreator() {

public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message);
}
});
}
}


测试用例:

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import cn.itcast.activemq.producer.queue.QueueSender;
import cn.itcast.activemq.producer.topic.TopicSender;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:applicationContext-mq.xml")
public class ProducerTest {
@Autowired
private QueueSender queueSender;

@Autowired
private TopicSender topicSender;

@Test
public void testSendMessage() {
queueSender.send("spring_queue", "byebye,ActiveMQ");
topicSender.send("spring_topic", "再见,消息队列");
}
}


写完生产者,接下来该说消费者

关于工厂配置部分与生产者相同

<!-- 定义Queue监听器 -->
<jms:listener-container destination-type="queue" container-type="default"
connection-factory="connectionFactory" acknowledge="auto">
<!-- 默认注册bean名称,应该是类名首字母小写  -->
<jms:listener destination="spring_queue" ref="queueConsumer"/>
</jms:listener-container>

<!-- 定义Topic监听器 -->
<jms:listener-container destination-type="topic" container-type="default"
connection-factory="connectionFactory" acknowledge="auto">
<jms:listener destination="spring_topic" ref="topicConsumer"/>

</jms:listener-container>


消费者编码

Queue

@Service
public class QueueConsumer implements MessageListener {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out
.println("消费者QueueConsumer获得消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}


Topic

@Service
public class TopicConsumer implements MessageListener {

public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out
.println("消费者TopicConsumer获取消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}


以上为activeMQ的基本使用
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  activemq 并发