您的位置:首页 > 运维架构 > 网站架构

开源jms服务ActiveMQ的负载均衡+高可用部署方案探索

2016-06-08 10:14 736 查看
    最近公司做项目需要用到jms消息服务,最终选择了apache的activemq这个开源消息总线,但是在activemq的官网没能找到既满足高可用又满足集群部署的方案,所以探索了其集群+高可用部署方案,经试用验证ok,这里和大家分享下。

一、架构和技术介绍

1、简介

ActiveMQ 是 Apache 出品,最流行的,能力强劲的开源消息总线。完全支持JMS1.1 和 J2EE 1.4 规范的 JMS Provider 实现

2、activemq的特性

1. 多种语言和协议编写客户端。语言 : Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议 : OpenWire,Stomp REST,WS Notification,XMPP,AMQP

2. 完全支持 JMS1.1 和 J2EE 1.4 规范 ( 持久化 ,XA 消息 , 事务 )

3. 对 Spring 的支持 ,ActiveMQ 可以很容易内嵌到使用 Spring 的系统里面去 , 而且也支持 Spring2.0 的特性

4. 通过了常见 J2EE 服务器 ( 如 Geronimo,JBoss 4, GlassFish,WebLogic) 的测试 ,其中通过 JCA 1.5 resourceadaptors 的配置 , 可以让 ActiveMQ 可以自动的部署到任何兼容 J2EE1.4 商业服务器上

5. 支持多种传送协议 :in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA

6. 支持通过 JDBC 和 journal 提供高速的消息持久化

7. 从设计上保证了高性能的集群 , 客户端 - 服务器 , 点对点

8. 支持 Ajax

9. 支持与 Axis 的整合

10. 可以很容易得调用内嵌 JMS provider, 进行测试

3、下载和安装 ActiveMQ

1 、下载

ActiveMQ 的最新版本是 5.10.0 ,但由于我们内网下载存在问题,所以目前通过内网只能下载到 5.9.0 ,下载地址: http://activemq.apache.org/activemq-590-release.html 

2 、安装

         如果是在 windows 系统中运行,可以直接解压 apache-activemq-5.9.0-bin.zip,并运行 bin 目录下的 activemq.bat 文件,此时使用的是默认的服务端口: 61616 和默认的 console 端口: 8161 。

         如果是在 linux 或 unix 下运行,在 bin 目录下执行命令: ./activemq setup

3 、修改 ActiveMQ 的服务端口和 console 端口

         A 、修改服务端口:打开 conf/activemq.xml 文件,修改以下红色字体部分

        <transportConnectors>

           <transportConnector name="openwire" uri="tcp:// 10.42.220.72:61618"discoveryUri="multicast://default"/>

       </transportConnectors>

B 、修改 console 的地址和端口 : 打开 conf/jetty.xml 文件,修改以下红色字体部分

    <bean id="jettyPort"class="org.apache.activemq.web.WebConsolePort"init-method="start">

       <property name="port" value=" 8162 "/>

 </bean>

4、通过客户端代码试用 ActiveMQ

        需要提前将activemq解压包中的lib目录下的相关包引入到工程中,再进行如下编码:

1 、发送端的代码:

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.DeliveryMode;

import javax.jms.Destination;

import javax.jms.MessageProducer;

import javax.jms.Session;

