您的位置:首页 > 数据库

SparkSQL编程指南之Java篇二-数据源(上)

2017-03-10 11:11 471 查看
Spark SQL通过DataFrame接口支持各种不同数据源的操作。一个DataFrame可以进行相关的转换操作,也可以用于创建临时视图。注册DataFrame为一个临时视图可以允许你对其数据执行SQL查询。本文首先会介绍使用Spark数据源加载和保存数据的一般方法,然后对内置数据源进行详细介绍。

1. 一般的Load/Save方法

Spark SQL最简单的也是默认的数据源格式是Parquet(除非使用了spark.sql.sources.default配置修改),它将会被用于所有的操作。以下是一般的Load/Save方法:

// generic load/save functions
Dataset<Row> usersDF = spark.read().load("examples/src/main/resources/users.parquet");
usersDF.select("name", "favorite_color").write().save("namesAndFavColors.parquet");


* 如果是使用windows系统的话,需要把对应版本编译的hadoop.dll复制到C:\Windows\System32,否则会遇到以下错误:

java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.createFileWithMode0(Ljava/lang/String;JJJI)Ljava/io/FileDescriptor;
at org.apache.hadoop.io.nativeio.NativeIO$Windows.createFileWithMode0(Native Method)
at org.apache.hadoop.io.nativeio.NativeIO$Windows.createFileOutputStreamWithMode(NativeIO.java:559)
at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:219)
at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209)
at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:307)
at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:296)
at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:328)
at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:398)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:461)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:440)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:789)
at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:223)
......

1.1 手动指定选项

我们也可以通过完整的全名(例如:org.apache.spark.sql.parquet)来指定数据源的类型,对于那些内置的数据源类型,也可以使用简称,例如:json, parquet, jdbc, orc, libsvm, csv, text。从任何数据源类型加载的DataFrames可以转换为其它的类型格式,例如:

// manually load options
Dataset<Row> peopleDF = spark.read().format("json").load("examples/src/main/resources/people.json");
peopleDF.select("name", "age").write().format("parquet").save("namesAndAges.parquet");

1.2 直接执行SQL

我们也可以直接执行SQL查询而不需要使用API加载文件为DataFrame然后再查询,例如:

// run SQL on files directly
Dataset<Row> sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`");
sqlDF.show();
// +------+--------------+----------------+
// |  name|favorite_color|favorite_numbers|
// +------+--------------+----------------+
// |Alyssa|          null|  [3, 9, 15, 20]|
// |   Ben|           red|              []|
// +------+--------------+----------------+


* 注意parquet.后面的路径前后有个`的字符(与键盘~一起的那个逗点)

1.3 保存模式

保存操作可以选择性地使用SaveMode指定如何处理存在的数据。需要注意的是这些保存模式不使用任何锁和不是原子操作的。此外,当使用Overwrite模式时,原数据会在写入新数据之前就会被删除。以下是SaveMode的选项,当保存一个DataFrame到指定数据源的时候,如果输出路径已经存在:

SaveMode.ErrorIfExists(默认)     抛出异常

SaveMode.Append                      数据会以追加的方式保存

SaveMode.Overwrite                   新数据会覆盖原数据(先删除原数据,再保存新数据)

SaveMode.Ignore                        不保存新数据,相当于SQL语句的CREATE TABLE IF NOT EXISTS

例如:

usersDF.select("name", "favorite_color").write().mode(SaveMode.Overwrite).save("namesAndFavColors.parquet");

1.4 持久化到表

我们也可以使用saveAsTable方法把DataFrames保存为持久化的表到Hive metastore。值得注意的是我们不需要部署Hive的环境,Spark会创建一个默认本地的Hive metastore(使用Derby)。与createOrReplaceTempView方法不同,saveAsTable会实质化DataFrame的内容,然后在Hive metastore创建它的指针。只要连接到相同metastore的连接不中断,即使Spark程序重新启动,持久化的表也会一直存在。

默认地saveAsTable方法将创建一个“管理表”(managed table),表示数据的位置是由metastore来控制管理的。当持久化的表被删除时,managed table将会自动删除相应的数据。

2. Parquet文件

Parquet是一种被其它多种数据处理系统支持的纵列格式。Spark SQL提供了读写Parquet文件的功能,保存的Parquet文件会自动保留原始数据的schema。当保存Parquet文件时,基于兼容性考虑,所有的列会被自动转换为允许空值。

2.1 以编程方式加载数据

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> peopleDF = spark.read().json("examples/src/main/resources/people.json");

// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write().parquet("people.parquet");

// Read in the Parquet file created above.
// Parquet files are self-describing so the schema is preserved
// The result of loading a parquet file is also a DataFrame
Dataset<Row> parquetFileDF = spark.read().parquet("people.parquet");

// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile");
Dataset<Row> namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19");
Dataset<String> namesDS = namesDF.map(row -> "Name: " + row.getString(0), Encoders.STRING());
namesDS.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

2.2 分区推断

对表进行分区是对数据进行优化的方式之一。在一个分区的表内,数据通常是通过分区列将数据存储在不同的目录里面。Parquet数据源现在能够自动地发现并推断分区信息。例如,可以使用下面的目录结构存储人口数据到分区表里面,分区列为gender和country:

path
└── to
└── table
├── gender=male
│   ├── ...
│   │
│   ├── country=US
│   │   └── data.parquet
│   ├── country=CN
│   │   └── data.parquet
│   └── ...
└── gender=female
├── ...
│
├── country=US
│   └── data.parquet
├── country=CN
│   └── data.parquet
└── ...


通过传递path/to/table给SparkSession.read.parquet或SparkSession.read.load,Spark SQL将自动抽取分区信息。返回的DataFrame的Schema如下:

root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)


需要注意的是,分区列的数据类型是自动解析的。目前,数值类型和字符串类型是支持的。如果不想分区列的数据类型被自动解析,可以通过配置spark.sql.sources.partitionColumnTypeInference.enabled=false,默认是true。当该配置被设为false时,分区列数据类型会使用string类型。

从Spark 1.6.0版本开始,默认地,分区信息解析只会作用于指定路径下面的分区。例如上面的例子,如果用户传递path/to/table/gender=male给SparkSession.read.parquet或SparkSession.read.load,gender将不会是分区列。如果用户需要指定基础的路径作为分区信息解析的开始路径,那么可以在数据源选项设置basePath。例如,path/to/table/gender=male是数据的路径,用户设置了basePath=path/to/table/,那么gender将会是分区列。

2.3 Schema合并

像ProtocolBuffer、Avro和Thrift一样,Parquet也支持schema evolution(schema演变)。用户可以先定义一个简单的schema,然后根据需要逐渐地向schema中增加列。通过这种方式,用户可以有多个不同的schemas但它们是互相兼容的Parquet文件。Parquet数据源现在能够自动检测这种情况并合并这些文件的schemas。

因为Schema合并是一个相对高消耗的操作,在大多数的情况下并不需要,所以从Spark SQL 1.5.0版本开始,默认关闭了该功能。可以通过下面两种方式开启:
当读取Parquet文件时,设置数据源选项mergeSchema=true(例如下面的例子)
设置全局SQL选项spark.sql.parquet.mergeSchema=true
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public static class Square implements Serializable {
private int value;
private int square;

// Getters and setters...

}

public static class Cube implements Serializable {
private int value;
private int cube;

// Getters and setters...

}

List<Square> squares = new ArrayList<>();
for (int value = 1; value <= 5; value++) {
Square square = new Square();
square.setValue(value);
square.setSquare(value * value);
squares.add(square);
}

// Create a simple DataFrame, store into a partition directory
Dataset<Row> squaresDF = spark.createDataFrame(squares, Square.class);
squaresDF.write().parquet("data/test_table/key=1");

List<Cube> cubes = new ArrayList<>();
for (int value = 6; value <= 10; value++) {
Cube cube = new Cube();
cube.setValue(value);
cube.setCube(value * value * value);
cubes.add(cube);
}

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
Dataset<Row> cubesDF = spark.createDataFrame(cubes, Cube.class);
cubesDF.write().parquet("data/test_table/key=2");

// Read the partitioned table
Dataset<Row> mergedDF = spark.read().option("mergeSchema", true).parquet("data/test_table");
mergedDF.printSchema();

// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths
// root
//  |-- value: int (nullable = true)
//  |-- square: int (nullable = true)
//  |-- cube: int (nullable = true)
//  |-- key: int (nullable = true)

2.4 Hive metastore Parquet表转换

当读写Hive metastore Parquet表时,基于性能考虑,Spark SQL会先尝试使用自带的Parquet SerDe(序列化与反序列化,Serialize/Deserilize的简称),而不是Hive的SerDe。这个优化选项可以通过spark.sql.hive.convertMetastoreParquet配置,默认为开启。

2.4.1 Hive/Parquet Schema一致化

从表schema处理的角度来看,Hive和Parquet有2个主要的不同点:
Hive不区分大小写,Parquet区分大小写
Hive认为所有的列都可以为空,而Parquet的空值性是有重要意义的(Hive considers all columns nullable, while nullability in Parquet is significant)

由于以上不同点,当把Hive metastore Parquet表转换为Spark SQL Parquet表时,必须将Hive metastore schema和Parquet schema进行一致化。其规则如下:
两个schema中,忽略空值性,具有相同名字的字段必须具有相同的数据类型。一致化后的字段类型应该与Parquet的字段类型一致,以便遵守空值性原则
一致化后的schema只包含在Hive metastore schema定义的字段:

             i. 丢弃只在Parquet schema定义的字段

             i. 把只在Hive metastore schema定义的字段设为允许为空

2.4.2 元数据刷新

为了提高性能,Spark SQL会缓存Parquet元数据(metadata)。当Hive metastore Parquet表转换的选项开启时,转换后的表元数据也会被缓存。如果这些表被Hive或者其它外部工具更新,则需要手动刷新缓存以确保元数据的一致性。

// spark is an existing SparkSession
spark.catalog().refreshTable("my_table");

2.5 配置

Parquet的配置可以使用SparkSession的setConf方法或者使用SQL执行SET key=value命令。详细的配置参数如下:



3. JSON Datasets

Spark SQL可以自动推断JSON数据集的schema并加载为Dataset<Row>。此转换可以使用SparkSession.read().json()方法读取一个String类型的RDD或者一个JSON文件。需要注意的是,这里的JSON文件不是典型的JSON格式。这里的JSON文件每一行必须包含一个独立有效的JSON对象,也称为换行符分割JSON文件。因此,一个规则的多行JSON文件会导致读取出错。读取JSON数据集例子如下:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files
Dataset<Row> people = spark.read().json("examples/src/main/resources/people.json");

// The inferred schema can be visualized using the printSchema() method
people.printSchema();
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)

// Creates a temporary view using the DataFrame
people.createOrReplaceTempView("people");

// SQL statements can be run by using the sql methods provided by spark
Dataset<Row> namesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
namesDF.show();
// +------+
// |  name|
// +------+
// |Justin|
// +------+

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// an RDD[String] storing one JSON object per string.
List<String> jsonData = Arrays.asList(
"{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
JavaRDD<String> anotherPeopleRDD =
new JavaSparkContext(spark.sparkContext()).parallelize(jsonData);
Dataset<Row> anotherPeople = spark.read().json(anotherPeopleRDD);
anotherPeople.show();
// +---------------+----+
// |        address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+


* 参考Spark SQL官方链接:http://spark.apache.org/docs/latest/sql-programming-guide.html#data-sources


TO BE CONTINUED...O(∩_∩)O
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Spark SparkSQL