您的位置:首页 > 其它

curator实现zookeeper的分布式锁

2017-04-24 12:59 260 查看
基于curator元语实现的分布式锁种类有好几种,下面只讲解一种实现。InterProcessMutex实现的分布式锁属于可重入式锁,当一个客户端获取到lock锁之后,可以重复调用acquire()而不会发生阻塞。基于InterProcessSemaphoreMutex实现的分布式的分布式锁是不可重入的,当一个客户端获取到lock锁之后,再次调用acquire方法获取锁时会发生阻塞。基于InterProcessReadWriteLock实现的分布式锁里边包含了读锁与写锁,其中读锁与读锁互斥,读锁与写锁互斥,读锁与读锁不互斥。

public class DistributeLock {

private static final String PATH = “/example/cache”;

private static final String zkStr=”master:2181,worker1:2181,worker2:2181”;

//private InterProcessReadWriteLock lock;

public static void main(String []args){

for(int i=0;i<200;i++){
CuratorFramework client= CuratorFrameworkFactory.newClient(zkStr, new ExponentialBackoffRetry(1000, 3));
client.start();
LockExample example=new LockExample(client,PATH,"client"+i);
try{
example.doWork(10, TimeUnit.SECONDS);
}catch (Exception e){

}finally {
CloseableUtils.closeQuietly(client);
}
}
}


}
public class LockExample {

private CuratorFramework client;

private InterProcessMutex lock;

private String lockPath;

private String clientName;

public LockExample(CuratorFramework client,String lockPath,String clientName){

this.lockPath=lockPath;

this.clientName=clientName;

lock=new InterProcessMutex(client,lockPath);

}
public void doWork(long time,TimeUnit unit )throws  Exception{

try
{
if (lock.acquire(time,unit))
{
System.out.println(clientName + " has the lock");
}
}
finally
{
System.out.println(clientName + " releasing the lock");
lock.release(); // always release the lock in a finally block
}
}


}
其他的讲解可以参考下边连接

http://www.aboutyun.com/thread-10725-1-1.html

基于InterProcessReadWriteLock实现的分布式锁里边包含了读锁与写锁,其中读锁与读锁互斥,读锁与写锁互斥,读锁与读锁不互斥。验证代码最下所示:不管获取到读锁还是写锁都不释放,通过例子发现先获取到写锁如果不释放是不可能获到读锁的,先获到读锁如果不释放是不可能获到写锁的但是其他客户端client还是会获取到读锁。调用writeLock.acquire(time,unit)时time时间越短即使理论上可以获取到读锁,但是失败的次数也很多,因此应该找一个合适的时间参数进行获取。例如设置成3秒就还可以,1秒失败的此处比较多。

public class ReadWriteLockExample {

private static final String     PATH = "/example/cache";
private static final String zkStr="master:2181,worker1:2181,worker2:2181";
private static FakeLimitedResource resource=new FakeLimitedResource();
public static void main(String []args){
//获取写锁
new Thread(new Runnable(){
CuratorFramework client=null;
public void run() {
try{
for(int i=0;i<20;i++){
client= CuratorFrameworkFactory.newClient(zkStr, new ExponentialBackoffRetry(1000, 3));
client.start();
ReadAndWriteLock example=new ReadAndWriteLock(client,resource,PATH,"client"+i);
example.doWorkXie(3, TimeUnit.SECONDS);
}
}catch (Exception e){

}finally {
CloseableUtils.closeQuietly(client);
}
}
}).start();

//获取读锁
new Thread(new Runnable(){
CuratorFramework client=null;
public void run() {
try{
for(int i=0;i<20;i++){
client= CuratorFrameworkFactory.newClient(zkStr, new ExponentialBackoffRetry(1000, 3));
client.start();
ReadAndWriteLock example=new ReadAndWriteLock(client,resource,PATH,"client"+i);
example.doWorkDu(3, TimeUnit.SECONDS);
}
}catch (Exception e){

}finally {
CloseableUtils.closeQuietly(client);
}
}
}).start();


// try{

// CuratorFramework client=null;

// ReadAndWriteLock example=null;

// //先获取一个写锁

// client= CuratorFrameworkFactory.newClient(zkStr, new ExponentialBackoffRetry(1000, 3));

// client.start();

// example=new ReadAndWriteLock(client,resource,PATH,”client”);

// example.doWorkXie(5,TimeUnit.SECONDS);

//

// //先获取一个读锁不释放,看看能不能

// client= CuratorFrameworkFactory.newClient(zkStr, new ExponentialBackoffRetry(1000, 3));

// client.start();

// example=new ReadAndWriteLock(client,resource,PATH,”client”);

// example.doWorkDu(5,TimeUnit.SECONDS);

//

// }catch(Exception e){}
// finally {

//

// }
}


}
主函数:

/**

* Created by zhangpengshuai on 17/4/24.

*/

public class ReadAndWriteLock {

private InterProcessReadWriteLock lock;

private InterProcessMutex readLock;

private InterProcessMutex writeLock;

private FakeLimitedResource resource;

private String clientName;

public ReadAndWriteLock(CuratorFramework client,FakeLimitedResource resource ,String lockPath, String clientName)

{

this.resource = resource;

this.clientName = clientName;

lock = new InterProcessReadWriteLock(client, lockPath);

readLock = lock.readLock();

writeLock = lock.writeLock();

}
public void doWorkDu(long time, TimeUnit unit) throws Exception

{

try{

if(readLock.acquire(time,unit)){

System.out.println(clientName + ” 已得到du锁”);

}else{

System.out.println(clientName + ” meiyou得到du锁”);

        }//只获取读锁
}finally {
//System.out.println(clientName+" 已释放du锁");
//readLock.release();
}
}
//只获取读锁
public void doWorkXie(long time, TimeUnit unit) throws Exception
{
try{
if(writeLock.acquire(time,unit)){
System.out.println(clientName + " 已得到xie锁");
}else{
System.out.println(clientName + " meiyou得到xie锁");
}//Thread.sleep(1000);
}finally {
//System.out.println(clientName + " 已释放xie锁");
//writeLock.release();
}}


}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息