您的位置:首页 > 其它

基于zookeeper的分布式锁实现

2016-11-30 12:03 561 查看
工作中需要写一个定时任务,由于是集群环境,自然而然想到需要通过分布式锁来保证单台执行..相信大家都会想到使用zk来实现对应的分布式锁.下面就简单介绍一下几种实现

 

准备工作

有几个帮助类,先把代码放上来

ZKClient 对zk的操作做了一个简单的封装

 

Java代码  


package zk.lock;  
  
import org.apache.zookeeper.*;  
import org.apache.zookeeper.data.Stat;  
import zk.util.ZKUtil;  
  
import java.util.concurrent.CountDownLatch;  
import java.util.concurrent.TimeUnit;  
  
/** 
 * User: zhenghui 
 * Date: 14-3-26 
 * Time: 下午8:50 
 * 封装一个zookeeper实例. 
 */  
public class ZKClient implements Watcher {  
  
    private ZooKeeper zookeeper;  
  
    private CountDownLatch connectedSemaphore = new CountDownLatch(1);  
  
  
    public ZKClient(String connectString, int sessionTimeout) throws Exception {  
        zookeeper = new ZooKeeper(connectString, sessionTimeout, this);  
        System.out.println("connecting zk server");  
        if (connectedSemaphore.await(10l, TimeUnit.SECONDS)) {  
            System.out.println("connect zk server success");  
        } else {  
            System.out.println("connect zk server error.");  
            throw new Exception("connect zk server error.");  
        }  
    }  
  
    public void close() throws InterruptedException {  
        if (zookeeper != null) {  
            zookeeper.close();  
        }  
    }  
  
    public void createPathIfAbsent(String path, boolean isPersistent) throws Exception {  
        CreateMode createMode = isPersistent ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL;  
        path = ZKUtil.normalize(path);  
        if (!this.exists(path)) {  
            zookeeper.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, createMode);  
        }  
    }  
  
    public boolean exists(String path) throws Exception {  
        path = ZKUtil.normalize(path);  
        Stat stat = zookeeper.exists(path, null);  
        return stat != null;  
    }  
  
    public String getData(String path) throws Exception {  
        path = ZKUtil.normalize(path);  
        try {  
            byte[] data = zookeeper.getData(path, null, null);  
            return new String(data);  
        } catch (KeeperException e) {  
            if (e instanceof KeeperException.NoNodeException) {  
                throw new Exception("Node does not exist,path is [" + e.getPath() + "].", e);  
            } else {  
                throw new Exception(e);  
            }  
        } catch (InterruptedException e) {  
            Thread.currentThread().interrupt();  
            throw new Exception(e);  
        }  
    }  
  
    @Override  
    public void process(WatchedEvent event) {  
        if (event == null) return;  
  
        // 连接状态  
        Watcher.Event.KeeperState keeperState = event.getState();  
        // 事件类型  
        Watcher.Event.EventType eventType = event.getType();  
        // 受影响的path  
//        String path = event.getPath();  
        if (Watcher.Event.KeeperState.SyncConnected == keeperState) {  
            // 成功连接上ZK服务器  
            if (Watcher.Event.EventType.None == eventType) {  
                System.out.println("zookeeper connect success");  
                connectedSemaphore.countDown();  
            }  
        }  
        //下面可以做一些重连的工作.  
        else if (Watcher.Event.KeeperState.Disconnected == keeperState) {  
            System.out.println("zookeeper Disconnected");  
        } else if (Watcher.Event.KeeperState.AuthFailed == keeperState) {  
            System.out.println("zookeeper AuthFailed");  
        } else if (Watcher.Event.KeeperState.Expired == keeperState) {  
            System.out.println("zookeeper Expired");  
        }  
    }  
}  

 ZKUtil 针对zk路径的一个工具类

Java代码  


package zk.util;  
  
/** 
 * User: zhenghui 
 * Date: 14-3-26 
 * Time: 下午9:56 
 */  
public class ZKUtil {  
  
    public static final String SEPARATOR = "/";  
  
    /** 
     * 转换path为zk的标准路径 以/开头,最后不带/ 
     */  
    public static String normalize(String path) {  
        String temp = path;  
        if(!path.startsWith(SEPARATOR)) {  
            temp = SEPARATOR + path;  
        }  
        if(path.endsWith(SEPARATOR)) {  
            temp = temp.substring(0, temp.length()-1);  
            return normalize(temp);  
        }else {  
            return temp;  
        }  
    }  
  
