您的位置:首页 > Web前端 > Node.js

Hadoop源码分析_NameNode regular初始化 (FSDirectory详解)

2013-10-27 07:01 501 查看
FSNamesystem 的 private void initialize(NameNode nn, Configuration conf) throws IOException 方法中,对FSDirectory进行了如下调用:

this.dir = new FSDirectory(this, conf);

StartupOption startOpt = NameNode.getStartupOption(conf);

this.dir.loadFSImage(getNamespaceDirs(conf),

getNamespaceEditsDirs(conf), startOpt);

下面我们对FSDirectory的loadFSImage方法的流程进行分析

前提:${dfs.name.dir}目录有数据存在

dataDirs:[/usr/local/hadoop/tmp/dfs/name]

editsDirs:[/usr/local/hadoop/tmp/dfs/name]

startOpt: -regular

void loadFSImage(Collection<File> dataDirs,

Collection<File> editsDirs,

StartupOption startOpt) throws IOException {

// format before starting up if requested

if (startOpt == StartupOption.FORMAT) {////
设置FSImage映像文件文件的存储目录:${dfs.name.dir},默认是/tmp/hadoop/dfs/name,是一个目录数组

fsImage.setStorageDirectories(dataDirs, editsDirs);

fsImage.format();

startOpt = StartupOption.REGULAR;

}

try {

if (fsImage.recoverTransitionRead(dataDirs, editsDirs, startOpt)) { //
根据启动选项及其对应存储目录(${dfs.name.dir}),分析存储目录,必要的话从先前的事务恢复过来

fsImage.saveNamespace(true);

}

FSEditLog editLog = fsImage.getEditLog();

assert editLog != null : "editLog must be initialized";

if (!editLog.isOpen())

editLog.open();

fsImage.setCheckpointDirectories(null, null);

} catch(IOException e) {

fsImage.close();

throw e;

}

synchronized (this) {

this.ready = true;

this.nameCache.initialized();

this.notifyAll();

}

}

1、fsImage.recoverTransitionRead(dataDirs, editsDirs, startOpt)根据启动选项及其对应存储目录(${dfs.name.dir}),分析存储目录,必要的话从先前的事务恢复过来

代码如下:

/**
 * Analyze storage directories.
 * Recover from previous transitions if required.
 * Perform fs state transition if necessary depending on the namespace info.
 * Read storage info.
 *
 * @param dataDirs image storage directories; an empty collection is rejected
 * @param editsDirs edits storage directories; an empty collection is rejected
 * @param startOpt startup option; must not be FORMAT (asserted)
 * @throws IOException if either directory collection is empty, if IMPORT is
 *         requested without checkpoint directories or into a directory that
 *         already holds an image, if a storage directory is missing, if no
 *         directory is formatted, or if the layout version requires an
 *         upgrade that was not requested
 * @return true if the image needs to be saved or false otherwise
 */
boolean recoverTransitionRead(Collection<File> dataDirs,
                              Collection<File> editsDirs,
                              StartupOption startOpt
                              ) throws IOException {
  assert startOpt != StartupOption.FORMAT :
    "NameNode formatting should be performed before reading the image";

  // none of the data dirs exist
  if (dataDirs.size() == 0 || editsDirs.size() == 0)
    throw new IOException(
      "All specified directories are not accessible or do not exist.");

  // IMPORT needs both checkpoint image and checkpoint edits directories.
  if(startOpt == StartupOption.IMPORT
      && (checkpointDirs == null || checkpointDirs.isEmpty()))
    throw new IOException("Cannot import image from a checkpoint. "
                        + "\"fs.checkpoint.dir\" is not set." );

  if(startOpt == StartupOption.IMPORT
      && (checkpointEditsDirs == null || checkpointEditsDirs.isEmpty()))
    throw new IOException("Cannot import image from a checkpoint. "
                        + "\"fs.checkpoint.edits.dir\" is not set." );

  setStorageDirectories(dataDirs, editsDirs);

  // 1. For each data directory calculate its state and
  // check whether all is consistent before transitioning.
  Map<StorageDirectory, StorageState> dataDirStates =
    new HashMap<StorageDirectory, StorageState>();
  // Becomes true once at least one directory has been read successfully.
  boolean isFormatted = false;
  for (Iterator<StorageDirectory> it =
         dirIterator(); it.hasNext();) {
    StorageDirectory sd = it.next();
    StorageState curState;
    try {
      curState = sd.analyzeStorage(startOpt);
      // sd is locked but not opened
      switch(curState) {
      case NON_EXISTENT:
        // name-node fails if any of the configured storage dirs are missing
        throw new InconsistentFSStateException(sd.getRoot(),
          "storage directory does not exist or is not accessible.");
      case NOT_FORMATTED:
        break;
      case NORMAL:
        break;
      default: // recovery is possible
        sd.doRecover(curState);
      }
      if (curState != StorageState.NOT_FORMATTED
          && startOpt != StartupOption.ROLLBACK) {
        sd.read(); // read and verify consistency with other directories
        isFormatted = true;
      }
      if (startOpt == StartupOption.IMPORT && isFormatted)
        // import of a checkpoint is allowed only into empty image directories
        throw new IOException("Cannot import image from a checkpoint. "
            + " NameNode already contains an image in " + sd.getRoot());
    } catch (IOException ioe) {
      // Release the directory before propagating the failure.
      sd.unlock();
      throw ioe;
    }
    dataDirStates.put(sd,curState);
  }

  if (!isFormatted && startOpt != StartupOption.ROLLBACK
                   && startOpt != StartupOption.IMPORT)
    throw new IOException("NameNode is not formatted.");
  if (layoutVersion < LAST_PRE_UPGRADE_LAYOUT_VERSION) {
    checkVersionUpgradable(layoutVersion);
  }
  // Refuse to start on a layout that differs from the current one unless
  // an upgrade was explicitly requested.
  if (startOpt != StartupOption.UPGRADE
        && layoutVersion < LAST_PRE_UPGRADE_LAYOUT_VERSION
        && layoutVersion != FSConstants.LAYOUT_VERSION)
      throw new IOException(
         "\nFile system image contains an old layout version " + layoutVersion
         + ".\nAn upgrade to version " + FSConstants.LAYOUT_VERSION
         + " is required.\nPlease restart NameNode with -upgrade option.");
  // check whether distributed upgrade is required and/or should be continued
  verifyDistributedUpgradeProgress(startOpt);

  // 2. Format unformatted dirs.
  this.checkpointTime = 0L;
  for (Iterator<StorageDirectory> it =
         dirIterator(); it.hasNext();) {
    StorageDirectory sd = it.next();
    StorageState curState = dataDirStates.get(sd);
    switch(curState) {
    case NON_EXISTENT:
      assert false : StorageState.NON_EXISTENT + " state cannot be here";
      // no break: with assertions disabled this falls through to NOT_FORMATTED
    case NOT_FORMATTED:
      LOG.info("Storage directory " + sd.getRoot() + " is not formatted.");
      LOG.info("Formatting ...");
      sd.clearDirectory(); // create empty current dir
      break;
    default:
      break;
    }
  }

  // 3. Do transitions
  switch(startOpt) {
  case UPGRADE:
    doUpgrade();
    return false; // upgrade saved image already
  case IMPORT:
    doImportCheckpoint();
    return true;
  case ROLLBACK:
    doRollback();
    break;
  case REGULAR:
    // just load the image
  }
  // Reached for REGULAR and, after the rollback, for ROLLBACK.
  return loadFSImage();
}

--1 setStorageDirectories(dataDirs, editsDirs):fsImage在这个方法中过滤editsDirs,从中remove掉在dataDirs中已经存在的存储目录,然后对每一个存储目录执行addStorageDir(new StorageDirectory(dirName, dirType));

从而添加到protected List<StorageDirectory> storageDirs = new ArrayList<StorageDirectory>();

storageDirs是Storage.java中的变量,FSImage extends Storage

--2 For each data directory calculate its state and check whether all is consistent before transitioning.

先来看我们用到的analyzeStorage方法,简单介绍一下其中的逻辑。首先判断root这个路径是否存在:如果不存在,并且当前执行的不是StartupOption.FORMAT格式化命令,则返回StorageState.NON_EXISTENT;如果执行的是FORMAT,则尝试创建该目录,创建失败时抛出IOException。再判断root是否是目录以及是否可写,只要有一项不满足就返回StorageState.NON_EXISTENT。如果配置的路径存在且可访问,则调用lock()对存储目录加锁。对目录加锁的一个常用技巧是建立一个空的filelock文件,把锁加在这个空白文件上,通过对这个文件加锁来控制整个目录的同步行为。加锁之后,如果启动选项是FORMAT,则直接返回StorageState.NOT_FORMATTED;否则判断各种状态转换过程中产生的标志性文件或目录(current目录下的VERSION文件、previous目录以及各个临时目录)是否存在,根据它们存在的情况返回不同的状态。对于不同的返回状态,再由同一个类中的doRecover方法进行处理。从这部分可以看出,这些函数的主要功能就是根据目录当前所处的状态,将目录恢复到某个稳定的状态。

变量值:/usr/local/hadoop/tmp/dfs/name。代码如下:

/**
 * Check consistency of the storage directory.
 *
 * <p>The root directory is checked for existence and accessibility first.
 * When it is missing and {@code startOpt} is {@code FORMAT}, it is created.
 * The storage is then locked, and the state is derived from which of the
 * version file, the previous directory and the temporary directories exist.
 *
 * @param startOpt a startup option.
 *
 * @return state {@link StorageState} of the storage directory:
 *         {@code NON_EXISTENT} if the root is missing (and not formatting),
 *         is not a directory, is not writable or cannot be accessed;
 *         {@code NOT_FORMATTED} when formatting or when neither a version
 *         file nor any other storage directory exists; {@code NORMAL} when
 *         only the current version file exists; otherwise one of the
 *         {@code COMPLETE_*} / {@code RECOVER_*} states matching the single
 *         temporary directory that was found
 * @throws InconsistentFSStateException if directory state is not
 *         consistent and cannot be recovered.
 * @throws IOException if the root directory cannot be created, or for
 *         failures raised by the helpers called here
 */
public StorageState analyzeStorage(StartupOption startOpt) throws IOException {
  assert root != null : "root is null";
  String rootPath = root.getCanonicalPath();
  try { // check that storage exists
    if (!root.exists()) {
      // storage directory does not exist
      if (startOpt != StartupOption.FORMAT) {
        LOG.info("Storage directory " + rootPath + " does not exist.");
        return StorageState.NON_EXISTENT;
      }
      LOG.info(rootPath + " does not exist. Creating ...");
      if (!root.mkdirs()) {
        throw new IOException("Cannot create directory " + rootPath);
      }
    }
    // or is inaccessible
    if (!root.isDirectory()) {
      // Fixed: the message used to lack the space after the path.
      LOG.info(rootPath + " is not a directory.");
      return StorageState.NON_EXISTENT;
    }
    if (!root.canWrite()) {
      LOG.info("Cannot access storage directory " + rootPath);
      return StorageState.NON_EXISTENT;
    }
  } catch(SecurityException ex) {
    LOG.info("Cannot access storage directory " + rootPath, ex);
    return StorageState.NON_EXISTENT;
  }

  this.lock(); // lock storage if it exists

  if (startOpt == HdfsConstants.StartupOption.FORMAT) {
    return StorageState.NOT_FORMATTED;
  }
  if (startOpt != HdfsConstants.StartupOption.IMPORT) {
    //make sure no conversion is required
    checkConversionNeeded(this);
  }

  // check whether current directory is valid
  File versionFile = getVersionFile();
  boolean hasCurrent = versionFile.exists();

  // check which directories exist
  boolean hasPrevious = getPreviousDir().exists();
  boolean hasPreviousTmp = getPreviousTmp().exists();
  boolean hasRemovedTmp = getRemovedTmp().exists();
  boolean hasFinalizedTmp = getFinalizedTmp().exists();
  boolean hasCheckpointTmp = getLastCheckpointTmp().exists();

  if (!(hasPreviousTmp || hasRemovedTmp
      || hasFinalizedTmp || hasCheckpointTmp)) {
    // no temp dirs - no recovery
    if (hasCurrent) {
      return StorageState.NORMAL;
    }
    if (hasPrevious) {
      throw new InconsistentFSStateException(root,
                          "version file in current directory is missing.");
    }
    return StorageState.NOT_FORMATTED;
  }

  if ((hasPreviousTmp?1:0) + (hasRemovedTmp?1:0)
      + (hasFinalizedTmp?1:0) + (hasCheckpointTmp?1:0) > 1) {
    // more than one temp dirs
    throw new InconsistentFSStateException(root,
                                           "too many temporary directories.");
  }

  // # of temp dirs == 1 should either recover or complete a transition
  if (hasCheckpointTmp) {
    return hasCurrent ? StorageState.COMPLETE_CHECKPOINT
                      : StorageState.RECOVER_CHECKPOINT;
  }

  if (hasFinalizedTmp) {
    if (hasPrevious) {
      // Fixed: leading space added so the message matches the
      // STORAGE_TMP_PREVIOUS variant below.
      throw new InconsistentFSStateException(root,
                                             STORAGE_DIR_PREVIOUS + " and " + STORAGE_TMP_FINALIZED
                                             + " cannot exist together.");
    }
    return StorageState.COMPLETE_FINALIZE;
  }

  if (hasPreviousTmp) {
    if (hasPrevious) {
      throw new InconsistentFSStateException(root,
                                             STORAGE_DIR_PREVIOUS + " and " + STORAGE_TMP_PREVIOUS
                                             + " cannot exist together.");
    }
    if (hasCurrent) {
      return StorageState.COMPLETE_UPGRADE;
    }
    return StorageState.RECOVER_UPGRADE;
  }

  // Only the removed-tmp case is left at this point.
  assert hasRemovedTmp : "hasRemovedTmp must be true";
  if (!(hasCurrent ^ hasPrevious)) {
    throw new InconsistentFSStateException(root,
                                           "one and only one directory " + STORAGE_DIR_CURRENT
                                           + " or " + STORAGE_DIR_PREVIOUS
                                           + " must be present when " + STORAGE_TMP_REMOVED
                                           + " exists.");
  }
  if (hasCurrent) {
    return StorageState.COMPLETE_ROLLBACK;
  }
  return StorageState.RECOVER_ROLLBACK;
}

--3 sd.read(); // read and verify consistency with other directories

isFormatted = true; sd.read(); // read and verify consistency with other directories

--4 // check whether distributed upgrade is reguired and/or should be continued

verifyDistributedUpgradeProgress(startOpt);

--5 Format unformatted dirs.

this.checkpointTime = 0L;

for (Iterator<StorageDirectory> it =

dirIterator(); it.hasNext();) {

StorageDirectory sd = it.next();

StorageState curState = dataDirStates.get(sd);

switch(curState) {

case NON_EXISTENT:

assert false : StorageState.NON_EXISTENT + " state cannot be here";

case NOT_FORMATTED:

LOG.info("Storage directory " + sd.getRoot() + " is not formatted.");

LOG.info("Formatting ...");

sd.clearDirectory(); // create empty currrent dir

break;

default:

break;

}

}

--6 // 3. Do transitions

switch(startOpt) {

case UPGRADE:

doUpgrade();

return false; // upgrade saved image already

case IMPORT:

doImportCheckpoint();

return true;

case ROLLBACK:

doRollback();

break;

case REGULAR:

// just load the image

}

return loadFSImage();

}

loadFSImage();是一个重点,需要好好研究
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: