您的位置:首页 > 编程语言 > Java开发

SpringBoot:RabbitMQ-发送取出消息-监听消息-使用Jackson消息转换

2020-06-28 05:08 771 查看
发送消息
  • 以Message对象发送消息

    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.stereotype.Service;
    
    @Service
    public class MyRabbitMQService {
    
    final
    RabbitTemplate rabbitTemplate;
    
    public MyRabbitMQService(RabbitTemplate rabbitTemplate) {
    this.rabbitTemplate = rabbitTemplate;
    }
    
    public void sendDirectQueue(String sayMessage) {
    byte[] bytes = sayMessage.getBytes();
    
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setContentEncoding("UTF-8");
    messageProperties.setHeader("Message","这是一个测试消息");
    
    Message message = new Message(bytes, messageProperties);
    
    rabbitTemplate.send("exchange.direct","exchange.direct.queue",message);
    }
    
    }

    测试:

    @Autowired
    MyRabbitMQService myRabbitMQService;
    
    @Test
    void rabbitService() {
    myRabbitMQService.sendDirectQueue("Say Hello Direct queue");
    }

    结果:

  • 以自定义对象发送消息

    service

    public void sendFanout(Object object) {
    rabbitTemplate.convertAndSend("exchange.fanout","",object);
    }

    测试

    注意: 默认使用的是 SimpleMessageConverter 用的 JDK 的序列化;需要实现Serializable接口

    如:

    public class User implements Serializable {}
@Test
void rabbitService() {
User user = new User();
user.setId(1);
user.setUsername("dead");
user.setPassword("Dead_Password_814");

myRabbitMQService.sendFanout(user);
}

结果

取出消息

  • receive(String queueName)

    从一个消息队列中取出消息对象(Message)
  • receive(String queueName, long timeoutMillis)

      从一个消息队列中取出消息 并 指定超时时间;单位 秒
  • receiveAndConvert(String queueName)

      从一个消息队列中取出消息 并 转换 为对象(Object)
  • receiveAndConvert(String queueName, long timeoutMillis)

      从一个消息队列中取出消息 并 转换 为对象(Object)并 指定超时时间;单位 秒

    如果传入的long小于0时,使用默认的10000(单位 秒)

    源码

    private static final long DEFAULT_CONSUME_TIMEOUT = 10000;
    private Delivery consumeDelivery(Channel channel, String queueName, long timeoutMillis) throws IOException {
    Delivery delivery = null;
    RuntimeException exception = null;
    CompletableFuture<Delivery> future = new CompletableFuture<>();
    ShutdownListener shutdownListener = c -> {
    if (!RabbitUtils.isNormalChannelClose(c)) {
    future.completeExceptionally(c);
    }
    };
    channel.addShutdownListener(shutdownListener);
    ClosingRecoveryListener.addRecoveryListenerIfNecessary(channel);
    DefaultConsumer consumer = null;
    try {
    consumer = createConsumer(queueName, channel, future,
    timeoutMillis < 0 ? DEFAULT_CONSUME_TIMEOUT : timeoutMillis);
    if (timeoutMillis < 0) {
    delivery = future.get();
    }
    else {
    delivery = future.get(timeoutMillis, TimeUnit.MILLISECONDS);
    }
    }
    catch (ExecutionException e) {
    Throwable cause = e.getCause();
    this.logger.error("Consumer failed to receive message: " + consumer, cause);
    exception = RabbitExceptionTranslator.convertRabbitAccessException(cause);
    throw exception;
    }
    catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    }
    catch (TimeoutException e) {
    RabbitUtils.setPhysicalCloseRequired(channel, true);
    }
    finally {
    if (consumer != null && !(exception instanceof ConsumerCancelledException) && channel.isOpen()) {
    cancelConsumerQuietly(channel, consumer);
    }
    try {
    channel.removeShutdownListener(shutdownListener);
    }
    catch (Exception e) {
    // NOSONAR - channel might have closed.
    }
    }
    return delivery;
    }

    示例

    @Test
    public void receiveMessage() {
    Object o = rabbitTemplate.receiveAndConvert("topic.queue");
    System.out.println(o);
    }

    打印结果

    User(username=dead, password=Dead_Password_814, id=1)

    监听消息

    • 启动类加上 @EnableRabbit 注解 :开启基于注解的RabbitMQ

    • Service

      @RabbitListener
    • queues:可写多个消息队列
    package com.live.service;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.stereotype.Service;
    
    @Service
    public class MyRabbitMQService {
    
    final
    RabbitTemplate rabbitTemplate;
    
    public MyRabbitMQService(RabbitTemplate rabbitTemplate) {
    this.rabbitTemplate = rabbitTemplate;
    }
    
    @RabbitListener(queues = "direct.queue")
    public void listenerQueue(Object o) {
    System.out.println("收到消息:" + o);
    }
    
    }
  • 启动web容器

    启动后,如果消息队列有消息:

    收到消息:(
    Body:'{"username":"dead","password":"Dead_Password_814","id":1}'
    MessageProperties [
    headers={__TypeId__=com.live.model.User},
    contentType=application/json,
    contentEncoding=UTF-8,
    contentLength=0,
    receivedDeliveryMode=PERSISTENT,
    priority=0,
    redelivered=true,
    receivedExchange=exchange.fanout,
    receivedRoutingKey=,
    deliveryTag=1,
    consumerTag=amq.ctag-u6CQedbhTxrETDGPLyfMag,
    consumerQueue=direct.queue
    ]
    )
  • 使用Jackson转换消息

    在发送自定义对象时:默认使用JDK序列化器

    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class CustomRabbitMQMessageConvert {
    
    @Bean
    public MessageConverter customMessageConvert() {
    return new Jackson2JsonMessageConverter();
    }
    
    }

    测试

    @Test
    void rabbitService() {
    User user = new User();
    user.setId(1);
    user.setUsername("dead");
    user.setPassword("Dead_Password_814");
    
    myRabbitMQService.sendFanout(user);
    }

    结果

    内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
    标签: 
    相关文章推荐