使用Zookeeper解决微服务架构下分布式事务问题
本文为原创作品,禁止转载!
准备工作
单机调试zookeeper集群的话,我们需要在虚拟机里虚拟出几台“微服务器“,做这一步操作之前需要在系统中预留出来8G以上磁盘空间,4G以上物理内存。
- 虚拟机
我们使用virtualbox
在官网下载最新版并安装
https://www.virtualbox.org/wiki/Downloads
- 操作系统
操作系统使用
CentOS-6.8-x86_64-minimal版
- Zookeeper
下载 Zookeeper
- 准备SSH连接工具
Xshell 或Winscp + Putty
https://winscp.net/eng/download.php
操作系统安装
- 加载光盘镜像
选择ISO文件
- 跳过磁盘测试
剩下的步骤按照提示 下一步就可以了
Zookeeper集群安装部署
Zookeeper客户端命令
Zookeeper Java Api
使用zookeeper开发分布式锁
分布式锁算法
竞争式 核心代码
此方式会在高并发场景下有缺陷
// 1. 检查父节点
String path = "/locks"; // 父节点名称
Stat stat = zooKeeper.exists(path, false);
// 如果 stat这个对象是空的 意味着 需要创建节点
if (stat == null){
// 创建父节点
// createMode e 临时节点 不支持 有子节点
zooKeeper.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("DistributedLock -> getLock -> 父节点创建成功");
}
String lock_path = path + "/" + lockItem; // 子节点完整路径
// 2. 创建 带有唯一标识的 子节点
int try_max = 20; // 允许重试次数
int try_count = 1; // 重试计数
long try_time = 2000; // 重试间隔
try{
// 创建成功
zooKeeper.create(lock_path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}catch (Exception e) {
/**创建失败 说明节点已存在
* 接下来使用while循环尝试继续创建节点
* 直到成功或次数超限
*/
while (try_count < try_max) {
try_count = try_count + 1;
Thread.sleep(try_time);
try{
zooKeeper.create(lock_path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}catch (Exception e1) {
System.out.println("DistributedLock -> getLock 子节点创建中 当前次数 :" + try_count);
continue;//重试创建失败 继续
}
break; // 创建节点成功,退出循环
}
}
System.out.println("DistributedLock -> getLock 子节点创建: 成功 使用次数 :" + try_count);
队列式
Curator
Curator是Netflix公司一个开源的zookeeper客户端,在原生API接口上进行了包装,解决了很多ZooKeeper客户端非常底层的细节开发。同时内部实现了诸如Session超时重连,Watcher反复注册等功能,实现了Fluent风格的API接口,是使用最广泛的zookeeper客户端之一。
Curator包含了几个包:
curator-framework:对zookeeper的底层api的一些封装
curator-client:提供一些客户端的操作,例如重试策略等
curator-recipes:封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等
Maven依赖
使用curator的版本:2.12.0,对应Zookeeper的版本为:3.4.x
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
建立连接
retryPolicy 重试策略,内建有四种重试策略,也可以自行实现RetryPolicy接口
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.newClient(
connectionString,
5000,
3000,
retryPolicy);
数据节点操作
@Test
/**
* 创建节点
*/
public void t2()throws Exception {
String string = client.create().forPath("/ct1");
System.out.println("string:" + string);
}
@Test
/**
* 展示节点
*/
public void t3()throws Exception {
List<String> forPath = client.getChildren().forPath("/");
for(String node : forPath) {
System.out.println(node);
}
}
@Test
/**
* 节点类型
*/
public void t4()throws Exception {
String string = client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/ct1");
System.out.println("string:" + string);
t3();
}
@Test
/**
* 节点数据
*/
public void t5()throws Exception {
String string = client.create().withMode(CreateMode.PERSISTENT).forPath("/ct2","aaxxa".getBytes());
System.out.println("string:" + string);
}
@Test
/**
* 获取数据
*/
public void t6()throws Exception {
t5();
byte[] forPath = client.getData().forPath("/ct2");
System.out.println("string:" + new String(forPath));
}
@Test
/**
* 创建子节点
*/
public void t7()throws Exception {
client.create().forPath("/ct666/xxoo","xxoo".getBytes());
}
@Test
/**
* 创建子节点并自动创建父节点
*/
public void t8()throws Exception {
client.create().creatingParentsIfNeeded().forPath("/ct666/xxoo2","xxoo".getBytes());
}
@Test
/**
* 创建子节点并自动创建父节点
*/
public void t9()throws Exception {
client.create().creatingParentContainersIfNeeded().forPath("/ct666/xxoo2","xxoo".getBytes());
}
@Test
/**
* 创建子节点并自动创建父节点
*/
public void t10()throws Exception {
List<String> forPath = client.getChildren().forPath("/ct666");
for(String node : forPath) {
System.out.println(node);
}
}
@Test
/**
* 删除节点
*/
public void t11()throws Exception {
client.delete().forPath("/ct666/xxoo");
t10();
}
@Test
/**
* 删除节点,有子节点不能删除
*/
public void t12()throws Exception {
client.delete().forPath("/ct666");
t10();
}
@Test
/**
* 删除节点,有子节点 一起删
*/
public void t13()throws Exception {
client.delete().deletingChildrenIfNeeded().forPath("/ct666");
t10();
}
@Test
/**
* 检查节点是否存在
*/
public void t14()throws Exception {
Stat forPath = client.checkExists().forPath("/ct666");
if(null == forPath) {
System.out.println("不存在");
}else {
client.delete().deletingChildrenIfNeeded().forPath("/ct666");
t10();
}
}
@Test
/**
* 事务
* 1. 检查节点是否存在
* 2. 创建节点
* 3. 设置数据
* 4. 提交事务
*/
public void t15()throws Exception {
Collection<CuratorTransactionResult> commit = client.inTransaction()
.check().forPath("/")
.and()
.create().forPath("/order/order2")
.and()
.setData().forPath("/order/order1","xxx".getBytes())
.and()
.commit();
for (CuratorTransactionResult curatorTransactionResult : commit) {
System.out.println("curatorTransactionResult:" + curatorTransactionResult.getForPath() + curatorTransactionResult.getResultStat());
}
}
事件监听 Cache
Zookeeper原生支持通过注册Watcher来进行事件监听,但是开发者需要反复注册(Watcher只能单次注册单次使用)。
Cache是Curator中对事件监听的包装,可以看作是对事件监听的本地缓存视图,能够自动为开发者处理反复注册监听。
Curator提供了三种Watcher(Cache)来监听结点的变化。
Path Cache
Path Cache用来监控一个ZNode的子节点. 当一个子节点增加, 更新,删除时, Path Cache会改变它的状态, 会包含最新的子节点, 子节点的数据和状态,而状态的更变将通过PathChildrenCacheListener通知。
@Test
/**
PathChildrenCache 监听子节点
*/
public void t16()throws Exception {
PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/", true );
pathChildrenCache.start();
PathChildrenCacheListener pathChildrenCacheListener = new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
System.out.println("事件类型:" + event.getType());
if(null != event.getData())
System.out.println("节点数据:" + event.getData().getPath() + " = " + new String(event.getData().getData()));
}
};
pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);
Thread.sleep(Long.MAX_VALUE);
}
NodeCache
Node Cache只是监听某一个特定的节点是否存在和数据变化
@Test
/**
NodeCache 监听单个节点
*/
public void t17()throws Exception {
NodeCache nodeCache = new NodeCache(client, "/order");
nodeCache.start();
NodeCacheListener nodeCacheListener = new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
ChildData currentData = nodeCache.getCurrentData();
if(null == currentData ) {
System.out.println("节点不存在");
}else {
System.out.println("currentData:" + new String(currentData.getData()));
}
}
};
nodeCache.getListenable().addListener(nodeCacheListener);
Thread.sleep(Long.MAX_VALUE);
Tree Cache
@Test
/**
TreeCache 监听节点下的所有事件
*/
public void t18()throws Exception {
TreeCache treeCache = new TreeCache(client, "/order");
treeCache.start();
TreeCacheListener treeCacheListener = new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
System.out.println("事件类型:" + event.getType());
if(null != event.getData())
System.out.println("节点数据:" + event.getData().getPath() + " = " + new String(event.getData().getData()));
}
};
treeCache.getListenable().addListener(treeCacheListener);
Thread.sleep(Long.MAX_VALUE);
}
分布式锁
可重入共享锁—Shared Reentrant Lock
不可重入共享锁—Shared Lock
可重入读写锁—Shared Reentrant Read Write Lock
多共享锁对象 —Multi Shared Lock
分布式计数器
分布式队列
分布式屏障—Barrier
阅读更多- 使用nhmicro提供的micro-datasource嵌入式的解决微服务架构中分布式事务问题
- [置顶] 【分布式事务】使用atomikos+jta解决分布式事务问题
- 使用TableDiff实用工具解决事务复制中的问题
- 使用复制存储过程执行解决“事务复制中的表大量更新导致无法及时同步”的问题
- ORA-14450: 试图访问已经在使用的事务处理临时表,根据网上文章提供的方法,问题是解决了
- 解决“COM+ 无法与 Microsoft 分布式事务协调程序交谈”的问题
- 使用TableDiff实用工具解决事务复制中的问题
- [转贴]使用TableDiff实用工具解决事务复制中的问题
- ORA-02049: 超时: 分布式事务处理等待锁 问题的解决
- Android多线程操作sqlite(Sqlite解决database locked问题)(2)使用事务处理的效果
- 解决yum安装过程中断导致后续使用yum始终提示有未完成事务的问题
- 使用Redis模拟简单分布式锁,解决单点故障的问题
- Percona 开始尝试基于Ceph做上层感知的分布式 MySQL 集群,使用 Ceph 提供的快照,备份和 HA 功能来解决分布式数据库的底层存储问题
- 面向过程就是分析出解决问题所需要的步骤,然后用函数把这些步骤一步一步实现,使用的时候一个一个依次调用就可以了;面向对象是把构成问题事务分解成各个对象,建立对象的目的不是为了完成一个步骤,而是为了描叙某个事物在整个解决问题的步骤中的行为(转)
- 【Java EE 学习 19】【使用过滤器实现全站压缩】【使用ThreadLocal模式解决跨DAO事务回滚问题】
- 转:邹建--使用TableDiff实用工具解决事务复制中的问题
- 使用TableDiff实用工具解决事务复制中的问题
- 【Spring十】使用OpenSessionInView解决懒加载问题及Spring管理下的session和事务
- 使用TableDiff实用工具解决事务复制中的问题
- 使用zookeeper来解决在分布式系统中单节点维护微信token生命周期的容灾demo【已抽象分离】[分布式锁][9.28更新]