
Source Code Analysis of the Storage Policy Across Different Storage Paths on a Single Hadoop DataNode

2015-07-13 13:35

Source: collected by IT165. Published: 2014-09-01 20:52:50
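The problem arose because the storage disks on the cluster's data nodes differ in size, so after the cluster has been in use for a while the smaller disks run short of space.

In fact, configuring the disk storage policy early on would have avoided the problem. Some posts online claim that this policy has no effect, but it does work in Hadoop 2.0.1, the version used in CDH 4.6.

To pin down the right place in the code, I consulted the following Hadoop design document.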

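Reference

Design document for Append/Hflush/Read in the Hadoop HDFS file system:

http://blog.csdn.net/chenpingbupt/article/details/7972589

The document states:

On each disk of a DN there are three directories: current, tmp and rbw. current holds finalized replicas, tmp holds temporary replicas, and rbw holds rbw, rwr and rur replicas. When a replica is first created at the request of a DFS client, it is placed in rbw. When it is first created during block replication or cluster balancing, it is placed in tmp. Once a replica is finalized, it is moved to current. When a DN restarts, the replicas in tmp are deleted, those in rbw are loaded in the rwr state, and those in current are loaded in the finalized state.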
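
To make the quoted rules easier to keep in mind, here is a minimal sketch in plain Java. The enum and method names are my own illustration and do not come from the Hadoop source; the sketch only encodes what the passage above says.

```java
/** Directories named in the quoted design document. */
enum ReplicaDir { CURRENT, TMP, RBW }

/** Who asked for the replica to be created (illustrative names). */
enum ReplicaOrigin { CLIENT_WRITE, REPLICATION_OR_BALANCING }

final class ReplicaLifecycle {
    /** Directory in which a newly created replica is placed. */
    static ReplicaDir initialDir(ReplicaOrigin origin) {
        return origin == ReplicaOrigin.CLIENT_WRITE ? ReplicaDir.RBW : ReplicaDir.TMP;
    }

    /** Fate of a replica found in the given directory when the DN restarts. */
    static String afterRestart(ReplicaDir dir) {
        switch (dir) {
            case TMP: return "deleted";
            case RBW: return "rwr";
            default:  return "finalized";   // CURRENT
        }
    }

    public static void main(String[] args) {
        System.out.println(initialDir(ReplicaOrigin.CLIENT_WRITE));
        System.out.println(afterRestart(ReplicaDir.RBW));
    }
}
```

A client write therefore starts life in rbw, which is why the search below begins with the code that creates files in tmp and rbw.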

So we start from the point where the tmp or rbw file is created.

1. See the Java class BlockPoolSlice



/**
 * A block pool slice represents a portion of a block pool stored on a volume.
 * Taken together, all BlockPoolSlices sharing a block pool ID across a
 * cluster represent a single block pool.
 *
 * This class is synchronized by {@link FsVolumeImpl}.
 */
class BlockPoolSlice {
  private final String bpid;
  private final FsVolumeImpl volume; // volume to which this BlockPool belongs to
  private final File currentDir; // StorageDirectory/current/bpid/current
  private final LDir finalizedDir; // directory store Finalized replica
  private final File rbwDir; // directory store RBW replica
  private final File tmpDir; // directory store Temporary replica
  // ... (excerpt; rest of class omitted)
}


The class description shows that BlockPoolSlice is the basis for creating the cluster's data blocks.




  /**
   * Temporary files. They get moved to the finalized block directory when
   * the block is finalized.
   */
  File createTmpFile(Block b) throws IOException {
    File f = new File(tmpDir, b.getBlockName());
    return DatanodeUtil.createTmpFile(b, f);
  }

  /**
   * RBW files. They get moved to the finalized block directory when
   * the block is finalized.
   */
  File createRbwFile(Block b) throws IOException {
    File f = new File(rbwDir, b.getBlockName());
    return DatanodeUtil.createTmpFile(b, f);
  }


These are the methods that create the underlying files.

2. Implementation of DatanodeUtil.createTmpFile



/** Provide utility methods for Datanode. */
@InterfaceAudience.Private
public class DatanodeUtil {
  public static final String UNLINK_BLOCK_SUFFIX = ".unlinked";

  public static final String DISK_ERROR = "Possible disk error: ";

  /** Get the cause of an I/O exception if caused by a possible disk error
   * @param ioe an I/O exception
   * @return cause if the I/O exception is caused by a possible disk error;
   *         null otherwise.
   */
  static IOException getCauseIfDiskError(IOException ioe) {
    if (ioe.getMessage() != null && ioe.getMessage().startsWith(DISK_ERROR)) {
      return (IOException) ioe.getCause();
    } else {
      return null;
    }
  }

  /**
   * Create a new file.
   * @throws IOException
   * if the file already exists or if the file cannot be created.
   */
  public static File createTmpFile(Block b, File f) throws IOException {
    if (f.exists()) {
      throw new IOException("Failed to create temporary file for " + b
          + ". File " + f + " should not be present, but is.");
    }
    // Create the zero-length temp file
    final boolean fileCreated;
    try {
      fileCreated = f.createNewFile();
    } catch (IOException ioe) {
      throw new IOException(DISK_ERROR + "Failed to create " + f, ioe);
    }
    if (!fileCreated) {
      throw new IOException("Failed to create temporary file for " + b
          + ". File " + f + " should be creatable, but is already present.");
    }
    return f;
  }
  // ... (excerpt; rest of class omitted)
}




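When this method is called to create a data block, there is no sign of the storage path selection policy we are looking for: the directory has already been decided by the caller.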
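
The behaviour of createTmpFile can be tried out with nothing but java.io.File. The following is a small sketch under that assumption; the class name, the temporary directory and the block file name blk_1 are all hypothetical and only serve the demonstration.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

final class TmpFileDemo {
    /** Mirrors the checks in the createTmpFile listing using only java.io.File. */
    static File createEmptyFile(File f) throws IOException {
        if (f.exists()) {
            throw new IOException("File " + f + " should not be present, but is.");
        }
        // createNewFile() atomically creates a zero-length file and returns
        // false if a file with that name already exists.
        if (!f.createNewFile()) {
            throw new IOException("File " + f + " is already present.");
        }
        return f;
    }

    public static void main(String[] args) throws IOException {
        File dir = Files.createTempDirectory("rbw-demo").toFile();
        File block = new File(dir, "blk_1");   // hypothetical block file name
        createEmptyFile(block);
        System.out.println(block.length());    // the new file is empty
        try {
            createEmptyFile(block);            // a second attempt must fail
        } catch (IOException expected) {
            System.out.println("second create rejected");
        }
    }
}
```

Note that the parent directory is simply passed in as part of the File argument, so the decision about which disk to use has to be made further up the call chain.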

3. Next, find where createRbwFile is called



/**************************************************
 * FSDataset manages a set of data blocks.  Each block
 * has a unique name and an extent on disk.
 *
 ***************************************************/
@InterfaceAudience.Private
class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
  static final Log LOG = LogFactory.getLog(FsDatasetImpl.class);
  // ... (excerpt; rest of class omitted)
}


This is the class that manages block operations.




  @Override // FsDatasetSpi
  public synchronized ReplicaInPipeline createRbw(ExtendedBlock b)
      throws IOException {
    ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
        b.getBlockId());
    if (replicaInfo != null) {
      throw new ReplicaAlreadyExistsException("Block " + b +
          " already exists in state " + replicaInfo.getState() +
          " and thus cannot be created.");
    }
    // create a new block
    FsVolumeImpl v = volumes.getNextVolume(b.getNumBytes());
    // create a rbw file to hold block in the designated volume
    File f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
    ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(),
        b.getGenerationStamp(), v, f.getParentFile());
    volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
    return newReplicaInfo;
  }


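This calls createRbwFile on the chosen volume, which in turn creates the rbw file.

Here we find what we care about: volumes, which holds the configured storage paths. The volume is picked by volumes.getNextVolume before the file is created.

4. Look at how volumes is initialized

volumes is initialized in the constructor, using volArray.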



  /**
   * An FSDataset has a directory where it loads its data files.
   */
  FsDatasetImpl(DataNode datanode, DataStorage storage, Configuration conf
      ) throws IOException {
    this.datanode = datanode;
    // The number of volumes required for operation is the total number
    // of volumes minus the number of failed volumes we can tolerate.
    final int volFailuresTolerated =
      conf.getInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY,
                  DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT);

    String[] dataDirs = conf.getTrimmedStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);

    int volsConfigured = (dataDirs == null) ? 0 : dataDirs.length;
    int volsFailed = volsConfigured - storage.getNumStorageDirs();
    this.validVolsRequired = volsConfigured - volFailuresTolerated;

    if (volFailuresTolerated < 0 || volFailuresTolerated >= volsConfigured) {
      throw new DiskErrorException("Invalid volume failure "
          + "config value: " + volFailuresTolerated);
    }
    if (volsFailed > volFailuresTolerated) {
      throw new DiskErrorException("Too many failed volumes - "
          + "current valid volumes: " + storage.getNumStorageDirs()
          + ", volumes configured: " + volsConfigured
          + ", volumes failed: " + volsFailed
          + ", volume failures tolerated: " + volFailuresTolerated);
    }

    final List<FsVolumeImpl> volArray = new ArrayList<FsVolumeImpl>(
        storage.getNumStorageDirs());
    for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
      final File dir = storage.getStorageDir(idx).getCurrentDir();
      volArray.add(new FsVolumeImpl(this, storage.getStorageID(), dir, conf));
      LOG.info("Added volume - " + dir);
    }
    volumeMap = new ReplicaMap(this);

    @SuppressWarnings("unchecked")
    final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl =
        ReflectionUtils.newInstance(conf.getClass(
            DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY,
            RoundRobinVolumeChoosingPolicy.class,
            VolumeChoosingPolicy.class), conf);
    volumes = new FsVolumeList(volArray, volsFailed, blockChooserImpl);
    volumes.getVolumeMap(volumeMap);

    File[] roots = new File[storage.getNumStorageDirs()];
    for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
      roots[idx] = storage.getStorageDir(idx).getCurrentDir();
    }
    asyncDiskService = new FsDatasetAsyncDiskService(datanode, roots);
    registerMBean(storage.getStorageID());
  }
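
As a side note, the volume-count validation at the top of this constructor can be isolated into a few lines. The sketch below restates it with plain integers; the class name, method name and exception types are my own choices for illustration (the original throws DiskErrorException).

```java
final class VolumeFailureCheck {
    /**
     * Returns the number of valid volumes required for operation, or throws
     * if the tolerance setting is invalid or too many volumes have failed.
     */
    static int validVolsRequired(int volsConfigured, int volsAvailable,
                                 int volFailuresTolerated) {
        int volsFailed = volsConfigured - volsAvailable;
        if (volFailuresTolerated < 0 || volFailuresTolerated >= volsConfigured) {
            throw new IllegalArgumentException(
                "Invalid volume failure config value: " + volFailuresTolerated);
        }
        if (volsFailed > volFailuresTolerated) {
            throw new IllegalStateException(
                "Too many failed volumes: " + volsFailed);
        }
        return volsConfigured - volFailuresTolerated;
    }

    public static void main(String[] args) {
        // 4 configured paths, 3 usable, 1 failure tolerated: the node can start.
        System.out.println(validVolsRequired(4, 3, 1));
    }
}
```

With four configured paths and a tolerance of one, the DataNode in this sketch needs three healthy volumes; losing two would abort startup.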


And volArray is built as follows:




    final List<FsVolumeImpl> volArray = new ArrayList<FsVolumeImpl>(
        storage.getNumStorageDirs());
    for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
      final File dir = storage.getStorageDir(idx).getCurrentDir();
      volArray.add(new FsVolumeImpl(this, storage.getStorageID(), dir, conf));
      LOG.info("Added volume - " + dir);
    }


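These are exactly the storage paths from the configuration file.

At this point we have found the storage paths we need. Finding out how one of them is chosen is now much easier.

5. Path selection starts at getNextVolume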



class FsVolumeList {
  /**
   * Read access to this unmodifiable list is not synchronized.
   * This list is replaced on modification holding "this" lock.
   */
  volatile List<FsVolumeImpl> volumes = null;

  private final VolumeChoosingPolicy<FsVolumeImpl> blockChooser;
  private volatile int numFailedVolumes;

  FsVolumeList(List<FsVolumeImpl> volumes, int failedVols,
      VolumeChoosingPolicy<FsVolumeImpl> blockChooser) {
    this.volumes = Collections.unmodifiableList(volumes);
    this.blockChooser = blockChooser;
    this.numFailedVolumes = failedVols;
  }

  int numberOfFailedVolumes() {
    return numFailedVolumes;
  }

  /**
   * Get next volume. Synchronized to ensure {@link #curVolume} is updated
   * by a single thread and next volume is chosen with no concurrent
   * update to {@link #volumes}.
   * @param blockSize free space needed on the volume
   * @return next volume to store the block in.
   */
  synchronized FsVolumeImpl getNextVolume(long blockSize) throws IOException {
    return blockChooser.chooseVolume(volumes, blockSize);
  }
  // ... (excerpt; rest of class omitted)
}
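
getNextVolume does nothing but delegate to the configured policy object. To show what such a policy can look like, here is a simplified round-robin chooser in plain Java. It is a sketch only: Vol and RoundRobinChooser are hypothetical names, the free-space figures are made up, and the code is not the Hadoop RoundRobinVolumeChoosingPolicy itself.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a volume: a name plus its free bytes. */
final class Vol {
    final String name;
    final long available;
    Vol(String name, long available) { this.name = name; this.available = available; }
}

/** Simplified round-robin chooser that skips volumes without enough room. */
final class RoundRobinChooser {
    private int cur = 0;   // index of the next volume to try

    synchronized Vol choose(List<Vol> volumes, long blockSize) throws IOException {
        if (volumes.isEmpty()) {
            throw new IOException("No more available volumes");
        }
        if (cur >= volumes.size()) {
            cur = 0;
        }
        int start = cur;
        while (true) {
            Vol v = volumes.get(cur);
            cur = (cur + 1) % volumes.size();
            if (v.available > blockSize) {
                return v;
            }
            if (cur == start) {   // tried every volume once
                throw new IOException("No volume can hold " + blockSize + " bytes");
            }
        }
    }

    public static void main(String[] args) throws IOException {
        List<Vol> vols = Arrays.asList(new Vol("a", 100), new Vol("b", 5), new Vol("c", 100));
        RoundRobinChooser chooser = new RoundRobinChooser();
        for (int i = 0; i < 3; i++) {
            System.out.println(chooser.choose(vols, 10).name);
        }
    }
}
```

A chooser like this treats every volume with enough room equally, regardless of how much free space it has left, which is how small disks end up filling first.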




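6. Continuing: chooseVolume is invoked on blockChooser, whose type is VolumeChoosingPolicy. The constructor in step 4 falls back to RoundRobinVolumeChoosingPolicy when no policy is configured; the implementation that takes free space into account is the following class: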



/**
 * A DN volume choosing policy which takes into account the amount of free
 * space on each of the available volumes when considering where to assign a
 * new replica allocation. By default this policy prefers assigning replicas to
 * those volumes with more available free space, so as to over time balance the
 * available space of all the volumes within a DN.
 */
public class AvailableSpaceVolumeChoosingPolicy<V extends FsVolumeSpi>
    implements VolumeChoosingPolicy<V>, Configurable {

  private static final Log LOG = LogFactory.getLog(AvailableSpaceVolumeChoosingPolicy.class);

  private static final Random RAND = new Random();

  private long balancedSpaceThreshold = DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_DEFAULT;
  private float balancedPreferencePercent = DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_DEFAULT;
  // ... (excerpt; rest of class omitted)
}


The description makes it clear: this is the policy class.

7. The policy is implemented as follows:



  @Override
  public synchronized V chooseVolume(List<V> volumes,
      final long replicaSize) throws IOException {
    if (volumes.size() < 1) {
      throw new DiskOutOfSpaceException("No more available volumes");
    }

    AvailableSpaceVolumeList volumesWithSpaces =
        new AvailableSpaceVolumeList(volumes);

    if (volumesWithSpaces.areAllVolumesWithinFreeSpaceThreshold()) {
      // If they're actually not too far out of whack, fall back on pure round
      // robin.
      V volume = roundRobinPolicyBalanced.chooseVolume(volumes, replicaSize);
      if (LOG.isDebugEnabled()) {
        LOG.debug("All volumes are within the configured free space balance " +
            "threshold. Selecting " + volume + " for write of block size " +
            replicaSize);
      }
      return volume;
    } else {
      V volume = null;
      // If none of the volumes with low free space have enough space for the
      // replica, always try to choose a volume with a lot of free space.
      long mostAvailableAmongLowVolumes = volumesWithSpaces
          .getMostAvailableSpaceAmongVolumesWithLowAvailableSpace();

      List<V> highAvailableVolumes = extractVolumesFromPairs(
          volumesWithSpaces.getVolumesWithHighAvailableSpace());
      List<V> lowAvailableVolumes = extractVolumesFromPairs(
          volumesWithSpaces.getVolumesWithLowAvailableSpace());

      float preferencePercentScaler =
          (highAvailableVolumes.size() * balancedPreferencePercent) +
          (lowAvailableVolumes.size() * (1 - balancedPreferencePercent));
      float scaledPreferencePercent =
          (highAvailableVolumes.size() * balancedPreferencePercent) /
          preferencePercentScaler;
      if (mostAvailableAmongLowVolumes < replicaSize ||
          RAND.nextFloat() < scaledPreferencePercent) {
        volume = roundRobinPolicyHighAvailable.chooseVolume(
            highAvailableVolumes,
            replicaSize);
        if (LOG.isDebugEnabled()) {
          LOG.debug("Volumes are imbalanced. Selecting " + volume +
              " from high available space volumes for write of block size "
              + replicaSize);
        }
      } else {
        volume = roundRobinPolicyLowAvailable.chooseVolume(
            lowAvailableVolumes,
            replicaSize);
        if (LOG.isDebugEnabled()) {
          LOG.debug("Volumes are imbalanced. Selecting " + volume +
              " from low available space volumes for write of block size "
              + replicaSize);
        }
      }
      return volume;
    }
  }
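
The arithmetic behind scaledPreferencePercent is easy to check by hand. The sketch below copies just that formula into a standalone method; the class and method names are mine, and the volume counts in main are invented examples.

```java
final class PreferenceMath {
    /**
     * Probability that the next replica goes to the group of volumes with
     * high available space, given the group sizes and the preference fraction.
     */
    static float scaledPreference(int highCount, int lowCount, float preference) {
        float scaler = (highCount * preference) + (lowCount * (1 - preference));
        return (highCount * preference) / scaler;
    }

    public static void main(String[] args) {
        // Two roomy disks and two nearly full ones, preference 0.75.
        System.out.println(scaledPreference(2, 2, 0.75f)); // prints 0.75
        // One roomy disk and three nearly full ones, preference 0.75.
        System.out.println(scaledPreference(1, 3, 0.75f)); // prints 0.5
    }
}
```

Dividing the group probability by the group size shows the effect of the scaling: as long as the low-space volumes can still hold the replica, each high-space volume is picked p / (1 - p) times as often as each low-space volume, which is 3 times as often for p = 0.75, whatever the group sizes are.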


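Everything about how the configured storage paths are chosen, and the policy behind the choice, is right here. Sigh, I'm exhausted~~

This took close to three days. Reading pure code is really tiring; it would have been much nicer to step through it in a debugger.

Related configuration notes.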

dfs.datanode.fsdataset.volume.choosing.policy

dfs.datanode.available-space-volume-choosing-policy.balanced-space-threshold

10737418240

Only used when the dfs.datanode.fsdataset.volume.choosing.policy is set to org.apache.hadoop.hdfs.server.datanode.fsdataset.AvailableSpaceVolumeChoosingPolicy. This setting controls how much DN volumes are allowed to differ in terms of bytes of free disk space before they are considered imbalanced. If the free space of all the volumes are within this range of each other, the volumes will be considered balanced and block assignments will be done on a pure round robin basis.

dfs.datanode.available-space-volume-choosing-policy.balanced-space-preference-fraction

0.75f

Only used when the dfs.datanode.fsdataset.volume.choosing.policy is set to org.apache.hadoop.hdfs.server.datanode.fsdataset.AvailableSpaceVolumeChoosingPolicy. This setting controls what percentage of new block allocations will be sent to volumes with more available disk space than others. This setting should be in the range 0.0 - 1.0, though in practice 0.5 - 1.0, since there should be no reason to prefer that volumes with less available disk space receive more block allocations.
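
The balanced-space-threshold description boils down to comparing the most and least free volumes. Below is a rough sketch of such a check, assuming a strict less-than comparison (the description above does not say whether the boundary is inclusive); the class name and the free-space figures are hypothetical.

```java
final class BalanceCheck {
    /** The threshold value listed above: 10737418240 bytes, i.e. 10 GiB. */
    static final long THRESHOLD = 10737418240L;

    /** True if the gap between the most and least free volume is under the threshold. */
    static boolean allWithinThreshold(long[] freeBytes, long threshold) {
        long least = Long.MAX_VALUE;
        long most = 0;
        for (long free : freeBytes) {
            least = Math.min(least, free);
            most = Math.max(most, free);
        }
        return (most - least) < threshold;   // assumption: strict comparison
    }

    public static void main(String[] args) {
        long gib = 1L << 30;
        // Two disks 5 GiB apart: treated as balanced, pure round robin.
        System.out.println(allWithinThreshold(new long[] {500 * gib, 495 * gib}, THRESHOLD));
        // A large and a small disk 400 GiB apart: treated as imbalanced.
        System.out.println(allWithinThreshold(new long[] {500 * gib, 100 * gib}, THRESHOLD));
    }
}
```

For a node with disks of different sizes, as in the problem that started this investigation, the second case is the one that makes the policy steer more new blocks to the roomier disks.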

Also attached are pointers to analyses of some other classes:

Important classes related to the DataNode

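FSDataset: all operations on data blocks live in the FSDataset-related classes. For a detailed analysis see http://caibinbupt.iteye.com/blog/284365
DataXceiverServer: the server that handles streaming reads and writes of data blocks; the processing logic is carried out by DataXceiver. For a detailed analysis see http://caibinbupt.iteye.com/blog/284979
DataXceiver: the thread that handles streaming reads and writes of data blocks. For a detailed analysis see http://caibinbupt.iteye.com/blog/284979
It also handles the less common flows other than reads and writes. For a detailed analysis see http://caibinbupt.iteye.com/blog/286533
BlockReceiver: performs streaming writes of data blocks. For a detailed analysis see http://caibinbupt.iteye.com/blog/286259
BlockSender: performs streaming reads of data blocks.

DataBlockScanner: periodically verifies data block files. For a detailed analysis see http://caibinbupt.iteye.com/blog/286650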