基于zookeeper的分布式锁实现
2016-08-11 18:40
585 查看
摘要: 工作中需要写一个定时任务,由于是集群环境,自然而然想到需要通过分布式锁来保证单台执行..相信大家都会想到使用zk来实现对应的分布式锁.下面就简单介绍一下几种实现
工作中需要写一个定时任务,由于是集群环境,自然而然想到需要通过分布式锁来保证单台执行..相信大家都会想到使用zk来实现对应的分布式锁.下面就简单介绍一下几种实现
ZKClient 对zk的操作做了一个简单的封装
Java代码
package zk.lock;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import zk.util.ZKUtil;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* User: zhenghui
* Date: 14-3-26
* Time: 下午8:50
* 封装一个zookeeper实例.
*/
public class ZKClient implements Watcher {
private ZooKeeper zookeeper;
private CountDownLatch connectedSemaphore = new CountDownLatch(1);
public ZKClient(String connectString, int sessionTimeout) throws Exception {
zookeeper = new ZooKeeper(connectString, sessionTimeout, this);
System.out.println("connecting zk server");
if (connectedSemaphore.await(10l, TimeUnit.SECONDS)) {
System.out.println("connect zk server success");
} else {
System.out.println("connect zk server error.");
throw new Exception("connect zk server error.");
}
}
public void close() throws InterruptedException {
i
3ff0
f (zookeeper != null) {
zookeeper.close();
}
}
public void createPathIfAbsent(String path, boolean isPersistent) throws Exception {
CreateMode createMode = isPersistent ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL;
path = ZKUtil.normalize(path);
if (!this.exists(path)) {
zookeeper.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, createMode);
}
}
public boolean exists(String path) throws Exception {
path = ZKUtil.normalize(path);
Stat stat = zookeeper.exists(path, null);
return stat != null;
}
public String getData(String path) throws Exception {
path = ZKUtil.normalize(path);
try {
byte[] data = zookeeper.getData(path, null, null);
return new String(data);
} catch (KeeperException e) {
if (e instanceof KeeperException.NoNodeException) {
throw new Exception("Node does not exist,path is [" + e.getPath() + "].", e);
} else {
throw new Exception(e);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new Exception(e);
}
}
@Override
public void process(WatchedEvent event) {
if (event == null) return;
// 连接状态
Watcher.Event.KeeperState keeperState = event.getState();
// 事件类型
Watcher.Event.EventType eventType = event.getType();
// 受影响的path
// String path = event.getPath();
if (Watcher.Event.KeeperState.SyncConnected == keeperState) {
// 成功连接上ZK服务器
if (Watcher.Event.EventType.None == eventType) {
System.out.println("zookeeper connect success");
connectedSemaphore.countDown();
}
}
//下面可以做一些重连的工作.
else if (Watcher.Event.KeeperState.Disconnected == keeperState) {
System.out.println("zookeeper Disconnected");
} else if (Watcher.Event.KeeperState.AuthFailed == keeperState) {
System.out.println("zookeeper AuthFailed");
} else if (Watcher.Event.KeeperState.Expired == keeperState) {
System.out.println("zookeeper Expired");
}
}
}
ZKUtil 针对zk路径的一个工具类
Java代码
package zk.util;
/**
* User: zhenghui
* Date: 14-3-26
* Time: 下午9:56
*/
public class ZKUtil {
public static final String SEPARATOR = "/";
/**
* 转换path为zk的标准路径 以/开头,最后不带/
*/
public static String normalize(String path) {
String temp = path;
if(!path.startsWith(SEPARATOR)) {
temp = SEPARATOR + path;
}
if(path.endsWith(SEPARATOR)) {
temp = temp.substring(0, temp.length()-1);
return normalize(temp);
}else {
return temp;
}
}
/**
* 链接两个path,并转化为zk的标准路径
*/
public static String contact(String path1,String path2){
if(path2.startsWith(SEPARATOR)) {
path2 = path2.substring(1);
}
if(path1.endsWith(SEPARATOR)) {
return normalize(path1 + path2);
} else {
return normalize(path1 + SEPARATOR + path2);
}
}
/**
* 字符串转化成byte类型
*/
public static byte[] toBytes(String data) {
if(data == null || data.trim().equals("")) return null;
return data.getBytes();
}
}
NetworkUtil 获取本机IP的工具方法
Java代码
package zk.util;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.util.Enumeration;
/**
* User: zhenghui
* Date: 14-4-1
* Time: 下午4:47
*/
public class NetworkUtil {
static private final char COLON = ':';
/**
* 获取当前机器ip地址
* 据说多网卡的时候会有问题.
*/
public static String getNetworkAddress() {
Enumeration<NetworkInterface> netInterfaces;
try {
netInterfaces = NetworkInterface.getNetworkInterfaces();
InetAddress ip;
while (netInterfaces.hasMoreElements()) {
NetworkInterface ni = netInterfaces
.nextElement();
Enumeration<InetAddress> addresses=ni.getInetAddresses();
while(addresses.hasMoreElements()){
ip = addresses.nextElement();
if (!ip.isLoopbackAddress()
&& ip.getHostAddress().indexOf(COLON) == -1) {
return ip.getHostAddress();
}
}
}
return "";
} catch (Exception e) {
return "";
}
}
}
--------------------------- 正文开始 -----------------------------------
这种实现非常简单,具体的流程如下
![](http://static.oschina.net/uploads/img/201608/11184049_f5S6.jpg)
对应的实现如下
Java代码
package zk.lock;
import zk.util.NetworkUtil;
import zk.util.ZKUtil;
/**
* User: zhenghui
* Date: 14-3-26
* Time: 下午8:37
* 分布式锁实现.
*
* 这种实现的
7fe0
原理是,创建某一个任务的节点,比如 /lock/tasckname 然后获取对应的值,如果是当前的Ip,那么获得锁,如果不是,则没获得
* .如果该节点不存在,则创建该节点,并把改节点的值设置成当前的IP
*/
public class DistributedLock01 {
private ZKClient zkClient;
public static final String LOCK_ROOT = "/lock";
private String lockName;
public DistributedLock01(String connectString, int sessionTimeout,String lockName) throws Exception {
//先创建zk链接.
this.createConnection(connectString,sessionTimeout);
this.lockName = lockName;
}
public boolean tryLock(){
String path = ZKUtil.contact(LOCK_ROOT,lockName);
String localIp = NetworkUtil.getNetworkAddress();
try {
if(zkClient.exists(path)){
String ownnerIp = zkClient.getData(path);
if(localIp.equals(ownnerIp)){
return true;
}
} else {
zkClient.createPathIfAbsent(path,false);
if(zkClient.exists(path)){
String ownnerIp = zkClient.getData(path);
if(localIp.equals(ownnerIp)){
return true;
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
/**
* 创建zk连接
*
*/
protected void createConnection(String connectString, int sessionTimeout) throws Exception {
if(zkClient != null){
releaseConnection();
}
zkClient = new ZKClient(connectString,sessionTimeout);
zkClient.createPathIfAbsent(LOCK_ROOT,true);
}
/**
* 关闭ZK连接
*/
protected void releaseConnection() throws InterruptedException {
if (zkClient != null) {
zkClient.close();
}
}
}
核心技术:Maven,Springmvc mybatis shiro, Druid, Restful, Dubbo, ZooKeeper,Redis,FastDFS,ActiveMQ,Nginx
1. 项目核心代码结构截图
![](http://static.oschina.net/uploads/img/201608/11184049_vFFC.png)
项目模块依赖
![](http://static.oschina.net/uploads/img/201608/11184049_l7wL.png)
特别提醒:开发人员在开发的时候可以将自己的业务REST服务化或者Dubbo服务化
2. 项目依赖介绍
2.1 后台管理系统、Rest服务系统、Scheculer定时调度系统依赖如下图:
![](http://static.oschina.net/uploads/img/201608/11184049_0j3c.png)
2.2 Dubbo独立服务项目依赖如下图:
![](http://static.oschina.net/uploads/img/201608/11184049_3uZ3.png)
3. 项目功能部分截图:
![](http://static.oschina.net/uploads/img/201608/11184049_uXuP.png)
![](http://static.oschina.net/uploads/img/201608/11184049_O7Ua.png)
![](http://static.oschina.net/uploads/img/201608/11184049_uqjt.png)
![](http://static.oschina.net/uploads/img/201608/11184049_k1cj.png)
![](http://static.oschina.net/uploads/img/201608/11184049_WKvA.png)
![](http://static.oschina.net/uploads/img/201608/11184049_S5nW.png)
![](http://static.oschina.net/uploads/img/201608/11184049_Ekyj.png)
zookeeper、dubbo服务启动
![](http://static.oschina.net/uploads/img/201608/11184049_htJp.jpg)
![](http://static.oschina.net/uploads/img/201608/11184049_Z1j8.png)
dubbo管控台
![](http://static.oschina.net/uploads/img/201608/11184050_t7BG.png)
![](http://static.oschina.net/uploads/img/201608/11184050_Zttn.jpg)
![](http://static.oschina.net/uploads/img/201608/11184050_xUgc.png)
![](http://static.oschina.net/uploads/img/201608/11184050_xUgc.png)
![](http://static.oschina.net/uploads/img/201608/11184050_BwwO.png)
![](http://static.oschina.net/uploads/img/201608/11184050_LUbt.png)
![](http://static.oschina.net/uploads/img/201608/11184050_IU2a.png)
REST服务平台
![](http://static.oschina.net/uploads/img/201608/11184050_74ik.png)
![](http://static.oschina.net/uploads/img/201608/11184050_taJo.png)
![](http://static.oschina.net/uploads/img/201608/11184050_Utmj.png)
工作中需要写一个定时任务,由于是集群环境,自然而然想到需要通过分布式锁来保证单台执行..相信大家都会想到使用zk来实现对应的分布式锁.下面就简单介绍一下几种实现
准备工作
有几个帮助类,先把代码放上来ZKClient 对zk的操作做了一个简单的封装
Java代码
package zk.lock;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import zk.util.ZKUtil;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* User: zhenghui
* Date: 14-3-26
* Time: 下午8:50
* 封装一个zookeeper实例.
*/
public class ZKClient implements Watcher {
private ZooKeeper zookeeper;
private CountDownLatch connectedSemaphore = new CountDownLatch(1);
public ZKClient(String connectString, int sessionTimeout) throws Exception {
zookeeper = new ZooKeeper(connectString, sessionTimeout, this);
System.out.println("connecting zk server");
if (connectedSemaphore.await(10l, TimeUnit.SECONDS)) {
System.out.println("connect zk server success");
} else {
System.out.println("connect zk server error.");
throw new Exception("connect zk server error.");
}
}
public void close() throws InterruptedException {
i
3ff0
f (zookeeper != null) {
zookeeper.close();
}
}
public void createPathIfAbsent(String path, boolean isPersistent) throws Exception {
CreateMode createMode = isPersistent ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL;
path = ZKUtil.normalize(path);
if (!this.exists(path)) {
zookeeper.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, createMode);
}
}
public boolean exists(String path) throws Exception {
path = ZKUtil.normalize(path);
Stat stat = zookeeper.exists(path, null);
return stat != null;
}
public String getData(String path) throws Exception {
path = ZKUtil.normalize(path);
try {
byte[] data = zookeeper.getData(path, null, null);
return new String(data);
} catch (KeeperException e) {
if (e instanceof KeeperException.NoNodeException) {
throw new Exception("Node does not exist,path is [" + e.getPath() + "].", e);
} else {
throw new Exception(e);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new Exception(e);
}
}
@Override
public void process(WatchedEvent event) {
if (event == null) return;
// 连接状态
Watcher.Event.KeeperState keeperState = event.getState();
// 事件类型
Watcher.Event.EventType eventType = event.getType();
// 受影响的path
// String path = event.getPath();
if (Watcher.Event.KeeperState.SyncConnected == keeperState) {
// 成功连接上ZK服务器
if (Watcher.Event.EventType.None == eventType) {
System.out.println("zookeeper connect success");
connectedSemaphore.countDown();
}
}
//下面可以做一些重连的工作.
else if (Watcher.Event.KeeperState.Disconnected == keeperState) {
System.out.println("zookeeper Disconnected");
} else if (Watcher.Event.KeeperState.AuthFailed == keeperState) {
System.out.println("zookeeper AuthFailed");
} else if (Watcher.Event.KeeperState.Expired == keeperState) {
System.out.println("zookeeper Expired");
}
}
}
ZKUtil 针对zk路径的一个工具类
Java代码
package zk.util;
/**
* User: zhenghui
* Date: 14-3-26
* Time: 下午9:56
*/
public class ZKUtil {
public static final String SEPARATOR = "/";
/**
* 转换path为zk的标准路径 以/开头,最后不带/
*/
public static String normalize(String path) {
String temp = path;
if(!path.startsWith(SEPARATOR)) {
temp = SEPARATOR + path;
}
if(path.endsWith(SEPARATOR)) {
temp = temp.substring(0, temp.length()-1);
return normalize(temp);
}else {
return temp;
}
}
/**
* 链接两个path,并转化为zk的标准路径
*/
public static String contact(String path1,String path2){
if(path2.startsWith(SEPARATOR)) {
path2 = path2.substring(1);
}
if(path1.endsWith(SEPARATOR)) {
return normalize(path1 + path2);
} else {
return normalize(path1 + SEPARATOR + path2);
}
}
/**
* 字符串转化成byte类型
*/
public static byte[] toBytes(String data) {
if(data == null || data.trim().equals("")) return null;
return data.getBytes();
}
}
NetworkUtil 获取本机IP的工具方法
Java代码
package zk.util;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.util.Enumeration;
/**
* User: zhenghui
* Date: 14-4-1
* Time: 下午4:47
*/
public class NetworkUtil {
static private final char COLON = ':';
/**
* 获取当前机器ip地址
* 据说多网卡的时候会有问题.
*/
public static String getNetworkAddress() {
Enumeration<NetworkInterface> netInterfaces;
try {
netInterfaces = NetworkInterface.getNetworkInterfaces();
InetAddress ip;
while (netInterfaces.hasMoreElements()) {
NetworkInterface ni = netInterfaces
.nextElement();
Enumeration<InetAddress> addresses=ni.getInetAddresses();
while(addresses.hasMoreElements()){
ip = addresses.nextElement();
if (!ip.isLoopbackAddress()
&& ip.getHostAddress().indexOf(COLON) == -1) {
return ip.getHostAddress();
}
}
}
return "";
} catch (Exception e) {
return "";
}
}
}
--------------------------- 正文开始 -----------------------------------
这种实现非常简单,具体的流程如下
![](http://static.oschina.net/uploads/img/201608/11184049_f5S6.jpg)
对应的实现如下
Java代码
package zk.lock;
import zk.util.NetworkUtil;
import zk.util.ZKUtil;
/**
* User: zhenghui
* Date: 14-3-26
* Time: 下午8:37
* 分布式锁实现.
*
* 这种实现的
7fe0
原理是,创建某一个任务的节点,比如 /lock/tasckname 然后获取对应的值,如果是当前的Ip,那么获得锁,如果不是,则没获得
* .如果该节点不存在,则创建该节点,并把改节点的值设置成当前的IP
*/
public class DistributedLock01 {
private ZKClient zkClient;
public static final String LOCK_ROOT = "/lock";
private String lockName;
public DistributedLock01(String connectString, int sessionTimeout,String lockName) throws Exception {
//先创建zk链接.
this.createConnection(connectString,sessionTimeout);
this.lockName = lockName;
}
public boolean tryLock(){
String path = ZKUtil.contact(LOCK_ROOT,lockName);
String localIp = NetworkUtil.getNetworkAddress();
try {
if(zkClient.exists(path)){
String ownnerIp = zkClient.getData(path);
if(localIp.equals(ownnerIp)){
return true;
}
} else {
zkClient.createPathIfAbsent(path,false);
if(zkClient.exists(path)){
String ownnerIp = zkClient.getData(path);
if(localIp.equals(ownnerIp)){
return true;
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
/**
* 创建zk连接
*
*/
protected void createConnection(String connectString, int sessionTimeout) throws Exception {
if(zkClient != null){
releaseConnection();
}
zkClient = new ZKClient(connectString,sessionTimeout);
zkClient.createPathIfAbsent(LOCK_ROOT,true);
}
/**
* 关闭ZK连接
*/
protected void releaseConnection() throws InterruptedException {
if (zkClient != null) {
zkClient.close();
}
}
}
总结
网上有很多文章,大家的方法大多数都是创建一个root根节点,每一个trylock的客户端都会在root下创建一个 EPHEMERAL_SEQUENTIAL 的子节点,同时设置root的child 变更watcher(为了避免羊群效应,可以只添加前一个节点的变更通知) .如果创建的节点的序号是最小,则获取到锁,否则继续等待root的child 变更核心技术:Maven,Springmvc mybatis shiro, Druid, Restful, Dubbo, ZooKeeper,Redis,FastDFS,ActiveMQ,Nginx
1. 项目核心代码结构截图
![](http://static.oschina.net/uploads/img/201608/11184049_vFFC.png)
项目模块依赖
![](http://static.oschina.net/uploads/img/201608/11184049_l7wL.png)
特别提醒:开发人员在开发的时候可以将自己的业务REST服务化或者Dubbo服务化
2. 项目依赖介绍
2.1 后台管理系统、Rest服务系统、Scheculer定时调度系统依赖如下图:
![](http://static.oschina.net/uploads/img/201608/11184049_0j3c.png)
2.2 Dubbo独立服务项目依赖如下图:
![](http://static.oschina.net/uploads/img/201608/11184049_3uZ3.png)
3. 项目功能部分截图:
![](http://static.oschina.net/uploads/img/201608/11184049_uXuP.png)
![](http://static.oschina.net/uploads/img/201608/11184049_O7Ua.png)
![](http://static.oschina.net/uploads/img/201608/11184049_uqjt.png)
![](http://static.oschina.net/uploads/img/201608/11184049_k1cj.png)
![](http://static.oschina.net/uploads/img/201608/11184049_WKvA.png)
![](http://static.oschina.net/uploads/img/201608/11184049_S5nW.png)
![](http://static.oschina.net/uploads/img/201608/11184049_Ekyj.png)
zookeeper、dubbo服务启动
![](http://static.oschina.net/uploads/img/201608/11184049_htJp.jpg)
![](http://static.oschina.net/uploads/img/201608/11184049_Z1j8.png)
dubbo管控台
![](http://static.oschina.net/uploads/img/201608/11184050_t7BG.png)
![](http://static.oschina.net/uploads/img/201608/11184050_Zttn.jpg)
![](http://static.oschina.net/uploads/img/201608/11184050_xUgc.png)
![](http://static.oschina.net/uploads/img/201608/11184050_xUgc.png)
![](http://static.oschina.net/uploads/img/201608/11184050_BwwO.png)
![](http://static.oschina.net/uploads/img/201608/11184050_LUbt.png)
![](http://static.oschina.net/uploads/img/201608/11184050_IU2a.png)
REST服务平台
![](http://static.oschina.net/uploads/img/201608/11184050_74ik.png)
![](http://static.oschina.net/uploads/img/201608/11184050_taJo.png)
![](http://static.oschina.net/uploads/img/201608/11184050_Utmj.png)
![](http://static.oschina.net/uploads/img/201608/11184050_gJ3d.png)
相关文章推荐
- dubbo+zookeeper+dubbo管理控制台实践demo
- maven+springmvc+mybatis后台管理
- Dubbo实现RPC调用使用入门
- dubbo+zookeeper例子
- Dubbo实现RPC调用使用入门
- 第四章 INI配置——跟我学习springmvc shiro mybatis
- 第三章 授权——跟我学习springmvc shiro mybatis
- 第二章 身份验证——跟我学习springmvc shiro mybatis
- 第一章 Shiro简介——跟我学习springmvc shiro mybatis
- 企业分布式架构真正适用于大型互联网项目的架构!《精点》
- SSH ++shiro+restful+bootstrap java架构
- springMVC 的工作原理和机制、配置
- redis分布式缓存实现
- Web功能测试的4种类型
- 微服务架构的优势与不足
- 分布式缓存Redis Centos下单节点安装
- Redis分布式缓存安装和使用
- 精华【分布式、微服务、云架构、dubbo+zookeeper+springmvc+mybatis+shiro+redis】分布式大型互联网企业架构!
- JEESZ-Zookeeper集群安装
- Thinkphp 中缓存redis 支持密码登陆