您的位置:首页 > 大数据 > 人工智能

bigchaindb源码分析(三)——后端存储

2017-07-08 14:37 260 查看
bigchaindb源码分析(一)分析了bigchaindb如何解析命令行参数与配置文件,并据此启动了日志publisher与subscriber。对于
bigchaindb start
命令,将调用
_run_init
来初始化后端存储,随之利用pipeline来启动block\vote等进程(bigchaindb源码分析(二))。本节介绍bigchaindb的后端存储。

_run_init
run_start
所调用,代码位于
commands/bigchaindb.py


def _run_init():
# Try to access the keypair, throws an exception if it does not exist
b = bigchaindb.Bigchain()

schema.init_database(connection=b.connection)

b.create_genesis_block()
logger.info('Genesis block created.')


该函数首先创建一个Bigchain实例,Bigchain类位于
core.py
,其
__init__
函数将从
bigchaindb.config
中加载配置文件的配置,包括本节点的公私钥、联盟中其他节点的公钥、一致性算法插件、以及定位连接数据库的类等等。之后根据连接数据库的类来初始化数据库。最后创建创世区块。

创建bigchain实例

Bigchain类
__init__
函数连接数据库代码如下所示。当类实例化的connect参数为None时,将调用
backend.connect


self.connection = connection if connection else backend.connect(**bigchaindb.config['database'])


其形参前的
**
相当于将形参中字典的key value一一对应到函数参数列表中。举例来说:

>>> def test(a, b, c):
...     print(a, b, c)
...
>>> test(1,2,3)
1 2 3
>>> x=(1,2,3)
>>> test(*x)
1 2 3
>>> y={'a':1,'b':2,'c':3}
>>> test(**y)
1 2 3


backend.connect
参数列表如下。故而在调用时,将配置文件中database下的值一一对应到参数列表中。

# 参数列表
def connect(backend=None, host=None, port=None, name=None, max_tries=None,
connection_timeout=None, replicaset=None, ssl=None, login=None, password=None,
ca_cert=None, certfile=None, keyfile=None, keyfile_passphrase=None,
crlfile=None):

# 配置文件中的database项
"database": {
"password": null,
"connection_timeout": 5000,
"ssl": false,
"replicaset": "bigchain-rs",
"login": null,
"max_tries": 3,
"name": "bigchain",
"port": 27017,
"host": "localhost",
"backend": "mongodb"
},


bigchaindb后端存储的代码位于
backend
目录下,目前包含了mongodb\rethinkdb两种后端存储数据库。
backend/connect.py
中也指定了目前支持的两种后端存储。

BACKENDS = {
'mongodb': 'bigchaindb.backend.mongodb.connection.MongoDBConnection',
'rethinkdb': 'bigchaindb.backend.rethinkdb.connection.RethinkDBConnection'
}


backend.connect
寻找后端存储类的代码也就可以分为以下几步了:首先是获取配置文件中backend的值,之后从变量
BACKENDS
中找到对应的类,再通过反射机制根据类的字符串路径加载类

# backend为mongodb
backend = backend or bigchaindb.config['database']['backend']

module_name, _, class_name = BACKENDS[backend].rpartition('.')
Class = getattr(import_module(module_name), class_name)

return Class(host=host, port=port, dbname=dbname,
max_tries=max_tries, connection_timeout=connection_timeout,
replicaset=replicaset, ssl=ssl, login=login, password=password,
ca_cert=ca_cert, certfile=certfile, keyfile=keyfile,
keyfile_passphrase=keyfile_passphrase, crlfile=crlfile)


rpartition
将返回一个三元组,分别为最后一个分隔符左边的字符串、分隔符本身、以及最后一个分隔符右边的字符串。因此,
class_name
将对应于
MongoDBConnection
。而后利用getattr函数加载该类,变量
Class
也就对应于
MongoDBConnection
类的一个实例了,最后则调用Class类。

>>> x='bigchaindb.backend.mongodb.connection.MongoDBConnection'
>>> print(x.rpartition('.'))
('bigchaindb.backend.mongodb.connection', '.', 'MongoDBConnection')


我们再来看
bigchaindb.backend.mongodb.connection
模块的
MongoDBConnection
类。该类继承
backend.connection.Connection
类。实际上,所有后端存储的连接类都应该继承Connection类,然后再实现自己所独立的操作数据库的代码。
MongoDBConnection
类的
__init__
函数将调用父类的初始化函数,之后将一些mongodb所独有的配置保存为类的成员,如replicaset。而父类的
__init__
则将一些后端存储所共有的配置保存为类的成员,如host\port\dbname等等。

class MongoDBConnection(Connection):

def __init__(self, replicaset=None, ssl=None, login=None, password=None,
ca_cert=None, certfile=None, keyfile=None,
keyfile_passphrase=None, crlfile=None, **kwargs):
super().__init__(**kwargs)
self.replicaset = replicaset or bigchaindb.config['database']['replicaset']


初始化数据库

_run_init
函数在创建Bigchain实例后,将对数据库进行初始化。根据之前的分析,形参中的connection实际上是
MongoDBConnection
类的实例了。

schema.init_database(connection=b.connection)

# backend/schema.py
def init_database(connection=None, dbname=None):

connection = connection or connect()
dbname = dbname or bigchaindb.config['database']['name']

create_database(connection, dbname)
create_tables(connection, dbname)
create_indexes(connection, dbname)


从函数名称上可以看出,
init_database
的步骤为依次创建数据库、表以及索引。然而,在
backend/schema.py
中这三个函数实际上居然都没有函数体,不过有个装饰器singledispatch

@singledispatch
def create_database(connection, dbname):
raise NotImplementedError

@singledispatch
def create_tables(connection, dbname):
raise NotImplementedError

@singledispatch
def create_indexes(connection, dbname):
raise NotImplementedError


装饰器
singledispatch
的文档在https://pypi.python.org/pypi/singledispatch。阅读这些例子,可以看出相当于实现了函数多态,即可以通过不同的形参来调用同一个函数名的不同函数。文档中的实例如下:

定义:通过装饰器singledispatch来定义需要多态的函数,通过
@fun.register(int)
或者
fun.register
来进行注册

>>> from singledispatch import singledispatch
>>> @singledispatch
... def fun(arg, verbose=False):
...     if verbose:
...         print("Let me just say,", end=" ")
...     print(arg)

>>> @fun.register(int)
... def _(arg, verbose=False):
...     if verbose:
...         print("Strength in numbers, eh?", end=" ")
...     print(arg)

>>> def nothing(arg, verbose=False):
...     print("Nothing.")
...
>>> fun.register(type(None), nothing)


调用

>>> fun("Hello, world.")
Hello, world.
>>> fun("test.", verbose=True)
Let me just say, test.
>>> fun(42, verbose=True)
Strength in numbers, eh? 42
>>> fun(None)
Nothing.


由此,我们来寻找后端存储中对
create_database
进行了注册的函数。mongodb后端存储
backend/mongodb/schema
中有以下代码:

from bigchaindb.backend.utils import module_dispatch_registrar

register_schema = module_dispatch_registrar(backend.schema)

@register_schema(MongoDBConnection)
def create_database(conn, dbname):
if dbname in conn.conn.database_names():
raise exceptions.DatabaseAlreadyExists('Database `{}` already exists'
.format(dbname))

logger.info('Create database `%s`.', dbname)
# TODO: read and write concerns can be declared here
conn.conn.get_database(dbname)


module_dispatch_registrar
的实现代码为:

def module_dispatch_registrar(module):
def dispatch_wrapper(obj_type):
def wrapper(func):
func_name = func.__name__
try:
dispatch_registrar = getattr(module, func_name)
return dispatch_registrar.register(obj_type)(func)
except AttributeError as ex:
raise ModuleDispatchRegistrationError(
('`{module}` does not contain a single-dispatchable '
'function named `{func}`. The module being registered '
'was not implemented correctly!').format(
func=func_name, module=module.__name__)) from ex
return wrapper
return dispatch_wrapper


看着好复杂的样子。。按照之前装饰器的思路,我们先来进行替换吧,首先展开
module_dispatch_registrar
,代码相当于

@module_dispatch_registrar(backend.schema)(MongoDBConnection)
def create_database(conn, dbname):
...


也即

@dispatch_wrapper(MongoDBConnection)
def create_database(conn, dbname):
...

# 其中变量module为backend.schema
def dispatch_wrapper(obj_type):
def wrapper(func):
...
return wrapper


再把
dispatch_wrapper
展开

