Source-code analysis of how a single Hadoop DataNode chooses among its configured storage paths
2015-07-13 13:35
Source: IT165; originally published 2014-09-01 20:52:50
The problem: the DataNodes in the cluster have storage disks of different sizes, so after some time in service the smaller disks run short of space.
In fact, configuring a disk storage policy up front solves this. Some posts online claim the policy has no effect; it does work as of Hadoop 2.0.1, the version used in CDH 4.6.
To pinpoint the exact code, I consulted the following Hadoop design document.
Reference
Design document for Append/Hflush/Read in HDFS:
http://blog.csdn.net/chenpingbupt/article/details/7972589 The document states:
On each disk of a DN there are three directories: current, tmp and rbw. current holds finalized replicas, tmp holds temporary replicas, and rbw holds rbw, rwr and rur replicas. When a replica is first created at the request of a DFS client, it is placed in rbw. When it is first created during block replication or cluster balancing, it is placed in tmp. Once a replica is finalized, it is moved to current. When a DN restarts, the replicas in tmp are deleted, those in rbw are loaded in the rwr state, and those in current are loaded as finalized.
So we start from the creation of the tmp and rbw files.
1. See the Java class BlockPoolSlice

    /**
     * A block pool slice represents a portion of a block pool stored on a volume.
     * Taken together, all BlockPoolSlices sharing a block pool ID across a
     * cluster represent a single block pool.
     *
     * This class is synchronized by {@link FsVolumeImpl}.
     */
    class BlockPoolSlice {
      private final String bpid;
      private final FsVolumeImpl volume; // volume to which this BlockPool belongs to
      private final File currentDir;     // StorageDirectory/current/bpid/current
      private final LDir finalizedDir;   // directory store Finalized replica
      private final File rbwDir;         // directory store RBW replica
      private final File tmpDir;         // directory store Temporary replica

As the class description says, BlockPoolSlice is the basis on which the cluster's data blocks are created.
    /**
     * Temporary files. They get moved to the finalized block directory when
     * the block is finalized.
     */
    File createTmpFile(Block b) throws IOException {
      File f = new File(tmpDir, b.getBlockName());
      return DatanodeUtil.createTmpFile(b, f);
    }

    /**
     * RBW files. They get moved to the finalized block directory when
     * the block is finalized.
     */
    File createRbwFile(Block b) throws IOException {
      File f = new File(rbwDir, b.getBlockName());
      return DatanodeUtil.createTmpFile(b, f);
    }

These are the methods that create the underlying files.
2. The implementation of that method (DatanodeUtil)
    /** Provide utility methods for Datanode. */
    @InterfaceAudience.Private
    public class DatanodeUtil {
      public static final String UNLINK_BLOCK_SUFFIX = ".unlinked";

      public static final String DISK_ERROR = "Possible disk error: ";

      /** Get the cause of an I/O exception if caused by a possible disk error
       * @param ioe an I/O exception
       * @return cause if the I/O exception is caused by a possible disk error;
       *         null otherwise.
       */
      static IOException getCauseIfDiskError(IOException ioe) {
        if (ioe.getMessage() != null && ioe.getMessage().startsWith(DISK_ERROR)) {
          return (IOException) ioe.getCause();
        } else {
          return null;
        }
      }

      /**
       * Create a new file.
       * @throws IOException
       *         if the file already exists or if the file cannot be created.
       */
      public static File createTmpFile(Block b, File f) throws IOException {
        if (f.exists()) {
          throw new IOException("Failed to create temporary file for " + b
              + ". File " + f + " should not be present, but is.");
        }
        // Create the zero-length temp file
        final boolean fileCreated;
        try {
          fileCreated = f.createNewFile();
        } catch (IOException ioe) {
          throw new IOException(DISK_ERROR + "Failed to create " + f, ioe);
        }
        if (!fileCreated) {
          throw new IOException("Failed to create temporary file for " + b
              + ". File " + f + " should be creatable, but is already present.");
        }
        return f;
      }

This method creates the block file itself, but the storage-path choosing policy we are looking for does not happen here.
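The pattern in createTmpFile above — fail fast if the file exists, then trust the boolean returned by the atomic create call — can be sketched standalone. This is my own simplified model using plain `java.io.File`, not the Hadoop API; the class name `ExclusiveCreate` is illustrative:

```java
import java.io.File;
import java.io.IOException;

public class ExclusiveCreate {
    /**
     * Create a zero-length file, failing if it already exists. Mirrors the
     * double check in DatanodeUtil.createTmpFile: an explicit exists() probe
     * first, then the boolean from createNewFile(), which is false when
     * another thread won the creation race between the two calls.
     */
    public static File createExclusive(File f) throws IOException {
        if (f.exists()) {
            throw new IOException("File " + f + " should not be present, but is.");
        }
        final boolean created = f.createNewFile(); // atomic create-if-absent
        if (!created) {
            throw new IOException("File " + f + " should be creatable, but is already present.");
        }
        return f;
    }

    public static void main(String[] args) throws IOException {
        File target = new File(System.getProperty("java.io.tmpdir"),
                "demo_block_" + System.nanoTime());
        createExclusive(target);         // first create succeeds
        boolean secondFailed = false;
        try {
            createExclusive(target);     // second create must fail
        } catch (IOException expected) {
            secondFailed = true;
        }
        System.out.println("exists=" + target.exists() + " secondFailed=" + secondFailed);
        target.delete();
    }
}
```

The exists() check alone would be racy; createNewFile() returning false is what actually guarantees exclusivity.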
3. Next, find where createRbwFile is called from
    /**************************************************
     * FSDataset manages a set of data blocks. Each block
     * has a unique name and an extent on disk.
     *
     ***************************************************/
    @InterfaceAudience.Private
    class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
      static final Log LOG = LogFactory.getLog(FsDatasetImpl.class);

This is the class that manages block operations.
    @Override // FsDatasetSpi
    public synchronized ReplicaInPipeline createRbw(ExtendedBlock b)
        throws IOException {
      ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
          b.getBlockId());
      if (replicaInfo != null) {
        throw new ReplicaAlreadyExistsException("Block " + b +
            " already exists in state " + replicaInfo.getState() +
            " and thus cannot be created.");
      }
      // create a new block
      FsVolumeImpl v = volumes.getNextVolume(b.getNumBytes());
      // create a rbw file to hold block in the designated volume
      File f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
      ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(),
          b.getGenerationStamp(), v, f.getParentFile());
      volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
      return newReplicaInfo;
    }

This calls createRbwFile, which again creates the rbw file.
Here we find volumes, the object we care about: it wraps the configured storage paths.
4. How volumes is initialized
volumes is initialized in the constructor, using volArray:
    /**
     * An FSDataset has a directory where it loads its data files.
     */
    FsDatasetImpl(DataNode datanode, DataStorage storage, Configuration conf
        ) throws IOException {
      this.datanode = datanode;
      // The number of volumes required for operation is the total number
      // of volumes minus the number of failed volumes we can tolerate.
      final int volFailuresTolerated =
          conf.getInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY,
              DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT);

      String[] dataDirs = conf.getTrimmedStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);

      int volsConfigured = (dataDirs == null) ? 0 : dataDirs.length;
      int volsFailed = volsConfigured - storage.getNumStorageDirs();
      this.validVolsRequired = volsConfigured - volFailuresTolerated;

      if (volFailuresTolerated < 0 || volFailuresTolerated >= volsConfigured) {
        throw new DiskErrorException("Invalid volume failure "
            + " config value: " + volFailuresTolerated);
      }
      if (volsFailed > volFailuresTolerated) {
        throw new DiskErrorException("Too many failed volumes - "
            + "current valid volumes: " + storage.getNumStorageDirs()
            + ", volumes configured: " + volsConfigured
            + ", volumes failed: " + volsFailed
            + ", volume failures tolerated: " + volFailuresTolerated);
      }

      final List<FsVolumeImpl> volArray = new ArrayList<FsVolumeImpl>(
          storage.getNumStorageDirs());
      for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
        final File dir = storage.getStorageDir(idx).getCurrentDir();
        volArray.add(new FsVolumeImpl(this, storage.getStorageID(), dir, conf));
        LOG.info("Added volume - " + dir);
      }
      volumeMap = new ReplicaMap(this);

      @SuppressWarnings("unchecked")
      final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl =
          ReflectionUtils.newInstance(conf.getClass(
              DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY,
              RoundRobinVolumeChoosingPolicy.class,
              VolumeChoosingPolicy.class), conf);
      volumes = new FsVolumeList(volArray, volsFailed, blockChooserImpl);
      volumes.getVolumeMap(volumeMap);

      File[] roots = new File[storage.getNumStorageDirs()];
      for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
        roots[idx] = storage.getStorageDir(idx).getCurrentDir();
      }
      asyncDiskService = new FsDatasetAsyncDiskService(datanode, roots);
      registerMBean(storage.getStorageID());
    }
volArray itself is built like this:

    final List<FsVolumeImpl> volArray = new ArrayList<FsVolumeImpl>(
        storage.getNumStorageDirs());
    for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
      final File dir = storage.getStorageDir(idx).getCurrentDir();
      volArray.add(new FsVolumeImpl(this, storage.getStorageID(), dir, conf));
      LOG.info("Added volume - " + dir);
    }

These are exactly the storage paths from the configuration file.
Now that we have located the storage paths, tracing how one of them is chosen becomes much easier.
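The dfs.datanode.data.dir value is read via conf.getTrimmedStrings, i.e. a comma-separated list with surrounding whitespace tolerated. A rough standalone model of that parsing (my own simplification, not Hadoop's Configuration class; the name `DataDirParsing` is illustrative):

```java
import java.util.ArrayList;
import java.util.List;

public class DataDirParsing {
    /** Mimic Configuration.getTrimmedStrings: split on commas, trim, drop empties. */
    public static String[] trimmedStrings(String raw) {
        if (raw == null || raw.trim().isEmpty()) {
            return new String[0];
        }
        List<String> out = new ArrayList<String>();
        for (String s : raw.split(",")) {
            String t = s.trim();
            if (!t.isEmpty()) {
                out.add(t); // one storage path per entry
            }
        }
        return out.toArray(new String[0]);
    }

    public static void main(String[] args) {
        String[] dirs = trimmedStrings(" /data1/hdfs , /data2/hdfs,/data3/hdfs ");
        System.out.println(dirs.length); // 3
        System.out.println(dirs[1]);     // /data2/hdfs
    }
}
```

Each resulting entry becomes one FsVolumeImpl in volArray, which is why disks of different sizes end up as peer volumes in the same list.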
5. Path selection starts from getNextVolume
    class FsVolumeList {
      /**
       * Read access to this unmodifiable list is not synchronized.
       * This list is replaced on modification holding "this" lock.
       */
      volatile List<FsVolumeImpl> volumes = null;

      private final VolumeChoosingPolicy<FsVolumeImpl> blockChooser;
      private volatile int numFailedVolumes;

      FsVolumeList(List<FsVolumeImpl> volumes, int failedVols,
          VolumeChoosingPolicy<FsVolumeImpl> blockChooser) {
        this.volumes = Collections.unmodifiableList(volumes);
        this.blockChooser = blockChooser;
        this.numFailedVolumes = failedVols;
      }

      int numberOfFailedVolumes() {
        return numFailedVolumes;
      }

      /**
       * Get next volume. Synchronized to ensure {@link #curVolume} is updated
       * by a single thread and next volume is chosen with no concurrent
       * update to {@link #volumes}.
       * @param blockSize free space needed on the volume
       * @return next volume to store the block in.
       */
      synchronized FsVolumeImpl getNextVolume(long blockSize) throws IOException {
        return blockChooser.chooseVolume(volumes, blockSize);
      }
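getNextVolume simply delegates to whatever policy was configured. The default, RoundRobinVolumeChoosingPolicy, cycles through the volumes and skips any that cannot fit the block. Its behaviour can be modeled with this simplified standalone sketch (the `Volume` type and its fields are my own stand-ins, not Hadoop classes):

```java
import java.util.Arrays;
import java.util.List;

public class RoundRobinSketch {
    /** Minimal stand-in for FsVolumeSpi: just a name and free bytes. */
    public static class Volume {
        public final String name;
        public final long available;
        public Volume(String name, long available) {
            this.name = name;
            this.available = available;
        }
    }

    private int curVolume = 0; // index of the next volume to try

    /**
     * Walk the list starting at curVolume; return the first volume with
     * enough free space for the block, or throw after a full fruitless cycle.
     */
    public synchronized Volume chooseVolume(List<Volume> volumes, long blockSize) {
        if (curVolume >= volumes.size()) {
            curVolume = 0; // list may have shrunk since the last call
        }
        int startVolume = curVolume;
        while (true) {
            Volume v = volumes.get(curVolume);
            curVolume = (curVolume + 1) % volumes.size();
            if (v.available >= blockSize) {
                return v;
            }
            if (curVolume == startVolume) {
                throw new IllegalStateException(
                    "Out of space: no volume can fit " + blockSize + " bytes");
            }
        }
    }

    public static void main(String[] args) {
        RoundRobinSketch policy = new RoundRobinSketch();
        List<Volume> vols = Arrays.asList(
            new Volume("/data1", 100L), new Volume("/data2", 10L), new Volume("/data3", 100L));
        // /data2 is skipped for a 50-byte block; the other two alternate.
        System.out.println(policy.chooseVolume(vols, 50).name); // /data1
        System.out.println(policy.chooseVolume(vols, 50).name); // /data3
        System.out.println(policy.chooseVolume(vols, 50).name); // /data1
    }
}
```

This is exactly why round robin eventually starves small disks: each volume gets the same share of new blocks regardless of how much space it has left, which is the problem the available-space policy below fixes.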
6. chooseVolume belongs to blockChooser, whose type is VolumeChoosingPolicy. The method is implemented in the following class:
    /**
     * A DN volume choosing policy which takes into account the amount of free
     * space on each of the available volumes when considering where to assign a
     * new replica allocation. By default this policy prefers assigning replicas to
     * those volumes with more available free space, so as to over time balance the
     * available space of all the volumes within a DN.
     */
    public class AvailableSpaceVolumeChoosingPolicy<V extends FsVolumeSpi>
        implements VolumeChoosingPolicy<V>, Configurable {

      private static final Log LOG = LogFactory.getLog(AvailableSpaceVolumeChoosingPolicy.class);

      private static final Random RAND = new Random();

      private long balancedSpaceThreshold = DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_DEFAULT;
      private float balancedPreferencePercent = DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_DEFAULT;

As the class description says, this is the policy class we were looking for.
7. The policy itself is implemented as follows:
    @Override
    public synchronized V chooseVolume(List<V> volumes,
        final long replicaSize) throws IOException {
      if (volumes.size() < 1) {
        throw new DiskOutOfSpaceException("No more available volumes");
      }

      AvailableSpaceVolumeList volumesWithSpaces =
          new AvailableSpaceVolumeList(volumes);

      if (volumesWithSpaces.areAllVolumesWithinFreeSpaceThreshold()) {
        // If they're actually not too far out of whack, fall back on pure round
        // robin.
        V volume = roundRobinPolicyBalanced.chooseVolume(volumes, replicaSize);
        if (LOG.isDebugEnabled()) {
          LOG.debug("All volumes are within the configured free space balance " +
              "threshold. Selecting " + volume + " for write of block size " +
              replicaSize);
        }
        return volume;
      } else {
        V volume = null;
        // If none of the volumes with low free space have enough space for the
        // replica, always try to choose a volume with a lot of free space.
        long mostAvailableAmongLowVolumes = volumesWithSpaces
            .getMostAvailableSpaceAmongVolumesWithLowAvailableSpace();

        List<V> highAvailableVolumes = extractVolumesFromPairs(
            volumesWithSpaces.getVolumesWithHighAvailableSpace());
        List<V> lowAvailableVolumes = extractVolumesFromPairs(
            volumesWithSpaces.getVolumesWithLowAvailableSpace());

        float preferencePercentScaler =
            (highAvailableVolumes.size() * balancedPreferencePercent) +
            (lowAvailableVolumes.size() * (1 - balancedPreferencePercent));
        float scaledPreferencePercent =
            (highAvailableVolumes.size() * balancedPreferencePercent) /
            preferencePercentScaler;
        if (mostAvailableAmongLowVolumes < replicaSize ||
            RAND.nextFloat() < scaledPreferencePercent) {
          volume = roundRobinPolicyHighAvailable.chooseVolume(
              highAvailableVolumes,
              replicaSize);
          if (LOG.isDebugEnabled()) {
            LOG.debug("Volumes are imbalanced. Selecting " + volume +
                " from high available space volumes for write of block size "
                + replicaSize);
          }
        } else {
          volume = roundRobinPolicyLowAvailable.chooseVolume(
              lowAvailableVolumes,
              replicaSize);
          if (LOG.isDebugEnabled()) {
            LOG.debug("Volumes are imbalanced. Selecting " + volume +
                " from low available space volumes for write of block size "
                + replicaSize);
          }
        }
        return volume;
      }
    }

Everything about how a storage path is selected from the configured paths, and the policy behind that selection, is right here. Whew, exhausting!
It took close to three days; reading raw source code is genuinely tiring — being able to single-step through it would have helped.
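The key arithmetic in the imbalanced branch is scaledPreferencePercent: it rescales balancedPreferencePercent by the size of each group, so the configured fraction applies per allocation rather than per volume. A small standalone model of just that computation (illustrative, not Hadoop code; the class name is my own):

```java
public class PreferenceScaling {
    /**
     * Probability that a new replica goes to the high-free-space group,
     * given the group sizes and the configured preference fraction
     * (dfs.datanode.available-space-volume-choosing-policy.balanced-space-preference-fraction).
     * Mirrors the preferencePercentScaler / scaledPreferencePercent math
     * in AvailableSpaceVolumeChoosingPolicy.chooseVolume.
     */
    public static float scaledPreference(int highVols, int lowVols, float preference) {
        float scaler = highVols * preference + lowVols * (1 - preference);
        return (highVols * preference) / scaler;
    }

    public static void main(String[] args) {
        // With the default 0.75 preference and one volume in each group,
        // 75% of allocations go to the roomier volume.
        System.out.println(scaledPreference(1, 1, 0.75f)); // 0.75
        // With three low volumes and one high volume, the single high
        // volume still absorbs half of all new blocks.
        System.out.println(scaledPreference(1, 3, 0.75f)); // 0.5
    }
}
```

This is why the policy only slowly rebalances a freshly added empty disk: the new disk's share of writes grows with the preference fraction, but existing disks keep receiving data too, instead of all traffic being redirected at once.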
Related configuration:

dfs.datanode.fsdataset.volume.choosing.policy
The volume choosing policy class; set it to org.apache.hadoop.hdfs.server.datanode.fsdataset.AvailableSpaceVolumeChoosingPolicy to enable the behaviour analyzed above.

dfs.datanode.available-space-volume-choosing-policy.balanced-space-threshold
Default: 10737418240 (10 GB)
Only used when dfs.datanode.fsdataset.volume.choosing.policy is set to org.apache.hadoop.hdfs.server.datanode.fsdataset.AvailableSpaceVolumeChoosingPolicy. This setting controls how much DN volumes are allowed to differ in terms of bytes of free disk space before they are considered imbalanced. If the free space of all the volumes is within this range of each other, the volumes will be considered balanced and block assignments will be done on a pure round robin basis.

dfs.datanode.available-space-volume-choosing-policy.balanced-space-preference-fraction
Default: 0.75f
Only used when dfs.datanode.fsdataset.volume.choosing.policy is set to org.apache.hadoop.hdfs.server.datanode.fsdataset.AvailableSpaceVolumeChoosingPolicy. This setting controls what percentage of new block allocations will be sent to volumes with more available disk space than others. This setting should be in the range 0.0 - 1.0, though in practice 0.5 - 1.0, since there should be no reason to prefer that volumes with less available disk space receive more block allocations.
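Putting the three properties together, enabling the policy in a DataNode's hdfs-site.xml would look roughly like this (a sketch; the values shown are the defaults quoted above, so tune them to your disks):

```xml
<property>
  <name>dfs.datanode.fsdataset.volume.choosing.policy</name>
  <value>org.apache.hadoop.hdfs.server.datanode.fsdataset.AvailableSpaceVolumeChoosingPolicy</value>
</property>
<property>
  <name>dfs.datanode.available-space-volume-choosing-policy.balanced-space-threshold</name>
  <!-- 10737418240 bytes = 10 GB of allowed free-space spread before volumes count as imbalanced -->
  <value>10737418240</value>
</property>
<property>
  <name>dfs.datanode.available-space-volume-choosing-policy.balanced-space-preference-fraction</name>
  <!-- fraction of new blocks steered toward the roomier volumes -->
  <value>0.75f</value>
</property>
```

The DataNode reads these at startup (see the FsDatasetImpl constructor above), so a restart is needed for the policy change to take effect.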
Appended: notes on some other relevant classes.
Important DataNode-related classes:
FSDataset: all operations related to data blocks live in the FSDataset family of classes.
DataXceiverServer: the server that handles streaming block reads and writes; the actual handling is done by DataXceiver.
DataXceiver: the thread that handles a streaming block read or write, as well as some non-read/write requests.
BlockReceiver: performs the streaming write of a block.
BlockSender: performs the streaming read of a block.
DataBlockScanner: periodically verifies the data-block files. Detailed analysis: http://caibinbupt.iteye.com/blog/286650