(一)HDFS客户端开发和读写数据流程
第1章 HDFS概述
略
第2章 HDFS的Shell操作
基本语法
hadoop fs 具体命令 OR hdfs dfs 具体命令 两个是完全相同的。
常用命令实操
1、启动Hadoop集群(方便后续的测试)
[lu@hadoop102 hadoop-3.1.3]$ sbin/start-dfs.sh [lu@hadoop103 hadoop-3.1.3]$ sbin/start-yarn.sh
2、-help:输出这个命令参数
[lu@hadoop102 hadoop-3.1.3]$ hadoop fs -help rm
上传
1)-moveFromLocal:从本地剪切粘贴到HDFS
[lu@hadoop102 hadoop-3.1.3]$ touch kongming.txt [lu@hadoop102 hadoop-3.1.3]$ hadoop fs -moveFromLocal ./kongming.txt /sanguo/shuguo
2)-copyFromLocal:从本地文件系统中拷贝文件到HDFS路径去
[lu@hadoop102 hadoop-3.1.3]$ hadoop fs -copyFromLocal README.txt /
3)-appendToFile:追加一个文件到已经存在的文件末尾
[lu@hadoop102 hadoop-3.1.3]$ touch liubei.txt [lu@hadoop102 hadoop-3.1.3]$ vi liubei.txt 输入 san gu mao lu [lu@hadoop102 hadoop-3.1.3]$ hadoop fs -appendToFile liubei.txt /sanguo/shuguo/kongming.txt
4)-put:等同于copyFromLocal
[lu@hadoop102 hadoop-3.1.3]$ hadoop fs -put ./liubei.txt /user/lu/test/
2:
第3章 HDFS客户端操作(开发重点)
3.1 HDFS客户端环境准备
1)找到资料目录下的Windows依赖目录,打开:
2)配置HADOOP_HOME环境变量。
3)创建一个Maven工程HdfsClientDemo,并导入相应的依赖坐标+日志添加
<dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-slf4j-impl</artifactId> <version>2.12.0</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>3.1.3</version> </dependency> </dependencies>
在项目的src/main/resources目录下,新建一个文件,命名为“log4j2.xml”,在文件中填入
<?xml version="1.0" encoding="UTF-8"?> <Configuration status="error" strict="true" name="XMLConfig"> <Appenders> <!-- 类型名为Console,名称为必须属性 --> <Appender type="Console" name="STDOUT"> <!-- 布局为PatternLayout的方式, 输出样式为[INFO] [2018-01-22 17:34:01][org.test.Console]I'm here --> <Layout type="PatternLayout" pattern="[%p] [%d{yyyy-MM-dd HH:mm:ss}][%c{10}]%m%n" /> </Appender> </Appenders> <Loggers> <!-- 可加性为false --> <Logger name="test" level="info" additivity="false"> <AppenderRef ref="STDOUT" /> </Logger> <!-- root loggerConfig设置 --> <Root level="info"> <AppenderRef ref="STDOUT" /> </Root> </Loggers> </Configuration>
创建包名:com.lu.hdfs
创建HdfsClient类
public class HdfsClient{ @Test public void testMkdirs() throws IOException, InterruptedException, URISyntaxException{ // 1 获取文件系统 Configuration configuration = new Configuration(); // 配置在集群上运行 // configuration.set("fs.defaultFS", "hdfs://hadoop102:9820"); // FileSystem fs = FileSystem.get(configuration); FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9820"), configuration, "lu"); // 2 创建目录 fs.mkdirs(new Path("/1108/daxian/banzhang")); // 3 关闭资源 fs.close(); } }
3.2:IDEA登录HDFS
1、预准备
public class HdfsClient { private URI uri; private Configuration conf; private String user; private FileSystem fs; @Before //before方法在test方法之前运行一次,我们在before方法里面获取客户端对象 public void init() throws URISyntaxException, IOException, InterruptedException { uri = new URI("hdfs://hadoop102:9820"); conf = new Configuration(); user = "lu"; conf.set("dfs.replication", "2"); //1.获取一个客户端对象 //参数解读: 1.namenode的连接地址uri 2.配置对象conf fs = FileSystem.get(uri, conf, user); } @After //after方法在test方法运行之后运行一次,我们用after方法关闭客户端对象 public void close() throws IOException { //3.关闭客户端对象 fs.close(); System.out.println("over!!!!"); } }
2 、创建目录
@Test public void testMkdirs() throws IOException, InterruptedException, URISyntaxException { // 2 创建目录 fs.mkdirs(new Path("/lu/banzhang")); }
3、上传文件
@Test public void put() throws IOException { //参数解读 1.是否删除源文件(本地文件) 2.是否覆盖目标文件(hdfs文件) 3.源文件路径 4.目标文件路径 //fs.copyFromLocalFile(false,false,new Path("D:\\input\\hello.txt"),new Path("/java")); //fs.copyFromLocalFile(false,true,new Path("D:\\input\\hello.txt"),new Path("/java")); //fs.copyFromLocalFile(true,true,new Path("D:\\input\\hello2.txt"),new Path("/java")); fs.copyFromLocalFile(false, true, new Path("D:\\input\\wc2.txt"), new Path("/java")); }
4、下载文件
@Test public void get() throws IOException { //参数解读 1.是否删除源文件(hdfs文件) 2.源文件路径(hdfs) 3.目标路径(下载到本地的路径) 4.是否开启crc校验 false开启 true不开启 //fs.copyToLocalFile(false,new Path("/java/hello2.txt"),new Path("d:/input"),true); fs.copyToLocalFile(true, new Path("/java/hello2.txt"), new Path("d:/input"), true); }
5、删除文件和目录
@Test public void rm() throws IOException { //删除文件 //fs.delete(new Path("/java/abcd.txt"),false); //删除空目录 //fs.delete(new Path("/java2"),false); //删除非空目录,第二个参数表示是否递归删除 fs.delete(new Path("/java"), true); }
6、文件和目录的更名和移动
@Test public void mv() throws IOException { //文件的更名 //fs.rename(new Path("/kongming.txt"),new Path("/zhugeliang.txt")); //文件的移动并且更名 //fs.rename(new Path("/zhugeliang.txt"),new Path("/aaa/kongming.txt")); //目录的更名 // fs.rename(new Path("/aaa"),new Path("/bbb")); //目录的移动,第二个参数为已经存在的目录 fs.rename(new Path("/bbb"), new Path("/input")); }
7、文件详细信息查看
@Test public void ls() throws IOException { RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(new Path("/"), true); while (iterator.hasNext()) { LocatedFileStatus fileStatus = iterator.next(); System.out.println("===========" + fileStatus.getPath() + "==========="); System.out.println(fileStatus.getPermission()); System.out.println(fileStatus.getOwner()); System.out.println(fileStatus.getGroup()); System.out.println(fileStatus.getLen()); System.out.println(fileStatus.getModificationTime()); System.out.println(fileStatus.getReplication()); System.out.println(fileStatus.getBlockSize()); System.out.println(fileStatus.getPath().getName()); //获取块信息数组 BlockLocation[] blockLocations = fileStatus.getBlockLocations(); System.out.println(Arrays.toString(blockLocations)); } } ===================== 获取块信息数组 0为开始位置,15为长度,后面和副本位置 [0,15,hadoop103,hadoop102,hadoop104]
8、文件和文件夹判断
@Test public void isFileOrDir() throws IOException { FileStatus[] fileStatuses = fs.listStatus(new Path("/test")); //遍历数组 for (FileStatus fileStatus : fileStatuses) { boolean file = fileStatus.isFile(); if (file) { System.out.println("文件:" + fileStatus.getPath()); } else { System.out.println("目录:" + fileStatus.getPath()); } } }
自己实现一个方法,递归判断传入路径下的文件和目录
public void isAll(String path,FileSystem fileSystem) throws IOException { FileStatus[] fileStatuses = fileSystem.listStatus(new Path(path)); for (FileStatus fileStatus : fileStatuses) { boolean file = fileStatus.isFile(); if (file) { System.out.println("文件:" + fileStatus.getPath()); }else { System.out.println("目录:" + fileStatus.getPath()); //如果是目录,因为不知道目录下面还有没有子目录,所以要递归调用自己 isAll(fileStatus.getPath().toString(),fileSystem); } } } ==================== @Test public void testIsAll() throws IOException { isAll("/test",fs); }
9、基于IO流的上传
@Test public void putByIO() throws IOException { //1 获取本地文件输入流 FileInputStream fis = new FileInputStream(new File("d:/input/abcd.txt")); //2 获取hdfs文件输出流 FSDataOutputStream hdfsfos = fs.create(new Path("/test/abcd.txt")); //3 流的对拷 IOUtils.copyBytes(fis,hdfsfos,conf); //4 流的关闭 IOUtils.closeStream(hdfsfos); IOUtils.closeStream(fis); }
10、基于IO流的下载
@Test public void getByIO() throws IOException { //1 获取hdfs文件输入流 FSDataInputStream hdfsfis = fs.open(new Path("/test/abcd.txt")); //2 获取本地文件输出流 FileOutputStream fos = new FileOutputStream(new File("d:/input/a.txt")); //3 流的对拷 IOUtils.copyBytes(hdfsfis,fos,conf); //4 流的关闭 IOUtils.closeStream(fos); IOUtils.closeStream(hdfsfis); }
第4章 HDFS的数据流
4.1 HDFS写数据流程
(1)客户端通过Distributed FileSystem模块向NameNode请求上传文件,NameNode检查目标文件是否已存在,父目录是否存在。
(2)NameNode返回是否可以上传。
(3)客户端请求第一个 Block上传到哪几个DataNode服务器上。
(4)NameNode返回3个DataNode节点,分别为dn1、dn2、dn3。
(5)客户端通过FSDataOutputStream模块请求dn1上传数据,dn1收到请求会继续调用dn2,然后dn2调用dn3,将这个通信管道建立完成。
(6)dn1、dn2、dn3逐级应答客户端。
(7)客户端开始往dn1上传第一个Block(先从磁盘读取数据放到一个本地内存缓存),以Packet为单位,dn1收到一个Packet就会传给dn2,dn2传给dn3;dn1每传一个packet会放入一个应答队列等待应答。
(8)当一个Block传输完成之后,客户端再次请求NameNode上传第二个Block的服务器。(重复执行3-7步)。
源码解析:org.apache.hadoop.hdfs.DFSOutputStream
4.2 HDFS读数据流程
(1)客户端通过DistributedFileSystem向NameNode请求下载文件,NameNode通过查询元数据,找到文件块所在的DataNode地址。
(2)挑选一台DataNode(就近原则,然后随机)服务器,请求读取数据。
(3)DataNode开始传输数据给客户端(从磁盘里面读取数据输入流,以Packet为单位来做校验)。
(4)客户端以Packet为单位接收,先在本地缓存,然后写入目标文件。
- 大数据学习9:HDFS读写流程理解
- Java操作HDFS开发环境搭建以及HDFS的读写流程
- HDFS读写数据流程
- Hdfs读写数据流程
- HDFS读写数据流程
- HDFS上读写数据的流程解释
- 【若泽大数据基础第十一天】HDFS详解二---HDFS的读写流程
- HDFS上读写数据的流程解释
- 【大数据面试常问问题】----HDFS读写流程
- hadoop源码解析之hdfs写数据全流程分析---客户端处理
- hadoop(二)HDFS概述、shell操作、客户端操作(各种API操作)以及hdfs读写流程
- 大数据-Hadoop生态(8)-HDFS的读写数据流程以及机架感知
- 客户端向HDFS读写数据机制
- HDFS读写数据流程
- HDFS读写数据流程
- 【开发笔记】Unity联网斗地主的实现(一,服务器与客户端的数据传递流程)
- Hadoop中HDFS读写数据的流程分析
- 带你入坑大数据(二) --- HDFS的读写流程和一些重要策略
- ESP8266 SDK开发之数据读写
- 通过源码了解hdfs客户端写文件流程