您的位置:首页 > 编程语言 > Java开发

zookeeper+activeMQ 集群配置及java调用

2017-05-05 15:35 633 查看
规划三台节点:192.168.46.130  

                            192.168.46.131 

                            192.168.46.132

分别在三台服务器上安装部署zookeeper,此处不进行赘述

在三台服务器上分别安装activemq,参考前面的单机版安装

配置

    在三台机器上完成activemq安装之后,开始集群配置,通过配置使三个activemq实例组成集群。下面的配置在三个实例上保持一致,除了标红部分,主要修改配置文件conf/activemq.xml。

    (1)broker-name的统一

        将broker标签的brokerName属性设置为统一的值,我将这个值设置为“activeCluster”,只有三个实例的brokerName一致,zookeeper才能识别它们属于同一个集群。

如:

 <broker xmlns="http://activemq.apache.org/schema/core" brokerName="activemqCluster" dataDirectory="${activemq.data}">

    (2)persistenceAdapter的配置

        persistenceAdapter设置持久化方式,主要有三种方式:kahaDB(默认方式)、数据库持久化、levelDB(v5.9.0提供支持)。

        本文采用levelDB来进行持久化,并使用zookeeper实现集群的高可用,配置如下:

        首先注释掉原来kahaDB的持久化方式,然后配置levelDB+zookeeper的持久化方式。

      <!-- <persistenceAdapter>

            <kahaDB directory="${activemq.data}/kahadb"/>

        </persistenceAdapter>
-->
    <persistenceAdapter>

               <replicatedLevelDB
directory="${activemq.data}/leveldb"
replicas="3"
bind="tcp://0.0.0.0:0"
zkAddress="192.168.46.130:2181,192.168.46.131:2181,192.168.46.132:2181"
hostname="192.168.46.130"
sync="local_disk"
zkPath="/usr/local/mqData/leveldb-stores"/>

        </persistenceAdapter>

注意上述配置中的hostname属性值,不同的activemq实例对应不同的hostname值,其他两个实例配置的hostname值分别为:192.168.46.130,192.168.46.131,192.168.46.132。

zkPath="/usr/local/mqData/leveldb-stores" 需要
在每台服务器上创建目录

mkdir -p /usr/local/mqData/leveldb-stores

服务启动顺序

1.关闭三台服务器的防火墙

service iptables stop

2.分别启动三台服务器的zookeeper

bin/zkServer.sh start 

查看状态

bin/zkServer.sh status

3.分别启动三台服务器的activemq,停止(stop)

bin/activemq start    

调用过程中谁是当前活动的主节点可进行mq页面的访问
http://192.168.46.131:8161/admin 
用户名/密码 admin/amin

如果131节点挂掉,mq自动重新选择活动节点后,如选择130为活动节点时,可用http://192.168.46.131:8161/admin访问

常见异常处理

  配置完成之后启动zookeeper集群,然后依次启动三个activemq实例,启动之后报错:"activemq LevelDB IOException handler"。

    原因:版本5.10.0存在的依赖冲突。

    解决方案:

        (1)移除lib目录中的pax-url-aether-1.5.2.jar包;

        (2)注释掉配置文件中的日志配置;      

 <!-- <bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"

          lazy-init="false" scope="singleton"

          init-method="start" destroy-method="stop">

    </bean>

  -->

 JAVA 客户端调用

配置文件属性
#activemq's brokerURL
activeMq.brokerURL=tcp://192.168.46.129:61616?wireFormat.maxInactivityDuration=0
#activemq cluster brokerURL
activeMq.cluster.brokerURL=failover:(tcp://192.168.46.130:61616,tcp://192.168.46.131:61616,tcp://192.168.46.132:61616)

//代码创建连接

private static Logger log = Logger.getLogger(ActivemqUtil.class);

static ConnectionFactory connectionFactory;
static Connection connection = null;
static Session session;
static Map sendQueues = new ConcurrentHashMap();
static Map getQueues = new ConcurrentHashMap();
static {
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
CommonConstant.ACTIVEMQ_CLUSTER_BROKERURL);
try
{
connection = connectionFactory.createConnection();

connection.start();
// 第一个参数是否使用事务:当消息发送者向消息提供者(即消息代理)发送消息时,消息发送者等待消息代理的确认,没有回应则抛出异常,消息发送程序负责处理这个错误。
// 第二个参数消息的确认模式:
// AUTO_ACKNOWLEDGE :
// 指定消息提供者在每次收到消息时自动发送确认。消息只向目标发送一次,但传输过程中可能因为错误而丢失消息。 //
// CLIENT_ACKNOWLEDGE :
// 由消息接收者确认收到消息,通过调用消息的acknowledge()方法(会通知消息提供者收到了消息) //
// DUPS_OK_ACKNOWLEDGE :
// 指定消息提供者在消息接收者没有确认发送时重新发送消息(这种确认模式不在乎接收者收到重复的消息)。
session = connection.createSession(Boolean.FALSE.booleanValue(),
Session.AUTO_ACKNOWLEDGE);

}
catch (Exception e) {
e.printStackTrace();
log.info("消息队列异常:" + e.getMessage());
}
}


具体参看百度云:jarOrZip项目/activeMq/kuaixin-common(activemq公用服务等)

项目可也修改自己的百度云上的项目:jarOrZip项目/activeMq/kuaixin-common(activemq公用服务等)或者

                                                          jarOrZip项目/activeMq/activemq集群或单机调用(只需修改brokerURL).zip

                                                          

activemq.xml的完整配置如下

file:${activemq.conf}/credentials.properties
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息