import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Sender {

    private static final int SEND_NUMBER = 5;

    public static void main(String[] args) {

        // ConnectionFactory :连接工厂, JMS 用它创建连接

        ConnectionFactory connectionFactory;

        // Connection : JMS 客户端到 JMS Provider 的连接

        Connection connection = null ;

        // Session : 一个发送或接收消息的线程

        Session session;

        // Destination :消息的目的地 ; 消息发送给谁 .

        Destination destination;

        // MessageProducer :消息发送者

        MessageProducer producer;

        // TextMessage message;

        // 构造 ConnectionFactory 实例对象,此处采用 ActiveMq 的实现 jar

        connectionFactory = new ActiveMQConnectionFactory(

                ActiveMQConnection. DEFAULT_USER ,

                ActiveMQConnection. DEFAULT_PASSWORD ,

                "failover:(tcp://10.42.220.72:61617,tcp://10.42.220.72:61618)" );

        try {

            // 构造从工厂得到连接对象

            connection =connectionFactory.createConnection();

            // 启动

            connection.start();

            // 获取操作连接

            session = connection.createSession( true , Session.AUTO_ACKNOWLEDGE );

            // 获取 session

            destination = session.createQueue( "FirstQueue" );

             // 得到消息生成者【发送者】

            producer =session.createProducer(destination);

            // 设置不持久化,此处学习,实际根据项目决定

            producer.setDeliveryMode(DeliveryMode. NON_PERSISTENT );

            // 构造消息,此处写死,项目就是参数,或者方法获取

            sendMessage (session, producer);

            session.commit();

        } catch (Exception e) {

            e.printStackTrace();

        } finally {

            try {

                if ( null != connection)

                    connection.close();

            } catch (Throwable ignore) {

    public static void sendMessage(Session session,MessageProducer producer)

            throws Exception {

        for ( int i = 1; i <= SEND_NUMBER ; i++) {

            TextMessage message = session

                    .createTextMessage( "ActiveMq 发送的消息 " + i);

            // 发送消息到目的地方

            System. out .println( " 发送消息: " + "ActiveMq 发送的消息 " + i);

            producer.send(message);

}

2 、接收端代码:

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.Destination;

import javax.jms.MessageConsumer;

import javax.jms.Session;

import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Receive {

    public static void main(String[] args) {

        // ConnectionFactory :连接工厂, JMS 用它创建连接

        ConnectionFactory connectionFactory;

        // Connection : JMS 客户端到 JMS Provider 的连接

        Connection connection = null ;

        // Session : 一个发送或接收消息的线程

        Session session;

        // Destination :消息的目的地 ; 消息发送给谁 .

        Destination destination;

        // 消费者,消息接收者

        MessageConsumer consumer;

        connectionFactory = new ActiveMQConnectionFactory(

                ActiveMQConnection. DEFAULT_USER ,

                ActiveMQConnection. DEFAULT_PASSWORD ,

                "failover:(tcp://10.42.220.72:61617,tcp://10.42.220.72:61618)" );

        try {

            // 构造从工厂得到连接对象

            connection =connectionFactory.createConnection();

            // 启动

            connection.start();

            // 获取操作连接

            session = connection.createSession( false ,

                    Session. AUTO_ACKNOWLEDGE );

            // 获取 session

            destination = session.createQueue( "FirstQueue" );

            consumer =session.createConsumer(destination);

            while ( true ) {

                // 设置接收者接收消息的时间,为了便于测试,这里谁定为 100s

                TextMessage message =(TextMessage) consumer.receive(100000);

                if ( null != message) {

                    System. out .println( " 收到消息 " + message.getText());

                } else {

                    break ;

                }

        } catch (Exception e) {

            e.printStackTrace();

        } finally {

            try {

                if ( null != connection)

                    connection.close();

            } catch (Throwable ignore) {

}

3 、通过监控查看消息堆栈的记录:

      登陆 http://localhost:8162/admin/queues.jsp ,默认的用户名和密码:admin/admin



二、ActiveMQ 的多种部署方式

         单点的 ActiveMQ 作为企业应用无法满足高可用和集群的需求,所以ActiveMQ 提供了 master-slave 、 broker cluster 等多种部署方式,但通过分析多种部署方式之后我认为需要将两种部署方式相结合才能满足我们公司分布式和高可用的需求,所以后面就重点将解如何将两种部署方式相结合。

1、Master-Slave部署方式

1)shared filesystem Master-Slave部署方式


         主要是通过共享存储目录来实现 master 和 slave 的热备,所有的 ActiveMQ 应用都在不断地获取共享目录的控制权,哪个应用抢到了控制权,它就成为 master 。

         多个共享存储目录的应用,谁先启动,谁就可以最早取得共享目录的控制权成为 master ,其他的应用就只能作为 slave 。




2)shared database Master-Slave方式


         与 shared filesystem 方式类似,只是共享的存储介质由文件系统改成了数据库而已。

3)Replicated LevelDB Store方式


         这种主备方式是 ActiveMQ5.9 以后才新增的特性,使用 ZooKeeper 协调选择一个 node 作为 master 。被选择的 master broker node 开启并接受客户端连接。

其他 node 转入 slave 模式,连接 master 并同步他们的存储状态。 slave 不接受客户端连接。所有的存储操作都将被复制到连接至 Master 的 slaves 。

如果 master 死了,得到了最新更新的 slave 被允许成为 master 。 fialed node 能够重新加入到网络中并连接 master 进入 slave mode 。所有需要同步的 disk 的消息操作都将等待存储状态被复制到其他法定节点的操作完成才能完成。所以,如果你配置了replicas=3 ,那么法定大小是 (3/2)+1=2. Master 将会存储并更新然后等待 (2-1)=1 个slave 存储和更新完成,才汇报 success 。至于为什么是 2-1 ,熟悉 Zookeeper 的应该知道,有一个 node 要作为观擦者存在。

单一个新的 master 被选中,你需要至少保障一个法定 node 在线以能够找到拥有最新状态的 node 。这个 node 将会成为新的 master 。因此,推荐运行至少 3 个 replica nodes ,以防止一个 node 失败了,服务中断。



2、Broker-Cluster部署方式

         前面的 Master-Slave 的方式虽然能解决多服务热备的高可用问题,但无法解决负载均衡和分布式的问题。 Broker-Cluster 的部署方式就可以解决负载均衡的问题。

         Broker-Cluster 部署方式中,各个 broker 通过网络互相连接,并共享 queue。当 broker-A 上面指定的 queue-A 中接收到一个 message 处于 pending 状态,而此时没有 consumer 连接 broker-A 时。如果 cluster 中的 broker-B 上面由一个 consumer在消费 queue-A 的消息,那么 broker-B 会先通过内部网络获取到 broker-A 上面的message ,并通知自己的 consumer 来消费。

1)static Broker-Cluster部署


         在 activemq.xml 文件中静态指定 Broker 需要建立桥连接的其他 Broker :

1、   首先在 Broker-A 节点中添加 networkConnector 节点:

<networkConnectors> 

                <networkConnector   uri="static:(tcp:// 0.0.0.0:61617)"duplex="false"/>

</networkConnectors>

2、   修改 Broker-A 节点中的服务提供端口为 61616 :

<transportConnectors>

         <transportConnectorname="openwire"uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>

</transportConnectors>

3、   在 Broker-B 节点中添加 networkConnector 节点:

<networkConnectors> 

                <networkConnector   uri="static:(tcp:// 0.0.0.0:61616)"duplex="false"/>

</networkConnectors>

4、   修改 Broker-A 节点中的服务提供端口为 61617 :

<transportConnectors>

         <transportConnectorname="openwire"uri="tcp://0.0.0.0:61617?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>

</transportConnectors>

5 、分别启动 Broker-A 和 Broker-B 。

2)Dynamic Broker-Cluster部署


         在 activemq.xml 文件中不直接指定 Broker 需要建立桥连接的其他 Broker ,由activemq 在启动后动态查找:

1、   首先在 Broker-A 节点中添加 networkConnector 节点:

<networkConnectors> 

                <networkConnectoruri="multicast://default"

           dynamicOnly="true"

           networkTTL="3"

           prefetchSize="1"

           decreaseNetworkConsumerPriority="true" />

</networkConnectors>

2 、修改 Broker-A 节点中的服务提供端口为 61616 :

<transportConnectors>

         <transportConnectorname="openwire"uri="tcp://0.0.0.0:61616? " discoveryUri="multicast://default"/>

</transportConnectors>

3 、在 Broker-B 节点中添加 networkConnector 节点:

<networkConnectors> 

                <networkConnectoruri="multicast://default"

           dynamicOnly="true"

           networkTTL="3"

           prefetchSize="1"

           decreaseNetworkConsumerPriority="true" />

</networkConnectors>

4 、修改 Broker-B 节点中的服务提供端口为 61617 :

<transportConnectors>

         <transportConnectorname="openwire"uri="tcp://0.0.0.0:61617" discoveryUri="multicast://default"/>

</transportConnectors>

5 、启动 Broker-A 和 Broker-B

2、Master-Slave与 Broker-Cluster 相结合的部署方式

         可以看到 Master-Slave 的部署方式虽然解决了高可用的问题,但不支持负载均衡, Broker-Cluster 解决了负载均衡,但当其中一个 Broker 突然宕掉的话,那么存在于该 Broker 上处于 Pending 状态的 message 将会丢失,无法达到高可用的目的。

         由于目前 ActiveMQ 官网上并没有一个明确的将两种部署方式相结合的部署方案,所以我尝试者把两者结合起来部署:

        



1、部署的配置修改


         这里以 Broker-A + Broker-B 建立 cluster , Broker-C 作为 Broker-B 的 slave为例:

1) 首先在 Broker-A 节点中添加 networkConnector 节点:

<networkConnectors> 

                <networkConnector   uri=" masterslave:(tcp://0.0.0.0:61617,tcp:// 0.0.0.0:61618 )" duplex="false"/>

</networkConnectors>

2) 修改 Broker-A 节点中的服务提供端口为 61616 :

<transportConnectors>

         <transportConnectorname="openwire"uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>

</transportConnectors>

3) 在 Broker-B 节点中添加 networkConnector 节点:

<networkConnectors> 

                <networkConnector   uri="static:(tcp:// 0.0.0.0:61616)"duplex="false"/>

</networkConnectors>

4) 修改 Broker-B 节点中的服务提供端口为 61617 :

<transportConnectors>

         <transportConnectorname="openwire"uri="tcp://0.0.0.0:61617?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>

</transportConnectors>

5) 修改 Broker-B 节点中的持久化方式:

       <persistenceAdapter>

           <kahaDB directory=" /localhost/kahadb "/>

        </persistenceAdapter>

6) 在 Broker-C 节点中添加 networkConnector 节点:

<networkConnectors> 

                <networkConnector   uri="static:(tcp:// 0.0.0.0:61616)"duplex="false"/>

</networkConnectors>

7) 修改 Broker-C 节点中的服务提供端口为 61618 :

<transportConnectors>

         <transportConnectorname="openwire"uri="tcp://0.0.0.0:61618?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>

</transportConnectors>

8) 修改 Broker-B 节点中的持久化方式:

       <persistenceAdapter>

           <kahaDB directory=" /localhost/kahadb "/>

       </persistenceAdapter>

9) 分别启动 broker-A 、 broker-B 、 broker-C ,因为是 broker-B 先启动,所以“/localhost/kahadb ”目录被 lock 住, broker-C 将一直处于挂起状态,当人为停掉broker-B 之后, broker-C 将获取目录“ /localhost/kahadb ”的控制权,重新与 broker-A组成 cluster 提供服务。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: