activemq service code
Linux 安装:较高版本的 ActiveMQ(5.15 及以上)需要 JDK 1.8 或更高版本
maven :<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-spring</artifactId>
<version>5.14.5</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>3.2.8.RELEASE</version>
</dependency>
package com.activemq.listener;
import java.util.Arrays;
import javax.jms.Message;
import javax.jms.MessageListener;
import org.apache.activemq.command.ActiveMQObjectMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import com.ecarry.core.exception.BusinessException;
import com.activemq.ActivemqMessage;
import com.netty.server.NettyMessagePush;
/**
 * JMS message listener that plays the consumer role.
 *
 * <p>Each received message is expected to be an {@link ActiveMQObjectMessage} whose payload is an
 * {@link ActivemqMessage}. The payload's message text is pushed to the payload's client id
 * through {@link NettyMessagePush}.
 *
 * @date 2019-06-04 11:31:20
 */
public class ActiveMqMessageListener implements MessageListener {
    private static final Logger logger = LoggerFactory.getLogger(ActiveMqMessageListener.class);

    @Autowired
    NettyMessagePush nettyMessagePush;

    /**
     * Unwraps the incoming object message and pushes its text to the addressed client.
     *
     * @param message the JMS message delivered by the listener container
     * @throws BusinessException if the message is not an {@link ActiveMQObjectMessage}, or if
     *     reading the payload or pushing it fails
     */
    @Override
    public void onMessage(Message message) {
        // Reject unexpected message types explicitly instead of letting a raw
        // ClassCastException escape before any logging has happened.
        if (!(message instanceof ActiveMQObjectMessage)) {
            logger.error("AactiveMqMessageListener received unsupported message, message={}", message);
            throw new BusinessException("AactiveMqMessageListener failed");
        }
        ActiveMQObjectMessage msg = (ActiveMQObjectMessage) message;
        try {
            logger.info("------------------msg={}-------------------", msg);
            ActivemqMessage activemqMessage = (ActivemqMessage) msg.getObject();
            logger.info("------------------activeMqMessageListener activemqMessage==={}-----------------", activemqMessage);
            String clientId = activemqMessage.getClientId();
            String jsonMessage = activemqMessage.getMessage();
            nettyMessagePush.push(Arrays.asList(clientId), jsonMessage);
        } catch (Exception e) {
            // Pass the throwable as the trailing argument with no placeholder so SLF4J
            // logs the stack trace instead of printing a literal "{}".
            logger.error("AactiveMqMessageListener failed", e);
            throw new BusinessException("AactiveMqMessageListener failed");
        }
    }
}
package com.activemq.server;
import java.util.List;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import com.activemq.ActivemqMessage;
/**
 * Message sender that plays the producer role.
 *
 * <p>Wraps a {@link JmsTemplate} and sends one {@link ActivemqMessage} per target client.
 *
 * @author wangkaifei
 * @date 2019-06-04 11:39:34
 */
public class ActiveMqMessageServer {
    private static final Logger logger = LoggerFactory.getLogger(ActiveMqMessageServer.class);

    private JmsTemplate template;

    /**
     * Sends the given JSON message to the destination once for every client id.
     *
     * <p>Failures are logged and not rethrown; a failure stops the remaining sends of this call.
     *
     * @param destinationName name of the JMS destination to send to
     * @param jsonMessage message body placed into each {@link ActivemqMessage}
     * @param clientIdList ids of the clients to address; when {@code null} nothing is sent
     */
    public void send(String destinationName, String jsonMessage, List<String> clientIdList) {
        // Handle a missing list explicitly rather than relying on a caught NullPointerException.
        if (clientIdList == null) {
            logger.warn("clientIdList is null, nothing sent to destination={}", destinationName);
            return;
        }
        try {
            for (String clientId : clientIdList) {
                template.send(destinationName, new MessageCreator() {
                    /**
                     * Callback invoked by the template to build the message on its session.
                     *
                     * @throws JMSException if the session cannot create the object message
                     */
                    @Override
                    public Message createMessage(Session session) throws JMSException {
                        ActivemqMessage activemqMessage = new ActivemqMessage();
                        activemqMessage.setClientId(clientId);
                        activemqMessage.setMessage(jsonMessage);
                        return session.createObjectMessage(activemqMessage);
                    }
                });
            }
        } catch (Exception e) {
            // The throwable goes last with no placeholder of its own so SLF4J logs the
            // stack trace instead of printing a literal "{}".
            logger.error("failed to send message to destination={}", destinationName, e);
        }
    }

    /**
     * @return the JMS template used for sending
     */
    public JmsTemplate getTemplate() {
        return template;
    }

    /**
     * @param template the JMS template to use for sending
     */
    public void setTemplate(JmsTemplate template) {
        this.template = template;
    }
}
package com.activemq;
import java.io.Serializable;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
/**
 * Serializable message payload holding a message body and a client id.
 *
 * <p>Getters, setters and {@code toString} are generated by the Lombok annotations below.
 *
 * @date 2019-06-04 12:10:37
 */
@Setter
@Getter
@ToString
public class ActivemqMessage implements Serializable {
private static final long serialVersionUID = 1L;
// Message body
private String message;
// Client id
private String clientId;
}
application-activemq.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.1.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-4.1.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.14.5.xsd">
<!-- NOTE(review): the schema locations above reference the Spring 4.1 XSDs, while the Maven
     snippet in this note pins spring-jms 3.2.8.RELEASE; confirm the versions match. -->
<!-- ActiveMQ connection factory -->
<!-- The ConnectionFactory that actually creates connections; supplied by the JMS vendor. -->
<!-- Requires the broker URL (tcp://ip:61616) plus a user name and password. -->
<!-- NOTE(review): trustAllPackages="true" allows ObjectMessage payloads from any package to be
     deserialized; consider restricting it to the packages actually exchanged.
     NOTE(review): the broker address and the admin/admin credentials are hard-coded here;
     consider externalizing them. -->
<amq:connectionFactory id="amqConnectionFactory"
brokerURL="tcp://192.168.43.101:61616" userName="admin"
password="admin" trustAllPackages="true"/>
<!-- Spring caching connection factory -->
<!-- The Spring ConnectionFactory that manages the real ConnectionFactory. -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<!-- targetConnectionFactory is the real factory that creates JMS connections. -->
<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
<!-- Number of sessions to cache. -->
<property name="sessionCacheSize" value="100" />
</bean>
<!-- Message producer: start -->
<!-- JmsTemplate bean, provided by the Spring JMS module and used to access ActiveMQ. -->
<bean id="jmsQueueTemplate"
class="org.springframework.jms.core.JmsTemplate">
<!-- This connectionFactory is the Spring-provided ConnectionFactory bean defined above. -->
<constructor-arg ref="connectionFactory" />
<!-- Non pub/sub (queue) mode; this is the default, so the setting may be omitted. -->
<!-- <property name="pubSubDomain" value="false" /> -->
</bean>
<!-- Producer bean. -->
<bean id="mqProducer" class="com.activemq.server.ActiveMqMessageServer">
<!-- Inject the template property. -->
<property name="template" ref="jmsQueueTemplate"></property>
</bean>
<!-- Message producer: end -->
<!-- Message consumer: start -->
<!-- Listener container, defined by the spring-jms namespace; several listeners can be
     registered at once. Attributes:
     destination-type: kind of destination. Values: queue | topic | durableTopic.
       queue is the default and denotes a message queue; topic denotes a publish/subscribe
       topic; durableTopic denotes a topic with a durable subscription.
     container-type: kind of container. Values: default | simple.
       default is the default and maps to DefaultMessageListenerContainer;
       simple maps to SimpleMessageListenerContainer.
     connection-factory: the connection factory to use; here the Spring JMS factory bean.
     acknowledge: acknowledgement mode. Values: auto | client | dups-ok | transacted.
       auto is the default (messages are acknowledged automatically); client means the
       client acknowledges messages; dups-ok is a lazy acknowledgement that tolerates
       duplicate deliveries; transacted uses a transacted session. -->
<jms:listener-container destination-type="queue"
container-type="default" connection-factory="connectionFactory"
acknowledge="auto">
<!-- Register a message listener; repeat this tag to register more than one. -->
<jms:listener destination="spring-MQ" ref="mqReciver" />
</jms:listener-container>
<!-- Container-managed instance of the message listener implementation. -->
<bean id="mqReciver" class="com.activemq.listener.ActiveMqMessageListener" />
<!-- Message consumer: end -->
</beans>
- service activemq does not support chkconfig
- 访问activemqProblem accessing /. Reason: Service Unavailable Powered by Jett
- activemq serviceMix集成学习
- activemq exclusive-consumer
- mysql.service: Main process exited, code=exited, status=1/FAILURE
- 基于Spring+JMS+ActiveMQ+Tomcat
- 启动Docker 出现Job for docker.service failed because the control process exited with error code. See "错误
- activeMQ开发笔记,activeMQy与Spring框架集成
- jms activeMQ与spring的集成(转载)
- MQ选型对比RabbitMQ RocketMQ ActiveMQ Kafka
- 消息中间件(2)-ActiveMq & Spring 技术集成
- how to configure and use activemq in camel
- ActiveMQ + 中间件 项目研究
- activemq in action(1)
- Active MQ CRITICAL - QUEUE.XXX holding: 89994
- ActiveMq+spring jms+MQ主备
- ActiveMQ的activemq.xml配置(内存设置、策略配置、流控、协议、认证授权)
- MQ java Code
- Custom tool error: Failed to generate code for the service reference ××××××. Please check other erro
- ActiveMQ demos-PTP domain