Spark中加载本地(或者hdfs)文件以及 spark使用SparkContext实例的textFile读取多个文件夹(嵌套)下的多个数据文件
2017-11-08 16:45
861 查看
Spark中加载本地(或者hdfs)文件以及 spark使用SparkContext实例的textFile读取多个文件夹(嵌套)下的多个数据文件
在正常调用过程中,难免需要对多个文件夹下的多个文件进行读取,然而之前只是明确了spark具备读取多个文件的能力。
针对多个文件夹下的多个文件,以前的做法是先进行文件夹的遍历,然后再进行各个文件夹目录的读取,其实不必那么麻烦,因为spark原生就支持这样的能力。
原理也非常简单,就是textFile功能。编写这样的代码,读取上次输出的多个结果,由于RDD保存结果都是保存为一个文件夹。而多个相关联RDD的结果就是多个文件夹。
(1)通过如下代码:
(2)textFile文件路径传入格式
默认是从hdfs读取文件,也可以指定sc.textFile("路径").在路径前面加上hdfs://表示从hdfs文件系统上读
本地文件读取 sc.textFile("路径").在路径前面加上file:// 表示从本地文件系统读,如file:///home/user/spark/README.md
具体格式如下:
textFile的参数是一个path,这个path可以是:
1. 一个文件路径,这时候只装载指定的文件
2. 一个目录路径,这时候只装载指定目录下面的所有文件(不包括子目录下面的文件) (ps,这个没有测试通过,应该是错误的)
3. 通过通配符的形式加载多个文件或者加载多个目录下面的所有文件
1)hdfs://localhost:9002/input/下目录结构如下图:
2)下面是在python编写的spark程序中测试的结果,判断是否达到预期。
其中形如(2)格式的hdfs输入会报如下错误:
3)此外,第三点是一个使用小技巧,现在假设我的数据结构为先按天分区,再按小时分区的,在hdfs上的目录结构类似于:
/user/hdfs/input/dt=20130728/hr=00/
/user/hdfs/input/dt=20130728/hr=01/
...
/user/hdfs/input/dt=20130728/hr=23/
具体的数据都在hr等于某个时间的目录下面,现在我们要分析20130728这一天的数据,我们就必须把这个目录下面的所有hr=*的子目录下面的数据全部装载进RDD,于是我们可以这样写:sc.textFile("hdfs://n1:8020/user/hdfs/input/dt=20130728/hr=*/"),注意到hr=*,是一个模糊匹配的方式。
4)利用Transformations 操作函数union来合并多个文件的输入
'''合并输入多个数据文件,并集操作,将源数据集与union中的输入数据集取并集,
默认保留重复元素'''
input_data_path_1 = "hdfs://localhost:9002/input/2017-11-01.txt"
result1 = sc.textFile(input_data_path_1)
input_data_path_2= "hdfs://localhost:9002/input/2017-11-10.txt"
result2 = sc.textFile(input_data_path_2)
result = result1.union(result2)
union(otherDataset)
并集操作,将源数据集与union中的输入数据集取并集,默认保留重复元素(如果不保留重复元素,可以利用distinct操作去除,下边介绍distinct时会介绍)。
在正常调用过程中,难免需要对多个文件夹下的多个文件进行读取,然而之前只是明确了spark具备读取多个文件的能力。
针对多个文件夹下的多个文件,以前的做法是先进行文件夹的遍历,然后再进行各个文件夹目录的读取,其实不必那么麻烦,因为spark原生就支持这样的能力。
原理也非常简单,就是textFile功能。编写这样的代码,读取上次输出的多个结果,由于RDD保存结果都是保存为一个文件夹。而多个相关联RDD的结果就是多个文件夹。
(1)通过如下代码:
//## read all files(files in different directorys) val alldata = sc.textFile("data/Flag/*/part-*") println(alldata.count())经过测试,可以实现对多个相关联RDD保存结果的一次性读取
(2)textFile文件路径传入格式
默认是从hdfs读取文件,也可以指定sc.textFile("路径").在路径前面加上hdfs://表示从hdfs文件系统上读
本地文件读取 sc.textFile("路径").在路径前面加上file:// 表示从本地文件系统读,如file:///home/user/spark/README.md
具体格式如下:
sc = SparkContext(conf=conf) '''# hdfs目录格式如下''' input_data_path = "hdfs://localhost:9002/input/2017-11*" '''# 本地文件目录''' input_data_path="file:///Users/a6/Downloads/input_local/2017-09*" print input_data_path result = sc.textFile(input_data_path)
textFile的参数是一个path,这个path可以是:
1. 一个文件路径,这时候只装载指定的文件
2. 一个目录路径,这时候只装载指定目录下面的所有文件(不包括子目录下面的文件) (ps,这个没有测试通过,应该是错误的)
3. 通过通配符的形式加载多个文件或者加载多个目录下面的所有文件
1)hdfs://localhost:9002/input/下目录结构如下图:
localhost:userid_hbsid_map_new a6$ hadoop dfs -ls hdfs://localhost:9002/input/ DEPRECATED: Use of this script to execute hdfs command is deprecated. Instead use the hdfs command for it. 17/11/08 16:30:15 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Found 3 items -rw-r--r-- 1 a6 supergroup 100 2017-11-07 16:41 hdfs://localhost:9002/input/2017-11-01.txt -rw-r--r-- 1 a6 supergroup 100 2017-11-07 16:41 hdfs://localhost:9002/input/2017-11-10.txt drwxr-xr-x - a6 supergroup 0 2017-11-08 15:17 hdfs://localhost:9002/input/test_input localhost:userid_hbsid_map_new a6$ hadoop dfs -ls hdfs://localhost:9002/input/* DEPRECATED: Use of this script to execute hdfs command is deprecated. Instead use the hdfs command for it. 17/11/08 16:30:27 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable -rw-r--r-- 1 a6 supergroup 100 2017-11-07 16:41 hdfs://localhost:9002/input/2017-11-01.txt -rw-r--r-- 1 a6 supergroup 100 2017-11-07 16:41 hdfs://localhost:9002/input/2017-11-10.txt Found 1 items -rw-r--r-- 1 a6 supergroup 100 2017-11-08 15:17 hdfs://localhost:9002/input/test_input/2017-11-20.txt localhost:userid_hbsid_map_new a6$
2)下面是在python编写的spark程序中测试的结果,判断是否达到预期。
'''# hdfs目录''' (1)input_data_path = "hdfs://localhost:9002/input/2017-11-01.txt" #一个文件路径,这时候只装载指定的文件 (2)input_data_path = "hdfs://localhost:9002/input" #这个报错,没有测试通过 —— 一个目录路径,这时候只装载指定目录下面的所有文件(不包括子目录下面的文件) (3)input_data_path = "hdfs://localhost:9002/input/*" #通过通配符的形式加载多个文件或者加载多个目录下面的所有文件
其中形如(2)格式的hdfs输入会报如下错误:
py4j.protocol.Py4JJavaError: An error occurred while calling o18.partitions. : java.io.IOException: Not a file: hdfs://localhost:9002/input/test_input at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:320)
3)此外,第三点是一个使用小技巧,现在假设我的数据结构为先按天分区,再按小时分区的,在hdfs上的目录结构类似于:
/user/hdfs/input/dt=20130728/hr=00/
/user/hdfs/input/dt=20130728/hr=01/
...
/user/hdfs/input/dt=20130728/hr=23/
具体的数据都在hr等于某个时间的目录下面,现在我们要分析20130728这一天的数据,我们就必须把这个目录下面的所有hr=*的子目录下面的数据全部装载进RDD,于是我们可以这样写:sc.textFile("hdfs://n1:8020/user/hdfs/input/dt=20130728/hr=*/"),注意到hr=*,是一个模糊匹配的方式。
4)利用Transformations 操作函数union来合并多个文件的输入
'''合并输入多个数据文件,并集操作,将源数据集与union中的输入数据集取并集,
默认保留重复元素'''
input_data_path_1 = "hdfs://localhost:9002/input/2017-11-01.txt"
result1 = sc.textFile(input_data_path_1)
input_data_path_2= "hdfs://localhost:9002/input/2017-11-10.txt"
result2 = sc.textFile(input_data_path_2)
result = result1.union(result2)
union(otherDataset)
并集操作,将源数据集与union中的输入数据集取并集,默认保留重复元素(如果不保留重复元素,可以利用distinct操作去除,下边介绍distinct时会介绍)。
>>> data1 = sc.parallelize(range(10)) >>> data2 = sc.parallelize(range(6,15)) >>> data1.union(data2).collect() [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 6, 7, 8, 9, 10, 11, 12, 13, 14]参考网址:http://blog.csdn.net/zy_zhengyang/article/details/46853441
相关文章推荐
- Spark中加载本地(或者hdfs)文件以及SparkContext实例的textFile使用
- Spark中加载本地(或者hdfs)文件以及SparkContext实例的textFile使用
- Spark中加载本地(或者hdfs)文件以及SparkContext实例的textFile使用
- Spark中加载本地(或者hdfs)文件以及SparkContext实例的textFile使用
- Spark中加载本地(或者hdfs)文件以及SparkContext实例的textFile使用
- 【python】如何批量读取文件夹的所有文件数据,os模块与open函数结合使用实例
- spark中的SparkContext实例的textFile使用的小技巧
- C语言判断文件夹或者文件是否存在以及权限等信息access函数和删除文件或目录 remove()函数使用实例
- spark 的WholeTextFile使用以及文件路径
- spark中的SparkContext实例的textFile使用的小技巧
- spark中的SparkContext实例的textFile使用的小技巧
- spark中的SparkContext实例的textFile使用的小技巧
- 使用CURL读取HTTP数据到字符串或者文件中
- jdom 或 dom4j读取xml文件时如何让dtd验证使用本地dtd文件或者不生效
- 第七章:在Spark集群上使用文件中的数据加载成为graph并进行操作(3)
- openfiledialog 使用 读取txt文件 StreamReader 检验数据
- spark中的SparkContext的textFile使用的小窍门
- 使用RandomAccessFile这个类,从文件中读取数据,却读取不出来
- 任何的File.ReadAllText()和使用StreamReader读取文件内容之间的差异?
- ArrayList 与HashSet的比较,及应用反射读取properties配置文件中的数据进行实例化再调用,以及类加载器的使用;还有HashCode的分析,及导致内存泄露,内存溢出的原因之一