您的位置:首页 > 编程语言

Zookeeper分布式锁(多进程竞争)实现的代码示例分享

2013-09-15 10:12 686 查看
zookeeper分布式锁在实际的场景中应用很多,比如集群中多个节点的leader选举,数据库master-slave模式的主库的选择等等

解决方案依然很简单,需要加锁的进程先尝试在zookeeper上创建一个临时节点L,如果创建成功则加锁成功,

如果不成功(已存在)则在该节点上设置watch。

进程通过删除L来解锁(当进程意外终止,L也会被删除,不会造成死锁),当L被删除时,其它等待锁的进程会得到通知,

此时这些进程再次创建L来获得锁。

上面的方案,当竞争锁的进程比较多时,解锁时会引起Herd Effect,可对加锁规则进行限制,如按进程尝试加锁的顺序来分配锁。

在zookeeper上,每个加锁的进程创建一个带SEQUENTIAL标志的临时节点,每次让序号最小的节点获得锁,

这样每个节点只需要watch它前面节点的状态即可,当其前面

节点被删除时,其将被通知,并获得锁。

以下是笔者在项目中的分布式锁的示例代码,可供参考

zookeeper中的节点

/election

|---90338461809508352-192.168.1.111:8983

|---90338461809508351-192.168.1.112:8983

|---90338461809508356-192.168.1.113:8983

/leader(90338461809508351-192.168.1.112:8983)

代码示例:

/**
 * 分布式锁应用示例
 * 多个进程节点的leader选举
 * @author yangbutao
 * 
 */
public class ElectionMain {
	private static Logger log = LoggerFactory.getLogger(ElectionMain.class);

	private final static Pattern LEADER_SEQ = Pattern
			.compile(".*?/?.*?-n_(\\d+)");
	private final static Pattern SESSION_ID = Pattern
			.compile(".*?/?(.*?-.*?)-n_\\d+");

	public static void main(String[] args)throws Exception {
		ZooKeeper zkClient = new ZooKeeper("10.1.1.20:2222", 3000, null);
		String electionPath = "/election";
		//当前节点名称
		String coreNodeName = "192.168.1.111:8983";
		//客户端节点的sessionId
		long sessionId = zkClient.getSessionId();
		String id = sessionId + "-" + coreNodeName;
		String leaderSeqPath = null;
		boolean cont = true;
		int tries = 0;
		while (cont) {
			try {
				leaderSeqPath = zkClient.create(
						electionPath + "/" + id + "-n_", null,
						ZooDefs.Ids.OPEN_ACL_UNSAFE,
						CreateMode.EPHEMERAL_SEQUENTIAL);
				cont = false;
			} catch (ConnectionLossException e) {
				List<String> entries = zkClient.getChildren(electionPath, true);
				boolean foundId = false;
				for (String entry : entries) {
					String nodeId = getNodeId(entry);
					if (id.equals(nodeId)) {
						foundId = true;
						break;
					}
				}
				if (!foundId) {
					cont = true;
					if (tries++ > 20) {
						throw new Exception( "server error", e);
					}
					try {
						Thread.sleep(50);
					} catch (InterruptedException e2) {
						Thread.currentThread().interrupt();
					}
				}

			} catch (KeeperException.NoNodeException e) {
				if (tries++ > 20) {
					throw new Exception( "server error", e);
				}
				cont = true;
				try {
					Thread.sleep(50);
				} catch (InterruptedException e2) {
					Thread.currentThread().interrupt();
				}
			}
		}
		int seq = getSeq(leaderSeqPath);
		checkIfIamLeader(zkClient, seq);
	}

	private static String getNodeId(String nStringSequence) {
		String id;
		Matcher m = SESSION_ID.matcher(nStringSequence);
		if (m.matches()) {
			id = m.group(1);
		} else {
			throw new IllegalStateException("Could not find regex match in:"
					+ nStringSequence);
		}
		return id;
	}

	private static int getSeq(String nStringSequence) {
		int seq = 0;
		Matcher m = LEADER_SEQ.matcher(nStringSequence);
		if (m.matches()) {
			seq = Integer.parseInt(m.group(1));
		} else {
			throw new IllegalStateException("Could not find regex match in:"
					+ nStringSequence);
		}
		return seq;
	}

	/**
	 * 排序seq
	 */
	private static void sortSeqs(List<String> seqs) {
		Collections.sort(seqs, new Comparator<String>() {

			@Override
			public int compare(String o1, String o2) {
				return Integer.valueOf(getSeq(o1)).compareTo(
						Integer.valueOf(getSeq(o2)));
			}
		});
	}

	private static List<Integer> getSeqs(List<String> seqs) {
		List<Integer> intSeqs = new ArrayList<Integer>(seqs.size());
		for (String seq : seqs) {
			intSeqs.add(getSeq(seq));
		}
		return intSeqs;
	}

	private static void checkIfIamLeader(final ZooKeeper zkClient, final int seq)
			throws KeeperException, InterruptedException, IOException {
		// get all other numbers...
		final String holdElectionPath = "/election";
		List<String> seqs = zkClient.getChildren(holdElectionPath, true);
		sortSeqs(seqs);
		List<Integer> intSeqs = getSeqs(seqs);
		if (intSeqs.size() == 0) {
			return;
		}
		if (seq <= intSeqs.get(0)) {
			//删除来的leader节点
			try {
				zkClient.delete("/leader", -1);
			} catch (Exception e) {
				// fine
			}
			String seqStr = null;
			for (String currSeq : seqs) {
				if (getSeq(currSeq) == seq) {
					seqStr = currSeq;
					break;
				}
			}
			runIamLeaderProcess(zkClient, seqStr);
		} else {
			// I am not the leader - watch the node below me
			//当前节点不是leader,watcher比我小的节点
			int i = 1;
			for (; i < intSeqs.size(); i++) {
				int s = intSeqs.get(i);
				if (seq < s) {
					// we found who we come before - watch the guy in front
					//发现比我小的节点(节点列表全面经过排序),退出循环
					break;
				}
			}
			int index = i - 2;
			if (index < 0) {
				log.warn("Our node is no longer in line to be leader");
				return;
			}
			try {
				//监控比当前节点seq次小的节点的值变化
				zkClient.getData(holdElectionPath + "/" + seqs.get(index),
						new Watcher() {

							@Override
							public void process(WatchedEvent event) {
								if (EventType.None.equals(event.getType())) {
									return;
								}
								// 检查是否是可以做为leader
								try {
									checkIfIamLeader(zkClient, seq);
								} catch (InterruptedException e) {
									Thread.currentThread().interrupt();
									log.warn("", e);
								} catch (IOException e) {
									log.warn("", e);
								} catch (Exception e) {
									log.warn("", e);
								}
							}

						}, null, true);
			} catch (Exception e) {
				log.warn("Failed setting watch", e);
				checkIfIamLeader(zkClient, seq);
			}
		}
	}

	
	protected static void runIamLeaderProcess(ZooKeeper zkClient, String seqStr)
			throws KeeperException, InterruptedException, IOException {
		final String id = seqStr.substring(seqStr.lastIndexOf("/") + 1);
		//设置leader节点
		zkClient.create("/leader", id.getBytes(),
				ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
	}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: