您的位置:首页 > 数据库 > Redis

Rb(redis blaster),一个为 redis 实现 non-replicated 分片的 python 库

2022-04-07 10:31 1581 查看

Rb,redis blaster
,是一个为
redis
实现
非复制分片(non-replicated sharding)
的库。它在
python redis
之上实现了一个自定义路由系统,允许您自动定位不同的服务器,而无需手动将请求路由到各个节点。

它没有实现

redis
的所有功能,也没有尝试这样做。 您可以随时将客户端连接到特定主机,但大多数情况下假设您的操作仅限于可以自动路由到不同节点的基本
key/value
操作。

你可以做什么:

  • 自动针对主机进行单
    key
    操作
  • 对所有或部分节点执行命令
  • 并行执行所有这些

安装

rb
PyPI
上可用,可以从那里安装:

$ pip install rb

配置

开始使用

rb
非常简单。如果您之前一直在使用
py-redis
,您会感到宾至如归。 主要区别在于,不是连接到单个主机,而是将
cluster
配置为连接到多个:

from rb import Cluster

cluster = Cluster(hosts={
0: {'port': 6379},
1: {'port': 6380},
2: {'port': 6381},
3: {'port': 6382},
4: {'port': 6379},
5: {'port': 6380},
6: {'port': 6381},
7: {'port': 6382},
}, host_defaults={
'host': '127.0.0.1',
})

在这种情况下,我们在同一主机上的四个不同服务器进程上设置了

8
个节点。
hosts
参数是要连接的主机的映射。 字典的
key
host ID
(整数),值是参数字典。
host_defaults
是为所有主机填写的可选默认值字典。 如果您想共享一些重复的常见默认值(在这种情况下,所有主机都连接到
localhost
),这很有用。

在默认配置中,

PartitionRouter
用于路由。

路由

现在集群已经构建好了,我们可以使用

Cluster.get_routing_client()
来获取一个
redis
客户端,它会为每个命令自动路由到正确的
redis
节点:

client = cluster.get_routing_client()
results = {}
for key in keys_to_look_up:
results[key] = client.get(key)

该客户端的工作原理与标准的

pyredis StrictClient
非常相似,主要区别在于它只能执行只涉及一个
key
的命令。

然而,这个基本操作是串联运行的。使

rb
有用的是它可以自动构建
redis
管道并将查询并行发送到许多主机。但是,这会稍微改变用法,因为现在该值无法立即使用:

results = {}
with cluster.map() as client:
for key in keys_to_look_up:
results[key] = client.get(key)

虽然到目前为止看起来很相似,但不是将实际值存储在

result
字典中,而是存储
Promise
对象。当
map context manager
结束时,它们保证已经被执行,您可以访问
Promise.value
属性来获取值:

for key, promise in results.iteritems():
print '%s: %s' % (key, promise.value)

如果要向所有参与的主机发送命令(例如删除数据库),可以使用

Cluster.all()
方法:

with cluster.all() as client:
client.flushdb()

如果你这样做,

promise
值是一个字典,其中
host ID
作为
key
,结果作为
value
。举个例子:

with cluster.all() as client:
results = client.info()
for host_id, info in results.iteritems():
print 'host %s is running %s' % (host_id, info['os'])

要明确针对某些主机,您可以使用

Cluster.fanout()
接受要将命令发送到
host ID
列表。

API

这是公共 API 的完整参考。请注意,此库扩展了

Python redis
库,因此其中一些类具有更多功能,您需要查阅
py-redis
库。

Cluster

class rb.Cluster(hosts, host_defaults=None, pool_cls=None, pool_options=None, router_cls=None, router_options=None)

cluster
rb
背后的核心对象。 它保存到各个节点的连接池,并且可以在应用程序运行期间在中央位置共享。

具有默认

router
的四个
redis
实例上的集群的基本示例:

cluster = Cluster(hosts={
0: {'port': 6379},
1: {'port': 6380},
2: {'port': 6381},
3: {'port': 6382},
}, host_defaults={
'host': '127.0.0.1',
})

