springboot整合rabbitmq实战,topic,direct,fanout模式详情及实例。
springboot整合rabbitmq实战,各类模式详情及实例。
大家好,我是酷酷的韩~
版权声明:本文为博主(酷酷的韩)原创文章,未经博主允许不得转载
一.RabbitMQ简介.
RabbitMQ是采用Erlang语言实现AMQP(Advanced Message Queuing Protocol,高级消息队列协议)的消息中间件,它最初起源于金融系统,用于在分布式系统中存储转发消息。(这是百度来的,简单来说就是用作信息转发的消息中间件,下面将会从简介到实例实战进行分享)
二.什么是消息中间件?
1.消息中间件是消息传输过程中保存消息的容器,提供路由,保证信息传递,即使消费者出问题,消息仍然保存一直等到被消费,但保存消息时间也是有期限的。
2.采用异步处理方式:发送消息无需等待反应结果。
3.应用程序之间为松耦合关系,发送者和接受者不必去了解对方,只需要确认消息,过分些说,发送者和接收者不必同时在线,各自做自己的事情就可以啦。
三.RabbitMQ使用场景?
1.比如注册账号时需要发短信或者邮箱短信去认证,此时将发送短信或者邮箱短信发入队列中,任去消费即可,这是异步处理得一个例子。
2.比如在电商中,订单和库存。在传统模式中,比如提交个订单,订单会先记录信息然后将信息发到库存,库存处理完然后才能返回结果,这需要一段较长时间(程序眼中),这时采用rabbitmq中间件,生成订单不需要再去将信息发到库存了,只需要抛到队列中,任去消费即可。节省时间,加强用户体验。
3. 流量削峰,在秒杀场景中一般在秒杀活动中应用广泛,在秒杀活动中一般会因为流量过大,导致应用挂掉。为了解决这个问题,一般在应用前端加入消息队列。用户请求,服务器收到后,首先写入消息队列,加入消息队列长度超过最大值,则直接抛弃用户请求跳转到失败页面(这就是大家秒杀失败的原因,哈哈哈哈哈哈)。
四.RabbitMQ概念说明。
Broker:消息队列服务器实体。 Exchange:消息转换机,消息转发通过这个进行,它指定消息按照什么规则,路由到哪个队列。 Queue:消息队列载体。 Binding:作用是将Exchange和Queue按照路由规则绑定起来,形成某种关系。 Routing key:路由关键字,exchange通过这个关键字进行消息投递。 vhost:虚拟主机,一个broker可以开设多个vhost,用作不同用户的权限分离。 producter:消息生产者,可以投递消息的程序。 consumer:消息消费者,接受消息的程序。 channel:消息通道,在客户端每个连接里,可建立多个channel,每个channel代表一个会话任务。
五.RabbitMQ各类模式详解。
1.direct(直连)
直连模式 一对一的绑定关系, Routing key(路由关键字)和Binding key只有完全匹配,才能消费成功。
2.fanout (广播)
发送到该交换机的所有信息都将转发到与该exchange绑定的queue中。
3.topic
在exchange中routing key 和binding key相匹配就可以绑定成功,可以一对一也可一对多。
bingding key 可以存在两种特殊字符,# :匹配一个或多个单词。 *:匹配一个单词。
4.headers
Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。
六.RabbitMQ各类模式实例。
1.引入pom文件。
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> </dependency>
2.topic为例(其它的与其模式相同,路由绑定key和绑定key不同)
(1)生产者
发送类:
@Component @Slf4j public class TestTopicSender { /** * 自动注入RabbitTemplate模板类 */ @Autowired private RabbitTemplate rabbitTemplate; /** * 发送消息方法调用: 构建Message消息 * * @param message * @param properties * @throws Exception */ public void send(Object message, Map<String, Object> properties) throws Exception { MessageHeaders mhs = new MessageHeaders(properties); Message msg = MessageBuilder.createMessage(message, mhs); rabbitTemplate.setConfirmCallback(confirmCallback); rabbitTemplate.setReturnCallback(returnCallback); //id + 时间戳 全局唯一 CorrelationData correlationData = new CorrelationData(System.currentTimeMillis() + ""); rabbitTemplate.convertAndSend(TestTopicConfig.EXCHANGE, TestTopicConfig.ROUTINGKEY_V1, msg, correlationData); } /** * 回调函数: confirm确认 */ final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("correlationData:{}; ack:{}; cause:{}", correlationData, ack, cause); if (!ack) { log.error("回调函数: confirm确认异常correlationData:{}; ack:{}; cause:{}", correlationData, ack, cause); } } }; /** * 回调函数: return返回 */ final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("return exchange: " + exchange + ", routingKey: " + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText); } };
配置类:
//交换机名称 public final static String EXCHANGE = "test-Exchanges"; //路由key public final static String ROUTINGKEY_V1 = "test-V1.hjq"; public final static String ROUTINGKEY_V2 = "test-V2.hjq"; //队列名称 public final static String QUEUE = "queue-test"; //是否持久化 public final static String DERABLE = "true"; //消息路由规则 public final static String TYPE = "topic"; // 忽略声明异常 public final static String IGNOREDECEXCEPTION = "true"; /** * 绑定的路由键或模式。 * *(星号):可以(只能)匹配一个单词 * #(井号):可以匹配多个单词(或者零个) */ public final static String KEY_V1 = "test-V1.#"; public final static String KEY_V2 = "test-V2.#";
定时器发送:
@Service @Slf4j public class TestScheduleService { @Autowired private TestTopicSender testTopicSender; @Scheduled(cron = "0/10 * * * * ? ") public void test() { System.out.println("11"); TestVo testVo = new TestVo(); testVo.setUuid(StrUtil.genUUID()); try { testTopicSender.send(JSONObject.toJSONString(testVo), null); } catch (Exception e) { log.error(e.getMessage(), e); } } }
(2)消费者
@Component @Slf4j public class TestCustomerRecever { @Autowired @Qualifier(value = "dealDataLogTask") private DealDataLogTask dealDataLogTask; @RabbitListener(bindings = @QueueBinding( value = @Queue(value = TestTopicConfig.QUEUE,//队列名称 durable = TestTopicConfig.DERABLE),//是否持久化 exchange = @Exchange(value = TestTopicConfig.EXCHANGE, //交换机名称 durable = TestTopicConfig.DERABLE,//是否持久化o type = TestTopicConfig.TYPE, //消息路由规则 ignoreDeclarationExceptions = TestTopicConfig.IGNOREDECEXCEPTION), // 忽略声明异常 key = TestTopicConfig.KEY_V1 // 绑定的路由键或模式。 ) ) @RabbitHandler public void onMessage(Message message, Channel channel) throws Exception { log.info("消费端Payload: " + message.getPayload()); Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG); //TODO 处理业务逻辑 String msg = (String) message.getPayload(); dealTestMsg(msg); //手工ACK channel.basicAck(deliveryTag, false); } public void dealTestMsg(String text) { TestVo testVo = JSON.parseObject(text, TestVo.class); dealDataLogTask.doTestTask(testVo); }
配置类:
//交换机名称 public final static String EXCHANGE = "test-Exchanges"; //路由key public final static String ROUTINGKEY_V1 = "test-V1.hjq"; public final static String ROUTINGKEY_V2 = "test-V2.hjq"; //队列名称 public final static String QUEUE = "queue-test"; //是否持久化 public final static String DERABLE = "true"; //消息路由规则 public final static String TYPE = "topic"; // 忽略声明异常 public final static String IGNOREDECEXCEPTION = "true"; /** * 绑定的路由键或模式。 * *(星号):可以(只能)匹配一个单词 * #(井号):可以匹配多个单词(或者零个) */ public final static String KEY_V1 = "test-V1.#"; public final static String KEY_V2 = "test-V2.#";
针对direct模式,在send类中将生产者和消费者路由绑定的key一致,fanout中在同一个exchange中即可。酷酷的韩一直在,有问题留言即可,我会第一时间看到回复的。
你必须成功,因为你不能失败。------酷酷的韩
- Spring Boot整合RabbitMQ实例(Topic模式)
- spring boot整合RabbitMQ(Topic模式)
- spring boot整合RabbitMQ(Topic模式)
- spring boot整合RabbitMQ(Direct模式)
- spring boot整合RabbitMQ实例详解(Fanout模式)
- spring boot整合RabbitMQ(Direct模式)
- Spring Boot 整合 RabbitMQ 之 Topic转发模式 (二)
- spring boot整合RabbitMQ(Direct模式)
- Spring Boot 整合 RabbitMQ 之 Fanout Exchange模式 (三)
- springboot整合activemq案例,queue,topic两种模式
- spring-boot | rabbitMq-Direct模式
- Spring Boot整合RabbitMQ实例
- spring boot整合activeMQ,实现ptp和topic两者消息模式
- spring boot整合RabbitMQ(Fanout模式)
- spring boot整合RabbitMQ(Fanout模式)
- spring boot实战(番外篇)整合RabbitMQ
- SpringBoot整合RabbitMQ之典型应用场景实战二
- spring集成rabbitMq(基于topic和fanout模式)
- spring boot实战(第十二篇)整合RabbitMQ
- RabbitMQ 实战(二)Spring Boot 整合 RabbitMQ