
MongoDB Source Code Analysis (10): Data Insertion

2012-12-10 21:08
In this article we analyze MongoDB's data insertion flow. In outline, an insert comes down to the following:

1. If the target collection exists, allocate space from it and save the document into that space. If it does not exist, first allocate the collection within its database; if the database does not exist either, allocate it as well, creating the xx.ns, xx.0, etc. files.

2. Update the collection's indexes according to the inserted document.
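
As a concrete starting point, here is a hypothetical client-side trigger for this whole path. The driver calls are an assumption (the legacy C++ driver API of this era), not part of the server code analyzed below; a first insert into a brand-new database exercises both steps at once.

#include "mongo/client/dbclient.h"
using namespace mongo;

int main() {
    DBClientConnection c;
    c.connect("localhost:27017");             // assumed throwing overload
    // server side: receivedInsert -> checkAndInsert -> DataFileMgr::insert;
    // "test" and "test.newcoll" do not exist yet, so test.ns/test.0, the
    // collection, and its _id index are all allocated by this one insert
    c.insert("test.newcoll", BSON("x" << 1));
    return 0;
}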

Now let's look at the code. From the earlier analysis we know the entry point of the insert operation is receivedInsert.

void receivedInsert(Message& m, CurOp& op) {
    DbMessage d(m);                     // (other declarations and validation elided in this excerpt)
    const char *ns = d.getns();
    BSONObj first = d.nextJsObj();
    vector<BSONObj> multi;
    while (d.moreJSObjs()) {            // batch insert: collect the remaining documents
        if (multi.empty())              // first pass
            multi.push_back(first);
        multi.push_back( d.nextJsObj() );
    }
    while ( true ) {
        Lock::DBWrite lk(ns);
        if ( handlePossibleShardedMessage( m , 0 ) )
            return;
        Client::Context ctx(ns);
        if( !multi.empty() ) {
            const bool keepGoing = d.reservedField() & InsertOption_ContinueOnError;
            // insertMulti calls checkAndInsert in a loop. keepGoing==true means
            // carry on inserting after an error; otherwise stop at the first failure.
            insertMulti(keepGoing, ns, multi);
            return;
        }
        checkAndInsert(ns, first);
        globalOpCounters.incInsertInWriteLock(1);
        return;
    }
}
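
insertMulti itself is not shown above; here is a minimal sketch of its keepGoing logic, reconstructed to match the surrounding code (approximate, not a verbatim copy of the source):

NOINLINE_DECL void insertMulti(bool keepGoing, const char *ns, vector<BSONObj>& objs) {
    size_t i;
    for (i = 0; i < objs.size(); i++) {
        try {
            checkAndInsert(ns, objs[i]);
        }
        catch (const UserException&) {
            if (!keepGoing || i == objs.size() - 1) {
                globalOpCounters.incInsertInWriteLock(i);
                throw;                  // abort the rest of the batch
            }
            // keepGoing: swallow the error and continue with the next document
        }
    }
    globalOpCounters.incInsertInWriteLock(i);
}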
void checkAndInsert(const char *ns, /*modifies*/BSONObj& js) {
    // a single document larger than BSONObjMaxUserSize is rejected; changing this
    // limit requires modifying the source
    uassert( 10059, "object to insert too large", js.objsize() <= BSONObjMaxUserSize);
    {
        // check no $ modifiers.  note we only check top level.  (scanning deep would be quite expensive)
        BSONObjIterator i( js );        // top-level field names may not begin with '$'
        while ( i.more() ) {
            BSONElement e = i.next();
            uassert( 13511, "document to insert can't have $ fields", e.fieldName()[0] != '$' );
        }
    }
    theDataFileMgr.insertWithObjMod(ns, js, false); // js may be modified in the call to add an _id field.
    logOp("i", ns, js);                 // record the operation for master/slave or replset replication
}
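
Because only top-level field names are scanned, operator-like names nested deeper pass the check. A small, runnable demonstration using the same BSONObjIterator the server code uses (this assumes the legacy C++ driver headers, which ship these BSON types):

#include <iostream>
#include "mongo/client/dbclient.h"
using namespace mongo;

static bool hasTopLevelDollar(const BSONObj& o) {
    BSONObjIterator i(o);
    while (i.more())
        if (i.next().fieldName()[0] == '$')
            return true;                // the server would uassert 13511 here
    return false;
}

int main() {
    BSONObj bad  = BSON("$set" << BSON("x" << 1));  // rejected: top-level $set
    BSONObj okay = BSON("a" << BSON("$gt" << 1));   // allowed: the '$' is nested
    std::cout << hasTopLevelDollar(bad) << " " << hasTopLevelDollar(okay) << std::endl;
    return 0;
}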
DiskLoc DataFileMgr::insertWithObjMod(const char *ns, BSONObj &o, bool god) {
    bool addedID = false;
    DiskLoc loc = insert( ns, o.objdata(), o.objsize(), god, true, &addedID );
    if( addedID && !loc.isNull() )
        o = BSONObj::make( loc.rec() ); // re-read the stored record so the caller sees the added _id
    return loc;
}
insert() is where MongoDB actually stores data; it handles both ordinary documents and index metadata documents, as the code below shows.

DiskLoc DataFileMgr::insert(const char *ns, const void *obuf, int len, bool god, bool mayAddIndex, bool *addedID) {
    bool wouldAddIndex = false;
    {
        const char *sys = strstr(ns, "system.");
        if ( sys && !insert_checkSys(sys, ns, wouldAddIndex, obuf, god) )
            return DiskLoc();
    }
    bool addIndex = wouldAddIndex && mayAddIndex;   // true when this insert creates an index
    NamespaceDetails *d = nsdetails(ns);
    if ( d == 0 ) {
        // The collection does not exist yet, so allocate it first: an extent is
        // allocated based on the inserted data, a NamespaceDetails is allocated and
        // recorded, an _id index is created when god is false, and the collection is
        // registered in the system.namespaces collection.
        d = insert_newNamespace(ns, len, god);
    }
    NamespaceDetails *tableToIndex = 0;
    string tabletoidxns;
    BSONObj fixedIndexObject;
    if ( addIndex ) {
        // index creation, e.g. db.coll.ensureIndex({x:1}), takes this path
        verify( obuf );
        BSONObj io((const char *) obuf);
        if( !prepareToBuildIndex(io, god, tabletoidxns, tableToIndex, fixedIndexObject) ) {
            // prepare creates _id itself, or this indicates to fail the build silently (such
            // as if index already exists)
            return DiskLoc();
        }
        if ( ! fixedIndexObject.isEmpty() ) {
            obuf = fixedIndexObject.objdata();
            len = fixedIndexObject.objsize();
        }
    }
    int addID = 0; // 0 if not adding _id; if adding, the length of that new element
    if( !god ) {
        // if the document carries no _id, generate one
        /* Check if we have an _id field. If we don't, we'll add it.
           Note that btree buckets which we insert aren't BSONObj's, but in that case god==true.
        */
        BSONObj io((const char *) obuf);
        BSONElement idField = io.getField( "_id" );
        uassert( 10099, "_id cannot be an array", idField.type() != Array );
        // we don't add _id for capped collections in local as they don't have an _id index
        if( idField.eoo() && !wouldAddIndex &&
            !str::equals( nsToDatabase( ns ).c_str(), "local" ) && d->haveIdIndex() ) {
            if( addedID )
                *addedID = true;
            addID = len;
            idToInsert_.oid.init();
            len += idToInsert.size();
        }
        BSONElementManipulator::lookForTimestamps( io );
    }
    // Compute the allocation size. With Flag_UsePowerOf2Sizes set (settable via
    // db.coll.runCommand({collMod:"coll", usePowerOf2Sizes:true})) the allocation is
    // always rounded up to a power of two. Without the flag, the required size is
    // multiplied by the collection's padding factor, which is adjusted dynamically
    // between 1 and 2 according to whether updated documents tend to outgrow their
    // original allocation.
    int lenWHdr = d->getRecordAllocationSize( len + Record::HeaderSize );
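    // Worked numbers (illustrative, not from the source): for len + HeaderSize == 380,
    //   Flag_UsePowerOf2Sizes set        -> allocation rounded up to 512 bytes;
    //   flag clear, paddingFactor == 1.5 -> 380 * 1.5 = 570 bytes.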

    // If the collection is capped, check if the new object will violate a unique index
    // constraint before allocating space.
    if ( d->nIndexes && d->isCapped() && !god ) {
        // uniqueness check, for indexes created with unique:true
        checkNoIndexConflicts( d, BSONObj( reinterpret_cast<const char *>( obuf ) ) );
    }
    bool earlyIndex = true;
    DiskLoc loc;
    if( addID || tableToIndex || d->isCapped() ) {
        // if need id, we don't do the early indexing. this is not the common case so that is sort of ok
        earlyIndex = false;
        // the actual space allocation; see /article/8409346.html for how allocation works
        loc = allocateSpaceForANewRecord(ns, d, lenWHdr, god);
    }
    else {
        loc = d->allocWillBeAt(ns, lenWHdr);
        if( loc.isNull() ) {
            // need to get a new extent so we have to do the true alloc now (not common case)
            earlyIndex = false;
            loc = allocateSpaceForANewRecord(ns, d, lenWHdr, god);
        }
    }
    if ( loc.isNull() ) {
        log() << "insert: couldn't alloc space for object ns:" << ns << " capped:" << d->isCapped() << endl;
        verify(d->isCapped());
        return DiskLoc();
    }
    if( earlyIndex ) {
        // add record to indexes using two step method so we can do the reading outside a write lock
        if ( d->nIndexes ) {
            verify( obuf );
            BSONObj obj((const char *) obuf);
            try {
                // extract every indexed field of obj and insert the resulting keys
                // into the corresponding index btrees
                indexRecordUsingTwoSteps(ns, d, obj, loc, true);
            }
            catch( AssertionException& ) {
                // should be a dup key error on _id index
                dassert( !tableToIndex && !d->isCapped() );
                // no need to delete/rollback the record as it was not added yet
                throw;
            }
        }
        // really allocate now
        DiskLoc real = allocateSpaceForANewRecord(ns, d, lenWHdr, god);
        verify( real == loc );
    }
    Record *r = loc.rec();
    {
        // copy the actual document bytes into the allocated space
        verify( r->lengthWithHeaders() >= lenWHdr );
        r = (Record*) getDur().writingPtr(r, lenWHdr);
        if( addID ) {
            /* a little effort was made here to avoid a double copy when we add an ID */
            ((int&)*r->data()) = *((int*) obuf) + idToInsert.size();
            memcpy(r->data()+4, idToInsert.rawdata(), idToInsert.size());
            memcpy(r->data()+4+idToInsert.size(), ((char *)obuf)+4, addID-4);
        }
        else {
            if( obuf ) // obuf can be null from internal callers
                memcpy(r->data(), obuf, len);
        }
    }
    addRecordToRecListInExtent(r, loc); // link the new record into the extent's record list
    /* durability todo : this could be a bit annoying / slow to record constantly */
    {
        NamespaceDetails::Stats *s = getDur().writing(&d->stats);
        s->datasize += r->netLength();
        s->nrecords++;
    }
    // we don't bother resetting query optimizer stats for the god tables - also god is true when adding a btree bucket
    if ( !god )
        NamespaceDetailsTransient::get( ns ).notifyOfWriteOp();
    if ( tableToIndex ) {
        // a new index was just registered; build the actual index data here
        insert_makeIndex(tableToIndex, tabletoidxns, loc);
    }
    /* add this record to our indexes */
    if ( !earlyIndex && d->nIndexes ) {
        BSONObj obj(r->data());
        // not sure which of these is better -- either can be used.  oldIndexRecord may be faster,
        // but twosteps handles dup key errors more efficiently.
        //oldIndexRecord(d, obj, loc);
        indexRecordUsingTwoSteps(ns, d, obj, loc, false);
    }
    d->paddingFits();
    return loc;
}
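
Before moving on, here is a simplified, self-contained sketch of the allocation-size decision used above. The bucket values and rounding are simplified (the real NamespaceDetails::getRecordAllocationSize uses a fixed bucket table); treat it as illustrative only.

#include <iostream>

// Simplified sketch of NamespaceDetails::getRecordAllocationSize (illustrative):
int recordAllocationSize(int minRecordSize, bool usePowerOf2Sizes, double paddingFactor) {
    if (usePowerOf2Sizes) {
        int alloc = 32;                 // smallest bucket (illustrative value)
        while (alloc < minRecordSize)
            alloc *= 2;                 // round up to the next power of two
        return alloc;
    }
    // without the flag: requested size times the dynamic padding factor (1.0 .. 2.0)
    return static_cast<int>(minRecordSize * paddingFactor);
}

int main() {
    std::cout << recordAllocationSize(200, true, 1.0)  << std::endl;  // 256
    std::cout << recordAllocationSize(200, false, 1.5) << std::endl;  // 300
    return 0;
}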
Next, let's look at the two-step index insertion, indexRecordUsingTwoSteps.

void indexRecordUsingTwoSteps(const char *ns, NamespaceDetails *d, BSONObj obj,
                              DiskLoc loc, bool shouldBeUnlocked) {
    vector<int> multi;
    vector<BSONObjSet> multiKeys;
    IndexInterface::IndexInserter inserter;
    // Step 1, read phase. For an index such as {x:1}, the value of x is extracted
    // from the inserted document and becomes the btree key. For a document like
    // {x:[1,2,3,4,5]}, each of 1,2,3,4,5 becomes a separate key and the index is
    // marked as multikey. Note that when the indexed field is missing and the index
    // is not sparse, a null key is used instead.
    int n = d->nIndexesBeingBuilt();
    {
        BSONObjSet keys;
        for ( int i = 0; i < n; i++ ) {
            // this call throws on unique constraint violation.  we haven't done any writes yet so that is fine.
            // fetchIndexInserters extracts the indexed field values from obj and
            // computes where each key will be inserted in the btree
            fetchIndexInserters(/*out*/keys, inserter, d, i, obj, loc);
            if( keys.size() > 1 ) {     // multikey index
                multi.push_back(i);
                multiKeys.push_back(BSONObjSet());
                multiKeys[multiKeys.size()-1].swap(keys);
            }
            keys.clear();
        }
    }
    inserter.finishAllInsertions();  // Step 2, write phase: perform the actual inserts
    // now finish adding multikeys
    for( unsigned j = 0; j < multi.size(); j++ ) {
        unsigned i = multi[j];
        BSONObjSet& keys = multiKeys[j];
        IndexDetails& idx = d->idx(i);
        IndexInterface& ii = idx.idxInterface();
        Ordering ordering = Ordering::make(idx.keyPattern());
        d->setIndexIsMultikey(ns, i);   // mark this index as multikey
        for( BSONObjSet::iterator k = ++keys.begin()/*skip 1*/; k != keys.end(); k++ ) {
            try {
                ii.bt_insert(idx.head, loc, *k, ordering, !idx.unique(), idx);
            } catch (AssertionException& e) {
                if( e.getCode() == 10287 && (int) i == d->nIndexes ) {
                    DEV log() << "info: caught key already in index on bg indexing (ok)" << endl;
                }
                else {
                    /* roll back previously added index entries
                       note must do self index as it is multikey and could require some cleanup itself
                    */
                    for( int j = 0; j < n; j++ ) {
                        try {
                            _unindexRecord(d->idx(j), obj, loc, false);
                        }
                        catch(...) {
                            log(3) << "unindex fails on rollback after unique key constraint prevented insert\n";
                        }
                    }
                    throw;
                }
            }
        }
    }
}
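
To make the multikey case concrete, here is a standalone toy illustration (hypothetical, not mongod code) of how one document yields several index keys:

#include <iostream>
#include <set>
#include <vector>

// With an index {x:1}, an array value produces one btree key per element,
// which is exactly what causes the index to be flagged multikey above.
std::set<int> extractKeys(const std::vector<int>& arrayValue) {
    // one key per array element; duplicates collapse, as in a BSONObjSet
    return std::set<int>(arrayValue.begin(), arrayValue.end());
}

int main() {
    std::vector<int> x;                 // document {x:[1,2,3,4,5]}
    for (int v = 1; v <= 5; v++) x.push_back(v);
    std::set<int> keys = extractKeys(x);
    std::cout << "keys to insert: " << keys.size()
              << ", multikey: " << (keys.size() > 1 ? "true" : "false") << std::endl;
    return 0;
}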
Finally, let's look at insert_makeIndex.

void NOINLINE_DECL insert_makeIndex(NamespaceDetails *tableToIndex, const string& tabletoidxns, const DiskLoc& loc) {
    BSONObj info = loc.obj();
    bool background = info["background"].trueValue();
    if( background && cc().isSyncThread() ) {
        /* don't do background indexing on slaves.  there are nuances.  this could be added later
           but requires more code.
        */
        log() << "info: indexing in foreground on this replica; was a background index build on the primary" << endl;
        background = false;
    }
    // register the index structure in the NamespaceDetails (or NamespaceDetails::Extra)
    int idxNo = tableToIndex->nIndexes;
    IndexDetails& idx = tableToIndex->addIndex(tabletoidxns.c_str(), !background); // clear transient info caches so they refresh; increments nIndexes
    getDur().writingDiskLoc(idx.info) = loc;
    try {
        // build the actual index data for the collection from the new index spec
        buildAnIndex(tabletoidxns, tableToIndex, idx, idxNo, background);
    }
    catch( ... ) {
        // (the original catch block, which cleans up the half-built index before
        //  rethrowing, is elided in this excerpt)
        throw;
    }
}
insert_makeIndex->buildAnIndex

void buildAnIndex(string ns, NamespaceDetails *d, IndexDetails& idx, int idxNo, bool background) {
    unsigned long long n;               // (declaration and logging elided in this excerpt)
    if( inDBRepair || !background ) {
        // foreground: build the index synchronously, blocking other work
        n = fastBuildIndex(ns.c_str(), d, idx, idxNo);
        verify( !idx.head.isNull() );
    }
    else {
        // background: build the index as a background job that periodically yields
        BackgroundIndexBuildJob j(ns.c_str());
        n = j.go(ns, d, idx, idxNo);
    }
}
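
Which branch runs is decided by the index spec document that reached insert_makeIndex. For example, in this era an index is created by inserting a spec into system.indexes; with the legacy C++ driver (driver API assumed, as before), an explicit {background:true} in the spec selects the background path:

#include "mongo/client/dbclient.h"
using namespace mongo;

int main() {
    DBClientConnection c;
    c.connect("localhost:27017");
    // spec without "background" -> foreground build via fastBuildIndex
    c.insert("test.system.indexes",
             BSON("name" << "x_1" << "ns" << "test.coll" << "key" << BSON("x" << 1)));
    // spec with background:true -> BackgroundIndexBuildJob
    c.insert("test.system.indexes",
             BSON("name" << "y_1" << "ns" << "test.coll"
                         << "key" << BSON("y" << 1) << "background" << true));
    return 0;
}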
insert_makeIndex->buildAnIndex->fastBuildIndex

unsigned long long fastBuildIndex(const char *ns, NamespaceDetails *d, IndexDetails& idx, int idxNo) {
    CurOp * op = cc().curop();
    Timer t;                            // (declaration elided in the original excerpt)
    bool dupsAllowed = !idx.unique();
    bool dropDups = idx.dropDups() || inDBRepair;
    BSONObj order = idx.keyPattern();
    getDur().writingDiskLoc(idx.head).Null();
    /* get and sort all the keys ----- */
    ProgressMeterHolder pm( op->setMessage( "index: (1/3) external sort", d->stats.nrecords, 10 ) );
    SortPhaseOne _ours;
    SortPhaseOne *phase1 = precalced;
    if( phase1 == 0 ) {
        phase1 = &_ours;
        SortPhaseOne& p1 = *phase1;
        shared_ptr<Cursor> c = theDataFileMgr.findAll(ns);
        p1.sorter.reset( new BSONObjExternalSorter(idx.idxInterface(), order) );
        p1.sorter->hintNumObjects( d->stats.nrecords );
        const IndexSpec& spec = idx.getSpec();
        // scan the whole collection; the external sorter quicksorts each in-memory
        // batch of keys and spills it to a file, keeping the file handles around
        while ( c->ok() ) {
            BSONObj o = c->current();
            DiskLoc loc = c->currLoc();
            p1.addKeys(spec, o, loc);
            c->advance();
            pm.hit();
            if ( logLevel > 1 && p1.n % 10000 == 0 ) {
                printMemInfo( "\t iterating objects" );
            }
        }
    }
    pm.finished();
    BSONObjExternalSorter& sorter = *(phase1->sorter);
    // Ensure the index and external sorter have a consistent index interface (and sort order).
    fassert( 16408, &idx.idxInterface() == &sorter.getIndexInterface() );
    if( phase1->multi )                 // multikey index
        d->setIndexIsMultikey(ns, idxNo);
    if ( logLevel > 1 ) printMemInfo( "before final sort" );
    phase1->sorter->sort();
    if ( logLevel > 1 ) printMemInfo( "after final sort" );
    log(t.seconds() > 5 ? 0 : 1) << "\t external sort used : " << sorter.numFiles() << " files " << " in " << t.seconds() << " secs" << endl;
    set<DiskLoc> dupsToDrop;
    /* build index --- */
    // MongoDB has two index formats, V0 and V1. V1 is the current default; 10gen says
    // it uses about 25% less space than V0, which remains only for compatibility with
    // pre-2.0 systems. Since the keys were sorted above, phases 2 and 3 just read
    // them back in order and insert them into the index btree.
    if( idx.version() == 0 )
        buildBottomUpPhases2And3<V0>(dupsAllowed, idx, sorter, dropDups, dupsToDrop, op, phase1, pm, t);
    else if( idx.version() == 1 )
        buildBottomUpPhases2And3<V1>(dupsAllowed, idx, sorter, dropDups, dupsToDrop, op, phase1, pm, t);
    else
        verify(false);
    // if the index is unique and duplicates exist, dropDups removes the duplicate records
    if( dropDups )
        log() << "\t fastBuildIndex dupsToDrop:" << dupsToDrop.size() << endl;
    for( set<DiskLoc>::iterator i = dupsToDrop.begin(); i != dupsToDrop.end(); i++ ) {
        theDataFileMgr.deleteRecord( ns, i->rec(), *i, false /* cappedOk */, true /* noWarn */, isMaster( ns ) /* logOp */ );
        getDur().commitIfNeeded();
    }
    return phase1->n;
}
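
fastBuildIndex's three phases amount to a classic external sort followed by a bottom-up btree load. Here is a conceptual, self-contained sketch (not mongod code; the real sorter spills runs to files rather than keeping them in vectors):

#include <algorithm>
#include <iostream>
#include <queue>
#include <utility>
#include <vector>

typedef std::vector<int> Run;           // stand-in for one spilled, sorted key file

int main() {
    // Phase 1: scan the keys, quicksort each fixed-size batch, spill it as a run.
    int keys[] = {9, 3, 7, 1, 8, 2, 6, 4, 5, 0};
    const size_t total = sizeof(keys) / sizeof(keys[0]), batch = 4;
    std::vector<Run> runs;
    for (size_t i = 0; i < total; i += batch) {
        Run r(keys + i, keys + std::min(i + batch, total));
        std::sort(r.begin(), r.end());
        runs.push_back(r);
    }
    // Phases 2/3: k-way merge of the runs; because keys now arrive in sorted order,
    // each one can simply be appended at the right edge of the growing btree.
    typedef std::pair<int, size_t> HeapItem;  // (key, which run it came from)
    std::priority_queue<HeapItem, std::vector<HeapItem>, std::greater<HeapItem> > pq;
    std::vector<size_t> pos(runs.size(), 1);
    for (size_t r = 0; r < runs.size(); ++r) pq.push(HeapItem(runs[r][0], r));
    while (!pq.empty()) {
        HeapItem top = pq.top(); pq.pop();
        std::cout << top.first << ' ';  // "insert into the index btree"
        size_t r = top.second;
        if (pos[r] < runs[r].size())
            pq.push(HeapItem(runs[r][pos[r]++], r));
    }
    std::cout << std::endl;             // prints 0 1 2 3 4 5 6 7 8 9
    return 0;
}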
This concludes the walkthrough of data insertion. The flow is straightforward, far simpler than the query path. The points worth remembering are sparse indexes and index builds over existing data: when a unique index is built over already-stored data with dropDups set, the duplicate documents are deleted.


Author: yhjj0108 (杨浩)