bigchaindb源码分析(三)——后端存储
2017-07-08 14:37
260 查看
bigchaindb源码分析(一)分析了bigchaindb如何解析命令行参数与配置文件,并据此启动了日志publisher与subscriber。对于
该函数首先创建一个Bigchain实例,Bigchain类位于
其形参前的
bigchaindb后端存储的代码位于
我们再来看
从函数名称上可以看出,
装饰器
定义:通过装饰器singledispatch来定义需要多态的函数,通过
调用
由此,我们来寻找后端存储中对
看着好复杂的样子。。按照之前装饰器的思路,我们先来进行替换吧,首先展开
也即
再把
所以这段代码实际上是调用了(注意,这里将所有变量写成了字符串,实际上均指得是对应的对象)
因此,当
在成功创建表之后,bigchaindb为这四个表分别创建二级索引,以方便进行查询,如创建名为
至此,数据库初始化完成,
bigchaindb start命令,将调用
_run_init来初始化后端存储,随之利用pipeline来启动block\vote等进程(bigchaindb源码分析(二))。本节介绍bigchaindb的后端存储。
_run_init由
run_start所调用,代码位于
commands/bigchaindb.py
def _run_init(): # Try to access the keypair, throws an exception if it does not exist b = bigchaindb.Bigchain() schema.init_database(connection=b.connection) b.create_genesis_block() logger.info('Genesis block created.')
该函数首先创建一个Bigchain实例,Bigchain类位于
core.py,其
__init__函数将从
bigchaindb.config中加载配置文件的配置,包括本节点的公私钥、联盟中其他节点的公钥、一致性算法插件、以及定位连接数据库的类等等。之后根据连接数据库的类来初始化数据库。最后创建创世区块。
创建bigchain实例
Bigchain类__init__函数连接数据库代码如下所示。当类实例化的connect参数为None时,将调用
backend.connect。
self.connection = connection if connection else backend.connect(**bigchaindb.config['database'])
其形参前的
**相当于将形参中字典的key value一一对应到函数参数列表中。举例来说:
>>> def test(a, b, c): ... print(a, b, c) ... >>> test(1,2,3) 1 2 3 >>> x=(1,2,3) >>> test(*x) 1 2 3 >>> y={'a':1,'b':2,'c':3} >>> test(**y) 1 2 3
backend.connect参数列表如下。故而在调用时,将配置文件中database下的值一一对应到参数列表中。
# 参数列表 def connect(backend=None, host=None, port=None, name=None, max_tries=None, connection_timeout=None, replicaset=None, ssl=None, login=None, password=None, ca_cert=None, certfile=None, keyfile=None, keyfile_passphrase=None, crlfile=None): # 配置文件中的database项 "database": { "password": null, "connection_timeout": 5000, "ssl": false, "replicaset": "bigchain-rs", "login": null, "max_tries": 3, "name": "bigchain", "port": 27017, "host": "localhost", "backend": "mongodb" },
bigchaindb后端存储的代码位于
backend目录下,目前包含了mongodb\rethinkdb两种后端存储数据库。
backend/connect.py中也指定了目前支持的两种后端存储。
BACKENDS = { 'mongodb': 'bigchaindb.backend.mongodb.connection.MongoDBConnection', 'rethinkdb': 'bigchaindb.backend.rethinkdb.connection.RethinkDBConnection' }
backend.connect寻找后端存储类的代码也就可以分为以下几步了:首先是获取配置文件中backend的值,之后从变量
BACKENDS中找到对应的类,再通过反射机制根据类的字符串路径加载类
# backend为mongodb backend = backend or bigchaindb.config['database']['backend'] module_name, _, class_name = BACKENDS[backend].rpartition('.') Class = getattr(import_module(module_name), class_name) return Class(host=host, port=port, dbname=dbname, max_tries=max_tries, connection_timeout=connection_timeout, replicaset=replicaset, ssl=ssl, login=login, password=password, ca_cert=ca_cert, certfile=certfile, keyfile=keyfile, keyfile_passphrase=keyfile_passphrase, crlfile=crlfile)
rpartition将返回一个三元组,分别为最后一个分隔符左边的字符串、分隔符本身、以及最后一个分隔符右边的字符串。因此,
class_name将对应于
MongoDBConnection。而后利用getattr函数加载该类,变量
Class也就对应于
MongoDBConnection类的一个实例了,最后则调用Class类。
>>> x='bigchaindb.backend.mongodb.connection.MongoDBConnection' >>> print(x.rpartition('.')) ('bigchaindb.backend.mongodb.connection', '.', 'MongoDBConnection')
我们再来看
bigchaindb.backend.mongodb.connection模块的
MongoDBConnection类。该类继承
backend.connection.Connection类。实际上,所有后端存储的连接类都应该继承Connection类,然后再实现自己所独立的操作数据库的代码。
MongoDBConnection类的
__init__函数将调用父类的初始化函数,之后将一些mongodb所独有的配置保存为类的成员,如replicaset。而父类的
__init__则将一些后端存储所共有的配置保存为类的成员,如host\port\dbname等等。
class MongoDBConnection(Connection): def __init__(self, replicaset=None, ssl=None, login=None, password=None, ca_cert=None, certfile=None, keyfile=None, keyfile_passphrase=None, crlfile=None, **kwargs): super().__init__(**kwargs) self.replicaset = replicaset or bigchaindb.config['database']['replicaset']
初始化数据库
_run_init函数在创建Bigchain实例后,将对数据库进行初始化。根据之前的分析,形参中的connection实际上是
MongoDBConnection类的实例了。
schema.init_database(connection=b.connection) # backend/schema.py def init_database(connection=None, dbname=None): connection = connection or connect() dbname = dbname or bigchaindb.config['database']['name'] create_database(connection, dbname) create_tables(connection, dbname) create_indexes(connection, dbname)
从函数名称上可以看出,
init_database的步骤为依次创建数据库、表以及索引。然而,在
backend/schema.py中这三个函数实际上居然都没有函数体,不过有个装饰器singledispatch
@singledispatch def create_database(connection, dbname): raise NotImplementedError @singledispatch def create_tables(connection, dbname): raise NotImplementedError @singledispatch def create_indexes(connection, dbname): raise NotImplementedError
装饰器
singledispatch的文档在https://pypi.python.org/pypi/singledispatch。阅读这些例子,可以看出相当于实现了函数多态,即可以通过不同的形参来调用同一个函数名的不同函数。文档中的实例如下:
定义:通过装饰器singledispatch来定义需要多态的函数,通过
@fun.register(int)或者
fun.register来进行注册
>>> from singledispatch import singledispatch >>> @singledispatch ... def fun(arg, verbose=False): ... if verbose: ... print("Let me just say,", end=" ") ... print(arg) >>> @fun.register(int) ... def _(arg, verbose=False): ... if verbose: ... print("Strength in numbers, eh?", end=" ") ... print(arg) >>> def nothing(arg, verbose=False): ... print("Nothing.") ... >>> fun.register(type(None), nothing)
调用
>>> fun("Hello, world.") Hello, world. >>> fun("test.", verbose=True) Let me just say, test. >>> fun(42, verbose=True) Strength in numbers, eh? 42 >>> fun(None) Nothing.
由此,我们来寻找后端存储中对
create_database进行了注册的函数。mongodb后端存储
backend/mongodb/schema中有以下代码:
from bigchaindb.backend.utils import module_dispatch_registrar register_schema = module_dispatch_registrar(backend.schema) @register_schema(MongoDBConnection) def create_database(conn, dbname): if dbname in conn.conn.database_names(): raise exceptions.DatabaseAlreadyExists('Database `{}` already exists' .format(dbname)) logger.info('Create database `%s`.', dbname) # TODO: read and write concerns can be declared here conn.conn.get_database(dbname)
module_dispatch_registrar的实现代码为:
def module_dispatch_registrar(module): def dispatch_wrapper(obj_type): def wrapper(func): func_name = func.__name__ try: dispatch_registrar = getattr(module, func_name) return dispatch_registrar.register(obj_type)(func) except AttributeError as ex: raise ModuleDispatchRegistrationError( ('`{module}` does not contain a single-dispatchable ' 'function named `{func}`. The module being registered ' 'was not implemented correctly!').format( func=func_name, module=module.__name__)) from ex return wrapper return dispatch_wrapper
看着好复杂的样子。。按照之前装饰器的思路,我们先来进行替换吧,首先展开
module_dispatch_registrar,代码相当于
@module_dispatch_registrar(backend.schema)(MongoDBConnection) def create_database(conn, dbname): ...
也即
@dispatch_wrapper(MongoDBConnection) def create_database(conn, dbname): ... # 其中变量module为backend.schema def dispatch_wrapper(obj_type): def wrapper(func): ... return wrapper
再把
dispatch_wrapper展开
wrapper(create_database) # 其中变量module为backend.schema,变量obj_type为MongoDBConnection def wrapper(func): ... return wrapper
所以这段代码实际上是调用了(注意,这里将所有变量写成了字符串,实际上均指得是对应的对象)
dispatch_registrar = getattr(backend.schema, create_database) dispatch_registrar.register(MongoDBConnection)(create_database)
因此,当
init_database执行代码
create_database(connection, dbname)时,由于connection的类型为
MongoDBConnection类,故而将调用的函数为
backend/mongodb/schema.py中的
create_database函数。
create_tables以及
create_indexes也是如此。
创建数据库
如此,创建数据库所调用的函数为backend.mongodb.schema.create_database,形参conn对应MongoDBConnection类的实例
@register_schema(MongoDBConnection) def create_database(conn, dbname): if dbname in conn.conn.database_names(): raise exceptions.DatabaseAlreadyExists('Database `{}` already exists' .format(dbname)) logger.info('Create database `%s`.', dbname) # TODO: read and write concerns can be declared here conn.conn.get_database(dbname)
conn.conn实际上是
MongoDBConnection的父类
Connection类的一个属性,由于在该类
__init__时,
self._conn值为None,所以将调用
self.connect(),而connect又将调用
_connect函数
self.max_tries_counter次,直到连接成功,或者抛出异常
@property def conn(self): if self._conn is None: self.connect() return self._conn def connect(self): attempt = 0 for i in self.max_tries_counter: attempt += 1 try: self._conn = self._connect() except ConnectionError as exc: ... else: break
_connect函数在去掉异常处理、函数调用时的参数列表后如下所示。函数逻辑也很清晰了,首先是初始化副本集,然后利用pymongo连接到mongodb数据库,再用配置文件中的账号与口令验证,最后返回连接并验证成功的client对象。注意最后返回的client对象将赋值给
MongoDBConnection类的
conn成员属性。
def _connect(self): try: initialize_replica_set(self.host,...) if self.ca_cert is None or self.certfile is None or \ self.keyfile is None or self.crlfile is None: client = pymongo.MongoClient(self.host,...) if self.login is not None and self.password is not None: client[self.dbname].authenticate(self.login, self.password) else: logger.info('Connecting to MongoDB over TLS/SSL...') client = pymongo.MongoClient(self.host,...) if self.login is not None: client[self.dbname].authenticate(self.login, mechanism='MONGODB-X509') return client except ...
initialize_replica_set函数也是首先连接mongodb数据库,然后检查数据库是否已经配置了副本集,并与配置文件中的副本集的名字进行匹配,若匹配成功,则说明已经初始化了副本集,否则则初始化。。
创建表与创建索引
在创建数据库成功后的下一步是创建表,这一步的代码就比较简单了:直接调用pymongo的create_collection函数,在数据库下创建四个表:bigchain、backlog、votes、assets
@register_schema(MongoDBConnection) def create_tables(conn, dbname): for table_name in ['bigchain', 'backlog', 'votes', 'assets']: logger.info('Create `%s` table.', table_name) # create the table # TODO: read and write concerns can be declared here conn.conn[dbname].create_collection(table_name)
在成功创建表之后,bigchaindb为这四个表分别创建二级索引,以方便进行查询,如创建名为
transaction_id的索引,用来支持对事务id进行查询
@register_schema(MongoDBConnection) def create_indexes(conn, dbname): create_bigchain_secondary_index(conn, dbname) create_backlog_secondary_index(conn, dbname) create_votes_secondary_index(conn, dbname) create_assets_secondary_index(conn, dbname) def create_bigchain_secondary_index(conn, dbname): logger.info('Create `bigchain` secondary index.') # to order blocks by timestamp conn.conn[dbname]['bigchain'].create_index([('block.timestamp', ASCENDING)], name='block_timestamp') # to query the bigchain for a transaction id, this field is unique conn.conn[dbname]['bigchain'].create_index('block.transactions.id', name='transaction_id') ...
至此,数据库初始化完成,
_run_init中唯一还没有阅读的是如何创建创世区块,这一部分下次再学习。。
相关文章推荐
- bigchaindb源码分析(六)——写事务(下)
- bigchaindb源码分析(一)——命令行参数与配置文件解析
- bigchaindb源码分析(九)——选举
- innodb存储引擎之innodb_io_capacity, innodb_max_dirty_pages_pct以及innodb_adaptive_flushing参数研究与源码分析(未完待续)
- PostgreSQL存储引擎源码分析一(不断更新)
- glance-0.1.7 分析(七)—— glance/store 镜像存储后端
- PostgreSQL存储引擎源码分析五(原创,不断更新)
- 【GamingAnywhere源码分析之知识补充五】Windows模拟用户session完成存储权限控制
- TFS源码(基于1.3.1)分析--存储流程
- Hadoop如何组织中间数据的存储和传输(源码级分析)2
- PHP源码分析-变量的存储方式
- PostgreSQL整体架构和存储源码的大体分析
- Hadoop源码分析之NameNode元数据的存储
- mysql innerdb引擎数据存储分析
- Apache Spark源码走读之6 -- 存储子系统分析
- innodb存储引擎之hash算法源码分析(未完待续)
- PostgreSQL存储引擎源码分析四(原创,不断更新)
- PostgreSQL存储引擎源码分析三(原创,不断更新)
- asp.net mvc源码分析-Controllerl篇 TempData数据存储
- Hadoop-datanode存储结构及源码分析<转>