您的位置:首页 > 编程语言 > Java开发

Spring boot实战项目整合阿里云RocketMQ (非开源版)消息队列实现发送普通消息,延时消息 --附代码

2019-08-17 16:19 1336 查看

一.为什么选择RocketMQ消息队列?

  • 首先RocketMQ是阿里巴巴自研出来的,也已开源。其性能和稳定性从双11就能看出来,借用阿里的一句官方介绍:历年双 11 购物狂欢节零点千万级 TPS、万亿级数据洪峰,创造了全球最大的业务消息并发以及流转纪录(日志类消息除外); 
  • 在始终保证高性能前提下,支持亿级消息堆积,不影响集群的正常服务,在削峰填谷(蓄洪)、微服务解耦的场景下尤为重要;这,就能说明RocketMQ的强大。

二.RocketMQ的特点和优势(可跳过看三的整合代码)

  • 削峰填谷(主要解决诸如秒杀、抢红包、企业开门红等大型活动时皆会带来较高的流量脉冲,或因没做相应的保护而导致系统超负荷甚至崩溃,或因限制太过导致请求大量失败而影响用户体验,海量消息堆积能力强)
    /**
    * MQ配置加载
    * @author laifuwei
    */
    @Configuration
    @ConfigurationProperties(prefix = "rocketmq")
    public class MqConfig {
    
    private String accessKey;
    private String secretKey;
    private String nameSrvAddr;
    private String topic;
    private String groupId;
    private String tag;
    private String timeTopic;
    private String timeGroupId;
    private String timeTag;
    
    public Properties getMqPropertie() {
    Properties properties = new Properties();
    properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);
    properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);
    properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);
    //设置发送超时时间,单位毫秒
    properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "4000");
    return properties;
    }
    
    public String getAccessKey() {
    return accessKey;
    }
    
    public void setAccessKey(String accessKey) {
    this.accessKey = accessKey;
    }
    
    public String getSecretKey() {
    return secretKey;
    }
    
    public void setSecretKey(String secretKey) {
    this.secretKey = secretKey;
    }
    
    public String getNameSrvAddr() {
    return nameSrvAddr;
    }
    
    public void setNameSrvAddr(String nameSrvAddr) {
    this.nameSrvAddr = nameSrvAddr;
    }
    
    public String getTopic() {
    return topic;
    }
    
    public void setTopic(String topic) {
    this.topic = topic;
    }
    
    public String getGroupId() {
    return groupId;
    }
    
    public void setGroupId(String groupId) {
    this.groupId = groupId;
    }
    
    public String getTag() {
    return tag;
    }
    
    public void setTag(String tag) {
    this.tag = tag;
    }
    
    public String getTimeTopic() {
    return timeTopic;
    }
    
    public void setTimeTopic(String timeTopic) {
    this.timeTopic = timeTopic;
    }
    
    public String getTimeGroupId() {
    return timeGroupId;
    }
    
    public void setTimeGroupId(String timeGroupId) {
    this.timeGroupId = timeGroupId;
    }
    
    public String getTimeTag() {
    return timeTag;
    }
    
    public void setTimeTag(String timeTag) {
    this.timeTag = timeTag;
    }
    
    }
    View Code

 

 

  • 给消息生产者注入配置信息,
    ProducerBean
    用于将
    Producer
    集成至Spring Bean中
    /**
    * MQ配置注入生成消息实例
    */
    @Configuration
    public class ProducerClient {
    
    @Autowired
    private MqConfig mqConfig;
    
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ProducerBean buildProducer() {
    //ProducerBean用于将Producer集成至Spring Bean中
    ProducerBean producer = new ProducerBean();
    producer.setProperties(mqConfig.getMqPropertie());
    return producer;
    }
    }

 

  • 为了方便使用,我封装了一个发送消息的类,消息的Message参数和配置,看代码注释,很容易理解
    /**
    * MQ发送消息助手
    * @author laifuwei
    */
    @Component
    public class ProducerUtil {
    
    private Logger logger = LoggerFactory.getLogger(ProducerUtil.class);
    
    @Autowired
    private MqConfig config;
    
    @Autowired
    private ProducerBean producer;
    
    /**
    * 同步发送消息
    * @param msgTag 标签,可用于消息小分类标注
    * @param messageBody 消息body内容,生产者自定义内容
    * @param msgKey 消息key值,建议设置全局唯一,可不传,不影响消息投递
    * @return success:SendResult or error:null
    */
    public SendResult sendMsg(String msgTag,byte[] messageBody,String msgKey) {
    Message msg = new Message(config.getTopic(),msgTag,msgKey,messageBody);
    return this.send(msg,Boolean.FALSE);
    }
    /**
    * 同步发送定时/延时消息
    * @param msgTag 标签,可用于消息小分类标注,对消息进行再归类
    * @param messageBody 消息body内容,生产者自定义内容,二进制形式的数据
    * @param msgKey 消息key值,建议设置全局唯一值,可不设置,不影响消息收发
    * @param delayTime 服务端发送消息时间,立即发送输入0或比更早的时间
    * @return success:SendResult or error:null
    */
    public SendResult sendTimeMsg(String msgTag,byte[] messageBody,String msgKey,long delayTime) {
    Message msg = new Message(config.getTimeTopic(),msgTag,msgKey,messageBody);
    msg.setStartDeliverTime(delayTime);
    return this.send(msg,Boolean.FALSE);
    }
    /**
    * 发送单向消息
    */
    public void sendOneWayMsg(String msgTag,byte[] messageBody,String msgKey) {
    Message msg = new Message(config.getTopic(),msgTag,msgKey,messageBody);
    this.send(msg,Boolean.TRUE);
    }
    
    /**
    * 普通消息发送发放
    * @param msg 消息
    * @param isOneWay 是否单向发送
    */
    private SendResult send(Message msg,Boolean isOneWay) {
    try {
    if(isOneWay) {
    //由于在 oneway 方式发送消息时没有请求应答处理,一旦出现消息发送失败,则会因为没有重试而导致数据丢失。
    //若数据不可丢,建议选用同步或异步发送方式。
    producer.sendOneway(msg);
    success(msg, "单向消息MsgId不返回");
    return null;
    }else {
    //可靠同步发送
    SendResult sendResult = producer.send(msg);
    //获取发送结果,不抛异常即发送成功
    if (sendResult != null) {
    success(msg, sendResult.getMessageId());
    return sendResult;
    }else {
    error(msg,null);
    return null;
    }
    }
    } catch (Exception e) {
    error(msg,e);
    return null;
    }
    }
    
    //对于使用异步接口,可设置单独的回调处理线程池,拥有更灵活的配置和监控能力。
    //根据项目需要,服务器配置合理设置线程数,线程太多有OOM 风险,
    private ExecutorService threads = Executors.newFixedThreadPool(3);
    //仅建议执行轻量级的Callback任务,避免阻塞公共线程池 引起其它链路超时。
    
    /**
    * 异步发送普通消息
    * @param msgTag
    * @param messageBody
    * @param msgKey
    */
    public void sendAsyncMsg(String msgTag,byte[] messageBody,String msgKey) {
    producer.setCallbackExecutor(threads);
    
    Message msg = new Message(config.getTopic(),msgTag,msgKey,messageBody);
    try {
    producer.sendAsync(msg, new SendCallback() {
    @Override
    public void onSuccess(final SendResult sendResult) {
    assert sendResult != null;
    success(msg, sendResult.getMessageId());
    }
    @Override
    public void onException(final OnExceptionContext context) {
    //出现异常意味着发送失败,为了避免消息丢失,建议缓存该消息然后进行重试。
    error(msg,context.getException());
    }
    });
    } catch (ONSClientException e) {
    error(msg,e);
    }
    }
    
    //--------------日志打印----------
    private void error(Message msg,Exception e) {
    logger.error("发送MQ消息失败-- Topic:{}, Key:{}, tag:{}, body:{}"
    ,msg.getTopic(),msg.getKey(),msg.getTag(),new String(msg.getBody()));
    logger.error("errorMsg --- {}",e.getMessage());
    }
    private void success(Message msg,String messageId) {
    logger.info("发送MQ消息成功 -- Topic:{} ,msgId:{} , Key:{}, tag:{}, body:{}"
    ,msg.getTopic(),messageId,msg.getKey(),msg.getTag(),new String(msg.getBody()));
    }
    
    }

 

  • 前面已经配置好了将Producer集成至Spring Bean中,直接注入Producer,在业务系统需要的地方调用来发送消息即可
    //普通消息的Producer 已经注册到了spring容器中,后面需要使用时可以 直接注入到其它类中
    @Autowired
    private ProducerBean producer;
    
    /**
    * 演示方法,可在自己的业务系统方法中进行发送消息
    */
    public String mqTest() {
    /*  使用前面封装的方法,传入对应的参数即可发送消息
    *  msgTag 标签,可用于消息小分类标注
    *  messageBody 消息body内容,生产者自定义内容,任何二进制数据,生产者和消费者协定数据的序列化和反序列化
    *  msgKey 消息key值,建议设置全局唯一,可不传,不影响消息投递
    */
    //body内容自定义
    JSONObject body = new JSONObject();
    body.put("userId", "this is userId");
    body.put("notice", "同步消息");
    //同步发送消息
    producer.sendMsg("userMessage", body.toJSONString().getBytes(), "messageId");
    //单向消息
    producer.sendOneWayMsg("userMessage", "单向消息".getBytes(), "messageId");
    //异步消息
    producer.sendAsyncMsg("userMessage", "异步消息".getBytes(), "messageId");
    //定时/延时消息,当前时间的30秒后推送。时间自己定义
    producer.sendTimeMsg("userMessage", "延时消息".getBytes(), "messageId", System.currentTimeMillis()+30000);
    //顺序消息(全局顺序 / 分区顺序)、分布式事务消息 目前没用到,可看官网说明操作
    return "ok";
    }

 

  • 接下来是消息消费者的配置和接收消息(一般在下游系统或者相关联的系统),接收消息的项目照旧,添加依赖jar包 ons-client v1.8.0.Final 、配置mq参数链接(mq的配置文件参数要和生产者项目配置的一样)、添加MqConfig类(上面有写)
  • 注入配置、订阅消息、添加消息处理的方法
    @Configuration
    public class ConsumerClient {
    
    @Autowired
    private MqConfig mqConfig;
    
    //普通消息监听器,Consumer注册消息监听器来订阅消息.
    @Autowired
    private MqMessageListener messageListener;
    
    //定时消息监听器,Consumer注册消息监听器来订阅消息.
    @Autowired
    private MqTimeMessageListener timeMessageListener;
    
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ConsumerBean buildConsumer() {
    ConsumerBean consumerBean = new ConsumerBean();
    //配置文件
    Properties properties = mqConfig.getMqPropertie();
    properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getGroupId());
    //将消费者线程数固定为20个 20为默认值
    properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");
    consumerBean.setProperties(properties);
    //订阅消息
    Map<Subscription, MessageListener> subscriptionTable = new HashMap<Subscription, MessageListener>();
    //订阅普通消息
    Subscription subscription = new Subscription();
    subscription.setTopic(mqConfig.getTopic());
    subscription.setExpression(mqConfig.getTag());
    subscriptionTable.put(subscription, messageListener);
    //订阅定时/延时消息
    Subscription subscriptionTime = new Subscription();
    subscriptionTime.setTopic(mqConfig.getTimeTopic());
    subscriptionTime.setExpression(mqConfig.getTimeTag());
    subscriptionTable.put(subscriptionTime, timeMessageListener);
    
    consumerBean.setSubscriptionTable(subscriptionTable);
    return consumerBean;
    }
    
    }

     

  • 对消息监听类进行实现,处理接收到的消息
    /**
    * 定时/延时MQ消息监听消费
    * @author laifuwei
    */
    @Component
    public class MqTimeMessageListener extends AutowiredService implements MessageListener {
    
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    
    //实现MessageListtener监听器的消费方法
    @Override
    public Action consume(Message message, ConsumeContext context) {

         logger.info("接收到MQ消息 -- Topic:{}, tag:{},msgId:{} , Key:{}, body:{}",
                     message.getTopic(),message.getTag(),message.getMsgID(),message.getKey(),new String(message.getBody()));

      try {

    String msgTag = message.getTag();//消息类型
    String msgKey = message.getKey();//业务唯一id
    switch (msgTag) {
    //----通过生产者传的tag标签进行消息分类和过滤处理
    case "userMessage":
    //通过唯一key的,查询需要处理的数据,
    GroupChatMessage chatMessage = chatMessageService.getById(msgKey);
    //由于RocketMQ能重复推送消息,处理消息的时候做好数据的幂等,防止重复处理
    if( chatMessage.isRead() ) {
    break;
    }
    //验证通过,处理业务
    //do something
    break;
    }
    //消费成功,继续消费下一条消息
    return Action.CommitMessage;
    } catch (Exception e) {
    logger.error("消费MQ消息失败! msgId:" + message.getMsgID()+"----ExceptionMsg:"+e.getMessage());
    //消费失败,告知服务器稍后再投递这条消息,继续消费其他消息
    return Action.ReconsumeLater;
    }
    }
    
    }

     

 四.最后运行消费者项目和生产者项目,调用生产者项目发送消息验证效果:

  • 生产者发送消息结果日志:消息发送正常
    2019-08-17 15:11:06.837 INFO 9996 --- [nio-8080-exec-9] com.dyj.shop.mq.ProducerUtil : 发送MQ消息成功 -- Topic:common ,msgId:C0A86532270C2A139A5555A7E5DD0000 , Key:messageId, tag:userMessage, body:{"userId":"this is userId","notice":"同步消息"}
    2019-08-17 15:11:06.841 INFO 9996 --- [nio-8080-exec-9] com.dyj.shop.mq.ProducerUtil : 发送MQ消息成功 -- Topic:common ,msgId:单向消息MsgId不返回 , Key:messageId, tag:userMessage, body:单向消息
    2019-08-17 15:11:06.901 INFO 9996 --- [pool-6-thread-1] com.dyj.shop.mq.ProducerUtil : 发送MQ消息成功 -- Topic:common ,msgId:C0A86532270C2A139A5555A7E6630004 , Key:messageId, tag:userMessage, body:异步消息
    2019-08-17 15:11:07.060 INFO 9996 --- [nio-8080-exec-9] com.dyj.shop.mq.ProducerUtil : 发送MQ消息成功 -- Topic:time-lapse ,msgId:C0A86532270C2A139A5555A7E69F0006 , Key:messageId, tag:userMessage, body:定时/延时消息
  • 消费者接收到消息,可以看到普通消息的发送时间和接收到消息的时间,就相差几毫秒,值得注意的是:延时消息按照生产者定义的30秒后消费者才收到。这就是延时消息的好玩之处
    2019-08-17 15:11:06.881 INFO 10942 --- [MessageThread_7] com.dyj.timer.mq.MqMessageListener : 接收到MQ消息. Topic :common, tag :userMessage msgId : C0A86532270C2A139A5555A7E5DD0000, Key :messageId, body:{"userId":"this is userId","notice":"同步消息"}
    2019-08-17 15:11:06.934 INFO 10942 --- [MessageThread_8] com.dyj.timer.mq.MqMessageListener : 接收到MQ消息. Topic :common, tag :userMessage msgId : C0A86532270C2A139A5555A7E6550002, Key :messageId, body:单向消息
    2019-08-17 15:11:06.947 INFO 10942 --- [MessageThread_9] com.dyj.timer.mq.MqMessageListener : 接收到MQ消息. Topic :common, tag :userMessage msgId : C0A86532270C2A139A5555A7E6630004, Key :messageId, body:异步消息
    2019-08-17 15:11:36.996 INFO 10942 --- [essageThread_10] com.dyj.timer.mq.MqTimeMessageListener : 接收到MQ消息. Topic :time-lapse, tag :userMessage msgId : cd900e16f7cba68369ec498ae2f9dd6c, Key :messageId, body:定时/延时消

写在最后:有不妥或有兴趣的可以下方留言,多谢指教(#^-^#)

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: