您的位置:首页 > 编程语言 > Java开发

集群与负载均衡系列(4)——消息队列之Rabbitmq的搭建

2017-05-10 16:00 681 查看
前面的三篇文章介绍了共享session,从这篇文章开始介绍消息队列,这里用的是Rabbitmq。对于Rabbitmq的一些基本概念,不打算在这里总结了。因为网上有大把总结的不错的文章,比如点击打开链接

这篇文章介绍Rabbitmq的安装。

下载安装erlang

由于rabbitmq是基于erlang的,因此先下载安装erlang。我这里下载的版本为erlang-18.1-1.el6.x86_64.rpm

rpm -ihv erlang-18.1-1.el6.x86_64.rpm

下载安装Rabbitmq

一定要下载和erlang对应版本的Rabbitmq,对于erlang-18.1-1.el6.x86_64的对应Rabbitmq为rabbitmq-server-3.5.6-1.noarch

rpm -ihv rabbitmq-server-3.5.6-1.noarch.rpm

启动服务

/sbin/service rabbitmq-server start



添加用户

./rabbitmqctl add_user admin nmamtf



授权

./rabbitmqctl set_user_tags admin administrator



开启浏览器管理

./rabbitmq-plugins enable

./rabbitmq-plugins enable rabbitmq_management



在浏览器中管理

地址为: ip:15672/#/



用户管理



权限是安装virtual hosts为粒度进行管理的

到此为止,RabbitMq已经搭建完成

测试代码

接下来,我们写点简单的代码来测试一下

生产端发送信息

1、创建rabbitmq服务连接

2、创建连接队列的channel

3、创建队列

4、发送消息

5、关闭连接和channel

package com.wlf.demo;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;

import com.rabbitmq.client.Channel;

public class SendTest {

private static String QUEUE_NAME="test";

public static void main(String[] argv) throws Exception{
//create connection
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.58.144");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("nmamtf");
factory.setVirtualHost("/");
Connection connection = factory.newConnection();
//create channel
Channel channel = connection.createChannel();
//create queue
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
//finally
channel.close();
connection.close();
}

}
这个时候,我们能看到test队列里有一条待处理的消息



消费端处理消息

1、创建rabbitmq服务连接

2、建立到队列的channel

3、处理消息的回调

4、处理消息

package com.wlf.demo;

import java.io.IOException;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;

public class RecvTest {

private static String QUEUE_NAME="test";

public static void main(String[] argv) throws Exception{

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.58.140");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("nmamtf");
factory.setVirtualHost("/");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages.");

Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);

}

}


同样,我们可以看到消息被处理了

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  java web RabbitMq
相关文章推荐