您的位置:首页 > 其它

<从PAXOS到ZOOKEEPER分布式一致性原理与实践>读书笔记-zookeeper分布式锁

2016-05-22 17:52 351 查看
本文属于分布式系统学习笔记系列,上一篇梳理了第六章的zookeeper使用场景之一全局唯一id 生成,本文继续第六章,介绍分布式锁。

一背景

为什么使用分布式锁?

书上6.17节介绍了原因,依赖于关系型数据库时MYSQL的应用可以借助于事务来实现锁,也可以使用版本号等实现乐观锁,主要缺陷就是性能差或者就是有瓶颈。对于分布式系统来说不同节点访问同一资源,就需要引入分布式锁。

补充下对于分布式锁的要求:

1,高性能

2. 避免死锁(基于锁的节点异常不能导致其他节点一直等待)

二使用zookeeper实现分布式锁

思路比较简单:

,每一个trylock的客户端都会在root下创建一个 EPHEMERAL_SEQUENTIAL 的子节点,同时设置root的child 变更watcher(为了避免羊群效应,可以只添加前一个节点的变更通知) .如果创建的节点的序号是最小,则获取到锁,否则继续等待root的child
变更.常见场景如下:

*1.结点A请求加锁,在特定路径下注册自己(会话自增结点),得到一个ID号1

2.结点B请求加锁,在特定路径下注册自己(会话自增结点),得到一个ID号2

*3.结点A获取所有结点ID,判断出来自己是最小结点号,于是获得锁

4.结点B获取所有结点ID,判断出来自己不是最小结点,于是监听小于自己的最大结点(结点A)变更事件

5.结点A拿到锁,处理业务,处理完,释放锁(删除自己)

6.结点B收到结点A变更事件,判断出来自己已经是最小结点号,于是获得锁。

网上引一段代码,原文地址:http://www.oschina.net/code/snippet_255033_20334

package com.concurrent;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

/**
DistributedLock lock = null;
try {
lock = new DistributedLock("127.0.0.1:2182","test");
lock.lock();
//do something...
} catch (Exception e) {
e.printStackTrace();
}
finally {
if(lock != null)
lock.unlock();
}
* @author xueliang
*
*/
public class DistributedLock implements Lock, Watcher{
private ZooKeeper zk;
private String root = "/locks";//根
private String lockName;//竞争资源的标志
private String waitNode;//等待前一个锁
private String myZnode;//当前锁
private CountDownLatch latch;//计数器
private int sessionTimeout = 30000;
private List<Exception> exception = new ArrayList<Exception>();

/**
* 创建分布式锁,使用前请确认config配置的zookeeper服务可用
* @param config 127.0.0.1:2181
* @param lockName 竞争资源标志,lockName中不能包含单词lock
*/
public DistributedLock(String config, String lockName){
this.lockName = lockName;
// 创建一个与服务器的连接
try {
zk = new ZooKeeper(config, sessionTimeout, this);
Stat stat = zk.exists(root, false);
if(stat == null){
// 创建根节点
zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
}
} catch (IOException e) {
exception.add(e);
} catch (KeeperException e) {
exception.add(e);
} catch (InterruptedException e) {
exception.add(e);
}
}

/**
* zookeeper节点的监视器
*/
public void process(WatchedEvent event) {
if(this.latch != null) {
this.latch.countDown();
}
}

public void lock() {
if(exception.size() > 0){
throw new LockException(exception.get(0));
}
try {
if(this.tryLock()){
System.out.println("Thread " + Thread.currentThread().getId() + " " +myZnode + " get lock true");
return;
}
else{
waitForLock(waitNode, sessionTimeout);//等待锁
}
} catch (KeeperException e) {
throw new LockException(e);
} catch (InterruptedException e) {
throw new LockException(e);
}
}

public boolean tryLock() {
try {
String splitStr = "_lock_";
if(lockName.contains(splitStr))
throw new LockException("lockName can not contains \\u000B");
//创建临时子节点
myZnode = zk.create(root + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(myZnode + " is created ");
//取出所有子节点
List<String> subNodes = zk.getChildren(root, false);
//取出所有lockName的锁
List<String> lockObjNodes = new ArrayList<String>();
for (String node : subNodes) {
String _node = node.split(splitStr)[0];
if(_node.equals(lockName)){
lockObjNodes.add(node);
}
}
Collections.sort(lockObjNodes);
System.out.println(myZnode + "==" + lockObjNodes.get(0));
if(myZnode.equals(root+"/"+lockObjNodes.get(0))){
//如果是最小的节点,则表示取得锁
return true;
}
//如果不是最小的节点,找到比自己小1的节点
String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1);
waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1);
} catch (KeeperException e) {
throw new LockException(e);
} catch (InterruptedException e) {
throw new LockException(e);
}
return false;
}

