Java SpringBoot集成RabbitMq实战和总结
2018-09-27 18:01
1096 查看
目录
交换器、队列、绑定的声明
关于消息序列化
同一个队列多消费类型
注解将消息和消息头注入消费者方法
关于消费者确认
关于发送者确认模式
消费消息、死信队列和RetryTemplate
RPC模式的消息(不常用)
关于消费模型
关于RabbitMq客户端的线程模型
在公司里一直在用RabbitMQ,由于api已经封装的很简单,关于RabbitMQ本身还有封装的实现没有了解,最近在看RabbitMQ实战这本书,结合网上的一些例子和spring文档,实现了RabbitMQ和spring的集成,对着自己平时的疑惑做了一些总结。
交换器、队列、绑定的声明
SpringAMQP项目对RabbitMQ做了很好的封装,可以很方便的手动声明队列,交换器,绑定。如下:
/** * 队列 * @return */ @Bean @Qualifier(RabbitMQConstant.PROGRAMMATICALLY_QUEUE) Queue queue() { return new Queue(RabbitMQConstant.PROGRAMMATICALLY_QUEUE, false, false, true); } /** * 交换器 * @return */ @Bean @Qualifier(RabbitMQConstant.PROGRAMMATICALLY_EXCHANGE) TopicExchange exchange() { return new TopicExchange(RabbitMQConstant.PROGRAMMATICALLY_EXCHANGE, false, true); } /** * 声明绑定关系 * @return */ @Bean Binding binding(@Qualifier(RabbitMQConstant.PROGRAMMATICALLY_EXCHANGE) TopicExchange exchange, @Qualifier(RabbitMQConstant.PROGRAMMATICALLY_QUEUE) Queue queue) { return BindingBuilder.bind(queue).to(exchange).with(RabbitMQConstant.PROGRAMMATICALLY_KEY); } /** * 声明简单的消费者,接收到的都是原始的{@link Message} * * @param connectionFactory * * @return */ @Bean SimpleMessageListenerContainer simpleContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setMessageListener(message -> log.info("simple receiver,message:{}", message)); container.setQueueNames(RabbitMQConstant.PROGRAMMATICALLY_QUEUE); return container; }
消费者和生产者都可以声明,交换器这种一般经常创建,可以手动创建。需要注意对于没有路由到队列的消息会被丢弃。
如果是Spring的话还需要声明连接:
@Bean ConnectionFactory connectionFactory(@Value("${spring.rabbitmq.port}") int port, @Value("${spring.rabbitmq.host}") String host, @Value("${spring.rabbitmq.username}") String userName, @Value("${spring.rabbitmq.password}") String password, @Value("${spring.rabbitmq.publisher-confirms}") boolean isConfirm, @Value("${spring.rabbitmq.virtual-host}") String vhost) { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setVirtualHost(vhost); connectionFactory.setPort(port); connectionFactory.setUsername(userName); connectionFactory.setPassword(password); connectionFactory.setPublisherConfirms(isConfirm); }
在配置类使用@EnableRabbit的情况下,也可以基于注解进行声明,在Bean的方法上加上@RabbitListener,如下:
/** * 可以直接通过注解声明交换器、绑定、队列。但是如果声明的和rabbitMq中已经存在的不一致的话 * 会报错便于测试,我这里都是不使用持久化,没有消费者之后自动删除 * {@link RabbitListener}是可以重复的。并且声明队列绑定的key也可以有多个. * * @param headers * @param msg */ @RabbitListener( bindings = @QueueBinding( exchange = @Exchange(value = RabbitMQConstant.DEFAULT_EXCHANGE, type = ExchangeTypes.TOPIC, durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = RabbitMQConstant.true_CONSTANT), value = @Queue(value = RabbitMQConstant.DEFAULT_QUEUE, durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = RabbitMQConstant.true_CONSTANT), key = DKEY ), //手动指明消费者的监听容器,默认Spring为自动生成一个SimpleMessageListenerContainer containerFactory = "container", //指定消费者的线程数量,一个线程会打开一个Channel,一个队列上的消息只会被消费一次(不考虑消息重新入队列的情况),下面的表示至少开启5个线程,最多10个。线程的数目需要根据你的任务来决定,如果是计算密集型,线程的数目就应该少一些 concurrency = "5-10" ) public void process(@Headers Map<String, Object> headers, @Payload ExampleEvent msg) { log.info("basic consumer receive message:{headers = [" + headers + "], msg = [" + msg + "]}"); } /** * {@link Queue#ignoreDeclarationExceptions}声明队列会忽略错误不声明队列,这个消费者仍然是可用的 * * @param headers * @param msg */ @RabbitListener(queuesToDeclare = @Queue(value = RabbitMQConstant.DEFAULT_QUEUE, ignoreDeclarationExceptions = RabbitMQConstant.true_CONSTANT)) public void process2(@Headers Map<String, Object> headers, @Payload ExampleEvent msg) { log.info("basic2 consumer receive message:{headers = [" + headers + "], msg = [" + msg + "]}"); }
关于消息序列化
这个比较简单,默认采用了Java序列化,我们一般使用的Json格式,所以配置了Jackson,根据自己的情况来,直接贴代码:
@Bean MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); }
同一个队列多消费类型
如果是同一个队列多个消费类型那么就需要针对每种类型提供一个消费方法,否则找不到匹配的方法会报错,如下:
)
@Profile(SpringConstant.MULTIPART_PROFILE))
)
@Profile(SpringConstant.MULTIPART_PROFILE))
)
@SendTo(]br/>)
)
@SendTo("queue.reply.s")
相关文章推荐
- Java SpringBoot集成RabbitMq实战和总结
- Java【SpringBoot实战—微信点餐系统学习总结】
- Spring Boot 集成 RabbitMQ 实战
- Spring BOOT 集成 RabbitMq 实战操作(一)
- Spring Boot 集成 RabbitMq 实战操作(二)
- SpringBoot+Maven项目实战(4):集成Mybatis
- spring-boot 集成 rabbitmq
- Java Web开发之集成Spring Boot
- SpringBoot项目实战--RabbitMQ
- 85. Spring Boot集成RabbitMQ【从零开始学Spring Boot】
- spring boot集成tk.mybatis3.4.0通用mapper, java.lang.ClassCastException,实体不能转换为实体
- Springboot集成RabbitMq
- RabbitMQ的Java应用(3) -- 使用spring-boot-starter-amqp开发生产者应用
- windows环境下springboot集成phoenix时报如下异常: java.io.IOException: Could not locate executable null\bin\winut
- springboot中rabbitmq集成——单项目
- SpringBoot+Maven项目实战(2):集成SpringBoot
- spring boot实战(第十二篇)整合RabbitMQ
- springboot中rabbitmq集成——多项目
- Spring boot集成RabbitMQ的示例代码
- RabbitMQ-从基础到实战(6)— 与Spring集成