您的位置:首页 > 其它

activeMQ 消息发送 服务器集群配置

2018-01-16 15:00 399 查看
1 下载activeMQ,然后解压,调用宾文件的下的activemq.bat 来启动mq 服务,然后就在localhost:61616下访问该控制台了,默认用户名和密码为admin,自启动的时候如果启动不了,有可能是jdk版本的问题,注意各个版本的对对应关系
然后写一个生产者和消费者

生产者:
public class AppProducer {
     private static final String url "tcp://127.0.0.1:61616";
    //private static final 
4000
String url "failover:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?randomize=true";
 // 用来做集群

    private static final String queueName "myQueue";

    public static void main(String[] ars) throws JMSException {

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);

        Connection connection = connectionFactory.createConnection();

        connection.start();

        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

        Destination destination = session.createQueue(queueName);// 目标
        // 生产者
        MessageProducer messageProducer = session.createProducer(destination);

        for (int i = 0;i<100;i++){

            TextMessage textMessage = session.createTextMessage("test"+i);

            messageProducer.send(textMessage);

            System.out.println("消息已经发送"+i);

        }

    /*  User user = new User("ryz");
        ObjectMessage textMessage = session.createObjectMessage(user);
        messageProducer.send(textMessage);*/
        connection.close();

    }
}
生产者在队列为myQueue中放入100条数据,可以到服务器的控制台页面查看,确实存在这一百条数据,现在我们写一个消费者来消费数据
消费者
public class AppConsumer {
private static final String url "tcp://127.0.0.1:61616";
   // private static final String url "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?randomize=true";//
用来做集群

    private static final String queueName "myQueue";

    public static void main(String[] ars) throws JMSException {

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);

        Connection connection = connectionFactory.createConnection();

        connection.start();

        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

        Destination destination = session.createQueue(queueName);// 目标
        // 生产者
        MessageConsumer messageConsumer = session.createConsumer(destination);

        messageConsumer.setMessageListener(new MessageListener() {

            public void onMessage(Message message) {

                TextMessage textMessage = (TextMessage) message;

                try {

                    System.out.println("收到消息"+textMessage.getText());

                } catch (JMSException e) {

                    e.printStackTrace();

                }

            }
        });
       // connection.close();// 注意关闭连接的时机

    }
}
消费者在消费数据的是一个异步的,所以不能立马关闭连接
现在我们做一个集群的操作
三台集群的关系如下



A,B,C 三台服务器,AB,AC的关系是消息同步,BC与数据库进行持久化,



kahadb 文件夹是共享数据存储器
在activemq 的配置文件activemq.xml 中配置
nodea: 在activemq.xml 中配置
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<networkConnectors>
                        <networkConnector name="local_network" uri="static:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)"/>
                </networkConnectors>
在jetty.xml  配置 <property name="port" value="8161"/>
nodeb: 在activemq.xml 中配置
<transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<networkConnectors>
                        <networkConnector name="local_a" uri="static:(tcp://127.0.0.1:61616)"/>
                </networkConnectors>
<persistenceAdapter>
            <kahaDB directory="/ActiveMQ/activemq/kahadb"/>
        </persistenceAdapter>
在jetty.xml  配置 <property name="port" value="8162"/>
nodec: 在activemq.xml 中配置
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<networkConnectors>
                        <networkConnector name="local_b" uri="static:(tcp://127.0.0.1:61616)"/>
                </networkConnectors>
<persistenceAdapter>
            <kahaDB directory="/ActiveMQ/activemq/kahadb"/>
        </persistenceAdapter>
在jetty.xml  配置 <property name="port" value="8163"/>

好了三个服务都配置好了,现在启动三个服务。然后依次登录对应的服务控制台,可以看到a是可以访问,b也是可以访问的,但是c无法访问,因为c在等到共享资源的锁,如果得到锁,他就会立马启动。现在我们把b关闭,看看c是不是启动了?
好了,现在我们用程序来测试我们的服务集群,首先用生产者生产100数据,可以看到c中没有数据,因为c没有启动嘛,现在把b关闭了,再看c中有没有数据?当然有啦。a中自然没有,因为我们没有把a当发送服务器。现在启动消费者,看看数据是不是被消费掉了,当然是消费掉了啊。
好啦,一个简单的服务器集群做好了。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: