您的位置:首页 > Web前端 > Node.js

HDFS集群的启动(4)——NameNode任务线程之LeaseManager$Monitor

2011-11-08 22:18 495 查看
在本地文件系统中,一个文件可以允许被多个进程同时同时打开并写入数据,但最后文件的内容是什么,谁也无法预测。请注意,我这里所说的这种文件能同时被多个进程写的操作是被操作系统中的文件系统所允许的,但是,我们通常在写程序对某个文件进行写操作的时候,总是额外地想法设法对操作的文件加一个独占锁,以此来保证文件的正确性和一致性。那么,HDFS中的文件是否像本地文件系统那样允许同时被多个用户写呢?答案是否定的,因为就是说,HDFS中的文件在任意时刻只能被一个用户进行写操作。这就有一个问题了,HDFS是如何控制这种文件写独占的呢?答案就是本文将要讨论的重点。

对HDFS文件的写操作(create/append),都必须先要向NameNode节点申请,这就为我们实现文件的独占写锁提供了可能。这里所说的文件的独占写锁在HDFS中对应的是一个比较有趣的概念:文件租约,文件租约就是将操作的文件和操作它的客户端进行绑定,若文件不存在一个租约,则说明该文件当前没有被任何客户端写,否则,就表示它正在被该文件租约中的客户端holder写。这中间可能会发生一些意想不到的异常情况,比如正在对某个文件进行写操作的客户端突然宕机了,那么与这个文件相关的租约会迟迟得不到客户端的续租而过期,那么NameNode是如何释放这些过期的租约,好让其它的客户端能及时的操作该租约对应文件呢?下面以文件的create为例并结合源代码来看看HDFS的客户端(DistributedFileSystem)和服务器端(NameNode)都干了写神马来维护这个文件的租约。

在客户端:

public FSDataOutputStream create(Path f, FsPermission permission,
boolean overwrite,
int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {

//创建一个DFS文件的写入流
return new FSDataOutputStream(dfs.create(getPathName(f), permission, overwrite, replication, blockSize, progress, bufferSize), statistics);
}

public OutputStream create(String src,
FsPermission permission,
boolean overwrite,
short replication,
long blockSize,
Progressable progress,
int buffersize
) throws IOException {
checkOpen();
if (permission == null) {
permission = FsPermission.getDefault();
}
FsPermission masked = permission.applyUMask(FsPermission.getUMask(conf));
LOG.debug(src + ": masked=" + masked);

OutputStream result = new DFSOutputStream(src, masked, overwrite, replication, blockSize, progress, buffersize,conf.getInt("io.bytes.per.checksum", 512));
leasechecker.put(src, result);
return result;
}

关于在创建DFSOutputStream的过程中客服端干了些什么,我在前面的关于博文HDFS的文件操作流(1)——写操作中已经相当清楚地描述了这一过程(其中包括调用ClientProtocol的远程方法create),不清楚的盆友可以参考。从上那个面的代码我们可以看出,在客户端创建了DFSOutputStream文件流之后,它紧接着还干了一件事情,就是把文件路径名和对应的文件流存入LeaseChecker中,现在来看看LeaseChecker里是神马东东。
synchronized void put(String src, OutputStream out) {
if (clientRunning) {
if (daemon == null) {
daemon = new Daemon(this);
daemon.start();
}
pendingCreates.put(src, out);
}
}

synchronized void remove(String src) {
pendingCreates.remove(src);
}

private void renew() throws IOException {
synchronized(this) {
if (pendingCreates.isEmpty()) {
return;
}
}

//调用ClientProtocol的远程方法renewLease,向NameNode节点续租与客户端相关的文件租约

namenode.renewLease(clientName);
}

public void run() {
long lastRenewed = 0;
while (clientRunning && !Thread.interrupted()) {

//判断客户端租约是否快过期

if (System.currentTimeMillis() - lastRenewed > (LEASE_SOFTLIMIT_PERIOD / 2)) {
try {

//客户端租约快过期,重新向NameNode续租

renew();
lastRenewed = System.currentTimeMillis();
} catch (IOException ie) {
LOG.warn("Problem renewing lease for " + clientName, ie);
}
}

try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
if (LOG.isDebugEnabled()) {
LOG.debug(this + " is interrupted.", ie);
}
return;
}
}
}

当客户端向文件写完数据关闭文件流的时候,又做了些什么呢?

public void close() throws IOException {
if (closed) {
IOException e = lastException;
if (e == null)
return;
else
throw e;
}
closeInternal();
leasechecker.remove(src);
if (s != null) {
s.close();
s = null;
}
}

private synchronized void closeInternal() throws IOException {
...
while (!fileComplete) {

//调用ClientProtocol的远程方法complete,向NameNode节点解除文件src的租约

fileComplete = namenode.complete(src, clientName);
if (!fileComplete) {
try {
Thread.sleep(400);
if (System.currentTimeMillis() - localstart > 5000) {
LOG.info("Could not complete file " + src + " retrying...");
}
} catch (InterruptedException ie) {
}
}
}

...

}

从客户端完成对文件的写操作的过程中,只有服务器端的create、renewLease、complete三个方法参与其中,下面就让我们来看看(省略了与Lease无关的代码):

public void create(String src, FsPermission masked, String clientName, boolean overwrite, short replication, long blockSize) throws IOException {
String clientMachine = getClientMachine();
...
//执行文件的创建操作
namesystem.startFile(src,new PermissionStatus(UserGroupInformation.getCurrentUGI().getUserName(),null, masked),clientName, clientMachine, overwrite, replication, blockSize);
...
}

void startFile(String src, PermissionStatus permissions,
String holder, String clientMachine,
boolean overwrite, short replication, long blockSize
) throws IOException {
startFileInternal(src, permissions, holder, clientMachine, overwrite, false, replication, blockSize);
...
}

private synchronized void startFileInternal(String src,
PermissionStatus permissions,
String holder,
String clientMachine,
boolean overwrite,
boolean append,
short replication,
long blockSize
) throws IOException {

...
if (append) {

INodeFile node = (INodeFile) myFile;
INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
node.getLocalNameBytes(),
node.getReplication(),
node.getModificationTime(),
node.getPreferredBlockSize(),
node.getBlocks(),
node.getPermissionStatus(),
holder,
clientMachine,
clientNode);
dir.replaceNode(src, node, cons);
leaseManager.addLease(cons.clientName, src);

} else {
checkFsObjectLimit();
long genstamp = nextGenerationStamp();
INodeFileUnderConstruction newNode = dir.addFile(src, permissions, replication, blockSize, holder, clientMachine, clientNode, genstamp);
if (newNode == null) {
throw new IOException("DIR* NameSystem.startFile: " + "Unable to add file to namespace.");
}
leaseManager.addLease(newNode.clientName, src);

if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: " +"add "+src+" to namespace for "+holder);
}

}
} catch (IOException ie) {
NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: " +ie.getMessage());
throw ie;
}
}

public void renewLease(String clientName) throws IOException {
namesystem.renewLease(clientName);
}

void renewLease(String holder) throws IOException {
if (isInSafeMode())
throw new SafeModeException("Cannot renew lease for " + holder, safeMode);
//为客户端holder续租
leaseManager.renewLease(holder);
}

public boolean complete(String src, String clientName) throws IOException {
stateChangeLog.debug("*DIR* NameNode.complete: " + src + " for " + clientName);
//客户端完成对指定文件的写操作
CompleteFileStatus returnCode = namesystem.completeFile(src, clientName);
...
}

public CompleteFileStatus completeFile(String src, String holder) throws IOException {
CompleteFileStatus status = completeFileInternal(src, holder);
...
}

private synchronized CompleteFileStatus completeFileInternal(String src,
String holder) throws IOException {
...
finalizeINodeFileUnderConstruction(src, pendingFile);
NameNode.stateChangeLog.info("DIR* NameSystem.completeFile: file " + src
+ " is closed by " + holder);
return CompleteFileStatus.COMPLETE_SUCCESS;
}

private void finalizeINodeFileUnderConstruction(String src,INodeFileUnderConstruction pendingFile) throws IOException {
//释放客户端文件src的租约
leaseManager.removeLease(pendingFile.clientName, src);
...
}
不难看出,服务器端对文件租约的所有操作都是交给LeaseManager来管理的,而且NameNode通过后台工作线程LeaseManager$Monitor来定期检查LeaseManager中的文件租约是否过期,如果过期就释放该文件租约。

class Monitor implements Runnable{
final String name = getClass().getSimpleName();

public void run() {
for(; fsnamesystem.isRunning(); ) {
synchronized(fsnamesystem) {
checkLeases();
}

try {
Thread.sleep(2000);
} catch(InterruptedException ie) {
if (LOG.isDebugEnabled()) {
LOG.debug(name + " is interrupted", ie);
}
}
}
}
}

/** Check the leases beginning from the oldest. */
private synchronized void checkLeases() {
for(; sortedLeases.size() > 0; ) {
final Lease oldest = sortedLeases.first();
if (!oldest.expiredHardLimit()) {
return;
}

final List<String> removing = new ArrayList<String>();
String[] leasePaths = new String[oldest.getPaths().size()];
oldest.getPaths().toArray(leasePaths);
for(String p : leasePaths) {
try {
fsnamesystem.internalReleaseLease(oldest, p);
} catch (IOException e) {
LOG.error("Cannot release the path "+p+" in the lease "+oldest, e);
removing.add(p);
}
}
for(String p : removing) {
removeLease(oldest, p);
}
}
}

本文旨在介绍LeaseManager$Monitor线程是如何管理文件租约的Lease,并没有介绍Lease是如何保证数据一致性的,关于这一点,我将结合具体的文件操作来详细讲解。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