您的位置:首页 > 其它

ActiveMq 集群部署 三种方案 + 负载均衡+其他细节点

2017-09-25 16:27 567 查看
备忘用

消息的存储三种方式 : kahaDB ,levelDB,数据库。

(1) kahaDB 可以通过文件共享来实现 高可用,需要对linux进行配置,这里不做具体介绍。

(2)levelDB 是 activeMq 支持的一种高可用策略 ,需要搭建至少三个(奇数个)节点的zk集群 ,我们的activeMq 也是需要三个。

(3)基于数据库实现activeMq高可用 。

(4)通过负载可以实现“高可用”,缺陷是,挂掉的mq 中积压的消息,只能等他恢复,才能继续消费(半可用)。

负载实现

静态网络连接 & 动态网络连接 :

两个mq 的配置文件中,分别配置静态网络连接标签,uri=static 分别指向对方的ip 。这样他们之间可以互相消费,除此之外我们还需配置消息回流,这样 brokerA 的消费者去消费 队列 queueMsg时 , brokerA 没有回去brokerB要,当 brokerB 的消费者去消费 队列 queueMsg 时,brokerB 会从 brokerA中要回来,就这样互相来回消费。

弊端 : 当brokerA要到queueMsg 队列上的消息时宕机了,该消息只能等brokerA恢复了在提供给消费者们。

<!--静态网络连接-->
<networkConnectors>
<networkConnector uri="static:(tcp://127.0.0.1:61617)"  />
</networkConnectors>

<!--动态网络连接,用的较少-->
<networkConnectors>
<networkConnector uri="multicast://default" name="bridge" dynamicOnly="false"                 conduitSubscriptions="true" decreaseNetworkConsumerPriority="false">
</networkConnectors>


配置好后, 我们在客户端通过 : failover 来进行对两个 mq 的连接,randomize\=true 我们客户端会随机对brokerA,brokerB 进行连接,当 brokerA宕机了,他会将所有的连接都打在brokerB上 。 randomize\=false , 所有的客户端连接都会打在第一个ip:port 上,只有第一个挂了,他才会打在第二个mq 上。

mq.property 文件

mq.brokerURL=failover\:(tcp\://47.3.4.60\:61616,tcp\://47.13.12.16\:61616)?randomize\=true&initialReconnectDelay\=1000&maxReconnectDelay\=30000

spring-mq.xml 文件

<bean id="targetConnectionFactory"class="org.apache.activemq.ActiveMQConnectionFactory">
<!-- ActiveMQ服务地址 -->
<property name="brokerURL" value="${mq.brokerURL}" />
<property name="userName" value="${mq.userName}"></property>
<property name="password" value="${mq.password}"></property>
</bean>


mysql 实现高可用

说明 :客户端连接同上 。

在两个(以上)mq 的配置文件中(activeMq.xml) 修改如下配置 :

修改 persistenceAdapter
<persistenceAdapter>
<!--kahadb 存储-->
<!--<kahaDB directory="${activemq.data}/kahadb_slavor"/>  -->
<!--mysql 存储-->
<jdbcPersistenceAdapter  useDatabaseLock="true" dataSource="#mysql-ds"/>
</persistenceAdapter>

<!--mysql 前 加 缓存-->
<persistenceFactory>
     <journalPersistenceAdapterFactory  dataSource="#mysql_ds" dataDirectory="activemq-data"  />
</persistenceFactory>

<!-- 数据库连接配置 -->
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost:3306/activemq-db?useUnicode=true"/>
<property name="username" value="root"/>
<property name="password" value="agui"/>
<property name="maxActive" value="200"/>
<property name="poolPreparedStatements" value="true"/>
</bean>


还需要引入三个jar 到 activemq 安装的lib 目录下,如图 :



启动两个(以上)mq , 会发现只有一个mq 成功启动,另外一个处于等待锁的状态,这样当 抢到锁的mq 宕机了,其他的等待的mq 就继续抢锁,因为数据都在数据库中,所有我们能保证所有的消息都能完美消费。

弊端 : 吞吐性能完全依靠数据库,效率低 。 即使加上了缓存,可以提高速度,但是brokerA宕机了,他缓存中的消息还是无法写入到数据库,也无法给 新启动的broker 去消费。

activemq-db数据库表结构 :

/*
Navicat MySQL Data Transfer

Source Server         : ceshi
Source Server Version : 50711
Source Host           : localhost:3306
Source Database       : activemq-db

Target Server Type    : MYSQL
Target Server Version : 50711
File Encoding         : 65001

Date: 2017-09-25 15:42:06
*/

SET FOREIGN_KEY_CHECKS=0;

-- ----------------------------
-- Table structure for activemq_acks
-- ----------------------------
DROP TABLE IF EXISTS `activemq_acks`;
CREATE TABLE `activemq_acks` (
`CONTAINER` varchar(250) NOT NULL,
`SUB_DEST` varchar(250) DEFAULT NULL,
`CLIENT_ID` varchar(250) NOT NULL,
`SUB_NAME` varchar(250) NOT NULL,
`SELECTOR` varchar(250) DEFAULT NULL,
`LAST_ACKED_ID` bigint(20) DEFAULT NULL,
`PRIORITY` bigint(20) NOT NULL DEFAULT '5',
`XID` varchar(250) DEFAULT NULL,
PRIMARY KEY (`CONTAINER`,`CLIENT_ID`,`SUB_NAME`,`PRIORITY`),
KEY `ACTIVEMQ_ACKS_XIDX` (`XID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------
-- Table structure for activemq_lock
-- ----------------------------
DROP TABLE IF EXISTS `activemq_lock`;
CREATE TABLE `activemq_lock` (
`ID` bigint(20) NOT NULL,
`TIME` bigint(20) DEFAULT NULL,
`BROKER_NAME` varchar(250) DEFAULT NULL,
PRIMARY KEY (`ID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------
-- Table structure for activemq_msgs
-- ----------------------------
DROP TABLE IF EXISTS `activemq_msgs`;
CREATE TABLE `activemq_msgs` (
`ID` bigint(20) NOT NULL,
`CONTAINER` varchar(250) DEFAULT NULL,
`MSGID_PROD` varchar(250) DEFAULT NULL,
`MSGID_SEQ` bigint(20) DEFAULT NULL,
`EXPIRATION` bigint(20) DEFAULT NULL,
`MSG` longblob,
`PRIORITY` bigint(20) DEFAULT NULL,
`XID` varchar(250) DEFAULT NULL,
PRIMARY KEY (`ID`),
KEY `ACTIVEMQ_MSGS_MIDX` (`MSGID_PROD`,`MSGID_SEQ`),
KEY `ACTIVEMQ_MSGS_CIDX` (`CONTAINER`),
KEY `ACTIVEMQ_MSGS_EIDX` (`EXPIRATION`),
KEY `ACTIVEMQ_MSGS_PIDX` (`PRIORITY`),
KEY `ACTIVEMQ_MSGS_XIDX` (`XID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;


Zookeeper 实现 高可用

客户端连接同上

说明 :三个节点的zookeeper集群部署在我的其他博文中已经介绍过了,这里不再介绍。

缺点 :

1) 占用的节点数过多,1个zk集群至少3个节点,1个activemq集群也至少得3个节点,但其实正常运行时,只有一个master节点在对外响应,换句话说,花6个节点的成本只为了保证1个activemq master节点的高可用,太浪费资源了。

2) 性能下降太明显,比起单节点的activemq,性能下降了近1个数量级。

修改三个mq 配置文件 :

<persistenceAdapter>
<!--<kahaDB directory="${activemq.data}/kahadb"/>-->
<replicatedLevelDB
directory="activemq-data"
replicas="3"
bind="tcp://0.0.0.0:0"
zkAddress="127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183"
zkSessionTimeout="2s"
zkPath="/activemq/leveldb-stores"
/>
</persistenceAdapter>


介绍

directory : 存储数据的路径

replicas : 集群中的节点数【(replicas/2)+1公式表示集群中至少要正常运行的服务数量】, 3台集群那么允许1台宕机, 另外两台要正常运行

bind : 当这个节点成为Master, 它会绑定配置好的地址和端口来履行主从复制协议

zkAddress : ZooKeeper的ip和port, 如果是集群, 则用逗号隔开(这里作为简单示例ZooKeeper配置为单点, 这样已经适用于大多数环境了, 集群也就多几个配置)

zkPassword : 当连接到ZooKeeper服务器时用的密码

hostname : 本机ip

sync : 在认为消息被消费完成前, 同步信息所存贮的策略, 如果有多种策略用逗号隔开, ActiveMQ会选择较强的策略(local_mem, local_disk则肯定选择存贮在本地硬盘)

zkPath : ZooKeeper选举信息交换的存贮路径

测试 :

Step1 :

根据ZooKeeper的策略, 从三台ActiveMQ服务器选一台运行, 其他两台等待运行, 只是做数据上的主从同步。

所以, 启动ZooKeeper服务器和ActiveMQ服务器后, 访问http://172.17.110.1:8161/admin/http://172.17.110.2:8161/admin/http://172.17.110.3:8161/admin/ 只会有一个成功。

Step2 :

关闭能访问http://ip:8161/admin/的ActiveMQ服务, 访问其他两个, 其中有一个能访问, 说明ActiveMQ + ZooKeeper 集群高可用配置已经成功。

注:为方便观察输出,建议启动activemq时,用./activemq.sh console启动

测试Failover

正常启动后,然后手动停掉master,然后观察剩下的2个节点终端输出,正常情况下,应该过一会儿,有一个会自动提升为master.

最后提醒一下:采用上述HA(静态回流)方案后,虽然系统可用性提高了,但是在本机上测试发现,跟上篇同样的测试代码和用例,单节点运行时,1秒可以发8k+条消息,采用zookeeper的HA方案后,每秒只能写入500条消息左右,对于性能要求较高的场景,建议采用其它方案 。

优缺点对比

一、activeMQ主要的几类部署方式比较

1、默认的单机部署(kahadb)

activeMQ的默认存储的单机方式,以本地kahadb文件的方式存储,所以性能指标完全依赖本地磁盘IO,不能提供高可用。

2、基于zookeeper的主从(levelDB Master/Slave)

5.9.0新推出的主从实现,基于zookeeper来选举出一个master,其他节点自动作为slave实时同步消息。

因为有实时同步数据的slave的存在,master不用担心数据丢失,所以leveldb会优先采用内存存储消息,异步同步到磁盘。所以该方式的activeMQ读写性能都最好,特别是写性能能够媲美非持久化消息。

优点:

实现高可用和数据安全

性能较好

缺点:

因为选举机制要超过半数,所以最少需要3台节点,才能实现高可用。

3、基于共享数据库的主从(Shared JDBC Master/Slave)

可以基于postgres、mysql、oracle等常用数据库。

每个节点启动都会争抢数据库锁,从而保证master的唯一性,其他节点作为备份,一直等待数据库锁的释放。

因为所有消息读写,其实都是数据库操作,activeMQ节点本身压力很小,性能完全取决于数据库性能。

优点:

实现高可用和数据安全

简单灵活,2台节点就可以实现高可用

缺点:

稳定性依赖数据库

性能依赖数据库

二、性能测试:

用同一台测试机作为master。测试代码为java,使用activemq-client-5.11.1提供的client,每个线程用一个连接反复写/读,所有线程完毕后,汇总测试结果。

每秒写/读吞吐量测试结果如下(测试机性能较差,测试结果仅仅用来做3种部署方式的横向比较):

150/500表示每秒写入150条,读取500条消息。



灾备方案(两种方式都已经测试通过)

1、leveldb方式

需要3台节点,2台在主机房,1台在备机房,主机房的节点权重设的比备机房节点高。

当master节点故障:

因为权重的关系,主机房另一个节点会自动被选举为新的master。

当主机房故障:

需要启动备用机房时,先更改备机房节点配置,把集群规模从3改成1,这样1个节点可以临时提供服务(此时不再保证高可用)。

2、共享数据库方式

3台节点即可,2台在主机房,1台在备机房,备机房节点指向灾备activeMQ数据库,平时要关闭。

当master节点故障:

主机房的另一个节点会自动取得数据库锁,成为新的master。

当主机房故障:

需要启动备用机房时,先完成activeMQ数据库的切换,然后启动灾备activeMQ节点即可临时外提供服务(此时不再保证高可用)。

负载均衡 + 高可用 :

其实上面三种 高可用策略(共享kahadb , leveldb ,sql) 加上我们的 静态网络连接 + 回流即可实现 : 负载均衡 + 高可用。

值得说的是 :

配置静态网络连接的的时候比较麻烦,还容易乱。我们 两个masterA,B, 每个master 各有一个slaveA,B。 我们不光需要在两个 master 的配置文件中配置彼此静态网络连接,还得加上他们各自从的静态网络连接对应的ip 。例如masterA和slaveA都要配置 masterB, slaveB 这两个ip,如果 master水平扩展一个masterC,masterA和slaveA还要配置masterC,slaveC 的ip , 其他 同理 。只有这样集群之间才能都互相监听,即使某个节点挂了也不怕。

集群 : 负载均衡 + 高可用 :
通过配置静态网络连接,来实现负载均衡, 高可用策略不变,唯一要说的是 : 在高可用基础上配置 networkconnector  uri=static:(tcp://1,2,3,4) 。
就是通过 各个 节点的mq (尤其是 cluster中 多个 主从架构中,主1的从 要静态连接 主2和 主2下的从),只有这样才能保证 主2挂了, 主2下的从 顶替了主2 , 主1获取主1的从 还能从 新的 “主2”上要来数据。


其他

(1)优化 :

内存不足的时候,对接收端做限流 .

采用集群。

(2)同步和异步

Queue 同步 : 当消费端没有做出收到确认操作之前, 发送端会堵塞当前进程, 等待消费端确认收到之后,开启下一个进程。

异步: 发送端只管发送,接收端只管接收。两者互不影响。

(3)延迟消费注意事项 :

为消息加上延迟消费,还需要在 amq.xml 中加上这个配置即可,如下图 :



还是决定补充上 : kahadb 这种共享文件,加锁的方式,实现master-slave

比较简单:单机部署两个Amq , 直接修改各自的配置文件 :

<persistenceAdapter>
<kahaDB directory="/home/simomme/mount/nas116/amq/kahadb"/>
</persistenceAdapter>


基于NFS搭建 文件 的共享 :

Linux下需要开启NFS服务,具体操作如下:

创建共享目录(192.168.0.1):

1、 修改etc/exports,添加需要共享的目录:/opt/mq/data *(rw,no_root_squash)
2、 启动NFS服务 service nfs start/restart
3、 查看共享 showmount –e
4、 NFS服务自启动 chkconfig –level 35 nfs on


挂载共享目录(192.168.0.2):
1、 挂载:mount –t nfs 192.168.0.1:/opt/mq/data /opt/mq/data
2、 启动自动挂载:在etc/fstab文件添加10.175.40.244:/opt/mq/data /opt/mq/data nfs defaults 0 0
然后指定KahaDB的持久化目录为/opt/mq/data即可。


<persistenceAdapter>
<kahaDB directory="//ip/mqdata/kahadb"/>
</persistenceAdapter>
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: