您的位置:首页 > 运维架构

Hadoop源码分析18:common包中的fs.FileSystem

2014-05-28 08:47 597 查看
1. Hadoop 抽象文件系统 org.apache.hadoop.fs.FileSystem,具体HDFS是这个抽象类的子类

public abstract class FileSystem extends Configured implements Closeable {

public static final String FS_DEFAULT_NAME_KEY = "fs.default.name";

private static final Cache CACHE = new Cache();

private Cache.Key key;

private static final Map<Class<? extends FileSystem>, Statistics> statisticsTable =
    new IdentityHashMap<Class<? extends FileSystem>, Statistics>();

protected Statistics statistics;

private Set<Path> deleteOnExit = new TreeSet<Path>();

static class Cache {

private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();

closeAll()

closeAll(UserGroupInformation)

get(URI, Configuration)

remove(Key, FileSystem)

size()

}

private static class ClientFinalizer extends Thread {

run()

}

public static final class Statistics {

private final String scheme;

private AtomicLong bytesRead = new AtomicLong();

private AtomicLong bytesWritten = new AtomicLong();

private AtomicInteger readOps = new AtomicInteger();

private AtomicInteger largeReadOps = new AtomicInteger();

private AtomicInteger writeOps = new AtomicInteger();

}

addFileSystemForTesting(URI,Configuration, FileSystem)

clearStatistics()

closeAll()

closeAllForUGI(UserGroupInformation)

create(FileSystem, Path,FsPermission)

createFileSystem(URI,Configuration)

fixName(String)

get(Configuration)

get(URI,Configuration)

get(URI,Configuration, String)

getAllStatistics()

getDefaultUri(Configuration)

getLocal(Configuration)

getNamed(String,Configuration)

getStatistics()

getStatistics(String,Class<?extends FileSystem>)

mkdirs(FileSystem, Path,FsPermission)

printStatistics()

setDefaultUri(Configuration,String)

setDefaultUri(Configuration,URI)

deleteOnExit :Set<Path>

key :Key

statistics :Statistics

FileSystem()

append(Path)

append(Path,int)

append(Path,int, Progressable)

checkPath(Path)

close()

completeLocalOutput(Path,Path)

copyFromLocalFile(boolean,boolean, Path, Path)

copyFromLocalFile(boolean,boolean, Path[], Path)

copyFromLocalFile(boolean,Path, Path)

copyFromLocalFile(Path,Path)

copyToLocalFile(boolean,Path, Path)

copyToLocalFile(Path,Path)

create(Path)

create(Path,boolean)

create(Path,boolean, int)

create(Path,boolean, int, Progressable)

create(Path,boolean, int, short, long)

create(Path,boolean, int, short, long, Progressable)

create(Path,FsPermission, boolean, int, short, long, Progressable)

create(Path,Progressable)

create(Path,short)

create(Path,short, Progressable)

createNewFile(Path)

createNonRecursive(Path,boolean, int, short, long, Progressable)

createNonRecursive(Path,FsPermission, boolean, int, short, long, Progressable)

delete(Path)

delete(Path,boolean)

deleteOnExit(Path)

exists(Path)

getBlockSize(Path)

getCanonicalServiceName()

getCanonicalUri()

getContentSummary(Path)

getDefaultBlockSize()

getDefaultPort()

getDefaultReplication()

getDelegationToken(String)

getFileBlockLocations(FileStatus,long, long)

getFileChecksum(Path)

getFileStatus(Path)

getFileStatus(Path[])

getHomeDirectory()

getLength(Path)

getName()

getReplication(Path)

getUri()

getUsed()

getWorkingDirectory()

globPathsLevel(Path[],String[], int, boolean[])

globStatus(Path)

globStatus(Path,PathFilter)

globStatusInternal(Path,PathFilter)

initialize(URI,Configuration)

isDirectory(Path)

isFile(Path)

listStatus(ArrayList<FileStatus>,Path,
PathFilter)

listStatus(Path)

listStatus(Path,PathFilter)

listStatus(Path[])

listStatus(Path[],PathFilter)

makeQualified(Path)

mkdirs(Path)

mkdirs(Path,FsPermission)

moveFromLocalFile(Path,Path)

moveFromLocalFile(Path[],Path)

moveToLocalFile(Path,Path)

open(Path)

open(Path,int)

processDeleteOnExit()

rename(Path,Path)

setOwner(Path,String, String)

setPermission(Path,FsPermission)

setReplication(Path,short)

setTimes(Path,long, long)

setVerifyChecksum(boolean)

setWorkingDirectory(Path)

startLocalOutput(Path,Path)

}

