您的位置:首页 > 其它

基于zookeeper简单实现分布式锁

2017-08-21 01:59 316 查看
这里利用zookeeper的EPHEMERAL_SEQUENTIAL类型节点及watcher机制,来简单实现分布式锁。 思路如下:1、开启10个线程,在disLocks节点下各自创建名为sub的EPHEMERAL_SEQUENTIAL节点;2、获取disLocks节点下所有子节点,排序,如果自己的节点编号最小,则获取锁;3、否则watch排在自己前面的节点,监听到其删除后,进入第2步(重新检测排序是防止监听的节点发生连接失效,导致的节点删除情况);4、删除自身sub节点,释放连接;
这里插播下zookeeper的4种节点类型:public enum CreateMode {      /** 
     * 持久节点:节点创建后,会一直存在,不会因客户端会话失效而删除; 
     */  
    PERSISTENT (0, false, false),  
  
    /** 
    * 持久顺序节点:基本特性与持久节点一致,创建节点的过程中,zookeeper会在其名字后自动追加一个单调增长的数字后缀,作为新的节点名;  
    */  
    PERSISTENT_SEQUENTIAL (2, false, true),  
  
    /** 
     *  临时节点:客户端会话失效或连接关闭后,该节点会被自动删除,且不能再临时节点下面创建子节点,否则报如下错:org.apache.zookeeper.KeeperException$NoChildrenForEphemeralsException; 
     */  
    EPHEMERAL (1, true, false),  
  
    /** 
     * 临时顺序节点:基本特性与临时节点一致,创建节点的过程中,zookeeper会在其名字后自动追加一个单调增长的数字后缀,作为新的节点名;  
     */  
    EPHEMERAL_SEQUENTIAL (3, true, true);  
    private static final Logger LOG = LoggerFactory.getLogger(CreateMode.class);  
    private boolean ephemeral;  
    private boolean sequential;  
    private int flag;  
    CreateMode(int flag, boolean ephemeral, boolean sequential) {  
        this.flag = flag;  
        this.ephemeral = ephemeral;  
        this.sequential = sequential;  
    }  
    public boolean isEphemeral() {  
        return ephemeral;  
    }  
    public boolean isSequential() {  
        return sequential;  
    }  
    public int toFlag() {  
        return flag;  
    }  
    static public CreateMode fromFlag(int flag) throws KeeperException {  
        switch(flag) {  
        case 0: return CreateMode.PERSISTENT;  
        case 1: return CreateMode.EPHEMERAL;  
        case 2: return CreateMode.PERSISTENT_SEQUENTIAL;  
        case 3: return CreateMode.EPHEMERAL_SEQUENTIAL ;  
        default:  
            LOG.error("Received an invalid flag value to convert to a CreateMode");  
            throw new KeeperException.BadArgumentsException();  
        }  
    }  
}  
测试代码:package zookeeper;  import org.slf4j.Logger;  
import org.slf4j.LoggerFactory;  
import org.apache.zookeeper.*;  
import org.apache.zookeeper.data.Stat;  
import java.util.List;  
import java.io.IOException;  
import java.util.Collections;  
import java.util.concurrent.CountDownLatch;  
  
public class DistributedLock implements Watcher{  
    private int threadId;  
    private ZooKeeper zk = null;  
    private String selfPath;  
    private String waitPath;  
    private String LOG_PREFIX_OF_THREAD;  
    private static final int SESSION_TIMEOUT = 10000;  
    private static final String GROUP_PATH = "/disLocks";  
    private static final String SUB_PATH = "/disLocks/sub";  
    private static final String CONNECTION_STRING = "192.168.*.*:2181";  
      
    private static final int THREAD_NUM = 10;   
    //确保连接zk成功;  
    private CountDownLatch connectedSemaphore = new CountDownLatch(1);  
    //确保所有线程运行结束;  
    private static final CountDownLatch threadSemaphore = new CountDownLatch(THREAD_NUM);  
    private static final Logger LOG = LoggerFactory.getLogger(AllZooKeeperWatcher.class);  
    public DistributedLock(int id) {  
        this.threadId = id;  
        LOG_PREFIX_OF_THREAD = "【第"+threadId+"个线程】";  
    }  
    public static void main(String[] args) {  
        for(int i=0; i < THREAD_NUM; i++){  
            final int threadId = i+1;  
            new Thread(){  
                @Override  
                public void run() {  
                    try{  
                        DistributedLock dc = new DistributedLock(threadId);  
                        dc.createConnection(CONNECTION_STRING, SESSION_TIMEOUT);  
                        //GROUP_PATH不存在的话,由一个线程创建即可;  
                        synchronized (threadSemaphore){  
                            dc.createPath(GROUP_PATH, "该节点由线程" + threadId + "创建", true);  
                        }  
                        dc.getLock();  
                    } catch (Exception e){  
                        LOG.error("【第"+threadId+"个线程】 抛出的异常:");  
                        e.printStackTrace();  
                    }  
                }  
            }.start();  
        }  
        try {  
            threadSemaphore.await();  
            LOG.info("所有线程运行结束!");  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
    }  
    /** 
     * 获取锁 
     * @return 
     */  
    private void getLock() throws KeeperException, InterruptedException {  
        selfPath = zk.create(SUB_PATH,null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);  
        LOG.info(LOG_PREFIX_OF_THREAD+"创建锁路径:"+selfPath);  
        if(checkMinPath()){  
            getLockSuccess();  
        }  
    }  
    /** 
     * 创建节点 
     * @param path 节点path 
     * @param data 初始数据内容 
     * @return 
     */  
    public boolean createPath( String path, String data, boolean needWatch) throws KeeperException, InterruptedException {  
        if(zk.exists(path, needWatch)==null){  
            LOG.info( LOG_PREFIX_OF_THREAD + "节点创建成功, Path: "  
                    + this.zk.create( path,  
                    data.getBytes(),  
                    ZooDefs.Ids.OPEN_ACL_UNSAFE,  
                    CreateMode.PERSISTENT )  
                    + ", content: " + data );  
        }  
        return true;  
    }  
    /** 
     * 创建ZK连接 
     * @param connectString  ZK服务器地址列表 
     * @param sessionTimeout Session超时时间 
     */  
    public void createConnection( String connectString, int sessionTimeout ) throws IOException, InterruptedException {  
            zk = new ZooKeeper( connectString, sessionTimeout, this);  
            connectedSemaphore.await();  
    }  
    /** 
     * 获取锁成功 
    */  
    public void getLockSuccess() throws KeeperException, InterruptedException {  
        if(zk.exists(this.selfPath,false) == null){  
            LOG.error(LOG_PREFIX_OF_THREAD+"本节点已不在了...");  
            return;  
        }  
        LOG.info(LOG_PREFIX_OF_THREAD + "获取锁成功,赶紧干活!");  
        Thread.sleep(2000);  
        LOG.info(LOG_PREFIX_OF_THREAD + "删除本节点:"+selfPath);  
        zk.delete(this.selfPath, -1);  
        releaseConnection();  
        threadSemaphore.countDown();  
    }  
    /** 
     * 关闭ZK连接 
     */  
    public void releaseConnection() {  
        if ( this.zk !=null ) {  
            try {  
                this.zk.close();  
            } catch ( InterruptedException e ) {}  
        }  
        LOG.info(LOG_PREFIX_OF_THREAD + "释放连接");  
    }  
    /** 
     * 检查自己是不是最小的节点 
     * @return 
     */  
    public boolean checkMinPath() throws KeeperException, InterruptedException {  
         List<String> subNodes = zk.getChildren(GROUP_PATH, false);  
         Collections.sort(subNodes);  
         int index = subNodes.indexOf( selfPath.substring(GROUP_PATH.length()+1));  
         switch (index){  
             case -1:{  
                 LOG.error(LOG_PREFIX_OF_THREAD+"本节点已不在了..."+selfPath);  
                 return false;  
             }  
             case 0:{  
                 LOG.info(LOG_PREFIX_OF_THREAD+"子节点中,我果然是老大"+selfPath);  
                 return true;  
             }  
             default:{  
                 this.waitPath = GROUP_PATH +"/"+ subNodes.get(index - 1);  
                 LOG.info(LOG_PREFIX_OF_THREAD+"获取子节点中,排在我前面的"+waitPath);  
                 try{  
                     zk.getData(waitPath, true, new Stat());  
                     return false;  
                 }catch(KeeperException e){  
                     if(zk.exists(waitPath,false) == null){  
                         LOG.info(LOG_PREFIX_OF_THREAD+"子节点中,排在我前面的"+waitPath+"已失踪,幸福来得太突然?");  
                         return checkMinPath();  
                     }else{  
                         throw e;  
                     }  
                 }  
             }                     
         }  
    } 
  /*
  process 方法是 Watcher 接口中的一个回调方法,当 ZooKeeper 向客户端发送一个 Watcher 事件通知时,
  客户端就会对相应的 process 方法进行回调 */
    @Override  
    public void process(WatchedEvent event) {  
        if(event == null){  
            return;  
        }  
        Event.KeeperState keeperState = event.getState();  
        Event.EventType eventType = event.getType();  
        if ( Event.KeeperState.SyncConnected == keeperState) {  
            if ( Event.EventType.None == eventType ) {  
                LOG.info( LOG_PREFIX_OF_THREAD + "connected to zk successfully" );  
                connectedSemaphore.countDown();  
            }else if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {  
                LOG.info(LOG_PREFIX_OF_THREAD + "lock owners release lock, I have chance now");  
                try {  
                    if(checkMinPath()){  
                        getLockSuccess();  
                    }  
                } catch (KeeperException e) {  
                    e.printStackTrace();  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
            }  
        }else if ( Event.KeeperState.Disconnected == keeperState ) {  
            LOG.info( LOG_PREFIX_OF_THREAD + "connection is disrupted" );  
        } else if ( Event.KeeperState.AuthFailed == keeperState ) {  
            LOG.info( LOG_PREFIX_OF_THREAD + "authentication failed" );  
        } else if ( Event.KeeperState.Expired == keeperState ) {  
            LOG.info( LOG_PREFIX_OF_THREAD + "session expire" );  
        }  
    }  
}  

log配置文件:# DEFAULT   log4j.rootLogger=INFO,CONSOLE  
  
#  
# Log INFO level and above messages to the console  
#  
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender  
log4j.appender.CONSOLE.Threshold=INFO  
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout  
log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %m%n  
  
  
log4j.appender.COMMONSTAT=org.apache.log4j.DailyRollingFileAppender  
log4j.appender.COMMONSTAT.Threshold=INFO  
log4j.appender.COMMONSTAT.File=/home/zookeeper/zookeeper-test-agent/logs/test.log  
log4j.appender.COMMONSTAT.DatePattern='.'yyyy-MM-dd  
  
log4j.appender.COMMONSTAT.layout=org.apache.log4j.PatternLayout  
log4j.appender.COMMONSTAT.layout.ConversionPattern=[%d{yyyy-MM-dd HH:mm:ss}] - %m%n  
  
log4j.logger.org.displaytag=WARN  
log4j.logger.org.apache.zookeeper=ERROR  
log4j.logger.org.springframework=WARN  
log4j.logger.org.I0Itec=WARN  
log4j.logger.commonStat=INFO,COMMONSTAT  
 另外,说一下关于Zookeeper watch的事。watch事件是一次性触发器,当watch监视的数据发生变化时,通知设置了该watch的client,即watcher。

需要注意三点:
1.一次性触发器
client在一个节点上设置watch,随后节点内容改变,client将获取事件。当节点内容再次改变,client不会获取这个事件,
除非它又执行了一次读操作并设置watch。
2.发送至client,watch事件延迟
watch事件异步发送至观察者。比如说client执行一次写操作,节点数据内容发生变化,操作返回后,而watch事件可能还在发往client的路上。这种情况下,zookeeper提供有序保证:client不会得知数据变化,直到它获取watch事件。网络延迟或其他因素可能导致不同client在不同时刻获取watch事件和操作返回值。

3.设置watch的数据内容
涉及到节点改变的不同方式。比方说zookeeper维护两个watch列表:节点的数据watch和子节点watch。getData()和exists()设置了内容watch,getChildren()设置了子节点watch,操作返回的数据类型不同,前者是节点的内容,后者是节点的子节点列表。setData()触发内容watch,create()触发当前节点的"内容watch"和其父节点的"子节点watch",delete()同时触发"内容watch"和"子节点watch"(其子节点被全部删除),以及其父节点的"子节点watch"。说白了,对当前节点的操作,要考虑到对其父节点与子节点的影响。
watch在客户端所连接的服务端本地维护。watch的设置、维护、分发操作都很轻量级。当客户端连接到新的服务端,watch将被任一会话事件触发。与服务端断开连接时,不能获取watch事件。客户端重连后,之前注册的watch将被重新注册并在需要时触发。通常这一切透明地发生,用户不会察觉到。有一种情况watch可能丢失:之前对一个尚未建立的节点的设置了exists watch,如果断开期间该节点被建立或删除,那么此watch将丢失。

对于watch,zookeeper提供以下保证:
1. watch对于其他事件、watch、异步响应是有序的。zookeeper client library保证有序分发
2. 客户端监视一个节点,总是先获取watch事件,再发现节点的数据变化。
3. watch事件的顺序对应于zookeeper服务所见的数据更新的顺序。

关于watch要记住的是:
1.watch是一次性触发的,如果获取一个watch事件并希望得到新变化的通知,需要重新设置watch。
2.watch是一次性触发的并且在获取watch事件和设置新watch事件之间有延迟,所以不能可靠的观察到节点的每一次变化。要认识到这一点。
3.watch object只触发一次,比如,一个watch object被注册到同一个节点的getData()和exists(),节点被删除,仅对应于exists()的watch ojbect被调用。
4.若与服务端断开连接,直到重连后才能获取watch事件。
更多关于ZooKeeper监听机制和watcher用法演示,
见这篇文章:https://www.cnblogs.com/programlearning/archive/2017/05/10/6834963.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: