Hadoop源码分析18:common包中的fs.FileSystem
2014-05-28 08:47
597 查看
1. Hadoop 抽象文件系统 org.apache.hadoop.fs.FileSystem,具体HDFS是这个抽象类的子类
publicabstractclassFileSystemextendsConfiguredimplementsCloseable{
publicstaticfinalStringFS_DEFAULT_NAME_KEY="fs.default.name";
privatestaticfinalCacheCACHE=newCache();
privateCache.Keykey;
privatestaticfinalMap<Class<?
extendsFileSystem>,Statistics>
statisticsTable=
newIdentityHashMap<Class<?extendsFileSystem>,Statistics>();
protectedStatisticsstatistics;
privateSet<Path>
deleteOnExit=newTreeSet<Path>();
staticclassCache
{
privatefinalMap<Key,FileSystem>
map=newHashMap<Key,FileSystem>();
closeAll()
closeAll(UserGroupInformation)
get(URI, Configuration)
remove(Key, FileSystem)
size()
}
privatestaticclassClientFinalizerextendsThread{
run()
}
publicstaticfinalclassStatistics{
privatefinalStringscheme;
privateAtomicLongbytesRead=newAtomicLong();
privateAtomicLongbytesWritten=newAtomicLong();
privateAtomicIntegerreadOps=newAtomicInteger();
privateAtomicIntegerlargeReadOps=newAtomicInteger();
privateAtomicIntegerwriteOps=newAtomicInteger();
}
addFileSystemForTesting(URI,Configuration, FileSystem)
clearStatistics()
closeAll()
closeAllForUGI(UserGroupInformation)
create(FileSystem, Path,FsPermission)
createFileSystem(URI,Configuration)
fixName(String)
get(Configuration)
get(URI,Configuration)
get(URI,Configuration, String)
getAllStatistics()
getDefaultUri(Configuration)
getLocal(Configuration)
getNamed(String,Configuration)
getStatistics()
getStatistics(String,Class<?extends FileSystem>)
mkdirs(FileSystem, Path,FsPermission)
printStatistics()
setDefaultUri(Configuration,String)
setDefaultUri(Configuration,URI)
deleteOnExit :Set<Path>
key :Key
statistics :Statistics
FileSystem()
append(Path)
append(Path,int)
append(Path,int, Progressable)
checkPath(Path)
close()
completeLocalOutput(Path,Path)
copyFromLocalFile(boolean,boolean, Path, Path)
copyFromLocalFile(boolean,boolean, Path[], Path)
copyFromLocalFile(boolean,Path, Path)
copyFromLocalFile(Path,Path)
copyToLocalFile(boolean,Path, Path)
copyToLocalFile(Path,Path)
create(Path)
create(Path,boolean)
create(Path,boolean, int)
create(Path,boolean, int, Progressable)
create(Path,boolean, int, short, long)
create(Path,boolean, int, short, long, Progressable)
create(Path,FsPermission, boolean, int, short, long, Progressable)
create(Path,Progressable)
create(Path,short)
create(Path,short, Progressable)
createNewFile(Path)
createNonRecursive(Path,boolean, int, short, long, Progressable)
createNonRecursive(Path,FsPermission, boolean, int, short, long, Progressable)
delete(Path)
delete(Path,boolean)
deleteOnExit(Path)
exists(Path)
getBlockSize(Path)
getCanonicalServiceName()
getCanonicalUri()
getContentSummary(Path)
getDefaultBlockSize()
getDefaultPort()
getDefaultReplication()
getDelegationToken(String)
getFileBlockLocations(FileStatus,long, long)
getFileChecksum(Path)
getFileStatus(Path)
getFileStatus(Path[])
getHomeDirectory()
getLength(Path)
getName()
getReplication(Path)
getUri()
getUsed()
getWorkingDirectory()
globPathsLevel(Path[],String[], int, boolean[])
globStatus(Path)
globStatus(Path,PathFilter)
globStatusInternal(Path,PathFilter)
initialize(URI,Configuration)
isDirectory(Path)
isFile(Path)
listStatus(ArrayList<FileStatus>,Path,
PathFilter)
listStatus(Path)
listStatus(Path,PathFilter)
listStatus(Path[])
listStatus(Path[],PathFilter)
makeQualified(Path)
mkdirs(Path)
mkdirs(Path,FsPermission)
moveFromLocalFile(Path,Path)
moveFromLocalFile(Path[],Path)
moveToLocalFile(Path,Path)
open(Path)
open(Path,int)
processDeleteOnExit()
rename(Path,Path)
setOwner(Path,String, String)
setPermission(Path,FsPermission)
setReplication(Path,short)
setTimes(Path,long, long)
setVerifyChecksum(boolean)
setWorkingDirectory(Path)
startLocalOutput(Path,Path)
}
2. 文件状态类
org.apache.hadoop.fs.FileStatus
publicclassFileStatusimplementsWritable,Comparable
{
privatePathpath;
privatelonglength;
privatebooleanisdir;
privateshortblock_replication;
privatelongblocksize;
privatelongmodification_time;
privatelongaccess_time;
privateFsPermissionpermission;
privateStringowner;
privateStringgroup;
}
文件权限org.apache.hadoop.fs.FsPermission
public
class
FsPermissionimplements
Writable{
privateFsActionuseraction=null;
privateFsActiongroupaction=null;
privateFsActionotheraction
=null;
}
publicenumFsAction{
// POSIXstyle
NONE("---"),
EXECUTE("--x"),
WRITE("-w-"),
WRITE_EXECUTE("-wx"),
READ("r--"),
READ_EXECUTE("r-x"),
READ_WRITE("rw-"),
ALL("rwx");
}
资源使用概要 (相当于du、df命令) ,
org.apache.hadoop.fs.ContentSummary
publicclassContentSummaryimplementsWritable{
privatelonglength;
privatelongfileCount;
privatelongdirectoryCount;
privatelongquota;
privatelongspaceConsumed;
private
long
spaceQuota;
}
3.文件输入输出流
publicabstractclassFSInputStreamextendsInputStream
implementsSeekable,PositionedReadable
{
getPos()
read(long,byte[], int, int)
readFully(long,byte[])
readFully(long,byte[], int, int)
seek(long)
seekToNewSource(long)
}
publicclassFSDataInputStreamextendsDataInputStream
implementsSeekable,PositionedReadable,
Closeable {
getPos()
read(long,byte[], int, int)
readFully(long,byte[])
readFully(long,byte[], int, int)
seek(long)
seekToNewSource(long)
}
public
class
FSDataOutputStreamextends
DataOutputStreamimplementsSyncable{
privatestaticclassPositionCacheextendsFilterOutputStream{
privateFileSystem.Statisticsstatistics;
long
position;
PositionCache(OutputStream, Statistics, long)
close()
getPos()
write(byte[], int, int)
write(int)
}
close()
getPos()
getWrappedStream()
sync()
}
4.FileSystem打开文件系统
publicstaticFileSystemget(URIuri,
Configuration conf) throwsIOException{
String scheme = uri.getScheme();
String authority = uri.getAuthority();
if(scheme==
null){ //
no scheme:use default FS
returnget(conf);
}
if(authority==
null){ //
noauthority
URI defaultUri =
getDefaultUri(conf);
if(scheme.equals(defaultUri.getScheme()) //
if schemematches default
&& defaultUri.getAuthority() !=
null){
// &default has authority
returnget(defaultUri,conf); //
returndefault
}
}
String disableCacheName = String.format("fs.%s.impl.disable.cache",scheme);
if(conf.getBoolean(disableCacheName,false)){
returncreateFileSystem(uri,conf);
}
returnCACHE.get(uri,conf);
}
privatestaticFileSystemcreateFileSystem(URIuri,
Configuration conf
)
throwsIOException{
Class<?>
clazz =conf.getClass("fs."+uri.getScheme() +
".impl",null);
LOG.debug("Creatingfilesystem
for " + uri);
if(clazz==
null){
thrownewIOException("NoFileSystem
for scheme: " +uri.getScheme());
}
FileSystem fs =(FileSystem)ReflectionUtils.newInstance(clazz,conf);
fs.initialize(uri, conf);
returnfs;
}
staticclassCache{
privatefinalMap<Key,FileSystem>
map=newHashMap<Key,FileSystem>();
FileSystem
get(URI uri,Configuration conf) throwsIOException{
Key key =
newKey(uri,conf);
FileSystem fs =
null;
synchronized(this){
fs =
map.get(key);
}
if(fs !=null){
returnfs;
}
fs =
createFileSystem(uri, conf);
synchronized(this){
// refetchthe lock again
FileSystem oldfs =
map.get(key);
if(oldfs!=
null){
// a filesystem is created while lock is releasing
fs.close();
// close thenew file system
returnoldfs; //
returnthe old file system
}
// now insertthe new file system into the map
if(map.isEmpty()&&
!clientFinalizer.isAlive()){
Runtime.getRuntime().addShutdownHook(clientFinalizer);
}
fs.key=key;
map.put(key,fs);
returnfs;
}
}
staticclassKey{
finalStringscheme;
finalStringauthority;
final
UserGroupInformation
ugi;
}
5.本地文件系统org.apache.hadoop.fs.RawLocalFileSystem
publicclassRawLocalFileSystemextendsFileSystem{
staticfinalURINAME=URI.create("file:///");
privatePathworkingDir;
classLocalFSFileInputStreamextendsFSInputStream{
FileInputStream
fis;
private
long
position;LocalFSFileOutputStream;
LocalFSFileInputStream(Path)
available()
close()
getPos()
markSupport()
read()
read(byte[], int, int)
read(long, byte[], int, int)
seek(long)
seekToNewSource(long)
skip(long)
}
classLocalFSFileOutputStreamextendsOutputStreamimplementsSyncable{
FileOutputStreamfos;
close()
flush()
sync()
write(byte[], int, int)
write(int)
}
static
class
RawLocalFileStatusextends
FileStatus{
getGroup()
getOwner()
getPermission()
isPermissionLoaded()
loadPermissionInfo()
write(DataOutput)
}
classTrackingFileInputStreamextendsFileInputStream{
TrackingFileInputStream(File)
read()
read(byte[])
read(byte[],int, int)
}
append(Path,int, Progressable)
close()
completeLocalOutput(Path,Path)
create(Path,boolean, boolean, int, short, long, Progressable)
create(Path,boolean, int, short, long, Progressable)
create(Path,FsPermission, boolean, int, short, long, Progressable)
createNonRecursive(Path,FsPermission, boolean, int, short, long, Progressable)
delete(Path)
delete(Path,boolean)
getFileStatus(Path)
getHomeDirectory()
getUri()
getWorkingDirectory()
initialize(URI,Configuration)
listStatus(Path)
makeAbsolute(Path)
mkdirs(Path)
mkdirs(Path,FsPermission)
moveFromLocalFile(Path,Path)
open(Path,int)
pathToFile(Path)
rename(Path,Path)
setOwner(Path,String, String)
setPermission(Path,FsPermission)
setWorkingDirectory(Path)
startLocalOutput(Path,Path)
toString()
}
6.带检验和的文件系统org.apache.hadoop.fs.ChecksumFileSystem
publicabstractclassChecksumFileSystemextendsFilterFileSystem{
privatestaticfinalbyte[]CHECKSUM_VERSION=newbyte[]{'c','r','c',0};
privateintbytesPerChecksum=512;
private
boolean
verifyChecksum
=true;
private
staticclassChecksumFSInputCheckerextendsFSInputChecker{
publicstaticfinalLogLOG
= LogFactory.getLog(FSInputChecker.class);
privateChecksumFileSystemfs;
privateFSDataInputStreamdatas;
privateFSDataInputStreamsums;
privatestaticfinalintHEADER_LENGTH=8;
privateintbytesPerSum=1;
private
long
fileLen
=-1L;
available()
close()
getChecksumFilePos(long)
getChunkPosition(long)
getFileLength()
read(long, byte[], int, int)
readChunk(long, byte[], int, int, byte[])
seek(long)
seekToNewSource(long)
skip(long)
}
privatestaticclassChecksumFSOutputSummerextendsFSOutputSummer{
privateFSDataOutputStreamdatas;
privateFSDataOutputStreamsums;
private
static
final
float CHKSUM_AS_FRACTION
=0.01f;
close()
writeChunk(byte[], int, int, byte[])
}
getApproxChkSumLength(long)
getChecksumLength(long,int)
isChecksumFile(Path)
append(Path,int, Progressable)
completeLocalOutput(Path,Path)
copyFromLocalFile(boolean,Path, Path)
copyToLocalFile(boolean,Path, Path)
copyToLocalFile(Path,Path, boolean)
create(Path,FsPermission, boolean, boolean, int, short, long,Progressable)
create(Path,FsPermission, boolean, int, short, long, Progressable)
createNonRecursive(Path,FsPermission, boolean, int, short, long, Progressable)
delete(Path,boolean)
getBytesPerSum()
getChecksumFile(Path)
getChecksumFileLength(Path,long)
getRawFileSystem()
getSumBufferSize(int,int)
listStatus(Path)
mkdirs(Path)
open(Path,int)
rename(Path,Path)
reportChecksumFailure(Path,FSDataInputStream, long, FSDataInputStream, long)
setConf(Configuration)
setReplication(Path,short)
setVerifyChecksum(boolean)
startLocalOutput(Path,Path)
}
其中
abstractpublicclassFSInputCheckerextendsFSInputStream{
publicstaticfinalLogLOG
=LogFactory.getLog(FSInputChecker.class);
protectedPathfile;
privateChecksumsum;
privatebooleanverifyChecksum=true;
privatebyte[]buf;
privatebyte[]checksum;
privateintpos;
privateintcount;
privateintnumOfRetries;
// cachedfile position
private
long
chunkPos
=0;
available()
fill()
getChecksum()
getChunkPosition(long)
getPos()
mark(int)
markSupported()
needChecksum()
read()
read(byte[],int, int)
read1(byte[],int, int)
readChecksumChunk(byte[],int, int)
readChunk(long,byte[], int, int, byte[])
reset()
resetState()
seek(long)
set(boolean,Checksum, int, int)
skip(long)
verifySum(long)
}
abstractpublicclassFSOutputSummerextendsOutputStream{
// datachecksum
privateChecksumsum;
// internalbuffer for storing data before it is checksumed
privatebytebuf[];
// internalbuffer for storing checksum
privatebytechecksum[];
// The numberof valid bytes in the buffer.
private
int
count;
convertToByteStream(Checksum,int)
int2byte(int,byte[])
flushBuffer()
flushBuffer(boolean)
resetChecksumChunk(int)
write(byte[],int, int)
write(int)
write1(byte[],int, int)
writeChecksumChunk(byte[],int, int, boolean)
writeChunk(byte[],int, int, byte[])
}
7.用于测试的内存文件系统InMemoryFileSystem
publicclassInMemoryFileSystemextendsChecksumFileSystem{
privatestaticclassRawInMemoryFileSystemextendsFileSystem{
privateURIuri;
privatelongfsSize;
privatevolatilelongtotalUsed;
privatePathstaticWorkingDir
privateMap<String,FileAttributes>
pathToFileAttribs=
newHashMap<String,FileAttributes>();
privateMap<String,FileAttributes>
tempFileAttribs=
newHashMap<String,FileAttributes>();
privatestaticclassFileAttributes{
privatebyte[]data;
privateintsize;
}
privateclassInMemoryFileStatusextendsFileStatus{
}
privateclassInMemoryInputStreamextendsFSInputStream{
privateDataInputBufferdin=newDataInputBuffer();
privateFileAttributesfAttr;
}
privateclassInMemoryOutputStreamextendsOutputStream{
privateintcount;
privateFileAttributesfAttr;
privatePathf;
}
append(Path, int, Progressable)
canFitInMemory(long)
close()
create(Path, FileAttributes)
create(Path, FsPermission, boolean, int, short, long,Progressable)
delete(Path)
delete(Path, boolean)
getFiles(PathFilter)
getFileStatus(Path)
getFSSize()
getNumFiles(PathFilter)
getPath(Path)
getPercentUsed()
getUri()
getWorkingDirectory()
initialize(URI, Configuration)
listStatus(Path)
mkdirs(Path, FsPermission)
open(Path, int)
rename(Path, Path)
reserveSpace(Path, long)
setReplication(Path, short)
setWorkingDirectory(Path)
unreserveSpace(Path)
}
getFiles(PathFilter)
getFSSize()
getNumFiles(PathFilter)
getPercentUsed()
reserveSpaceWithCheckSum(Path, long)
}
publicabstractclassFileSystemextendsConfiguredimplementsCloseable{
publicstaticfinalStringFS_DEFAULT_NAME_KEY="fs.default.name";
privatestaticfinalCacheCACHE=newCache();
privateCache.Keykey;
privatestaticfinalMap<Class<?
extendsFileSystem>,Statistics>
statisticsTable=
newIdentityHashMap<Class<?extendsFileSystem>,Statistics>();
protectedStatisticsstatistics;
privateSet<Path>
deleteOnExit=newTreeSet<Path>();
staticclassCache
{
privatefinalMap<Key,FileSystem>
map=newHashMap<Key,FileSystem>();
closeAll()
closeAll(UserGroupInformation)
get(URI, Configuration)
remove(Key, FileSystem)
size()
}
privatestaticclassClientFinalizerextendsThread{
run()
}
publicstaticfinalclassStatistics{
privatefinalStringscheme;
privateAtomicLongbytesRead=newAtomicLong();
privateAtomicLongbytesWritten=newAtomicLong();
privateAtomicIntegerreadOps=newAtomicInteger();
privateAtomicIntegerlargeReadOps=newAtomicInteger();
privateAtomicIntegerwriteOps=newAtomicInteger();
}
addFileSystemForTesting(URI,Configuration, FileSystem)
clearStatistics()
closeAll()
closeAllForUGI(UserGroupInformation)
create(FileSystem, Path,FsPermission)
createFileSystem(URI,Configuration)
fixName(String)
get(Configuration)
get(URI,Configuration)
get(URI,Configuration, String)
getAllStatistics()
getDefaultUri(Configuration)
getLocal(Configuration)
getNamed(String,Configuration)
getStatistics()
getStatistics(String,Class<?extends FileSystem>)
mkdirs(FileSystem, Path,FsPermission)
printStatistics()
setDefaultUri(Configuration,String)
setDefaultUri(Configuration,URI)
deleteOnExit :Set<Path>
key :Key
statistics :Statistics
FileSystem()
append(Path)
append(Path,int)
append(Path,int, Progressable)
checkPath(Path)
close()
completeLocalOutput(Path,Path)
copyFromLocalFile(boolean,boolean, Path, Path)
copyFromLocalFile(boolean,boolean, Path[], Path)
copyFromLocalFile(boolean,Path, Path)
copyFromLocalFile(Path,Path)
copyToLocalFile(boolean,Path, Path)
copyToLocalFile(Path,Path)
create(Path)
create(Path,boolean)
create(Path,boolean, int)
create(Path,boolean, int, Progressable)
create(Path,boolean, int, short, long)
create(Path,boolean, int, short, long, Progressable)
create(Path,FsPermission, boolean, int, short, long, Progressable)
create(Path,Progressable)
create(Path,short)
create(Path,short, Progressable)
createNewFile(Path)
createNonRecursive(Path,boolean, int, short, long, Progressable)
createNonRecursive(Path,FsPermission, boolean, int, short, long, Progressable)
delete(Path)
delete(Path,boolean)
deleteOnExit(Path)
exists(Path)
getBlockSize(Path)
getCanonicalServiceName()
getCanonicalUri()
getContentSummary(Path)
getDefaultBlockSize()
getDefaultPort()
getDefaultReplication()
getDelegationToken(String)
getFileBlockLocations(FileStatus,long, long)
getFileChecksum(Path)
getFileStatus(Path)
getFileStatus(Path[])
getHomeDirectory()
getLength(Path)
getName()
getReplication(Path)
getUri()
getUsed()
getWorkingDirectory()
globPathsLevel(Path[],String[], int, boolean[])
globStatus(Path)
globStatus(Path,PathFilter)
globStatusInternal(Path,PathFilter)
initialize(URI,Configuration)
isDirectory(Path)
isFile(Path)
listStatus(ArrayList<FileStatus>,Path,
PathFilter)
listStatus(Path)
listStatus(Path,PathFilter)
listStatus(Path[])
listStatus(Path[],PathFilter)
makeQualified(Path)
mkdirs(Path)
mkdirs(Path,FsPermission)
moveFromLocalFile(Path,Path)
moveFromLocalFile(Path[],Path)
moveToLocalFile(Path,Path)
open(Path)
open(Path,int)
processDeleteOnExit()
rename(Path,Path)
setOwner(Path,String, String)
setPermission(Path,FsPermission)
setReplication(Path,short)
setTimes(Path,long, long)
setVerifyChecksum(boolean)
setWorkingDirectory(Path)
startLocalOutput(Path,Path)
}
2. 文件状态类
org.apache.hadoop.fs.FileStatus
publicclassFileStatusimplementsWritable,Comparable
{
privatePathpath;
privatelonglength;
privatebooleanisdir;
privateshortblock_replication;
privatelongblocksize;
privatelongmodification_time;
privatelongaccess_time;
privateFsPermissionpermission;
privateStringowner;
privateStringgroup;
}
文件权限org.apache.hadoop.fs.FsPermission
public
class
FsPermissionimplements
Writable{
privateFsActionuseraction=null;
privateFsActiongroupaction=null;
privateFsActionotheraction
=null;
}
publicenumFsAction{
// POSIXstyle
NONE("---"),
EXECUTE("--x"),
WRITE("-w-"),
WRITE_EXECUTE("-wx"),
READ("r--"),
READ_EXECUTE("r-x"),
READ_WRITE("rw-"),
ALL("rwx");
}
资源使用概要 (相当于du、df命令) ,
org.apache.hadoop.fs.ContentSummary
publicclassContentSummaryimplementsWritable{
privatelonglength;
privatelongfileCount;
privatelongdirectoryCount;
privatelongquota;
privatelongspaceConsumed;
private
long
spaceQuota;
}
3.文件输入输出流
publicabstractclassFSInputStreamextendsInputStream
implementsSeekable,PositionedReadable
{
getPos()
read(long,byte[], int, int)
readFully(long,byte[])
readFully(long,byte[], int, int)
seek(long)
seekToNewSource(long)
}
publicclassFSDataInputStreamextendsDataInputStream
implementsSeekable,PositionedReadable,
Closeable {
getPos()
read(long,byte[], int, int)
readFully(long,byte[])
readFully(long,byte[], int, int)
seek(long)
seekToNewSource(long)
}
public
class
FSDataOutputStreamextends
DataOutputStreamimplementsSyncable{
privatestaticclassPositionCacheextendsFilterOutputStream{
privateFileSystem.Statisticsstatistics;
long
position;
PositionCache(OutputStream, Statistics, long)
close()
getPos()
write(byte[], int, int)
write(int)
}
close()
getPos()
getWrappedStream()
sync()
}
4.FileSystem打开文件系统
publicstaticFileSystemget(URIuri,
Configuration conf) throwsIOException{
String scheme = uri.getScheme();
String authority = uri.getAuthority();
if(scheme==
null){ //
no scheme:use default FS
returnget(conf);
}
if(authority==
null){ //
noauthority
URI defaultUri =
getDefaultUri(conf);
if(scheme.equals(defaultUri.getScheme()) //
if schemematches default
&& defaultUri.getAuthority() !=
null){
// &default has authority
returnget(defaultUri,conf); //
returndefault
}
}
String disableCacheName = String.format("fs.%s.impl.disable.cache",scheme);
if(conf.getBoolean(disableCacheName,false)){
returncreateFileSystem(uri,conf);
}
returnCACHE.get(uri,conf);
}
privatestaticFileSystemcreateFileSystem(URIuri,
Configuration conf
)
throwsIOException{
Class<?>
clazz =conf.getClass("fs."+uri.getScheme() +
".impl",null);
LOG.debug("Creatingfilesystem
for " + uri);
if(clazz==
null){
thrownewIOException("NoFileSystem
for scheme: " +uri.getScheme());
}
FileSystem fs =(FileSystem)ReflectionUtils.newInstance(clazz,conf);
fs.initialize(uri, conf);
returnfs;
}
staticclassCache{
privatefinalMap<Key,FileSystem>
map=newHashMap<Key,FileSystem>();
FileSystem
get(URI uri,Configuration conf) throwsIOException{
Key key =
newKey(uri,conf);
FileSystem fs =
null;
synchronized(this){
fs =
map.get(key);
}
if(fs !=null){
returnfs;
}
fs =
createFileSystem(uri, conf);
synchronized(this){
// refetchthe lock again
FileSystem oldfs =
map.get(key);
if(oldfs!=
null){
// a filesystem is created while lock is releasing
fs.close();
// close thenew file system
returnoldfs; //
returnthe old file system
}
// now insertthe new file system into the map
if(map.isEmpty()&&
!clientFinalizer.isAlive()){
Runtime.getRuntime().addShutdownHook(clientFinalizer);
}
fs.key=key;
map.put(key,fs);
returnfs;
}
}
staticclassKey{
finalStringscheme;
finalStringauthority;
final
UserGroupInformation
ugi;
}
5.本地文件系统org.apache.hadoop.fs.RawLocalFileSystem
publicclassRawLocalFileSystemextendsFileSystem{
staticfinalURINAME=URI.create("file:///");
privatePathworkingDir;
classLocalFSFileInputStreamextendsFSInputStream{
FileInputStream
fis;
private
long
position;LocalFSFileOutputStream;
LocalFSFileInputStream(Path)
available()
close()
getPos()
markSupport()
read()
read(byte[], int, int)
read(long, byte[], int, int)
seek(long)
seekToNewSource(long)
skip(long)
}
classLocalFSFileOutputStreamextendsOutputStreamimplementsSyncable{
FileOutputStreamfos;
close()
flush()
sync()
write(byte[], int, int)
write(int)
}
static
class
RawLocalFileStatusextends
FileStatus{
getGroup()
getOwner()
getPermission()
isPermissionLoaded()
loadPermissionInfo()
write(DataOutput)
}
classTrackingFileInputStreamextendsFileInputStream{
TrackingFileInputStream(File)
read()
read(byte[])
read(byte[],int, int)
}
append(Path,int, Progressable)
close()
completeLocalOutput(Path,Path)
create(Path,boolean, boolean, int, short, long, Progressable)
create(Path,boolean, int, short, long, Progressable)
create(Path,FsPermission, boolean, int, short, long, Progressable)
createNonRecursive(Path,FsPermission, boolean, int, short, long, Progressable)
delete(Path)
delete(Path,boolean)
getFileStatus(Path)
getHomeDirectory()
getUri()
getWorkingDirectory()
initialize(URI,Configuration)
listStatus(Path)
makeAbsolute(Path)
mkdirs(Path)
mkdirs(Path,FsPermission)
moveFromLocalFile(Path,Path)
open(Path,int)
pathToFile(Path)
rename(Path,Path)
setOwner(Path,String, String)
setPermission(Path,FsPermission)
setWorkingDirectory(Path)
startLocalOutput(Path,Path)
toString()
}
6.带检验和的文件系统org.apache.hadoop.fs.ChecksumFileSystem
publicabstractclassChecksumFileSystemextendsFilterFileSystem{
privatestaticfinalbyte[]CHECKSUM_VERSION=newbyte[]{'c','r','c',0};
privateintbytesPerChecksum=512;
private
boolean
verifyChecksum
=true;
private
staticclassChecksumFSInputCheckerextendsFSInputChecker{
publicstaticfinalLogLOG
= LogFactory.getLog(FSInputChecker.class);
privateChecksumFileSystemfs;
privateFSDataInputStreamdatas;
privateFSDataInputStreamsums;
privatestaticfinalintHEADER_LENGTH=8;
privateintbytesPerSum=1;
private
long
fileLen
=-1L;
available()
close()
getChecksumFilePos(long)
getChunkPosition(long)
getFileLength()
read(long, byte[], int, int)
readChunk(long, byte[], int, int, byte[])
seek(long)
seekToNewSource(long)
skip(long)
}
privatestaticclassChecksumFSOutputSummerextendsFSOutputSummer{
privateFSDataOutputStreamdatas;
privateFSDataOutputStreamsums;
private
static
final
float CHKSUM_AS_FRACTION
=0.01f;
close()
writeChunk(byte[], int, int, byte[])
}
getApproxChkSumLength(long)
getChecksumLength(long,int)
isChecksumFile(Path)
append(Path,int, Progressable)
completeLocalOutput(Path,Path)
copyFromLocalFile(boolean,Path, Path)
copyToLocalFile(boolean,Path, Path)
copyToLocalFile(Path,Path, boolean)
create(Path,FsPermission, boolean, boolean, int, short, long,Progressable)
create(Path,FsPermission, boolean, int, short, long, Progressable)
createNonRecursive(Path,FsPermission, boolean, int, short, long, Progressable)
delete(Path,boolean)
getBytesPerSum()
getChecksumFile(Path)
getChecksumFileLength(Path,long)
getRawFileSystem()
getSumBufferSize(int,int)
listStatus(Path)
mkdirs(Path)
open(Path,int)
rename(Path,Path)
reportChecksumFailure(Path,FSDataInputStream, long, FSDataInputStream, long)
setConf(Configuration)
setReplication(Path,short)
setVerifyChecksum(boolean)
startLocalOutput(Path,Path)
}
其中
abstractpublicclassFSInputCheckerextendsFSInputStream{
publicstaticfinalLogLOG
=LogFactory.getLog(FSInputChecker.class);
protectedPathfile;
privateChecksumsum;
privatebooleanverifyChecksum=true;
privatebyte[]buf;
privatebyte[]checksum;
privateintpos;
privateintcount;
privateintnumOfRetries;
// cachedfile position
private
long
chunkPos
=0;
available()
fill()
getChecksum()
getChunkPosition(long)
getPos()
mark(int)
markSupported()
needChecksum()
read()
read(byte[],int, int)
read1(byte[],int, int)
readChecksumChunk(byte[],int, int)
readChunk(long,byte[], int, int, byte[])
reset()
resetState()
seek(long)
set(boolean,Checksum, int, int)
skip(long)
verifySum(long)
}
abstractpublicclassFSOutputSummerextendsOutputStream{
// datachecksum
privateChecksumsum;
// internalbuffer for storing data before it is checksumed
privatebytebuf[];
// internalbuffer for storing checksum
privatebytechecksum[];
// The numberof valid bytes in the buffer.
private
int
count;
convertToByteStream(Checksum,int)
int2byte(int,byte[])
flushBuffer()
flushBuffer(boolean)
resetChecksumChunk(int)
write(byte[],int, int)
write(int)
write1(byte[],int, int)
writeChecksumChunk(byte[],int, int, boolean)
writeChunk(byte[],int, int, byte[])
}
7.用于测试的内存文件系统InMemoryFileSystem
publicclassInMemoryFileSystemextendsChecksumFileSystem{
privatestaticclassRawInMemoryFileSystemextendsFileSystem{
privateURIuri;
privatelongfsSize;
privatevolatilelongtotalUsed;
privatePathstaticWorkingDir
privateMap<String,FileAttributes>
pathToFileAttribs=
newHashMap<String,FileAttributes>();
privateMap<String,FileAttributes>
tempFileAttribs=
newHashMap<String,FileAttributes>();
privatestaticclassFileAttributes{
privatebyte[]data;
privateintsize;
}
privateclassInMemoryFileStatusextendsFileStatus{
}
privateclassInMemoryInputStreamextendsFSInputStream{
privateDataInputBufferdin=newDataInputBuffer();
privateFileAttributesfAttr;
}
privateclassInMemoryOutputStreamextendsOutputStream{
privateintcount;
privateFileAttributesfAttr;
privatePathf;
}
append(Path, int, Progressable)
canFitInMemory(long)
close()
create(Path, FileAttributes)
create(Path, FsPermission, boolean, int, short, long,Progressable)
delete(Path)
delete(Path, boolean)
getFiles(PathFilter)
getFileStatus(Path)
getFSSize()
getNumFiles(PathFilter)
getPath(Path)
getPercentUsed()
getUri()
getWorkingDirectory()
initialize(URI, Configuration)
listStatus(Path)
mkdirs(Path, FsPermission)
open(Path, int)
rename(Path, Path)
reserveSpace(Path, long)
setReplication(Path, short)
setWorkingDirectory(Path)
unreserveSpace(Path)
}
getFiles(PathFilter)
getFSSize()
getNumFiles(PathFilter)
getPercentUsed()
reserveSpaceWithCheckSum(Path, long)
}
相关文章推荐
- Hadoop源码分析之DistributedFileSystem
- Hadoop源码分析FSNamesystem几个重要的成员变量
- Hadoop 源码分析(二四)FSNamesystem
- hadoop源码分析系列(二)——org.apache.hadoop.fs包 ----(上)
- 利用Hadoop的FileSystem create方法获取 FSDataOutputStream 实现文件的上传
- hadoop-common源码分析之-Configuration
- 全网第一人:CAUSED BY: NoClassDefFoundError: org/apache/hadoop/fs/adl/AdlFileSystem
- hadoop2.7.2学习笔记05-hadoop文件系统API定义-hadoop文件系统类org.apache.hadoop.fs.FileSystem
- hadoop-common源码分析之-Configuration
- Hadoop/Eclipse - Exception NoClassDefFoundError: org/apache/hadoop/fs/FileSystem解决方法
- 利用 hadoop 的 FileSystem open 方法获取 FSDataInputStream 实现文件下载到本地 Hadoop 版本 2.7.0
- hadoop源码剖析--RawLocalFileSystem
- Hadoop Common源码分析之SerializationFactory、Serialization
- hadoop-common源码分析之-WritableUtils
- Hadoop Common源码分析之服务Service
- Hadoop 源代码分析(二四)FSNamesystem
- hadoop-common源码分析之-WritableUtils
- 利用Hadoop的FileSystem create方法获取 FSDataOutputStream 实现文件的上传
- OSMDroid源码分析之Cache:MapTileFilesystemProvider
- 用java运行Hadoop程序报错:org.apache.hadoop.fs.LocalFileSystem cannot be cast to org.apache.