您的位置:首页 > 数据库 > Redis

基于zookeeper的分布式锁实现

2016-08-09 15:33 706 查看
摘要: 工作中需要写一个定时任务,由于是集群环境,自然而然想到需要通过分布式锁来保证单台执行..相信大家都会想到使用zk来实现对应的分布式锁.下面就简单介绍一下几种实现

工作中需要写一个定时任务,由于是集群环境,自然而然想到需要通过分布式锁来保证单台执行..相信大家都会想到使用zk来实现对应的分布式锁.下面就简单介绍一下几种实现

准备工作

有几个帮助类,先把代码放上来
ZKClient 对zk的操作做了一个简单的封装

Java代码

package zk.lock;

import org.apache.zookeeper.*;

import org.apache.zookeeper.data.Stat;

import zk.util.ZKUtil;

import java.util.concurrent.CountDownLatch;

import java.util.concurrent.TimeUnit;

/**

* User: zhenghui

* Date: 14-3-26

* Time: 下午8:50

* 封装一个zookeeper实例.

*/

public class ZKClient implements Watcher {

private ZooKeeper zookeeper;

private CountDownLatch connectedSemaphore = new CountDownLatch(1);

public ZKClient(String connectString, int sessionTimeout) throws Exception {

zookeeper = new ZooKeeper(connectString, sessionTimeout, this);

System.out.println("connecting zk server");

if (connectedSemaphore.await(10l, TimeUnit.SECONDS)) {

System.out.println("connect zk server success");

} else {

System.out.println("connect zk server error.");

throw new Exception("connect zk server error.");

}

}

public void close() throws InterruptedException {

&
8000
nbsp;if (zookeeper != null) {

zookeeper.close();

}

}

public void createPathIfAbsent(String path, boolean isPersistent) throws Exception {

CreateMode createMode = isPersistent ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL;

path = ZKUtil.normalize(path);

if (!this.exists(path)) {

zookeeper.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, createMode);

}

}

public boolean exists(String path) throws Exception {

path = ZKUtil.normalize(path);

Stat stat = zookeeper.exists(path, null);

return stat != null;

}

public String getData(String path) throws Exception {

path = ZKUtil.normalize(path);

try {

byte[] data = zookeeper.getData(path, null, null);

return new String(data);

} catch (KeeperException e) {

if (e instanceof KeeperException.NoNodeException) {

throw new Exception("Node does not exist,path is [" + e.getPath() + "].", e);

} else {

throw new Exception(e);

}

} catch (InterruptedException e) {

Thread.currentThread().interrupt();

throw new Exception(e);

}

}

@Override

public void process(WatchedEvent event) {

if (event == null) return;

// 连接状态

Watcher.Event.KeeperState keeperState = event.getState();

// 事件类型

Watcher.Event.EventType eventType = event.getType();

// 受影响的path

// String path = event.getPath();

if (Watcher.Event.KeeperState.SyncConnected == keeperState) {

// 成功连接上ZK服务器

if (Watcher.Event.EventType.None == eventType) {

System.out.println("zookeeper connect success");

connectedSemaphore.countDown();

}

}

//下面可以做一些重连的工作.

else if (Watcher.Event.KeeperState.Disconnected == keeperState) {

System.out.println("zookeeper Disconnected");

} else if (Watcher.Event.KeeperState.AuthFailed == keeperState) {

System.out.println("zookeeper AuthFailed");

} else if (Watcher.Event.KeeperState.Expired == keeperState) {

System.out.println("zookeeper Expired");

}

}

}

ZKUtil 针对zk路径的一个工具类
Java代码

package zk.util;

/**

* User: zhenghui

* Date: 14-3-26

* Time: 下午9:56

*/

public class ZKUtil {

public static final String SEPARATOR = "/";

/**

* 转换path为zk的标准路径 以/开头,最后不带/

*/

public static String normalize(String path) {

String temp = path;

if(!path.startsWith(SEPARATOR)) {

temp = SEPARATOR + path;

}

if(path.endsWith(SEPARATOR)) {

temp = temp.substring(0, temp.length()-1);

return normalize(temp);

}else {

return temp;

}

}

/**

* 链接两个path,并转化为zk的标准路径

*/

public static String contact(String path1,String path2){

if(path2.startsWith(SEPARATOR)) {

path2 = path2.substring(1);

}

if(path1.endsWith(SEPARATOR)) {

return normalize(path1 + path2);

} else {

return normalize(path1 + SEPARATOR + path2);

}

}

/**

* 字符串转化成byte类型

*/

public static byte[] toBytes(String data) {

if(data == null || data.trim().equals("")) return null;

return data.getBytes();

}

}

