Zookeeper分布式锁(多进程竞争)实现的代码示例分享
2013-09-15 10:12
686 查看
zookeeper分布式锁在实际的场景中应用很多,比如集群中多个节点的leader选举,数据库master-slave模式的主库的选择等等
解决方案依然很简单,需要加锁的进程先尝试在zookeeper上创建一个临时节点L,如果创建成功则加锁成功,
如果不成功(已存在)则在该节点上设置watch。
进程通过删除L来解锁(当进程意外终止,L也会被删除,不会造成死锁),当L被删除时,其它等待锁的进程会得到通知,
此时这些进程再次创建L来获得锁。
上面的方案,当竞争锁的进程比较多时,解锁时会引起Herd Effect,可对加锁规则进行限制,如按进程尝试加锁的顺序来分配锁。
在zookeeper上,每个加锁的进程创建一个带SEQUENTIAL标志的临时节点,每次让序号最小的节点获得锁,
这样每个节点只需要watch它前面节点的状态即可,当其前面
节点被删除时,其将被通知,并获得锁。
以下是笔者在项目中的分布式锁的示例代码,可供参考
zookeeper中的节点
/election
|---90338461809508352-192.168.1.111:8983
|---90338461809508351-192.168.1.112:8983
|---90338461809508356-192.168.1.113:8983
/leader(90338461809508351-192.168.1.112:8983)
代码示例:
解决方案依然很简单,需要加锁的进程先尝试在zookeeper上创建一个临时节点L,如果创建成功则加锁成功,
如果不成功(已存在)则在该节点上设置watch。
进程通过删除L来解锁(当进程意外终止,L也会被删除,不会造成死锁),当L被删除时,其它等待锁的进程会得到通知,
此时这些进程再次创建L来获得锁。
上面的方案,当竞争锁的进程比较多时,解锁时会引起Herd Effect,可对加锁规则进行限制,如按进程尝试加锁的顺序来分配锁。
在zookeeper上,每个加锁的进程创建一个带SEQUENTIAL标志的临时节点,每次让序号最小的节点获得锁,
这样每个节点只需要watch它前面节点的状态即可,当其前面
节点被删除时,其将被通知,并获得锁。
以下是笔者在项目中的分布式锁的示例代码,可供参考
zookeeper中的节点
/election
|---90338461809508352-192.168.1.111:8983
|---90338461809508351-192.168.1.112:8983
|---90338461809508356-192.168.1.113:8983
/leader(90338461809508351-192.168.1.112:8983)
代码示例:
/** * 分布式锁应用示例 * 多个进程节点的leader选举 * @author yangbutao * */ public class ElectionMain { private static Logger log = LoggerFactory.getLogger(ElectionMain.class); private final static Pattern LEADER_SEQ = Pattern .compile(".*?/?.*?-n_(\\d+)"); private final static Pattern SESSION_ID = Pattern .compile(".*?/?(.*?-.*?)-n_\\d+"); public static void main(String[] args)throws Exception { ZooKeeper zkClient = new ZooKeeper("10.1.1.20:2222", 3000, null); String electionPath = "/election"; //当前节点名称 String coreNodeName = "192.168.1.111:8983"; //客户端节点的sessionId long sessionId = zkClient.getSessionId(); String id = sessionId + "-" + coreNodeName; String leaderSeqPath = null; boolean cont = true; int tries = 0; while (cont) { try { leaderSeqPath = zkClient.create( electionPath + "/" + id + "-n_", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); cont = false; } catch (ConnectionLossException e) { List<String> entries = zkClient.getChildren(electionPath, true); boolean foundId = false; for (String entry : entries) { String nodeId = getNodeId(entry); if (id.equals(nodeId)) { foundId = true; break; } } if (!foundId) { cont = true; if (tries++ > 20) { throw new Exception( "server error", e); } try { Thread.sleep(50); } catch (InterruptedException e2) { Thread.currentThread().interrupt(); } } } catch (KeeperException.NoNodeException e) { if (tries++ > 20) { throw new Exception( "server error", e); } cont = true; try { Thread.sleep(50); } catch (InterruptedException e2) { Thread.currentThread().interrupt(); } } } int seq = getSeq(leaderSeqPath); checkIfIamLeader(zkClient, seq); } private static String getNodeId(String nStringSequence) { String id; Matcher m = SESSION_ID.matcher(nStringSequence); if (m.matches()) { id = m.group(1); } else { throw new IllegalStateException("Could not find regex match in:" + nStringSequence); } return id; } private static int getSeq(String nStringSequence) { int seq = 0; Matcher m = LEADER_SEQ.matcher(nStringSequence); if (m.matches()) { seq = Integer.parseInt(m.group(1)); } else { throw new IllegalStateException("Could not find regex match in:" + nStringSequence); } return seq; } /** * 排序seq */ private static void sortSeqs(List<String> seqs) { Collections.sort(seqs, new Comparator<String>() { @Override public int compare(String o1, String o2) { return Integer.valueOf(getSeq(o1)).compareTo( Integer.valueOf(getSeq(o2))); } }); } private static List<Integer> getSeqs(List<String> seqs) { List<Integer> intSeqs = new ArrayList<Integer>(seqs.size()); for (String seq : seqs) { intSeqs.add(getSeq(seq)); } return intSeqs; } private static void checkIfIamLeader(final ZooKeeper zkClient, final int seq) throws KeeperException, InterruptedException, IOException { // get all other numbers... final String holdElectionPath = "/election"; List<String> seqs = zkClient.getChildren(holdElectionPath, true); sortSeqs(seqs); List<Integer> intSeqs = getSeqs(seqs); if (intSeqs.size() == 0) { return; } if (seq <= intSeqs.get(0)) { //删除来的leader节点 try { zkClient.delete("/leader", -1); } catch (Exception e) { // fine } String seqStr = null; for (String currSeq : seqs) { if (getSeq(currSeq) == seq) { seqStr = currSeq; break; } } runIamLeaderProcess(zkClient, seqStr); } else { // I am not the leader - watch the node below me //当前节点不是leader,watcher比我小的节点 int i = 1; for (; i < intSeqs.size(); i++) { int s = intSeqs.get(i); if (seq < s) { // we found who we come before - watch the guy in front //发现比我小的节点(节点列表全面经过排序),退出循环 break; } } int index = i - 2; if (index < 0) { log.warn("Our node is no longer in line to be leader"); return; } try { //监控比当前节点seq次小的节点的值变化 zkClient.getData(holdElectionPath + "/" + seqs.get(index), new Watcher() { @Override public void process(WatchedEvent event) { if (EventType.None.equals(event.getType())) { return; } // 检查是否是可以做为leader try { checkIfIamLeader(zkClient, seq); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.warn("", e); } catch (IOException e) { log.warn("", e); } catch (Exception e) { log.warn("", e); } } }, null, true); } catch (Exception e) { log.warn("Failed setting watch", e); checkIfIamLeader(zkClient, seq); } } } protected static void runIamLeaderProcess(ZooKeeper zkClient, String seqStr) throws KeeperException, InterruptedException, IOException { final String id = seqStr.substring(seqStr.lastIndexOf("/") + 1); //设置leader节点 zkClient.create("/leader", id.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); } }
相关文章推荐
- Zookeeper分布式锁(多进程竞争)实现的代码示例分享
- ZooKeeper示例 分布式锁思路及示例代码
- php生成缩略图示例代码分享(使用gd库实现)
- 使用curator实现zookeeper锁服务的示例分享
- zookeeper 实现分布式锁zookeeper 使用 Curator 示例监听、分布式锁
- 【示例代码】分享Jquery-UI实现Web桌面系统jWebOS
- 分布式编程->Remoting的一个代码示例(借助Remoting实现发送信息功能)
- python实现代码行数统计示例分享
- JavaScript实现图片滑动切换的代码示例分享
- 分布式编程->Remoting的一个代码示例(借助Remoting实现发送信息功能)
- 分布式RPC服务器(容灾和服务器识别机制的实现,借助zookeeper)最终完整代码
- 网页中实现"分享至微博、QQ空间等"代码示例
- 分布式编程->Remoting的一个代码示例(借助Remoting实现发送信息功能)
- 分布式编程->Remoting的一个代码示例(借助Remoting实现发送信息功能)
- python实现代码行数统计示例分享
- zookeeper-分布式锁的代码实现-【每日五分钟搞定大数据】
- 用POI实现excel文件导出 代码示例分享
- 使用curator实现zookeeper锁服务的示例分享
- JavaScript实现图片滑动切换的代码示例分享
- java实现优酷视频地址解析示例代码分享