您的位置:首页 > 数据库 > Mongodb

Mongodb源码分析--更新记录

2011-06-26 13:43 543 查看
在之前的一篇文章
中,介绍了assembleResponse函数(位于instance.cpp第224行),它会根据op操作枚举类型来调用相应的crud操作,枚举类型定义如下:

enum
Operations {

opReply
=

1
,
/*
reply. responseTo is set.
*/

dbMsg
=

1000
,
/*
generic msg command followed by a string
*/

dbUpdate
=

2001
,
/*
更新对象
*/

dbInsert
=

2002
,

//
dbGetByOID = 2003,

dbQuery
=

2004
,

dbGetMore
=

2005
,

dbDelete
=

2006
,

dbKillCursors
=

2007

};

可以看到dbUpdate = 2001 为更新操作枚举值,下面我们看一下assembleResponse在确定是更新操作时调用的方法,如下:

//
instance.cpp文件第224行

assembleResponse( Message
&
m, DbResponse
&
dbresponse,
const
SockAddr
&
client ) {

.....

try
{

if
( op
==
dbInsert ) {
//
添加记录操作

receivedInsert(m, currentOp);

}

else

if
( op
==
dbUpdate ) {
//
更新记录

receivedUpdate(m, currentOp);

}

else

if
( op
==
dbDelete ) {
//
删除记录

receivedDelete(m, currentOp);

}

else

if
( op
==
dbKillCursors ) {
//
删除Cursors(游标)对象

currentOp.ensureStarted();

logThreshold
=

10
;

ss
<<

"
killcursors
"
;

receivedKillCursors(m);

}

else
{

mongo::log()
<<

"
operation isn't supported:
"

<<
op
<<
endl;

currentOp.done();

log
=

true
;

}

}

.....

}

}

从上面代码可以看出,系统在确定dbUpdate操作时,调用了receivedUpdate()方法(位于instance.cpp文件第570行),下面是该方法的定义:

void
receivedUpdate(Message
&
m, CurOp
&
op) {

DbMessage d(m);
//
初始化数据库格式的消息

const

char

*
ns
=
d.getns();
//
获取名空间,用于接下来insert数据

assert(
*
ns);

//
因为CUD操作在主库中操作,所以这里断言名空间包含的db信息中是不是主库,即"master"

uassert(
10054
,
"
not master
"
, isMasterNs( ns ) );

op.debug().str
<<
ns
<<

'

'
;

//
获取标志位信息(标识更新一条或多条等)关于消息结构体。有关消息结构参见我的这篇文章:

// http://www.cnblogs.com/daizhj/archive/2011/04/02/2003335.html
int
flags
=
d.pullInt();

//
获取"更新消息"结构体中的selector(也就是要更新的数据条件,相关于where)

BSONObj query
=
d.nextJsObj();

assert( d.moreJSObjs() );

assert( query.objsize()
<
m.header()
->
dataLen() );

BSONObj toupdate
=
d.nextJsObj();
//
要更新的记录

uassert(
10055
,
"
update object too large
"
, toupdate.objsize()
<=
BSONObjMaxUserSize);

assert( toupdate.objsize()
<
m.header()
->
dataLen() );

assert( query.objsize()
+
toupdate.objsize()
<
m.header()
->
dataLen() );

//
标识是否为upsert方式,即:如果存在就更新,如果不存在就插入

bool
upsert
=
flags
&
UpdateOption_Upsert;

//
是否更新所有满足条件(where)的记录

bool
multi
=
flags
&
UpdateOption_Multi;

//
是否更新所有节点(sharding状态)

bool
broadcast
=
flags
&
UpdateOption_Broadcast;

{

string
s
=
query.toString();

/*
todo: we shouldn't do all this ss stuff when we don't need it, it will slow us down.

instead, let's just story the query BSON in the debug object, and it can toString()

lazily

*/

op.debug().str
<<

"
query:
"

<<
s;

op.setQuery(query);

}

writelock lk;

//
如果不更新所有节点(sharding)且当前物理结点是shard 状态时

if
(
!
broadcast
&&
handlePossibleShardedMessage( m ,
0
) )

return
;

//
if this ever moves to outside of lock, need to adjust check Client::Context::_finishInit

Client::Context ctx( ns );

UpdateResult res
=
updateObjects(ns, toupdate, query, upsert, multi,
true
, op.debug() );
//
更新对象

lastError.getSafe()
->
recordUpdate( res.existing , res.num , res.upserted );
//
for getlasterror

}

上面的方法中,主要是对消息进行折包解析,找出要更新的数据记录及相应查询条件,以及更新方式(即upsert),然后再在“写锁”环境下执行更新数据操作。

最终上面代码会调用 updateObjects()方法,该方法定义如下:

//
update.cpp 文件第1279行

UpdateResult updateObjects(
const

char

*
ns,
const
BSONObj
&
updateobj, BSONObj patternOrig,
bool
upsert,
bool
multi,
bool
logop , OpDebug
&
debug ) {

//
断言记录的ns是否在"保留的$集合"中

uassert(
10155
,
"
cannot update reserved $ collection
"
, strchr(ns,
'
$
'
)
==

0
);

if
( strstr(ns,
"
.system.
"
) ) {

/*
dm: it's very important that system.indexes is never updated as IndexDetails has pointers into it
*/

uassert(
10156
, str::stream()
<<

"
cannot update system collection:
"

<<
ns
<<

"
q:
"

<<
patternOrig
<<

"
u:
"

<<
updateobj , legalClientSystemNS( ns ,
true
) );

}

return
_updateObjects(
false
, ns, updateobj, patternOrig, upsert, multi, logop, debug);

}

上面方法对要更新的ns进行判断,以避免因更新保留的集合而对系统结构造成损坏,如果一切正常,则调用 _updateObjects方法,如下:

//
update.cpp 文件第1027行

UpdateResult _updateObjects(
bool
god,
const

char

*
ns,
const
BSONObj
&
updateobj, BSONObj patternOrig,
bool
upsert,
bool
multi,
bool
logop , OpDebug
&
debug, RemoveSaver
*
rs ) {

DEBUGUPDATE(
"
update:
"

<<
ns
<<

"
update:
"

<<
updateobj
<<

"
query:
"

<<
patternOrig
<<

"
upsert:
"

<<
upsert
<<

"
multi:
"

<<
multi );

Client
&
client
=
cc();

int
profile
=
client.database()
->
profile;

StringBuilder
&
ss
=
debug.str;

if
( logLevel
>

2
)

ss
<<

"
update:
"

<<
updateobj.toString();

/*
idea with these here it to make them loop invariant for multi updates, and thus be a bit faster for that case
*/

/*
NOTE: when yield() is added herein, these must be refreshed after each call to yield!
*/

NamespaceDetails
*
d
=
nsdetails(ns);
//
can be null if an upsert...

NamespaceDetailsTransient
*
nsdt
=

&
NamespaceDetailsTransient::get_w(ns);

/*
end note
*/

auto_ptr
<
ModSet
>
mods;
//
定义存储修改信息操作(如$inc, $set, $push,)的集合实例

bool
isOperatorUpdate
=
updateobj.firstElement().fieldName()[
0
]
==

'
$
'
;

int
modsIsIndexed
=

false
;
//
really the # of indexes

if
( isOperatorUpdate ) {

if
( d
&&
d
->
indexBuildInProgress ) {
//
如果正在构建索引

set
<
string
>
bgKeys;

d
->
inProgIdx().keyPattern().getFieldNames(bgKeys);
//
获取当前对象的所有字段(field)信息

mods.reset(
new
ModSet(updateobj, nsdt
->
indexKeys(),
&
bgKeys));
//
为mods绑定操作信息

}

else
{

mods.reset(
new
ModSet(updateobj, nsdt
->
indexKeys()) );
//
为mods绑定操作信息;

}

modsIsIndexed
=
mods
->
isIndexed();

}

//
upsert:如果存在就更新,如果不存在就插入

if
(
!
upsert
&&

!
multi
&&
isSimpleIdQuery(patternOrig)
&&
d
&&

!
modsIsIndexed ) {

int
idxNo
=
d
->
findIdIndex();

if
( idxNo
>=

0
) {

ss
<<

"
byid
"
;

//
根据id更新记录信息

return
_updateById(isOperatorUpdate, idxNo, mods.
get
(), profile, d, nsdt, god, ns, updateobj, patternOrig, logop, debug);

}

}

set
<
DiskLoc
>
seenObjects;

int
numModded
=

0
;

long

long
nscanned
=

0
;

MatchDetails details;

//
构造“更新操作”实例对象并用其构造游标操作(符)实例

shared_ptr
<
MultiCursor::CursorOp
>
opPtr(
new
UpdateOp( mods.
get
()
&&
mods
->
hasDynamicArray() ) );

//
构造MultiCursor查询游标(参见其构造方法中的 nextClause()语句)

shared_ptr
<
MultiCursor
>
c(
new
MultiCursor( ns, patternOrig, BSONObj(), opPtr,
true
) );

auto_ptr
<
ClientCursor
>
cc;

while
( c
->
ok() ) {
//
遍历(下面的c->advance()调用)游标指向的记录信息

nscanned
++
;

bool
atomic
=
c
->
matcher()
->
docMatcher().atomic();

//
并将其与更新操作中的条件进行匹配

if
(
!
c
->
matcher()
->
matches( c
->
currKey(), c
->
currLoc(),
&
details ) ) {

c
->
advance();
//
将游标跳转到下一条记录

if
( nscanned
%

256

==

0

&&

!
atomic ) {

if
( cc.
get
()
==

0
) {

shared_ptr
<
Cursor
>
cPtr
=
c;

cc.reset(
new
ClientCursor( QueryOption_NoCursorTimeout , cPtr , ns ) );

}

if
(
!
cc
->
yield
() ) {

cc.release();

//
TODO should we assert or something?

break
;

}

if
(
!
c
->
ok() ) {

break
;

}

}

continue
;

}

Record
*
r
=
c
->
_current();
//
游标当前所指向的记录

DiskLoc loc
=
c
->
currLoc();
//
游标当前所指向的记录所在地址

//
TODO Maybe this is unnecessary since we have seenObjects

if
( c
->
getsetdup( loc ) ) {
//
判断当前记录是否是重复

c
->
advance();

continue
;

}

BSONObj js(r);

BSONObj pattern
=
patternOrig;

if
( logop ) {
//
记录日志

BSONObjBuilder idPattern;

BSONElement id;

//
NOTE: If the matching object lacks an id, we'll log

//
with the original pattern. This isn't replay-safe.

//
It might make sense to suppress the log instead

//
if there's no id.

if
( js.getObjectID( id ) ) {

idPattern.append( id );

pattern
=
idPattern.obj();

}

else
{

uassert(
10157
,
"
multi-update requires all modified objects to have an _id
"
,
!
multi );

}

}

if
( profile )

ss
<<

"
nscanned:
"

<<
nscanned;

......

uassert(
10158
,
"
multi update only works with $ operators
"
,
!
multi );

//
查看更新记录操作的时间戳,本人猜测这么做可能因为mongodb会采用最后更新时间戳解决分布式系统

//
一致性的问题,
也就是通常使用的Last write wins准则,有关信息可参见这篇文章:

// http://blog.mongodb.org/post/520888030/on-distributed-consistency-part-5-many-writer
BSONElementManipulator::lookForTimestamps( updateobj );

checkNoMods( updateobj );

//
更新记录

theDataFileMgr.updateRecord(ns, d, nsdt, r, loc , updateobj.objdata(), updateobj.objsize(), debug, god);

if
( logop ) {
//
记录日志操作

DEV
if
( god ) log()
<<

"
REALLY??
"

<<
endl;
//
god doesn't get logged, this would be bad.

logOp(
"
u
"
, ns, updateobj,
&
pattern );

}

return
UpdateResult(
1
,
0
,
1
);
//
返回操作结果

}

if
( numModded )

return
UpdateResult(
1
,
1
, numModded );

......

return
UpdateResult(
0
,
0
,
0
);

}