public boolean tryLock(long time, TimeUnit unit) {
try {
if(this.tryLock()){
return true;
}
return waitForLock(waitNode,time);
} catch (Exception e) {
e.printStackTrace();
}
return false;
}

private boolean waitForLock(String lower, long waitTime) throws InterruptedException, KeeperException {
Stat stat = zk.exists(root + "/" + lower,true);
//判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听
if(stat != null){
System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + root + "/" + lower);
this.latch = new CountDownLatch(1);
this.latch.await(waitTime, TimeUnit.MILLISECONDS);
this.latch = null;
}
return true;
}

public void unlock() {
try {
System.out.println("unlock " + myZnode);
zk.delete(myZnode,-1);
myZnode = null;
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}

public void lockInterruptibly() throws InterruptedException {
this.lock();
}

public Condition newCondition() {
return null;
}

public class LockException extends RuntimeException {
private static final long serialVersionUID = 1L;
public LockException(String e){
super(e);
}
public LockException(Exception e){
super(e);
}
}

}


三基于redis实现分布式锁

对于上面这种zookeeper的方案,网上有测试效率不高。并给出了性能更好地基于redis的方案。这里也贴出来学习下

原文地址如下http://rdc.gleasy.com/%E4%B8%A4%E7%A7%8D%E5%88%86%E5%B8%83%E5%BC%8F%E9%94%81%E5%AE%9E%E7%8E%B0%E6%96%B9%E6%A1%882.html:

/**
*使用Redis实现的分布式锁
*基本工作原理如下:
*1. 使用setnx(key,时间戮 超时),如果设置成功,则直接拿到锁
*2. 如果设置不成功,获取key的值v1(它的到期时间戮),跟当前时间对比,看是否已经超时
*3. 如果超时(说明拿到锁的结点已经挂掉),v2=getset(key,时间戮 超时 1),判断v2是否等于v1,如果相等,加锁成功,否则加锁失败,等过段时间再重试(200MS)
*/
public class RedisLock implements LockListener{
private String key;
private boolean owner = false;
private AbstractLockObserver observer = null;
private LockListener lockListener = null;
private boolean waiting = false;
private long expire;//锁超时时间,以秒为单位
private boolean expired = false;

public RedisLock(String key, LockListener lockListener, AbstractLockObserver observer) {
this.key = key;
this.lockListener = lockListener;
this.observer = observer;
}

public boolean trylock(long expire) {
synchronized(this){
if(owner){
return true;
}
this.expire = expire;
this.expired = false;
if(!waiting){
owner = observer.tryLock(key,expire);
if(!owner){
waiting = true;
observer.addLockListener(key, this);
}
}
return owner;
}
}

public boolean isOwner() {
return owner;
}

public void unlock() {
synchronized(this){
observer.unLock(key);
owner = false;
}
}

public void clear() {
synchronized(this){
if(waiting) {
observer.removeLockListener(key);
waiting = false;
}
}
}

public boolean doExpire(){
synchronized(this){
if(owner) return true;
if(expired) return false;
expired = true;
clear();
}
return false;
}

@Override
public void lockAcquired() {
synchronized(this){
if(expired){
unlock();
return;
}
owner = true;
waiting = false;
}
lockListener.lockAcquired();
}

@Override
public long getExpire() {
return this.expire;
}

@Override
public void lockError() {
synchronized(this){
owner = false;
waiting = false;
lockListener.lockError();
}
}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: