您的位置:首页 > 编程语言 > Java开发

springboot RabbitMq的安装以及使用

2018-02-07 11:11 841 查看
安装RabbitMQ

 (1):下载erlang,原因在于RabbitMQ服务端代码是使用并发式语言erlang编写的,下载地址:http://www.erlang.org/downloads,双击.exe文件进行安装就好,安装完成之后创建一个名为ERLANG_HOME的环境变量,其值指向erlang的安装目录,同时将%ERLANG_HOME%\bin加入到Path中,最后打开命令行,输入erl,如果出现erlang的版本信息就表示erlang语言环境安装成功;

 (2):下载RabbitMQ,下载地址:http://www.rabbitmq.com/,同样双击.exe进行安装就好(这里需要注意一点,默认的安装目录是C:/Program
Files/....,这个目录中是存在空格符的,我们需要改变安装目录,貌似RabbitMQ安装目录中是不允许有空格的,我之前踩过这个大坑);

 
(3):安装RabbitMQ-Plugins,这个相当于是一个管理界面,方便我们在浏览器界面查看RabbitMQ各个消息队列以及exchange的工作情况,安装方法是:打开命令行cd进入rabbitmq的sbin目录(我的目录是:E:\software\rabbitmq\rabbitmq_server-3.6.5\sbin),输入:rabbitmq-plugins
enable rabbitmq_management命令,稍等会会发现出现plugins安装成功的提示,默认是安装6个插件,如果你在安装插件的过程中出现了下面的错误:

解决方法
4000
是:首先在命令行输入:rabbitmq-service stop,接着输入rabbitmq-service remove,再接着输入rabbitmq-service install,接着输入rabbitmq-service start,最后重新输入rabbitmq-plugins
enable rabbitmq_management试试,我是这样解决的;还有可能是要使用管理员权限的cmd进行操作;

   (4):插件安装完之后,在浏览器输入http://localhost:15672进行验证,你会看到界面,输入用户名:guest,密码:guest你就可以进入管理界面,当然用户名密码你都可以变的;

如果启动没有成功的话

进入

cmd在rabbitmq_server-3.7.3\sbin>下输入

rabbitmq-server.bat就可以启动服务访问了

了解下RabbitMQ中涉及到的几个概念

producer:消息生产者

    consumer:消息消费者

     virtual host:虚拟主机,在RabbitMQ中,用户只能在虚拟主机的层面上进行一些权限设置,比如我可以访问哪些队列,我可以处理哪些请求等等;

     broker:消息转发者,也就是我们RabbitMQ服务端充当的功能了,那么消息是按照什么规则进行转发的呢?需要用到下面几个概念;

     exchange:交换机,他是和producer直接进行打交道的,有点类似于路由器的功能,主要就是进行转发操作的呗,那么producer到底用哪个exchange进行路由呢?这个取决于routing key(路由键),每个消息都有这个键,我们也可以自己设定,其实就是一字符串;

     queue:消息队列,用于存放消息,他接收exchange路由过来的消息,我们可以对队列内容进行持久化操作,那么queue到底接收那个exchange路由的消息呢?这个时候就要用到binding key(绑定键)了,绑定键会将队列和exchange进行绑定,至于绑定方式,RabbitMQ提供了多种方式,大家可以看看鸿洋大神的RabbitMQ博客系列(点击查看);

     以上就是RabbitMQ涉及到的一些概念了,用一张图表示这些概念之间的关系就是:



RabbitMQ简单使用(单独使用demo)

producer(生产者)端步骤:

    (1):创建ConnectionFactory,并且设置一些参数,比如hostname,portNumber等等

    (2):利用ConnectionFactory创建一个Connection连接

    (3):利用Connection创建一个Channel通道

    (4):创建queue并且和Channel进行绑定

    (5):创建消息,并且发送到队列中

     注意,在我们当前的例子中,并没有用到exchange交换机,RabbitMQ默认情况下是会创建一个空字符串名字的exchange的,如果我们没有创建自己的exchange的话,默认就是使用的这个exchange;

     producer端代码:

[java] view
plain copy

public class Sender {  

    private final static String QUEUE_NAME = "MyQueue";  

      

    public static void main(String[] args) {  

        send();  

    }  

      

    public static void send()  

    {  

        ConnectionFactory factory = null;  

        Connection connection = null;  

        Channel channel = null;  

        try {  

            factory = new ConnectionFactory();  

            factory.setHost("localhost");  

            connection = factory.newConnection();  

            channel = connection.createChannel();  

            channel.queueDeclare(QUEUE_NAME, false, false, false, null);  

            String message = "my first message .....";  

            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));  

