您的位置:首页 > 编程语言 > Java开发

rabbitmq基本概念以及和spring的结合

2017-11-27 17:55 357 查看
今天好困,随便写写


RabbitMQ是流行的开源消息队列系统,用erlang语言开发。RabbitMQ是AMQP(高级消息队列协议)的标准实现,简单介绍一下RabbitMQ的几个概念

broker:简单来说就是消息队列服务器实体

Exchange:消息交换机,指定消息按什么规则路由到哪个队列

Queue:消息队列载体,每一个消息都会被投入到一个或者多个队列

Binding:绑定,它的作用就是把Exchange和Queue按照路由规则进行绑 定

Routing key :路由关键字,exchange根据关键字进行消息投递

channel:消息通道,在客户端的每一个连接里面,可建立多个channel,每一个channel代表一个消息任务

消息队列的使用过程大概如下

(1):客户端连接消息队列服务器,打开一个channel;

(2):定义队列,并设置相关属性

(3):声明一个exchange,并设置相关属性

(4):客户端使用routing key,在exchange和queue之间建立绑定关系

(5):客户端投递消息到exchange

exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。

exchange也有几个类型,完全根据key进行投递的叫做Direct交换机,例如,绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列

exchange类似于通信网络中的交换机,提供消息路由策略。rabbitmq中,producer不是通过信道直接将信息发送给queue,而是先发给exchange。一个exchange可以和多个queue队列绑定,producer在传递消息的时候,会传递一个routing_key,exchange会根据这个routing_key按照特定的路由算法,将消息路由给指定的Queue。exchange和queue一样可以设置为永久化,临时或者自动删除

exchange可以有四种类型

Direct

直接交换器,工作方式类似于单播,exchange会将消息发送到完全匹配的 routing_key的队列里

Fanout

广播式交换器,不管消息的routing_key设置为什么,exchange都会将消息转发给所有绑定的queue

Topic

主题交换器,工作方式类似于组播,exchange会将消息转发和routing_key匹配模式相同的所有队列

Headers

消息体的header匹配(ignore)

rabbitMQ.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.2.xsd"> <!--配置connection-factory,指定连接rabbit server参数 -->
<rabbit:connection-factory id="connectionFactory" virtual-host="vhostSJJ"
username="shijunjie" password="wssjj123" host="123.206.228.200" port="5672"
/>
<!-- <rabbit:connection-factory id="connectionFactory"
username="test2" password="test2"
host="123.206.228.200" port="5672" /> -->

<!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
<rabbit:admin id="connectAdmin" connection-factory="connectionFactory" />

<!--定义queue -->
<rabbit:queue name="queueTest" durable="true" auto-delete="false"
exclusive="false" declared-by="connectAdmin" />

<!-- 定义direct exchange,绑定queueTest -->
<rabbit:direct-exchange name="exchangeTest"
durable="true" auto-delete="false" declared-by="connectAdmin">
<rabbit:bindings>
<rabbit:binding queue="queueTest" key="queueTestKey"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>

<!--定义rabbit template用于数据的接收和发送 -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
exchange="exchangeTest" />

<!-- 消息接收者 -->
<bean id="messageReceiver" class="me.shijunjie.consumer.MessageConsumer"></bean>

<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 -->
<rabbit:listener-container
connection-factory="connectionFactory">
<rabbit:listener queues="queueTest" ref="messageReceiver" />
</rabbit:listener-container>

<!--定义queue -->
<rabbit:queue name="queueChris" durable="true"
auto-delete="false" exclusive="false" declared-by="connectAdmin" />

<!-- 定义direct exchange,绑定queueTest -->
<rabbit:direct-exchange name="exchangeTest"
durable="true" auto-delete="false" declared-by="connectAdmin">
<rabbit:bindings>
<rabbit:binding queue="queueChris" key="queueTestChris"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>

<!-- 消息接收者 -->
<bean id="receiverChris" class="me.shijunjie.consumer.ChrisConsumer"></bean>

<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 -->
<rabbit:listener-container
connection-factory="connectionFactory">
<rabbit:listener queues="queueChris" ref="receiverChris" />
</rabbit:listener-container>

<!-- 分隔线 -->
<!--配置connection-factory,指定连接rabbit server参数 -->
<rabbit:connection-factory id="connectionFactory2" virtual-host="vhostSJJ"
username="shijunjie" password="wssjj123" host="123.206.228.200" port="5672" />

<!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
<rabbit:admin id="connectAdmin2" connection-factory="connectionFactory2" />

<!--定义queue -->
<rabbit:queue name="queueShijj" durable="true"
auto-delete="false" exclusive="false" declared-by="connectAdmin2" />

<!-- 定义direct exchange,绑定queueTest -->
<rabbit:topic-exchange name="exchangeTest2"
durable="true" auto-delete="false" declared-by="connectAdmin2">
<rabbit:bindings>
<rabbit:binding queue="queueShijj" pattern="shijj.#"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>

<!--定义rabbit template用于数据的接收和发送 -->
<rabbit:template id="amqpTemplate2" connection-factory="connectionFactory2"
exchange="exchangeTest2" />

<!-- 消息接收者 -->
<bean id="recieverShijj" class="me.shijunjie.consumer.ShijjConsumer"></bean>

<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 -->
<rabbit:listener-container
connection-factory="connectionFactory2">
<rabbit:listener queues="queueShijj" ref="recieverShijj" />
</rabbit:listener-container>
</beans>


application.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd"> 
<import resource="classpath*:rabbitMQ.xml" />

<!-- 扫描指定package下所有带有如@controller,@services,@resource,@ods并把所注释的注册为Spring Beans -->
<context:component-scan base-package="me.shijunjie.consumer, me.shijunjie.producer" />

<!-- 激活annotation功能 -->
<context:annotation-config />
<!-- 激活annotation功能 -->
<context:spring-configured />

</beans>


consumer

package me.shijunjie.consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class ChrisConsumer implements MessageListener {
private Logger logger = LoggerFactory.getLogger(ChrisConsumer.class);
@Override
public void onMessage(Message message) {
logger.info("chris receive message------->:{}", message);
}

}


producer

package me.shijunjie.producer;

import java.io.IOException;

import javax.annotation.Resource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Service;

@Service
public class MessageProducer {
private Logger logger = LoggerFactory.getLogger(MessageProducer.class);

@Resource(name="amqpTemplate")
private AmqpTemplate amqpTemplate;

@Resource(name="amqpTemplate2")
private AmqpTemplate amqpTemplate2;

public void sendMessage(Object message) throws IOException {
logger.info("to send message:{}", message);
amqpTemplate.convertAndSend("queueTestKey", message);
amqpTemplate.convertAndSend("queueTestChris", message);
amqpTemplate2.convertAndSend("shijj.xxxx.wsdwd", message);
}
}


参考文档资料:RabbitMQ (三) 发布/订阅
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: