SpringBoot:RabbitMQ-发送取出消息-监听消息-使用Jackson消息转换
2020-06-28 05:08
771 查看
发送消息
-
以Message对象发送消息
import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Service; @Service public class MyRabbitMQService { final RabbitTemplate rabbitTemplate; public MyRabbitMQService(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } public void sendDirectQueue(String sayMessage) { byte[] bytes = sayMessage.getBytes(); MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentEncoding("UTF-8"); messageProperties.setHeader("Message","这是一个测试消息"); Message message = new Message(bytes, messageProperties); rabbitTemplate.send("exchange.direct","exchange.direct.queue",message); } }
测试:
@Autowired MyRabbitMQService myRabbitMQService; @Test void rabbitService() { myRabbitMQService.sendDirectQueue("Say Hello Direct queue"); }
结果:
-
以自定义对象发送消息
service
public void sendFanout(Object object) { rabbitTemplate.convertAndSend("exchange.fanout","",object); }
测试
注意: 默认使用的是 SimpleMessageConverter 用的 JDK 的序列化;需要实现Serializable接口
如:
public class User implements Serializable {}
@Test void rabbitService() { User user = new User(); user.setId(1); user.setUsername("dead"); user.setPassword("Dead_Password_814"); myRabbitMQService.sendFanout(user); }
结果
取出消息
-
receive(String queueName)
从一个消息队列中取出消息对象(Message)
receive(String queueName, long timeoutMillis)
-
从一个消息队列中取出消息 并 指定超时时间;单位 秒
receiveAndConvert(String queueName)
-
从一个消息队列中取出消息 并 转换 为对象(Object)
receiveAndConvert(String queueName, long timeoutMillis)
-
从一个消息队列中取出消息 并 转换 为对象(Object)并 指定超时时间;单位 秒
如果传入的long小于0时,使用默认的10000(单位 秒)
源码
private static final long DEFAULT_CONSUME_TIMEOUT = 10000;
private Delivery consumeDelivery(Channel channel, String queueName, long timeoutMillis) throws IOException { Delivery delivery = null; RuntimeException exception = null; CompletableFuture<Delivery> future = new CompletableFuture<>(); ShutdownListener shutdownListener = c -> { if (!RabbitUtils.isNormalChannelClose(c)) { future.completeExceptionally(c); } }; channel.addShutdownListener(shutdownListener); ClosingRecoveryListener.addRecoveryListenerIfNecessary(channel); DefaultConsumer consumer = null; try { consumer = createConsumer(queueName, channel, future, timeoutMillis < 0 ? DEFAULT_CONSUME_TIMEOUT : timeoutMillis); if (timeoutMillis < 0) { delivery = future.get(); } else { delivery = future.get(timeoutMillis, TimeUnit.MILLISECONDS); } } catch (ExecutionException e) { Throwable cause = e.getCause(); this.logger.error("Consumer failed to receive message: " + consumer, cause); exception = RabbitExceptionTranslator.convertRabbitAccessException(cause); throw exception; } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (TimeoutException e) { RabbitUtils.setPhysicalCloseRequired(channel, true); } finally { if (consumer != null && !(exception instanceof ConsumerCancelledException) && channel.isOpen()) { cancelConsumerQuietly(channel, consumer); } try { channel.removeShutdownListener(shutdownListener); } catch (Exception e) { // NOSONAR - channel might have closed. } } return delivery; }
示例
@Test public void receiveMessage() { Object o = rabbitTemplate.receiveAndConvert("topic.queue"); System.out.println(o); }
打印结果
User(username=dead, password=Dead_Password_814, id=1)
监听消息
-
启动类加上 @EnableRabbit 注解 :开启基于注解的RabbitMQ
-
Service
@RabbitListener - queues:可写多个消息队列
package com.live.service; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Service; @Service public class MyRabbitMQService { final RabbitTemplate rabbitTemplate; public MyRabbitMQService(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } @RabbitListener(queues = "direct.queue") public void listenerQueue(Object o) { System.out.println("收到消息:" + o); } }
启动web容器
启动后,如果消息队列有消息:
收到消息:( Body:'{"username":"dead","password":"Dead_Password_814","id":1}' MessageProperties [ headers={__TypeId__=com.live.model.User}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=true, receivedExchange=exchange.fanout, receivedRoutingKey=, deliveryTag=1, consumerTag=amq.ctag-u6CQedbhTxrETDGPLyfMag, consumerQueue=direct.queue ] )
使用Jackson转换消息
在发送自定义对象时:默认使用JDK序列化器
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class CustomRabbitMQMessageConvert { @Bean public MessageConverter customMessageConvert() { return new Jackson2JsonMessageConverter(); } }
测试
@Test void rabbitService() { User user = new User(); user.setId(1); user.setUsername("dead"); user.setPassword("Dead_Password_814"); myRabbitMQService.sendFanout(user); }
结果
相关文章推荐
- RabbitMQ使用教程(三)如何保证消息99.99%被发送成功?
- .Net下RabbitMQ的使用(2) -- 发送接收消息
- 使用RabbitMQ简单发送接收消息
- 使用HasMap的另一种写法,在RabbitMQ发送消息时,消费方会消费失败
- rabbitmq学习9:使用spring-amqp发送消息及同步接受消息
- shell脚本向端口发送消息并使用netcat监听
- 【SpringBoot MQ 系列】RabbitMq 消息发送基本使用姿势
- rabbitmq学习9:使用spring-amqp发送消息及同步接收消息
- SpringBoot的RabbitMQ消息队列: 一、消息发送接收第一印象
- rabbitmq学习10:使用spring-amqp发送消息及异步接收消息
- rabbitmq学习9:使用spring-amqp发送消息及同步接收消息
- 异常信息:CLR无法从COM 上下文0x645e18 转换为COM上下文0x645f88,这种状态已持续60秒。拥有目标上下文/单元的线程很有可能执行的是非泵式等待或者在不发送 Windows 消息的情况下处理一个运行时间非常长的操作.这种情况通常会影响到性能,甚至可能导致应用程序不响应或者使用的内存随时间不断累积
- sails.js 使用node-cron模块,监听订单状态并发送消息到企业微信
- RabbitMq研究历程一:如何使用RabbitMq发送,接收消息
- 使用spring-rabbit测试RabbitMQ消息确认(发送确认,接收确认)
- rabbitmq学习10:使用spring-ampq发送消息及异步接受消息
- rabbitmq学习10:使用spring-amqp发送消息及异步接收消息
- (unix domain socket)使用udp发送>=128K的消息会报ENOBUFS的错误
- 使用C#在应用程序间发送消息
- 使用JAVA导入某个MSN帐号的好友列表并发送消息