上面的代码主要执行构造更新消息中的查询条件(selector)游标,并将“游标指向”的记录遍历出来与查询条件进行匹配,如果匹配命中,则进行更
新。(有关游标的构造和继承实现体系,mongodb做的有些复杂,很难一句说清,我会在本系列后面另用篇幅进行说明)

注意上面代码段中的这行代码:

theDataFileMgr.updateRecord(ns, d, nsdt, r, loc , updateobj.objdata(), updateobj.objsize(), debug, god);

该方法会执行最终更新操作,其定义如下:

//
pdfile.cpp 文件934行

const
DiskLoc DataFileMgr::updateRecord(

const

char

*
ns,

NamespaceDetails
*
d,

NamespaceDetailsTransient
*
nsdt,

Record
*
toupdate,
const
DiskLoc
&
dl,

const

char

*
_buf,
int
_len, OpDebug
&
debug,
bool
god) {

StringBuilder
&
ss
=
debug.str;

dassert( toupdate
==
dl.rec() );

BSONObj objOld(toupdate);

BSONObj objNew(_buf);

DEV assert( objNew.objsize()
==
_len );

DEV assert( objNew.objdata()
==
_buf );

//
如果_buf中不包含_id,但要更新的记录(toupdate)有_id

if
(
!
objNew.hasElement(
"
_id
"
)
&&
objOld.hasElement(
"
_id
"
) ) {

/*
add back the old _id value if the update removes it. Note this implementation is slow

(copies entire object multiple times), but this shouldn't happen often, so going for simple

code, not speed.

*/

BSONObjBuilder b;

BSONElement e;

assert( objOld.getObjectID(e) );
//
获取对象objOld的ID并绑定到e

b.append(e);
//
为了最好的性能,先放入_id

b.appendElements(objNew);

objNew
=
b.obj();

}

/*
重复key检查
*/

vector
<
IndexChanges
>
changes;

bool
changedId
=

false
;

//
获取要修改的索引信息(包括要移除和添加的index key,并将结果返回给changes)

getIndexChanges(changes,
*
d, objNew, objOld, changedId);

//
断言是否要修改_id索引

uassert(
13596
, str::stream()
<<

"
cannot change _id of a document old:
"

<<
objOld
<<

"
new:
"

<<
objNew ,
!
changedId );

dupCheck(changes,
*
d, dl);
//
重复key检查,如果重复则通过断言终止当前程序

//
如果要更新的记录比最终要插入的记录尺寸小

if
( toupdate
->
netLength()
<
objNew.objsize() ) {

//
如不合适,则重新分配

uassert(
10003
,
"
failing update: objects in a capped ns cannot grow
"
,
!
(d
&&
d
->
capped));

d
->
paddingTooSmall();

if
( cc().database()
->
profile )

ss
<<

"
moved
"
;

//
删除指定的记录(record),删除操作详见我的这篇文章:

// http://www.cnblogs.com/daizhj/archive/2011/04/06/mongodb_delete_recode_source_code.html
deleteRecord(ns, toupdate, dl);

//
插入新的BSONObj信息,插入操作详见我的这篇文章:

// http://www.cnblogs.com/daizhj/archive/2011/03/30/1999699.html
return
insert(ns, objNew.objdata(), objNew.objsize(), god);

}

nsdt
->
notifyOfWriteOp();

d
->
paddingFits();

/*
如果有要修改的索引
*/

{

unsigned keyUpdates
=

0
;

int
z
=
d
->
nIndexesBeingBuilt();
//
获取索引(包括正在构建)数

for
(
int
x
=

0
; x
<
z; x
++
) {

IndexDetails
&
idx
=
d
->
idx(x);

//
遍历当前更新记录要修改(移除)的索引键信息

for
( unsigned i
=

0
; i
<
changes[x].removed.size(); i
++
) {

try
{

//
移除当前记录在索引b树中相应信息(索引键)

idx.head.btree()
->
unindex(idx.head, idx,
*
changes[x].removed[i], dl);

}

catch
(AssertionException
&
) {

ss
<<

"
exception update unindex
"
;

problem()
<<

"
caught assertion update unindex
"

<<
idx.indexNamespace()
<<
endl;

}

}

assert(
!
dl.isNull() );

//
获取指定名称(key)下的子对象

BSONObj idxKey
=
idx.info.obj().getObjectField(
"
key
"
);

Ordering ordering
=
Ordering::make(idxKey);
//
生成排序方式

keyUpdates
+=
changes[x].added.size();

//
遍历当前更新记录要修改(插入)的索引键信息

for
( unsigned i
=

0
; i
<
changes[x].added.size(); i
++
) {

try
{

//
之前做了dupCheck()操作,所以这里不用担心重复key的问题

//
在b树中添加索引键信息,有关该方法的定义参见我的这篇文章

// http://www.cnblogs.com/daizhj/archive/2011/03/30/1999699.html
idx.head.btree()
->
bt_insert(

idx.head,

dl,
*
changes[x].added[i], ordering,
/*
dupsAllowed
*/
true
, idx);

}

catch
(AssertionException
&
e) {

ss
<<

"
exception update index
"
;

problem()
<<

"
caught assertion update index
"

<<
idx.indexNamespace()
<<

"

"

<<
e
<<
endl;

}

}

}

if
( keyUpdates
&&
cc().database()
->
profile )

ss
<<

'
/n
'

<<
keyUpdates
<<

"
key updates
"
;

}

//
update in place

int
sz
=
objNew.objsize();

//
将新修改的记录信息复制到旧记录(toupdate)所在位置

memcpy(getDur().writingPtr(toupdate
->
data, sz), objNew.objdata(), sz);

return
dl;

}

上面代码段主要先对B树索引进行修改(这里采用先移除再重建方式),之后直接更新旧记录在内存中的数据,最终完成了记录的更新操作。

最后,用一张时序图回顾一下更新记录时mongodb服务端代码的执行流程:



好了,今天的内容到这里就告一段落了,在接下来的文章中,将会介绍Mongodb的游标(cursor)设计体系和实现方式。

参考链接:
http://www.cnblogs.com/daizhj/archive/2011/03/30/1999699.html http://www.cnblogs.com/daizhj/archive/2011/04/06/mongodb_delete_recode_source_code.html http://www.cnblogs.com/daizhj/archive/2011/04/02/2003335.html http://blog.mongodb.org/post/520888030/on-distributed-consistency-part-5-many-writer
原文链接: http://www.cnblogs.com/daizhj/archive/2011/04/08/mongodb_update_recode_source_code.html
作者: daizhj, 代震军

微博: http://t.sina.com.cn/daizhj
Tags: mongodb,c++,source code
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: