您的位置:首页 > 运维架构 > 网站架构

技术贴001使用Zookeeper解决微服务架构下分布式事务问题

2018-08-10 15:25 148 查看

准备工作

单机调试zookeeper集群的话,我们需要在虚拟机里虚拟出几台“微服务器“,做这一步操作之前需要在系统中预留出来8G以上磁盘空间,4G以上物理内存。

  1. 虚拟机

我们使用virtualbox

在官网下载最新版并安装

https://www.virtualbox.org/wiki/Downloads

  1. 操作系统

操作系统使用

CentOS-6.8-x86_64-minimal版

  1. Zookeeper

下载 Zookeeper

http://zookeeper.apache.org

  1. 准备SSH连接工具

Xshell 或Winscp + Putty

https://winscp.net/eng/download.php

https://www.putty.org/

操作系统安装

  1. 加载光盘镜像

选择ISO文件

  1. 跳过磁盘测试

剩下的步骤按照提示 下一步就可以了

Zookeeper集群安装部署

Zookeeper客户端命令

Zookeeper Java Api

使用zookeeper开发分布式锁

分布式锁算法

竞争式 饿汉式 核心代码

此方式会在高并发场景下有缺陷

// 1. 检查父节点

String path = “/locks”; // 父节点名称

Stat stat = zooKeeper.exists(path, false);

// 如果 stat这个对象是空的 意味着 需要创建节点

if (stat == null){

// 创建父节点

// createMode e 临时节点 不支持 有子节点

zooKeeper.create(path, “”.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

System.out.println(“DistributedLock -> getLock -> 父节点创建成功”);

}

String lock_path = path + “/” + lockItem; // 子节点完整路径

// 2. 创建 带有唯一标识的 子节点

int try_max = 20; // 允许重试次数

int try_count = 1; // 重试计数

long try_time = 2000; // 重试间隔

try{

// 创建成功

zooKeeper.create(lock_path, “”.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

}catch (Exception e) {

/**创建失败 说明节点已存在

  • 接下来使用while循环尝试继续创建节点

  • 直到成功或次数超限

*/

while (try_count < try_max) {

try_count = try_count + 1;

Thread.sleep(try_time);

try{

zooKeeper.create(lock_path, “”.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

}catch (Exception e1) {

System.out.println(“DistributedLock -> getLock 子节点创建中 当前次数 :” + try_count);

continue;//重试创建失败 继续

}

break; // 创建节点成功,退出循环

}

}

System.out.println(“DistributedLock -> getLock 子节点创建: 成功 使用次数 :” + try_count);

队列式 懒汉式 核心代码

// 1. 检查父节点

String path = “/locks” + “/” + lockItem; // 父节点名称

Stat stat = zooKeeper.exists(path, false);

// 如果 stat这个对象是空的 意味着 需要创建节点

if (stat == null){

// 创建父节点

// createMode e 临时节点 不支持 有子节点

zooKeeper.create(path, “”.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

System.out.println(“DistributedLock -> getLock -> 父节点创建成功”);

}

String lock_path = path + “/lock”; // 子节点完整路径

// 2. 创建 带有唯一标识的 子节点

// 创建成功 进入等待状态

String lock_sub_path = zooKeeper.create(lock_path, “”.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

System.out.println(“lock_sub_path:” + lock_sub_path);

/**

  • 检查是否持有锁

  • 持有锁特征 当前创建节点编号前无其他子节点

  • 如果有其他子节点进入 等待事件状态

  • 等待事件 : 监听当前编号的上一编号释放 说明持有锁

*/

// 2.1 检查是否持有锁,检查是否为当前最小号

List childrens = zooKeeper.getChildren(path, false);

// 2.2 如果只有一个子节点 直接返回

if (childrens.size() == 1){

System.out.println(“DistributedLock -> getLock 子节点创建: 成功 == 1”);

return;

}

// 2.3 多个子节点的话 给自己当前节点编号 -1 然后监听释放

// /locks/item-id-1024/lock0000000018

int lock_path_ids = Integer.parseInt(lock_sub_path.substring(lock_sub_path.lastIndexOf(“/lock”) +5));

Integer pre_id = lock_path_ids -1; // 上一节点id号

String format = String.format(“%0”+10+”d”, pre_id);

String pre_path = lock_path + format; // 重组上一节点完整路径

zooKeeper.exists(pre_path, new Watcher() {

@Override

public void process(WatchedEvent event) {

if(event.getType() == EventType.NodeDeleted){

System.err.println(“上一个锁释放 当前进程获得锁”);

}

System.out.println(“event:” + event);

cd2.countDown();

}

});

cd2.await();

System.out.println(“DistributedLock -> getLock 子节点创建: 成功”);

Curator

Curator是Netflix公司一个开源的zookeeper客户端,在原生API接口上进行了包装,解决了很多ZooKeeper客户端非常底层的细节开发。同时内部实现了诸如Session超时重连,Watcher反复注册等功能,实现了Fluent风格的API接口,是使用最广泛的zookeeper客户端之一。

Curator包含了几个包:

curator-framework:对zookeeper的底层api的一些封装

curator-client:提供一些客户端的操作,例如重试策略等

curator-recipes:封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等

Maven依赖

使用curator的版本:2.12.0,对应Zookeeper的版本为:3.4.x

org.apache.curator

curator-framework

2.12.0

org.apache.curator

curator-recipes

2.12.0

建立连接

retryPolicy 重试策略,内建有四种重试策略,也可以自行实现RetryPolicy接口

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);

CuratorFramework client =

CuratorFrameworkFactory.newClient(

connectionString,

5000,

3000,

retryPolicy);

数据节点操作

@Test

/**

  • 创建节点

*/

public void t2()throws Exception {

String string = client.create().forPath(“/ct1”);

System.out.println(“string:” + string);

}

@Test

/**

  • 展示节点

*/

public void t3()throws Exception {

List forPath = client.getChildren().forPath(“/”);

for(String node : forPath) {

System.out.println(node);

}

}

@Test

/**

  • 节点类型

*/

public void t4()throws Exception {

String string = client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(“/ct1”);

System.out.println(“string:” + string);

t3();

}

@Test

/**

  • 节点数据

*/

public void t5()throws Exception {

String string = client.create().withMode(CreateMode.PERSISTENT).forPath(“/ct2”,”aaxxa”.getBytes());

System.out.println(“string:” + string);

}

@Test

/**

  • 获取数据

*/

public void t6()throws Exception {

t5();

byte[] forPath = client.getData().forPath(“/ct2”);

System.out.println(“string:” + new String(forPath));

}

@Test

/**

  • 创建子节点

*/

public void t7()throws Exception {

client.create().forPath(“/ct666/xxoo”,”xxoo”.getBytes());

}

@Test

/**

  • 创建子节点并自动创建父节点

*/

public void t8()throws Exception {

client.create().creatingParentsIfNeeded().forPath(“/ct666/xxoo2”,”xxoo”.getBytes());

}

@Test

/**

  • 创建子节点并自动创建父节点

*/

public void t9()throws Exception {

client.create().creatingParentContainersIfNeeded().forPath(“/ct666/xxoo2”,”xxoo”.getBytes());

}

@Test

/**

  • 创建子节点并自动创建父节点

*/

public void t10()throws Exception {

List forPath = client.getChildren().forPath(“/ct666”);

for(String node : forPath) {

System.out.println(node);

}

}

@Test

/**

  • 删除节点

*/

public void t11()throws Exception {

client.delete().forPath(“/ct666/xxoo”);

t10();

}

@Test

/**

  • 删除节点,有子节点不能删除

*/

public void t12()throws Exception {

client.delete().forPath(“/ct666”);

t10();

}

@Test

/**

  • 删除节点,有子节点 一起删

*/

public void t13()throws Exception {

client.delete().deletingChildrenIfNeeded().forPath(“/ct666”);

t10();

}

@Test

/**

  • 检查节点是否存在

*/

public void t14()throws Exception {

Stat forPath = client.checkExists().forPath(“/ct666”);

if(null == forPath) {

System.out.println(“不存在”);

}else {

client.delete().deletingChildrenIfNeeded().forPath(“/ct666”);

t10();

}

}

@Test

/**

  • 事务

    1. 检查节点是否存在
    1. 创建节点
    1. 设置数据
    1. 提交事务

*/

public void t15()throws Exception {

Collection commit = client.inTransaction()

.check().forPath(“/”)

.and()

.create().forPath(“/order/order2”)

.and()

.setData().forPath(“/order/order1”,”xxx”.getBytes())

.and()

.commit();

for (CuratorTransactionResult curatorTransactionResult : commit) {

System.out.println(“curatorTransactionResult:” + curatorTransactionResult.getForPath() + curatorTransactionResult.getResultStat());

}

}

事件监听 Cache

Zookeeper原生支持通过注册Watcher来进行事件监听,但是开发者需要反复注册(Watcher只能单次注册单次使用)。

Cache是Curator中对事件监听的包装,可以看作是对事件监听的本地缓存视图,能够自动为开发者处理反复注册监听。

Curator提供了三种Watcher(Cache)来监听结点的变化。

Path Cache

Path Cache用来监控一个ZNode的子节点. 当一个子节点增加, 更新,删除时, Path Cache会改变它的状态, 会包含最新的子节点, 子节点的数据和状态,而状态的更变将通过PathChildrenCacheListener通知。

@Test

/**

PathChildrenCache 监听子节点

*/

public void t16()throws Exception {

PathChildrenCache pathChildrenCache = new PathChildrenCache(client, “/”, true );

pathChildrenCache.start();

PathChildrenCacheListener pathChildrenCacheListener = newPathChildrenCacheListener() {

@Override

public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {

System.out.println(“事件类型:” + event.getType());

if(null != event.getData())

System.out.println(“节点数据:” + event.getData().getPath() + ” = ” + newString(event.getData().getData()));

}

};

pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);

Thread.sleep(Long.MAX_VALUE);

}

NodeCache

Node Cache只是监听某一个特定的节点是否存在和数据变化

@Test

/**

NodeCache 监听单个节点

*/

public void t17()throws Exception {

NodeCache nodeCache = new NodeCache(client, “/order”);

nodeCache.start();

NodeCacheListener nodeCacheListener = new NodeCacheListener() {

@Override

public void nodeChanged() throws Exception {

ChildData currentData = nodeCache.getCurrentData();

if(null == currentData ) {

System.out.println(“节点不存在”);

}else {

System.out.println(“currentData:” + new String(currentData.getData()));

}

}

};

nodeCache.getListenable().addListener(nodeCacheListener);

Thread.sleep(Long.MAX_VALUE);

Tree Cache

@Test

/**

TreeCache 监听节点下的所有事件

*/

public void t18()throws Exception {

TreeCache treeCache = new TreeCache(client, “/order”);

treeCache.start();

TreeCacheListener treeCacheListener = new TreeCacheListener() {

@Override

public void childEvent(CuratorFramework client, TreeCacheEvent event) throwsException {

System.out.println(“事件类型:” + event.getType());

if(null != event.getData())

System.out.println(“节点数据:” + event.getData().getPath() + ” = ” + newString(event.getData().getData()));

}

};

treeCache.getListenable().addListener(treeCacheListener);

Thread.sleep(Long.MAX_VALUE);

}

分布式锁

可重入共享锁—Shared Reentrant Lock

不可重入共享锁—Shared Lock

可重入读写锁—Shared Reentrant Read Write Lock

多共享锁对象 —Multi Shared Lock

分布式计数器

分布式队列

分布式屏障—Barrier

更多学习资料访问大数据公开课

阅读更多
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