您的位置:首页 > 产品设计 > UI/UE

spark第六篇:Spark SQL, DataFrame and Dataset Guide

2017-12-17 17:58 651 查看
预览

Spark SQL是用来处理结构化数据的Spark模块。与基本的Spark RDD API不同,Spark SQL提供的接口为Spark提供了更多的与结构化数据和执行的运算有关的信息。在内部,Spark SQL使用这些额外的信息来执行额外的优化。有几种与Spark SQL进行交互的方式,包括SQL和Dataset API。在计算结果时,会使用相同的执行引擎,而不管使用哪种API/语言表示计算,这种统一意味着开发人员可以轻松地在不同的API之间来回切换。

本指南中的所有例子都可以在spark-shell,pyspark shell或者spark R shell中执行。

SQL

Spark SQL的一个用途是执行SQL查询。Spark SQL可以从现有的Hive中读取数据,本文下面有讲如何配置此功能。从另一种编程语言运行SQL时,结果会以Dataset/DataFrame返回。你还可以使用命令行或通过JDBC/ODBC与SQL接口进行交互。

Dataset和DataFrame

Dataset是分布式数据集合。Dataset是Spark1.6.0中添加的一个新接口,既提供了RDD的优点(强类型,支持lambda表达式),又提供了Spark SQL优化执行引擎的优点。Dataset可以通过JVM中的对象来构建,然后使用函数转换(map,flatMap,filter等)进行操作。Dataset API仅可用于Scala和Java,Python不支持Dataset API。

A DataFrame is a Dataset organized into named columns.它在概念上相当于关系型数据库中的表或R/Python中的data frame,但在底层有更丰富的优化。DataFrame可以从各种来源构建,如结构化数据文件,Hive中的表,外部数据库或者现有的RDDs。DataFrame API适用于Scala,Java,Python和R。在Scala和Java中,DataFrame is represented by a Dataset of Rows。在Scala API中,DataFrame is simply a type alias of Datasets[Row]。在Java API中,DataFrame被表示成Dataset<Row>。

Getting Started

Starting Point:SparkSession

Java API

编程入口是SparkSession类。用SparkSession.builder()方法来创建一个最基本的SparkSession实例:

public static void main(String[] args) {
SparkSession spark = SparkSession
.builder().master("local")
.appName("Java Spark SQL basic example")
.enableHiveSupport()
.getOrCreate();
}


SparkSession类是从Spark2.0之后才有的,内置了对Hive的支持,包括用HiveSQL查询,访问Hive UDFs,以及从Hive表中读取数据。只需调用org.apache.spark.sql.SparkSession.Builder的enableHiveSupport()方法启用Hive支持即可。

创建DataFrame

利用SparkSession实例,可以通过现有的RDD,或者从Hive表,或者从Spark其他的数据源(下面会讲)来创建DataFrame。

下面例子中,通过JSON文件创建了一个DataFrame:

public static void main(String[] args) {
SparkSession spark = SparkSession
.builder().master("local")
.appName("Java Spark SQL basic example")
.enableHiveSupport()
.getOrCreate();

Dataset<Row> df = spark.read().json("d:/people.json");

// Displays the content of the DataFrame to stdout
df.show();
// +----+-------+
// | age|   name|
// +----+-------+// |  30|   Andy|
// +----+-------+
}


如上,用Dataset<Row>来表示DataFrame,其中Row全类名是org.apache.spark.sql.Row。

通过调用SparkSession实例的read()方法获取DataFrameReader实例,通过该实例的各种方法来读取数据源生成DataFrame实例,如json(path)读取json文件,csv(path)读取csv文件,text(path)读取text文件,jdbc()方法读取关系型数据库的表,orc()方法读取ORC文件(启用hive支持后才能调用),parquet()方法读取Parquet文件。

上例中调用了Dataset<Row>对象的show()方法,该方法会在控制台以表格的形式(带表头的)打印出Dataset对象前20条记录,也就是表格前20行记录。每个单元格内容右对齐,且最多展示20个字符(索引在20及20之后的字符不展示)。

实测发现,如果json文件中的内容是格式化之后的json字符串,就不能正确地读出数据,必须使json字符串只占一行(json array也是占一行,不能跨行)。

无类型 Dataset操作(又称作DataFrame操作)

DataFrame为Scala,Java,Python和R中的结构化数据操作提供了特定领域的语言。

如上所述,在Spark2.0之后,在Scala和Java语言中,DataFrame以Dataset<Row>表示。这些操作也被称为无类型转换,与强类型的Scala/Java Dataset的类型转换形成对比(These operations are also referred as “untyped transformations” in contrast to “typed transformations” come with strongly typed Scala/Java Datasets)。

下面是使用Dataset处理结构化数据的例子:

public static void main(String[] args) {
SparkSession spark = SparkSession
.builder().master("local")
.appName("Java Spark SQL basic example")
.enableHiveSupport()
.getOrCreate();

Dataset<Row> df = spark.read().json("d:/people.json");

// Displays the content of the DataFrame to stdout
df.show();

df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show();
     // Select everybody, but increment the age by 1
df.select(col("name"), col("age").plus(1)).show();// Select people older than 21
df.filter(col("age").gt(21)).show();

     // Count people by age
df.groupBy("age").count().show();
}


完整的代码示例参见spark-2.2.0-bin-hadoop2.7.tgz解压缩后的examples目录中的JavaSparkSQLExample.java。

上例中使用了Dataset<Row>的以下实例方法:

printSchema():在控制台打印出表头,亦即列名。

select(Column col1, Column col2, ...):传入一个或多个Column对象,返回一个新的Dataset<Row>对象,该对象只包含查询的列。Column是spark-sql jar包中的类,全类名是org.apache.spark.sql.Column,可以通过调用functions.col(String colName)来生成Column实例,其中functions可以省略。

filter(Column condition):返回一个新的Dataset<Row>对象,该对象只包含符合筛选条件的记录。传入的Column对象的作用相当于where条件,例如col("age").lt(18)。

groupBy(Column col1, Column col2, ...):传入要分组的列,返回一个RelationalGroupedDataset对象。

除了简单的列引用和表达式之外,Dataset还有丰富的函数库,包括字符串操作、时间函数及常见的数学运算等。具体可以查看org.apache.spark.sql.functions中的方法。

运行SQL查询

SparkSession实例的sql()方法可以运行SQL查询,结果以Dataset<Row>形式返回。

sql语句中from关键字后面跟的是临时视图名,临时视图可以通过调用Dataset<Row>实例的 createTempView(String viewName)、createOrReplaceTempView(String viewName)方法来创建(这两个方法都是从2.0.0版本开始才有),通过这两个方法创建的临时视图是SparkSession范围的,如果创建它的SparkSession终止,那么该视图将会消失。同样的,如果在另一个SparkSession中执行该视图的SQL查询,会抛 org.apache.spark.sql.AnalysisException: Table or view not found 异常。如果想在另一个SparkSession中也能正确的执行该视图的SQL查询,则需要创建一个全局临时视图,可以通过调用Dataset<Row>实例的 createGlobalTempView(String viewName)、createOrReplaceGlobalTempView(String viewName)方法创建,前一个方法从2.1.0版本开始才有,后一个方法从2.2.0版本开始才有。全局临时视图和系统保存的数据库global_temp绑定,我们必须使用限定名来引用它,例如,select * from global_temp.view1。

public static void main(String[] args) {
SparkSession spark = SparkSession
.builder().master("local")
.appName("Java Spark SQL basic example")
.enableHiveSupport()
.getOrCreate();

Dataset<Row> df = spark.read().json("d:/people.json");

// Displays the content of the DataFrame to stdout
df.show();

// Creates a local temporary view using the given name.
// The lifetime of this temporary view is tied to the [[SparkSession]] that was used to create this Dataset.
df.createOrReplaceTempView("people");

spark.sql("select * from people where age > 20").show();

// Creates a global temporary view using the given name. The lifetime of this temporary view is tied to this Spark application.
df.createOrReplaceGlobalTempView("people");

// Global temporary view is cross-session
spark.newSession().sql("select * from global_temp.people where age > 20").show();
}


创建Dataset

Dataset类似于RDD,与之不同的是,Dataset使用专门的编码器对对象进行序列化以便通过网络进行处理或者传输,而不是使用Java序列化或者Kryo序列化。虽然编码器和标准序列化都负责将对象转换为字节,但编码器是动态生成代码,并且使用允许Spark执行很多操作(如过滤、排序和哈希)的格式,而无需将字节反序列化成对象。

public static void main(String[] args) {
SparkSession spark = SparkSession
.builder().master("local")
.appName("Java Spark SQL basic example")
.enableHiveSupport()
.getOrCreate();

// Create an instance of a Bean class
Person person = new Person();
person.setName("Andy");
person.setAge(32);

// Encoders are created for Java beans
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> javaBeanDS = spark.createDataset(Arrays.asList(person), personEncoder);
javaBeanDS.show();

// DataFrames can be converted to a Dataset by providing a class. Mapping based on name
Dataset<Person> peopleDS = spark.read().json("d:/people.json").as(personEncoder);
peopleDS.show();

// Encoders for most common types are provided in class Encoders
Encoder<Integer> integerEncoder = Encoders.INT();
Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder);
Dataset<Integer> transformedDS = primitiveDS.map(
(MapFunction<Integer, Integer>) value -> value + 1,
integerEncoder);
transformedDS.show();
}


上例中需要额外创建一个Person javabean类,有age和name两个属性,代码省略。

与RDD互操作

Spark SQL支持两种方法来将现有的RDD转换为Dataset。

第一种方法使用反射来推测包含特定类型对象的RDD的模式。这种基于反射的方法编程很简洁,在你已经知道RDD的模式的时候运行良好。

第二种方法是通过一个允许你构建模式并将其应用于现有RDD的接口。尽管这个方法比较冗长,但是它允许你在不知道列及其类型的情况下创建Dataset。

下面分别讲述这两种方法

用反射来推测

Spark SQL支持自动将RDD<JavaBean>转换成DataFrame。使用反射获得的BeanInfo定义表的模式。目前,Spark SQL可以支持嵌套的JavaBean和包含List或者Array字段的JavaBean,但是不支持包含Map字段的JavaBean。

SparkSession提供了createDataset()方法和createDataFrame()方法的多种重载,可以实现RDD到Dataset的转换需求。

手动指定模式

当无法提前定义JavaBean类时,可以以编程方式创建Dataset<Row>,分为三个步骤:

1.通过原始RDD创建一个RDD<Row>。

2.创建由StructType实例表示的schema。

3.通过调用SparkSession实例的createdDataFrame()重载方法把步骤2创建的schema应用到步骤1创建的RDD<Row>上。

public static void main(String[] args) {
SparkSession spark = SparkSession
.builder().master("local")
.appName("Java Spark SQL basic example")
.enableHiveSupport()
.getOrCreate();

// Create an RDD
JavaRDD<String> peopleRDD = spark.sparkContext()
.textFile("d:/people.txt", 1)
.toJavaRDD();
// Convert records of the RDD (people) to Rows
JavaRDD<Row> rowRDD = peopleRDD.map(record -> {
String[] attributes = record.split(",");
return RowFactory.create(attributes[0], attributes[1].trim());
});

// The schema is encoded in a string
String schemaString = "name age";

// Generate the schema based on the string of schema
List<StructField> fields = new ArrayList<>();
for (String fieldName : schemaString.split(" ")) {
StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
fields.add(field);
}
//StructType实例代表schema
StructType schema = DataTypes.createStructType(fields);

// Apply the schema to the RDD
Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);

peopleDataFrame.createOrReplaceTempView("people");

Dataset<Row> results = spark.sql("SELECT name FROM people");

Dataset<String> namesDS = results.map(
(MapFunction<Row, String>) row -> "Name: " + row.getString(0),
Encoders.STRING());
namesDS.show();
}


上例中,在由RDD<String>转换为RDD<Row>的过程中,使用了RowFactory.create(Object ... values)来创建Row实例。schema由StructType实例表示,通过DataTypes.createStructType(List<StructField> fields)方法创建,StructField实例通过DataTypes.createStructField(String name, DataType dataType, boolean nullable)方法创建,其中的DataType实例可以通过DataTypes打点创建。最后再调用SparkSession实例的createDataFrame(JavaRDD<Row> rowRDD, StructType schema)方法来生成Dataset<Row>实例。

聚合

Spark SQL为操作DataFrame内置了很多函数,诸如count(),countDistinct(),avg(),max(),min()等常用聚合函数,具体可以查看org.apache.spark.sql.functions的方法。同时用户还可以自定义聚合函数,官方文档上有讲,本处翻译省略,具体参考https://spark.apache.org/docs/latest/sql-programming-guide.html#aggregations

数据源

Spark SQL支持通过DataFrame接口在各种数据源上进行操作。DataFrame可以使用关系转换进行操作,也可以用来创建临时视图。将DataFrame注册为临时视图允许你对其数据运行SQL查询。本节介绍使用Spark Data Sources加载和保存数据的常用方法,然后介绍可用于内置数据源的特定选项。

加载和保存的常用方法

在最简单的形式中,默认的数据源parquet(可以通过更改spark.sql.sources.default的值来自定义)将用于所有操作。

Java代码

你也可以手动指定将要使用的数据源以及你想要传递给数据源的其他选项。数据源由其完全限定名指定,但对于内置的数据源,你可以使用其短名称(json,parquet,jdbc,orc,libsvm,csv,text)。从任何数据源加载的DataFrame都可以使用此语法转换为其他类型。

Java代码

public static void main(String[] args) {
SparkSession spark = SparkSession
.builder().master("local")
.appName("Java Spark SQL basic example")
.enableHiveSupport()
.getOrCreate();

Dataset<Row> peopleDF =
spark.read().format("json").load("d:/people.json");
peopleDF.select("name").write().format("text").save("d:/people.text");
}


上例中,DataFrameReader实例连续调用format("json")和load(path),相当于调用json(path)方法,查看json()方法的源码可以看到。其实load()是最基本的加载数据的方法,默认是parquet格式,但是为其他格式提供了优化的API,直接一步到位,所以才产生json()方法、text()方法等。

直接在文件上运行SQL

除了用DataFrameReader的各种方法将文件内容读取到DataFrame中并查询外,你还可以直接对文件进行SQL查询。

Java代码

public static void main(String[] args) {
SparkSession spark = SparkSession
.builder().master("local")
.appName("Java Spark SQL basic example")
.enableHiveSupport()
.getOrCreate();

Dataset<Row> sqlDF =
spark.sql("SELECT * FROM json.`d:/people.json`");
sqlDF.show();
}


保存模式
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: