您的位置:首页 > 数据库 > Mongodb

MongoDB源码阅读之Shard源码分析--CongfigServer启动

2012-12-30 10:41 741 查看

1. 名词解释

Shards : 每一个shard包括一个或多个服务和存储数据的mongod进程(mongod是MongoDB数据的核心进程)典型的每个shard开启多个服务来提高服务的可用性。这些服务/mongod进程在shard中组成一个复制集

Chunks: Chunk是一个来自特殊集合中的一个数据范围,(collection,minKey,maxKey)描叙一个chunk,它介于minKey和maxKey范围之间。例如chunks 的maxsize大小是100M,如果一个文件达到或超过这个范围时,会被切分到2个新的chunks中。当一个shard的数据过量时,chunks将会被迁移到其他的shards上。同样,chunks也可以迁移到其他的shards上

Config Servers : Config服务器存储着集群的metadata信息,包括每个服务器,每个shard的基本信息和chunk信息Config服务器主要存储的是chunk信息。每一个config服务器都复制了完整的chunk信息。

一个完整的MongoDB集群应该包含多个shards,每个shard包含多个replSets互相做备份。单个数据有大小之分,被分配到不同的Chunk之中。当一个shard的存储空间不够时,会将Chunks分配到其他Shard上。这些信息被分开后都要有所记录,这些记录存储在Congfig Servers上做查询的依据。

从这里开始源码研究的范畴变为mongos。

2. 代码结构

shard.h

chunk.h

config.h

以上三个对应结构的类。

strategy.h        //分配调整策略基类

strategy_shard.cpp       //shard分配调整策略的实现

parallel.h        //并行传递消息,分片等操作的工具类

server.h         //启动configserver的管理类

balance.h        //chunk分块的策划者

connpool.h       //管理mongos中的各种连接


3. 各部分结构图

a) Chunk相关的类图

Chunk类:存储数据的起点、终点以及在哪一个shard上,实现了分割chunk,计算块大小等方法。

ChunkRange类:记录在同一个shard上的连续的chunk节点的起点和终点。

ChunkRangeManager类:存储ChunkRange的集合。

ChunkManager类:存储了当前shard上的所有的Chunk集合和一个ChunkRangeManager集合。

b) Shrad相关的类

Shard类:记录了shard的名称、地址、大小等信息。

ShardStatus类:存储了Shard对象,和服务器的一些信息。

StaticShradInfo类:内部类(单件),记录了当前体系中的所有Shard 节点信息及状态,接口供Shard对象调用。

c) Config server相关的类

DBConfig:记录各个Shard之间的连接状态。

ConfigServer:继承DBConfig

4. Mongos server初始化

Mongos的入口在server.cpp文件中。

程序首先对各个参数进行分割存储,之后执行函数runMongosServer。

static bool runMongosServer( bool doUpgrade ) {

//……

// set some global state

//添加连接回调

pool.addHook( new ShardingConnectionHook( false ) );

pool.setName( "mongos connectionpool" );

shardConnectionPool.addHook( new ShardingConnectionHook( true ) );

shardConnectionPool.setName( "mongos shardconnection connectionpool" );

// Mongos shouldn't lazily kill cursors, otherwise we can end up with extras from migration

DBClientConnection::setLazyKillCursor( false );

ReplicaSetMonitor::setConfigChangeHook( boost::bind( &ConfigServer::replicaSetChange , &configServer , _1 ) );

//初始化ConfigServer,将参数中的配置的config server添加到列表中

if ( ! configServer.init( configdbs ) ) {

log() << "couldn't resolve config db address" << endl;

return false;

}

//检查主shard地址,以及各个config server节点的连接状况

if ( ! configServer.ok( true ) ) {

log() << "configServer connection startup check failed" << endl;

return false;

}

//每60秒检查检查一回各节点连接情况

{

class CheckConfigServers : public task::Task {

virtual string name() const { return "CheckConfigServers"; }

virtual void doWork() { configServer.ok(true); }

};

task::repeat(new CheckConfigServers, 60*1000);

}

//检查config sever的版本,下面详细解释

int configError = configServer.checkConfigVersion( doUpgrade );

//传出的configError并不一定是代表错误,而是表示当前server的状态

//总体说来,当configError为0的时候,需要将自己的作为configServer的管理者

//其他时候就直接退出了。一次mongos的工作到此结束。

if ( configError ) {

if ( configError > 0 ) {

log() << "upgrade success!" << endl;

}

else {

log() << "config server error: " << configError << endl;

}

return false;

}

configServer.reloadSettings();

//设置响应系统信号的函数

init();

#if !defined(_WIN32)

CmdLine::launchOk();

#endif

if ( !noHttpInterface )

boost::thread web( boost::bind(&webServerThread, new NoAdminAccess() /* takes ownership */) );

//启动消息Server,监听各端口socket连接

MessageServer::Options opts;

opts.port = cmdLine.port;

opts.ipList = cmdLine.bind_ip;

start(opts);

// listen() will return when exit code closes its socket.

dbexit( EXIT_NET_ERROR );

return true;

}


5. 检查config server的版本(ConfigServer::checkConfigVersion)

int ConfigServer::checkConfigVersion( bool upgrade ) {

//访问shard服务器返回config version:其中0为初始化,1为已经连接上shard或者数据库,2为自己需要更新,3表示环境中有主config server,且不需要更新

int cur = dbConfigVersion();

if ( cur == VERSION )

return 0;

//如正在初始化,即环境里没有config server,则自己变为主config server,并通知shard

if ( cur == 0 ) {

scoped_ptr<ScopedDbConnection> conn(

ScopedDbConnection::getInternalScopedDbConnection( _primary.getConnString() ) );

// If the cluster has not previously been initialized, we need to set the version before using so

// subsequent mongoses use the config data the same way.  This requires all three config servers online

// initially.

try {

conn->get()->insert( "config.version" , BSON( "_id" << 1 << "version" << VERSION ) );

}

catch( DBException& ){

error() << "All config servers must initially be reachable for the cluster to be initialized." << endl;

throw;

}

pool.flush();

verify( VERSION == dbConfigVersion( conn->conn() ) );

conn->done();

return 0;

}

//需要更新

if ( cur == 2 ) {

// need to upgrade

verify( VERSION == 3 );

if ( ! upgrade ) {

log() << "newer version of mongo meta data\n"

<< "need to --upgrade after shutting all mongos down"

<< endl;

return -9;

}

scoped_ptr<ScopedDbConnection> connPtr(

ScopedDbConnection::getInternalScopedDbConnection( _primary.getConnString() ) );

ScopedDbConnection& conn = *connPtr;

// do a backup

string backupName;

{

stringstream ss;

ss << "config-backup-" << terseCurrentTime(false);

backupName = ss.str();

}

log() << "backing up config to: " << backupName << endl;

conn->copyDatabase( "config" , backupName );

//......

//更新shard,database和chunk

conn->update( "config.version" , BSONObj() , BSON( "_id" << 1 << "version" << VERSION ) );

conn.done();

pool.flush();

return 1;

}

log() << "don't know how to upgrade " << cur << " to " << VERSION << endl;

return -8;

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: