bigchaindb源码分析(六)——写事务(下)
2017-07-16 17:08
295 查看
我们之前在源码分析(二)——pipeline中说到,在命令
源码分析(五)中分析了在客户端利用API往bigchaindb写事务时,实际上只是将事务写入到了表backlog中,那么如何处理backlog中的事务则很明显要依赖于在start时创建的进程了
首先来回顾创建block对应的pipeline的代码,按照之前的分析,这段代码会为每个Node创建一直在无限循环中的进程,每个Node的输入队列(inqueue)为前一个Node的输出队列(outqueue),最后一个Node的outqueue为None,第一个Node的inqueue是一个Queue。当在调用
我们已经知道,在配置后端存储为mongodb时,上述代码中的
其具体实现由
该函数的参数
之前介绍了
而
现在我们来看
.find_one(
{'_id': record['o2']['_id']},
{'_id': False}
)
self.outqueue.put(doc)
bigchaindb start执行后,bigchaindb将会利用pipeline来运行很多进程,如block\vote等等
源码分析(五)中分析了在客户端利用API往bigchaindb写事务时,实际上只是将事务写入到了表backlog中,那么如何处理backlog中的事务则很明显要依赖于在start时创建的进程了
首先来回顾创建block对应的pipeline的代码,按照之前的分析,这段代码会为每个Node创建一直在无限循环中的进程,每个Node的输入队列(inqueue)为前一个Node的输出队列(outqueue),最后一个Node的outqueue为None,第一个Node的inqueue是一个Queue。当在调用
pipeline.setup函数时,若形参
indata有参数,则使得
indata的outqueue指向之前的第一个Node的inqueue,依旧形成pipeline
# pipeline/block.py def start(): pipeline = create_pipeline() pipeline.setup(indata=get_changefeed()) pipeline.start() return pipeline def create_pipeline(): block_pipeline = BlockPipeline() pipeline = Pipeline([ Pipe(maxsize=1000), Node(block_pipeline.filter_tx), Node(block_pipeline.validate_tx, fraction_of_cores=1), Node(block_pipeline.create, timeout=1), Node(block_pipeline.write), Node(block_pipeline.delete_tx), ]) return pipeline def Pipe(maxsize=0): return queues.Queue(maxsize, ctx=mp.get_context())
indata
由于在介绍pipeline时,我们还没有分析后端存储的源码,所以之前我们并没有深究indata的具体细节。现在让我们来看看block的pipeline的
indata=get_changefeed()到底是什么
def get_changefeed(): connection = backend.connect(**bigchaindb.config['database']) return backend.get_changefeed(connection, 'backlog', ChangeFeed.INSERT | ChangeFeed.UPDATE)
我们已经知道,在配置后端存储为mongodb时,上述代码中的
connection是一个
bigchaindb.backend.mongodb.connection.MongoDBConnection类的实例。而
get_changefeed也是一个使用
singledispatch装饰器的用来支持多态的函数
# backend/changefeed.py @singledispatch def get_changefeed(connection, table, operation, *, prefeed=None): raise NotImplementedError
其具体实现由
connection的类型而定,在后端存储为mongodb时,其真正实现为
# backend/mongodb/changefeed.py @register_changefeed(MongoDBConnection) def get_changefeed(connection, table, operation, *, prefeed=None): return MongoDBChangeFeed(table, operation, prefeed=prefeed, connection=connection)
该函数的参数
table为
"backlog"。再来看调用的类
MongoDBChangeFeed的定义,可知其继承于
ChangeFeed类,该类又继承于pipeline时分析的
Node类
# backend/mongodb/changefeed.py class MongoDBChangeFeed(ChangeFeed): def run_forever(self): ... # backend/changefeed.py class ChangeFeed(Node): ...
之前介绍了
Node类是开启了进程在无限循环,只要输入队列
inqueue中有数据,就将执行初始化时给出的
target程序,然后将程序输出结果放入到输出队列
outqueue中,这个无限循环是由
Node类的
run_forever函数来完成的
而
MongoDBChangeFeed类重载了
run_forever函数,故而使得
get_changefeed函数启动了一个以运行
MongoDBChangeFeed::run_forever函数为主要任务的进程
现在我们来看
MongoDBChangeFeed::run_forever的函数体
def run_forever(self): for element in self.prefeed: self.outqueue.put(element) table = self.table dbname = self.connection.dbname last_ts = self.connection.run( self.connection.query().local.oplog.rs.find() .sort('$natural', pymongo.DESCENDING).limit(1) .next()['ts']) for record in run_changefeed(self.connection, table, last_ts): is_insert = record['op'] == 'i' is_delete = record['op'] == 'd' is_update = record['op'] == 'u' if is_insert and (self.operation & ChangeFeed.INSERT): record['o'].pop('_id', None) self.outqueue.put(record['o']) elif is_delete and (self.operation & ChangeFeed.DELETE): self.outqueue.put(record['o']) elif is_update and (self.operation & ChangeFeed.UPDATE): doc = self.connection.conn[dbname]