MongoDB Source Code Analysis (10): Data Insertion
2012-12-10 21:08
This article analyzes the data-insertion flow in MongoDB. The simplified flow is:
1. If the target collection exists, allocate space from it and store the data in the allocated space. If it does not exist, first allocate the collection from the database; if the database does not exist either, allocate the database, creating the xx.ns, xx.0, etc. files.
2. Update the collection's indexes with the inserted data.
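The two steps above can be sketched with a toy in-memory model. Everything here is illustrative (a string "record", one index per collection); none of these types or names are mongod's actual structures:

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Toy model: a "collection" stores records and keeps one index
// (indexed value -> record slot). All names are illustrative.
struct Collection {
    std::vector<std::string> records;
    std::multimap<std::string, size_t> index;
};

struct Database {
    std::map<std::string, Collection> collections; // created on demand, like xx.ns/xx.0
};

std::map<std::string, Database> databases; // created on demand

// Step 1: allocate space (creating db/collection if missing) and store the data.
// Step 2: update the collection's index with the inserted value.
size_t insertRecord(const std::string& dbName, const std::string& collName,
                    const std::string& record, const std::string& indexedValue) {
    Collection& coll = databases[dbName].collections[collName]; // step 1a: allocate on demand
    coll.records.push_back(record);                             // step 1b: store the data
    size_t slot = coll.records.size() - 1;
    coll.index.insert({indexedValue, slot});                    // step 2: update the index
    return slot;
}
```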
Now let's look at the code. From the earlier analysis we know that the entry function for insert operations is receivedInsert.
void receivedInsert(Message& m, CurOp& op) {
    BSONObj first = d.nextJsObj();
    vector<BSONObj> multi;
    while (d.moreJSObjs()) { // batch insert
        if (multi.empty()) // first pass
            multi.push_back(first);
        multi.push_back( d.nextJsObj() );
    }
    while ( true ) {
        Lock::DBWrite lk(ns);
        if ( handlePossibleShardedMessage( m , 0 ) )
            return;
        Client::Context ctx(ns);
        if( !multi.empty() ) {
            const bool keepGoing = d.reservedField() & InsertOption_ContinueOnError;
            // calls checkAndInsert in a loop; keepGoing==true means keep inserting
            // after an insert fails, otherwise stop at the first failure
            insertMulti(keepGoing, ns, multi);
            return;
        }
        checkAndInsert(ns, first);
        globalOpCounters.incInsertInWriteLock(1);
        return;
    }
}
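The keepGoing (InsertOption_ContinueOnError) behavior of insertMulti can be illustrated with a small standalone sketch. insertOne here is a hypothetical stand-in for checkAndInsert, not the real function:

```cpp
#include <cassert>
#include <functional>
#include <stdexcept>
#include <string>
#include <vector>

// Simplified model of insertMulti: with keepGoing==true a document that fails
// to insert is skipped and the batch continues; with keepGoing==false the
// first failure stops the whole batch. Returns how many documents went in.
size_t insertMulti(bool keepGoing, const std::vector<std::string>& docs,
                   const std::function<void(const std::string&)>& insertOne) {
    size_t inserted = 0;
    for (const std::string& d : docs) {
        try {
            insertOne(d);
            ++inserted;
        } catch (const std::runtime_error&) {
            if (!keepGoing)
                return inserted; // stop at the first error
            // keepGoing: swallow the error and continue with the next document
        }
    }
    return inserted;
}
```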
void checkAndInsert(const char *ns, /*modifies*/BSONObj& js) {
    // a single doc larger than BSONObjMaxUserSize is rejected; the limit can only be raised by editing the source
    uassert( 10059 , "object to insert too large", js.objsize() <= BSONObjMaxUserSize);
    {
        // check no $ modifiers. note we only check top level. (scanning deep would be quite expensive)
        BSONObjIterator i( js ); // top-level field names may not start with '$'
        while ( i.more() ) {
            BSONElement e = i.next();
            uassert( 13511 , "document to insert can't have $ fields" , e.fieldName()[0] != '$' );
        }
    }
    theDataFileMgr.insertWithObjMod(ns, js, false); // js may be modified in the call to add an _id field.
    logOp("i", ns, js); // record the operation for master/slave or replica-set replication
}
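The two validations in checkAndInsert can be modeled in isolation. This is a hypothetical helper: a real BSONObjIterator walks BSON bytes, while here a plain list of top-level field names stands in for the document:

```cpp
#include <cassert>
#include <string>
#include <vector>

// Sketch of checkAndInsert's checks. Only the top level is scanned, mirroring
// the comment in the source: a nested field such as {a: {$gt: 1}} would pass.
bool validForInsert(const std::vector<std::string>& topLevelFields,
                    size_t objSize, size_t maxUserSize) {
    if (objSize > maxUserSize)         // "object to insert too large" (uassert 10059)
        return false;
    for (const std::string& f : topLevelFields)
        if (!f.empty() && f[0] == '$') // "document to insert can't have $ fields" (uassert 13511)
            return false;
    return true;
}
```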
DiskLoc DataFileMgr::insertWithObjMod(const char *ns, BSONObj &o, bool god) {
    bool addedID = false;
    DiskLoc loc = insert( ns, o.objdata(), o.objsize(), god, true, &addedID );
    if( addedID && !loc.isNull() )
        o = BSONObj::make( loc.rec() );
    return loc;
}

insert is where MongoDB actually inserts data; it handles both ordinary documents and index documents, as the code below shows.
DiskLoc DataFileMgr::insert(const char *ns, const void *obuf, int len, bool god, bool mayAddIndex, bool *addedID) {
    bool wouldAddIndex = false;
    {
        const char *sys = strstr(ns, "system.");
        if ( sys && !insert_checkSys(sys, ns, wouldAddIndex, obuf, god) )
            return DiskLoc();
    }
    bool addIndex = wouldAddIndex && mayAddIndex; // true when this insert is creating an index
    NamespaceDetails *d = nsdetails(ns);
    if ( d == 0 ) {
        // The collection does not exist yet, so allocate it: allocate an extent sized for the
        // inserted data, allocate a NamespaceDetails and record it; if god is false an _id
        // index is created, and the collection is registered in the system.namespaces collection.
        d = insert_newNamespace(ns, len, god);
    }
    NamespaceDetails *tableToIndex = 0;
    string tabletoidxns;
    BSONObj fixedIndexObject;
    if ( addIndex ) { // index insertion, e.g. db.coll.ensureIndex({x:1}) goes through here
        verify( obuf );
        BSONObj io((const char *) obuf);
        if( !prepareToBuildIndex(io, god, tabletoidxns, tableToIndex, fixedIndexObject ) ) {
            // prepare creates _id itself, or this indicates to fail the build silently (such
            // as if index already exists)
            return DiskLoc();
        }
        if ( ! fixedIndexObject.isEmpty() ) {
            obuf = fixedIndexObject.objdata();
            len = fixedIndexObject.objsize();
        }
    }
    int addID = 0; // 0 if not adding _id; if adding, the length of that new element
    if( !god ) { // generate an _id if the document does not carry one
        /* Check if we have an _id field. If we don't, we'll add it.
           Note that btree buckets which we insert aren't BSONObj's, but in that case god==true. */
        BSONObj io((const char *) obuf);
        BSONElement idField = io.getField( "_id" );
        uassert( 10099 , "_id cannot be an array", idField.type() != Array );
        // we don't add _id for capped collections in local as they don't have an _id index
        if( idField.eoo() && !wouldAddIndex && !str::equals( nsToDatabase( ns ).c_str() , "local" ) && d->haveIdIndex() ) {
            if( addedID )
                *addedID = true;
            addID = len;
            idToInsert_.oid.init();
            len += idToInsert.size();
        }
        BSONElementManipulator::lookForTimestamps( io );
    }
    // Compute the allocation size. If Flag_UsePowerOf2Sizes is set (it can be enabled with
    // db.coll.runCommand({collMod:"coll","usePowerOf2Sizes":true})) the allocation is always
    // rounded up to a power of two; otherwise the needed size is multiplied by the padding
    // factor, which is adjusted dynamically between 1 and 2 depending on whether updates
    // outgrow the original record length.
    int lenWHdr = d->getRecordAllocationSize( len + Record::HeaderSize );
    // If the collection is capped, check if the new object will violate a unique index
    // constraint before allocating space.
    if ( d->nIndexes && d->isCapped() && !god ) { // uniqueness check, for indexes created with unique:true
        checkNoIndexConflicts( d, BSONObj( reinterpret_cast<const char *>( obuf ) ) );
    }
    bool earlyIndex = true;
    DiskLoc loc;
    if( addID || tableToIndex || d->isCapped() ) {
        // if need id, we don't do the early indexing. this is not the common case so that is sort of ok
        earlyIndex = false;
        // the actual space allocation; see /article/8409346.html for how allocation works
        loc = allocateSpaceForANewRecord(ns, d, lenWHdr, god);
    }
    else {
        loc = d->allocWillBeAt(ns, lenWHdr);
        if( loc.isNull() ) {
            // need to get a new extent so we have to do the true alloc now (not common case)
            earlyIndex = false;
            loc = allocateSpaceForANewRecord(ns, d, lenWHdr, god);
        }
    }
    if ( loc.isNull() ) {
        log() << "insert: couldn't alloc space for object ns:" << ns << " capped:" << d->isCapped() << endl;
        verify(d->isCapped());
        return DiskLoc();
    }
    if( earlyIndex ) {
        // add record to indexes using two step method so we can do the reading outside a write lock
        if ( d->nIndexes ) {
            verify( obuf );
            BSONObj obj((const char *) obuf);
            try {
                // extract every indexed field from obj and insert the keys into the matching index btrees
                indexRecordUsingTwoSteps(ns, d, obj, loc, true);
            }
            catch( AssertionException& ) {
                // should be a dup key error on _id index
                dassert( !tableToIndex && !d->isCapped() );
                // no need to delete/rollback the record as it was not added yet
                throw;
            }
        }
        // really allocate now
        DiskLoc real = allocateSpaceForANewRecord(ns, d, lenWHdr, god);
        verify( real == loc );
    }
    Record *r = loc.rec();
    { // copy the actual data into the allocated space
        verify( r->lengthWithHeaders() >= lenWHdr );
        r = (Record*) getDur().writingPtr(r, lenWHdr);
        if( addID ) {
            /* a little effort was made here to avoid a double copy when we add an ID */
            ((int&)*r->data()) = *((int*) obuf) + idToInsert.size();
            memcpy(r->data()+4, idToInsert.rawdata(), idToInsert.size());
            memcpy(r->data()+4+idToInsert.size(), ((char *)obuf)+4, addID-4);
        }
        else {
            if( obuf ) // obuf can be null from internal callers
                memcpy(r->data(), obuf, len);
        }
    }
    addRecordToRecListInExtent(r, loc); // update the extent's record list
    /* durability todo : this could be a bit annoying / slow to record constantly */
    {
        NamespaceDetails::Stats *s = getDur().writing(&d->stats);
        s->datasize += r->netLength();
        s->nrecords++;
    }
    // we don't bother resetting query optimizer stats for the god tables - also god is true when adding a btree bucket
    if ( !god )
        NamespaceDetailsTransient::get( ns ).notifyOfWriteOp();
    if ( tableToIndex ) { // a new index was added; build the concrete index data here
        insert_makeIndex(tableToIndex, tabletoidxns, loc);
    }
    /* add this record to our indexes */
    if ( !earlyIndex && d->nIndexes ) {
        BSONObj obj(r->data());
        // not sure which of these is better -- either can be used. oldIndexRecord may be faster,
        // but twosteps handles dup key errors more efficiently.
        //oldIndexRecord(d, obj, loc);
        indexRecordUsingTwoSteps(ns, d, obj, loc, false);
    }
    d->paddingFits();
    return loc;
}

Next, let's look at the two-step index insertion function, indexRecordUsingTwoSteps.
void indexRecordUsingTwoSteps(const char *ns, NamespaceDetails *d, BSONObj obj, DiskLoc loc, bool shouldBeUnlocked) {
    vector<int> multi;
    vector<BSONObjSet> multiKeys;
    IndexInterface::IndexInserter inserter;
    // Step 1, read phase.
    // For an index such as {x:1} and an inserted document {x:1,y:1}, the value 1 is extracted
    // and inserted into the index btree. For a more complex multikey case, e.g. a document
    // {x:[1,2,3,4,5]}, the values 1..5 are each inserted and the index on x is marked as a
    // multikey index. Note that when the indexed field is missing from obj and the index is
    // not sparse, the key is set to the null key.
    int n = d->nIndexesBeingBuilt();
    {
        BSONObjSet keys;
        for ( int i = 0; i < n; i++ ) {
            // this call throws on unique constraint violation. we haven't done any writes yet so that is fine.
            // extract the indexed fields' values from obj and compute where each key will land in the index
            fetchIndexInserters(/*out*/keys, inserter, d, i, obj, loc);
            if( keys.size() > 1 ) { // multikey index
                multi.push_back(i);
                multiKeys.push_back(BSONObjSet());
                multiKeys[multiKeys.size()-1].swap(keys);
            }
            keys.clear();
        }
    }
    // perform the actual insertions
    inserter.finishAllInsertions();
    // Step 2, write phase. now finish adding multikeys
    for( unsigned j = 0; j < multi.size(); j++ ) { // insertion of the remaining multikey entries
        unsigned i = multi[j];
        BSONObjSet& keys = multiKeys[j];
        IndexDetails& idx = d->idx(i);
        IndexInterface& ii = idx.idxInterface();
        Ordering ordering = Ordering::make(idx.keyPattern());
        d->setIndexIsMultikey(ns, i); // mark this index as a multikey index
        for( BSONObjSet::iterator k = ++keys.begin()/*skip 1*/; k != keys.end(); k++ ) {
            try {
                ii.bt_insert(idx.head, loc, *k, ordering, !idx.unique(), idx);
            } catch (AssertionException& e) {
                if( e.getCode() == 10287 && (int) i == d->nIndexes ) {
                    DEV log() << "info: caught key already in index on bg indexing (ok)" << endl;
                }
                else {
                    /* roll back previously added index entries
                       note must do self index as it is multikey and could require some cleanup itself */
                    for( int j = 0; j < n; j++ ) {
                        try {
                            _unindexRecord(d->idx(j), obj, loc, false);
                        }
                        catch(...) {
                            log(3) << "unindex fails on rollback after unique key constraint prevented insert\n";
                        }
                    }
                    throw;
                }
            }
        }
    }
}

Finally, let's look at insert_makeIndex.
void NOINLINE_DECL insert_makeIndex(NamespaceDetails *tableToIndex, const string& tabletoidxns, const DiskLoc& loc) {
    BSONObj info = loc.obj();
    bool background = info["background"].trueValue();
    if( background && cc().isSyncThread() ) {
        /* don't do background indexing on slaves. there are nuances. this could be added later but requires more code. */
        log() << "info: indexing in foreground on this replica; was a background index build on the primary" << endl;
        background = false;
    }
    int idxNo = tableToIndex->nIndexes; // add the index structure to NamespaceDetails or NamespaceDetails::Extra
    IndexDetails& idx = tableToIndex->addIndex(tabletoidxns.c_str(), !background); // clear transient info caches so they refresh; increments nIndexes
    getDur().writingDiskLoc(idx.info) = loc;
    // build the collection's index from the newly created index definition
    buildAnIndex(tabletoidxns, tableToIndex, idx, idxNo, background); // (error handling elided from this excerpt)
}

insert_makeIndex -> buildAnIndex
void buildAnIndex(string ns, NamespaceDetails *d, IndexDetails& idx, int idxNo, bool background) {
    if( inDBRepair || !background ) { // build the index synchronously
        n = fastBuildIndex(ns.c_str(), d, idx, idxNo);
        verify( !idx.head.isNull() );
    }
    else { // start a background thread to build the index
        BackgroundIndexBuildJob j(ns.c_str());
        n = j.go(ns, d, idx, idxNo);
    }
}

insert_makeIndex -> buildAnIndex -> fastBuildIndex
unsigned long long fastBuildIndex(const char *ns, NamespaceDetails *d, IndexDetails& idx, int idxNo) {
    CurOp * op = cc().curop();
    bool dupsAllowed = !idx.unique();
    bool dropDups = idx.dropDups() || inDBRepair;
    BSONObj order = idx.keyPattern();
    getDur().writingDiskLoc(idx.head).Null();
    /* get and sort all the keys ----- */
    ProgressMeterHolder pm( op->setMessage( "index: (1/3) external sort" , d->stats.nrecords , 10 ) );
    SortPhaseOne _ours;
    SortPhaseOne *phase1 = precalced;
    if( phase1 == 0 ) {
        phase1 = &_ours;
        SortPhaseOne& p1 = *phase1;
        shared_ptr<Cursor> c = theDataFileMgr.findAll(ns);
        p1.sorter.reset( new BSONObjExternalSorter(idx.idxInterface(), order) );
        p1.sorter->hintNumObjects( d->stats.nrecords );
        const IndexSpec& spec = idx.getSpec();
        // every 1000 keys read are quicksorted and the sorted run is written out to a file;
        // the sorter keeps the file handles
        while ( c->ok() ) {
            BSONObj o = c->current();
            DiskLoc loc = c->currLoc();
            p1.addKeys(spec, o, loc);
            c->advance();
            pm.hit();
            if ( logLevel > 1 && p1.n % 10000 == 0 ) {
                printMemInfo( "\t iterating objects" );
            }
        };
    }
    pm.finished();
    BSONObjExternalSorter& sorter = *(phase1->sorter);
    // Ensure the index and external sorter have a consistent index interface (and sort order).
    fassert( 16408, &idx.idxInterface() == &sorter.getIndexInterface() );
    if( phase1->multi ) // multikey index
        d->setIndexIsMultikey(ns, idxNo);
    if ( logLevel > 1 ) printMemInfo( "before final sort" );
    phase1->sorter->sort();
    if ( logLevel > 1 ) printMemInfo( "after final sort" );
    log(t.seconds() > 5 ? 0 : 1) << "\t external sort used : " << sorter.numFiles() << " files " << " in " << t.seconds() << " secs" << endl;
    set<DiskLoc> dupsToDrop;
    /* build index --- */
    // MongoDB has two index formats, V0 and V1. V1 is the current default; 10gen states V1
    // indexes take about 25% less space than V0, which remains for compatibility with pre-2.0 systems.
    if( idx.version() == 0 )
        buildBottomUpPhases2And3<V0>(dupsAllowed, idx, sorter, dropDups, dupsToDrop, op, phase1, pm, t);
    else if( idx.version() == 1 )
        // read the keys back from the sorter and insert them into the index btree; since they
        // were sorted already, this is just a sequential read followed by in-order inserts
        buildBottomUpPhases2And3<V1>(dupsAllowed, idx, sorter, dropDups, dupsToDrop, op, phase1, pm, t);
    else
        verify(false);
    if( dropDups ) // unique index over data containing duplicates: the duplicate records are deleted
        log() << "\t fastBuildIndex dupsToDrop:" << dupsToDrop.size() << endl;
    for( set<DiskLoc>::iterator i = dupsToDrop.begin(); i != dupsToDrop.end(); i++ ) {
        theDataFileMgr.deleteRecord( ns, i->rec(), *i, false /* cappedOk */ , true /* noWarn */ , isMaster( ns ) /* logOp */ );
        getDur().commitIfNeeded();
    }
    return phase1->n;
}

That covers the whole data-insertion path. The flow is straightforward, much simpler than the query path. The points worth attention are sparse indexes and index builds: when a unique index is built (with dropDups) over already-stored data that contains duplicates, the duplicate documents are deleted.
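The phase-1 external sort used by fastBuildIndex (sort fixed-size batches, spill each sorted run, then merge the runs) can be sketched as follows. This is a simplified stand-in, not BSONObjExternalSorter: runs are held in vectors rather than spill files, keys are plain ints, and all names are illustrative:

```cpp
#include <algorithm>
#include <cassert>
#include <queue>
#include <tuple>
#include <vector>

// External-sort sketch: sort keys in fixed-size batches (1000 in the original
// comment; tiny here), "spill" each sorted run, then k-way merge the runs with
// a min-heap to produce all index keys in order.
std::vector<int> externalSort(const std::vector<int>& keys, size_t batchSize) {
    std::vector<std::vector<int>> runs; // each run stands in for one spill file
    for (size_t i = 0; i < keys.size(); i += batchSize) {
        std::vector<int> run(keys.begin() + i,
                             keys.begin() + std::min(i + batchSize, keys.size()));
        std::sort(run.begin(), run.end()); // the original quicksorts each batch
        runs.push_back(std::move(run));
    }
    // k-way merge: min-heap of (value, run index, offset within run)
    using Entry = std::tuple<int, size_t, size_t>;
    std::priority_queue<Entry, std::vector<Entry>, std::greater<Entry>> heap;
    for (size_t r = 0; r < runs.size(); r++)
        if (!runs[r].empty())
            heap.emplace(runs[r][0], r, 0);
    std::vector<int> sorted;
    while (!heap.empty()) {
        auto [v, r, off] = heap.top();
        heap.pop();
        sorted.push_back(v); // in the real build this key is appended to the btree
        if (off + 1 < runs[r].size())
            heap.emplace(runs[r][off + 1], r, off + 1);
    }
    return sorted;
}
```

Because the merged output is ordered, a unique-index build only has to compare each key with its predecessor to detect the duplicates that dropDups would delete.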
Original article: MongoDB Source Code Analysis (10): Data Insertion
Author: yhjj0108, 杨浩 (Yang Hao)