NetworkUtil 获取本机IP的工具方法
Java代码

package zk.util;

import java.net.InetAddress;

import java.net.NetworkInterface;

import java.util.Enumeration;

/**

* User: zhenghui

* Date: 14-4-1

* Time: 下午4:47

*/

public class NetworkUtil {

static private final char COLON = ':';

/**

* 获取当前机器ip地址

* 据说多网卡的时候会有问题.

*/

public static String getNetworkAddress() {

Enumeration<NetworkInterface> netInterfaces;

try {

netInterfaces = NetworkInterface.getNetworkInterfaces();

InetAddress ip;

while (netInterfaces.hasMoreElements()) {

NetworkInterface ni = netInterfaces

.nextElement();

Enumeration<InetAddress> addresses=ni.getInetAddresses();

while(addresses.hasMoreElements()){

ip = addresses.nextElement();

if (!ip.isLoopbackAddress()

&& ip.getHostAddress().indexOf(COLON) == -1) {

return ip.getHostAddress();

}

}

}

return "";

} catch (Exception e) {

return "";

}

}

}

--------------------------- 正文开始 -----------------------------------
这种实现非常简单,具体的流程如下



对应的实现如下
Java代码


package zk.lock;

import zk.util.NetworkUtil;

import zk.util.ZKUtil;

/**

* User: zhenghui

* Date: 14-3-26

* Time: 下午8:37

* 分布式锁实现.

*

* 这种实现的原理是,创建某一个任务的节点,比如 /lock/tasckname 然后获取对应的值,如果是当前的Ip,那么获得锁,如果不是,则没获得

* .如果该节点不存在,则创建该节点,并把改节点的值设置成当前的IP

*/

public class DistributedLock01 {

private ZKClient zkClient;

public static final String LOCK_ROOT = "/lock";

private String lockName;

public DistributedLock01(String connectString, int sessionTimeout,String lockName) throws Exception {

//先创建zk链接.

this.createConnection(connectString,sessionTimeout);

this.lockName = lockName;

}

public boolean tryLock(){

String path = ZKUtil.contact(LOCK_ROOT,lockName);

String localIp = NetworkUtil.getNetworkAddress();

try {

if(zkClient.exists(path)){

String ownnerIp = zkClient.getData(path);

if(localIp.equals(ownnerIp)){

return true;

}

} else {

zkClient.createPathIfAbsent(path,false);

if(zkClient.exists(path)){

String ownnerIp = zkClient.getData(path);

if(localIp.equals(ownnerIp)){

return true;

}

}

}

} catch (Exception e) {

e.printStackTrace();

}

return false;

}

/**

* 创建zk连接

*

*/

protected void createConnection(String connectString, int sessionTimeout) throws Exception {

if(zkClient != null){

releaseConnection();

}

zkClient = new ZKClient(connectString,sessionTimeout);

zkClient.createPathIfAbsent(LOCK_ROOT,true);

}

/**

* 关闭ZK连接

*/

protected void releaseConnection() throws InterruptedException {

if (zkClient != null) {

zkClient.close();

}

}

}

总结

网上有很多文章,大家的方法大多数都是创建一个root根节点,每一个trylock的客户端都会在root下创建一个 EPHEMERAL_SEQUENTIAL 的子节点,同时设置root的child 变更watcher(为了避免羊群效应,可以只添加前一个节点的变更通知) .如果创建的节点的序号是最小,则获取到锁,否则继续等待root的child 变更

核心技术:Maven,Springmvc mybatis shiro, Druid, Restful, Dubbo, ZooKeeper,Redis,FastDFS,ActiveMQ,Nginx
1. 项目核心代码结构截图


项目模块依赖

特别提醒:开发人员在开发的时候可以将自己的业务REST服务化或者Dubbo服务化
2. 项目依赖介绍
2.1 后台管理系统、Rest服务系统、Scheculer定时调度系统依赖如下图:



2.2 Dubbo独立服务项目依赖如下图:



3. 项目功能部分截图:














zookeeper、dubbo服务启动




dubbo管控台














REST服务平台






内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息