您的位置:首页 > 其它

ZooKeeper客户端curator组件介绍

2014-12-20 22:17 393 查看

Apache Curator Recipes

Curator实现了 ZooKeeper recipes文档中列出的所有技巧(除了两段提交two phase commit)。点击下面的技巧的名字可以查看详细信息。


Elections

Leader Latch - 在分布式计算中, leader选举是在几台节点中指派单一的进程作为任务组织者的过程。在任务开始前,
所有的网络节点都不知道哪一个节点会作为任务的leader或coordinator. 一旦leader选举算法被执行, 网络中的每个节点都将知道一个特别的唯一的节点作为任务leader.

Leader Election - 初始的leader选举实现.


Locks

Shared Reentrant Lock - 全功能的分布式锁。 任何一刻不会有两个client同时拥有锁.

Shared Lock - 与Shared Reentrant Lock类似但是不是重入的.

Shared Reentrant Read Write Lock -
类似Java的读写锁,但是是分布式的.

Shared Semaphore - 跨JVM的计数信号量.

Multi Shared Lock - 将多个锁看成整体,要不全部acquire成功,要不acquire全部失败。
release也是释放全部锁.


Barriers

Barrier - 分布式的barriers。 会阻塞全部的节点的处理,直到条件满足,所有的节点会继续执行.

Double Barrier - 双barrier 允许客户端在一个计算开始点和结束点保持同步。当足够的进程加入barrier,
进程开始它们的计算, 当所有的进程完成计算才离开.


Counters

Shared Counter - 管理一个共享的整数integer. 所有监控同一path的客户端都会得到最新的值(ZK的
一致性保证).

Distributed Atomic Long - 尝试原子增加的计数器首先它尝试乐观锁.如果失败,可选的InterProcessMutex会被采用.
不管是optimistic 还是 mutex, 重试机制都被用来尝试增加值.


Caches

Path Cache - Path Cache用来监控ZNode. Whenever a child is added,
updated or removed, the Path Cache will change its state to contain the current set of children, the children’s data and the children’s state. Path caches in the Curator Framework are provided by the PathChildrenCache class. Changes to the path are passed
to registered PathChildrenCacheListener instances.

Node Cache - A utility that attempts to keep the data from
a node locally cached. This class will watch the node, respond to update/create/delete events, pull down the data, etc. You can register a listener that will get notified when changes occur.


Nodes

Persistent Ephemeral Node - An ephemeral
node that attempts to stay present in ZooKeeper, even through connection and session interruptions..


Queues

Distributed Queue - 分布式的ZK队列. Items put into the
queue are guaranteed to be ordered (by means of ZK’s PERSISTENTSEQUENTIAL node). If a single consumer takes items out of the queue, they will be ordered FIFO. If ordering is important, use a LeaderSelector to nominate a single consumer.

Distributed Id Queue - A version of DistributedQueue
that allows IDs to be associated with queue items. Items can then be removed from the queue if needed.

Distributed Priority Queue - An implementation
of the Distributed Priority Queue ZK recipe.

Distributed Delay Queue - An implementation
of a Distributed Delay Queue.

Simple Distributed Queue - A drop-in replacement
for the DistributedQueue that comes with the ZK distribution.

Apache Curator Framework

Curator framework提供了高级API, 极大的简化了ZooKeeper的使用。 它在ZooKeeper基础上增加了很多特性,可以管理与ZOoKeeper的连接和重试机制。这些特性包括:

自动连接管理

** 有些潜在的错误情况需要让ZooKeeper client重建连接和重试。Curator可以自动地和透明地处理这些情况

Cleaner API
简化原始的ZooKeeper方法,事件等 提供现代的流式接口

技巧(Recipe)实现
Leader选举 共享锁
Path缓存和监控 分布式队列
分布式优先级队列 …


产生Curator framework 实例

使用CuratorFrameworkFactory产生framework实例。 CuratorFrameworkFactory 既提供了factory方法也提供了builder来创建实例。 CuratorFrameworkFactory是线程安全的。你应该在应用中为单一的ZooKeeper集群共享唯一的CuratorFramework实例。

工厂方法(newClient())提供了一个简单的方式创建实例。Builder可以使用更多的参数控制生成的实例。一旦生成framework实例, 必须调用start方法启动它。应用结束时应该调用close方法关闭它。


CuratorFramework API

CuratorFramework 使用流程风格的接口。 代码胜于说教:
1234
client.create().forPath("/head", new byte[0]);client.delete().inBackground().forPath("/head");client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/head/child", new byte[0]);client.getData().watched().inBackground().forPath("/test");


方法

方法名描述
create()开始一create操作. 可以调用额外的方法(mode or background),最后调用forPath()
delete()开始一个delete操作. 调用额外的方法(version or background) , 最好调用forPath()
checkExists()开始一个检查ZNode是否存在的操作. 调用额外的方法 (watch or background), 最后调用forPath()
getData()开始一个获取ZNode节点数据的操作. 调用额外的方法(watch, background or get stat), 最后调用forPath()
setData()开始一个设置ZNode节点数据的操作. 调用额外的方法(version or background), 最后调用forPath()
getChildren()开始一个获取ZNode的子节点列表的操作.调用额外的方法(watch, background or get stat), 最后调用forPath()
inTransaction()开始一个原子的ZooKeeper事务. 可以包含 create, setData, check, and/or delete 操作的组合, 然后commit() 作为一个原子操作.


通知 Notifications

服务于后台操作和监控(watch)的通知通过ClientListener接口发布。你通过CuratorFramework实例的addListener方法可以注册监听器。

Interface CuratorListener
1
eventReceived() 	A background operation has completed or a watch has triggered. Examine the given event for details
Interface ConnectionStateListener:
12
stateChanged(CuratorFramework client, ConnectionState newState)          Called when there is a state change in the connection
Interface UnhandledErrorListener:
12
unhandledError(String message, Throwable e)          Called when an exception is caught in a background thread, handler, etc.


ClientEvent

ClientEvent是事件父类, 代表后台通知和监控的类型。 ClientEvent有用的字段根据事件的类型(getType()方法获取)不同而不同。
Event TypeEvent Methods
CREATEgetResultCode() and getPath()
DELETEgetResultCode() and getPath()
EXISTSgetResultCode(), getPath() and getStat()
GETDATAgetResultCode(), getPath(), getStat() and getData()
SETDATAgetResultCode(), getPath() and getStat()
CHILDRENgetResultCode(), getPath(), getStat(), getChildren()
WATCHEDgetWatchedEvent()


命名空间 namespace

因为ZOoKeeper是一个共享的集群。所以命名空间约定极为重要,各个应用在使用同一集群时不会有冲突的ZK path。

CuratorFramework 提供了命名空间的概念。当生成CuratorFramework 可以设置命名空间。CuratorFramework在调用API会在所有的path前面加上命名空间。
1234
CuratorFramework    client = CuratorFrameworkFactory.builder().namespace("MyApp") ... build(); ...client.create().forPath("/test", data);// node was actually written to: "/MyApp/test"


临时连接

Temporary CuratorFramework instances are meant for single requests to ZooKeeper ensembles over a failure prone network such as a WAN.

临时的CuratorFramework用来发送单独请求通过一个易出错的网络如WAN。CuratorTempFramework 的API是有限制的,而且,连接在不用一段时间后会被关闭。

这个想法基于Camille Fournier的文章: http://whilefalse.blogspot.com/2012/12/building-global-highly-available.html


创建CuratorTempFramework

就像正常的CuratorFramework 实例一样,CuratorTempFramework依然通过CuratorFrameworkFactory 创建。但是最后不是调用build()方法,
而是buildTemp()。buildTemp()创建创建CuratorTempFramework然后在它不用三分钟后就关闭它。有个buildTemp()重载版本可以设定不活跃(不用)的时间。


Limited API

CuratorTempFramework提供了下列方法:
1234567891011121314151617181920
/**     * Stop the client     */    public void     close();    /**     * Start a transaction builder     *     * @return builder object     * @throws Exception errors     */    public CuratorTransaction inTransaction() throws Exception;    /**     * Start a get data builder     *     * @return builder object     * @throws Exception errors     */    public TempGetDataBuilder getData() throws Exception;

Apache Curator Utilities

Curator提供了一组工具类和方法用来测试基于Curator的应用。 并且提供了操作ZNode辅助类以及其它一些数据结构


Test Server

curator-test提供了TestingServer类。
这个类创建了一个本地的, 同进程的ZooKeeper服务器用来测试。


Test Cluster

curator-test提供了TestingCluster类。
这个类创建了一个内部的ZooKeeper集群用来测试。


ZKPaths

提供了各种静态方法来操作ZNode:

getNodeFromPath: 从一个全路径中得到节点名, 比如 “/one/two/three” 返回 “three”

mkdirs: 确保所有的节点都已被创建

getSortedChildren: 得到一个给定路径的子节点, 按照sequence number排序

makePath: 给定父路径和子节点,创建一个全路径


EnsurePath

确保一个特定的路径被创建。当它第一次使用时,一个同步ZKPaths.mkdirs(ZooKeeper, String)调用被触发来确保完整的路径都已经被创建。后续的调用将不是同步操作.

用法:
12345678
EnsurePath       ensurePath = new EnsurePath(aFullPathToEnsure);...String           nodePath = aFullPathToEnsure + "/foo";ensurePath.ensure(zk);   // first time syncs and creates if neededzk.create(nodePath, ...);...ensurePath.ensure(zk);   // subsequent times are NOPszk.create(nodePath, ...);
注意: 此方法namespace会参与路径名字的创建。


BlockingQueueConsumer

请参看DistributedQueue 和 DistributedPriorityQueue

提供JDK BlockingQueue类似的行为。


QueueSharder

由于zookeeper传输层的限制, 单一的队列如果超过10K的元素会被分割(break)。 这个类为多个分布式队列提供了一个facade。 它监控队列, 如果一个队列超过这个阈值, 一个新的队列就被创建。 在这些队列中Put是分布式的。


Reaper and ChildReaper

Reaper

可以用来删除锁的父路径。定时检查路径被加入到reaper中。 当检查时,如果path没有子节点/路径, 此路径将被删除。每个应用中CLient应该只创建一个reaper实例。必须将lock path加到这个readper中。 reaper会定时的检查删除它们。

ChildReaper

用来清除父节点下所有的空节点。定时的调用getChildren()并将空节点加入到内部管理的reaper中。

注意: 应该考虑使用LeaderSelector来运行Reapers, 因为它们不需要在每个client运行.

Apache Curator Client

Curator client使用底层的API, 强烈推荐你是用Curator Framework代替使用CuratorZookeeperClient


背景

CuratorZookeeperClient 是ZOoKeeper client的包装类。但是提供了更简单方式, 而且可以减少错误的发生。它提供了下列的特性:

持续的连接管理 - ZooKeeper有很多的关于连接管理的警告(你可以到ZooKeeper FAQ查看细节)。CuratorZookeeperClient 可以自动的管理这些事情。

retry - 提供一个方式处理retry。

Test ZooKeeper server - 提供一个进程内的ZooKeeper测试服务器用来测试和实验。


方法

MethodDescription
Constructor创建一个给定ZooKeeper集群的连接。 你可以传入一个可选的watcher. 必须提供Retry策略
getZooKeeper()返回管理的ZooKeeper实例. 重要提示: a) 它会花费些许时间等待连接来完成, 在使用其它方法之前你应该校验连接是否完成. b) 管理的ZooKeeper实例可以根据特定的事件而改变。 不要持有实例太长时间. 总是调用getZooKeeper()得到一个新的实例.
isConnected()返回ZooKeeper client当前连接状态
blockUntilConnectedOrTimedOut()block知道连接成功或者超时
close()关闭连接
setRetryPolicy()改变retry策略
newRetryLoop()分配一个新的Retry Loop - 详情看下边


Retry Loop

由于各种各样的原因, 在zookeeper集群上的操作难免遇到失败的情况。最佳实践表明应该提供重试机制。Retry Loop 为此而生。 每个操作都被包装在一个Retry Loop中。下面是一个典型的处理流程:
12345678910111213141516
RetryLoop retryLoop = client.newRetryLoop();while ( retryLoop.shouldContinue() ){   try   {       // perform your work       ...       // it's important to re\-get the ZK instance as there may have been an error and the instance was re\-created       ZooKeeper      zk = client.getZookeeper();       retryLoop.markComplete();   }   catch ( Exception e )   {       retryLoop.takeException(e);   }}
Retry Loop维护一定数量的retry, 它还决定一个错误是否可以要执行retry。 假如一个错误需要retry,Retry策略被调用来决定retry是要要执行,执行多少次才放弃。

很方便地,RetryLoop 提供了一个静态方法使用Callable来执行一个完整retry loop。
123456789
RetryLoop.callWithRetry(client, new Callable<Void>(){      @Override      public Void call() throws Exception      {          // do your work here - it will get retried if needed          return null;      }});


Retry策略

retry策略可以改变retry的行为。 它抽象出RetryPolicy接口, 包含一个方法public
boolean allowRetry(int retryCount, long elapsedTimeMs);。 在retry被尝试执行前, allowRetry()被调用,并且将当前的重试次数和操作已用时间作为参数. 如果返回true, retry被执行。否则异常被抛出。

Curator本身提供了几个策略(在 com.netflix.curator.retry 包下):
Policy NameDescription
ExponentialBackoffRetry重试一定次数,每次重试sleep更多的时间
RetryNTimes重试N次
RetryOneTime重试一次
RetryUntilElapsed重试一定的时间
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Curator ZooKeeper