您的位置:首页 > 编程语言 > Java开发

Java SpringBoot集成RabbitMq实战和总结

2018-12-16 19:07 429 查看

目录

交换器、队列、绑定的声明
关于消息序列化
同一个队列多消费类型
注解将消息和消息头注入消费者方法
关于消费者确认
关于发送者确认模式
消费消息、死信队列和RetryTemplate
RPC模式的消息(不常用)
关于消费模型
关于RabbitMq客户端的线程模型
在公司里一直在用RabbitMQ,由于api已经封装的很简单,关于RabbitMQ本身还有封装的实现没有了解,最近在看RabbitMQ实战这本书,结合网上的一些例子和spring文档,实现了RabbitMQ和spring的集成,对着自己平时的疑惑做了一些总结。
关于RabbitMQ基础不在详细讲解(本文不适合RabbitMq零基础),RabbitMQ实战的1,2,4三章讲的非常不错。因为书中讲的都是Python和Php的例子,所以自己结合SpringBoot文档和朱小厮的博客做了一些总结,写了一些Springboot的例子。

交换器、队列、绑定的声明

SpringAMQP项目对RabbitMQ做了很好的封装,可以很方便的手动声明队列,交换器,绑定。如下:

/**
* 队列
* @return
*/
@Bean
@Qualifier(RabbitMQConstant.PROGRAMMATICALLY_QUEUE)
Queue queue() {
return new Queue(RabbitMQConstant.PROGRAMMATICALLY_QUEUE, false, false, true);
}

/**
* 交换器
* @return
*/
@Bean
@Qualifier(RabbitMQConstant.PROGRAMMATICALLY_EXCHANGE)
TopicExchange exchange() {
return new TopicExchange(RabbitMQConstant.PROGRAMMATICALLY_EXCHANGE, false, true);
}
/**
* 声明绑定关系
* @return
*/
@Bean
Binding binding(@Qualifier(RabbitMQConstant.PROGRAMMATICALLY_EXCHANGE) TopicExchange exchange,
@Qualifier(RabbitMQConstant.PROGRAMMATICALLY_QUEUE) Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with(RabbitMQConstant.PROGRAMMATICALLY_KEY);
}

/**
* 声明简单的消费者,接收到的都是原始的{@link Message}
*
* @param connectionFactory
*
* @return
*/
@Bean
SimpleMessageListenerContainer simpleContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setMessageListener(message -> log.info("simple receiver,message:{}", message));
container.setQueueNames(RabbitMQConstant.PROGRAMMATICALLY_QUEUE);
return container;
}

消费者和生产者都可以声明,交换器这种一般经常创建,可以手动创建。需要注意对于没有路由到队列的消息会被丢弃。

如果是Spring的话还需要声明连接:

@Bean
ConnectionFactory connectionFactory(@Value("${spring.rabbitmq.port}") int port,
@Value("${spring.rabbitmq.host}") String host,
@Value("${spring.rabbitmq.username}") String userName,
@Value("${spring.rabbitmq.password}") String password,
@Value("${spring.rabbitmq.publisher-confirms}") boolean isConfirm,
@Value("${spring.rabbitmq.virtual-host}") String vhost) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setVirtualHost(vhost);
connectionFactory.setPort(port);
connectionFactory.setUsername(userName);
connectionFactory.setPassword(password);
connectionFactory.setPublisherConfirms(isConfirm);
}

在配置类使用@EnableRabbit的情况下,也可以基于注解进行声明,在Bean的方法上加上@RabbitListener,如下:

/**
* 可以直接通过注解声明交换器、绑定、队列。但是如果声明的和rabbitMq中已经存在的不一致的话
* 会报错便于测试,我这里都是不使用持久化,没有消费者之后自动删除
* {@link RabbitListener}是可以重复的。并且声明队列绑定的key也可以有多个.
*
* @param headers
* @param msg
*/
@RabbitListener(
bindings = @QueueBinding(
exchange = @Exchange(value = RabbitMQConstant.DEFAULT_EXCHANGE, type = ExchangeTypes.TOPIC,
durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = RabbitMQConstant.true_CONSTANT),
value = @Queue(value = RabbitMQConstant.DEFAULT_QUEUE, durable = RabbitMQConstant.FALSE_CONSTANT,
autoDelete = RabbitMQConstant.true_CONSTANT),
key = DKEY
),
//手动指明消费者的监听容器,默认Spring为自动生成一个SimpleMessageListenerContainer
containerFactory = "container",
//指定消费者的线程数量,一个线程会打开一个Channel,一个队列上的消息只会被消费一次(不考虑消息重新入队列的情况),下面的表示至少开启5个线程,最多10个。线程的数目需要根据你的任务来决定,如果是计算密集型,线程的数目就应该少一些
concurrency = "5-10"
)
public void process(@Headers Map<String, Object> headers, @Payload ExampleEvent msg) {
log.info("basic consumer receive message:{headers = [" + headers + "], msg = [" + msg + "]}");
}

/**
* {@link Queue#ignoreDeclarationExceptions}声明队列会忽略错误不声明队列,这个消费者仍然是可用的
*
* @param headers
* @param msg
*/
@RabbitListener(queuesToDeclare = @Queue(value = RabbitMQConstant.DEFAULT_QUEUE, ignoreDeclarationExceptions = RabbitMQConstant.true_CONSTANT))
public void process2(@Headers Map<String, Object> headers, @Payload ExampleEvent msg) {
log.info("basic2 consumer receive message:{headers = [" + headers + "], msg = [" + msg + "]}");
}

关于消息序列化

这个比较简单,默认采用了Java序列化,我们一般使用的Json格式,所以配置了Jackson,根据自己的情况来,直接贴代码:

@Bean
MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}

同一个队列多消费类型

如果是同一个队列多个消费类型那么就需要针对每种类型提供一个消费方法,否则找不到匹配的方法会报错,如下:

[p]@Component[url=http://blog.51cto.com/14084556/mailto:br/>@Slf4j@Slf4j)
)
@Profile(SpringConstant.MULTIPART_PROFILE))
)
@Profile(SpringConstant.MULTIPART_PROFILE))
)
@SendTo(]br/>)
)
@SendTo("queue.reply.s")
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  java 日志 添加