ActiveMQ静态发现集群
2016-01-25 13:58
423 查看
1.配置
编辑MQ的主配置文件vim /usr/local/activemq1/conf/activemq.xml 找到broker元素的配置,然后修改brokeName集群中不要重复 <broker xmlns="http://activemq.apache.org/schema/core" brokerName="static-brocker1" dataDirectory="${activemq.data}"> 然后在transportConnectors 元素前边加上 <networkConnectors> <networkConnector uri="static:(tcp://localhost:11111,tcp://localhost:22221,tcp://localhost:33331)" duplex="true"/> </networkConnectors> 注意: uri后边的static表示静态,括弧中如果有多个mq实例就写多个uri,用逗号隔开,最后duplex表示双向连接
每个实力都要配置相应的参数,
static:(tcp://localhost:11111,tcp://localhost:22221,tcp://localhost:33331)这个value在所有的节点中配置一样,然后brokeName的value及群中唯一
注意: 如果是一台机器上模拟集群环境,需要修改所有的端口,不能重复使用。
2.启动
启动activemq,每个节点无序启动日志如下:
2016-01-25 12:43:25,635 | WARN | Could not start network bridge between: vm://static-broker2?async=false and: tcp://localhost:11111 due to: 拒绝连接 | org.apache.activemq.network.DiscoveryNetworkConnector | ActiveMQ Task-2 2016-01-25 12:43:33,636 | INFO | Establishing network connection from vm://static-broker2?async=false to tcp://localhost:11111 | org.apache.activemq.network.DiscoveryNetworkConnector | ActiveMQ Task-1
我的集群一共是三个点,这两条日志来自第二个节点,日志第二条显示第一个节点启动了,所以连接成功,第三个节点还没启动,所以连接拒绝。
3.java demo
java demo使用方法编译class文件,依赖的activemq-all-5.13.0.jar包在mq的安装根目录
# javac -classpath ./:/usr/local/activemq1/activemq-all-5.13.0.jar Receiver.java # javac -classpath ./:/usr/local/activemq1/activemq-all-5.13.0.jar Sender.java
运行发送消息
# java -classpath ./:/usr/local/activemq1/activemq-all-5.13.0.jar Sender tcp://localhost:22221
运行接收消息
# java -classpath ./:/usr/local/activemq1/activemq-all-5.13.0.jar Receiver tcp://localhost:22221
首先将单实例配起来,测试java测试和熟悉工具,在A实例发送,A实例接受
然后将集群配置起来,在A实例上send,在B实例上Receiver,在B实例上接收到A实例发送的消息
集群多个节点测试
首先在A实例上启动一个Receiver,然后在B和C实例上同时发送,A应当能受到B和C法的。
在A实例和B实例上进行Send,然后在C实例上进行receiver应当能受到A和B同时发出来的消息。
Sender.java
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class Sender { private static final int SEND_NUMBER = 500; public static void main(String[] args) { String uri = "tcp://localhost:22221"; if (!args[0].equals("") && args.length == 1) uri = args[0]; else { System.err.println("Use: java -classpath ./:${ACTIVEMP_BASE}/activemq-all-<version>.jar Sender tcp://HOST:PORT"); System.err.println("Eg. : \n\tjava -classpath /usr/local/activemq1/activemq-all-5.13.0.jar:./ Sender tcp://localhost:22221"); return; } ConnectionFactory connectionFactory; Connection connection = null; Session session; Destination destination; MessageProducer producer; connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, uri); try { connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue("FirstQueue"); producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); sendMessage(session, producer); session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != connection) connection.close(); } catch (Throwable ignore) { } } } public static void sendMessage(Session session, MessageProducer producer) throws Exception { int y = 0; for (int i = 1; i <= SEND_NUMBER; i++) { TextMessage message = session.createTextMessage("ActiveMq John Send " + i); y++; //if (i%200 == 0) { System.out.println("发送消息:" + "[ActiveMq John Send " + i + "]"); //} producer.send(message); } System.out.println("====================================================\nSend " + y + " Messages!!!"); } }
Receiver.java
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class Receiver { public static void main(String[] args) { String uri = "tcp://localhost:22221"; if (!args[0].equals("") && args.length == 1) uri = args[0]; else { System.err.println("Use: java -classpath ./:${ACTIVEMP_BASE}/activemq-all-<version>.jar Receiver tcp://HOST:PORT"); System.err.println("Eg. : \n\tjava -classpath /usr/local/activemq1/activemq-all-5.13.0.jar:./ Receiver tcp://localhost:22221"); return; } ConnectionFactory connectionFactory; Connection connection = null; Session session; Destination destination; MessageConsumer consumer; connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, uri); try { connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue("FirstQueue"); consumer = session.createConsumer(destination); int y = 0; while (true) { TextMessage message = (TextMessage) consumer.receive(100000); if (null != message) { System.out.println("收到条消息" + message.getText()); y++; } else { break; } } System.out.println("====================================\nReceiver " + y + " Messages!!"); } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != connection) connection.close(); } catch (Throwable ignore) { } } } }
4.扩展
其他动态发现集群和整个集群的高可用负载均衡见http://blog.csdn.net/cwg_1992/article/details/50587473
#######################
作者:john
转载请注明出处
相关文章推荐
- Linux系统下常用命令操作
- 微信模板信息发送给用户(JAVA)
- 88.MJRefresh使用中的注意事项 MJRefresh引起的崩溃问题
- iOS CATextLayer 富文本
- C语言 百炼成钢13
- 深入理解PHP内核(二)概览-PHP生命周期与Zend引擎
- IE中cookie 无法设置domain=localhost
- checkbox 全选的问题
- 测试1111.md
- (1)shell脚本介绍
- [小技巧] shell 下查看串口是否工作正常
- CentOS+Nginx+PHP+MySQL详细配置(图解)
- 一天 第一章 花在花苞未有泪,人见不识无言对
- 实现ViewPager+Fragment的懒加载功能
- 时间util
- Linux 服务器环境启动
- Date类型笔记
- p6spy介绍
- 虚拟化类型
- 常用传感器协议11:CJ/T-188 冷热量表协议解析5