    /** 
     * 链接两个path,并转化为zk的标准路径 
     */  
    public static String contact(String path1,String path2){  
        if(path2.startsWith(SEPARATOR)) {  
            path2 = path2.substring(1);  
        }  
        if(path1.endsWith(SEPARATOR)) {  
            return normalize(path1 + path2);  
        } else {  
            return normalize(path1 + SEPARATOR + path2);  
        }  
    }  
  
    /** 
     * 字符串转化成byte类型 
     */  
    public static byte[] toBytes(String data) {  
        if(data == null || data.trim().equals("")) return null;  
        return data.getBytes();  
    }  
}  

 NetworkUtil 获取本机IP的工具方法

Java代码  


package zk.util;  
  
import java.net.InetAddress;  
import java.net.NetworkInterface;  
import java.util.Enumeration;  
  
/** 
 * User: zhenghui 
 * Date: 14-4-1 
 * Time: 下午4:47 
 */  
public class NetworkUtil {  
  
    static private final char COLON = ':';  
  
    /** 
     * 获取当前机器ip地址 
     * 据说多网卡的时候会有问题. 
     */  
    public static String getNetworkAddress() {  
        Enumeration<NetworkInterface> netInterfaces;  
        try {  
            netInterfaces = NetworkInterface.getNetworkInterfaces();  
            InetAddress ip;  
            while (netInterfaces.hasMoreElements()) {  
                NetworkInterface ni = netInterfaces  
                        .nextElement();  
                Enumeration<InetAddress> addresses=ni.getInetAddresses();  
                while(addresses.hasMoreElements()){  
                    ip = addresses.nextElement();  
                    if (!ip.isLoopbackAddress()  
                            && ip.getHostAddress().indexOf(COLON) == -1) {  
                        return ip.getHostAddress();  
                    }  
                }  
            }  
            return "";  
        } catch (Exception e) {  
            return "";  
        }  
    }  
}  

 

--------------------------- 正文开始  -----------------------------------

这种实现非常简单,具体的流程如下



 对应的实现如下

Java代码  


package zk.lock;  
  
  
import zk.util.NetworkUtil;  
import zk.util.ZKUtil;  
  
/** 
 * User: zhenghui 
 * Date: 14-3-26 
 * Time: 下午8:37 
 * 分布式锁实现. 
 * 
 * 这种实现的原理是,创建某一个任务的节点,比如 /lock/tasckname 然后获取对应的值,如果是当前的Ip,那么获得锁,如果不是,则没获得 
 * .如果该节点不存在,则创建该节点,并把改节点的值设置成当前的IP 
 */  
public class DistributedLock01 {  
  
    private ZKClient zkClient;  
  
  
    public static final String LOCK_ROOT = "/lock";  
    private String lockName;  
  
  
    public DistributedLock01(String connectString, int sessionTimeout,String lockName) throws Exception {  
        //先创建zk链接.  
        this.createConnection(connectString,sessionTimeout);  
  
        this.lockName = lockName;  
    }  
  
    public boolean tryLock(){  
        String path = ZKUtil.contact(LOCK_ROOT,lockName);  
        String localIp = NetworkUtil.getNetworkAddress();  
        try {  
            if(zkClient.exists(path)){  
                String ownnerIp = zkClient.getData(path);  
                if(localIp.equals(ownnerIp)){  
                    return true;  
                }  
            } else {  
                zkClient.createPathIfAbsent(path,false);  
                if(zkClient.exists(path)){  
                    String ownnerIp = zkClient.getData(path);  
                    if(localIp.equals(ownnerIp)){  
                        return true;  
                    }  
                }  
            }  
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
        return false;  
    }  
  
  
    /** 
     * 创建zk连接 
     * 
     */  
    protected void createConnection(String connectString, int sessionTimeout) throws Exception {  
        if(zkClient != null){  
            releaseConnection();  
        }  
        zkClient = new ZKClient(connectString,sessionTimeout);  
        zkClient.createPathIfAbsent(LOCK_ROOT,true);  
    }  
    /** 
     * 关闭ZK连接 
     */  
    protected void releaseConnection() throws InterruptedException {  
        if (zkClient != null) {  
            zkClient.close();  
        }  
    }  
  
}  

 

总结

网上有很多文章,大家的方法大多数都是创建一个root根节点,每一个trylock的客户端都会在root下创建一个 EPHEMERAL_SEQUENTIAL 的子节点,同时设置root的child 变更watcher(为了避免羊群效应,可以只添加前一个节点的变更通知) .如果创建的节点的序号是最小,则获取到锁,否则继续等待root的child 变更..这种方式我个人觉得有点繁琐,实现起来有点麻烦.具体的现有实现可以查看

https://svn.apache.org/repos/asf/zookeeper/trunk/src/recipes/lock/

其他的,想了半天没啥好说的..over
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: