您的位置:首页 > 编程语言 > Python开发

使用Python进行分布式系统协调 (ZooKeeper,Consul, etcd )

2017-02-09 23:06 316 查看
随着大数据时代的到来,分布式是解决大数据问题的一个主要手段,随着越来越多的分布式的服务,如何在分布式的系统中对这些服务做协调变成了一个很棘手的问题。今天我们就来看看如何使用Python,利用开源对分布式服务做协调。
在对分布式的应用做协调的时候,主要会碰到以下的应用场景:

业务发现(service discovery)

找到分布式系统中存在那些可用的服务和节点

名字服务 (name service)

通过给定的名字知道到对应的资源

配置管理 (configuration management)

如何在分布式的节点中共享配置文件,保证一致性。

故障发现和故障转移 (failure detection and failover)

当某一个节点出故障的时候,如何检测到并通知其它节点, 或者把想用的服务转移到其它的可用节点

领导选举(leader election)

如何在众多的节点中选举一个领导者,来协调所有的节点

分布式的锁 (distributed exclusive lock)

如何通过锁在分布式的服务中进行同步

消息和通知服务 (message queue and notification)

如何在分布式的服务中传递消息,以通知的形式对事件作出主动的响应

有许多的开源软件试图解决以上的全部或者部分问题,例如ZooKeeper,consul,doozerd等等,我们现在就看看它们是如何做的。

ZooKeeper

ZooKeeper是使用最广泛,也是最有名的解决分布式服务的协调问题的开源软件了,它最早和Hadoop一起开发,后来成为了Apache的顶级项目,很多开源的项目都在使用ZooKeeper,例如大名鼎鼎的Kafka。

Zookeeper本身是一个分布式的应用,通过对共享的数据的管理来实现对分布式应用的协调。

ZooKeeper使用一个树形目录作为数据模型,这个目录和文件目录类似,目录上的每一个节点被称作ZNodes。



ZooKeeper提供基本的API来操纵和控制Znodes,包括对节点的创建,删除,设置和获取数据,获得子节点等。

除了这些基本的操作,ZooKeeper还提供了一些配方(Recipe),其实就是一些常见的用例,例如锁,两阶段提交,领导选举等等。

ZooKeeper本身是用Java开发的,所以对Java的支持是最自然的。它同时还提供了C语言的绑定。

Kazoo是一个非常成熟的Zookeeper Python客户端,我们这就看看如果使用Python来调用ZooKeeper。(注意,运行以下的例子,需要在本地启动ZooKeeper的服务)

基本操作

以下的例子现实了对Znode的基本操作,首先要创建一个客户端的连接,并启动客户端。然后我们可以利用该客户端对Znode做增删改,取内容的操作。最后推出客户端。

from kazoo.client import KazooClient

import logging
logging.basicConfig()

zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()

# Ensure a path, create if necessary
zk.ensure_path("/test/zk1")

# Create a node with data
zk.create("/test/zk1/node", b"a test value")

# Determine if a node exists
if zk.exists("/test/zk1"):
    print "the node exist"

# Print the version of a node and its data
data, stat = zk.get("/test/zk1")
print("Version: %s, data: %s" % (stat.version, data.decode("utf-8")))

# List the children
children = zk.get_children("/test/zk1")
print("There are %s children with names %s" % (len(children), children))

zk.stop()

通过对ZNode的操作,我们可以完成一些分布式服务协调的基本需求,包括名字服务,配置服务,分组等等。

故障检测(Failure Detection)

在分布式系统中,一个最基本的需求就是当某一个服务出问题的时候,能够通知其它的节点或者某个管理节点。

ZooKeeper提供ephemeral Node的概念,当创建该Node的服务退出或者异常中止的时候,该Node会被删除,所以我们就可以利用这种行为来监控服务运行状态。

以下是worker的代码

from kazoo.client import KazooClient
import time

import logging
logging.basicConfig()

zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()

# Ensure a path, create if necessary
zk.ensure_path("/test/failure_detection")

# Create a node with data
zk.create("/test/failure_detection/worker",
          value=b"a test value", ephemeral=True)

while True:
    print "I am alive!"
    time.sleep(3)

zk.stop()

以下的monitor 代码,监控worker服务是否运行。

from kazoo.client import KazooClient

import time

import logging
logging.basicConfig()

zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()

# Determine if a node exists
while True:
    if zk.exists("/test/failure_detection/worker"):
        print "the worker is alive!"
    else:
        print "the worker is dead!"
        break
    time.sleep(3)

zk.stop()


领导选举

Kazoo直接提供了领导选举的API,使用起来非常方便。

from kazoo.client import KazooClient
import time
import uuid

import logging
logging.basicConfig()

my_id = uuid.uuid4()

def leader_func():
    print "I am the leader {}".format(str(my_id))
    while True:
        print "{} is working! ".format(str(my_id))
        time.sleep(3)

zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()

election = zk.Election("/electionpath")

# blocks until the election is won, then calls
# leader_func()
election.run(leader_func)

zk.stop()

你可以同时运行多个worker,其中一个会获得Leader,当你杀死当前的leader后,会有一个新的leader被选出。

分布式锁

锁的概念大家都熟悉,当我们希望某一件事在同一时间只有一个服务在做,或者某一个资源在同一时间只有一个服务能访问,这个时候,我们就需要用到锁。

from kazoo.client import KazooClient
import time
import uuid

import logging
logging.basicConfig()

my_id = uuid.uuid4()

def work():
    print "{} is working! ".format(str(my_id))

zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()

lock = zk.Lock("/lockpath", str(my_id))

print "I am {}".format(str(my_id))

while True:
    with lock:
        work()
    time.sleep(3)    

zk.stop()

当你运行多个worker的时候,不同的worker会试图获取同一个锁,然而只有一个worker会工作,其它的worker必须等待获得锁后才能执行。

监视

ZooKeeper提供了监视(Watch)的功能,当节点的数据被修改的时候,监控的function会被调用。我们可以利用这一点进行配置文件的同步,发消息,或其他需要通知的功能。

from kazoo.client import KazooClient
import time

import logging
logging.basicConfig()

zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()

@zk.DataWatch('/path/to/watch')
def my_func(data, stat):
    if data:
        print "Data is %s" % data
        print "Version is %s" % stat.version 
    else :
        print "data is not available"

while True:
    time.sleep(10)

zk.stop()

除了我们上面列举的内容外,Kazoo还提供了许多其他的功能,例如:计数,租约,队列等等,大家有兴趣可以参考它的文档


Consul

Consul是用Go开发的分布式服务协调管理的工具,它提供了服务发现,健康检查,Key/Value存储等功能,并且支持跨数据中心的功能。

Consul提供ZooKeeper类似的功能,它的基于HTTP的API可以方便的和各种语言进行绑定。自然Python也在列。

与Zookeeper有所差异的是Consul通过基于Client/Server架构的Agent部署来支持跨Data Center的功能。



Consul在Cluster伤的每一个节点都运行一个Agent,这个Agent可以使Server或者Client模式。Client负责到Server的高效通信,相对为无状态的。 Server负责包括选举领导节点,维护cluster的状态,对所有的查询做响应,跨数据中心的通信等等。

KV基本操作

类似于Zookeeper,Consul支持对KV的增删查改的操作。

import consul

c = consul.Consul()

# set data for key foo
c.kv.put('foo', 'bar')

# poll a key for updates
index = None
while True:
    index, data = c.kv.get('foo', index=index)
    print data['Value']
    
c.kv.delete('foo')

这里和ZooKeeper对Znode的操作几乎是一样的。

服务发现(Service Discovery)和健康检查(Health Check)

Consul的另一个主要的功能是用于对分布式的服务做管理,用户可以注册一个服务,同时还提供对服务做健康检测的功能。

首先,用户需要定义一个服务。

{
  "service": {
    "name": "redis",
    "tags": ["master"],
    "address": "127.0.0.1",
    "port": 8000,
    "checks": [
      {
        "script": "/usr/local/bin/check_redis.py",
        "interval": "10s"
      }
    ]
  }}

其中,服务的名字是必须的,其它的字段可以自选,包括了服务的地址,端口,相应的健康检查的脚本。当用户注册了一个服务后,就可以通过Consul来查询该服务,获得该服务的状态。

Consul支持三种Check的模式:

调用一个外部脚本(Script),在该模式下,consul定时会调用一个外部脚本,通过脚本的返回内容获得对应服务的健康状态。

调用HTTP,
import consul
import time

def is_session_exist(name, sessions):
    for s in sessions:
        if s['Name'] == name:
            return True

    return False

c = consul.Consul()

while True:
    index, sessions = c.session.list()
    if is_session_exist('worker', sessions):
        print "worker is alive ..."
    else:
        print 'worker is dead!'
        break
    time.sleep(3)

这里注意,因为是基于ttl(最小10秒)的检测,从业务中断到被检测到,至少有10秒的时延,对应需要实时响应的情景,并不适用。Zookeeper使用
import etcd

client = etcd.Client() 
client.write('/nodes/n1', 1)
print client.read('/nodes/n1').value

etcd对节点的操作和ZooKeeper类似,不过etcd不支持ZooKeeper的
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息