Springboot分别使用乐观锁和分布式锁(基于redisson)完成高并发防超卖
在电商中经常会有防超卖的需求,本质上是对一条数据的多线程并发情况下的数据安全性进行控制。
譬如一个商品goods,库存是100,在多线程都去读取修改的情况下,会产生数据错乱。
不加锁的情况
我们来看一个简单的例子,有个goods表,里面有个int型字段amount。我们用多线程来频繁修改amount的值,看看结果。
[code] @Transactional(rollbackFor = Exception.class) public void mult(Long goodsId) { PtGoods ptGoods = ptGoodsManager.find(goodsId); System.out.println(ptGoods.getAmount()); ptGoods.setAmount(ptGoods.getAmount() + 1); ptGoodsManager.update(ptGoods); }
这是操作修改amount字段的类GoodsService的代码,就是每次调用给amount的值加1.
下面是测试类,用100个线程来模拟同时操作该商品的数量:
[code]for (int i = 0; i < 100; i++) { new Thread(() -> { goodsService.mult(1L); } ).start(); }
运行后,我们去查看amount的最终的值,发现是10(每次调用都不一样)。正常情况下,被100次调用后应该是100才对,说明在并发下,出现了数据错误,原因大家都懂不细说。
解决方案
我们为了不让商品出现超卖,必然需要对上面的情况进行处理,方式也有很多种,基本都是锁。
来简单探讨一下几种情况下的处理。
1 最简单的是单体应用,也就是你的程序只部署了一份,不是分布式负载均衡部署的,也没有其他程序来修改你的数据。那么在天真的同学,可能会想到在修改数据的地方加个synchronized。锁住整个方法,线程自然老实排队通过该方法。
这样会有效吗?答案当然是否定的,可以看到,方法中大量重复读取了修改前的数据库值,即便线程是一个一个执行的,但是db依然返回了修改前的值。原因也很简单,jpa在执行save方法后,方法已经走完,就会进入下一个线程的逻辑,但此时,数据库并没有更新成功。jpa操作db,也是通过线程池连接的,执行了save,修改了表的值,save走完了,db未必瞬时更新,这是要时间的。与此同时,另一个查询请求早已到来,可能因为jpa的二级缓存、或者就是查询时,db的值还未修改成功。无论哪种都会导致读取到脏数据。
2 既然线程锁不能起到有效的作用,大家可能意识到问题点在于数据库的读写不一致。这就诞生了另外一套方案,即数据库上的锁——悲观锁和乐观锁。
悲观锁可以做到我读取时,就把该条数据锁住,其他人不允许读写,我做完我的事务后,别人才可以读写。
乐观锁可以做到,在我写入时,会再次查询最新的值,之后对比一下我读取时的版本,倘若最新的版本和我读取的不一致,那我就不写入,并抛异常。
jpa做悲观锁很简单,在Repository的方法上,加上Lock注解,PESSIMISTIC_READ,PESSIMISTIC_WRITE比较常用。具体的不细谈,因为我在做微服务时,不赞同用任何数据库级别的方式,尤其是悲观锁,使用不当易造成表锁,会严重影响其他业务的读写。
乐观锁也很简单,只需在model类里加上@Version注解的一个字段即可。譬如我在Goods类上加个
@Version
private Long version;即可实现乐观锁。倘若该表的读多写少,发生冲突的概率不高,那么避免并发读写错误可以选择比较简单的乐观锁。后面会来看一下乐观锁的应用。
3 分布式锁,就是借助于redis或者zookeeper来完成的在分布式环境下的锁,能够使用于分布式环境和单实例环境,而无需对数据库做任何要求,无需关心使用的是mysql还是MongoDB等任何数据库。也就是完全基于应用层面的对并发读写进行的控制,也是比较推荐的实现方式。
乐观锁
乐观锁就是在修改时,带上version版本号。这样如果试图修改已被别人修改过的数据时,会抛出异常。在一定程度上,也可以作为防超卖的一种处理方法。我们来看一下。
我们在Goods的entity类上,加上这个字段。
[code]@Version private Long version;
[code]@Transactional public synchronized void mult(Long goodsId) { PtGoods ptGoods = ptGoodsManager.find(goodsId); logger.info("----amount:" + ptGoods.getAmount()); ptGoods.setAmount(ptGoods.getAmount() + 1); ptGoodsManager.update(ptGoods); }
测试一下:
[code]for (int i = 0; i < 100; i++) { new Thread(() -> { goodsService.mult(1L); } ).start(); }
可以发现,抛出了很多异常,这就是乐观锁的异常。可想而知,当高并发购买同一个商品时,会出现大量的购买失败,而不会出现超卖的情况,因为他限制了并发的访问修改。
这样其实显而易见,也是大有问题的,只适应于读多写少的情况,否则大量的失败也是有损用户体验,明明有货,却不卖出。
那怎么解决呢,有人提出了重试策略。托若出现了上面的异常,就去重试执行该方法,直到成功。我们来看看能不能有用。
定义个自定义注解
[code]import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** * @author wuweifeng wrote on 2019/5/8. */ @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public @interface RetryOnFailure { }
定义个切面,注意把order设置的小一点,让该切面优先于Transactional注解,这样才能优先被捕捉乐观锁异常。
[code]@Aspect @Component @Order(1) public class RetryAspect { public static final int MAX_RETRY_TIMES = 10;//max retry times private Logger log = LoggerFactory.getLogger(getClass()); @Around("@annotation(retryOnF 3ff7 ailure)") public Object doConcurrentOperation(ProceedingJoinPoint pjp, RetryOnFailure retryOnFailure) throws Throwable { int attempts = 0; do { attempts++; try { pjp.proceed(); } catch (Exception e) { if (e instanceof ObjectOptimisticLockingFailureException || e instanceof StaleObjectStateException) { log.info("retrying....times:{}", attempts); if (attempts > MAX_RETRY_TIMES) { log.info("retry excceed the max times.."); throw e; } } } } while (attempts < MAX_RETRY_TIMES); return null; } }
从结果上看,进行了大量的重试,最终原本该加到100的amount字段,值已经到200多了。出现了严重的并发修改值错误。原因我也没细想,因为根本没打算采用这种方式。首先重试策略肯定是比较差的方式,因为非常不可控。其次,靠借助于数据库的锁来抛异常然后做处理,也是不太好的方式,应当尽量将问题控制在DB以外,由业务代码来控制。
分布式锁
通过上面的一系列操作,我们可以看到,通过单体应用自身的代码是控制不住这种情况的。此时就需要借助于一个第三方框架,能够提供无论是多线程或者分布式环境下的具备原子性的组件。比较常用的就是redis和zookeeper了。
以redis为例,这里选用功能比较丰富强大的redis客户端redisson来完成,https://github.com/redisson/redisson。
redisson具体的可以去上GitHub看他文档,我这里直接来讲怎么用了。
pom里加入依赖
[code]<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.10.6</version> </dependency>
redisson支持单点、集群等模式,这里选择单点的。application.yml配置好redis的连接:
[code]spring: redis: host: ${REDIS_HOST:127.0.0.1} port: ${REDIS_PORT:6379} password: ${REDIS_PASSWORD:}
配置redisson的客户端bean
[code]@Configuration public class RedisConfig { @Value("${spring.redis.host}") private String host; @Bean(name = {"redisTemplate", "stringRedisTemplate"}) public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory factory) { StringRedisTemplate redisTemplate = new StringRedisTemplate(); redisTemplate.setConnectionFactory(factory); return redisTemplate; } @Bean public Redisson redisson() { Config config = new Config(); config.useSingleServer().setAddress("redis://" + host + ":6379"); return (Redisson) Redisson.create(config); } }
至于使用redisson的功能也很少,其实就是对并发访问的方法加个锁即可,方法执行完后释放锁。这样下一个请求才能进入到该方法。
我们创建一个redis锁的注解
[code]import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** * @author wuweifeng wrote on 2019/5/8. */ @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public @interface RedissonLock { /** * 要锁哪个参数 */ int lockIndex() default -1; /** * 锁多久后自动释放(单位:秒) */ int leaseTime() default 10; }
切面类:
[code]import com.tianyalei.giftmall.global.annotation.RedissonLock; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.redisson.Redisson; import org.redisson.api.RLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.concurrent.TimeUnit; /** * 分布式锁 * @author wuweifeng wrote on 2019/5/8. */ @Aspect @Component @Order(1) //该order必须设置,很关键 public class RedissonLockAspect { private Logger log = LoggerFactory.getLogger(getClass()); @Resource private Redisson redisson; @Around("@annotation(redissonLock)") public Object around(ProceedingJoinPoint joinPoint, RedissonLock redissonLock) throws Throwable { Object obj = null; //方法内的所有参数 Object[] params = joinPoint.getArgs(); int lockIndex = redissonLock.lockIndex(); //取得方法名 String key = joinPoint.getSignature().getDeclaringTypeName() + "." + joinPoint .getSignature().getName(); //-1代表锁整个方法,而非具体锁哪条数据 if (lockIndex != -1) { key += params[lockIndex]; } //多久会自动释放,默认10秒 int leaseTime = redissonLock.leaseTime(); int waitTime = 5; RLock rLock = redisson.getLock(key); boolean res = rLock.tryLock(waitTime, leaseTime, TimeUnit.SECONDS); if (res) { log.info("取到锁"); obj = joinPoint.proceed(); rLock.unlock(); log.info("释放锁"); } else { log.info("----------nono----------"); throw new RuntimeException("没有获得锁"); } return obj; } }
这里解释一下,防超卖,其实是对某一个商品在被修改时进行加锁,而这个时候其他的商品是不受影响的。所以不能去锁整个方法,而应该是锁某个商品。所以我设置了一个lockIndex的参数,来指明你要锁的是方法的哪个属性,这里就是锁goodsId,如果不写,则是锁整个方法。
在切面里里面RLock.tryLock,则是最多等待5秒,托若还没取到锁就走失败,取到了则进入方法走逻辑。第二个参数是自动释放锁的时间,以避免自己刚取到锁,就挂掉了,导致锁无法释放。
测试类:
[code]package com.tianyalei.giftmall; import com.tianyalei.giftmall.core.goods.GoodsService; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import javax.annotation.Resource; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; @RunWith(SpringRunner.class) @SpringBootTest public class GiftmallApplicationTests { @Resource private GoodsService goodsService; private CyclicBarrier cyclicBarrier = new CyclicBarrier(100); private CyclicBarrier cyclicBarrier1 = new CyclicBarrier(100); @Test public void contextLoads() { for (int i = 0; i < 100; i++) { new Thread(() -> { try { cyclicBarrier.await(); goodsService.multi(1L); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } ).start(); new Thread(() -> { try { cyclicBarrier1.await(); goodsService.multi(2L); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } ).start(); } try { Thread.sleep(6000); } catch (InterruptedException e) { e.printStackTrace(); } } }
这里用100并发,同时操作2个商品。
可以看到,这两个商品在各自更新各自的,互不影响。最终在5秒后,有的超时了。调大等待时间,则能保证每个都是100.
通过这种方式,即完成了分布式锁,简单也便捷。当然这里只是举例,在实际项目中,倘若要做防止超卖,以追求最大性能的话,也可以考虑使用redis来存储amount,借助于redis的increase来做数量的增减,能迅速的给出客户端是否抢到了商品的判断,之后再通过消息队列去生成订单之类的耗时操作。
- 第三十八章:基于SpringBoot架构使用Profile完成打包环境分离
- 架构师基于微服务的项目使用 MockMvc完成SpringBoot2.x单元测试
- 基于 spring boot 使用Elasticsearch2.4.5及相关插件在windows安装
- 使用Spring Boot快速构建基于SQLite数据源的应用
- 使用Spring Boot快速构建基于SQLite数据源的应用
- 基于spring boot的企业项目完成基础框架搭建需要的模块
- SpringBoot-基于Maven工程使用SpringBoot
- Java 开发基于Zookeeper,Spring,vue.js的高并发多用户模块化微信商城系统(四) Java微框架Spring Boot的应用
- 基于spring-boot使用Swagger构建restful api文档
- 使用IDEA创建一个简单的基于Spring Boot的RESTful Web Service
- ES:partial update 原理、 基于groovy使用、 内置乐观锁并发控制
- SpringBoot 使用sharding jdbc进行分库分表,基于4.0版本,Springboot2.1
- 基于Spring Boot,使用JPA调用Sql Server数据库的存储过程并返回记录集合
- 1 springboot项目集成使用disconf,基于docker环境
- 使用Spring boot基于Redis快速搭建分布式Session缓存方案
- Spring boot 使用profile完成不同环境的maven打包功能
- 基于SpringBoot平台使用Lombok来优雅的编码
- 实现基于redis的分布式锁并集成spring-boot-starter
- 使用Idea完成spring boot 项目的创建与hello word
- 使用Redis为注册中心的Dubbo微服务架构(基于SpringBoot)