您的位置:首页 > 大数据 > 人工智能

bigchaindb源码分析(六)——写事务(下)

2017-07-16 17:08 295 查看
我们之前在源码分析(二)——pipeline中说到,在命令
bigchaindb start
执行后,bigchaindb将会利用pipeline来运行很多进程,如block\vote等等

源码分析(五)中分析了在客户端利用API往bigchaindb写事务时,实际上只是将事务写入到了表backlog中,那么如何处理backlog中的事务则很明显要依赖于在start时创建的进程了

首先来回顾创建block对应的pipeline的代码,按照之前的分析,这段代码会为每个Node创建一直在无限循环中的进程,每个Node的输入队列(inqueue)为前一个Node的输出队列(outqueue),最后一个Node的outqueue为None,第一个Node的inqueue是一个Queue。当在调用
pipeline.setup
函数时,若形参
indata
有参数,则使得
indata
的outqueue指向之前的第一个Node的inqueue,依旧形成pipeline

# pipeline/block.py
def start():
pipeline = create_pipeline()
pipeline.setup(indata=get_changefeed())
pipeline.start()
return pipeline

def create_pipeline():

block_pipeline = BlockPipeline()

pipeline = Pipeline([
Pipe(maxsize=1000),
Node(block_pipeline.filter_tx),
Node(block_pipeline.validate_tx, fraction_of_cores=1),
Node(block_pipeline.create, timeout=1),
Node(block_pipeline.write),
Node(block_pipeline.delete_tx),
])

return pipeline

def Pipe(maxsize=0):
return queues.Queue(maxsize, ctx=mp.get_context())


indata

由于在介绍pipeline时,我们还没有分析后端存储的源码,所以之前我们并没有深究
indata
的具体细节。现在让我们来看看block的pipeline的
indata=get_changefeed()
到底是什么

def get_changefeed():
connection = backend.connect(**bigchaindb.config['database'])
return backend.get_changefeed(connection, 'backlog',
ChangeFeed.INSERT | ChangeFeed.UPDATE)


我们已经知道,在配置后端存储为mongodb时,上述代码中的
connection
是一个
bigchaindb.backend.mongodb.connection.MongoDBConnection
类的实例。而
get_changefeed
也是一个使用
singledispatch
装饰器的用来支持多态的函数

# backend/changefeed.py
@singledispatch
def get_changefeed(connection, table, operation, *, prefeed=None):
raise NotImplementedError


其具体实现由
connection
的类型而定,在后端存储为mongodb时,其真正实现为

# backend/mongodb/changefeed.py
@register_changefeed(MongoDBConnection)
def get_changefeed(connection, table, operation, *, prefeed=None):
return MongoDBChangeFeed(table, operation, prefeed=prefeed,
connection=connection)


该函数的参数
table
"backlog"
。再来看调用的类
MongoDBChangeFeed
的定义,可知其继承于
ChangeFeed
类,该类又继承于pipeline时分析的
Node


# backend/mongodb/changefeed.py
class MongoDBChangeFeed(ChangeFeed):
def run_forever(self):
...

# backend/changefeed.py
class ChangeFeed(Node):
...


之前介绍了
Node
类是开启了进程在无限循环,只要输入队列
inqueue
中有数据,就将执行初始化时给出的
target
程序,然后将程序输出结果放入到输出队列
outqueue
中,这个无限循环是由
Node
类的
run_forever
函数来完成的

MongoDBChangeFeed
类重载了
run_forever
函数,故而使得
get_changefeed
函数启动了一个以运行
MongoDBChangeFeed::run_forever
函数为主要任务的进程

现在我们来看
MongoDBChangeFeed::run_forever
的函数体

def run_forever(self):
for element in self.prefeed:
self.outqueue.put(element)

table = self.table
dbname = self.connection.dbname

last_ts = self.connection.run(
self.connection.query().local.oplog.rs.find()
.sort('$natural', pymongo.DESCENDING).limit(1)
.next()['ts'])

for record in run_changefeed(self.connection, table, last_ts):

is_insert = record['op'] == 'i'
is_delete = record['op'] == 'd'
is_update = record['op'] == 'u'

if is_insert and (self.operation & ChangeFeed.INSERT):
record['o'].pop('_id', None)
self.outqueue.put(record['o'])
elif is_delete and (self.operation & ChangeFeed.DELETE):
self.outqueue.put(record['o'])
elif is_update and (self.operation & ChangeFeed.UPDATE):
doc = self.connection.conn[dbname].find_one(
{'_id': record['o2']['_id']},
{'_id': False}
)
self.outqueue.put(doc)

对于block进程来说,
self.table
指的是
backlog
表,
last_ts
维护的是最新的记录的时间戳,这个值得获取相当于是在mongo中执行了

bigchain-rs:PRIMARY> use local
switched to db local
bigchain-rs:PRIMARY> db.oplog.rs.find().sort({'$natural': -1}).limit(1).next()['ts']
Timestamp(1499944301, 1)


其中
sort({'$natural': -1})
指对记录按照自然顺序反向排序。所谓的自然顺序即是指文档在磁盘上的顺序,也就是,按照插入的顺序返回文档。查询语句的一个示例如下(不过加个next是做什么??)

bigchain-rs:PRIMARY> db.oplog.rs.find().sort({'$natural': -1}).limit(1).next()
{
"ts" : Timestamp(1499944631, 1),
"t" : NumberLong(10),
"h" : NumberLong("-1556575544060296785"),
"v" : 2,
"op" : "n",
"ns" : "",
"o" : {
"msg" : "periodic noop"
}
}
bigchain-rs:PRIMARY>


其中:

ts
指操作发生的时间

ns
指操作所在的namespace

op
的值可以为:
i->insert, u->update, d->delete, c->cmd, n-> null operation
,n为空操作,将会定期执行

o
为操作所对应的document,即当前操作的内容(比如更新操作时要更新的的字段和值)

o2
为在执行更新操作时的where条件,仅限于update时才有该属性

故而
run_forever
首先定位到目前最新的时间戳,然后调用
run_changefeed
函数来返回一些记录,根据记录
op
的类型经过不同的处理后加入到输出队列
outqueue


run_changefeed
函数的任务在于,首先查询数据库中比
last_ts
更新的记录,将这些记录通过yield关键字返回给
run_forever
来处理,同时在每抛出一个记录时,更新
last_ts
。注意
run_changefeed
为无限循环,这使得从bigchaindb启动后的所有对backlog表进行的操作都会被
run_changefeed
捕捉到,而后被
run_forever
放入输出队列中

def run_changefeed(conn, table, last_ts):
while True:
try:
conn._conn = None
namespace = conn.dbname + '.' + table
query = conn.query().local.oplog.rs.find(
{'ns': namespace, 'ts': {'$gt': last_ts}},
{'o._id': False},
cursor_type=pymongo.CursorType.TAILABLE_AWAIT
)
cursor = conn.run(query)
logging.debug('Tailing oplog at %s/%s', namespace, last_ts)
while cursor.alive:
try:
record = cursor.next()
yield record
last_ts = record['ts']
except StopIteration:
if _FEED_STOP:
return
except (BackendError, pymongo.errors.ConnectionFailure):
logger.exception('Lost connection while tailing oplog, retrying')
time.sleep(1)


pipeline

根据pipeline,当indata的输出队列有数据时,即有对
backlog
表有操作时,indata的后一个Node将会以indata的输出数据作为参数,来执行其对应的target程序

当我们在调用API往bigchaindb写事务时,
backlog
将会有新纪录增加,而此时indata将会把记录放入到输出队列中,block进程定义的pipeline触发

再回过来看block进程的pipeline,其pipeline为:
indata->filter_tx->validate_tx->create->write->delete_tx
,除indata外其他Node的程序均位于
bigchaindb.pipelines.block.BlockPipeline


def create_pipeline():

block_pipeline = BlockPipeline()

pipeline = Pipeline([
Pipe(maxsize=1000),
Node(block_pipeline.filter_tx),
Node(block_pipeline.validate_tx, fraction_of_cores=1),
Node(block_pipeline.create, timeout=1),
Node(block_pipeline.write),
Node(block_pipeline.delete_tx),
])

return pipeline


我们接下来对这些Node一个个地分析

filter_tx

如果被分配者是本节点,移除事务中指定分配者的项

def filter_tx(self, tx):
if tx['assignee'] == self.bigchain.me:
tx.pop('assignee')
tx.pop('assignment_timestamp')
return tx


validate_tx

将事务从字典加载为Transaction类。
is_new_transaction
只有事务状态为
valid
(投票后确定为无效)或者
undecided
(投票结果还没有收集完)时,才返回False。因而,第一个判断的意义为:若这个事务之前已经被投票为
valid
或者还没投票完时,从
backlog
表中删除。也就是说,如果是之前投票为
invalid
的事务,再给一次机会。。

tx.validate
我们在源码分析(五)里已经进行了分析,其目标是验证事务的签名是否正确,以用来防止事务伪造,而若验证签名错误,将事务从
backlog
中删除

def validate_tx(self, tx):
try:
tx = Transaction.from_dict(tx)
except ValidationError:
return None

if not self.bigchain.is_new_transaction(tx.id):
self.bigchain.delete_transaction(tx.id)
return None

try:
tx.validate(self.bigchain)
return tx
except ValidationError as e:
logger.warning('Invalid tx: %s', e)
self.bigchain.delete_transaction(tx.id)
return None


create

验证通过后的事务将进入该Node

self.txs = tx_collector()

def create(self, tx, timeout=False):
txs = self.txs.send(tx)
if len(txs) == 1000 or (timeout and txs):
block = self.bigchain.create_block(txs)
self.txs = tx_collector()
return block


我们先来看
tx_collector


def tx_collector():

def snowflake():
txids = set()
txs = []
while True:
tx = yield txs
if tx:
if tx.id not in txids:
txids.add(tx.id)
txs.append(tx)
else:
logger.info('Refusing to add tx to block twice: ' +
tx.id)

s = snowflake()
s.send(None)
return s


这里涉及到
yield
的一种用法了,当一个函数中使用了
yield
关键字时,函数实际上变成了一个生成器,生成器相对于迭代器(如list)而言,区别在于生成器生成的数据是临时的,而不是放在内存中,因而仅仅可以使用一次。
yield
有两种用法,一是使得函数不停止的情况下return数据(如
test1
),二是用来为函数临时接收数据(如
test2


def test1():
for i in range(0, 3):
yield i

for i in test1():
print("test1 returns:", i)

print(">>>>")

def test2():
i = []
while True:
tx = yield i
i.append(tx)
print("test2 receives:", tx)

t = test2()
print(t.send(None)) # run test2 to the point that exists the first `yield`

for i in range(0, 3):
print("result:", t.send(i))


每一次在使用send时,返回的为当前yield语句时i的值

test1 returns: 0
test1 returns: 1
test1 returns: 2
>>>>
[]
test2 receives: 0
result: [0]
test2 receives: 1
result: [0, 1]
test2 receives: 2
result: [0, 1, 2]


所以
tx_collector
的逻辑即为,维护一个
Set
,对所有通过send而收到的数据根据
tx.id
进行去重,每次返回的为去重之后的事务集合。而
create
即为:首先将事务进行去重,当去重后的事务数目达到1000或者超时时,对当前所有的事务调用
create_block
创建一个区块,同时重新创建
tx_collector
(清空事务
Set


创建区块时还指定了整个联盟中的所有节点都作为投票节点,在创建事务会后执行该程序的节点将对区块进行签名

def create_block(self, validated_transactions):
# Prevent the creation of empty blocks
if not validated_transactions:
raise exceptions.OperationError('Empty block creation is not '
'allowed')

voters = list(self.federation)
block = Block(validated_transactions, self.me, gen_timestamp(), voters)
block = block.sign(self.me_private)

return block


write

当区块创建成功之后,则将区块写入
bigchain
表中

def write(self, block):
self.bigchain.write_block(block)
self.bigchain.statsd.incr('pipelines.block.throughput',
len(block.transactions))
return block

@register_query(MongoDBConnection)
def write_block(conn, block_dict):
return conn.run(
conn.collection('bigchain')
.insert_one(block_dict))


delete_tx

当区块写入成功之后,将
backlog
表中本区块的事务全部删除

def delete_tx(self, block):
self.bigchain.delete_transaction(*[tx.id for tx in block.transactions])
return block


总结

综合源码分析(五)与源码分析(六),我们在使用bigchainAPI往bigchaindb写事务时,关于
backlog
表和
bigchain
表的处理逻辑为:

prepare:创建事务的节点给出json格式的事务

fulfill:创建事务的节点对事务进行签名

send:创建事务的节点将事务发送给bigchain守护进程

validate:守护进程验证事务的签名

write:设置事务的被分配者,将事务写入
backlog


indata:当
backlog
表中写入数据时,block进程的pipeline被触发,indata将事务放入输出队列中

filter: 如果事务分配给本节点处理,移除事务中的被分配者项

validate:判断事务是否已经被确定为
valid
或者正在等待投票,若是,则删除事务,否则被分配节点验证事务签名

create:被分配节点对现有的事务进行去重,当要处理的事务数达到1000条,或者超时时,根据这些事务创建区块,同时设置投票者,并对区块进行签名

write: 被分配节点将区块写入
bigchain


delete:被分配节点从
backlog
表中删除当前区块的所有事务

关于投票等其他进程的处理我们之后再分析
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: