使用Python进行分布式系统协调 (ZooKeeper,Consul, etcd )
2017-02-09 23:06
316 查看
随着大数据时代的到来,分布式是解决大数据问题的一个主要手段,随着越来越多的分布式的服务,如何在分布式的系统中对这些服务做协调变成了一个很棘手的问题。今天我们就来看看如何使用Python,利用开源对分布式服务做协调。
在对分布式的应用做协调的时候,主要会碰到以下的应用场景:
业务发现(service discovery)
找到分布式系统中存在那些可用的服务和节点
名字服务 (name service)
通过给定的名字知道到对应的资源
配置管理 (configuration management)
如何在分布式的节点中共享配置文件,保证一致性。
故障发现和故障转移 (failure detection and failover)
当某一个节点出故障的时候,如何检测到并通知其它节点, 或者把想用的服务转移到其它的可用节点
领导选举(leader election)
如何在众多的节点中选举一个领导者,来协调所有的节点
分布式的锁 (distributed exclusive lock)
如何通过锁在分布式的服务中进行同步
消息和通知服务 (message queue and notification)
如何在分布式的服务中传递消息,以通知的形式对事件作出主动的响应
有许多的开源软件试图解决以上的全部或者部分问题,例如ZooKeeper,consul,doozerd等等,我们现在就看看它们是如何做的。
Zookeeper本身是一个分布式的应用,通过对共享的数据的管理来实现对分布式应用的协调。
ZooKeeper使用一个树形目录作为数据模型,这个目录和文件目录类似,目录上的每一个节点被称作ZNodes。
ZooKeeper提供基本的API来操纵和控制Znodes,包括对节点的创建,删除,设置和获取数据,获得子节点等。
除了这些基本的操作,ZooKeeper还提供了一些配方(Recipe),其实就是一些常见的用例,例如锁,两阶段提交,领导选举等等。
ZooKeeper本身是用Java开发的,所以对Java的支持是最自然的。它同时还提供了C语言的绑定。
Kazoo是一个非常成熟的Zookeeper Python客户端,我们这就看看如果使用Python来调用ZooKeeper。(注意,运行以下的例子,需要在本地启动ZooKeeper的服务)
通过对ZNode的操作,我们可以完成一些分布式服务协调的基本需求,包括名字服务,配置服务,分组等等。
故障检测(Failure Detection)
在分布式系统中,一个最基本的需求就是当某一个服务出问题的时候,能够通知其它的节点或者某个管理节点。
ZooKeeper提供ephemeral Node的概念,当创建该Node的服务退出或者异常中止的时候,该Node会被删除,所以我们就可以利用这种行为来监控服务运行状态。
以下是worker的代码
以下的monitor 代码,监控worker服务是否运行。
领导选举
Kazoo直接提供了领导选举的API,使用起来非常方便。
你可以同时运行多个worker,其中一个会获得Leader,当你杀死当前的leader后,会有一个新的leader被选出。
当你运行多个worker的时候,不同的worker会试图获取同一个锁,然而只有一个worker会工作,其它的worker必须等待获得锁后才能执行。
除了我们上面列举的内容外,Kazoo还提供了许多其他的功能,例如:计数,租约,队列等等,大家有兴趣可以参考它的文档
Consul提供ZooKeeper类似的功能,它的基于HTTP的API可以方便的和各种语言进行绑定。自然Python也在列。
与Zookeeper有所差异的是Consul通过基于Client/Server架构的Agent部署来支持跨Data Center的功能。
Consul在Cluster伤的每一个节点都运行一个Agent,这个Agent可以使Server或者Client模式。Client负责到Server的高效通信,相对为无状态的。 Server负责包括选举领导节点,维护cluster的状态,对所有的查询做响应,跨数据中心的通信等等。
这里和ZooKeeper对Znode的操作几乎是一样的。
首先,用户需要定义一个服务。
其中,服务的名字是必须的,其它的字段可以自选,包括了服务的地址,端口,相应的健康检查的脚本。当用户注册了一个服务后,就可以通过Consul来查询该服务,获得该服务的状态。
Consul支持三种Check的模式:
调用一个外部脚本(Script),在该模式下,consul定时会调用一个外部脚本,通过脚本的返回内容获得对应服务的健康状态。
调用HTTP,
这里注意,因为是基于ttl(最小10秒)的检测,从业务中断到被检测到,至少有10秒的时延,对应需要实时响应的情景,并不适用。Zookeeper使用
etcd对节点的操作和ZooKeeper类似,不过etcd不支持ZooKeeper的
在对分布式的应用做协调的时候,主要会碰到以下的应用场景:
业务发现(service discovery)
找到分布式系统中存在那些可用的服务和节点
名字服务 (name service)
通过给定的名字知道到对应的资源
配置管理 (configuration management)
如何在分布式的节点中共享配置文件,保证一致性。
故障发现和故障转移 (failure detection and failover)
当某一个节点出故障的时候,如何检测到并通知其它节点, 或者把想用的服务转移到其它的可用节点
领导选举(leader election)
如何在众多的节点中选举一个领导者,来协调所有的节点
分布式的锁 (distributed exclusive lock)
如何通过锁在分布式的服务中进行同步
消息和通知服务 (message queue and notification)
如何在分布式的服务中传递消息,以通知的形式对事件作出主动的响应
有许多的开源软件试图解决以上的全部或者部分问题,例如ZooKeeper,consul,doozerd等等,我们现在就看看它们是如何做的。
ZooKeeper
ZooKeeper是使用最广泛,也是最有名的解决分布式服务的协调问题的开源软件了,它最早和Hadoop一起开发,后来成为了Apache的顶级项目,很多开源的项目都在使用ZooKeeper,例如大名鼎鼎的Kafka。Zookeeper本身是一个分布式的应用,通过对共享的数据的管理来实现对分布式应用的协调。
ZooKeeper使用一个树形目录作为数据模型,这个目录和文件目录类似,目录上的每一个节点被称作ZNodes。
ZooKeeper提供基本的API来操纵和控制Znodes,包括对节点的创建,删除,设置和获取数据,获得子节点等。
除了这些基本的操作,ZooKeeper还提供了一些配方(Recipe),其实就是一些常见的用例,例如锁,两阶段提交,领导选举等等。
ZooKeeper本身是用Java开发的,所以对Java的支持是最自然的。它同时还提供了C语言的绑定。
Kazoo是一个非常成熟的Zookeeper Python客户端,我们这就看看如果使用Python来调用ZooKeeper。(注意,运行以下的例子,需要在本地启动ZooKeeper的服务)
基本操作
以下的例子现实了对Znode的基本操作,首先要创建一个客户端的连接,并启动客户端。然后我们可以利用该客户端对Znode做增删改,取内容的操作。最后推出客户端。from kazoo.client import KazooClient import logging logging.basicConfig() zk = KazooClient(hosts='127.0.0.1:2181') zk.start() # Ensure a path, create if necessary zk.ensure_path("/test/zk1") # Create a node with data zk.create("/test/zk1/node", b"a test value") # Determine if a node exists if zk.exists("/test/zk1"): print "the node exist" # Print the version of a node and its data data, stat = zk.get("/test/zk1") print("Version: %s, data: %s" % (stat.version, data.decode("utf-8"))) # List the children children = zk.get_children("/test/zk1") print("There are %s children with names %s" % (len(children), children)) zk.stop()
通过对ZNode的操作,我们可以完成一些分布式服务协调的基本需求,包括名字服务,配置服务,分组等等。
故障检测(Failure Detection)
在分布式系统中,一个最基本的需求就是当某一个服务出问题的时候,能够通知其它的节点或者某个管理节点。ZooKeeper提供ephemeral Node的概念,当创建该Node的服务退出或者异常中止的时候,该Node会被删除,所以我们就可以利用这种行为来监控服务运行状态。
以下是worker的代码
from kazoo.client import KazooClient import time import logging logging.basicConfig() zk = KazooClient(hosts='127.0.0.1:2181') zk.start() # Ensure a path, create if necessary zk.ensure_path("/test/failure_detection") # Create a node with data zk.create("/test/failure_detection/worker", value=b"a test value", ephemeral=True) while True: print "I am alive!" time.sleep(3) zk.stop()
以下的monitor 代码,监控worker服务是否运行。
from kazoo.client import KazooClient import time import logging logging.basicConfig() zk = KazooClient(hosts='127.0.0.1:2181') zk.start() # Determine if a node exists while True: if zk.exists("/test/failure_detection/worker"): print "the worker is alive!" else: print "the worker is dead!" break time.sleep(3) zk.stop()
领导选举
Kazoo直接提供了领导选举的API,使用起来非常方便。from kazoo.client import KazooClient import time import uuid import logging logging.basicConfig() my_id = uuid.uuid4() def leader_func(): print "I am the leader {}".format(str(my_id)) while True: print "{} is working! ".format(str(my_id)) time.sleep(3) zk = KazooClient(hosts='127.0.0.1:2181') zk.start() election = zk.Election("/electionpath") # blocks until the election is won, then calls # leader_func() election.run(leader_func) zk.stop()
你可以同时运行多个worker,其中一个会获得Leader,当你杀死当前的leader后,会有一个新的leader被选出。
分布式锁
锁的概念大家都熟悉,当我们希望某一件事在同一时间只有一个服务在做,或者某一个资源在同一时间只有一个服务能访问,这个时候,我们就需要用到锁。from kazoo.client import KazooClient import time import uuid import logging logging.basicConfig() my_id = uuid.uuid4() def work(): print "{} is working! ".format(str(my_id)) zk = KazooClient(hosts='127.0.0.1:2181') zk.start() lock = zk.Lock("/lockpath", str(my_id)) print "I am {}".format(str(my_id)) while True: with lock: work() time.sleep(3) zk.stop()
当你运行多个worker的时候,不同的worker会试图获取同一个锁,然而只有一个worker会工作,其它的worker必须等待获得锁后才能执行。
监视
ZooKeeper提供了监视(Watch)的功能,当节点的数据被修改的时候,监控的function会被调用。我们可以利用这一点进行配置文件的同步,发消息,或其他需要通知的功能。from kazoo.client import KazooClient import time import logging logging.basicConfig() zk = KazooClient(hosts='127.0.0.1:2181') zk.start() @zk.DataWatch('/path/to/watch') def my_func(data, stat): if data: print "Data is %s" % data print "Version is %s" % stat.version else : print "data is not available" while True: time.sleep(10) zk.stop()
除了我们上面列举的内容外,Kazoo还提供了许多其他的功能,例如:计数,租约,队列等等,大家有兴趣可以参考它的文档
Consul
Consul是用Go开发的分布式服务协调管理的工具,它提供了服务发现,健康检查,Key/Value存储等功能,并且支持跨数据中心的功能。Consul提供ZooKeeper类似的功能,它的基于HTTP的API可以方便的和各种语言进行绑定。自然Python也在列。
与Zookeeper有所差异的是Consul通过基于Client/Server架构的Agent部署来支持跨Data Center的功能。
Consul在Cluster伤的每一个节点都运行一个Agent,这个Agent可以使Server或者Client模式。Client负责到Server的高效通信,相对为无状态的。 Server负责包括选举领导节点,维护cluster的状态,对所有的查询做响应,跨数据中心的通信等等。
KV基本操作
类似于Zookeeper,Consul支持对KV的增删查改的操作。import consul c = consul.Consul() # set data for key foo c.kv.put('foo', 'bar') # poll a key for updates index = None while True: index, data = c.kv.get('foo', index=index) print data['Value'] c.kv.delete('foo')
这里和ZooKeeper对Znode的操作几乎是一样的。
服务发现(Service Discovery)和健康检查(Health Check)
Consul的另一个主要的功能是用于对分布式的服务做管理,用户可以注册一个服务,同时还提供对服务做健康检测的功能。首先,用户需要定义一个服务。
{ "service": { "name": "redis", "tags": ["master"], "address": "127.0.0.1", "port": 8000, "checks": [ { "script": "/usr/local/bin/check_redis.py", "interval": "10s" } ] }}
其中,服务的名字是必须的,其它的字段可以自选,包括了服务的地址,端口,相应的健康检查的脚本。当用户注册了一个服务后,就可以通过Consul来查询该服务,获得该服务的状态。
Consul支持三种Check的模式:
调用一个外部脚本(Script),在该模式下,consul定时会调用一个外部脚本,通过脚本的返回内容获得对应服务的健康状态。
调用HTTP,
import consul import time def is_session_exist(name, sessions): for s in sessions: if s['Name'] == name: return True return False c = consul.Consul() while True: index, sessions = c.session.list() if is_session_exist('worker', sessions): print "worker is alive ..." else: print 'worker is dead!' break time.sleep(3)
这里注意,因为是基于ttl(最小10秒)的检测,从业务中断到被检测到,至少有10秒的时延,对应需要实时响应的情景,并不适用。Zookeeper使用
import etcd client = etcd.Client() client.write('/nodes/n1', 1) print client.read('/nodes/n1').value
etcd对节点的操作和ZooKeeper类似,不过etcd不支持ZooKeeper的
相关文章推荐
- 使用Python进行分布式系统协调 (ZooKeeper/Consul/etcd)
- 使用Python进行分布式系统协调 (ZooKeeper,Consul, etcd )
- 使用Python进行分布式系统协调 (ZooKeeper/Consul/etcd)
- 使用Python进行分布式系统协调 (ZooKeeper,Consul, etcd )
- 使用 Python 进行分布式系统协调
- 使用 Python 进行分布式系统协调
- 使用C# 和Consul进行分布式系统协调
- 使用 Python 进行分布式系统协调
- 使用C# 和Consul进行分布式系统协调
- 使用 Python 进行分布式系统协调
- 使用 Python 进行分布式系统协调
- Python中使用pickle对内建类型(built in types)进行对象序列化(object serialization and deseirialzation)
- 使用rope进行Python代码补全
- 使用Flex 4、Django、Python和PyAMF进行开发
- 使用python+cron对php状态进行定时监控
- 转贴: 【python】使用zlib进行压缩解压
- 使用Python进行二进制文件读写
- 使用python进行下载地址转换
- 使用python进行桌面程序开发(二)
- 使用python对txt格式的小说进行处理