SparkSQL编程指南之Java篇二-数据源(上)
2017-03-10 11:11
471 查看
Spark SQL通过DataFrame接口支持各种不同数据源的操作。一个DataFrame可以进行相关的转换操作,也可以用于创建临时视图。注册DataFrame为一个临时视图可以允许你对其数据执行SQL查询。本文首先会介绍使用Spark数据源加载和保存数据的一般方法,然后对内置数据源进行详细介绍。
1. 一般的Load/Save方法
Spark SQL最简单的也是默认的数据源格式是Parquet(除非使用了spark.sql.sources.default配置修改),它将会被用于所有的操作。以下是一般的Load/Save方法:
* 如果是使用windows系统的话,需要把对应版本编译的hadoop.dll复制到C:\Windows\System32,否则会遇到以下错误:
1.1 手动指定选项
我们也可以通过完整的全名(例如:org.apache.spark.sql.parquet)来指定数据源的类型,对于那些内置的数据源类型,也可以使用简称,例如:json, parquet, jdbc, orc, libsvm, csv, text。从任何数据源类型加载的DataFrames可以转换为其它的类型格式,例如:
1.2 直接执行SQL
我们也可以直接执行SQL查询而不需要使用API加载文件为DataFrame然后再查询,例如:
* 注意parquet.后面的路径前后有个`的字符(与键盘~一起的那个逗点)
1.3 保存模式
保存操作可以选择性地使用SaveMode指定如何处理存在的数据。需要注意的是这些保存模式不使用任何锁和不是原子操作的。此外,当使用Overwrite模式时,原数据会在写入新数据之前就会被删除。以下是SaveMode的选项,当保存一个DataFrame到指定数据源的时候,如果输出路径已经存在:
SaveMode.ErrorIfExists(默认) 抛出异常
SaveMode.Append 数据会以追加的方式保存
SaveMode.Overwrite 新数据会覆盖原数据(先删除原数据,再保存新数据)
SaveMode.Ignore 不保存新数据,相当于SQL语句的CREATE TABLE IF NOT EXISTS
例如:
1.4 持久化到表
我们也可以使用saveAsTable方法把DataFrames保存为持久化的表到Hive metastore。值得注意的是我们不需要部署Hive的环境,Spark会创建一个默认本地的Hive metastore(使用Derby)。与createOrReplaceTempView方法不同,saveAsTable会实质化DataFrame的内容,然后在Hive metastore创建它的指针。只要连接到相同metastore的连接不中断,即使Spark程序重新启动,持久化的表也会一直存在。
默认地saveAsTable方法将创建一个“管理表”(managed table),表示数据的位置是由metastore来控制管理的。当持久化的表被删除时,managed table将会自动删除相应的数据。
2. Parquet文件
Parquet是一种被其它多种数据处理系统支持的纵列格式。Spark SQL提供了读写Parquet文件的功能,保存的Parquet文件会自动保留原始数据的schema。当保存Parquet文件时,基于兼容性考虑,所有的列会被自动转换为允许空值。
2.1 以编程方式加载数据
2.2 分区推断
对表进行分区是对数据进行优化的方式之一。在一个分区的表内,数据通常是通过分区列将数据存储在不同的目录里面。Parquet数据源现在能够自动地发现并推断分区信息。例如,可以使用下面的目录结构存储人口数据到分区表里面,分区列为gender和country:
通过传递path/to/table给SparkSession.read.parquet或SparkSession.read.load,Spark SQL将自动抽取分区信息。返回的DataFrame的Schema如下:
需要注意的是,分区列的数据类型是自动解析的。目前,数值类型和字符串类型是支持的。如果不想分区列的数据类型被自动解析,可以通过配置spark.sql.sources.partitionColumnTypeInference.enabled=false,默认是true。当该配置被设为false时,分区列数据类型会使用string类型。
从Spark 1.6.0版本开始,默认地,分区信息解析只会作用于指定路径下面的分区。例如上面的例子,如果用户传递path/to/table/gender=male给SparkSession.read.parquet或SparkSession.read.load,gender将不会是分区列。如果用户需要指定基础的路径作为分区信息解析的开始路径,那么可以在数据源选项设置basePath。例如,path/to/table/gender=male是数据的路径,用户设置了basePath=path/to/table/,那么gender将会是分区列。
2.3 Schema合并
像ProtocolBuffer、Avro和Thrift一样,Parquet也支持schema evolution(schema演变)。用户可以先定义一个简单的schema,然后根据需要逐渐地向schema中增加列。通过这种方式,用户可以有多个不同的schemas但它们是互相兼容的Parquet文件。Parquet数据源现在能够自动检测这种情况并合并这些文件的schemas。
因为Schema合并是一个相对高消耗的操作,在大多数的情况下并不需要,所以从Spark SQL 1.5.0版本开始,默认关闭了该功能。可以通过下面两种方式开启:
当读取Parquet文件时,设置数据源选项mergeSchema=true(例如下面的例子)
设置全局SQL选项spark.sql.parquet.mergeSchema=true
2.4 Hive metastore Parquet表转换
当读写Hive metastore Parquet表时,基于性能考虑,Spark SQL会先尝试使用自带的Parquet SerDe(序列化与反序列化,Serialize/Deserilize的简称),而不是Hive的SerDe。这个优化选项可以通过spark.sql.hive.convertMetastoreParquet配置,默认为开启。
2.4.1 Hive/Parquet Schema一致化
从表schema处理的角度来看,Hive和Parquet有2个主要的不同点:
Hive不区分大小写,Parquet区分大小写
Hive认为所有的列都可以为空,而Parquet的空值性是有重要意义的(Hive considers all columns nullable, while nullability in Parquet is significant)
由于以上不同点,当把Hive metastore Parquet表转换为Spark SQL Parquet表时,必须将Hive metastore schema和Parquet schema进行一致化。其规则如下:
两个schema中,忽略空值性,具有相同名字的字段必须具有相同的数据类型。一致化后的字段类型应该与Parquet的字段类型一致,以便遵守空值性原则
一致化后的schema只包含在Hive metastore schema定义的字段:
i. 丢弃只在Parquet schema定义的字段
i. 把只在Hive metastore schema定义的字段设为允许为空
2.4.2 元数据刷新
为了提高性能,Spark SQL会缓存Parquet元数据(metadata)。当Hive metastore Parquet表转换的选项开启时,转换后的表元数据也会被缓存。如果这些表被Hive或者其它外部工具更新,则需要手动刷新缓存以确保元数据的一致性。
2.5 配置
Parquet的配置可以使用SparkSession的setConf方法或者使用SQL执行SET key=value命令。详细的配置参数如下:
3. JSON Datasets
Spark SQL可以自动推断JSON数据集的schema并加载为Dataset<Row>。此转换可以使用SparkSession.read().json()方法读取一个String类型的RDD或者一个JSON文件。需要注意的是,这里的JSON文件不是典型的JSON格式。这里的JSON文件每一行必须包含一个独立有效的JSON对象,也称为换行符分割JSON文件。因此,一个规则的多行JSON文件会导致读取出错。读取JSON数据集例子如下:
* 参考Spark SQL官方链接:http://spark.apache.org/docs/latest/sql-programming-guide.html#data-sources
TO BE CONTINUED...O(∩_∩)O
1. 一般的Load/Save方法
Spark SQL最简单的也是默认的数据源格式是Parquet(除非使用了spark.sql.sources.default配置修改),它将会被用于所有的操作。以下是一般的Load/Save方法:
// generic load/save functions Dataset<Row> usersDF = spark.read().load("examples/src/main/resources/users.parquet"); usersDF.select("name", "favorite_color").write().save("namesAndFavColors.parquet");
* 如果是使用windows系统的话,需要把对应版本编译的hadoop.dll复制到C:\Windows\System32,否则会遇到以下错误:
java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.createFileWithMode0(Ljava/lang/String;JJJI)Ljava/io/FileDescriptor; at org.apache.hadoop.io.nativeio.NativeIO$Windows.createFileWithMode0(Native Method) at org.apache.hadoop.io.nativeio.NativeIO$Windows.createFileOutputStreamWithMode(NativeIO.java:559) at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:219) at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209) at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:307) at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:296) at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:328) at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:398) at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:461) at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:440) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:789) at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:223) ......
1.1 手动指定选项
我们也可以通过完整的全名(例如:org.apache.spark.sql.parquet)来指定数据源的类型,对于那些内置的数据源类型,也可以使用简称,例如:json, parquet, jdbc, orc, libsvm, csv, text。从任何数据源类型加载的DataFrames可以转换为其它的类型格式,例如:
// manually load options Dataset<Row> peopleDF = spark.read().format("json").load("examples/src/main/resources/people.json"); peopleDF.select("name", "age").write().format("parquet").save("namesAndAges.parquet");
1.2 直接执行SQL
我们也可以直接执行SQL查询而不需要使用API加载文件为DataFrame然后再查询,例如:
// run SQL on files directly Dataset<Row> sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`"); sqlDF.show(); // +------+--------------+----------------+ // | name|favorite_color|favorite_numbers| // +------+--------------+----------------+ // |Alyssa| null| [3, 9, 15, 20]| // | Ben| red| []| // +------+--------------+----------------+
* 注意parquet.后面的路径前后有个`的字符(与键盘~一起的那个逗点)
1.3 保存模式
保存操作可以选择性地使用SaveMode指定如何处理存在的数据。需要注意的是这些保存模式不使用任何锁和不是原子操作的。此外,当使用Overwrite模式时,原数据会在写入新数据之前就会被删除。以下是SaveMode的选项,当保存一个DataFrame到指定数据源的时候,如果输出路径已经存在:
SaveMode.ErrorIfExists(默认) 抛出异常
SaveMode.Append 数据会以追加的方式保存
SaveMode.Overwrite 新数据会覆盖原数据(先删除原数据,再保存新数据)
SaveMode.Ignore 不保存新数据,相当于SQL语句的CREATE TABLE IF NOT EXISTS
例如:
usersDF.select("name", "favorite_color").write().mode(SaveMode.Overwrite).save("namesAndFavColors.parquet");
1.4 持久化到表
我们也可以使用saveAsTable方法把DataFrames保存为持久化的表到Hive metastore。值得注意的是我们不需要部署Hive的环境,Spark会创建一个默认本地的Hive metastore(使用Derby)。与createOrReplaceTempView方法不同,saveAsTable会实质化DataFrame的内容,然后在Hive metastore创建它的指针。只要连接到相同metastore的连接不中断,即使Spark程序重新启动,持久化的表也会一直存在。
默认地saveAsTable方法将创建一个“管理表”(managed table),表示数据的位置是由metastore来控制管理的。当持久化的表被删除时,managed table将会自动删除相应的数据。
2. Parquet文件
Parquet是一种被其它多种数据处理系统支持的纵列格式。Spark SQL提供了读写Parquet文件的功能,保存的Parquet文件会自动保留原始数据的schema。当保存Parquet文件时,基于兼容性考虑,所有的列会被自动转换为允许空值。
2.1 以编程方式加载数据
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; Dataset<Row> peopleDF = spark.read().json("examples/src/main/resources/people.json"); // DataFrames can be saved as Parquet files, maintaining the schema information peopleDF.write().parquet("people.parquet"); // Read in the Parquet file created above. // Parquet files are self-describing so the schema is preserved // The result of loading a parquet file is also a DataFrame Dataset<Row> parquetFileDF = spark.read().parquet("people.parquet"); // Parquet files can also be used to create a temporary view and then used in SQL statements parquetFileDF.createOrReplaceTempView("parquetFile"); Dataset<Row> namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19"); Dataset<String> namesDS = namesDF.map(row -> "Name: " + row.getString(0), Encoders.STRING()); namesDS.show(); // +------------+ // | value| // +------------+ // |Name: Justin| // +------------+
2.2 分区推断
对表进行分区是对数据进行优化的方式之一。在一个分区的表内,数据通常是通过分区列将数据存储在不同的目录里面。Parquet数据源现在能够自动地发现并推断分区信息。例如,可以使用下面的目录结构存储人口数据到分区表里面,分区列为gender和country:
path └── to └── table ├── gender=male │ ├── ... │ │ │ ├── country=US │ │ └── data.parquet │ ├── country=CN │ │ └── data.parquet │ └── ... └── gender=female ├── ... │ ├── country=US │ └── data.parquet ├── country=CN │ └── data.parquet └── ...
通过传递path/to/table给SparkSession.read.parquet或SparkSession.read.load,Spark SQL将自动抽取分区信息。返回的DataFrame的Schema如下:
root |-- name: string (nullable = true) |-- age: long (nullable = true) |-- gender: string (nullable = true) |-- country: string (nullable = true)
需要注意的是,分区列的数据类型是自动解析的。目前,数值类型和字符串类型是支持的。如果不想分区列的数据类型被自动解析,可以通过配置spark.sql.sources.partitionColumnTypeInference.enabled=false,默认是true。当该配置被设为false时,分区列数据类型会使用string类型。
从Spark 1.6.0版本开始,默认地,分区信息解析只会作用于指定路径下面的分区。例如上面的例子,如果用户传递path/to/table/gender=male给SparkSession.read.parquet或SparkSession.read.load,gender将不会是分区列。如果用户需要指定基础的路径作为分区信息解析的开始路径,那么可以在数据源选项设置basePath。例如,path/to/table/gender=male是数据的路径,用户设置了basePath=path/to/table/,那么gender将会是分区列。
2.3 Schema合并
像ProtocolBuffer、Avro和Thrift一样,Parquet也支持schema evolution(schema演变)。用户可以先定义一个简单的schema,然后根据需要逐渐地向schema中增加列。通过这种方式,用户可以有多个不同的schemas但它们是互相兼容的Parquet文件。Parquet数据源现在能够自动检测这种情况并合并这些文件的schemas。
因为Schema合并是一个相对高消耗的操作,在大多数的情况下并不需要,所以从Spark SQL 1.5.0版本开始,默认关闭了该功能。可以通过下面两种方式开启:
当读取Parquet文件时,设置数据源选项mergeSchema=true(例如下面的例子)
设置全局SQL选项spark.sql.parquet.mergeSchema=true
import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; public static class Square implements Serializable { private int value; private int square; // Getters and setters... } public static class Cube implements Serializable { private int value; private int cube; // Getters and setters... } List<Square> squares = new ArrayList<>(); for (int value = 1; value <= 5; value++) { Square square = new Square(); square.setValue(value); square.setSquare(value * value); squares.add(square); } // Create a simple DataFrame, store into a partition directory Dataset<Row> squaresDF = spark.createDataFrame(squares, Square.class); squaresDF.write().parquet("data/test_table/key=1"); List<Cube> cubes = new ArrayList<>(); for (int value = 6; value <= 10; value++) { Cube cube = new Cube(); cube.setValue(value); cube.setCube(value * value * value); cubes.add(cube); } // Create another DataFrame in a new partition directory, // adding a new column and dropping an existing column Dataset<Row> cubesDF = spark.createDataFrame(cubes, Cube.class); cubesDF.write().parquet("data/test_table/key=2"); // Read the partitioned table Dataset<Row> mergedDF = spark.read().option("mergeSchema", true).parquet("data/test_table"); mergedDF.printSchema(); // The final schema consists of all 3 columns in the Parquet files together // with the partitioning column appeared in the partition directory paths // root // |-- value: int (nullable = true) // |-- square: int (nullable = true) // |-- cube: int (nullable = true) // |-- key: int (nullable = true)
2.4 Hive metastore Parquet表转换
当读写Hive metastore Parquet表时,基于性能考虑,Spark SQL会先尝试使用自带的Parquet SerDe(序列化与反序列化,Serialize/Deserilize的简称),而不是Hive的SerDe。这个优化选项可以通过spark.sql.hive.convertMetastoreParquet配置,默认为开启。
2.4.1 Hive/Parquet Schema一致化
从表schema处理的角度来看,Hive和Parquet有2个主要的不同点:
Hive不区分大小写,Parquet区分大小写
Hive认为所有的列都可以为空,而Parquet的空值性是有重要意义的(Hive considers all columns nullable, while nullability in Parquet is significant)
由于以上不同点,当把Hive metastore Parquet表转换为Spark SQL Parquet表时,必须将Hive metastore schema和Parquet schema进行一致化。其规则如下:
两个schema中,忽略空值性,具有相同名字的字段必须具有相同的数据类型。一致化后的字段类型应该与Parquet的字段类型一致,以便遵守空值性原则
一致化后的schema只包含在Hive metastore schema定义的字段:
i. 丢弃只在Parquet schema定义的字段
i. 把只在Hive metastore schema定义的字段设为允许为空
2.4.2 元数据刷新
为了提高性能,Spark SQL会缓存Parquet元数据(metadata)。当Hive metastore Parquet表转换的选项开启时,转换后的表元数据也会被缓存。如果这些表被Hive或者其它外部工具更新,则需要手动刷新缓存以确保元数据的一致性。
// spark is an existing SparkSession spark.catalog().refreshTable("my_table");
2.5 配置
Parquet的配置可以使用SparkSession的setConf方法或者使用SQL执行SET key=value命令。详细的配置参数如下:
3. JSON Datasets
Spark SQL可以自动推断JSON数据集的schema并加载为Dataset<Row>。此转换可以使用SparkSession.read().json()方法读取一个String类型的RDD或者一个JSON文件。需要注意的是,这里的JSON文件不是典型的JSON格式。这里的JSON文件每一行必须包含一个独立有效的JSON对象,也称为换行符分割JSON文件。因此,一个规则的多行JSON文件会导致读取出错。读取JSON数据集例子如下:
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; // A JSON dataset is pointed to by path. // The path can be either a single text file or a directory storing text files Dataset<Row> people = spark.read().json("examples/src/main/resources/people.json"); // The inferred schema can be visualized using the printSchema() method people.printSchema(); // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // Creates a temporary view using the DataFrame people.createOrReplaceTempView("people"); // SQL statements can be run by using the sql methods provided by spark Dataset<Row> namesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19"); namesDF.show(); // +------+ // | name| // +------+ // |Justin| // +------+ // Alternatively, a DataFrame can be created for a JSON dataset represented by // an RDD[String] storing one JSON object per string. List<String> jsonData = Arrays.asList( "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}"); JavaRDD<String> anotherPeopleRDD = new JavaSparkContext(spark.sparkContext()).parallelize(jsonData); Dataset<Row> anotherPeople = spark.read().json(anotherPeopleRDD); anotherPeople.show(); // +---------------+----+ // | address|name| // +---------------+----+ // |[Columbus,Ohio]| Yin| // +---------------+----+
* 参考Spark SQL官方链接:http://spark.apache.org/docs/latest/sql-programming-guide.html#data-sources
TO BE CONTINUED...O(∩_∩)O
相关文章推荐
- Spark编程指南入门之Java篇二-基本操作
- Spark编程指南入门之Java篇二-基本操作
- Spark编程指南入门之Java篇二-基本操作
- Spark(1.6.1) Sql 编程指南+实战案例分析
- Spark编程指南入门之Java篇五-数据重组Shuffle介绍
- 4.Spark SQL:数据源Parquet之使用编程方式加载数据
- 4.Spark SQL:数据源Parquet之使用编程方式加载数据
- 4.Spark SQL:数据源Parquet之使用编程方式加载数据
- Spark编程指南入门之Java篇一-基本知识
- Spark SQL编程指南pdf
- 4.Spark SQL:数据源Parquet之使用编程方式加载数据
- 4.Spark SQL:数据源Parquet之使用编程方式加载数据
- 4.Spark SQL:数据源Parquet之使用编程方式加载数据
- Spark -9:Spark SQL, DataFrames and Datasets 编程指南
- 4.Spark SQL:数据源Parquet之使用编程方式加载数据
- Spark1.4.1 编程指南 (Scala/Java/Python)
- Spark编程指南入门之Java篇四-常用Actions操作
- SparkSQL编程指南之Java篇三-数据源(下)
- 《Spark 官方文档》Spark SQL, DataFrames 以及 Datasets 编程指南
- 4.Spark SQL:数据源Parquet之使用编程方式加载数据