消息驱动的微服务 - Spring Cloud Alibaba RocketMQ
引入MQ后的架构演进
MQ的选择
消息队列对比参照表:
RocketMQ vs. ActiveMQ vs. Kafka:
参考至:
CentOS7上搭建RocketMQ
环境要求:
- CentOS 7.2
- 64位JDK1.8+
- 4G+的可用磁盘空间
1、下载RocketMQ的二进制包,我这里使用的是4.5.1版本,下载地址如下:
http://rocketmq.apache.org/release_notes/release-notes-4.5.1/
使用wget命令下载:
[root@study-01 ~]# cd /usr/local/src [root@study-01 /usr/local/src]# wget http://mirror.bit.edu.cn/apache/rocketmq/4.5.1/rocketmq-all-4.5.1-bin-release.zip
2、解压下载好的压缩包,并移动到合适的目录下:
[root@study-01 /usr/local/src]# unzip rocketmq-all-4.5.1-bin-release.zip [root@study-01 /usr/local/src]# mv rocketmq-all-4.5.1-bin-release /usr/local/rocketmq-4.5.1
注:若没有安装unzip命令则使用如下命令安装:
yum install -y unzip
3、进入rocketmq的根目录并查看是否包含如下目录及文件:
[root@study-01 /usr/local/src]# cd /usr/local/rocketmq-4.5.1 [root@study-01 /usr/local/rocketmq-4.5.1]# ls benchmark bin conf lib LICENSE NOTICE README.md
4、没问题后,使用如下命令启动Name Server:
[root@study-01 /usr/local/rocketmq-4.5.1]# nohup sh bin/mqnamesrv & [1] 2448 [root@study-01 /usr/local/rocketmq-4.5.1]#
5、查看默认的9876端口是否被监听,以验证Name Server是否启动成功:
[root@study-01 /usr/local/rocketmq-4.5.1]# netstat -lntp |grep java tcp6 0 0 :::9876 :::* LISTEN 2454/java [root@study-01 /usr/local/rocketmq-4.5.1]#
6、启动Broker:
[root@study-01 /usr/local/rocketmq-4.5.1]# nohup sh bin/mqbroker -n localhost:9876 & [2] 2485 [root@study-01 /usr/local/rocketmq-4.5.1]#
7、验证Broker是否启动成功,如果启动成功,能看到类似如下的日志::
[root@study-01 /usr/local/rocketmq-4.5.1]# cat ~/logs/rocketmqlogs/broker.log |grep "boot success" 2019-08-04 01:27:38 INFO main - The broker[study-01, 192.168.190.129:10911] boot success. serializeType=JSON and name server is localhost:9876 [root@study-01 /usr/local/rocketmq-4.5.1]#
若想停止Name Server和Broker,则依次执行以下两条命令即可:
[root@study-01 /usr/local/rocketmq-4.5.1]# sh bin/mqshutdown broker The mqbroker(2492) is running... Send shutdown request to mqbroker(2492) OK # 输出该信息说明停止成功 [root@study-01 /usr/local/rocketmq-4.5.1]# sh bin/mqshutdown namesrv The mqnamesrv(2454) is running... Send shutdown request to mqnamesrv(2454) OK # 输出该信息说明停止成功 [2]+ 退出 143 nohup sh bin/mqbroker -n localhost:9876 [root@study-01 /usr/local/rocketmq-4.5.1]#
验证RocketMQ功能是否正常
1、验证生产消息正常,执行如下命令:
[root@study-01 /usr/local/rocketmq-4.5.1]# export NAMESRV_ADDR=localhost:9876 [root@study-01 /usr/local/rocketmq-4.5.1]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
正常的情况下,会看到一堆的类似于如下的输出,这是生产消息后成功的result:
SendResult [sendStatus=SEND_OK, msgId=C0A8BE810A690D7163610FCC253B03E7, offsetMsgId=C0A8BE8100002A9F000000000002BDFE, messageQueue=MessageQueue [topic=TopicTest, brokerName=study-01, queueId=3], queueOffset=249]
2、验证消费消息正常,执行如下命令:
[root@study-01 /usr/local/rocketmq-4.5.1]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
正常的情况下,会看到一堆的类似于如下的输出,这是消费的消息内容:
ConsumeMessageThread_6 Receive New Messages: [MessageExt [queueId=3, storeSize=180, queueOffset=242, sysFlag=0, bornTimestamp=1564853837073, bornHost=/192.168.190.129:34708, storeTimestamp=1564853837074, storeHost=/192.168.190.129:10911, msgId=C0A8BE8100002A9F000000000002AA4E, commitLogOffset=174670, bodyCRC=911284903, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1564854006859, UNIQ_KEY=C0A8BE810A690D7163610FCC251103CB, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 55, 49], transactionId='null'}]]
搭建RocketMQ控制台
RocketMQ官方提供了一个基于Spring Boot开发的可视化控制台,可以方便我们查看RocketMQ的运行情况以及提升运维效率。所以本小节将介绍一下如何搭建搭建RocketMQ的控制台,由于我们使用的RocketMQ版本是4.5.1,所以需要对控制台的源码进行一些改动以适配RocketMQ的4.5.1版本。
1、首先需要下载源码,有两种方式,一是使用git克隆代码仓库,二是直接下载rocketmq-externals的zip包,我这里使用git方式,执行如下命令:
git clone https://github.com/apache/rocketmq-externals.git
2、修改控制台代码,使用IDE打开
rocketmq-console项目,如下图所示:
2.1、修改项目中的
application.properties配置文件,我这里主要是修改了监听端口和Name Server的连接地址,至于其他配置项有需要的话可按照说明自行修改:
# console的监听端口,默认是8080 server.port=8011 # Name Server的连接地址;非必须,可以在启动了console后,在控制台导航栏 - 运维 - NameSvrAddrList一栏设置 rocketmq.config.namesrvAddr=192.168.190.129:9876
2.2、修改依赖,由于console项目默认使用的rocketmq版本是4.4.0,与我们这里使用的是4.5.1不完全兼容,所以需要修改一下依赖版本,找到这一行:
<rocketmq.version>4.4.0</rocketmq.version>
修改为:
<rocketmq.version>4.5.1</rocketmq.version>
2.3、修改代码,由于修改了rocketmq的版本,会导致
org.apache.rocketmq.console.service.impl.MessageServiceImpl#queryMessageByTopic方法编译报错,所以需要改动一下此处代码 ,将:
@Override public List<MessageView> queryMessageByTopic(String topic, final long begin, final long end) { DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, null); ...
修改为:
@Override public List<MessageView> queryMessageByTopic(String topic, final long begin, final long end) { RPCHook rpcHook = null; DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook); ...
3、打包构建并启动,打开idea的terminal,执行如下命令:
# 在rocketmq-console目录下执行 mvn clean package -DskipTests # 进入jar包存放目录 cd target # 启动rocketmq console java -jar rocketmq-console-ng-1.0.1.jar
4、使用浏览器访问控制台,我这里由于修改了端口,所以访问地址是:
http://localhost:8011,正常的情况下能看到如下界面:
不习惯英文的话可以在右上角切换语言:
由于控制台是可视化界面并且支持中文,这里就不过多介绍了,可以参考官方的控制台使用说明文档:
RocketMQ术语与概念
我这里将基本的术语与概念简单总结成了思维导图:
官方文档:
Spring消息编程模型 - 编写生产者
在以上小节搭建完RocketMQ之后,我们来使用Spring的消息编程模型,编写一个简单的示例。首先需要在项目中添加相关依赖如下:
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.3</version> </dependency>
在配置文件中添加rocketmq相关的配置如下:
rocketmq: name-server: 192.168.190.129:9876 producer: # 小坑:必须指定group group: test-group
编写生产者的代码,这里以Controller做示例,具体代码如下:
package com.zj.node.contentcenter.controller.content; import lombok.Data; import lombok.RequiredArgsConstructor; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.util.Date; /** * 生产者 * * @author 01 * @date 2019-08-03 **/ @RestController @RequiredArgsConstructor public class TestProducerController { /** * 用于发送消息到 RocketMQ 的api */ private final RocketMQTemplate rocketMQTemplate; @GetMapping("/test-rocketmq/sendMsg") public String testSendMsg() { String topic = "test-topic"; // 发送消息 rocketMQTemplate.convertAndSend(topic, Message.getInstance()); return "send message success"; } } @Data class Message { private Integer id; private String name; private String status; private Date createTime; static Message getInstance() { Message message = new Message(); message.id = 1; message.name = "×××"; message.status = "default"; message.createTime = new Date(); return message; } }
编写完成后,启动项目,访问该接口:
消息发送成功后,可以到RocketMQ的控制台中进行查看:
消息体可以在消息详情中查看,如下:
从生产者的代码来看,可以说是十分的简单了,只需要使用一个RocketMQTemplate就可以实现将对象转换成消息体并发送消息。实际上除了RocketMQ外,其他的MQ也有对应的Template,如下:
- RocketMQ:RocketMQTemplate
- ActiveMQ/Artemis:JmsTemplate
- RabbitMQ:AmqpTemplate
- Kafka:KafkaTemplate
Spring消息编程模型 - 编写消费者
在消费者项目中,也需要添加rocketmq的依赖:
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.3</version> </dependency>
同样需要配置Name Server的连接地址:
rocketmq: name-server: 192.168.190.129:9876
编写消费者的代码,具体代码如下:
package com.zj.node.usercenter.rocketmq; import com.alibaba.fastjson.JSON; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; import java.util.Date; /** * 消费者监听器 * * @author 01 * @date 2019-08-03 **/ @Slf4j @Component // topic需要和生产者的topic一致,consumerGroup属性是必须指定的,内容可以随意 @RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer-group") public class TestConsumerListener implements RocketMQListener<Message> { /** * 监听到消息的时候就会调用该方法 * * @param message 消息体 */ @Override public void onMessage(Message message) { log.info("从test-topic中监听到消息"); log.info(JSON.toJSONString(message)); } } /** * 消息体结构需要一致 */ @Data class Message { private Integer id; private String name; private String status; private Date createTime; }
编写完成后启动项目,由于之前我们已经往队列里发送了消息,所以此时消费者项目一启动,就可以监听到消息并消费,控制台就会输出如下日志:
- Spring Cloud架构教程 (七)消息驱动的微服务(核心概念)【Dalston版】
- Spring Cloud架构教程 (八)消息驱动的微服务(消费组)【Dalston版】
- Spring Cloud架构教程 (八)消息驱动的微服务(消费组)【Dalston版】
- SpringCloud Stream-----1、消息驱动的微服务概念
- Spring Cloud构建微服务架构:消息驱动的微服务(消费组)【Dalston版】
- Spring Cloud架构教程 (七)消息驱动的微服务(核心概念)【Dalston版】
- Spring Cloud构建微服务架构:消息驱动的微服务(消费分区)【Dalston版】
- Spring Cloud架构教程 (六)消息驱动的微服务【Dalston版】
- Spring Cloud架构教程 (六)消息驱动的微服务【Dalston版】
- 使用 Spring Cloud Stream 构建消息驱动微服务
- SpringCloud微服务实战之消息驱动Stream
- Spring Cloud架构教程 (七)消息驱动的微服务(核心概念)【Dalston版】
- SpringCloud学习笔记(10)——消息驱动的微服务:Spring Cloud Stream
- SpringCloud微服务知识整理十:消息驱动的微服务:Spring Cloud Stream
- Spring Cloud架构教程 (八)消息驱动的微服务(消费组)【Dalston版】
- 使用 Spring Cloud Stream 构建消息驱动微服务
- Spring Cloud构建微服务架构:消息驱动的微服务(消费分区)【Dalston版】
- Spring Cloud构建微服务架构:消息驱动的微服务(核心概念)【Dalston版】
- SpringCloudStream 构建消息驱动的微服务框架
- SpringCloudStream 构建消息驱动的微服务框架 集成kafka_http://blog.spring-cloud.io/blog/sc-stream.html