            System.out.println("已经发送消息....."+message);  

        } catch (IOException e) {  

            e.printStackTrace();  

        } catch (TimeoutException e) {  

            e.printStackTrace();  

        }finally{  

            try {  

                //关闭资源  

                channel.close();  

                connection.close();  

            } catch (IOException e) {  

                e.printStackTrace();  

            } catch (TimeoutException e) {  

                e.printStackTrace();  

            }  

        }  

    }  

}  

     consumer(消费者)端步骤:

     (1):创建ConnectionFactory,并且设置一些参数,比如hostname,portNumber等等

     (2):利用ConnectionFactory创建一个Connection连接

     (3):利用Connection创建一个Channel通道

     (4):将queue和Channel进行绑定,注意这里的queue名字要和前面producer创建的queue一致

     (5):创建消费者Consumer来接收消息,同时将消费者和queue进行绑定

     consumer端代码:

[java] view
plain copy

public class Receiver {  

    private final static String QUEUE_NAME = "MyQueue";  

      

    public static void main(String[] args) {  

        receive();  

    }  

      

    public static void receive()  

    {  

        ConnectionFactory factory = null;  

        Connection connection = null;  

        Channel channel = null;  

          

        try {  

            factory = new ConnectionFactory();  

            factory.setHost("localhost");  

            connection = factory.newConnection();  

            channel = connection.createChannel();  

            channel.queueDeclare(QUEUE_NAME, false, false, false, null);  

            Consumer consumer = new DefaultConsumer(channel){  

                @Override  

                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,  

                        byte[] body) throws IOException {  

                    System.out.println("11111111111");  

                    String message = new String(body, "UTF-8");  

                    System.out.println("收到消息....."+message);  

                }};  

            channel.basicConsume(QUEUE_NAME, true,consumer);  

        } catch (IOException e) {  

            e.printStackTrace();  

        } catch (TimeoutException e) {  

            e.printStackTrace();  

        }finally{  

            try {  

                //关闭资源  

                channel.close();  

                connection.close();  

            } catch (IOException e) {  

                e.printStackTrace();  

            } catch (TimeoutException e) {  

                e.printStackTrace();  

            }  

        }  

    }  



   好了,这篇先到这了,下一篇我会简单介绍点更深入的东西,后续也会对RabbitMQ原生API进行封装,便于我们自己开发;
RabbitMQ和项目整合(整合)

Springboot引入依赖

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>


生产方配置类:

@Configuration
public class RabbitConfig {

/**
* 交换机名称
*/
public static final String EXCHANGE_NAME = "test";

/**
* RoutingKey路由键
*/
public static final String ROUTING_KEY_COMPANY = "company";

/**
*列队名称
*/
public static final String QUEUE_COMP = "comp_queue";

//声明交换机
@Bean
DirectExchange directExchange() {
return new DirectExchange(EXCHANGE_NAME);
}

//声明队列
@Bean
public Queue queue1() {
return new Queue(QUEUE_COMP, true); // true表示持久化该队列
}

//绑定  //交换机和列队绑定需要RoutingKey
@Bean
public Binding binding1() {
return BindingBuilder.bind(queue1()).to(directExchange()).with(ROUTING_KEY_COMPANY);
}

}


生产方发送消息类:

@Component
public class Sender implements RabbitTemplate.ConfirmCallback, ReturnCallback {

@Autowired
private RabbitTemplate rabbitTemplate;

@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}

@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("消息发送成功:" + correlationData);
} else {
System.out.println("消息发送失败:" + cause);
}

}

@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println(message.getMessageProperties().getCorrelationIdString() + " 发送失败");

}

public void send(String routingKey, String msg){

System.out.println("开始发送消息 : " + msg.toLowerCase());
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, routingKey, msg);
//String response = rabbitTemplate.convertSendAndReceive("topicExchange", "key.1", msg, correlationId).toString();
System.out.println("结束发送消息 : " + msg.toLowerCase());
}

public void send(String routingKey, Object object){
send(routingKey, JSON.toJSONString(object));
}
}


消费者消费(由于配置信息可在不同的的项目进行消费):

@Component
public class MsgReceiver {

@RabbitListener(queues = "comp_queue")//列队名称
public void processMessage1(String msg) {
System.out.println(Thread.currentThread().getName() + " 接收到来自comp_queue,队列的消息:" + msg);
}
}


两者都需要配置:

#rabbitmq
spring.rabbitmq.host= 127.0.0.1
spring.rabbitmq.port= 5672
spring.rabbitmq.username= guest
spring.rabbitmq.password= guest


了解更多订阅发布方式可以参考:http://blog.csdn.net/lmj623565791/article/category/2386657
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: