Rb(redis blaster),一个为 redis 实现 non-replicated 分片的 python 库
2022-04-07 10:31
1581 查看
Rb,redis blaster,是一个为
redis实现
非复制分片(non-replicated sharding)的库。它在
python redis之上实现了一个自定义路由系统,允许您自动定位不同的服务器,而无需手动将请求路由到各个节点。
它没有实现
redis的所有功能,也没有尝试这样做。 您可以随时将客户端连接到特定主机,但大多数情况下假设您的操作仅限于可以自动路由到不同节点的基本
key/value操作。
你可以做什么:
- 自动针对主机进行单
key
操作 - 对所有或部分节点执行命令
- 并行执行所有这些
安装
rb在
PyPI上可用,可以从那里安装:
$ pip install rb
配置
开始使用
rb非常简单。如果您之前一直在使用
py-redis,您会感到宾至如归。 主要区别在于,不是连接到单个主机,而是将
cluster配置为连接到多个:
from rb import Cluster cluster = Cluster(hosts={ 0: {'port': 6379}, 1: {'port': 6380}, 2: {'port': 6381}, 3: {'port': 6382}, 4: {'port': 6379}, 5: {'port': 6380}, 6: {'port': 6381}, 7: {'port': 6382}, }, host_defaults={ 'host': '127.0.0.1', })
在这种情况下,我们在同一主机上的四个不同服务器进程上设置了
8个节点。
hosts参数是要连接的主机的映射。 字典的
key是
host ID(整数),值是参数字典。
host_defaults是为所有主机填写的可选默认值字典。 如果您想共享一些重复的常见默认值(在这种情况下,所有主机都连接到
localhost),这很有用。
在默认配置中,
PartitionRouter用于路由。
路由
现在集群已经构建好了,我们可以使用
Cluster.get_routing_client()来获取一个
redis客户端,它会为每个命令自动路由到正确的
redis节点:
client = cluster.get_routing_client() results = {} for key in keys_to_look_up: results[key] = client.get(key)
该客户端的工作原理与标准的
pyredis StrictClient非常相似,主要区别在于它只能执行只涉及一个
key的命令。
然而,这个基本操作是串联运行的。使
rb有用的是它可以自动构建
redis管道并将查询并行发送到许多主机。但是,这会稍微改变用法,因为现在该值无法立即使用:
results = {} with cluster.map() as client: for key in keys_to_look_up: results[key] = client.get(key)
虽然到目前为止看起来很相似,但不是将实际值存储在
result字典中,而是存储
Promise对象。当
map context manager结束时,它们保证已经被执行,您可以访问
Promise.value属性来获取值:
for key, promise in results.iteritems(): print '%s: %s' % (key, promise.value)
如果要向所有参与的主机发送命令(例如删除数据库),可以使用
Cluster.all()方法:
with cluster.all() as client: client.flushdb()
如果你这样做,
promise值是一个字典,其中
host ID作为
key,结果作为
value。举个例子:
with cluster.all() as client: results = client.info() for host_id, info in results.iteritems(): print 'host %s is running %s' % (host_id, info['os'])
要明确针对某些主机,您可以使用
Cluster.fanout()接受要将命令发送到
host ID列表。
API
这是公共 API 的完整参考。请注意,此库扩展了
Python redis库,因此其中一些类具有更多功能,您需要查阅
py-redis库。
Cluster
class rb.Cluster(hosts, host_defaults=None, pool_cls=None, pool_options=None, router_cls=None, router_options=None)
cluster是
rb背后的核心对象。 它保存到各个节点的连接池,并且可以在应用程序运行期间在中央位置共享。
具有默认
router的四个
redis实例上的集群的基本示例:
cluster = Cluster(hosts={ 0: {'port': 6379}, 1: {'port': 6380}, 2: {'port': 6381}, 3: {'port': 6382}, }, host_defaults={ 'host': '127.0.0.1', })
hosts是一个主机字典,它将
host ID数量映射到配置参数。参数对应于
add_host()函数的签名。这些参数的默认值是从
host_defaults中提取的。要覆盖
pool类,可以使用
pool_cls和
pool_options参数。这同样适用于
router的
router_cls和
router_options。
pool选项对于设置
socket超时和类似参数很有用。
add_host(host_id=None, host='localhost', port=6379, unix_socket_path=None, db=0, password=None, ssl=False, ssl_options=None)
将新主机添加到集群。 这仅对单元测试真正有用,因为通常主机是通过构造函数添加的,并且在第一次使用集群后进行更改不太可能有意义。
all(timeout=None, max_concurrency=64, auto_batch=True)
-
扇出到所有主机。其他方面与
fanout()完全一样。
with cluster.all() as client: client.flushdb()
disconnect_pools()
-
断开与内部池的所有连接。
execute_commands(mapping, *args, **kwargs)
同时在
Redis集群上执行与路由
key关联的一系列命令,返回一个新映射,其中值是与同一位置的命令对应的结果列表。例如:
>>> cluster.execute_commands({ ... 'foo': [ ... ('PING',), ... ('TIME',), ... ], ... 'bar': [ ... ('CLIENT', 'GETNAME'), ... ], ... }) {'bar': [<Promise None>], 'foo': [<Promise True>, <Promise (1454446079, 418404)>]}
作为
redis.client.Script实例的命令将首先检查它们在目标节点上的存在,然后在执行之前加载到目标上,并且可以与其他命令交错:
>>> from redis.client import Script >>> TestScript = Script(None, 'return {KEYS, ARGV}') >>> cluster.execute_commands({ ... 'foo': [ ... (TestScript, ('key:1', 'key:2'), range(0, 3)), ... ], ... 'bar': [ ... (TestScript, ('key:3', 'key:4'), range(3, 6)), ... ], ... }) {'bar': [<Promise [['key:3', 'key:4'], ['3', '4', '5']]>], 'foo': [<Promise [['key:1', 'key:2'], ['0', '1', '2']]>]}
在内部,
FanoutClient用于发出命令。
fanout(hosts=None, timeout=None, max_concurrency=64, auto_batch=True)
-
用于获取路由客户端、开始扇出操作并
join结果的快捷上下文管理器。
FanoutClient。示例用法:
with cluster.fanout(hosts='all') as client: client.flushdb()
get_local_client(host_id)
-
返回特定主机
ID的本地化
client。这个
client就像一个普通的
Python redis客户端一样工作,并立即返回结果。
get_local_client_for_key(key)
-
类似于
get_local_client_for_key()但根据
router所说的
key目的地返回
client。
get_pool_for_host(host_id)
-
返回给定主机的连接池。
get_router()
-
返回
cluster的
router。如果
cluster重新配置,
router将被重新创建。 通常,您不需要自己与
router交互,因为集群的路由客户端会自动执行此操作。
BaseRouter的一个实例。
get_routing_client(auto_batch=True)
-
返回一个路由客户端。该客户端能够自动将请求路由到各个主机。 它是线程安全的,可以类似于主机本地客户端使用,但它会拒绝执行无法直接路由到单个节点的命令。
GET命令最终可以合并为一个
MGET命令。可以通过将
auto_batch设置为
False来禁用此行为。这对于调试很有用,因为
MONITOR将更准确地反映代码中发出的命令。
RoutingClient。
map(timeout=None, max_concurrency=64, auto_batch=True)
-
用于获取路由客户端、开始映射操作并
join结果的快捷上下文管理器。
max_concurrency定义在隐式连接发生之前可以存在多少未完成的并行查询。
MappingClient。示例用法:
results = {} with cluster.map() as client: for key in keys_to_fetch: results[key] = client.get(key) for key, promise in results.iteritems(): print '%s => %s' % (key, promise.value)
remove_host(host_id)
-
从
client中删除
host。这仅对单元测试真正有用。
Clients
class rb.RoutingClient(cluster, auto_batch=True)
可以路由到单个目标的客户端。
有关参数,请参见
Cluster.get_routing_client()。
execute_command(*args, **options)
执行命令并返回解析后的响应
fanout(hosts=None, timeout=None, max_concurrency=64, auto_batch=None)
-
返回映射操作的
context manager,该操作扇出到手动指定的主机,而不是使用路由系统。 例如,这可用于清空所有主机上的数据库。
context manager返回一个
FanoutClient。 示例用法:
with cluster.fanout(hosts=[0, 1, 2, 3]) as client: results = client.info() for host_id, info in results.value.iteritems(): print '%s -> %s' % (host_id, info['is'])
promise将所有结果累积到由
host_id键入的字典中。
hosts参数是一个
host_id列表,或者是字符串
'all',用于将命令发送到所有主机。
fanout API需要非常小心地使用,因为当
key被写入不期望它们的主机时,它可能会造成很多损坏。
get_fanout_client(hosts, max_concurrency=64, auto_batch=None)
-
返回线程不安全的扇出客户端。
FanoutClient的实例。
get_mapping_client(max_concurrency=64, auto_batch=None)
-
返回一个线程不安全的映射客户端。此客户端的工作方式类似于
redis管道并返回最终结果对象。它需要
join才能正常工作。您应该使用自动
join的
map()上下文管理器,而不是直接使用它。
MappingClient的一个实例。
map(timeout=None, max_concurrency=64, auto_batch=None)
-
返回映射操作的
context manager。 这会并行运行多个查询,然后最后
join以收集所有结果。
MappingClient。示例用法:
results = {} with cluster.map() as client: for key in keys_to_fetch: results[key] = client.get(key) for key, promise in results.iteritems(): print '%s => %s' % (key, promise.value)
class rb.MappingClient(connection_pool, max_concurrency=None, auto_batch=True)
路由客户端使用
cluster的
router根据执行的
redis命令的
key自动定位单个节点。
有关参数,请参见
Cluster.map()。
cancel()
取消所有未完成的请求。
execute_command(*args, **options)
-
执行命令并返回解析后的响应
join(timeout=None)
-
等待所有未完成的响应返回或超时
mget(keys, *args)
-
返回与
key顺序相同的值列表
mset(*args, **kwargs)
-
根据映射设置
key/value。映射是
key/value对的字典。
key和
value都应该是可以通过
str()转换为
string的字符串或类型。
class rb.FanoutClient(hosts, connection_pool, max_concurrency=None, auto_batch=True)
这与
MappingClient的工作方式相似,但它不是使用
router来定位主机,而是将命令发送到所有手动指定的主机。
结果累积在由
host_id键入的字典中。
有关参数,请参见
Cluster.fanout()。
execute_command(*args, **options)
执行命令并返回解析后的响应
target(hosts)
-
为一次调用临时重新定位
client。当必须为一次调用处理主机
subset时,这很有用。
target_key(key)
-
临时重新定位客户端以进行一次调用,以专门路由到给定
key路由到的一台主机。 在这种情况下,
promise的结果只是一个主机的值而不是字典。
1.3版中的新功能。
Promise
class rb.Promise
一个尝试为
Promise对象镜像
ES6 API的
Promise对象。与
ES6的
Promise不同,这个
Promise也直接提供对底层值的访问,并且它有一些稍微不同的静态方法名称,因为这个
Promise可以在外部解析。
static all(iterable_or_dict)
当所有传递的promise
都解决时,promise
就解决了。你可以传递一个promise
列表或一个promise
字典。
done(on_success=None, on_failure=None)
-
将一些回调附加到
Promise并返回
Promise。
is_pending
-
如果
promise仍然等待,则为
True,否则为
False。
is_rejected
-
如果
promise被拒绝,则为
True,否则为
False。
is_resolved
-
如果
promise已解决,则为
True,否则为
False。
reason
-
如果它被拒绝,这个
promise的原因。
reject(reason)
-
以给定的理由拒绝
promise。
static rejected(reason)
-
创建一个以特定值被拒绝的
promise对象。
resolve(value)
-
用给定的值解决
promise。
static resolved(value)
-
创建一个以特定值解析的
promise对象。
then(success=None, failure=None)
-
向
Promise添加成功和/或失败回调的实用方法,该方法还将在此过程中返回另一个
Promise。
value
-
如果它被解决,这个
promise所持有的值。
Routers
class rb.BaseRouter(cluster)
所有路由的基类。如果你想实现一个自定义路由,这就是你的子类。
cluster
引用回此router
所属的Cluster
。
get_host_for_command(command, args)
-
返回应执行此命令的主机。
get_host_for_key(key)
-
执行路由并返回目标的
host_id。
get_key(command, args)
-
返回命令操作的
key。
class rb.ConsistentHashingRouter(cluster)
基于一致哈希算法返回
host_id的
router。 一致的哈希算法仅在提供
key参数时才有效。
该
router要求主机是无间隙的,这意味着
N台主机的
ID范围从
0到
N-1。
get_host_for_key(key)
执行路由并返回目标的host_id
。- 子类需要实现这一点。
class rb.PartitionRouter(cluster)
一个简单的
router,仅根据简单的
crc32 % node_count设置将命令单独路由到单个节点。
该
router要求主机是无间隙的,这意味着
N台主机的
ID范围从
0到
N-1。
- get_host_for_key(key)
执行路由并返回目标的
host_id
。 - 子类需要实现这一点。
exception rb.UnroutableCommand
如果发出的命令无法通过
router路由到单个主机,则引发。
Testing
class rb.testing.TestSetup(servers=4, databases_each=8, server_executable='redis-server')
测试设置是生成多个
redis服务器进行测试并自动关闭它们的便捷方式。 这可以用作
context manager来自动终止客户端。
rb.testing.make_test_cluster(*args, **kwargs)
用于创建测试设置然后从中创建cluster
的便捷快捷方式。这必须用作context manager
:from rb.testing import make_test_cluster with make_test_cluster() as cluster: ...
相关文章推荐
- 利用redis的订阅和发布来实现实时监控的一个DEMO(Python版本)
- 用python实现一个redis的zset数据结构
- Python使用Redis实现一个简单作业调度系统
- 利用redis的订阅和发布来实现实时监控的一个DEMO(Python版本)
- 一个无聊男人的疯狂《数据结构与算法分析-C++描述》学习笔记 用C++/lua/python/bash的四重实现(4)二分搜索算法
- 一个无聊男人的疯狂《数据结构与算法分析-C++描述》学习笔记 用C++/lua/python/bash的四重实现(5)欧几里得算法欧几里得算法求最大公约数
- 初学python 用shelve 实现的一个contacts
- 一个无聊男人的疯狂《数据结构与算法分析-C++描述》学习笔记 用C++/lua/python/bash的四重实现(3) 最大子序列和问题
- 推荐一个机器学习框架——python实现
- 用python实现一个抓取腾讯电影的爬虫
- 发布一个参考ssdb,用go实现的类似redis的高性能nosql:ledisdb
- 一个无聊男人的疯狂《数据结构与算法分析-C++描述》学习笔记 用C++/lua/python/bash的四重实现(7)习题2.8 随机数组的三种生成算法
- 用Python实现一个细粒度hadoop作业监控分析工具
- 使用Python实现一个小型的航空订票系统(3)
- Python 实现在对一个目录下所有文件,指定某一行之后添加内容(批处理脚本)
- 用python实现一个抓取电影的爬虫
- 用Python socket 实现一个简单的http服务器(post 与get 的区别)、CGI、WSGI、MVC
- 用Python实现一个简单的文件传输协议
- python cgi ajax - 使用CGIHTTPServer实现一个ajax程序
- 用python实现的又一个Console下的进度条