您的位置:首页 > 其它

分布式锁基于zookeeper实现

2015-04-22 10:30 591 查看
public class LockUtil {
private static final Logger logger = LoggerFactory.getLogger(LockUtil.class);
private static final byte[]  data      = { 0x12, 0x34 };
private static Watcher watcher =  new Watcher() {

public void process(WatchedEvent event) {
LoggerUtils.logInfo(logger,"process : " + event.getType());
}
};
private static ZooKeeper           zookeeper=null;
private final String         root;                                     //根节点路径
private String               id;
private LockNode             idName;
private String               ownerId;
private String               lastChildId;
private Throwable            other     = null;
private KeeperException      exception = null;
private InterruptedException interrupt = null;
private static final Long DEFAULT_TIMEOUT_PERIOD=1000L;
private ReentrantLock       reentrantLock = new ReentrantLock();

static{
try
{
zookeeper =new ZooKeeper("ip:port", DEFAULT_TIMEOUT_PERIOD.intValue(),watcher);
}
catch (IOException e)
{
logger.error("获取zookeeper错误");
}
}

public LockUtil(String root) {
this.root = root;
ensureExists(root);

}

/**
* 尝试获取锁操作,阻塞式可被中断
* @throws NestableRuntimeException
*/
public void lock() throws InterruptedException, KeeperException, NestableRuntimeException {
// 可能初始化的时候就失败了
if (exception != null) {
throw exception;
}

if (interrupt != null) {
throw interrupt;
}

if (other != null) {
throw new NestableRuntimeException(other);
}

if (isOwner()) {//锁重入
return;
}
reentrantLock.lock();
BooleanMutex mutex = new BooleanMutex();
acquireLock(mutex);
// 避免zookeeper重启后导致watcher丢失,会出现死锁使用了超时进行重试
try {
mutex.get(DEFAULT_TIMEOUT_PERIOD, TimeUnit.MICROSECONDS);// 阻塞等待值为true
// mutex.get();
} catch (TimeoutException e) {
if (!mutex.state()) {
lock();
}
}

if (exception != null) {
throw exception;
}

if (interrupt != null) {
throw interrupt;
}

if (other != null) {
throw new NestableRuntimeException(other);
}
}

/**
* 尝试获取锁对象, 不会阻塞
*
* @throws InterruptedException
* @throws KeeperException
* @throws NestableRuntimeException
*/
public boolean tryLock() throws KeeperException, NestableRuntimeException {
// 可能初始化的时候就失败了
if (exception != null) {
throw exception;
}

if (isOwner()) {//锁重入
return true;
}
reentrantLock.lock();
acquireLock(null);

if (exception != null) {
throw exception;
}

if (interrupt != null) {
Thread.currentThread().interrupt();
}

if (other != null) {
throw new NestableRuntimeException(other);
}

return isOwner();
}

/**
* 释放锁对象
*/
public void unlock() throws KeeperException {
if (id != null) {
try {
zookeeper.delete(root + "/" + id, -1);
reentrantLock.unlock();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (KeeperException.NoNodeException e) {
// do nothing
} finally {
id = null;
}
} else {
//do nothing
}
}

private void ensureExists(final String path) {
try {
Stat stat = zookeeper.exists(path, false);
if (stat != null) {
return;
}

zookeeper.create(path, data,Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (KeeperException e) {
exception = e;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
interrupt = e;
}
}

/**
* 返回锁对象对应的path
*/
public String getRoot() {
return root;
}

/**
* 判断当前是不是锁的owner
*/
public boolean isOwner() {
return id != null && ownerId != null && id.equals(ownerId);
}

/**
* 返回当前的节点id
*/
public String getId() {
return this.id;
}

// ===================== helper method =============================

/**
* 执行lock操作,允许传递watch变量控制是否需要阻塞lock操作
*/
private Boolean acquireLock(final BooleanMutex mutex) {
try {
do {
if (id == null) {//构建当前lock的唯一标识
long sessionId = zookeeper.getSessionId();
String prefix = "x-" + sessionId + "-";
//如果第一次,则创建一个节点
String path = zookeeper.create(root + "/" + prefix, data,
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
int index = path.lastIndexOf("/");
id = StringUtils.substring(path, index + 1);
idName = new LockNode(id);
}

if (id != null) {
List<String> names = zookeeper.getChildren(root, false);
if (names.isEmpty()) {
id = null;//异常情况,重新创建一个
} else {
//对节点进行排序
SortedSet<LockNode> sortedNames = new TreeSet<LockNode>();
for (String name : names) {
sortedNames.add(new LockNode(name));
}

if (sortedNames.contains(idName) == false) {
id = null;//清空为null,重新创建一个
continue;
}

//将第一个节点做为ownerId
ownerId = sortedNames.first().getName();
if (mutex != null && isOwner()) {
mutex.set(true);//直接更新状态,返回
return true;
} else if (mutex == null) {
return isOwner();
}

SortedSet<LockNode> lessThanMe = sortedNames.headSet(idName);
if (!lessThanMe.isEmpty()) {
//关注一下排队在自己之前的最近的一个节点
LockNode lastChildName = lessThanMe.last();
lastChildId = lastChildName.getName();
//异步watcher处理
Stat stat=zookeeper.exists(root + "/" + lastChildId, new AsyncWatcher() {

public void asyncProcess(WatchedEvent event) {
acquireLock(mutex);
}

});

if (stat == null) {
acquireLock(mutex);// 如果节点不存在,需要自己重新触发一下,watcher不会被挂上去
}
} else {
if (isOwner()) {
mutex.set(true);
} else {
id = null;// 可能自己的节点已超时挂了,所以id和ownerId不相同
}
}
}
}
} while (id == null);
} catch (KeeperException e) {
exception = e;
if (mutex != null) {
mutex.set(true);
}
} catch (InterruptedException e) {
interrupt = e;
if (mutex != null) {
mutex.set(true);
}
} catch (Throwable e) {
other = e;
if (mutex != null) {
mutex.set(true);
}
}

if (isOwner() && mutex != null) {
mutex.set(true);
}
return Boolean.FALSE;
}
public class LockNode implements Comparable<LockNode>
{
private final String name;

private String       prefix;

private int          sequence = -1;

public LockNode(String name)
{
this.name = name;
this.prefix = name;
int idx = name.lastIndexOf('-');
if (idx >= 0)
{
this.prefix = name.substring(0, idx);
try
{
this.sequence = Integer.parseInt(name.substring(idx + 1));
}
catch (Exception e)
{
// ignore
}
}
}

public int compareTo(LockNode that)
{
int s1 = this.sequence;
int s2 = that.sequence;
if (s1 == -1 && s2 == -1)
{
return this.name.compareTo(that.name);
}
if (s1 == -1)
{
return -1;
}
else if (s2 == -1)
{
return 1;
}
else
{
return s1 - s2;
}
}

public String getName()
{
return name;
}

public int getSequence()
{
return sequence;
}

public String getPrefix()
{
return prefix;
}

public String toString()
{
return name.toString();
}

// ==================== hashcode & equals方法=======================
@Override
public int hashCode()
{
final int prime = 31;
int result = 1;
result = prime * result + ((name == null) ? 0 : name.hashCode());
return result;
}

@Override
public boolean equals(Object obj)
{
if (this == obj)
{
return true;
}
if (obj == null)
{
return false;
}
if (getClass() != obj.getClass())
{
return false;
}
LockNode other = (LockNode) obj;
if (name == null)
{
if (other.name != null)
{
return false;
}
}
else if (!name.equals(other.name))
{
return false;
}
return true;
}
}


public class BooleanMutex {

private Sync sync;

public BooleanMutex() {
sync = new Sync();
set(false);
}

public BooleanMutex(Boolean mutex) {
sync = new Sync();
set(mutex);
}

/**
* 阻塞等待Boolean为true
*
* @throws InterruptedException
*/
public void get() throws InterruptedException {
sync.innerGet();
}

/**
* 阻塞等待Boolean为true,允许设置超时时间
*
* @param timeout
* @param unit
* @throws InterruptedException
* @throws TimeoutException
*/
public void get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
sync.innerGet(unit.toNanos(timeout));
}

/**
* 重新设置对应的Boolean mutex
*
* @param mutex
*/
public void set(Boolean mutex) {
if (mutex) {
sync.innerSetTrue();
} else {
sync.innerSetFalse();
}
}

public boolean state() {
return sync.innerState();
}

/**
* Synchronization control for BooleanMutex. Uses AQS sync state to
* represent run status
*/
private final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -7828117401763700385L;

/** State value representing that TRUE */
private static final int  TRUE             = 1;
/** State value representing that FALSE */
private static final int  FALSE            = 2;

private boolean isTrue(int state) {
return (state & TRUE) != 0;
}

/**
* 实现AQS的接口,获取共享锁的判断
*/
protected int tryAcquireShared(int state) {
// 如果为true,直接允许获取锁对象
// 如果为false,进入阻塞队列,等待被唤醒
return isTrue(getState()) ? 1 : -1;
}

/**
* 实现AQS的接口,释放共享锁的判断
*/
protected boolean tryReleaseShared(int ignore) {
//始终返回true,代表可以release
return true;
}

boolean innerState() {
return isTrue(getState());
}

void innerGet() throws InterruptedException {
acquireSharedInterruptibly(0);
}

void innerGet(long nanosTimeout) throws InterruptedException, TimeoutException {
if (!tryAcquireSharedNanos(0, nanosTimeout))
throw new TimeoutException();
}

void innerSetTrue() {
for (;;) {
int s = getState();
if (s == TRUE) {
return; //直接退出
}
if (compareAndSetState(s, TRUE)) {// cas更新状态,避免并发更新true操作
releaseShared(0);//释放一下锁对象,唤醒一下阻塞的Thread
}
}
}

void innerSetFalse() {
for (;;) {
int s = getState();
if (s == FALSE) {
return; //直接退出
}
if (compareAndSetState(s, FALSE)) {//cas更新状态,避免并发更新false操作
setState(FALSE);
}
}
}

}
}
public abstract class AsyncWatcher implements Watcher
{
private static final int       DEFAULT_POOL_SIZE    = 30;

private static final int       DEFAULT_ACCEPT_COUNT = 60;

private static ExecutorService executor             = new ThreadPoolExecutor(
1,
DEFAULT_POOL_SIZE,
0L,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue(
DEFAULT_ACCEPT_COUNT),
new NamedThreadFactory(
"Arbitrate-Async-Watcher"),
new ThreadPoolExecutor.CallerRunsPolicy());

public void process(final WatchedEvent event)
{
executor.execute(new Runnable()
{// 提交异步处理
@Override
public void run()
{
asyncProcess(event);
}
});
}

public abstract void asyncProcess(WatchedEvent event);
}


public class NestableRuntimeException extends Exception
{
public NestableRuntimeException(Throwable nest){
super(nest);
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: