您的位置:首页 > 数据库 > Redis

Redis 分布式锁实现

2016-08-30 18:23 495 查看
Redis是一个key-value存储系统。和Memcached类似,但是解决了断电后数据完全丢失的情况,而且她支持更多无化的value类型,除了和string外,还支持lists(链表)、sets(集合)和zsets(有序集合)几种数据类型。这些数据类型都支持push/pop、add/remove及取交集并集和差集及更丰富的操作,而且这些操作都是原子性的。

下面我们以redis为基础设计分布式锁。

1.锁接口定义

[java] view
plain copy

print?





package cn.slimsmart.redis.demo.lock;

import java.util.List;

import java.util.concurrent.TimeUnit;

/**

*锁接口定义

*/

public interface IRedisLockHandler {

/**

* 获取锁 如果锁可用 立即返回true, 否则返回false

* @param key

* @return

*/

boolean tryLock(String key);

/**

* 锁在给定的等待时间内空闲,则获取锁成功 返回true, 否则返回false

* @param key

* @param timeout

* @param unit

* @return

*/

boolean tryLock(String key, long timeout, TimeUnit unit);

/**

* 如果锁空闲立即返回 获取失败 一直等待

* @param key

*/

void lock(String key);

/**

* 批量获取锁 如果全部获取 立即返回true, 部分获取失败 返回false

* @param keyList

* @return

*/

boolean tryLock(List<String> keyList);

/**

* 锁在给定的等待时间内空闲,则获取锁成功 返回true, 否则返回false

* @param keyList

* @param timeout

* @param unit

* @return

*/

boolean tryLock(List<String> keyList, long timeout, TimeUnit unit);

/**

* 释放锁

* @param key

*/

void unLock(String key);

/**

* 批量释放锁

* @param keyList

*/

void unLock(List<String> keyList);

}

2.锁接口实现

[java] view
plain copy

print?





package cn.slimsmart.redis.demo.lock;

import java.util.ArrayList;

import java.util.List;

import java.util.concurrent.CopyOnWriteArrayList;

import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import redis.clients.jedis.Jedis;

import redis.clients.jedis.JedisPool;

import redis.clients.jedis.Pipeline;

import redis.clients.jedis.exceptions.JedisConnectionException;

/**

* redis 分布式锁实现

*

*/

public class RedisLockHandler implements IRedisLockHandler {

private static final Logger LOGGER = LoggerFactory.getLogger(RedisLockHandler.class);

// 单个锁有效期

private static final int DEFAULT_SINGLE_EXPIRE_TIME = 30;

// 批量锁有效期

private static final int DEFAULT_BATCH_EXPIRE_TIME = 60;

private final JedisPool jedisPool;

/**

* 构造

*/

public RedisLockHandler(JedisPool jedisPool) {

this.jedisPool = jedisPool;

}

/**

* 获取锁 如果锁可用 立即返回true, 否则返回false,不等待

*

* @return

*/

@Override

public boolean tryLock(String key) {

return tryLock(key, 0L, null);

}

/**

* 锁在给定的等待时间内空闲,则获取锁成功 返回true, 否则返回false

*

* @param timeout

* @param unit

* @return

*/

@Override

public boolean tryLock(String key, long timeout, TimeUnit unit) {

Jedis jedis = null;

try {

jedis = getResource();

//系统计时器的当前值,以毫微秒为单位。

long nano = System.nanoTime();

do {

LOGGER.debug("try lock key: " + key);

//将 key 的值设为 value 1成功 0失败

Long i = jedis.setnx(key, key);

if (i == 1) {

//设置过期时间

jedis.expire(key, DEFAULT_SINGLE_EXPIRE_TIME);

LOGGER.debug("get lock, key: " + key + " , expire in " + DEFAULT_SINGLE_EXPIRE_TIME + " seconds.");

//成功获取锁,返回true

return Boolean.TRUE;

} else { // 存在锁,循环等待锁

if (LOGGER.isDebugEnabled()) {

String desc = jedis.get(key);

LOGGER.debug("key: " + key + " locked by another business:" + desc);

}

}

if (timeout <= 0) {

//没有设置超时时间,直接退出等待

break;

}

Thread.sleep(300);

} while ((System.nanoTime() - nano) < unit.toNanos(timeout));

return Boolean.FALSE;

} catch (JedisConnectionException je) {

LOGGER.error(je.getMessage(), je);

//释放资源

returnBrokenResource(jedis);

} catch (Exception e) {

LOGGER.error(e.getMessage(), e);

} finally {

returnResource(jedis);

}

return Boolean.FALSE;

}

/**

* 如果锁空闲立即返回 获取失败 一直等待

*/

@Override

public void lock(String key) {

Jedis jedis = null;

try {

jedis = getResource();

do {

LOGGER.debug("lock key: " + key);

Long i = jedis.setnx(key, key);

if (i == 1) {

jedis.expire(key, DEFAULT_SINGLE_EXPIRE_TIME);

LOGGER.debug("get lock, key: " + key + " , expire in " + DEFAULT_SINGLE_EXPIRE_TIME + " seconds.");

return;

} else {

if (LOGGER.isDebugEnabled()) {

String desc = jedis.get(key);

LOGGER.debug("key: " + key + " locked by another business:" + desc);

}

}

Thread.sleep(300);

} while (true);

} catch (JedisConnectionException je) {

LOGGER.error(je.getMessage(), je);

returnBrokenResource(jedis);

} catch (Exception e) {

LOGGER.error(e.getMessage(), e);

} finally {

returnResource(jedis);

}

}

/**

* 释放锁

*/

@Override

public void unLock(String key) {

List<String> list = new ArrayList<String>();

list.add(key);

unLock(list);

}

/**

* 批量获取锁 如果全部获取 立即返回true, 部分获取失败 返回false

*

* @return

*/

@Override

public boolean tryLock(List<String> keyList) {

return tryLock(keyList, 0L, null);

}

/**

* 锁在给定的等待时间内空闲,则获取锁成功 返回true, 否则返回false

*

* @param timeout

* @param unit

* @return

*/

@Override

public boolean tryLock(List<String> keyList, long timeout, TimeUnit unit) {

Jedis jedis = null;

try {

//需要的锁

List<String> needLocking = new CopyOnWriteArrayList<String>();

//得到的锁

List<String> locked = new CopyOnWriteArrayList<String>();

jedis = getResource();

long nano = System.nanoTime();

do {

// 构建pipeline,批量提交

Pipeline pipeline = jedis.pipelined();

for (String key : keyList) {

needLocking.add(key);

pipeline.setnx(key, key);

}

LOGGER.debug("try lock keys: " + needLocking);

// 提交redis执行计数,批量处理完成返回

List<Object> results = pipeline.syncAndReturnAll();

for (int i = 0; i < results.size(); ++i) {

Long result = (Long) results.get(i);

String key = needLocking.get(i);

if (result == 1) { // setnx成功,获得锁

jedis.expire(key, DEFAULT_BATCH_EXPIRE_TIME);

locked.add(key);

}

}

needLocking.removeAll(locked); // 已锁定资源去除

if (needLocking.size() == 0) { //成功获取全部的锁

return true;

} else {

// 部分资源未能锁住

LOGGER.debug("keys: " + needLocking + " locked by another business:");

}

if (timeout == 0) {

break;

}

Thread.sleep(500);

} while ((System.nanoTime() - nano) < unit.toNanos(timeout));

// 得不到锁,释放锁定的部分对象,并返回失败

if (locked.size() > 0) {

jedis.del(locked.toArray(new String[0]));

}

return false;

} catch (JedisConnectionException je) {

LOGGER.error(je.getMessage(), je);

returnBrokenResource(jedis);

} catch (Exception e) {

LOGGER.error(e.getMessage(), e);

} finally {

returnResource(jedis);

}

return true;

}

/**

* 批量释放锁

*/

@Override

public void unLock(List<String> keyList) {

List<String> keys = new CopyOnWriteArrayList<String>();

for (String key : keyList) {

keys.add(key);

}

Jedis jedis = null;

try {

jedis = getResource();

jedis.del(keys.toArray(new String[0]));

LOGGER.debug("release lock, keys :" + keys);

} catch (JedisConnectionException je) {

LOGGER.error(je.getMessage(), je);

returnBrokenResource(jedis);

} catch (Exception e) {

LOGGER.error(e.getMessage(), e);

} finally {

returnResource(jedis);

}

}

/**

* 获取redis客户端

* @return

*/

private Jedis getResource() {

return jedisPool.getResource();

}

/**

* 销毁连接

* @param jedis

*/

private void returnBrokenResource(Jedis jedis) {

if (jedis == null) {

return;

}

try {

//中断链接

jedisPool.returnBrokenResource(jedis);

} catch (Exception e) {

LOGGER.error(e.getMessage(), e);

}

}

/**

* 重新初始化对象

* @param jedis

*/

private void returnResource(Jedis jedis) {

if (jedis == null) {

return;

}

try {

jedisPool.returnResource(jedis);

} catch (Exception e) {

LOGGER.error(e.getMessage(), e);

}

}

}

3.分布式锁测试

[java] view
plain copy

print?





package cn.slimsmart.redis.demo.lock;

import java.util.concurrent.TimeUnit;

import redis.clients.jedis.JedisPool;

import redis.clients.jedis.JedisPoolConfig;

public class RedisLockMain {

public static void main(String[] args) {

//创建jedis池配置实例

JedisPoolConfig config = new JedisPoolConfig();

//设置池配置项值

config.setMaxTotal(1024);

config.setMaxIdle(200);

config.setMaxWaitMillis(1000);

config.setTestOnBorrow(true);

config.setTestOnReturn(true);

//根据配置实例化jedis池

JedisPool pool = new JedisPool(config,"192.168.100.205", 6379);

IRedisLockHandler lock = new RedisLockHandler(pool);

if(lock.tryLock("abcd",20,TimeUnit.SECONDS)){

System.out.println(" get lock ...");

}else{

System.out.println(" not get lock ...");

}

lock.unLock("abcd");

}

}

关于通过spring aop实现分布式锁,请参考:http://blog.csdn.net/michaelzhaozero/article/details/23746059

由于setnx、expire2步操作,并非原子操作,可以会出现setnx执行完,服务宕机,这样就会导致锁一直不能释放,可以考虑将这2个操作放在一个事务中执行。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: