您的位置:首页 > 编程语言 > Java开发

RabbitMQ安装配置以及java的连接配置demo

2017-08-15 17:52 435 查看
一、在linux环境下安装
  1、安装Erlang环境(RabbitMQ需要Erlang环境支持)
cd /usr/local/src/ 
mkdir rabbitmq 
cd rabbitmq
wget http://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm 
rpm -Uvh erlang-solutions-1.0-1.noarch.rpm
rpm --import http://packages.erlang-solutions.com/rpm/erlang_solutions.asc
sudo yum install erlang
依次输入上述命令,安装Erlang环境
 
2、安装RabbitMq
下载:
 wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.4.1/rabbitmq-server-3.4.1-1.noarch.rpm
 安装:    rpm -ivh rabbitmq-server-3.4.1-1.noarch.rpm
3、RabbitMQ服务的启动
     service rabbitmq-server start
4、设置开机自启动
     chkconfig rabbitmq-server on
5、配置配置文件
       cd /etc/rabbitmq
       cp/usr/share/doc/rabbitmq-server-3.4.1/rabbitmq.config.example /etc/rabbitmq/
       mvrabbitmq.config.example rabbitmq.config
6、开启用户远程访问
vi /etc/rabbitmq/rabbitmq.config



注意要去掉后面的逗号。
7、开启web界面管理工具
rabbitmq-plugins enable rabbitmq_management
service rabbitmq-server restart
8、在防火墙开启端口(15672和5672)
9、在浏览器输入ip:15672就可以访问web管理界面
 
 
备注:默认有guest用户,但是guest用户只能在localhost下能使用,远程访问需新建用户
1、新建用户:rabbitmqctl add_user username password
2、赋予用户角色标签:rabbitmqctl set_user_tags username administrator
然后在浏览器打开web管理界面就可以用该用户名和密码登录,
           登录后点击上方的  admin ,点击下方列表中新建的用户名,在打开的页面中点击    setpermissions  按钮,给角色赋予权限,
           至此,所有的配置结束

 
二、java连接配置demo
1、引入java包依赖
     <dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.0.4</version>
</dependency>

<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>

编写类文件:
  抽象类EndPoint:
package com.RabbitMQ;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public abstract class EndPoint{

protected Channel channel;
protected Connection connection;
protected String endPointName;

public EndPoint(String endPointName) throws IOException{
this.endPointName = endPointName;

//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置host
factory.setHost("192.168.10.43");
factory.setPort(5672);
factory.setUsername("xjiuge");
factory.setPassword("123456");
factory.setVirtualHost("/");
factory.setConnectionTimeout(15000);
//获得连接
connection = factory.newConnection();
//创建通道
channel = connection.createChannel();
//为这个通道申明一个队列,如果这个队列不存在,他将在服务器上创建
channel.queueDeclare(endPointName, false, false, false, null);
}

/**
* 关闭channel和connection
* @throws IOException
*/
public void close() throws IOException{
this.channel.close();
this.connection.close();
}
}

生产者类:
package com.RabbitMQ;
import java.io.IOException;
import java.io.Serializable;
import org.apache.commons.lang.SerializationUtils;
public class Producer extends EndPoint {
public Producer(String endPointName) throws IOException {
super(endPointName);
}
public void sendMessage(Serializable obj) throws IOException{
channel.basicPublish("", endPointName, null, SerializationUtils.serialize(obj));
}
}

消费者类:
package com.RabbitMQ;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang.SerializationUtils;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
public class QueneConsemer extends EndPoint implements Runnable, Consumer {
public QueneConsemer(String endPointName) throws IOException {
super(endPointName);
}
public void run() {
try {
//开启接收信息,自动确认
channel.basicConsume(endPointName, true,this);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 当用户注册时调用
*/
public void handleCancelOk(String consumerTag) {
System.out.println("Consumer "+consumerTag+" registered");
}
/**
* 当有可用新消息时调用
*/
public void handleDelivery(String consumerTag, Envelope env,
BasicProperties props, byte[] body) throws IOException {
Map map = (HashMap)SerializationUtils.deserialize(body);
System.out.println("Message Number "+map.get("message number")+"received.");
}
public void handleCancel(String consumerTag){}
public void handleConsumeOk(String consumerTag) {}
public void handleRecoverOk(String consumerTag) {}
public void handleShutdownSignal(String consumerTag, ShutdownSignalException arg1) {}
}

测试类:
package com.RabbitMQ;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
4000

import org.apache.commons.lang.SerializationUtils;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
public class QueneConsemer extends EndPoint implements Runnable, Consumer {
public QueneConsemer(String endPointName) throws IOException {
super(endPointName);
}
public void run() {
try {
//开启接收信息,自动确认
channel.basicConsume(endPointName, true,this);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 当用户注册时调用
*/
public void handleCancelOk(String consumerTag) {
System.out.println("Consumer "+consumerTag+" registered");
}
/**
* 当有可用新消息时调用
*/
public void handleDelivery(String consumerTag, Envelope env,
BasicProperties props, byte[] body) throws IOException {
Map map = (HashMap)SerializationUtils.deserialize(body);
System.out.println("Message Number "+map.get("message number")+"received.");
}
public void handleCancel(String consumerTag){}
public void handleConsumeOk(String consumerTag) {}
public void handleRecoverOk(String consumerTag) {}
public void handleShutdownSignal(String consumerTag, ShutdownSignalException arg1) {}
}

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  rabbitmq