hosts
是一个主机字典,它将
host ID
数量映射到配置参数。参数对应于
add_host()
函数的签名。这些参数的默认值是从
host_defaults
中提取的。要覆盖
pool
类,可以使用
pool_cls
pool_options
参数。这同样适用于
router
router_cls
router_options
pool
选项对于设置
socket
超时和类似参数很有用。

  • add_host(host_id=None, host='localhost', port=6379, unix_socket_path=None, db=0, password=None, ssl=False, ssl_options=None)
    将新主机添加到集群。 这仅对单元测试真正有用,因为通常主机是通过构造函数添加的,并且在第一次使用集群后进行更改不太可能有意义。
  • all(timeout=None, max_concurrency=64, auto_batch=True)
      扇出到所有主机。其他方面与
      fanout()
      完全一样。
    • 例子:
      with cluster.all() as client:
      client.flushdb()
  • disconnect_pools()
      断开与内部池的所有连接。
  • execute_commands(mapping, *args, **kwargs)

      同时在

      Redis
      集群上执行与路由
      key
      关联的一系列命令,返回一个新映射,其中值是与同一位置的命令对应的结果列表。例如:

      >>> cluster.execute_commands({
      ...   'foo': [
      ...     ('PING',),
      ...     ('TIME',),
      ...   ],
      ...   'bar': [
      ...     ('CLIENT', 'GETNAME'),
      ...   ],
      ... })
      {'bar': [<Promise None>],
      'foo': [<Promise True>, <Promise (1454446079, 418404)>]}
    • 作为

      redis.client.Script
      实例的命令将首先检查它们在目标节点上的存在,然后在执行之前加载到目标上,并且可以与其他命令交错:

      >>> from redis.client import Script
      >>> TestScript = Script(None, 'return {KEYS, ARGV}')
      >>> cluster.execute_commands({
      ...   'foo': [
      ...     (TestScript, ('key:1', 'key:2'), range(0, 3)),
      ...   ],
      ...   'bar': [
      ...     (TestScript, ('key:3', 'key:4'), range(3, 6)),
      ...   ],
      ... })
      {'bar': [<Promise [['key:3', 'key:4'], ['3', '4', '5']]>],
      'foo': [<Promise [['key:1', 'key:2'], ['0', '1', '2']]>]}

      在内部,

      FanoutClient
      用于发出命令。

  • fanout(hosts=None, timeout=None, max_concurrency=64, auto_batch=True)
      用于获取路由客户端、开始扇出操作并
      join
      结果的快捷上下文管理器。
    • 在上下文管理器中,可用的客户端是
      FanoutClient
      。示例用法:
      with cluster.fanout(hosts='all') as client:
      client.flushdb()
  • get_local_client(host_id)
      返回特定主机
      ID
      的本地化
      client
      。这个
      client
      就像一个普通的
      Python redis
      客户端一样工作,并立即返回结果。
  • get_local_client_for_key(key)
      类似于
      get_local_client_for_key()
      但根据
      router
      所说的
      key
      目的地返回
      client
  • get_pool_for_host(host_id)
      返回给定主机的连接池。
    • redis 客户端使用此连接池来确保它不必不断地重新连接。如果要使用自定义 redis 客户端,可以手动将其作为连接池传入。
  • get_router()
      返回
      cluster
      router
      。如果
      cluster
      重新配置,
      router
      将被重新创建。 通常,您不需要自己与
      router
      交互,因为集群的路由客户端会自动执行此操作。
    • 这将返回
      BaseRouter
      的一个实例。
  • get_routing_client(auto_batch=True)
      返回一个路由客户端。该客户端能够自动将请求路由到各个主机。 它是线程安全的,可以类似于主机本地客户端使用,但它会拒绝执行无法直接路由到单个节点的命令。
    • 路由客户端的默认行为是尝试将符合条件的命令批处理成批处理版本。 例如,路由到同一节点的多个
      GET
      命令最终可以合并为一个
      MGET
      命令。可以通过将
      auto_batch
      设置为
      False
      来禁用此行为。这对于调试很有用,因为
      MONITOR
      将更准确地反映代码中发出的命令。
    • 有关详细信息,请参阅
      RoutingClient
  • map(timeout=None, max_concurrency=64, auto_batch=True)
      用于获取路由客户端、开始映射操作并
      join
      结果的快捷上下文管理器。
      max_concurrency
      定义在隐式连接发生之前可以存在多少未完成的并行查询。
    • 在上下文管理器中,可用的客户端是
      MappingClient
      。示例用法:
      results = {}
      with cluster.map() as client:
      for key in keys_to_fetch:
      results[key] = client.get(key)
      for key, promise in results.iteritems():
      print '%s => %s' % (key, promise.value)
  • remove_host(host_id)
      client
      中删除
      host
      。这仅对单元测试真正有用。

    Clients

    class rb.RoutingClient(cluster, auto_batch=True)

    可以路由到单个目标的客户端。

    有关参数,请参见

    Cluster.get_routing_client()

    • execute_command(*args, **options)
      执行命令并返回解析后的响应
  • fanout(hosts=None, timeout=None, max_concurrency=64, auto_batch=None)
      返回映射操作的
      context manager
      ,该操作扇出到手动指定的主机,而不是使用路由系统。 例如,这可用于清空所有主机上的数据库。
      context manager
      返回一个
      FanoutClient
      。 示例用法:
      with cluster.fanout(hosts=[0, 1, 2, 3]) as client:
      results = client.info()
      for host_id, info in results.value.iteritems():
      print '%s -> %s' % (host_id, info['is'])
    • 返回的
      promise
      将所有结果累积到由
      host_id
      键入的字典中。
    • hosts
      参数是一个
      host_id
      列表,或者是字符串
      'all'
      ,用于将命令发送到所有主机。
    • fanout API
      需要非常小心地使用,因为当
      key
      被写入不期望它们的主机时,它可能会造成很多损坏。
  • get_fanout_client(hosts, max_concurrency=64, auto_batch=None)
      返回线程不安全的扇出客户端。
    • 返回
      FanoutClient
      的实例。
  • get_mapping_client(max_concurrency=64, auto_batch=None)
      返回一个线程不安全的映射客户端。此客户端的工作方式类似于
      redis
      管道并返回最终结果对象。它需要
      join
      才能正常工作。您应该使用自动
      join
      map()
      上下文管理器,而不是直接使用它。
    • 返回
      MappingClient
      的一个实例。
  • map(timeout=None, max_concurrency=64, auto_batch=None)
      返回映射操作的
      context manager
      。 这会并行运行多个查询,然后最后
      join
      以收集所有结果。
    • 在上下文管理器中,可用的客户端是
      MappingClient
      。示例用法:
      results = {}
      with cluster.map() as client:
      for key in keys_to_fetch:
      results[key] = client.get(key)
      for key, promise in results.iteritems():
      print '%s => %s' % (key, promise.value)

    class rb.MappingClient(connection_pool, max_concurrency=None, auto_batch=True)

    路由客户端使用

    cluster
    router
    根据执行的
    redis
    命令的
    key
    自动定位单个节点。

    有关参数,请参见

    Cluster.map()

    • cancel()
      取消所有未完成的请求。
  • execute_command(*args, **options)
      执行命令并返回解析后的响应
  • join(timeout=None)
      等待所有未完成的响应返回或超时
  • mget(keys, *args)
      返回与
      key
      顺序相同的值列表
  • mset(*args, **kwargs)
      根据映射设置
      key/value
      。映射是
      key/value
      对的字典。
      key
      value
      都应该是可以通过
      str()
      转换为
      string
      的字符串或类型。

    class rb.FanoutClient(hosts, connection_pool, max_concurrency=None, auto_batch=True)

    这与

    MappingClient
    的工作方式相似,但它不是使用
    router
    来定位主机,而是将命令发送到所有手动指定的主机。

    结果累积在由

    host_id
    键入的字典中。

    有关参数,请参见

    Cluster.fanout()

    • execute_command(*args, **options)
      执行命令并返回解析后的响应
  • target(hosts)
      为一次调用临时重新定位
      client
      。当必须为一次调用处理主机
      subset
      时,这很有用。
  • target_key(key)
      临时重新定位客户端以进行一次调用,以专门路由到给定
      key
      路由到的一台主机。 在这种情况下,
      promise
      的结果只是一个主机的值而不是字典。
    • 1.3
      版中的新功能。

    Promise

    class rb.Promise

    一个尝试为

    Promise
    对象镜像
    ES6 API
    Promise
    对象。与
    ES6
    Promise
    不同,这个
    Promise
    也直接提供对底层值的访问,并且它有一些稍微不同的静态方法名称,因为这个
    Promise
    可以在外部解析。

    • static all(iterable_or_dict)
      当所有传递的
      promise
      都解决时,
      promise
      就解决了。你可以传递一个
      promise
      列表或一个
      promise
      字典。
  • done(on_success=None, on_failure=None)
      将一些回调附加到
      Promise
      并返回
      Promise
  • is_pending
      如果
      promise
      仍然等待,则为
      True
      ,否则为
      False
  • is_rejected
      如果
      promise
      被拒绝,则为
      True
      ,否则为
      False
  • is_resolved
      如果
      promise
      已解决,则为
      True
      ,否则为
      False
  • reason
      如果它被拒绝,这个
      promise
      的原因。
  • reject(reason)
      以给定的理由拒绝
      promise
  • static rejected(reason)
      创建一个以特定值被拒绝的
      promise
      对象。
  • resolve(value)
      用给定的值解决
      promise
  • static resolved(value)
      创建一个以特定值解析的
      promise
      对象。
  • then(success=None, failure=None)
      Promise
      添加成功和/或失败回调的实用方法,该方法还将在此过程中返回另一个
      Promise
  • value
      如果它被解决,这个
      promise
      所持有的值。

    Routers

    class rb.BaseRouter(cluster)

    所有路由的基类。如果你想实现一个自定义路由,这就是你的子类。

    • cluster
      引用回此
      router
      所属的
      Cluster
  • get_host_for_command(command, args)
      返回应执行此命令的主机。
  • get_host_for_key(key)
      执行路由并返回目标的
      host_id
    • 子类需要实现这一点。
  • get_key(command, args)
      返回命令操作的
      key

    class rb.ConsistentHashingRouter(cluster)

    基于一致哈希算法返回

    host_id
    router
    。 一致的哈希算法仅在提供
    key
    参数时才有效。

    router
    要求主机是无间隙的,这意味着
    N
    台主机的
    ID
    范围从
    0
    N-1

    • get_host_for_key(key)
      执行路由并返回目标的
      host_id
    • 子类需要实现这一点。

    class rb.PartitionRouter(cluster)

    一个简单的

    router
    ,仅根据简单的
    crc32 % node_count
    设置将命令单独路由到单个节点。

    router
    要求主机是无间隙的,这意味着
    N
    台主机的
    ID
    范围从
    0
    N-1

    • get_host_for_key(key) 执行路由并返回目标的
      host_id
    • 子类需要实现这一点。

    exception rb.UnroutableCommand

    如果发出的命令无法通过

    router
    路由到单个主机,则引发。

    Testing

    class rb.testing.TestSetup(servers=4, databases_each=8, server_executable='redis-server')

    测试设置是生成多个

    redis
    服务器进行测试并自动关闭它们的便捷方式。 这可以用作
    context manager
    来自动终止客户端。

    • rb.testing.make_test_cluster(*args, **kwargs)
      用于创建测试设置然后从中创建
      cluster
      的便捷快捷方式。这必须用作
      context manager
      from rb.testing import make_test_cluster
      with make_test_cluster() as cluster:
      ...
  • 内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
    标签: 
    相关文章推荐