2. 文件状态类
org.apache.hadoop.fs.FileStatus

public class FileStatus implements Writable, Comparable {

private Path path;

private long length;

private boolean isdir;

private short block_replication;

private long blocksize;

private long modification_time;

private long access_time;

private FsPermission permission;

private String owner;

private String group;

}

文件权限org.apache.hadoop.fs.FsPermission

public class FsPermission implements Writable {

private FsAction useraction = null;

private FsAction groupaction = null;

private FsAction otheraction = null;

}

public enum FsAction {

// POSIX style

NONE("---"),

EXECUTE("--x"),

WRITE("-w-"),

WRITE_EXECUTE("-wx"),

READ("r--"),

READ_EXECUTE("r-x"),

READ_WRITE("rw-"),
ALL("rwx");

}

资源使用概要(相当于 du、df 命令):org.apache.hadoop.fs.ContentSummary

public class ContentSummary implements Writable {

private long length;

private long fileCount;

private long directoryCount;

private long quota;

private long spaceConsumed;

private long spaceQuota;

}

3.文件输入输出流

public abstract class FSInputStream extends InputStream
    implements Seekable, PositionedReadable {

getPos()
read(long,byte[], int, int)
readFully(long,byte[])
readFully(long,byte[], int, int)
seek(long)
seekToNewSource(long)
}

public class FSDataInputStream extends DataInputStream
    implements Seekable, PositionedReadable, Closeable {

getPos()
read(long,byte[], int, int)
readFully(long,byte[])
readFully(long,byte[], int, int)
seek(long)
seekToNewSource(long)
}

public class FSDataOutputStream extends DataOutputStream implements Syncable {

private static class PositionCache extends FilterOutputStream {

private FileSystem.Statistics statistics;

long position;

PositionCache(OutputStream, Statistics, long)
close()
getPos()
write(byte[], int, int)
write(int)
}

close()
getPos()
getWrappedStream()
sync()
}

4.FileSystem打开文件系统

publicstaticFileSystemget(URIuri,
Configuration conf) throwsIOException{

String scheme = uri.getScheme();

String authority = uri.getAuthority();

if(scheme==
null){ //
no scheme:use default FS

returnget(conf);

}

if(authority==
null){ //
noauthority

URI defaultUri =
getDefaultUri(conf);

if(scheme.equals(defaultUri.getScheme()) //
if schemematches default

&& defaultUri.getAuthority() !=
null){
// &default has authority

returnget(defaultUri,conf); //
returndefault

}

}

String disableCacheName = String.format("fs.%s.impl.disable.cache",scheme);

if(conf.getBoolean(disableCacheName,false)){

returncreateFileSystem(uri,conf);

}

returnCACHE.get(uri,conf);
}

privatestaticFileSystemcreateFileSystem(URIuri,
Configuration conf

)
throwsIOException{

Class<?>
clazz =conf.getClass("fs."+uri.getScheme() +
".impl",null);

LOG.debug("Creatingfilesystem
for " + uri);

if(clazz==
null){

thrownewIOException("NoFileSystem
for scheme: " +uri.getScheme());

}

FileSystem fs =(FileSystem)ReflectionUtils.newInstance(clazz,conf);

fs.initialize(uri, conf);

returnfs;
}

static class Cache {

private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();

FileSystem
get(URI uri,Configuration conf) throwsIOException{

Key key =
newKey(uri,conf);

FileSystem fs =
null;

synchronized(this){

fs =
map.get(key);

}

if(fs !=null){

returnfs;

}

fs =
createFileSystem(uri, conf);

synchronized(this){
// refetchthe lock again

FileSystem oldfs =
map.get(key);

if(oldfs!=
null){
// a filesystem is created while lock is releasing

fs.close();
// close thenew file system

returnoldfs; //
returnthe old file system

}

// now insertthe new file system into the map

if(map.isEmpty()&&
!clientFinalizer.isAlive()){

Runtime.getRuntime().addShutdownHook(clientFinalizer);

}

fs.key=key;

map.put(key,fs);

returnfs;

}
}

static class Key {

final String scheme;

final String authority;

final UserGroupInformation ugi;

}

} // end of Cache

5.本地文件系统org.apache.hadoop.fs.RawLocalFileSystem

public class RawLocalFileSystem extends FileSystem {

static final URI NAME = URI.create("file:///");

private Path workingDir;

class LocalFSFileInputStream extends FSInputStream {

FileInputStream fis;

private long position;

LocalFSFileInputStream(Path)

available()

close()

getPos()

markSupport()

read()

read(byte[], int, int)

read(long, byte[], int, int)

seek(long)

seekToNewSource(long)

skip(long)

}

class LocalFSFileOutputStream extends OutputStream implements Syncable {

FileOutputStream fos;

close()

flush()

sync()

write(byte[], int, int)

write(int)

}

static class RawLocalFileStatus extends FileStatus {

getGroup()

getOwner()

getPermission()

isPermissionLoaded()

loadPermissionInfo()

write(DataOutput)

}

class TrackingFileInputStream extends FileInputStream {

TrackingFileInputStream(File)

read()

read(byte[])

read(byte[],int, int)
}

append(Path,int, Progressable)
close()
completeLocalOutput(Path,Path)
create(Path,boolean, boolean, int, short, long, Progressable)
create(Path,boolean, int, short, long, Progressable)
create(Path,FsPermission, boolean, int, short, long, Progressable)
createNonRecursive(Path,FsPermission, boolean, int, short, long, Progressable)
delete(Path)
delete(Path,boolean)
getFileStatus(Path)
getHomeDirectory()
getUri()
getWorkingDirectory()
initialize(URI,Configuration)
listStatus(Path)
makeAbsolute(Path)
mkdirs(Path)
mkdirs(Path,FsPermission)
moveFromLocalFile(Path,Path)
open(Path,int)
pathToFile(Path)
rename(Path,Path)
setOwner(Path,String, String)
setPermission(Path,FsPermission)
setWorkingDirectory(Path)
startLocalOutput(Path,Path)
toString()
}

6. 带校验和的文件系统 org.apache.hadoop.fs.ChecksumFileSystem

public abstract class ChecksumFileSystem extends FilterFileSystem {

private static final byte[] CHECKSUM_VERSION = new byte[] {'c', 'r', 'c', 0};

private int bytesPerChecksum = 512;

private boolean verifyChecksum = true;

private static class ChecksumFSInputChecker extends FSInputChecker {

public static final Log LOG = LogFactory.getLog(FSInputChecker.class);

private ChecksumFileSystem fs;

private FSDataInputStream datas;

private FSDataInputStream sums;

private static final int HEADER_LENGTH = 8;

private int bytesPerSum = 1;

private long fileLen = -1L;

available()

close()

getChecksumFilePos(long)

getChunkPosition(long)

getFileLength()

read(long, byte[], int, int)

readChunk(long, byte[], int, int, byte[])

seek(long)

seekToNewSource(long)

skip(long)

}

private static class ChecksumFSOutputSummer extends FSOutputSummer {

private FSDataOutputStream datas;

private FSDataOutputStream sums;

private static final float CHKSUM_AS_FRACTION = 0.01f;

close()

writeChunk(byte[], int, int, byte[])

}

getApproxChkSumLength(long)
getChecksumLength(long,int)
isChecksumFile(Path)
append(Path,int, Progressable)
completeLocalOutput(Path,Path)
copyFromLocalFile(boolean,Path, Path)
copyToLocalFile(boolean,Path, Path)
copyToLocalFile(Path,Path, boolean)
create(Path,FsPermission, boolean, boolean, int, short, long,Progressable)
create(Path,FsPermission, boolean, int, short, long, Progressable)
createNonRecursive(Path,FsPermission, boolean, int, short, long, Progressable)
delete(Path,boolean)
getBytesPerSum()
getChecksumFile(Path)
getChecksumFileLength(Path,long)
getRawFileSystem()
getSumBufferSize(int,int)
listStatus(Path)
mkdirs(Path)
open(Path,int)
rename(Path,Path)
reportChecksumFailure(Path,FSDataInputStream, long, FSDataInputStream, long)
setConf(Configuration)
setReplication(Path,short)
setVerifyChecksum(boolean)
startLocalOutput(Path,Path)
}

其中

abstract public class FSInputChecker extends FSInputStream {

public static final Log LOG = LogFactory.getLog(FSInputChecker.class);

protected Path file;

private Checksum sum;

private boolean verifyChecksum = true;

private byte[] buf;

private byte[] checksum;

private int pos;

private int count;

private int numOfRetries;

// cached file position
private long chunkPos = 0;

available()
fill()
getChecksum()
getChunkPosition(long)
getPos()
mark(int)
markSupported()
needChecksum()
read()
read(byte[],int, int)
read1(byte[],int, int)
readChecksumChunk(byte[],int, int)
readChunk(long,byte[], int, int, byte[])
reset()
resetState()
seek(long)
set(boolean,Checksum, int, int)
skip(long)
verifySum(long)
}

abstract public class FSOutputSummer extends OutputStream {

// data checksum
private Checksum sum;

// internal buffer for storing data before it is checksummed
private byte buf[];

// internal buffer for storing checksum
private byte checksum[];

// The number of valid bytes in the buffer.
private int count;

convertToByteStream(Checksum,int)
int2byte(int,byte[])
flushBuffer()
flushBuffer(boolean)
resetChecksumChunk(int)
write(byte[],int, int)
write(int)
write1(byte[],int, int)
writeChecksumChunk(byte[],int, int, boolean)
writeChunk(byte[],int, int, byte[])
}

7.用于测试的内存文件系统InMemoryFileSystem

public class InMemoryFileSystem extends ChecksumFileSystem {

private static class RawInMemoryFileSystem extends FileSystem {

private URI uri;

private long fsSize;

private volatile long totalUsed;

private Path staticWorkingDir;

private Map<String, FileAttributes> pathToFileAttribs =
    new HashMap<String, FileAttributes>();

private Map<String, FileAttributes> tempFileAttribs =
    new HashMap<String, FileAttributes>();

private static class FileAttributes {

private byte[] data;

private int size;
}

private class InMemoryFileStatus extends FileStatus {
}

private class InMemoryInputStream extends FSInputStream {

private DataInputBuffer din = new DataInputBuffer();

private FileAttributes fAttr;

}

private class InMemoryOutputStream extends OutputStream {

private int count;

private FileAttributes fAttr;

private Path f;

}

append(Path, int, Progressable)
canFitInMemory(long)
close()
create(Path, FileAttributes)
create(Path, FsPermission, boolean, int, short, long,Progressable)
delete(Path)
delete(Path, boolean)
getFiles(PathFilter)
getFileStatus(Path)
getFSSize()
getNumFiles(PathFilter)
getPath(Path)
getPercentUsed()
getUri()
getWorkingDirectory()
initialize(URI, Configuration)
listStatus(Path)
mkdirs(Path, FsPermission)
open(Path, int)
rename(Path, Path)
reserveSpace(Path, long)
setReplication(Path, short)
setWorkingDirectory(Path)
unreserveSpace(Path)
}

getFiles(PathFilter)
getFSSize()
getNumFiles(PathFilter)
getPercentUsed()
reserveSpaceWithCheckSum(Path, long)

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: