您的位置:首页 > 运维架构 > 网站架构

ActiveMQ实现负载均衡+高可用部署方案

2016-01-07 16:54 543 查看

一、架构和技术介绍

1、简介ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现2、activemq的特性1. 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP2. 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)3. 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性4. 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resourceadaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE1.4商业服务器上5. 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA6. 支持通过JDBC和journal提供高速的消息持久化7. 从设计上保证了高性能的集群,客户端-服务器,点对点8. 支持Ajax9. 支持与Axis的整合10. 可以很容易得调用内嵌JMS provider,进行测试3、下载和安装ActiveMQ1、下载ActiveMQ的最新版本是5.10.0,但由于我们内网下载存在问题,所以目前通过内网只能下载到5.9.0,下载地址:http://activemq.apache.org/activemq-590-release.html。2、安装 如果是在windows系统中运行,可以直接解压apache-activemq-5.9.0-bin.zip,并运行bin目录下的activemq.bat文件,此时使用的是默认的服务端口:61616和默认的console端口:8161。 如果是在linux或unix下运行,在bin目录下执行命令:./activemq setup3、修改ActiveMQ的服务端口和console端口 A、修改服务端口:打开conf/activemq.xml文件,修改以下红色字体部分 <transportConnectors> <transportConnector name="openwire" uri="tcp://10.42.220.72:61618"discoveryUri="multicast://default"/> </transportConnectors> B、修改console的地址和端口:打开conf/jetty.xml文件,修改以下红色字体部分 <bean id="jettyPort"class="org.apache.activemq.web.WebConsolePort"init-method="start"> <property name="port" value="8162"/> </bean>4、通过客户端代码试用ActiveMQ 需要提前将activemq解压包中的lib目录下的相关包引入到工程中,再进行如下编码:1、发送端的代码:importjavax.jms.Connection;importjavax.jms.ConnectionFactory;importjavax.jms.DeliveryMode;importjavax.jms.Destination;importjavax.jms.MessageProducer;importjavax.jms.Session;importjavax.jms.TextMessage;importorg.apache.activemq.ActiveMQConnection;importorg.apache.activemq.ActiveMQConnectionFactory;publicclass Sender { privatestaticfinalintSEND_NUMBER = 5; publicstaticvoid main(String[] args) { // ConnectionFactory:连接工厂,JMS用它创建连接 ConnectionFactory connectionFactory; // Connection:JMS客户端到JMS Provider的连接 Connection connection = null; // Session:一个发送或接收消息的线程 Session session; // Destination:消息的目的地;消息发送给谁. Destination destination; // MessageProducer:消息发送者 MessageProducer producer; // TextMessage message; //构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "failover:(tcp://10.42.220.72:61617,tcp://10.42.220.72:61618)"); try { //构造从工厂得到连接对象 connection =connectionFactory.createConnection(); //启动 connection.start(); //获取操作连接 session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); //获取session destination = session.createQueue("FirstQueue"); //得到消息生成者【发送者】 producer =session.createProducer(destination); //设置不持久化,此处学习,实际根据项目决定 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //构造消息,此处写死,项目就是参数,或者方法获取 sendMessage(session, producer); session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != connection) connection.close(); } catch (Throwable ignore) { } } } publicstaticvoid sendMessage(Session session,MessageProducer producer) throws Exception { for (int i = 1; i <=SEND_NUMBER; i++) { TextMessage message = session .createTextMessage("ActiveMq发送的消息" + i); //发送消息到目的地方 System.out.println("发送消息:" + "ActiveMq 发送的消息" + i); producer.send(message); } }} 2、接收端代码:importjavax.jms.Connection;importjavax.jms.ConnectionFactory;importjavax.jms.Destination;importjavax.jms.MessageConsumer;importjavax.jms.Session;importjavax.jms.TextMessage;importorg.apache.activemq.ActiveMQConnection;importorg.apache.activemq.ActiveMQConnectionFactory; publicclass Receive { publicstaticvoid main(String[] args) { // ConnectionFactory:连接工厂,JMS用它创建连接 ConnectionFactory connectionFactory; // Connection:JMS客户端到JMS Provider的连接 Connection connection = null; // Session:一个发送或接收消息的线程 Session session; // Destination:消息的目的地;消息发送给谁. Destination destination; //消费者,消息接收者 MessageConsumer consumer; connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "failover:(tcp://10.42.220.72:61617,tcp://10.42.220.72:61618)"); try { //构造从工厂得到连接对象 connection =connectionFactory.createConnection(); //启动 connection.start(); //获取操作连接 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //获取session destination = session.createQueue("FirstQueue"); consumer =session.createConsumer(destination); while (true) { //设置接收者接收消息的时间,为了便于测试,这里谁定为100s TextMessage message =(TextMessage) consumer.receive(100000); if (null != message) { System.out.println("收到消息" + message.getText()); } else { break; } } } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != connection) connection.close(); } catch (Throwable ignore) { } } }}3、通过监控查看消息堆栈的记录: 登陆http://localhost:8162/admin/queues.jsp,默认的用户名和密码:admin/admin
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: