
HDFS 2.6.2 Source Code Study: Day 2 - DataNode Startup Flow Analysis

2017-11-04 20:03
DataNode Startup Flow Analysis

Reading the DataNode class
This class is large, so we will not walk through all of it; we first skim the fields and constructors at the top and then look at the main method.
This passage in the class comment is quite interesting:
* DataNodes spend their lives in an endless loop of asking
* the NameNode for something to do. A NameNode cannot connect
* to a DataNode directly; a NameNode simply returns values from
* functions invoked by a DataNode.
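
To make that pull model concrete, here is a rough conceptual sketch of the loop, using hypothetical types for illustration only (in the real source this logic lives in BPServiceActor and the DatanodeProtocol RPC interface):

interface NameNodeRpc {
    // The heartbeat reply is where the NameNode hands back work to do.
    Command[] sendHeartbeat(DatanodeReport report);
}

class DataNodeLoop implements Runnable {
    private final NameNodeRpc nameNode;
    private volatile boolean running = true;

    DataNodeLoop(NameNodeRpc nameNode) {
        this.nameNode = nameNode;
    }

    @Override
    public void run() {
        while (running) {
            // The DataNode always initiates: report status, receive commands.
            Command[] cmds = nameNode.sendHeartbeat(new DatanodeReport());
            for (Command cmd : cmds) {
                execute(cmd);  // e.g. replicate a block, delete a block, re-register
            }
            try {
                Thread.sleep(3000L);  // heartbeat interval, 3 seconds by default
            } catch (InterruptedException e) {
                running = false;
            }
        }
    }

    private void execute(Command cmd) {
        // Dispatch on the command type; omitted in this sketch.
    }
}

class DatanodeReport {}
class Command {}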

The main method
public static void main(String args[]) {
  // If the arguments request help, print the usage message and exit.
  if (DFSUtil.parseHelpArgument(args, DataNode.USAGE, System.out, true)) {
    System.exit(0);
  }

  secureMain(args, null);
}

secureMain then calls the createDataNode method:
public static DataNode createDataNode(String args[], Configuration conf,
    SecureResources resources) throws IOException {
  // Create the DataNode instance.
  DataNode dn = instantiateDataNode(args, conf, resources);
  if (dn != null) {
    // Start the DataNode's internal services.
    dn.runDatanodeDaemon();
  }
  return dn;
}
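
As a side note, this is also roughly how test utilities such as MiniDFSCluster embed DataNodes in-process. A minimal sketch of driving this entry point directly, assuming a NameNode is already running at the address below (the fs.defaultFS value and the data directory are made-up examples):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.datanode.DataNode;

public class EmbeddedDataNodeDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HdfsConfiguration();
    conf.set("fs.defaultFS", "hdfs://localhost:9000");       // NameNode to register with (example)
    conf.set("dfs.datanode.data.dir", "/tmp/demo-dn-data");  // local block storage (example)
    // Same call chain as secureMain(): instantiate the DataNode, then start its daemons.
    DataNode dn = DataNode.createDataNode(new String[0], conf, null);
    System.out.println("DataNode started: " + dn);
  }
}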

The instantiateDataNode method
public static DataNode instantiateDataNode(String args [], Configuration conf,
    SecureResources resources) throws IOException {
  if (conf == null)
    conf = new HdfsConfiguration();

  if (args != null) {
    // parse generic hadoop options
    GenericOptionsParser hParser = new GenericOptionsParser(conf, args);
    args = hParser.getRemainingArgs();
  }

  if (!parseArguments(args, conf)) {
    printUsage(System.err);
    return null;
  }
  // Get the directories where the DataNode actually stores its data.
  Collection<StorageLocation> dataLocations = getStorageLocations(conf);
  UserGroupInformation.setConfiguration(conf);
  SecurityUtil.login(conf, DFS_DATANODE_KEYTAB_FILE_KEY,
      DFS_DATANODE_KERBEROS_PRINCIPAL_KEY);
  // Create the instance.
  return makeInstance(dataLocations, conf, resources);
}
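
A quick sketch of what the GenericOptionsParser step does, assuming the standard Hadoop semantics for generic options: any -D key=value pairs are folded into the Configuration, and anything the parser does not recognize is left in the remaining args for parseArguments to handle:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.GenericOptionsParser;

public class GenericOptionsDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] cli = {
        "-D", "dfs.datanode.data.dir=/data/1/dfs,/data/2/dfs",  // generic option
        "-rollback"                                             // DataNode-specific option
    };
    GenericOptionsParser hParser = new GenericOptionsParser(conf, cli);
    // The -D pair is now in the Configuration...
    System.out.println(conf.get("dfs.datanode.data.dir"));
    // ...and only "-rollback" is left over for parseArguments(args, conf).
    for (String remaining : hParser.getRemainingArgs()) {
      System.out.println(remaining);
    }
  }
}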

The makeInstance method
static DataNode makeInstance(Collection<StorageLocation> dataDirs,
    Configuration conf, SecureResources resources) throws IOException {
  LocalFileSystem localFS = FileSystem.getLocal(conf);
  // Permission for the local data directories; the default is 700.
  FsPermission permission = new FsPermission(
      conf.get(DFS_DATANODE_DATA_DIR_PERMISSION_KEY,
               DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT));
  // Used to create the local directories and check their permissions.
  DataNodeDiskChecker dataNodeDiskChecker =
      new DataNodeDiskChecker(permission);
  // Returns the directories that passed the check (the directories may be
  // created during the check).
  List<StorageLocation> locations =
      checkStorageLocations(dataDirs, localFS, dataNodeDiskChecker);
  DefaultMetricsSystem.initialize("DataNode");

  assert locations.size() > 0 : "number of data directories should be > 0";
  // Create the DataNode object.
  return new DataNode(conf, locations, resources);
}
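
The DataNodeDiskChecker used here is a thin wrapper around DiskChecker. A minimal sketch of what the check amounts to, assuming the DiskChecker.checkDir(LocalFileSystem, Path, FsPermission) helper and a throw-away /tmp path: create the directory if it is missing, apply the expected permission (700 by default), and verify that it is a usable directory:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.DiskChecker;

public class StorageDirCheckDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    LocalFileSystem localFS = FileSystem.getLocal(conf);
    FsPermission expected =
        new FsPermission(conf.get("dfs.datanode.data.dir.perm", "700"));
    // Throws DiskChecker.DiskErrorException if the directory cannot be
    // created, is not a directory, or lacks read/write/execute access.
    DiskChecker.checkDir(localFS, new Path("/tmp/dn-data-demo"), expected);
    System.out.println("Directory passed the check");
  }
}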

The DataNode constructor
DataNode(final Configuration conf,
         final List<StorageLocation> dataDirs,
         final SecureResources resources) throws IOException {
  super(conf);
  this.lastDiskErrorCheck = 0;
  this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
      DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT);
  // Read the various configuration parameters.
  this.usersWithLocalPathAccess = Arrays.asList(
      conf.getTrimmedStrings(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY));
  this.connectToDnViaHostname = conf.getBoolean(
      DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
      DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
  this.getHdfsBlockLocationsEnabled = conf.getBoolean(
      DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
      DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
  this.supergroup = conf.get(DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
      DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
  // Whether permission checking is enabled.
  this.isPermissionEnabled = conf.getBoolean(
      DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY,
      DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT);

  confVersion = "core-" +
      conf.get("hadoop.common.configuration.version", "UNSPECIFIED") +
      ",hdfs-" +
      conf.get("hadoop.hdfs.configuration.version", "UNSPECIFIED");

  // Determine whether we should try to pass file descriptors to clients.
  if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
      DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT)) {
    String reason = DomainSocket.getLoadingFailureReason();
    if (reason != null) {
      LOG.warn("File descriptor passing is disabled because " + reason);
      this.fileDescriptorPassingDisabledReason = reason;
    } else {
      LOG.info("File descriptor passing is enabled.");
      this.fileDescriptorPassingDisabledReason = null;
    }
  } else {
    this.fileDescriptorPassingDisabledReason =
        "File descriptor passing was not configured.";
    LOG.debug(this.fileDescriptorPassingDisabledReason);
  }

  try {
    // Get the hostname, preferring the dfs.datanode.hostname setting.
    hostName = getHostName(conf);
    LOG.info("Configured hostname is " + hostName);
    // Start the DataNode.
    startDataNode(conf, dataDirs, resources);
  } catch (IOException ie) {
    shutdown();
    throw ie;
  }
}
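
As the comment above says, getHostName prefers the dfs.datanode.hostname setting. A simplified sketch of that lookup order (the real method falls back to org.apache.hadoop.net.DNS, driven by dfs.datanode.dns.interface and dfs.datanode.dns.nameserver, rather than the InetAddress fallback used here):

import java.net.InetAddress;
import org.apache.hadoop.conf.Configuration;

public class HostNameDemo {
  static String resolveHostName(Configuration conf) throws Exception {
    // An explicitly configured hostname wins.
    String name = conf.get("dfs.datanode.hostname");
    if (name == null || name.isEmpty()) {
      // Simplified fallback: ask the local resolver.
      name = InetAddress.getLocalHost().getCanonicalHostName();
    }
    return name;
  }

  public static void main(String[] args) throws Exception {
    System.out.println(resolveHostName(new Configuration()));
  }
}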

The startDataNode method
void startDataNode(Configuration conf,
                   List<StorageLocation> dataDirs,
                   SecureResources resources) throws IOException {

  // settings global for all BPs in the Data Node
  this.secureResources = resources;
  synchronized (this) {
    this.dataDirs = dataDirs;
  }
  this.conf = conf;
  this.dnConf = new DNConf(conf);
  checkSecureConfig(dnConf, conf, resources);
  // Not entirely sure what this is for; SpanReceiverHost appears to be part
  // of the HTrace tracing support.
  this.spanReceiverHost = SpanReceiverHost.getInstance(conf);

  if (dnConf.maxLockedMemory > 0) {
    if (!NativeIO.POSIX.getCacheManipulator().verifyCanMlock()) {
      throw new RuntimeException(String.format(
          "Cannot start datanode because the configured max locked memory" +
          " size (%s) is greater than zero and native code is not available.",
          DFS_DATANODE_MAX_LOCKED_MEMORY_KEY));
    }
    if (Path.WINDOWS) {
      NativeIO.Windows.extendWorkingSetSize(dnConf.maxLockedMemory);
    } else {
      long ulimit = NativeIO.POSIX.getCacheManipulator().getMemlockLimit();
      if (dnConf.maxLockedMemory > ulimit) {
        throw new RuntimeException(String.format(
            "Cannot start datanode because the configured max locked memory" +
            " size (%s) of %d bytes is more than the datanode's available" +
            " RLIMIT_MEMLOCK ulimit of %d bytes.",
            DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
            dnConf.maxLockedMemory,
            ulimit));
      }
    }
  }
  LOG.info("Starting DataNode with maxLockedMemory = " +
      dnConf.maxLockedMemory);
  // Handles on-disk storage; it defines the local file layout, e.g.
  // directories starting with "subdir" and block files starting with "blk_".
  storage = new DataStorage();

  // global DN settings
  registerMXBean();
  // Create the block transfer (DataXceiver) service.
  initDataXceiver(conf);
  // Start the web UI; the default port is 50075.
  startInfoServer(conf);
  // Start a monitoring thread that counts JVM pauses (caused by GC, etc.).
  // Internally, a thread repeatedly sleeps for a fixed interval; if a sleep
  // takes noticeably longer than scheduled, the JVM is assumed to have paused.
  pauseMonitor = new JvmPauseMonitor(conf);
  pauseMonitor.start();
  // Manages one BlockTokenSecretManager per block pool.
  // BlockPoolTokenSecretManager is required to create ipc server.
  this.blockPoolTokenSecretManager = new BlockPoolTokenSecretManager();

  // Login is done by now. Set the DN user name.
  dnUserName = UserGroupInformation.getCurrentUser().getShortUserName();
  LOG.info("dnUserName = " + dnUserName);
  LOG.info("supergroup = " + supergroup);
  // Initialize the IPC server (used for RPC communication).
  initIpcServer(conf);
  // Metrics.
  metrics = DataNodeMetrics.create(conf, getDisplayName());
  metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
  // Manages the BPOfferService instances.
  blockPoolManager = new BlockPoolManager(this);
  // This appears to be where the DataNode registers itself with the
  // NameNode(s) and starts polling them for commands to execute?
  blockPoolManager.refreshNamenodes(conf);

  // Create the ReadaheadPool from the DataNode context so we can
  // exit without having to explicitly shutdown its thread pool.
  readaheadPool = ReadaheadPool.getInstance();
  saslClient = new SaslDataTransferClient(dnConf.conf,
      dnConf.saslPropsResolver, dnConf.trustedChannelResolver);
  saslServer = new SaslDataTransferServer(dnConf,
      blockPoolTokenSecretManager);
}
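
The pause-detection trick behind JvmPauseMonitor is simple enough to sketch on its own. This is a hypothetical, stripped-down version, not the Hadoop class: sleep for a fixed interval, measure how long the sleep actually took, and treat a large overshoot as evidence that the JVM (or the whole machine) was stalled, for example by a GC pause:

public class SimplePauseMonitor implements Runnable {
  private static final long SLEEP_MS = 500;   // nominal sleep interval
  private static final long WARN_MS = 1000;   // overshoot considered a pause worth reporting

  @Override
  public void run() {
    while (!Thread.currentThread().isInterrupted()) {
      long start = System.nanoTime();
      try {
        Thread.sleep(SLEEP_MS);
      } catch (InterruptedException e) {
        return;
      }
      long elapsedMs = (System.nanoTime() - start) / 1_000_000;
      long overshootMs = elapsedMs - SLEEP_MS;
      if (overshootMs > WARN_MS) {
        System.err.println("Detected a pause of approximately " + overshootMs + " ms");
      }
    }
  }

  public static void main(String[] args) {
    // Runs forever on the main thread; the real JvmPauseMonitor runs its check
    // in a separate daemon thread.
    new SimplePauseMonitor().run();
  }
}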

After the DataNode object has been created, runDatanodeDaemon is called:
public void runDatanodeDaemon() throws IOException {
  // Start the blockPoolManager services, which begin polling the NameNode
  // for commands to execute?
  blockPoolManager.startAll();

  // Start the data transfer services.
  // start dataXceiveServer
  dataXceiverServer.start();
  if (localDataXceiverServer != null) {
    localDataXceiverServer.start();
  }
  // Start the IPC server.
  ipcServer.start();
  // Start the plugins.
  startPlugins(conf);
}
Tags: hdfs hadoop source code