wrapper(create_database)

# 其中变量module为backend.schema,变量obj_type为MongoDBConnection
def wrapper(func):
...
return wrapper


所以这段代码实际上是调用了(注意,这里将所有变量写成了字符串,实际上均指得是对应的对象)

dispatch_registrar = getattr(backend.schema, create_database)
dispatch_registrar.register(MongoDBConnection)(create_database)


因此,当
init_database
执行代码
create_database(connection, dbname)
时,由于connection的类型为
MongoDBConnection
类,故而将调用的函数为
backend/mongodb/schema.py
中的
create_database
函数。
create_tables
以及
create_indexes
也是如此。

创建数据库

如此,创建数据库所调用的函数为
backend.mongodb.schema.create_database
,形参conn对应MongoDBConnection类的实例

@register_schema(MongoDBConnection)
def create_database(conn, dbname):
if dbname in conn.conn.database_names():
raise exceptions.DatabaseAlreadyExists('Database `{}` already exists'
.format(dbname))

logger.info('Create database `%s`.', dbname)
# TODO: read and write concerns can be declared here
conn.conn.get_database(dbname)


conn.conn
实际上是
MongoDBConnection
的父类
Connection
类的一个属性,由于在该类
__init__
时,
self._conn
值为None,所以将调用
self.connect()
,而connect又将调用
_connect
函数
self.max_tries_counter
次,直到连接成功,或者抛出异常

@property
def conn(self):
if self._conn is None:
self.connect()
return self._conn

def connect(self):
attempt = 0
for i in self.max_tries_counter:
attempt += 1
try:
self._conn = self._connect()
except ConnectionError as exc:
...
else:
break


_connect
函数在去掉异常处理、函数调用时的参数列表后如下所示。函数逻辑也很清晰了,首先是初始化副本集,然后利用pymongo连接到mongodb数据库,再用配置文件中的账号与口令验证,最后返回连接并验证成功的client对象。注意最后返回的client对象将赋值给
MongoDBConnection
类的
conn
成员属性。

def _connect(self):
try:
initialize_replica_set(self.host,...)

if self.ca_cert is None or self.certfile is None or \
self.keyfile is None or self.crlfile is None:
client = pymongo.MongoClient(self.host,...)
if self.login is not None and self.password is not None:
client[self.dbname].authenticate(self.login, self.password)
else:
logger.info('Connecting to MongoDB over TLS/SSL...')
client = pymongo.MongoClient(self.host,...)
if self.login is not None:
client[self.dbname].authenticate(self.login,
mechanism='MONGODB-X509')

return client

except ...


initialize_replica_set
函数也是首先连接mongodb数据库,然后检查数据库是否已经配置了副本集,并与配置文件中的副本集的名字进行匹配,若匹配成功,则说明已经初始化了副本集,否则则初始化。。

创建表与创建索引

在创建数据库成功后的下一步是创建表,这一步的代码就比较简单了:直接调用pymongo的
create_collection
函数,在数据库下创建四个表:bigchain、backlog、votes、assets

@register_schema(MongoDBConnection)
def create_tables(conn, dbname):
for table_name in ['bigchain', 'backlog', 'votes', 'assets']:
logger.info('Create `%s` table.', table_name)
# create the table
# TODO: read and write concerns can be declared here
conn.conn[dbname].create_collection(table_name)


在成功创建表之后,bigchaindb为这四个表分别创建二级索引,以方便进行查询,如创建名为
transaction_id
的索引,用来支持对事务id进行查询

@register_schema(MongoDBConnection)
def create_indexes(conn, dbname):
create_bigchain_secondary_index(conn, dbname)
create_backlog_secondary_index(conn, dbname)
create_votes_secondary_index(conn, dbname)
create_assets_secondary_index(conn, dbname)

def create_bigchain_secondary_index(conn, dbname):
logger.info('Create `bigchain` secondary index.')

# to order blocks by timestamp
conn.conn[dbname]['bigchain'].create_index([('block.timestamp',
ASCENDING)],
name='block_timestamp')

# to query the bigchain for a transaction id, this field is unique
conn.conn[dbname]['bigchain'].create_index('block.transactions.id',
name='transaction_id')
...


至此,数据库初始化完成,
_run_init
中唯一还没有阅读的是如何创建创世区块,这一部分下次再学习。。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: