您的位置:首页 > 编程语言 > Java开发

SpringBoot整合ActiveMq

2018-03-10 12:51 651 查看
1、先下载activemq安装
从ActiveMq官方上下载ActiveMq服务
下载地址:http://activemq.apache.org/download.html

我当前下载的是版本是5.15.3 官方备注:当前最新的稳定版本。
下载下来解压后进到window相对应的版本的bin目录下执行activemq.bat
2、创建SpringBoot项目
pom.xml
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.8.RELEASE</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.40</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<optional>true</optional>
<!-- optional=true,依赖不会传递,该项目依赖devtools;之后依赖myboot项目的项目如果想要使用devtools,需要重新引入 -->
</dependency>

</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>生产者:@Service("userProducer")
public class UserProducer {

@Resource
private JmsMessagingTemplate jmsMessagingTemplate;

@Resource
private Queue userQueue;
@Resource
private Queue queue;
@Resource
private Topic topic;

public void sendData(Serializable obj) {
this.jmsMessagingTemplate.convertAndSend(userQueue,obj);
}
public void sendMessage(String message) {
this.jmsMessagingTemplate.convertAndSend(queue,message);
}

public void sendTopicMessage(String message) {
this.jmsMessagingTemplate.convertAndSend(topic,message);
}
}activemq配置@Configuration
public class ActiveMqConfig {

@Bean
public Queue userQueue() {
return new ActiveMQQueue("userMqQueue");
}

@Bean
public ActiveMQQueue queue() {
return new ActiveMQQueue("weisg.queue");
}

@Bean
public Topic topic() {
return new ActiveMQTopic("weisg.topic");
}

// topic模式的ListenerContainer
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setPubSubDomain(true);
bean.setConnectionFactory(activeMQConnectionFactory);
return bean;
}

// queue模式的ListenerContainer
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory activeMQConnectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setConnectionFactory(activeMQConnectionFactory);
return bean;
}
}以上的配置为支持同时发送和接收queue/topic

消费者@Service
public class UserConsumer {
@JmsListener(destination = "userMqQueue",containerFactory = "jmsListenerContainerQueue")
public void receive(ObjectMessage message) throws JMSException {
System.out.println("----------UserConsumer-----------"+message);
System.out.println("----------message-----------"+message.getObject());
User user = new User();

BeanUtils.copyProperties(message.getObject(), user);

System.out.println("user--------------"+user);
System.out.println("user数据数据完成...");
}

@JmsListener(destination = "weisg.topic",containerFactory = "jmsListenerContainerTopic")
public void receiveTopicMessage(String message) throws JMSException {
System.out.println("receiveTopicMessage--------------"+message);
}

@JmsListener(destination = "weisg.topic",containerFactory = "jmsListenerContainerTopic")
public void receiveTopicMessage2(String message) throws JMSException {
System.out.println("receiveTopicMessage2--------------"+message);
}
}请求方法@RequestMapping(value="/api/addUser",method = RequestMethod.POST)
public Map<String, Object> beetl(@RequestParam Map<String, Object> params){
Map<String, Object> retMap = new HashMap<String, Object>();
User user = new User();
user.setUserId((String)params.get("userId"));
user.setUserName((String)params.get("userName"));
user.setMobile((String)params.get("mobile"));
userProducer.sendData((Serializable)user);
userProducer.sendMessage(user.getUserId());
for (int i = 0; i < 10; i++) {
userProducer.sendTopicMessage((String)params.get("userId")+i);
}
retMap.put("code", "200");
retMap.put("msg", "success!");

return retMap;
}配置文件:spring.activemq.broker-url=tcp://127.0.0.1:61616
#如果此处设置为true,需要加activemq-pool的依赖包,否则会自动配置失败,报JmsMessagingTemplate注入失败
spring.activemq.pool.enabled=true
spring.activemq.user=admin
# 密码
spring.activemq.password=admin

# 在考虑结束之前等待的时间
spring.activemq.close-timeout=150
# 默认代理URL是否应该在内存中。如果指定了显式代理,则忽略此值。
spring.activemq.in-memory=false
# 是否在回滚回滚消息之前停止消息传递。这意味着当启用此命令时,消息顺序不会被保留。
spring.activemq.non-blocking-redelivery=false

# 等待消息发送响应的时间。设置为0等待永远。
spring.activemq.send-timeout=0

#发布模式,为true时是topic模式,为false是queue模式
spring.jms.pub-sub-domain=true

# 是否信任所有包
#注意:对象传输需要开启包白名单,否则会报错
spring.activemq.packages.trust-all=true
# 要信任的特定包的逗号分隔列表(当不信任所有包时)

#spring.activemq.packages.trusted=
# 当连接请求和池满时是否阻塞。设置false会抛“JMSException异常”。
#spring.activemq.pool.block-if-full=true
# 如果池仍然满,则在抛出异常前阻塞时间。
#spring.activemq.pool.block-if-full-timeout=-1ms
# 是否在启动时创建连接。可以在启动时用于加热池。
#spring.activemq.pool.create-connection-on-startup=true
# 是否用Pooledconnectionfactory代替普通的ConnectionFactory。
#spring.activemq.pool.enabled=false
# 连接过期超时。
#spring.activemq.pool.expiry-timeout=0ms
#空闲的连接过期时间,默认为30秒
spring.activemq.pool.idle-timeout=30
# 连接池最大连接数
spring.activemq.pool.max-connections=10
# 每个连接的有效会话的最大数目。
#spring.activemq.pool.maximum-active-session-per-connection=500
# 当有"JMSException"时尝试重新连接
#spring.activemq.pool.reconnect-on-exception=true
# 在空闲连接清除线程之间运行的时间。当为负数时,没有空闲连接驱逐线程运行。
#spring.activemq.pool.time-between-expiration-check=-1ms
# 是否只使用一个MessageProducer
4000

#spring.activemq.pool.use-anonymous-producers=true
结果:如下2018-03-10 11:51:25.582 [http-nio-8889-exec-4] INFO org.springframework.web.servlet.DispatcherServlet - FrameworkServlet 'dispatcherServlet': initialization completed in 20 ms
----------UserConsumer-----------ActiveMQObjectMessage {commandId = 5, responseRequired = true, messageId = ID:admin-PC-51016-1520653879572-1:4:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:admin-PC-51016-1520653879572-1:4:1:1, destination = queue://userMqQueue, transactionId = null, expiration = 0, timestamp = 1520653885674, arrival = 0, brokerInTime = 1520653885675, brokerOutTime = 1520653885677, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@6df8e57e, marshalledProperties = org.apache.activemq.util.ByteSequence@4306219a, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {timestamp=1520653885637}, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false}
----------message-----------User [userId=zhangsan001, userName=张三, userNo=null, mobile=15819295938]
user--------------User [userId=zhangsan001, userName=张三, userNo=null, mobile=15819295938]
user数据数据完成...
receiveTopicMessage--------------zhangsan0010
receiveTopicMessage2--------------zhangsan0010
receiveTopicMessage2--------------zhangsan0011
receiveTopicMessage--------------zhangsan0011
receiveTopicMessage--------------zhangsan0012
receiveTopicMessage2--------------zhangsan0012
receiveTopicMessage--------------zhangsan0013
receiveTopicMessage2--------------zhangsan0013
receiveTopicMessage--------------zhangsan0014
receiveTopicMessage--------------zhangsan0015
receiveTopicMessage2--------------zhangsan0014
receiveTopicMessage2--------------zhangsan0015
receiveTopicMessage--------------zhangsan0016
receiveTopicMessage--------------zhangsan0017
receiveTopicMessage2--------------zhangsan0016
receiveTopicMessage2--------------zhangsan0017
receiveTopicMessage--------------zhangsan0018
receiveTopicMessage2--------------zhangsan0018
receiveTopicMessage--------------zhangsan0019
receiveTopicMessage2--------------zhangsan0019

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: