Redis分布式锁
2016-05-04 11:14
567 查看
该分布式锁仅仅供思路参考,实际上该方法用在生产环境用还是有一定的问题,请使用原生的redisson分布式锁来解决生产环境中分布式应用锁的问题。https://github.com/redisson/
以下仅供参考思路,不要应用在生产环境中
参考原文来自博客园
本文是参考其他博客的思路和版本使用java代码改写的java版本。直接上代码,然后再进行分析。
php版本
上面是php版本的加锁方式,最开始我直接将上面版本使用java改写。 然后部署双机进行并发测试。我将分布式锁加到一个发礼包码的功能上面,该功能大概需求是这样的:用户点击页面按钮,服务器收到用户请求返回一个礼包码给用户,每个用户只能抢一次,每个礼包码都是唯一的不用重复。测试工具使用的是Jmeter, 每次启动100个线程,然后进行模拟用户点击请求发礼包码接口。线程请求完毕之后,看数据发现会有少量用户重复了。于是我添加了跟踪日志,在用户每次获取锁返回的地方,打印当前的参数与锁是如何获取到的。不难发现获取锁总共有两种途径,一种是setnx时返回true,另外就是
ttl < 0 的时候,ttl<0 说明锁key当前为设置过期时间,那么下面我们就来分析下锁key未设置过期时间的两种情况。
1. 线程A 在调用setnx之后系统宕机,未来得及调用expire方法,B线程进入的时候发现ttl<0;
2. 线程A 在调用setnx之后,未来的及调用expire方法,B线程等待的一个循环中刚好调用ttl方法,此时A线程的expire方法还未调用,B线程发现ttl小于0,因此此时两个线程同时获取了锁。
因为 setnx与与expire 是非原子性操作,所以很难保证在ttl<0的时候(即在setnx和expire之间其它线程进入ttl < 0 的流程。因此在ttl<0 的时候不能直接调用set方法来占用锁,而是应该先取出锁的值进行超时判断,由于最开始的时候setnx的值保存的是锁的有效时间,此时可以拿出锁的值进行判断是否超时,如果超时则说明该锁未设置过期时间是因为系统宕机引起的(锁的超时时间一般以秒为单位,系统宕机再启动肯定已经超时)。 如果锁的值未超时则是其他线程在调用setnx时未来的及调用expire方法。最后Java版本如下。
以下仅供参考思路,不要应用在生产环境中
参考原文来自博客园
本文是参考其他博客的思路和版本使用java代码改写的java版本。直接上代码,然后再进行分析。
php版本
/** * 加锁 * @param [type] $name 锁的标识名 * @param integer $timeout 循环获取锁的等待超时时间,在此时间内会一直尝试获取锁直到超时,为0表示失败后直接返回不等待 * @param integer $expire 当前锁的最大生存时间(秒),必须大于0,如果超过生存时间锁仍未被释放,则系统会自动强制释放 * @param integer $waitIntervalUs 获取锁失败后挂起再试的时间间隔(微秒) * @return [type] [description] */ public function lock($name, $timeout = 0, $expire = 15, $waitIntervalUs = 100000) { if ($name == null) return false; //取得当前时间 $now = time(); //获取锁失败时的等待超时时刻 $timeoutAt = $now + $timeout; //锁的最大生存时刻 $expireAt = $now + $expire; $redisKey = "Lock:{$name}"; while (true) { //将rediskey的最大生存时刻存到redis里,过了这个时刻该锁会被自动释放 $result = $this->redisString->setnx($redisKey, $expireAt); if ($result != false) { //设置key的失效时间 $this->redisString->expire($redisKey, $expireAt); //将锁标志放到lockedNames数组里 $this->lockedNames[$name] = $expireAt; return true; } //以秒为单位,返回给定key的剩余生存时间 $ttl = $this->redisString->ttl($redisKey); //ttl小于0 表示key上没有设置生存时间(key是不会不存在的,因为前面setnx会自动创建) //如果出现这种状况,那就是进程的某个实例setnx成功后 crash 导致紧跟着的expire没有被调用 //这时可以直接设置expire并把锁纳为己用 if ($ttl < 0) { $this->redisString->set($redisKey, $expireAt); $this->lockedNames[$name] = $expireAt; return true; } /*****循环请求锁部分*****/ //如果没设置锁失败的等待时间 或者 已超过最大等待时间了,那就退出 if ($timeout <= 0 || $timeoutAt < microtime(true)) break; //隔 $waitIntervalUs 后继续 请求 usleep($waitIntervalUs); } return false; }
上面是php版本的加锁方式,最开始我直接将上面版本使用java改写。 然后部署双机进行并发测试。我将分布式锁加到一个发礼包码的功能上面,该功能大概需求是这样的:用户点击页面按钮,服务器收到用户请求返回一个礼包码给用户,每个用户只能抢一次,每个礼包码都是唯一的不用重复。测试工具使用的是Jmeter, 每次启动100个线程,然后进行模拟用户点击请求发礼包码接口。线程请求完毕之后,看数据发现会有少量用户重复了。于是我添加了跟踪日志,在用户每次获取锁返回的地方,打印当前的参数与锁是如何获取到的。不难发现获取锁总共有两种途径,一种是setnx时返回true,另外就是
ttl < 0 的时候,ttl<0 说明锁key当前为设置过期时间,那么下面我们就来分析下锁key未设置过期时间的两种情况。
1. 线程A 在调用setnx之后系统宕机,未来得及调用expire方法,B线程进入的时候发现ttl<0;
2. 线程A 在调用setnx之后,未来的及调用expire方法,B线程等待的一个循环中刚好调用ttl方法,此时A线程的expire方法还未调用,B线程发现ttl小于0,因此此时两个线程同时获取了锁。
因为 setnx与与expire 是非原子性操作,所以很难保证在ttl<0的时候(即在setnx和expire之间其它线程进入ttl < 0 的流程。因此在ttl<0 的时候不能直接调用set方法来占用锁,而是应该先取出锁的值进行超时判断,由于最开始的时候setnx的值保存的是锁的有效时间,此时可以拿出锁的值进行判断是否超时,如果超时则说明该锁未设置过期时间是因为系统宕机引起的(锁的超时时间一般以秒为单位,系统宕机再启动肯定已经超时)。 如果锁的值未超时则是其他线程在调用setnx时未来的及调用expire方法。最后Java版本如下。
/** * 加锁 * * @param name * 锁的表示名称 * @param timeout * 循环获取锁的等待超时时间,在此时间内会一致尝试 * @param expire * 当前锁的最大生存时间(秒) * @param waitIntervalUs */ public boolean lock(String name, Integer timeout, Integer expire, Long waitIntervalUs) { if (StringUtils.isBlank(name)) { return false; } Long newTime = System.currentTimeMillis(); Long timeoutAt = newTime + timeout * 1000L; Long expireAt = newTime + expire * 1000L; String lockKey = getRedisKey(name); int count = 0; while (true) { if(logger.isDebugEnabled()) { logger.debug( "count: {}",count++); } Long result = redisClientTemplate.setnx(lockKey, String.valueOf(expireAt)); if (result != null && result == 1) { redisClientTemplate.expire(lockKey, expire); lockedNames.put(name, expireAt); if(logger.isDebugEnabled()) { logger.debug( "setnx success, get lockKey:{}, newTime:{}, expireAt:{}", lockKey, newTime, expireAt); } return true; } Long tll = redisClientTemplate.ttl(lockKey); // ttl小于0 表示key上没有设置生存时间(key是不会不存在的,因为前面setnx会自动创建) // 如果出现这种状况,那就是进程的某个实例setnx成功后 crash 导致紧跟着的expire没有被调用 // 这时可以直接设置expire并把锁纳为己用 if (tll < 0) { String expireAtTime = redisClientTemplate.get(lockKey); // 判断是否存在值,如果存在根据该值判断是刚刚加入进去的新值,还是系统挂掉导致expire未更新的旧值 if (StringUtils.isNotBlank(expireAtTime)) { Long expireAtSime = Long.valueOf(expireAtTime); // 判断set进去的值是否小于当前系统时间,如果小于的话说明是上面在调用expire之前系统crash掉,已经过期 if (expireAtSime < System.currentTimeMillis()) { redisClientTemplate.setex(lockKey, expire, String.valueOf(expireAt)); lockedNames.put(name, expireAt); if(logger.isDebugEnabled()) { logger.debug( "tll < 0 and expireAtSime < system time, get lockKey:{}, newTime:{}, expireAt:{},old expireat:{}", lockKey, newTime, expireAt, expireAtSime); } return true; } }else { result = redisClientTemplate.setnx(lockKey, String.valueOf(expireAt)); if (result != null && result == 1) { redisClientTemplate.expire(lockKey, expire); lockedNames.put(name, expireAt); if(logger.isDebugEnabled()) { logger.debug( "tll < 0 and setnx success, get lockKey:{}, newTime:{}, expireAt:{}", lockKey, newTime, expireAt); } return true; } } } /***** 循环请求锁部分 *****/ // 如果没设置锁失败的等待时间 或者 已超过最大等待时间了,那就退出 if (timeout <= 0 || timeoutAt < System.currentTimeMillis()) { if(logger.isDebugEnabled()) { logger.debug("请求超时退出"); } break; } // 隔 $waitIntervalUs 后继续 请求 try { Thread.sleep(waitIntervalUs); } catch (InterruptedException e) { logger.error("thread sleep error: ", e); } } return false; }
相关文章推荐
- 基于hiredis的聊天客户端
- 开源分布式搜索平台ELK+Redis+Syslog-ng实现日志实时搜索
- Redis3.0单机版的安装教程
- PubSub机制测试
- redis笔记总结(一)集群分片
- redis入门笔记(1)
- redis入门笔记(1)
- 在redis命令行中正确显示中文
- linux命令行中显示redis的中文编码
- redis make test tcl 8.5 or newer in order to run the Redis test
- 学习Redis的一本好书: Redis Essentials
- redis主从不同步问题处理
- redis性能优化
- Jedis使用总结【pipeline】【分布式的id生成器】【分布式锁【watch】【multi】】【redis分布式】
- Redis+Spring缓存实例(windows环境,附实例源码及详解)
- redis学习笔记(2)---链表adlist
- Redis主从复制和集群配置
- Redis主从复制和集群配置
- Redis主从复制和集群配置
- Redis源码解析——统计二进制数中1的个数