利用redis实现分布式锁
2015-09-30 15:34
736 查看
通过SET命令实现
从 Redis 2.6.12 版本开始,SET命令支持了NX参数,能实现 "只在键不存在时,才对键进行设置操作" ,我们可以利用这一特性实现我们的锁。
例如我们想得到名为lock1的锁,我们可以用如下操作进行加锁:
SET lock1 lock NX
若返回OK,表明加锁成功;返回nil表明此键值已存在,即存在其他用户得到了该锁。
当然,解锁也非常简单
DEL lock1
这里还有一个问题,若加锁后因为程序崩溃或代码BUG,忘记释放锁,就会造成死锁。我们可以将上述加锁过程稍加改造,就能解决这个问题
SET lock1 lock PX 1000 NX
这样若出现死锁的情况,这把锁也会在1000ms超时后自动释放。
加锁示例代码:
通过SETNX命令实现
上面的方法只适用于Redis 2.6.12版本以上,对于低于2.6.12版本的redis,我们可以用SETNX实现。具体细节可以参考http://www.jeffkit.info/2011/07/1000/,这里只把流程图和示例代码画一下。
加锁的流程图:
加锁示例代码:
这里存在两个问题:
1、计算超时的过程依赖本机时间,若两台机器时钟不同步,很容易计算超时出错。例如
A 加锁,超时时间设置为1000ms
同一时刻B 加锁,此时会加锁失败,进入判断死锁的逻辑。假设B机器时钟比A机器快1000ms,就会断定已死锁,从而获取锁成功。
此时就会出现A和B同时获取锁成功。
2、极端情况会出现加锁失败时仍然能更新锁,见原文。
这两个问题可以用下面的方案来解决。
通过LUA实现
由于redis是单线程的,执行lua脚本的过程中不会被其他指令打断。基于此,我们可以这样设计锁。
这个过程模拟了SET key value PX milliseconds NX的过程。考虑到若发送SETNX成功EXPIRE失败时,会出现死锁,这里在SETNX失败时加了一步判断是否有设置过期时间的逻辑。若未设置,就表明出现了死锁,重新锁定即可。
lua代码如下:
加锁代码如下:
从 Redis 2.6.12 版本开始,SET命令支持了NX参数,能实现 "只在键不存在时,才对键进行设置操作" ,我们可以利用这一特性实现我们的锁。
例如我们想得到名为lock1的锁,我们可以用如下操作进行加锁:
SET lock1 lock NX
若返回OK,表明加锁成功;返回nil表明此键值已存在,即存在其他用户得到了该锁。
当然,解锁也非常简单
DEL lock1
这里还有一个问题,若加锁后因为程序崩溃或代码BUG,忘记释放锁,就会造成死锁。我们可以将上述加锁过程稍加改造,就能解决这个问题
SET lock1 lock PX 1000 NX
这样若出现死锁的情况,这把锁也会在1000ms超时后自动释放。
加锁示例代码:
bool lock(redisContext *c, const char *lock_name, uint32_t timeout, uint64_t &seq ) { struct timeval t; gettimeofday(&t, NULL); seq = (uint64_t)t.tv_sec * 1000 + t.tv_usec / 1000 + timeout; redisReply *reply = (redisReply *)redisCommand(c, "SET %s %lu PX %u NX", lock_name, seq, timeout ); if (reply == NULL) { /* 发生错误 */ return false; } if (reply->type == REDIS_REPLY_STATUS && !strcmp(reply->str, "OK")) { /* 加锁成功 */ freeReplyObject(reply); return true; } if (reply->type == REDIS_REPLY_NIL) { /* 加锁失败 */ freeReplyObject(reply); return false; } /* error */ freeReplyObject(reply); return false; }解锁示例代码:
void unlock(redisContext *c, const char *lock_name, uint64_t seq) { struct timeval t; gettimeofday(&t, NULL); if (seq < (uint64_t)t.tv_sec * 1000 + t.tv_usec / 1000) { /* 已超时自动释放 */ return; } redisReply *reply = (redisReply *)redisCommand(c, "DEL %s", lock_name ); if (reply == NULL) { /* 发生错误 */ return; } if (reply->type != REDIS_REPLY_INTEGER || reply->integer != 1) { /* TODO: DEL失败 */ } freeReplyObject(reply); }
通过SETNX命令实现
上面的方法只适用于Redis 2.6.12版本以上,对于低于2.6.12版本的redis,我们可以用SETNX实现。具体细节可以参考http://www.jeffkit.info/2011/07/1000/,这里只把流程图和示例代码画一下。
加锁的流程图:
加锁示例代码:
bool lock(redisContext *c, const char *lock_name, uint32_t timeout, uint64_t &seq ) { struct timeval t; gettimeofday(&t, NULL); uint64_t now = (uint64_t)t.tv_sec * 1000 + t.tv_usec / 1000; seq = now + timeout; redisReply *reply = (redisReply *)redisCommand(c, "SETNX %s %lu", lock_name, seq ); if (reply == NULL) { /* 发生错误 */ return false; } if (reply->type != REDIS_REPLY_INTEGER) { /* 发生错误 */ freeReplyObject(reply); return false; } if (reply->integer == 1) { /* 加锁成功 */ freeReplyObject(reply); return true; } freeReplyObject(reply); reply = (redisReply *)redisCommand(c, "GET %s", lock_name ); if (reply == NULL) { /* 发生错误 */ return false; } if (reply->type != REDIS_REPLY_STRING) { /* 发生错误 */ freeReplyObject(reply); return false; } if (strtoull(reply->str, NULL, 10) > now) { /* 未因死锁而超时 */ freeReplyObject(reply); return false; } freeReplyObject(reply); reply = (redisReply *)redisCommand(c, "GETSET %s %lu", lock_name, seq ); if (reply == NULL) { /* 发生错误 */ return false; } if (reply->type != REDIS_REPLY_STRING) { /* 发生错误 */ freeReplyObject(reply); return false; } if (strtoull(reply->str, NULL, 10) > now) { /* 发生冲突 */ freeReplyObject(reply); return false; } freeReplyObject(reply); /* 上把锁因死锁而超时,重新锁定 */ return true; }
这里存在两个问题:
1、计算超时的过程依赖本机时间,若两台机器时钟不同步,很容易计算超时出错。例如
A 加锁,超时时间设置为1000ms
同一时刻B 加锁,此时会加锁失败,进入判断死锁的逻辑。假设B机器时钟比A机器快1000ms,就会断定已死锁,从而获取锁成功。
此时就会出现A和B同时获取锁成功。
2、极端情况会出现加锁失败时仍然能更新锁,见原文。
这两个问题可以用下面的方案来解决。
通过LUA实现
由于redis是单线程的,执行lua脚本的过程中不会被其他指令打断。基于此,我们可以这样设计锁。
这个过程模拟了SET key value PX milliseconds NX的过程。考虑到若发送SETNX成功EXPIRE失败时,会出现死锁,这里在SETNX失败时加了一步判断是否有设置过期时间的逻辑。若未设置,就表明出现了死锁,重新锁定即可。
lua代码如下:
if redis.call('SETNX', KEYS[1], ARGV[1]) == 1 or redis.call('TTL', KEYS[1]) < 0 then redis.call('PEXPIRE', KEYS[1], ARGV[1]) return 1 else return 0 end
加锁代码如下:
bool lock(redisContext *c, const char *lock_name, uint32_t timeout, uint64_t &seq ) { struct timeval t; gettimeofday(&t, NULL); seq = (uint64_t)t.tv_sec * 1000 + t.tv_usec / 1000 + timeout; const char *lua = "if redis.call('SETNX', KEYS[1], ARGV[1]) == 1 or redis.call('TTL', KEYS[1]) < 0 then " "redis.call('PEXPIRE', KEYS[1], ARGV[1]) " "return 1 " "else " "return 0 " "end " ; redisReply *reply = (redisReply *)redisCommand(c, "EVAL %s 1 %s %u", lua, lock_name, timeout ); if (reply == NULL) { /* 发生错误 */ return false; } if (reply->type == REDIS_REPLY_INTEGER && reply->integer == 1) { /* 加锁成功 */ freeReplyObject(reply); return true; } /* 加锁失败 */ freeReplyObject(reply); return false; }
相关文章推荐
- redis安装问题小结
- 分布式版本管理git入门指南使用资料汇总及文章推荐
- Redis偶发连接失败案例实战记录
- 详解Lua中的表的概念及其相关操作方法
- 基于oracle中锁的深入理解
- Lua编程示例(二):面向对象、metatable对表进行扩展
- Redis中实现查找某个值的范围
- Redis和Memcached的区别详解
- 分割超大Redis数据库例子
- Redis总结笔记(一):安装和常用命令
- Redis sort 排序命令详解
- 用Redis实现微博关注关系
- 把Lua编译进nginx步骤方法
- Lua脚本自动生成APK包
- Lua中的元表(metatable)、元方法(metamethod)详解
- Lua中的metatable介绍
- Lua中ipair和pair的区别
- Lua中的函数精讲笔记
- 浅谈Lua的面向对象特性
- 详解Lua中的变量相关知识点