您的位置:首页 > 其它

基于zookeeper的分布式锁实现(监听父节点)

2017-09-18 00:00 429 查看
前言:在zk中,当有节点新增,删除,或者节点内容发生改变的时候,只要对节点注册了监听事件,那么当发生上述节点变化的时候,zk会自动触发监听事件并通知客户端,客户端拿到对应事件通知后,就可以做相应的业务处理

本文涉及到的节点:

1.父节点:/disLocks1(zk根目录下的disLocks1目录,CreateMode.PERSISTENT类型)

2.所有需要获取锁的线程,都会在/disLocks1目录下建立一个临时顺序的子节点(CreateMode.EPHEMERAL_SEQUENTIAL类型)

3.每次都是序号最小的节点获取锁,当最小的节点业务逻辑处理完毕后,断开本次连接(或者删除当前子节点),则临时顺序的节点自动删除,接着让其他没有获取锁的节点去获取锁

贴代码:

一个JVM,10个线程并发获取锁()多jvm,只需要事先建立父节点即可

package zoo.com.max.zoo.lock;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

/**
*
* zk分布式锁实现
* 基于监听父节点下面的全部子节点实现,效率较低
* */
public class DistributedLock implements Watcher{

public static String host="127.0.0.1:2181";
//缓存时间
private static final int TIME_OUT   = 2000;

private static String FATHER_PATH = "/disLocks1";

private ZooKeeper zk;

private int threadId;

protected  CountDownLatch countDownLatch=new CountDownLatch(1);

public DistributedLock(int threadId){
this.threadId = threadId;
}
//获取zk连接
public void getZkClient(String host,int timeout)
{
try {
if(null == zk){
zk = new ZooKeeper(host, timeout, this);
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 创建子节点
*
* */
public String createNode(){
try {
//检测节点是否存在
Stat stat = zk.exists(FATHER_PATH, false);
//父节点不存在,则创建父节点,防止多线程并发创建父节点,所以加上同步代码块,防止在同一个jvm中的并发创建,多jvm环境下, 父节点可以事先创建好
if(Objects.isNull(stat)){
synchronized (FATHER_PATH) {
Stat stat2 = zk.exists(FATHER_PATH, false);
if(Objects.isNull(stat2)){
//父节点是持久节点
String path = zk.create(FATHER_PATH, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("父节点创建成功,返回值【"+path+"】");
}

}
}
//创建持久性父节点下面的临时顺序子节点,/父节点路径/0000000002
String lockPath = zk.create(FATHER_PATH+"/",null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("线程【"+threadId+"】开始执行,子节点创建成功,返回值【"+lockPath+"】");
return lockPath;
} catch (KeeperException e1) {
e1.printStackTrace();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
return null;
}

//校验当前节点是否为序号最小的节点
public boolean checkLockPath(String lockPath){
try {
//注册父节点监听事件,当父节点下面的子节点有变化,就会触发Watcher事件
List<String> nodeList = zk.getChildren(FATHER_PATH, this);
Collections.sort(nodeList);
int index = nodeList.indexOf( lockPath.substring(FATHER_PATH.length()+1));
switch (index){
case -1:{
System.out.println("本节点已不在了"+lockPath);
return false;
}
case 0:{
System.out.println("线程【"+threadId+"】获取锁成功,子节点序号【"+lockPath+"】");
return true;
}
default:{
String waitPath = nodeList.get(index - 1);
System.out.println(waitPath+"在"+nodeList.get(index)+"点前面,需要等待【"+nodeList.get(index)+"】");
return false;
}
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}

public boolean getLock(){
//创建获取锁的节点(顺序临时节点)
String childPath = createNode();
boolean flag = true;
if(null != childPath){
try {
//轮询等待zk获取锁的通知
while(flag){
if(checkLockPath(childPath)){
//获取锁成功
return true;
}else{
//节点创建成功, 则等待zk通知
countDownLatch.await();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}else{
System.out.println("节点没有创建成功,获取锁失败");
}
return false;
}

public void process(WatchedEvent event) {
//成功连接zk,状态判断
if(event.getState() == KeeperState.SyncConnected){
//子节点有变化
if(event.getType() == EventType.NodeChildrenChanged){
countDownLatch.countDown();
}
}
}

public void unlock(){
try {
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public ZooKeeper getZooKeeper(){
return zk;
}
public static void main(String[] args) throws KeeperException, InterruptedException {
for(int i=0; i < 10; i++){
final int threadId = i+1;
new Thread(){
@Override
public void run() {
try{
DistributedLock dis = new DistributedLock(threadId);
dis.getZkClient(host,TIME_OUT);
if(dis.getLock()){
Thread.sleep(200);
dis.unlock();
}
} catch (Exception e){
System.out.println("【第"+threadId+"个线程】 抛出的异常:");
e.printStackTrace();
}
}
}.start();
}
}
}

第二遍会改进为向子节点注册监听事件, 这样就不用所有子节点都去向父节点注册事件,子节点只会在自己前面一个节点注册节点删除事件

新手码农,如有错误,希望大家多多指教,共同进步
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  zookeeper分布式锁