您的位置:首页 > 其它

Zookeeper深入理解(三)kazoo接口

2016-03-28 22:44 183 查看
zookeeper的开发接口以前主要以java和c为主,随着python项目越来越多的使用zookeeper作为分布式集群实现,python的zookeeper接口也出现了很多,现在主流的纯python的zookeeper接口是kazoo。因此如何使用kazoo开发基于python的分布式程序是必须掌握的。

1.安装kazoo

yum install python-pip

pip install kazoo

安装过程中会出现一些python依赖包未安装的情况,安装即可。

2.运行kazoo基础例子kazoo_basic.py

import time

from kazoo.client import KazooClient

from kazoo.client import KazooState

def main():

zk=KazooClient(hosts='127.0.0.1:2182')

zk.start()

@zk.add_listener

def my_listener(state):

if state == KazooState.LOST:

print("LOST")

elif state == KazooState.SUSPENDED:

print("SUSPENDED")

else:

print("Connected")

#Creating Nodes

# Ensure a path, create if necessary

zk.ensure_path("/my/favorite")

# Create a node with data

zk.create("/my/favorite/node", b"")

zk.create("/my/favorite/node/a", b"A")

#Reading Data

# Determine if a node exists

if zk.exists("/my/favorite"):

print("/my/favorite is existed")

@zk.ChildrenWatch("/my/favorite/node")

def watch_children(children):

print("Children are now: %s" % children)

# Above function called immediately, and from then on

@zk.DataWatch("/my/favorite/node")

def watch_node(data, stat):

print("Version: %s, data: %s" % (stat.version, data.decode("utf-8")))

# Print the version of a node and its data

data, stat = zk.get("/my/favorite/node")

print("Version: %s, data: %s" % (stat.version, data.decode("utf-8")))

# List the children

children = zk.get_children("/my/favorite/node")

print("There are %s children with names %s" % (len(children), children))

#Updating Data

zk.set("/my/favorite", b"some data")

#Deleting Nodes

zk.delete("/my/favorite/node/a")

#Transactions

transaction = zk.transaction()

transaction.check('/my/favorite/node', version=-1)

transaction.create('/my/favorite/node/b', b"B")

results = transaction.commit()

print ("Transaction results is %s" % results)

zk.delete("/my/favorite/node/b")

zk.delete("/my", recursive=True)

time.sleep(2)

zk.stop()

if __name__ == "__main__":

try:

main()

except Exception, ex:

print "Ocurred Exception: %s" % str(ex)

quit()

运行结果:

Children are now: [u'a']

Version: 0, data:

Version: 0, data:

There are 1 children with names [u'a']

Children are now: []

Transaction results is [True, u'/my/favorite/node/b']

Children are now: [u'b']

Children are now: []

No handlers could be found for logger "kazoo.recipe.watchers"

LOST

以上程序运行了基本kazoo接口命令,包括创建删除加watcher等操作,通过调试并对比zookeeper服务节点znode目录结构的变化,就可以理解具体的操作结果。

3.运行通过kazoo实现的分布式锁程序kazoo_lock.py

import logging, os, time

from kazoo.client import KazooClient

from kazoo.client import KazooState

from kazoo.recipe.lock import Lock

class ZooKeeperLock():

def __init__(self, hosts, id_str, lock_name, logger=None, timeout=1):

self.hosts = hosts

self.id_str = id_str

self.zk_client = None

self.timeout = timeout

self.logger = logger

self.name = lock_name

self.lock_handle = None

self.create_lock()

def create_lock(self):

try:

self.zk_client = KazooClient(hosts=self.hosts, logger=self.logger, timeout=self.timeout)

self.zk_client.start(timeout=self.timeout)

except Exception, ex:

self.init_ret = False

self.err_str = "Create KazooClient failed! Exception: %s" % str(ex)

logging.error(self.err_str)

return

try:

lock_path = os.path.join("/", "locks", self.name)

self.lock_handle = Lock(self.zk_client, lock_path)

except Exception, ex:

self.init_ret = False

self.err_str = "Create lock failed! Exception: %s" % str(ex)

logging.error(self.err_str)

return

def destroy_lock(self):

#self.release()

if self.zk_client != None:

self.zk_client.stop()

self.zk_client = None

def acquire(self, blocking=True, timeout=None):

if self.lock_handle == None:

return None

try:

return self.lock_handle.acquire(blocking=blocking, timeout=timeout)

except Exception, ex:

self.err_str = "Acquire lock failed! Exception: %s" % str(ex)

logging.error(self.err_str)

return None

def release(self):

if self.lock_handle == None:

return None

return self.lock_handle.release()

def __del__(self):

self.destroy_lock()

def main():

logger = logging.getLogger()

logger.setLevel(logging.INFO)

sh = logging.StreamHandler()

formatter = logging.Formatter('%(asctime)s -%(module)s:%(filename)s-L%(lineno)d-%(levelname)s: %(message)s')

sh.setFormatter(formatter)

logger.addHandler(sh)

zookeeper_hosts = "127.0.0.1:2182"

lock_name = "test"

lock = ZooKeeperLock(zookeeper_hosts, "myid is 1", lock_name, logger=logger)

ret = lock.acquire()

if not ret:

logging.info("Can't get lock! Ret: %s", ret)

return

logging.info("Get lock! Do something! Sleep 10 secs!")

for i in range(1, 11):

time.sleep(1)

print str(i)

lock.release()

if __name__ == "__main__":

try:

main()

except Exception, ex:

print "Ocurred Exception: %s" % str(ex)

quit()

将该测试文件copy到多个服务器,同时运行,就可以看到分布式锁的效果了。

参考链接:
http://kazoo.readthedocs.org/en/latest/basic_usage.html http://yunjianfei.iteye.com/blog/2164888
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: