springboot+rabbitMq整合开发实战二:模拟用户下单的过程
2017-12-11 20:47
841 查看
springboot+rabbitMq整合开发实战二:模拟用户下单的过程
上一篇博客简单介绍了rabbitMQ的原理以及生产消费的过程,还介绍了一个采用“确认消费模式”的demo。这一篇博客,将介绍另外一种消费模式
“直接消费”,并介绍一种在实际项目频繁使用的队列模式
“延迟队列”。
延迟队列,也叫“延时队列”,顾名思义,其实就是“生产者生产消息,消息进入队列之后,并不会立即被指定的消费者所消费,而是会延时一段指定的时间ttl,最终才被消费者消费”。
介绍了这个概念之后,我们接下来实战一个在项目中常见的场景:“用户创建下单记录之后,会对其进行付款,付款成功之后,该条记录将变为已支付并且有效,否则的话,一旦过了指定的时间,即超时了,则该记录将置为无效,并且不能被用于后续的业务逻辑”。
对于这个场景,我们就去掉了“付款的环节”,直接实现ttl一到,处理下单记录为无效。
首先是项目的配置文件:
#profile
#spring.profiles.active=production
#spring.profiles.active=local
#spring.profiles.active=dev
server.port=8098
server.tomcat.accesslog.enabled=true
server.tomcat.accesslog.directory=log
server.tomcat.uri-encoding=UTF-8
另外,贴一下这个业务涉及的一张表的信息:
CREATE TABLE `order_trade_record` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
`customer_id` int(11) DEFAULT NULL COMMENT '客户id',
`order_id` int(11) DEFAULT NULL COMMENT '订单id',
`price` decimal(15,2) DEFAULT NULL COMMENT '收款金额',
`status` int(11) DEFAULT '0' COMMENT '状态(0=未支付,1=已支付)',
`create_time` datetime DEFAULT NULL COMMENT '创建时间',
`update_time` datetime DEFAULT NULL COMMENT '更新时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='订单交易记录';
生成的model以及mapper接口以及mapper配置文件如下:
package com.debug.steadyjack.springbootMQ.model.entity;
import java.math.BigDecimal;
import java.util.Date;
public class OrderTradeRecord {
private Integer id;
private Integer customerId;
private Integer orderId;
private BigDecimal price;
private Integer status=1;
private Date createTime;
private Date updateTime;
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public Integer getCustomerId() {
return customerId;
}
public void setCustomerId(Integer customerId) {
this.customerId = customerId;
}
public Integer getOrderId() {
return orderId;
}
public void setOrderId(Integer orderId) {
this.orderId = orderId;
}
public BigDecimal getPrice() {
return price;
}
public void setPrice(BigDecimal price) {
this.price = price;
}
public Integer getStatus() {
return status;
}
public void setStatus(Integer status) {
this.status = status;
}
public Date getCreateTime() {
return createTime;
}
public void setCreateTime(Date createTime) {
this.createTime = createTime;
}
public Date getUpdateTime() {
return updateTime;
}
public void setUpdateTime(Date updateTime) {
this.updateTime = updateTime;
}
@Override
public String toString() {
return "OrderTradeRecord{" +
"id=" + id +
", customerId=" + customerId +
", orderId=" + orderId +
", price=" + price +
", status=" + status +
", createTime=" + createTime +
", updateTime=" + updateTime +
'}';
}
}
接下来,则是RabbitMq的全局配置加载文件,这个在实际项目中是很实用的(所以,诸位博友可以考虑直接拿在项目上使用):
package com.debug.steadyjack.springbootMQ.server.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import java.util.HashMap;
import java.util.Map;
/**
* rabbitmq 配置
* Created by steadyjack on 2017/12/01.
*/
@Configuration
public class RabbitmqConfig {
private final static Logger log = LoggerFactory.getLogger("mqLog");
@Autowired
private Environment env;
@Autowired
private CachingConnectionFactory connectionFactory;
@Autowired
private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;
/**延迟队列配置**/
@Bean(name = "registerDelayQueue")
public Queue registerDelayQueue(){
Map<String, Object> params = new HashMap<>();
params.put("x-dead-letter-exchange",env.getProperty("register.exchange.name"));
params.put("x-dead-letter-routing-key","all");
return new Queue(env.getProperty("register.delay.queue.name"), true,false,false,params);
}
@Bean
public DirectExchange registerDelayExchange(){
return new DirectExchange(env.getProperty("register.delay.exchange.name"));
}
@Bean
public Binding registerDelayBinding(){
return BindingBuilder.bind(registerDelayQueue()).to(registerDelayExchange()).with("");
}
/**延迟队列配置**/
/**指标消费队列配置**/
@Bean
public TopicExchange registerTopicExchange(){
return new TopicExchange(env.getProperty("register.exchange.name"));
}
@Bean
public Binding registerBinding(){
return BindingBuilder.bind(registerQueue()).to(registerTopicExchange()).with("all");
}
@Bean(name = "registerQueue")
public Queue registerQueue(){
return new Queue(env.getProperty("register.queue.name"),true);
}
/**指标消费队列配置**/
/**
* 单一消费者
* @return
*/
@Bean(name = "singleListenerContainer")
public SimpleRabbitListenerContainerFactory listenerContainer(){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setConcurrentConsumers(1);
factory.setMaxConcurrentConsumers(1);
factory.setPrefetchCount(1);
factory.setTxSize(1);
return factory;
}
/**
* 多个消费者
* @return
*/
@Bean(name = "multiListenerContainer")
public SimpleRabbitListenerContainerFactory multiListenerContainer(){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factoryConfigurer.configure(factory,connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setAcknowledgeMode(AcknowledgeMode.NONE);
return factory;
}
@Bean
public RabbitTemplate rabbitTemplate(){
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
}
});
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);
}
});
return rabbitTemplate;
}
}
上图,即配置了两个队列:作为缓冲使用的延迟队列,真正去消费消息的队列。流程图可以大概用下图表示:
紧接着,我写了个controller以及request和service:
package com.debug.steadyjack.springbootMQ.server.controller;
import com.debug.steadyjack.springbootMQ.api.enums.StatusCode;
import com.debug.steadyjack.springbootMQ.api.response.BaseResponse;
import com.debug.steadyjack.springbootMQ.server.request.OrderTradeRecordRequest;
import com.debug.steadyjack.springbootMQ.server.service.OrderTradeRecordService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.validation.BindingResult;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import javax.validation.Valid;
/**
* 订单交易记录controller
* Created by steadyjack on 2017/12/11.
*/
@RestController
public class OrderTradeRecordController {
private static final Logger log= LoggerFactory.getLogger(OrderTradeRecordController.class);
private static final String prefix="order/trade/record";
@Autowired
private OrderTradeRecordService orderTradeRecordService;
/**
* 创建用户下单记录
* @param requestData
* @param bindingResult
* @return
* @throws Exception
*/
@RequestMapping(value = prefix+"/create",method = RequestMethod.POST,consumes = MediaType.APPLICATION_JSON_UTF8_VALUE,produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
public BaseResponse createRecord(@Valid @RequestBody OrderTradeRecordRequest requestData, BindingResult bindingResult) throws Exception{
if (bindingResult.hasErrors()){
return new BaseResponse(StatusCode.Invalid_Params);
}
BaseResponse response=new BaseResponse(StatusCode.Success);
try {
orderTradeRecordService.createTradeRecord(requestData);
}catch (Exception e){
log.error("用户下单记录异常:{} ",requestData,e.fillInStackTrace());
return new BaseResponse(StatusCode.Fail);
}
return response;
}
}
package com.debug.steadyjack.springbootMQ.server.service;
import com.debug.steadyjack.springbootMQ.model.entity.OrderTradeRecord;
import com.debug.steadyjack.springbootMQ.model.entity.User;
import com.debug.steadyjack.springbootMQ.model.mapper.OrderTradeRecordMapper;
import com.debug.steadyjack.springbootMQ.server.request.OrderTradeRecordRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.AbstractJavaTypeMapper;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Service;
import java.util.Date;
/**
* Created by steadyjack on 2017/12/11.
*/
@Service
public class OrderTradeRecordService {
private static final Logger log= LoggerFactory.getLogger(OrderTradeRecordService.class);
@Autowired
private Environment env;
@Autowired
public OrderTradeRecordMapper orderTradeRecordMapper;
@Autowired
private RabbitTemplate rabbitTemplate;
public void createTradeRecord(OrderTradeRecordRequest requestData) throws Exception{
//TODO:其余业务逻辑上的校验。。
//TODO:创建交易记录
OrderTradeRecord record=new OrderTradeRecord();
BeanUtils.copyProperties(requestData,record);
record.setCreateTime(new Date());
record.setStatus(1);
orderTradeRecordMapper.insertSelective(record);
//TODO:设置超时,用mq处理已超时的下单记录(一旦记录超时,则处理为无效)
final Long ttl=env.getProperty("trade.record.ttl",Long.class);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.setExchange(env.getProperty("register.delay.exchange.name"));
rabbitTemplate.setRoutingKey("");
rabbitTemplate.convertAndSend(record, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,User.class.getName());
message.getMessageProperties().setExpiration(ttl+"");
return message;
}
});
}
}
最终,我们当然是开发mq延迟队列对应的消费者:
package com.debug.steadyjack.springbootMQ.server.listener;
import com.debug.steadyjack.springbootMQ.model.entity.OrderTradeRecord;
import com.debug.steadyjack.springbootMQ.model.entity.User;
import com.debug.steadyjack.springbootMQ.model.mapper.OrderTradeRecordMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.Objects;
@Component
public class RabbitMQListener {
private final static Logger log= LoggerFactory.getLogger(RabbitMQListener.class);
@Autowired
private OrderTradeRecordMapper orderTradeRecordMapper;
/*@RabbitListener(queues = "${register.queue.name}",containerFactory = "singleListenerContainer")
public void test(@Payload User user){
try {
log.debug("消费者监听消费到消息: {} ",user);
}catch (Exception e){
log.error("消息体解析 发生异常; ",e.fillInStackTrace());
}
}*/
//直接消费模式
@RabbitListener(queues = "${register.queue.name}",containerFactory = "singleListenerContainer")
public void consumeMessage(@Payload OrderTradeRecord record){
try {
log.debug("消费者监听交易记录信息: {} ",record);
//TODO:表示已经到ttl了,却还没付款,则需要处理为失效
if (Objects.equals(1,record.getStatus())){
record.setStatus(0);
record.setUpdateTime(new Date());
orderTradeRecordMapper.updateByPrimaryKeySelective(record);
}
}catch (Exception e){
log.error("消息体解析 发生异常; ",e.fillInStackTrace());
}
}
}
其他像Log4j的配置等就不贴出来了。下面看看效果
首先,run项目之后,可以在mq后台看到两条队列:
然后,在controller发起一条消息(相当于生产者):
最后看idea控制台以及mq控制台会发现这样一个现象:首先是延迟队列消费了消息,在数据库插入一条记录;然后在到达ttl后(这里是10s)进行转发到实际的消费队列:
好了,对于本篇博客,如果有相关疑问,可以留言,或者加入群讨论:java开源技术交流:583522159。我叫debug。
另外,推荐我一好友的博客以及技术学习微信公众号(里面有诸多干货哦!),博客地址为:http://blog.battcn.com/
上一篇博客简单介绍了rabbitMQ的原理以及生产消费的过程,还介绍了一个采用“确认消费模式”的demo。这一篇博客,将介绍另外一种消费模式
“直接消费”,并介绍一种在实际项目频繁使用的队列模式
“延迟队列”。
延迟队列,也叫“延时队列”,顾名思义,其实就是“生产者生产消息,消息进入队列之后,并不会立即被指定的消费者所消费,而是会延时一段指定的时间ttl,最终才被消费者消费”。
介绍了这个概念之后,我们接下来实战一个在项目中常见的场景:“用户创建下单记录之后,会对其进行付款,付款成功之后,该条记录将变为已支付并且有效,否则的话,一旦过了指定的时间,即超时了,则该记录将置为无效,并且不能被用于后续的业务逻辑”。
对于这个场景,我们就去掉了“付款的环节”,直接实现ttl一到,处理下单记录为无效。
首先是项目的配置文件:
#profile
#spring.profiles.active=production
#spring.profiles.active=local
#spring.profiles.active=dev
server.port=8098
server.tomcat.accesslog.enabled=true
server.tomcat.accesslog.directory=log
server.tomcat.uri-encoding=UTF-8
logging.file=springbootMQ spring.mvc.view.prefix=/WEB-INF/views/ spring.mvc.view.suffix=.jsp multipart.max-request-size=20Mb multipart.max-file-size=2Mb logging.level.org.springframework = INFO logging.level.com.fasterxml.jackson = INFO logging.level.com.debug.steadyjack.springbootMQ = DEBUG spring.jackson.date-format=yyyy-MM-dd HH:mm:ss spring.jackson.date-format-exact=yyyy-MM-dd HH:mm:ss SSS spring.jackson.time-zone=GMT+8 spring.datasource.initialize=false datasource.url=jdbc:mysql://127.0.0.1:3306/db_springboot?characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull datasource.username=root datasource.password=linsen #mybatis mybatis.config-location=classpath:mybatis-config.xml mybatis.checkConfigLocation = true mybatis.mapper-locations=classpath:mappers/*.xml ############################### rabbitmq ######################## spring.rabbitmq.virtual-host=/ spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.listener.concurrency=5 spring.rabbitmq.listener.max-concurrency=10 spring.rabbitmq.listener.prefetch=1 spring.rabbitmq.listener.transaction-size=1 ########################### queue 配置 ########################## mq.env=test register.exchange.name=${mq.env}.user.register.exchange register.delay.queue.name=${mq.env}.user.register.delay.queue register.delay.exchange.name=${mq.env}.user.register.delay.exchange register.queue.name=${mq.env}.user.register.queue #交易记录失效时间:10s trade.record.ttl=10000
另外,贴一下这个业务涉及的一张表的信息:
CREATE TABLE `order_trade_record` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
`customer_id` int(11) DEFAULT NULL COMMENT '客户id',
`order_id` int(11) DEFAULT NULL COMMENT '订单id',
`price` decimal(15,2) DEFAULT NULL COMMENT '收款金额',
`status` int(11) DEFAULT '0' COMMENT '状态(0=未支付,1=已支付)',
`create_time` datetime DEFAULT NULL COMMENT '创建时间',
`update_time` datetime DEFAULT NULL COMMENT '更新时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='订单交易记录';
生成的model以及mapper接口以及mapper配置文件如下:
package com.debug.steadyjack.springbootMQ.model.entity;
import java.math.BigDecimal;
import java.util.Date;
public class OrderTradeRecord {
private Integer id;
private Integer customerId;
private Integer orderId;
private BigDecimal price;
private Integer status=1;
private Date createTime;
private Date updateTime;
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public Integer getCustomerId() {
return customerId;
}
public void setCustomerId(Integer customerId) {
this.customerId = customerId;
}
public Integer getOrderId() {
return orderId;
}
public void setOrderId(Integer orderId) {
this.orderId = orderId;
}
public BigDecimal getPrice() {
return price;
}
public void setPrice(BigDecimal price) {
this.price = price;
}
public Integer getStatus() {
return status;
}
public void setStatus(Integer status) {
this.status = status;
}
public Date getCreateTime() {
return createTime;
}
public void setCreateTime(Date createTime) {
this.createTime = createTime;
}
public Date getUpdateTime() {
return updateTime;
}
public void setUpdateTime(Date updateTime) {
this.updateTime = updateTime;
}
@Override
public String toString() {
return "OrderTradeRecord{" +
"id=" + id +
", customerId=" + customerId +
", orderId=" + orderId +
", price=" + price +
", status=" + status +
", createTime=" + createTime +
", updateTime=" + updateTime +
'}';
}
}
package com.debug.steadyjack.springbootMQ.model.mapper; import com.debug.steadyjack.springbootMQ.model.entity.OrderTradeRecord; public interface OrderTradeRecordMapper { int deleteByPrimaryKey(Integer id); int insert(OrderTradeRecord record); int insertSelective(OrderTradeRecord record); OrderTradeRecord selectByPrimaryKey(Integer id); int updateByPrimaryKeySelective(OrderTradeRecord record); int updateByPrimaryKey(OrderTradeRecord record); }
<?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" > <mapper namespace="com.debug.steadyjack.springbootMQ.model.mapper.OrderTradeRecordMapper" > <resultMap id="BaseResultMap" type="com.debug.steadyjack.springbootMQ.model.entity.OrderTradeRecord" > <id column="id" property="id" jdbcType="INTEGER" /> <result column="customer_id" property="customerId" jdbcType="INTEGER" /> <result column="order_id" property="orderId" jdbcType="INTEGER" /> <result column="price" property="price" jdbcType="DECIMAL" /> <result column="status" property="status" jdbcType="INTEGER" /> <result column="create_time" property="createTime" jdbcType="TIMESTAMP" /> <result column="update_time" property="updateTime" jdbcType="TIMESTAMP" /> </resultMap> <sql id="Base_Column_List" > id, customer_id, order_id, price, status, create_time, update_time </sql> <select id="selectByPrimaryKey" resultMap="BaseResultMap" parameterType="java.lang.Integer" > select <include refid="Base_Column_List" /> from order_trade_record where id = #{id,jdbcType=INTEGER} </select> <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer" > delete from order_trade_record where id = #{id,jdbcType=INTEGER} </delete> <insert id="insert" keyProperty="id" useGeneratedKeys="true" parameterType="com.debug.steadyjack.springbootMQ.model.entity.OrderTradeRecord" > insert into order_trade_record (id, customer_id, order_id, price, status, create_time, update_time) values (#{id,jdbcType=INTEGER}, #{customerId,jdbcType=INTEGER}, #{orderId,jdbcType=INTEGER}, #{price,jdbcType=DECIMAL}, #{status,jdbcType=INTEGER}, #{createTime,jdbcType=TIMESTAMP}, #{updateTime,jdbcType=TIMESTAMP}) </insert> <insert id="insertSelective" keyProperty="id" useGeneratedKeys="true" parameterType="com.debug.steadyjack.springbootMQ.model.entity.OrderTradeRecord" > insert into order_trade_record <trim prefix="(" suffix=")" suffixOverrides="," > <if test="id != null" > id, </if> <if test="customerId != null" > customer_id, </if> <if test="orderId != null" > order_id, </if> <if test="price != null" > price, </if> <if test="status != null" > status, </if> <if test="createTime != null" > create_time, </if> <if test="updateTime != null" > update_time, </if> </trim> <trim prefix="values (" suffix=")" suffixOverrides="," > <if test="id != null" > #{id,jdbcType=INTEGER}, </if> <if test="customerId != null" > #{customerId,jdbcType=INTEGER}, </if> <if test="orderId != null" > #{orderId,jdbcType=INTEGER}, </if> <if test="price != null" > #{price,jdbcType=DECIMAL}, </if> <if test="status != null" > #{status,jdbcType=INTEGER}, </if> <if test="createTime != null" > #{createTime,jdbcType=TIMESTAMP}, </if> <if test="updateTime != null" > #{updateTime,jdbcType=TIMESTAMP}, </if> </trim> </insert> <update id="updateByPrimaryKeySelective" parameterType="com.debug.steadyjack.springbootMQ.model.entity.OrderTradeRecord" > update order_trade_record <set > <if test="customerId != null" > customer_id = #{customerId,jdbcType=INTEGER}, </if> <if test="orderId != null" > order_id = #{orderId,jdbcType=INTEGER}, </if> <if test="price != null" > price = #{price,jdbcType=DECIMAL}, </if> <if test="status != null" > status = #{status,jdbcType=INTEGER}, </if> <if test="createTime != null" > create_time = #{createTime,jdbcType=TIMESTAMP}, </if> <if test="updateTime != null" > update_time = #{updateTime,jdbcType=TIMESTAMP}, </if> </set> where id = #{id,jdbcType=INTEGER} </update> <update id="updateByPrimaryKey" parameterType="com.debug.steadyjack.springbootMQ.model.entity.OrderTradeRecord" > update order_trade_record set customer_id = #{customerId,jdbcType=INTEGER}, order_id = #{orderId,jdbcType=INTEGER}, price = #{price,jdbcType=DECIMAL}, status = #{status,jdbcType=INTEGER}, create_time = #{createTime,jdbcType=TIMESTAMP}, update_time = #{updateTime,jdbcType=TIMESTAMP} where id = #{id,jdbcType=INTEGER} </update> </mapper>
接下来,则是RabbitMq的全局配置加载文件,这个在实际项目中是很实用的(所以,诸位博友可以考虑直接拿在项目上使用):
package com.debug.steadyjack.springbootMQ.server.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import java.util.HashMap;
import java.util.Map;
/**
* rabbitmq 配置
* Created by steadyjack on 2017/12/01.
*/
@Configuration
public class RabbitmqConfig {
private final static Logger log = LoggerFactory.getLogger("mqLog");
@Autowired
private Environment env;
@Autowired
private CachingConnectionFactory connectionFactory;
@Autowired
private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;
/**延迟队列配置**/
@Bean(name = "registerDelayQueue")
public Queue registerDelayQueue(){
Map<String, Object> params = new HashMap<>();
params.put("x-dead-letter-exchange",env.getProperty("register.exchange.name"));
params.put("x-dead-letter-routing-key","all");
return new Queue(env.getProperty("register.delay.queue.name"), true,false,false,params);
}
@Bean
public DirectExchange registerDelayExchange(){
return new DirectExchange(env.getProperty("register.delay.exchange.name"));
}
@Bean
public Binding registerDelayBinding(){
return BindingBuilder.bind(registerDelayQueue()).to(registerDelayExchange()).with("");
}
/**延迟队列配置**/
/**指标消费队列配置**/
@Bean
public TopicExchange registerTopicExchange(){
return new TopicExchange(env.getProperty("register.exchange.name"));
}
@Bean
public Binding registerBinding(){
return BindingBuilder.bind(registerQueue()).to(registerTopicExchange()).with("all");
}
@Bean(name = "registerQueue")
public Queue registerQueue(){
return new Queue(env.getProperty("register.queue.name"),true);
}
/**指标消费队列配置**/
/**
* 单一消费者
* @return
*/
@Bean(name = "singleListenerContainer")
public SimpleRabbitListenerContainerFactory listenerContainer(){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setConcurrentConsumers(1);
factory.setMaxConcurrentConsumers(1);
factory.setPrefetchCount(1);
factory.setTxSize(1);
return factory;
}
/**
* 多个消费者
* @return
*/
@Bean(name = "multiListenerContainer")
public SimpleRabbitListenerContainerFactory multiListenerContainer(){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factoryConfigurer.configure(factory,connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setAcknowledgeMode(AcknowledgeMode.NONE);
return factory;
}
@Bean
public RabbitTemplate rabbitTemplate(){
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
}
});
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);
}
});
return rabbitTemplate;
}
}
上图,即配置了两个队列:作为缓冲使用的延迟队列,真正去消费消息的队列。流程图可以大概用下图表示:
紧接着,我写了个controller以及request和service:
package com.debug.steadyjack.springbootMQ.server.controller;
import com.debug.steadyjack.springbootMQ.api.enums.StatusCode;
import com.debug.steadyjack.springbootMQ.api.response.BaseResponse;
import com.debug.steadyjack.springbootMQ.server.request.OrderTradeRecordRequest;
import com.debug.steadyjack.springbootMQ.server.service.OrderTradeRecordService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.validation.BindingResult;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import javax.validation.Valid;
/**
* 订单交易记录controller
* Created by steadyjack on 2017/12/11.
*/
@RestController
public class OrderTradeRecordController {
private static final Logger log= LoggerFactory.getLogger(OrderTradeRecordController.class);
private static final String prefix="order/trade/record";
@Autowired
private OrderTradeRecordService orderTradeRecordService;
/**
* 创建用户下单记录
* @param requestData
* @param bindingResult
* @return
* @throws Exception
*/
@RequestMapping(value = prefix+"/create",method = RequestMethod.POST,consumes = MediaType.APPLICATION_JSON_UTF8_VALUE,produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
public BaseResponse createRecord(@Valid @RequestBody OrderTradeRecordRequest requestData, BindingResult bindingResult) throws Exception{
if (bindingResult.hasErrors()){
return new BaseResponse(StatusCode.Invalid_Params);
}
BaseResponse response=new BaseResponse(StatusCode.Success);
try {
orderTradeRecordService.createTradeRecord(requestData);
}catch (Exception e){
log.error("用户下单记录异常:{} ",requestData,e.fillInStackTrace());
return new BaseResponse(StatusCode.Fail);
}
return response;
}
}
package com.debug.steadyjack.springbootMQ.server.request; import javax.validation.constraints.NotNull; import java.io.Serializable; import java.math.BigDecimal; /** * 订单交易记录request * Created by steadyjack on 2017/12/11. */ public class OrderTradeRecordRequest implements Serializable{ @NotNull private Integer customerId; @NotNull private Integer orderId; @NotNull private BigDecimal price; private Integer status=0; public Integer getCustomerId() { return customerId; } public void setCustomerId(Integer customerId) { this.customerId = customerId; } public Integer getOrderId() { return orderId; } public void setOrderId(Integer orderId) { this.orderId = orderId; } public BigDecimal getPrice() { return price; } public void setPrice(BigDecimal price) { this.price = price; } public Integer getStatus() { return status; } public void setStatus(Integer status) { this.status = status; } @Override public String toString() { return "OrderTradeRecordRequest{" + "customerId=" + customerId + ", orderId=" + orderId + ", price=" + price + ", status=" + status + '}'; } }
package com.debug.steadyjack.springbootMQ.server.service;
import com.debug.steadyjack.springbootMQ.model.entity.OrderTradeRecord;
import com.debug.steadyjack.springbootMQ.model.entity.User;
import com.debug.steadyjack.springbootMQ.model.mapper.OrderTradeRecordMapper;
import com.debug.steadyjack.springbootMQ.server.request.OrderTradeRecordRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.AbstractJavaTypeMapper;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Service;
import java.util.Date;
/**
* Created by steadyjack on 2017/12/11.
*/
@Service
public class OrderTradeRecordService {
private static final Logger log= LoggerFactory.getLogger(OrderTradeRecordService.class);
@Autowired
private Environment env;
@Autowired
public OrderTradeRecordMapper orderTradeRecordMapper;
@Autowired
private RabbitTemplate rabbitTemplate;
public void createTradeRecord(OrderTradeRecordRequest requestData) throws Exception{
//TODO:其余业务逻辑上的校验。。
//TODO:创建交易记录
OrderTradeRecord record=new OrderTradeRecord();
BeanUtils.copyProperties(requestData,record);
record.setCreateTime(new Date());
record.setStatus(1);
orderTradeRecordMapper.insertSelective(record);
//TODO:设置超时,用mq处理已超时的下单记录(一旦记录超时,则处理为无效)
final Long ttl=env.getProperty("trade.record.ttl",Long.class);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.setExchange(env.getProperty("register.delay.exchange.name"));
rabbitTemplate.setRoutingKey("");
rabbitTemplate.convertAndSend(record, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,User.class.getName());
message.getMessageProperties().setExpiration(ttl+"");
return message;
}
});
}
}
最终,我们当然是开发mq延迟队列对应的消费者:
package com.debug.steadyjack.springbootMQ.server.listener;
import com.debug.steadyjack.springbootMQ.model.entity.OrderTradeRecord;
import com.debug.steadyjack.springbootMQ.model.entity.User;
import com.debug.steadyjack.springbootMQ.model.mapper.OrderTradeRecordMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.Objects;
@Component
public class RabbitMQListener {
private final static Logger log= LoggerFactory.getLogger(RabbitMQListener.class);
@Autowired
private OrderTradeRecordMapper orderTradeRecordMapper;
/*@RabbitListener(queues = "${register.queue.name}",containerFactory = "singleListenerContainer")
public void test(@Payload User user){
try {
log.debug("消费者监听消费到消息: {} ",user);
}catch (Exception e){
log.error("消息体解析 发生异常; ",e.fillInStackTrace());
}
}*/
//直接消费模式
@RabbitListener(queues = "${register.queue.name}",containerFactory = "singleListenerContainer")
public void consumeMessage(@Payload OrderTradeRecord record){
try {
log.debug("消费者监听交易记录信息: {} ",record);
//TODO:表示已经到ttl了,却还没付款,则需要处理为失效
if (Objects.equals(1,record.getStatus())){
record.setStatus(0);
record.setUpdateTime(new Date());
orderTradeRecordMapper.updateByPrimaryKeySelective(record);
}
}catch (Exception e){
log.error("消息体解析 发生异常; ",e.fillInStackTrace());
}
}
}
其他像Log4j的配置等就不贴出来了。下面看看效果
首先,run项目之后,可以在mq后台看到两条队列:
然后,在controller发起一条消息(相当于生产者):
最后看idea控制台以及mq控制台会发现这样一个现象:首先是延迟队列消费了消息,在数据库插入一条记录;然后在到达ttl后(这里是10s)进行转发到实际的消费队列:
好了,对于本篇博客,如果有相关疑问,可以留言,或者加入群讨论:java开源技术交流:583522159。我叫debug。
另外,推荐我一好友的博客以及技术学习微信公众号(里面有诸多干货哦!),博客地址为:http://blog.battcn.com/
相关文章推荐
- springboot+rabbitMq整合开发实战一
- springboot+rabbitmq整合示例程
- spring boot整合activemq rabbitmq
- Springboot+RabbitMQ整合
- springboot+Rabit实战一:(Rabbit MQ windows 环境搭建)
- springboot+rabbitmq整合示例程
- Spring Boot整合RabbitMQ开发实战详解
- SpringBoot RabbitMQ 整合进阶版
- springboot+rabbitmq整合示例程
- 4 微服务实战系列 - SpringBoot RabbitMQ 实战解决项目中实践
- springboot+rabbitmq 整合实例
- [Spring Boot实战系列] - No.2 Spring boot 整合Spring Security用户管理和用户权限管理
- SpringBoot RocketMQ 整合使用和监控
- SpringBoot整合MyBatis开发
- SpringBoot+Maven项目实战(3):整合Freemark模板
- spring-boot rabbitmq 例子
- [Spring Boot实战系列] - No.1 开发第一个应用程序 Hello World
- Spring Boot RabbitMQ快速入门 (1)
- SpringBoot开发案例之整合Swagger篇
- spring-boot实战【02】:开发web应用