您的位置:首页 > 大数据 > Hadoop

分布式文件系统HDFS

2017-11-18 16:55 483 查看
HDFS原理:

Namenode始终在内存中保存metedata,用于处理“读请求”

到有“写请求”到来时,namenode会首先写editlog到磁盘,即向edits文件中写日志,成功返回后,才会修改内存,并且向客户端返回

Hadoop会维护一个fsimage文件,也就是namenode中metedata的镜像,但是fsimage不会随时与namenode内存中的metedata保持一致,而是每隔一段时间通过合并edits文件来更新内容。Secondarynamenode就是用来合并fsimage和edits文件来更新NameNode的metedata的。

secondarynamenode的工作流程:

secondary通知namenode切换edits文件

secondary从namenode获得fsimage和edits(通过http)

secondary将fsimage载入内存,然后开始合并edits

secondary将新的fsimage发回给namenode

namenode用新的fsimage替换旧的fsimage

什么时候checkpiont:

fs.checkpoint.period指定两次checkpoint的最大时间间隔,默认3600秒。 

fs.checkpoint.size  规定edits文件的最大值,一旦超过这个值则强制checkpoint,不管是否到达最大时间间隔。默认大小是64M。

DataNode:

提供真实文件数据的存储服务。

文件块(block):最基本的存储单位。对于文件内容而言,一个文件的长度大小是size,那么从文件的0偏移开始,按照固定的大小,顺序对文件进行划分并编号,划分好的每一个块称一个Block。HDFS默认Block大小是128MB,以一个256MB文件,共有256/128=2个Block.

不同于普通文件系统的是,HDFS中,如果一个文件小于一个数据块的大小,并不占用整个数据块存储空间

Replication。多复本。默认是三个。

HDFS读过程:

初始化FileSystem,然后客户端(client)用FileSystem的open()函数打开文件

FileSystem用RPC调用元数据节点,得到文件的数据块信息,对于每一个数据块,元数据节点返回保存数据块的数据节点的地址。

FileSystem返回FSDataInputStream给客户端,用来读取数据,客户端调用stream的read()函数开始读取数据。

DFSInputStream连接保存此文件第一个数据块的最近的数据节点,data从数据节点读到客户端(client)

当此数据块读取完毕时,DFSInputStream关闭和此数据节点的连接,然后连接此文件下一个数据块的最近的数据节点。

当客户端读取完毕数据的时候,调用FSDataInputStream的close函数。

在读取数据的过程中,如果客户端在与数据节点通信出现错误,则尝试连接包含此数据块的下一个数据节点。

失败的数据节点将被记录,以后不再连接。

HDFS写过程:

初始化FileSystem,客户端调用create()来创建文件

FileSystem用RPC调用元数据节点,在文件系统的命名空间中创建一个新的文件,元数据节点首先确定文件原来不存在,并且客户端有创建文件的权限,然后创建新文件。

FileSystem返回DFSOutputStream,客户端用于写数据,客户端开始写入数据。

DFSOutputStream将数据分成块,写入dataqueue。dataqueue由DataStreamer读取,并通知元数据节点分配数据节点,用来存储数据块(每块默认复制3块)。分配的数据节点放在一个pipeline里。DataStreamer将数据块写入pipeline中的第一个数据节点。第一个数据节点将数据块发送给第二个数据节点。第二个数据节点将数据发送给第三个数据节点。

DFSOutputStream为发出去的数据块保存了ackqueue,等待pipeline中的数据节点告知数据已经写入成功。

当客户端结束写入数据,则调用stream的close函数。此操作将所有的数据块写入pipeline中的数据节点,并等待ackqueue返回成功。最后通知元数据节点写入完毕。

如果数据节点在写入的过程中失败,关闭pipeline,将ackqueue中的数据块放入dataqueue的开始,当前的数据块在已经写入的数据节点中被元数据节点赋予新的标示,则错误节点重启后能够察觉其数据块是过时的,会被删除。失败的数据节点从pipeline中移除,另外的数据块则写入pipeline中的另外两个数据节点。元数据节点则被通知此数据块是复制块数不足,将来会再创建第三份备份。

分布式文件管理系统很多,hdfs只是其中一种。适用于一次写入多次查询的情况,不支持并发写情况,小文件不合适。

HDFS目录结构参考:
http://blog.csdn.net/opensure/article/details/51452058http://blog.csdn.net/chndata/article/details/46003399  有详细的文件说明,性能调优
http://192.168.174.131:50070/dfshealth.html#tab-overviewhttp://192.168.174.131:8088/cluster/nodelabels
打开HDFS命令:

HDFSfs命令

-help[cmd]//显示命令的帮助信息

-ls(r)<path>//显示当前目录下所有文件

-du(s)<path>//显示目录中所有文件大小

-count[-q]<path>//显示目录中文件数量

-mv<src><dst>//移动多个文件到目标目录

-cp<src><dst>//复制多个文件到目标目录

-rm(r)//删除文件(夹)

-put<localsrc><dst>//本地文件复制到hdfs

-copyFromLocal//同put

-moveFromLocal//从本地文件移动到hdfs

-get[-ignoreCrc]<src><localdst>//复制文件到本地,可以忽略crc校验

-getmerge<src><localdst>//将源目录中的所有文件排序合并到一个文件中

-cat<src>//在终端显示文件内容

-text<src>//在终端显示文件内容

-copyToLocal[-ignoreCrc]<src><localdst>//复制到本地

-moveToLocal<src><localdst>

-mkdir<path>//创建文件夹

-touchz<path>//创建一个空文件

方式一:使用shell命令

查看hdfs根目录:

hadoopfs-lshdfs://主机名:端口号/

hadoopfs-ls/

备注:/是hdfs根目录

上传文件:

hadoopfs-putlinux路径hdfs路径

hadoopfs-copyFromLocal linux路径文件/

查看文件内容:

hadoopfs-cathdfs路径

下载文件:

hadoopfs-gethdfs上的路径linux路径

命令比较简单,可以通过hadoopfs-help回车查看命令使用

添加执行权限:

hadoopfs-chomda+x/in.log

去掉执行权限包括子文件子文件夹:

hadoopfs-chomd-R-x/wcout

同时改变所属组所属用户:

hadoopfs-chomd所属用户:所属组文件名

查看文件信息:

hadoopdfs-stat

方式二:

whichhadoop

whichhdfs

进入sbin

hdfsdfs-ls/

上传文件并核对大小,用网页查看HDFS:

 hadoopfs-put/usr/local/jdk1.8.0_151 /jdk1.8



Block块大小,默认是64m?128m?根据官网上面建议大小是64M/128M/256M

大小通过修改hdfs-site.xml

<configuration>

   <property> 

     <name>dfs.replication</name> 

     <value>1</value> <!--不修改,默认复本3个-->

   </property>

   <property> 

     <name>dfs.block.size</name> 

     <value>5242880</value> 

   </property>

</configuration>

HDFS的java接口:

publicclassHdfsTest{
privateFileSystemfs=null;

publicstaticvoidmain(String[]args)throwsIOException,URISyntaxException{

FileSystemfs=FileSystem.get(newURI("hdfs://192.168.174.131:9000/"),newConfiguration());

InputStreamin=fs.open(newPath("/jdk1.8"));

FileOutputStreamout=newFileOutputStream(newFile("c:/jdk123456"));

IOUtils.copyBytes(in,out,2048,true);

}

@Before
publicvoidinit()throwsIOException,URISyntaxException,InterruptedException{
fs=FileSystem.get(newURI("hdfs://193.168.174.131:9000/"),newConfiguration(),"root");
}

@Test
publicvoidtestDel()throwsIllegalArgumentException,IOException{
booleanflag=fs.delete(newPath("/words.txt"),true);
System.out.println(flag);
}

@Test
publicvoidtestMkdir()throwsIllegalArgumentException,IOException{
booleanflag=fs.mkdirs(newPath("/哈哈8888888"));
System.out.println(flag);
}

@Test
publicvoidtestUpload()throwsIllegalArgumentException,IOException{
FSDataOutputStreamout=fs.create(newPath("/words.txt"));

FileInputStreamin=newFileInputStream(newFile("c:/w.txt"));

IOUtils.copyBytes(in,out,2048,true);
}
}


RPC(远程过程调用协议):

实现跨语言,跨系统,不同进程之间方法调用,底层还是scoket

publicinterfaceBarty{

publicstaticfinallongversionID=10010;

publicStringsayHi(Stringname);
}

publicclassRPCServerimplementsBarty{

publicstaticvoidmain(String[]args)throwsHadoopIllegalArgumentException,IOException{

Serverserver=newRPC.Builder(newConfiguration())
.setInstance(newRPCServer())
.setBindAddress("192.168.174.1")//windows地址
.setPort(9527)
.setProtocol(Barty.class)
.build();
server.start();

}

@Override
publicStringsayHi(Stringname){
return"HI~"+name;
}

}

publicclassRPCClient{

publicstaticvoidmain(String[]args)throwsIOException{
Bartyproxy=RPC.getProxy(Barty.class,10010,
newInetSocketAddress("192.168.174.1",9527),newConfiguration());
StringsayHi=proxy.sayHi("tomcat");
System.out.println(sayHi);
}
}


server运行在win上,在IDEA上直接运行

在linux上运行client:java-jar RPCClient.jar

源码分析:

FileSystemget方法获取
CACHE.get(uri,conf)→
getInternal(uri,conf,key)
privateFileSystemgetInternal(URIuri,Configurationconf,FileSystem.Cache.Keykey)throwsIOException{
FileSystemfs;
synchronized(this){
fs=(FileSystem)this.map.get(key);//懒汉模式,线程不安全,需要使用同步
}
.....
FileSystem.createFileSystem(uri,conf)→
getFileSystemClass(uri.getScheme(),conf);
publicstaticClass<?extendsFileSystem>getFileSystemClass(Stringscheme,Configurationconf)throwsIOException{
if(!FILE_SYSTEMS_LOADED){
loadFileSystems();//记载到SERVICE_FILE_SYSTEMS
}
LOGGER.debug("LookingforFSsupporting{}",scheme);
Class<?extendsFileSystem>clazz=null;
if(conf!=null){//conf从core-site.xmlcore-default.xmlhdfs-default.xmlhdfs-site.xml中取
Stringproperty="fs."+scheme+".impl";//fs.hdfs.impl
LOGGER.debug("lookingforconfigurationoption{}",property);
clazz=conf.getClass(property,(Class)null);//自己配置fs.hdfs.impl,这里才不为空
}else{
LOGGER.debug("Noconfiguration:skippingcheckforfs.{}.impl",scheme);
}

if(clazz==null){
LOGGER.debug("Lookinginservicefilesystemsforimplementationclass");
clazz=(Class)SERVICE_FILE_SYSTEMS.get(scheme);
.......
回到createFileSystem
利用反射得到实例对象,但是有些属性还没有初始化,需要→
fs.initialize(uri,conf);
看源码需要看有返回值的,某个值是在哪里new的
DistributedFileSystem
initialize(URIuri,Configurationconf)主要newDFSClient..先静态属性代码块,然后构造方法,NameNodeProxy各种Policy策略
完成FileSystem创建
补充:

hdfs-default.xml配置block

读取默认配置文件信息core-default.xml在hadoop-common-3.0.0-beta1.jar下面


                                            
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: