
Distributed Architecture Study Notes 015: Installing and Using ActiveMQ (Single Node)

2017-05-11 16:48
Installing and Using ActiveMQ (Single Node)
IP: 192.168.4.101

Environment: CentOS 6.6, JDK 7

1. Install the JDK and configure the environment variables (details omitted)

JAVA_HOME=/usr/local/java/jdk1.7.0_72

2. Download ActiveMQ for Linux (the latest release at the time of writing was apache-activemq-5.11.1-bin.tar.gz)

$ wget http://apache.fayea.com/activemq/5.11.1/apache-activemq-5.11.1-bin.tar.gz
3. Extract and install

$ tar -zxvf apache-activemq-5.11.1-bin.tar.gz

$ mv apache-activemq-5.11.1 activemq-01

If the activemq startup script is not executable, grant it execute permission (optional step):

$ cd /home/wusc/activemq-01/bin/

$ chmod 755 ./activemq

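4. Open the required ports in the firewall

ActiveMQ uses two ports:

one for message traffic (61616 by default), and

one for the web management console (8161 by default), which can be changed in conf/jetty.xml as follows: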

<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
    <!-- the default port number for the web console -->
    <property name="host" value="0.0.0.0"/>
    <property name="port" value="8161"/>
</bean>

# vi /etc/sysconfig/iptables

Add:

-A INPUT -m state --state NEW -m tcp -p tcp --dport 61616 -j ACCEPT

-A INPUT -m state --state NEW -m tcp -p tcp --dport 8161 -j ACCEPT

Restart the firewall:

# service iptables restart
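
After the firewall has been restarted, it can be useful to confirm from a client machine that both ports are reachable. The following is a minimal sketch that uses only the JDK; the class name PortCheck and the 2-second timeout are illustrative choices and not part of ActiveMQ.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: reports whether a TCP port accepts connections.
final class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, filtered by the firewall, or timed out
            return false;
        }
    }

    public static void main(String[] args) {
        String host = "192.168.4.101"; // broker address used in this article
        System.out.println("61616 open: " + isOpen(host, 61616, 2000));
        System.out.println("8161 open:  " + isOpen(host, 8161, 2000));
    }
}
```

Note that port 61616 will only report as open once the broker has been started (step 5).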

5. Start the broker

$ cd /home/wusc/activemq-01/bin
$ ./activemq start

6. Open the management console: http://192.168.4.101:8161

The default username and password are admin/admin.
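
If you want to probe the console from a script rather than a browser, the credentials have to be sent with each request. The sketch below assumes the console uses HTTP Basic authentication (the securityConstraint in conf/jetty.xml is named BASIC) and only shows how the header value is built; the class name ConsoleAuth is hypothetical, and java.util.Base64 requires Java 8 or later, which is newer than the JDK 7 used in this article.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper for scripting against the web console (needs Java 8+).
final class ConsoleAuth {

    /** Builds the value of the HTTP "Authorization" header for Basic authentication. */
    static String basicAuthHeader(String user, String password) {
        String pair = user + ":" + password;
        return "Basic " + Base64.getEncoder().encodeToString(pair.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        // prints "Basic YWRtaW46YWRtaW4="
        System.out.println(basicAuthHeader("admin", "admin"));
    }
}
```

The resulting string can be passed to HttpURLConnection.setRequestProperty("Authorization", ...) before reading the response code.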



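7. Security configuration (message security)

Without any security mechanism, anyone who knows the broker's address (IP, port, and the queue or topic name) can send and receive messages freely. For details on ActiveMQ security configuration, see http://activemq.apache.org/security.html

ActiveMQ supports several security strategies for messages; here we use the simple authentication plugin as an example.

Add the following at the end of the broker element in conf/activemq.xml (just before the closing </broker> tag):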

$ vi /home/wusc/activemq-01/conf/activemq.xml

<plugins>
    <simpleAuthenticationPlugin>
        <users>
            <authenticationUser username="wusc" password="wusc.123" groups="users,admins"/>
        </users>
    </simpleAuthenticationPlugin>
</plugins>

This defines a user wusc with the password wusc.123 who belongs to the groups users and admins.

Set the username and password for the admin console:

$ vi /home/wusc/activemq-01/conf/jetty.xml

<bean id="securityConstraint" class="org.eclipse.jetty.util.security.Constraint">
    <property name="name" value="BASIC" />
    <property name="roles" value="admin" />
    <property name="authenticate" value="true" />
</bean>

Make sure authenticate is set to true (the default).

The console login username and password are stored in conf/jetty-realm.properties, which looks like this:

 $ vi /home/wusc/activemq-01/conf/jetty-realm.properties

# Defines users that can access the web (console, demo, etc.)
# username: password [,rolename ...]
admin: wusc.123, admin

Note: each entry has the format username: password, rolename
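
To make the layout of a realm entry concrete, here is a small sketch that reads such a file with java.util.Properties and splits the value into a password and a list of roles. It is only an illustration of the format; Jetty uses its own loader, and the class name RealmEntry is hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical value class illustrating "username: password [,rolename ...]".
final class RealmEntry {
    final String username;
    final String password;
    final List<String> roles;

    RealmEntry(String username, String password, List<String> roles) {
        this.username = username;
        this.password = password;
        this.roles = roles;
    }

    /** Returns the entry for the given user, or null if the user is not listed. */
    static RealmEntry parse(String realmText, String username) {
        Properties props = new Properties();
        try {
            // Properties treats ':' as a key/value separator and skips '#' comment lines
            props.load(new StringReader(realmText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String value = props.getProperty(username);
        if (value == null) {
            return null;
        }
        String[] parts = value.split(",");
        List<String> roles = new ArrayList<String>();
        for (int i = 1; i < parts.length; i++) {
            roles.add(parts[i].trim());
        }
        return new RealmEntry(username, parts[0].trim(), roles);
    }

    public static void main(String[] args) {
        RealmEntry entry = parse("# comment\nadmin: wusc.123, admin\n", "admin");
        System.out.println(entry.password + " " + entry.roles);
    }
}
```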

Restart:

$ /home/wusc/activemq-01/bin/activemq restart

Start on boot:

# vi /etc/rc.d/rc.local

Add the following:

## ActiveMQ

su - wusc -c '/home/wusc/activemq-01/bin/activemq start'

Integrating with Spring to send email asynchronously

Message producer (submits the email request)

mq.properties

## MQ
mq.brokerURL=tcp\://192.168.4.101\:61616
mq.userName=wusc
mq.password=wusc.123
mq.pool.maxConnections=10
#queueName
queueName=wusc.edu.mqtest.v1
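
The backslashes in mq.brokerURL are properties-file escapes for the colon. When the file is read through java.util.Properties they are dropped, so the broker receives a normal URL. A minimal sketch to illustrate this (the class name MqConfig is hypothetical; in the article the file is resolved by Spring placeholders instead):

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.net.URI;
import java.util.Properties;

// Hypothetical loader showing how the escaped broker URL is read back.
final class MqConfig {

    /** Parses properties-file text into a Properties object. */
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        // In Java source the backslash itself must be escaped
        Properties props = load("mq.brokerURL=tcp\\://192.168.4.101\\:61616\n");
        String url = props.getProperty("mq.brokerURL");
        URI uri = URI.create(url);
        // prints "tcp://192.168.4.101:61616 host=192.168.4.101 port=61616"
        System.out.println(url + " host=" + uri.getHost() + " port=" + uri.getPort());
    }
}
```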

spring-mq.xml (imported by spring-context.xml)
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd" default-autowire="byName" default-lazy-init="false">

<!-- Video tutorial on Dubbo-based distributed system architecture, Wu Shuicheng, wu-sc@foxmail.com, study group QQ: 367211134 -->

<!-- The ConnectionFactory that actually creates connections, supplied by the JMS provider -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<!-- ActiveMQ broker address -->
<property name="brokerURL" value="${mq.brokerURL}" />
<property name="userName" value="${mq.userName}"></property>
<property name="password" value="${mq.password}"></property>
</bean>

<!--
ActiveMQ provides a PooledConnectionFactory. Injecting an ActiveMQConnectionFactory into it
pools Connections, Sessions and MessageProducers, which greatly reduces resource consumption.
Requires the activemq-pool dependency.
-->
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
<property name="connectionFactory" ref="targetConnectionFactory" />
<property name="maxConnections" value="${mq.pool.maxConnections}" />
</bean>

<!-- The ConnectionFactory that Spring uses to manage the real ConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- targetConnectionFactory is the real ConnectionFactory that creates JMS connections -->
<property name="targetConnectionFactory" ref="pooledConnectionFactory" />
</bean>

<!-- JmsTemplate, the JMS helper class provided by Spring, can send and receive messages -->

<!-- Queue template -->
<bean id="activeMqJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- This connectionFactory refers to the Spring-provided ConnectionFactory defined above -->
<property name="connectionFactory" ref="connectionFactory"/>
<property name="defaultDestinationName" value="${queueName}"></property>
</bean>

</beans>

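MQProducer
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

import com.alibaba.fastjson.JSONObject; // fastjson, inferred from JSONObject.toJSONString
// MailParam is the shared bean shown at the end of this article; import it from your own package

@Service("mqProducer")
public class MQProducer {

@Autowired
private JmsTemplate activeMqJmsTemplate;

/**
* Send a message.
* @param mail the mail parameters, serialized to JSON as the message body
*/
public void sendMessage(final MailParam mail) {
// Sends to the template's default destination (queueName in mq.properties)
activeMqJmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(JSONObject.toJSONString(mail));
}
});

}

}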

Test class
public class MQProducerTest {
private static final Log log = LogFactory.getLog(MQProducerTest.class);

public static void main(String[] args) {
try {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath:spring/spring-context.xml");
context.start();

MQProducer mqProducer = (MQProducer) context.getBean("mqProducer");
// Send an email
MailParam mail = new MailParam();
mail.setTo("wu-sc@foxmail.com");
mail.setSubject("ActiveMQ test");
mail.setContent("Sending email asynchronously via ActiveMQ!");

mqProducer.sendMessage(mail);

context.stop();
} catch (Exception e) {
log.error("==>MQ context start error:", e);
System.exit(0);
} finally {
log.info("===>System.exit");
System.exit(0);
}
}
}


Message consumer (actually sends the email)



mail.properties
# SMTP service configuration
mail.host=smtp.qq.com
mail.port=25
mail.username=XXX@qq.com
mail.password=XXXX
mail.smtp.auth=true
mail.smtp.timeout=30000
mail.default.from=XXXXX@qq.com


spring-mq.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd" default-autowire="byName" default-lazy-init="false">


<!-- The ConnectionFactory that actually creates connections, supplied by the JMS provider -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<!-- ActiveMQ broker address -->
<property name="brokerURL" value="${mq.brokerURL}" />
<property name="userName" value="${mq.userName}"></property>
<property name="password" value="${mq.password}"></property>
</bean>

<!--
ActiveMQ provides a PooledConnectionFactory. Injecting an ActiveMQConnectionFactory into it
pools Connections, Sessions and MessageProducers, which greatly reduces resource consumption.
Requires the activemq-pool dependency.
-->
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
<property name="connectionFactory" ref="targetConnectionFactory" />
<property name="maxConnections" value="${mq.pool.maxConnections}" />
</bean>

<!-- The ConnectionFactory that Spring uses to manage the real ConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- targetConnectionFactory is the real ConnectionFactory that creates JMS connections -->
<property name="targetConnectionFactory" ref="pooledConnectionFactory" />
</bean>

<!-- JmsTemplate, the JMS helper class provided by Spring, can send and receive messages -->

<!-- Queue template -->
<bean id="activeMqJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- This connectionFactory refers to the Spring-provided ConnectionFactory defined above -->
<property name="connectionFactory" ref="connectionFactory"/>
<property name="defaultDestinationName" value="${queueName}"></property>
</bean>

<!-- The sessionAwareQueue destination -->
<bean id="sessionAwareQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>${queueName}</value>
</constructor-arg>
</bean>

<!-- A MessageListener that has access to the Session -->
<bean id="consumerSessionAwareMessageListener" class="wusc.edu.demo.mqtest.listener.ConsumerSessionAwareMessageListener"></bean>

<bean id="sessionAwareListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="sessionAwareQueue" />
<property name="messageListener" ref="consumerSessionAwareMessageListener" />
</bean>

</beans>
spring-mail.xml (the configuration that actually sends the email)
<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
xmlns:context="http://www.springframework.org/schema/context" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:cache="http://www.springframework.org/schema/cache"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.2.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.2.xsd http://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache-3.2.xsd"> 

<!-- High-level abstraction provided by Spring for sending email -->
<bean id="mailSender" class="org.springframework.mail.javamail.JavaMailSenderImpl">
<property name="host" value="${mail.host}" />
<property name="username" value="${mail.username}" />
<property name="password" value="${mail.password}" />
<property name="defaultEncoding" value="UTF-8"></property>
<property name="javaMailProperties">
<props>
<prop key="mail.smtp.auth">${mail.smtp.auth}</prop>
<prop key="mail.smtp.timeout">${mail.smtp.timeout}</prop>
</props>
</property>
</bean>

<bean id="simpleMailMessage" class="org.springframework.mail.SimpleMailMessage">
<property name="from">
<value>${mail.default.from}</value>
</property>
</bean>

<!-- Thread pool configuration -->
<bean id="threadPool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<!-- Minimum number of threads kept in the pool -->
<property name="corePoolSize" value="5" />
<!-- Idle time allowed for pool threads, in seconds -->
<property name="keepAliveSeconds" value="30000" />
<!-- Maximum number of threads in the pool -->
<property name="maxPoolSize" value="50" />
<!-- Capacity of the buffer queue used by the pool -->
<property name="queueCapacity" value="100" />
</bean>

</beans>
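
The three sizing values of the thread pool interact in a way that is easy to misread: the pool only grows beyond corePoolSize once the queue is full. The sketch below reproduces the same numbers (5 / 50 / 100) with a plain java.util.concurrent.ThreadPoolExecutor, which is what ThreadPoolTaskExecutor wraps; the class name PoolGrowthDemo is hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of how core size, max size and queue capacity interact.
final class PoolGrowthDemo {

    /** Returns the pool size after 105 and after 106 blocked submissions. */
    static int[] observe() {
        // Same numbers as the threadPool bean: core 5, max 50, queue capacity 100
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                5, 50, 30000, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(100));
        final CountDownLatch release = new CountDownLatch(1);
        Runnable blocked = new Runnable() {
            public void run() {
                try {
                    release.await(); // keep the worker busy until released
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        };
        try {
            for (int i = 0; i < 105; i++) {
                pool.execute(blocked); // 5 start core threads, 100 wait in the queue
            }
            int afterQueueFull = pool.getPoolSize();
            pool.execute(blocked); // queue is full, so a sixth thread is created
            int afterOverflow = pool.getPoolSize();
            return new int[] {afterQueueFull, afterOverflow};
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] sizes = observe();
        // prints "5 -> 6"
        System.out.println(sizes[0] + " -> " + sizes[1]);
    }
}
```

In other words, with this configuration up to 100 emails wait in the queue while only 5 threads are sending; the remaining 45 threads are used only under heavier backlog.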


Queue listener
/**
*
* @description: Queue listener.
* @author: WuShuicheng.
* @created: 2015-3-17, 11:21:23 PM.
* @version: V1.0.
*/
@Component
public class ConsumerSessionAwareMessageListener implements SessionAwareMessageListener<Message> {

private static final Log log = LogFactory.getLog(ConsumerSessionAwareMessageListener.class);

@Autowired
private JmsTemplate activeMqJmsTemplate;
@Autowired
private Destination sessionAwareQueue;
@Autowired
private MailBiz mailBiz;

public synchronized void onMessage(Message message, Session session) {
try {
ActiveMQTextMessage msg = (ActiveMQTextMessage) message;
final String ms = msg.getText();
log.info("==>receive message:" + ms);
MailParam mailParam = JSONObject.parseObject(ms, MailParam.class);// convert to the corresponding object
if (mailParam == null) {
return;
}

try {
mailBiz.mailSend(mailParam);
} catch (Exception e) {
// Sending failed; put the message back on the queue
//				activeMqJmsTemplate.send(sessionAwareQueue, new MessageCreator() {
//					public Message createMessage(Session session) throws JMSException {
//						return session.createTextMessage(ms);
//					}
//				});
log.error("==>MailException:", e);
}
} catch (Exception e) {
log.error("==>", e);
}
}
}
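
One point worth keeping in mind about the listener: mailSend hands the actual sending to a thread pool, so the try/catch around it can only see errors raised while the task is being submitted (for example a rejection when the pool is saturated), not failures that happen later on the pool thread. The following JDK-only sketch illustrates that behaviour; AsyncFailureDemo and the simulated exception are hypothetical and stand in for the mail code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: an exception thrown inside a pool task bypasses the caller's catch block.
final class AsyncFailureDemo {

    /** Returns {caughtByCaller, seenOnWorkerThread}. */
    static boolean[] run() {
        final CountDownLatch failedOnWorker = new CountDownLatch(1);
        ExecutorService pool = Executors.newSingleThreadExecutor(new ThreadFactory() {
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                // Record the failure instead of printing a stack trace
                t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
                    public void uncaughtException(Thread thread, Throwable e) {
                        failedOnWorker.countDown();
                    }
                });
                return t;
            }
        });
        boolean caughtByCaller = false;
        try {
            pool.execute(new Runnable() {
                public void run() {
                    throw new IllegalStateException("simulated mail failure");
                }
            });
        } catch (Exception e) {
            caughtByCaller = true; // not reached: execute() only queues the task
        }
        boolean seenOnWorker = false;
        try {
            seenOnWorker = failedOnWorker.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdown();
        return new boolean[] {caughtByCaller, seenOnWorker};
    }

    public static void main(String[] args) {
        boolean[] result = run();
        // prints "caller=false worker=true"
        System.out.println("caller=" + result[0] + " worker=" + result[1]);
    }
}
```

If failed sends need to be retried or re-queued, the error handling therefore has to live inside the task itself (or the send has to be made synchronous).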


Business logic for sending email
@Component("mailBiz")
public class MailBiz {

@Autowired
private JavaMailSender mailSender;// defined in the Spring configuration
@Autowired
private SimpleMailMessage simpleMailMessage;// defined in the Spring configuration (carries the default sender)
@Autowired
private ThreadPoolTaskExecutor threadPool;

/**
* Send an email asynchronously on the thread pool.
*
* @param mailParam carries the recipient, subject and content
*/
public void mailSend(final MailParam mailParam) {
threadPool.execute(new Runnable() {
public void run() {
// Copy the shared template so that concurrent sends do not overwrite each other's fields;
// the copy keeps the default sender taken from the configuration file.
SimpleMailMessage message = new SimpleMailMessage(simpleMailMessage);
message.setTo(mailParam.getTo()); // recipient
message.setSubject(mailParam.getSubject());
message.setText(mailParam.getContent());
mailSender.send(message); // a MailException thrown here surfaces on the pool thread, not in the caller
}
});
}
}


Spring bootstrap class
/**
*
* @description: ActiveMQ test bootstrap class.
* @author: WuShuicheng.
* @created: 2015-3-17, 2:25:20 AM.
* @version: V1.0.
*/
public class MQConsumer {
private static final Log log = LogFactory.getLog(MQConsumer.class);

public static void main(String[] args) {
try {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath:spring/spring-context.xml");
context.start();
} catch (Exception e) {
log.error("==>MQ context start error:", e);
System.exit(0);
}
}
}
Shared mail parameter bean
/**
*
* @description: Wrapper class for mail parameters.
* @author: WuShuicheng.
* @created: 2015-3-18, 1:08:42 AM.
* @version: V1.0.
*/
public class MailParam {

/** Sender **/
private String from;
/** Recipient **/
private String to;
/** Subject **/
private String subject;
/** Mail body **/
private String content;

public MailParam() {
}

public MailParam(String to, String subject, String content) {
this.to = to;
this.subject = subject;
this.content = content;
}
// ..... getters/setters omitted
}