RocketMQ最佳实践(三)开发spring-boot-starter-rocketmq实现与spring boot项目的整合
2017-04-28 16:52
1001 查看
不要以为这只是spring boot与RocketMQ的简单整合,本篇文章还为各位看官呈现以下知识点的最佳实践:
自定义一个spring boot 的starter
使用spring的事件传播机制实现bean与bean之间基于事件驱动的通信
自定义注解、组合注解
word天,顿时激发我的创作基情啊有木有
上面这张截图来自spring boot官方文档,为啥官方提供了JMS、AMQP和Kafka却偏偏少了RocketMQ呢,我认为是因为目前RocketMQ在国外并不普及,而且才捐献给apache不久,需要一段时间,那么如此看来,写一个spring-boot-starter-rocketmq还是比较有意义的。
but,本人水平毕竟有限,写的东西自然没法和spring相比,这个版本的starter参考了JMS的starter来封装,虽然不够尽善尽美,但还是极具实用价值的
消费消息测试类consumerDemo
再测试一下消费者,在RocketMQ控制台(RocketMQ控制台的介绍放到下一篇吧
)发送一条消息
查看控制台打印的消费日志
恭喜你,成功了。
补充说明:
本来想自定义一个叫RocketmqListener的注解来实现消息的监听的,花了大量时间去阅读和研究了spring关于EventListener注解和JmsListener注解的实现,发现目前我并不能很好的理解和掌控其设计思路,想以瓢画葫最终也没能实现,迫于五一节来临,只能使用EventListener注解代替,不过发现其实也不错。
同时,也希望各位猿友能给出指导性意见和建议:如何实现RocketmqListener注解以及是否有意义?
自定义一个spring boot 的starter
使用spring的事件传播机制实现bean与bean之间基于事件驱动的通信
自定义注解、组合注解
先来撩点故事背景^_^
最近在使用spring boot/spring cloud搭建做微服务架构,发现spring boot官方提供的starter中居然没有集成RocketMQword天,顿时激发我的创作基情啊有木有
上面这张截图来自spring boot官方文档,为啥官方提供了JMS、AMQP和Kafka却偏偏少了RocketMQ呢,我认为是因为目前RocketMQ在国外并不普及,而且才捐献给apache不久,需要一段时间,那么如此看来,写一个spring-boot-starter-rocketmq还是比较有意义的。
but,本人水平毕竟有限,写的东西自然没法和spring相比,这个版本的starter参考了JMS的starter来封装,虽然不够尽善尽美,但还是极具实用价值的
编写spring-boot-starter-rocketmq
创建一个Maven项目名字就叫spring-boot-starter-rocketmq,其pom.xml文件内容如下:<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <groupId>com.bqjr</groupId> <version>0.0.1-SNAPSHOT</version> <name>spring-boot-starter-rocketmq</name> <description>Starter for using RocketMQ</description> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.3.RELEASE</version> <relativePath/> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> <rocketmq.version>4.0.0-incubating</rocketmq.version> </properties> <modelVersion>4.0.0</modelVersion> <artifactId>spring-boot-starter-rocketmq</artifactId> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <!-- RocketMq客户端相关依赖 --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>${rocketmq.version}</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-common</artifactId> <version>${rocketmq.version}</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.16.10</version><!--$NO-MVN-MAN-VER$--> </dependency> </dependencies> </project>编写配置类RocketmqProperties,这个类的属性对应application.properties文件中的配置项,目前只提供核心的一些配置支持,其他性能优化方面的配置参数可自行扩展
/** * @author jiangjb */ @Data @ConfigurationProperties(PREFIX) public class RocketmqProperties { public static final String PREFIX = "spring.extend.rocketmq"; private String namesrvAddr; private String instanceName; private String clientIP; private ProducerConfig producer; private ConsumerConfig consumer; }编写配置解析类RocketmqAutoConfiguration,这个类主要初始化了三个Bean:defaultProducer用来发送普通消息、transactionProducer用来发送事务消息以及pushConsumer用来接收订阅的所有topic下的消息,并派发给不同的tag的消费者。
/** * @author jiangjb */ @Configuration @EnableConfigurationProperties(RocketmqProperties.class) @ConditionalOnProperty(prefix = PREFIX, value = "namesrvAddr") public class RocketmqAutoConfiguration { @Autowired private RocketmqProperties properties; @Value("${spring.application.name}") private String producerGroupName; @Value("${spring.application.name}") private String consumerGroupName; @Autowired private ApplicationEventPublisher publisher; /** * 初始化向rocketmq发送普通消息的生产者 */ @Bean @ConditionalOnProperty(prefix = PREFIX, value = "producer.instanceName") public DefaultMQProducer defaultProducer() throws MQClientException{ /** * 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例<br> * 注意:ProducerGroupName需要由应用来保证唯一<br> * ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键, * 因为服务器会回查这个Group下的任意一个Producer */ DefaultMQProducer producer = new DefaultMQProducer(producerGroupName); producer.setNamesrvAddr(properties.getNamesrvAddr()); producer.setInstanceName(properties.getProducer().getInstanceName()); producer.setVipChannelEnabled(false); /** * Producer对象在使用之前必须要调用start初始化,初始化一次即可<br> * 注意:切记不可以在每次发送消息时,都调用start方法 */ producer.start(); System.out.println("RocketMq defaultProducer Started."); return producer; } /** * 初始化向rocketmq发送事务消息的生产者 */ @Bean @ConditionalOnProperty(prefix = PREFIX, value = "producer.tranInstanceName") public TransactionMQProducer transactionProducer() throws MQClientException{ /** * 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例<br> * 注意:ProducerGroupName需要由应用来保证唯一<br> * ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键, * 因为服务器会回查这个Group下的任意一个Producer */ TransactionMQProducer producer = new TransactionMQProducer("TransactionProducerGroupName"); producer.setNamesrvAddr(properties.getNamesrvAddr()); producer.setInstanceName(properties.getProducer().getTranInstanceName()); // 事务回查最小并发数 producer.setCheckThreadPoolMinSize(2); // 事务回查最大并发数 producer.setCheckThreadPoolMaxSize(2); // 队列数 producer.setCheckRequestHoldMax(2000); //TODO 由于社区版本的服务器阉割调了消息回查的功能,所以这个地方没有意义 //TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl(); //producer.setTransactionCheckListener(transactionCheckListener); /** * Producer对象在使用之前必须要调用start初始化,初始化一次即可<br> * 注意:切记不可以在每次发送消息时,都调用start方法 */ producer.start(); System.out.println("RocketMq TransactionMQProducer Started."); return producer; } /** * 初始化rocketmq消息监听方式的消费者 */ @Bean @ConditionalOnProperty(prefix = PREFIX, value = "consumer.instanceName") public DefaultMQPushConsumer pushConsumer() throws MQClientException{ /** * 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例<br> * 注意:ConsumerGroupName需要由应用来保证唯一 */ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroupName); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setNamesrvAddr(properties.getNamesrvAddr()); consumer.setInstanceName(properties.getConsumer().getInstanceName()); consumer.setConsumeMessageBatchMaxSize(1);//设置批量消费,以提升消费吞吐量,默认是1 /** * 订阅指定topic下tags */ List<String> subscribeList = properties.getConsumer().getSubscribe(); for (String sunscribe : subscribeList) { consumer.subscribe(sunscribe.split(":")[0], sunscribe.split(":")[1]); } consumer.registerMessageListener((List<MessageExt> msgs, ConsumeConcurrentlyContext context) -> { MessageExt msg = msgs.get(0); try { //默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息 System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs.size()); //发布消息到达的事件,以便分发到每个tag的监听方法 this.publisher.publishEvent(new RocketmqEvent(msg,consumer)); System.out.println("消息到达事件已经发布成功!"); } catch (Exception e) { e.printStackTrace(); if(msg.getReconsumeTimes()<=3){//重复消费3次 //TODO 进行日志记录 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } else { //TODO 消息消费失败,进行日志记录 } } //如果没有return success,consumer会重复消费此信息,直到success。 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(5000);//延迟5秒再启动,主要是等待spring事件监听相关程序初始化完成,否则,回出现对RocketMQ的消息进行消费后立即发布消息到达的事件,然而此事件的监听程序还未初始化,从而造成消息的丢失 /** * Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br> */ try { consumer.start(); } catch (Exception e) { System.out.println("RocketMq pushConsumer Start failure!!!."); e.printStackTrace(); } System.out.println("RocketMq pushConsumer Started."); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); return consumer; } }编写基于spring事件传播机制的事件类RocketmqEvent,用来定义上面的consumer接收到消息后的发布的事件。
/** * * @author jiangjb * */ @Data @EqualsAndHashCode(callSuper=false) public class RocketmqEvent extends ApplicationEvent{ private static final long serialVersionUID = -4468405250074063206L; private DefaultMQPushConsumer consumer; private MessageExt messageExt; private String topic; private String tag; private byte[] body; public RocketmqEvent(MessageExt msg,DefaultMQPushConsumer consumer) throws Exception { super(msg); this.topic = msg.getTopic(); this.tag = msg.getTags(); this.body = msg.getBody(); this.consumer = consumer; this.messageExt = msg; } public String getMsg() { try { return new String(this.body,"utf-8"); } catch (UnsupportedEncodingException e) { return null; } } public String getMsg(String code) { try { return new String(this.body,code); } catch (UnsupportedEncodingException e) { return null; } } }然后运行maven的编译、打包
编写测试项目rocketmq-starter-test
pom.xml中加入上面的starter的依赖<dependency> <groupId>com.bqjr</groupId> <artifactId>spring-boot-starter-rocketmq</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency>发送消息测试类producerDemo
/** * * @author jiangjb * */ @RestController public class producerDemo { @Autowired private DefaultMQProducer defaultProducer; @Autowired private TransactionMQProducer transactionProducer; @Value("${spring.extend.rocketmq.producer.topic}") private String producerTopic; @RequestMapping(value = "/sendMsg", method = RequestMethod.GET) public void sendMsg() { Message msg = new Message(producerTopic,// topic "TagA",// tag "OrderID001",// key ("Hello jyqlove333").getBytes());// body try { defaultProducer.send(msg,new SendCallback(){ @Override public void onSuccess(SendResult sendResult) { System.out.println(sendResult); //TODO 发送成功处理 } @Override public void onException(Throwable e) { System.out.println(e); //TODO 发送失败处理 } }); } catch (Exception e) { e.printStackTrace(); } } @RequestMapping(value = "/sendTransactionMsg", method = RequestMethod.GET) public String sendTransactionMsg() { SendResult sendResult = null; try { //构造消息 Message msg = new Message(producerTopic,// topic "TagA",// tag "OrderID001",// key ("Hello jyqlove333").getBytes());// body //发送事务消息,LocalTransactionExecute的executeLocalTransactionBranch方法中执行本地逻辑 sendResult = transactionProducer.sendMessageInTransaction(msg, (Message msg1,Object arg) -> { int value = 1; //TODO 执行本地事务,改变value的值 //=================================================== System.out.println("执行本地事务。。。完成"); if(arg instanceof Integer){ value = (Integer)arg; } //=================================================== if (value == 0) { throw new RuntimeException("Could not find db"); } else if ((value % 5) == 0) { return LocalTransactionState.ROLLBACK_MESSAGE; } else if ((value % 4) == 0) { return LocalTransactionState.COMMIT_MESSAGE; } return LocalTransactionState.ROLLBACK_MESSAGE; }, 4); System.out.println(sendResult); } catch (Exception e) { e.printStackTrace(); } return sendResult.toString(); } }
消费消息测试类consumerDemo
/** * * @author jiangjb * */ @Component public class consumerDemo { @EventListener(condition = "#event.topic=='TopicTest1' && #event.tag=='TagA'") public void rocketmqMsgListen(RocketmqEvent event) { DefaultMQPushConsumer consumer = event.getConsumer(); try { System.out.println("com.bqjr.consumerDemo监听到一个消息达到:" + event.getMsg("gbk")); //TODO 进行业务处理 } catch (Exception e) { if(event.getMessageExt().getReconsumeTimes()<=3){//重复消费3次 try { consumer.sendMessageBack(event.getMessageExt(), 2); } catch (Exception e1) { //TODO 消息消费失败,进行日志记录 } } else { //TODO 消息消费失败,进行日志记录 } } } }
来,测试一把
在浏览器中访问:http://10.89.0.144:12306/sendMsg,控制台输出如下:再测试一下消费者,在RocketMQ控制台(RocketMQ控制台的介绍放到下一篇吧
)发送一条消息
查看控制台打印的消费日志
恭喜你,成功了。
补充说明:
本来想自定义一个叫RocketmqListener的注解来实现消息的监听的,花了大量时间去阅读和研究了spring关于EventListener注解和JmsListener注解的实现,发现目前我并不能很好的理解和掌控其设计思路,想以瓢画葫最终也没能实现,迫于五一节来临,只能使用EventListener注解代替,不过发现其实也不错。
同时,也希望各位猿友能给出指导性意见和建议:如何实现RocketmqListener注解以及是否有意义?
相关文章推荐
- Spring Boot 整合JDBC 实现后端项目开发
- springboot开发笔记(5.2 整合rocketmq)
- springboot 整合dubbo最佳实践 (使用redis作为注册中心)
- Spring Boot / Spring MVC 入门实践 (三) : 入门项目介绍与用户注册登录的实现
- Spring Boot / Spring MVC 入门实践 (三) : 入门项目介绍与用户注册登录的实现
- maven项目 spring-boot 整合 mybatis 实现查询功能demo
- IDEA下开发SpringBoot项目,实现代码的热部署/热加载
- Spring Boot + Spring Data JPA 项目整合开发记录(持续更新)
- 精通JSF:基于EJB、Hibernate、Spring整合开发与项目实践 --我的第一本合著书终于出版了
- Springboot整合dubbo构建maven多模块项目(三) - 把server分为api(服务接口定义)和server(服务实现)两个子module
- SpringBoot+Maven项目实战(6):整合Log4j和Aop,实现简单的日志记录
- 项目实战之Quartz与Spring整合进行热部署的设计与实现
- 在java项目中实现spring 和 myBatis 的整合
- 整合asp.net mvc4,Spring.net,nhibernate实现分层开发
- SSH开发实践part4:Spring整合Struts
- (IBM)Struts2、Spring、Hibernate 高效开发的最佳实践
- Spring Boot实践应用开发(2)
- SSH开发实践part1:Spring与Hibernate整合
- 一、东软实践项目2-基于android平台的应用开发:实现页面之间的跳转
- Struts2、Spring、Hibernate 高效开发的最佳实践