您的位置:首页 > Web前端 > Node.js

HDFS1.0源代码解析—DataNode启动(一)

2012-05-03 13:53 274 查看
DataNode的启动实在DataNode.java中进行的,具体启动流程如下:

public static void main(String args[]) {
secureMain(args, null);
}

在secureMain中调用createDataNode方法,该方法

public static DataNode createDataNode(String args[],
Configuration conf, SecureResources resources) throws IOException {
DataNode dn = instantiateDataNode(args, conf, resources);
runDatanodeDaemon(dn);
return dn;
}
利用instantiateDataNode创建DataNode 对象,通过runDatanodeDaemon
dn.register();
dn.dataNodeThread = new Thread(dn, dnThreadName);
dn.dataNodeThread.setDaemon(true); // needed for JUnit testing
dn.dataNodeThread.start();

进行DataNode注册,创建线程,设置守护线程,启动线程。

以上就是DataNode主线程启动的过程。

其中,创建创建DataNode 对象的过程如下:

在函数instantiateDataNode中, 调用makeInstance函数

//获取配置文件中的dfs.data.dir配置的文件路径
String[] dataDirs = conf.getStrings(DATA_DIR_KEY);
makeInstance(dataDirs, conf, resources);
在makeInstance函数中,创建dfs.data.dir指定的文件路径,确保至少有一个路径可以被创建,然后调用 DataNode的构造函数。具体如下:

for (String dir : dataDirs) {
try {
//进行用户权限的查询
DiskChecker.checkDir(localFS, new Path(dir), dataDirPermission);
//dir存储检查之后的存储文件目录
dirs.add(new File(dir));
} catch(IOException e) {
LOG.warn("Invalid directory in " + DATA_DIR_KEY +  ": " +
e.getMessage());
}
}
if (dirs.size() > 0)//至少有一个路径是可创建的
return new DataNode(conf, dirs, resources);
DataNode构造函数中调用 startDataNode根据具体配置文件的信息进行具体的初始化过程。 startDataNode具体执行流程如下:

InetSocketAddress nameNodeAddr = NameNode.getServiceAddress(conf, true);

获取配置文件中fs.default.name指定的IP和端口。

// connect to name node
this.namenode = (DatanodeProtocol)
RPC.waitForProxy(DatanodeProtocol.class,
DatanodeProtocol.versionID,
nameNodeAddr,
conf);//通过Hadoop的RPC机制与NameNode进行连接,获取namenode变量,这是DataNode与NameNode进行交互的工具
// get version and id info from the name-node
NamespaceInfo nsInfo = handshake();
StartupOption startOpt = getStartupOption(conf);//获取DataNode启动的模式是regular还是format,如果是第一次启动必须指定format,不指定默认是regular


static public enum StartupOption{
FORMAT  ("-format"),
REGULAR ("-regular"),
UPGRADE ("-upgrade"),
ROLLBACK("-rollback"),
FINALIZE("-finalize"),
IMPORT  ("-importCheckpoint");
这些是DataNode可能处于的一些状态。具体状态的切换见另外一篇日志的分析/article/8931642.html
boolean simulatedFSDataset =
conf.getBoolean("dfs.datanode.simulateddatastorage", false);
if (simulatedFSDataset) {

这段代码判断是否是伪分布模式。之后会根据data中存储的文件和数据的特点判断是否需要进行恢复、升级、回滚的操作,如果没有按正常流程启动。

storage.recoverTransitionRead(nsInfo, dataDirs, startOpt);
调用DataStorage类中的recoverTransitionRead进行检查,具体代码如下:

for(Iterator<File> it = dataDirs.iterator(); it.hasNext();) {
File dataDir = it.next();
StorageDirectory sd = new StorageDirectory(dataDir);//初始化storage中的root
StorageState curState;
try {
curState = sd.analyzeStorage(startOpt);
// sd is locked but not opened
switch(curState) {
case NORMAL:
break;
case NON_EXISTENT:
// ignore this storage
LOG.info("Storage directory " + dataDir + " does not exist.");
it.remove();
continue;
case NOT_FORMATTED: // format
LOG.info("Storage directory " + dataDir + " is not formatted.");
LOG.info("Formatting ...");
format(sd, nsInfo);
break;
default:  // recovery part is common
sd.doRecover(curState);
}
} catch (IOException ioe) {
sd.unlock();
throw ioe;
}
// add to the storage list
addStorageDir(sd);
dataDirStates.add(curState);

具体执行是在Storage.java中的analyzeStorage,通过检查返回当前所处的状态,在switch分支中不能处理的分支交与doRecover方法实现。处理完之后会根据StartupOption进行响应的处理,具体由 doTransition(getStorageDir(idx), nsInfo, startOpt);方法执行,该方法的详细解释在另外一篇博客/article/8931642.html,最后执行this.writeAll();
将元信息写入磁盘。

当前DataNode可能处于一下状态:

public enum StorageState {
NON_EXISTENT,//数据不存在,不做处理
NOT_FORMATTED,//没有进行初始化
COMPLETE_UPGRADE,//完成升级
RECOVER_UPGRADE,//取消升级
COMPLETE_FINALIZE,//完成提交
COMPLETE_ROLLBACK,//取消提交
RECOVER_ROLLBACK,//取消回滚
COMPLETE_CHECKPOINT,//NameNode用到,完成checkpoint的恢复
RECOVER_CHECKPOINT,//取消checkpoint恢复
NORMAL;//正常启动
}
analyzeStorage函数是Storage.java中比较重要的一个函数,主要的实现流程是:首先,根据数据目录是否存在和StartupOption选项判断状态是数据不存在还是没有进行初始化。然后根据下面的状态判断DataNode处于的状态,将获得的状态信息返回给recoverTransitionRead进行处理。

// check whether current directory is valid
File versionFile = getVersionFile();//获取current下的VERSION文件
boolean hasCurrent = versionFile.exists();

// check which directories exist
boolean hasPrevious = getPreviousDir().exists();
boolean hasPreviousTmp = getPreviousTmp().exists();
boolean hasRemovedTmp = getRemovedTmp().exists();
boolean hasFinalizedTmp = getFinalizedTmp().exists();
boolean hasCheckpointTmp = getLastCheckpointTmp().exists();
storage.recoverTransitionRead函数分析完之后,我们继续分析下一个函数 this.data = new FSDataset(storage, conf);在FSDataset构造函数中,主要完成对current目录下文件的遍历,建立volumes、volumeMap、roots。其中volumes是FSVolumeSet对象,用于管理各个FSVolume,每一个data的存储目录(即dfs.data.dirFSVolume指定的每个文件路径)对应一个FSVolume对象,FSVolume对象会记录对应路径的磁盘使用情况,以及 dataDir ,其中this.dataDir = new FSDir(currentDir);管理FSVolume对象对应磁盘路径的目录结构。volumeMap是HashMap<Block,DatanodeBlockInfo>类型的对象,建立的是Block和DatanodeBlockInfo之间的对应关系,其中Block记录blockid、block大小、generationStamp,DatanodeBlockInfo记录对应的FSVolume,block的file对象。

























                                            
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: