您的位置:首页 > 大数据 > Hadoop

HBase 数据文件在HDFS上的存储

2010-07-16 17:18 295 查看
英文原文:http://www.larsgeorge.com/2010/05/hbase-file-locality-in-hdfs.html

在HDFS上面最不明确的事情之一就是数据的冗余。它完全是自动进行的,因为无法得知其中详细的信息,我们需要做的就是相信它。HBase完全相信HDFS存储数据的安全性和完整性,并将数据文件交给HDFS存储。正是因为HDFS的数据冗余方式对于HBase来说是完全透明的,产生了一个问题:HBase的效率会受到多大的影响?说的简单一点,当HBase需要存取数据时,如何保证有一份冗余的数据块离自己最近?当我们对HBase做一次MapReduce的扫描操作时,这个问题尤其显现出来。所有的RegionServer都在从HDFS上面读取数据,理想的状况当然是每个RegionServer要读取的数据都离自己很近。这个问题就牵扯到HBase的数据文件是如何在HDFS上面存储的。
让我们首先抛开HBase,假设要处理的数据就是HDFS上面的数据块,看看Hadoop是如何工作的。MapReduce总是有一个建议,那就是在每个TaskTracker上面Map/Reduce程序要处理的数据在本地就有一份冗余。这样程序只需要与本地数据交互,减少了网络流量并提高了效率。为了做到这一点,HDFS会把大文件分割成很多小文件来存储,我们称之为数据块(Block)。每个数据块的大小比操作系统数据块的大小要大得多,默认是64M,但通常我们选择128M,或者某个更大的值(这取决与你的文件大小,最好你的单个文件大小总是大于一个数据块)。在MapReduce中,每个数据块会被分配给一个Task,这个Task就负责处理这个数据块中的数据。所以数据块越大,产生的Task就越少,需要mapper的数量就越少。Hadoop自己知道每个数据块存储的位置,这样在任务分配的时候就可以直接在存储数据块的机器上启动Task,或者选择一个最近机器启动Task。真是因为每个数据块有多份冗余,使得Hadoop有更大的选择空间。只要找到一份冗余符合条件就行了,不是吗?这样Hadoop就可以保证在MapReduce期间Task总是操作本地数据。
让我们回到HBase,现在你已经理解了Hadoop是如何保证在MapReduce的过程中每个Task都尽量处理本地数据。如果你看过HBase的存储架构你就会知道HBase只是简单的将HFile和WAL log存储在HDFS上面。通过简单的调用HDFS的API来创建文件:FileSystem.create(Path path)。接下来你会关心两件事情的效率:1)随机的访问 2)通过MapReduce扫描全表。我们当然希望当每个RegionServer读取数据时存储数据的数据块就在本地。它能做到吗?
第一种情况,你有两个集群,一个集群装Hadoop,另一个集群装HBase,两个集群是分隔开的,只有网线来传输数据。好了,讨论到此为止,神也帮不了你。
第二种情况,你有一个大的集群,每台机器都混装了Hadoop和HBase,每个RegionServer上面都有一个DataNode(这是我们最希望看到的)。好,这样的话RegionServer就具备了从本地读取数据的前提。我们还剩下一个问题,如何保证每个RegionServer管理的Region所对应的HFile和WAL log就存在本地的DataNode上面?设想一种情况,你对HBase创建了大量的数据,每个RegionServer都管理了各自的Region,这时你重启了HBase,重启了所有的RegionServer,所有的Region都会被随机的分配给各个RegionServer,这种情况下你显然无法保证我们希望的本地数据存储。
在讨论如何解决这个问题之前我们先强调一点:HBase不应该频繁的被重启,并且部署的架构不应该被频繁的改变,这是能解决这个问题的一个基础。写入HDFS的文件都有一个特点,一旦写入一个文件就无法更改(由于种种原因)。因此HBase会定期的将数据写入HDFS中并生成一个新文件。这里有一个让人惊奇的地方:HDFS足够聪明,它知道如何将文件写到最合适的地方。换句话说,它知道把文件放到什么地方使得RegionServer用起来最方便。如果想知道HDFS如何做到这一点,我们需要深入学习Hadoop的源代码,看看前面提到的FileSystem.create(Path path) 具体是怎么工作的。
在HDFS中实际调用的函数是:DistributedFileSystem.create(Path path), 他看起来是这个样子的:
public FSDataOutputStream create(Path f) throws
IOException {
return create(f, true);
}
public FSDataOutputStream create(Path f,
FsPermission permission, boolean overwrite, int bufferSize, short replication,
long blockSize, Progressable progress) throws IOException {
return new
FSDataOutputStream(dfs.create(getPathName(f),
permission, overwrite, replication, blockSize, progress, bufferSize),
statistics);
}
其中dfs是一个连接到HDFS NameNode的DFSClient。当你向HDFS写入数据的时候,数据都流过DFSClient.DFSOutputStream,DFSClient将这些数据收集,积攒到一定程度后,作为一个Block写入到DataNode里面。
将一个Block写到DataNode的过程都发生在DFSClient.DFSOutputStream.DataStreamer里面,它是一个运行在后台的守护线程。注意,从现在开始我们将逐渐揭开解决问题的秘密方法。
在接收到一个Block以后,DataStreamer需要知道这个Block应该被写到哪些DataNode上面,同时它也应该让NameNode知道这个Block写到了哪些DataNode上面。它的做法是联络NameNode:Hi,我这里有一个文件的一个Block,请告诉我应该写在哪些DataNode上面?
nodes =
nextBlockOutputStream(src);
->
long startTime =
System.currentTimeMillis();
lb =
locateFollowingBlock(startTime);
block = lb.getBlock();
nodes = lb.getLocations();
->
return namenode.addBlock(src,
clientName);
这时NameNode收到了一个添加Block的请求,它包含两个参数:src和clientName
其中src标明了这个Block属于哪个文件,clientName则是client端的名称。
我们跳过一些简单的步骤来看最重要的一步:
public LocatedBlock
getAdditionalBlock(String src, String clientName) throws IOException {
...
INodeFileUnderConstruction pendingFile = checkLease(src, clientName);
...
fileLength =
pendingFile.computeContentSummary().getLength();
blockSize =
pendingFile.getPreferredBlockSize();
clientNode = pendingFile.getClientNode();
replication =
(int)pendingFile.getReplication();

// choose targets for the new block tobe
allocated.
DatanodeDescriptor targets[] =
replicator.chooseTarget(replication, clientNode, null, blockSize);
...
}
最重要的一步就是replicator.chooseTarget(),它的具体实现如下:
private DatanodeDescriptor chooseTarget(int numOfReplicas,
DatanodeDescriptor writer, List<Node> excludedNodes, long blocksize, int
maxNodesPerRack, List<DatanodeDescriptor> results) {

if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0)
{
return writer;
}

int numOfResults = results.size();
boolean newBlock = (numOfResults==0);
if (writer == null && !newBlock) {
writer =
(DatanodeDescriptor)results.get(0);
}

try {
switch(numOfResults) {
case 0:
writer = chooseLocalNode(writer,
excludedNodes, blocksize, maxNodesPerRack, results);
if (--numOfReplicas == 0) {
break;
}
case 1:
chooseRemoteRack(1, results.get(0),
excludedNodes, blocksize, maxNodesPerRack, results);
if (--numOfReplicas == 0) {
break;
}
case 2:
if
(clusterMap.isOnSameRack(results.get(0), results.get(1))) {
chooseRemoteRack(1, results.get(0),
excludedNodes, blocksize, maxNodesPerRack, results);
} else if (newBlock) {
chooseLocalRack(results.get(1),
excludedNodes, blocksize, maxNodesPerRack, results);
} else {
chooseLocalRack(writer, excludedNodes,
blocksize, maxNodesPerRack, results);
}
if (--numOfReplicas == 0) {
break;
}
default:
chooseRandom(numOfReplicas,
NodeBase.ROOT, excludedNodes, blocksize, maxNodesPerRack, results);
}
} catch (NotEnoughReplicasException e) {
FSNamesystem.LOG.warn("Not able to
place enough replicas, still in need of " + numOfReplicas);
}
return writer;
}
这段代码很清楚的说明了整个的选择过程,NameNode总是为第一份冗余优先选择本地节点作为存储空间,对于第二份冗余,则是优先选择另一个机架的节点。如果前两份冗余位于不同机架,第三份冗余偏向于选择与第一份冗余相同的机架,否则选择不同的机架。大于三份的冗余就听天由命,随机挑选节点了。
总结一下,基于当前的情况,每个Region Server运行的时间越长,那么数据的存储地点就越稳定,每个Region Server就能保证它要管理的数据在本地就有一份拷贝。这样无论是Scan还是MapReduce都能达到效率的最优化。
最后要说的是HBase Team正在致力于重新设计MasterServer分配Region的机制。新的设计能够尽量保证每个Region被分配给拥有最多Region Block的Region Server。这将能够部分解决重启RegionServer所带来的问题。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: