Spark SQL, DataFrames and Datasets Guide
2017-09-14 16:33
495 查看
目录
概述 SQL
DataFrames
Datasets
Getting Started
Starting Point:SQLContext
DataFrame
创建 DataFrames
DataFrame操作
执行SQL查询
Dataset
创建Datasets
与RDDs交互操作
使用Reflection推断Schema
程序自动指定Schema
概述
Spark SQL是一个用作结构数据处理的一个模块。不像Spark RDD中的API,Spark SQL提供给Spark更多关于数据和计算的信息。从内部来说,Spark SQL提取信息的接口经过额外的优化。有很多方法使用Spark SQL,包括SQL, DataFrames的API和Datasets的API。Spark SQL的计算引擎与语言或者API是独立开的。这种统一意味着开发人员可以很容易在不同的APIs之间来回切换,这就提供了最自然的方式来表达一个给定的转换。
SQL
Spark SQL可以使用基础的SQL或者HiveQL执行SQL查询。Spark SQL也可以被用来从已存在的Hive数据库中读取数据。读取的数据库被返回为DataFrame。
DataFrames
如果用过R或者python的pandas库的话,对DataFrames就特别熟悉了。直观的角度,数据是存在类似excel表中。不理解的话可以百度一下R的DataFrame结构。
Datasets
Dataset是Spark 1,。6中新的一种接口,目前还在试验阶段,Dataset尝试提供类似于RDDS优点的数据存取,同时又兼顾SQL的引擎优化。一个Dataset可以从JVM对象中被构造,使用transformations对数据进行操作。
Getting Started
Starting Point:SQLContext
(下面的代码我全部使用python代码,首先我对python比较熟悉,再者python简洁好理解,可能工程上使用java的居多,但是目前阶段需要快速,优质的掌握SparkSQL的相关概念和理论。) Spark中SparkSQL的入口点就是SQL
Context类,或者他的派生。当然在穿件一个基础的SQLContext之前,我们需要创建一个SparkContext。
from pyspark.sql import SQLContext sqlContext = SQLContext(sc) #这里的sc是创建的SparkContext
除了使用SQLContext,我们也可以使用HiveContext,HiveContext比基础的SQLContext提供更多的功能。这些功能暴多:使用HiveQL解析器查询,使用Hive UDFs和从Hive tables中读取数据的能力。比较麻烦的是HiveContext是独立的包,有很多麻烦的依赖,如果能够搞定这个的话,哪使用HiveContext就不成问题了。
DataFrame
创建 DataFrames
使用SQLContext,应用可以从已存的RDD中,Hive table中或者其他的数据源中创建DataFrames。 下面的例子是基于JSON文件创建一个数据框图。
from pyspark.sql import SQLContext sqlContext = SQLContext(sc) df = sqlContext.read.json("examples/src/main/resources/people.json") # Displays the content of the DataFrame to stdout df.show() '''这张表中的数据 {"name":"Michael"} {"name":"Andy", "age":30} {"name":"Justin", "age":19} '''
DataFrame操作
DataFrame提供了一系列的方法对结构化数据进行操作。下面列出使用DataFrames进行结构化数据操作的例子。 在python中,可以使用属性df.age和索引进行访问。虽然前者很好用,但是极度推荐使用接口的形式。java、Scala也是使用接口来进行访问。
from pyspark.sql import SQLContext sqlContext = SQLContext(sc) # 创建一个DataFrame df = sqlContext.read.json("examples/src/main/resources/people.json") # 显示 df.show() ## age name ## null Michael ## 30 Andy ## 19 Justin # 以树结构打印数据 df.printSchema() ## root ## |-- age: long (nullable = true) ## |-- name: string (nullable = true) # 只选择“name”这一列 df.select("name").show() ## name ## Michael ## Andy ## Justin # 选择每一个人,但是年龄加一显示出来 df.select(df['name'], df['age'] + 1).show() ## name (age + 1) ## Michael null ## Andy 31 ## Justin 20 # 选择年龄操作21岁的人 df.filter(df['age'] > 21).show() ## age name ## 30 Andy # 按年龄计数,这个就类似SQL中的select count(*) groupby age df.groupBy("age").count().show() ## age count ## null 1 ## 19 1 ## 30 1
更多的操作请参见API Documentation
执行SQL查询
这个Ssql功能能够确保这个程序自动运行SQL查询,以DataFrame结构返回查询结果。from pyspark.sql import SQLContext sqlContext = SQLContext(sc) df = sqlContext.sql("SELECT * FROM table")
Dataset
创建Datasets
Datasets与RDDs类似,但是不适用Java序列和Kryo,他们使用一种特殊的编码手段进行处理或者在网络中进行传输。虽然编码器和标准序列化都是讲对象转化为字节,编码器代码动态生成和进行很多的操作比如:过滤、排序和hash没有反序列化的字节回一个对象。
与RDDs交互操作
Spark SQL提供两种不同的方法将已经存在的RDDs转化成DataFrames。 * 第一种方法使用反射来推断一个RDD包含对象的具体类型的图式。这种reflection是当编写Spark应用程序的时候就已经知道这些图式,为了让代码更简洁和有效时采用的方法。
* 第二种方法是通过一种程序化的结构创建一个DataFrames,允许创建一个图式,并读取一个已存的RDD。这个方法更加的冗长,允许我们知道运行时才创建一个DataFrames。
使用Reflection推断Schema
Spark SQ可以转换行对象的RDD为一个DataFrame,推断数据类型。通过对键/值列表作为关键字参数传入Row类中的。这一列的关键字定义了这个表的列名,随后通过扫描第一行推断数据类型。因为我们只看第一行,所以一定要确保RDD中第一行没有缺损数据。在未来的版本中这一点会改变。#sc是一件存在的SparkContext from pyspark.sql import SQLContext, Row sqlContext = SQLContext(sc) # 读取text文件,将每一行转换为一个Row lines = sc.textFile("examples/src/main/resources/people.txt") parts = lines.map(lambda l: l.split(",")) people = parts.map(lambda p: Row(name=p[0], age=int(p[1]))) # 推测schema,并注册DataFrame为一个table schemaPeople = sqlContext.createDataFrame(people) schemaPeople.registerTempTable("people") # SQL 可以在一件注册为table的DataFrames中执行 teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") # SQL查询的结果是RDD,并支持左右正常的RDD操作 teenNames = teenagers.map(lambda p: "Name: " + p.name) for teenName in teenNames.collect(): print(teenName)
程序自动指定Schema
当关键字参数字典不能提前被定义(例如,记录的结构是一个字符串或者文本数据集会被解析,不同用户会有不同的项目),一个DataFrame可以通过以下三步创建: * 从原始的RDD中创建一个set或者是lists(在java中是raw)
* 通过使用StructType匹配创建schema
* 通过SQLContext类中创建DataFrame方法将schema转换为RDD
比如:
# Import SQLContext and data types from pyspark.sql import SQLContext from pyspark.sql.types import * # sc 是一个已存的SparkContext sqlContext = SQLContext(sc) # 读取 lines = sc.textFile("examples/src/main/resources/people.txt") parts = lines.map(lambda l: l.split(",")) people = parts.map(lambda p: (p[0], p[1].strip())) # The schema is encoded in a string. schemaString = "name age" fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()] schema = StructType(fields) # Apply the schema to the RDD. schemaPeople = sqlContext.createDataFrame(people, schema) # Register the DataFrame as a table. schemaPeople.registerTempTable("people") # SQL can be run over DataFrames that have been registered as a table. results = sqlContext.sql("SELECT name FROM people") # The results of SQL queries are RDDs and support all the normal RDD operations. names = results.map(lambda p: "Name: " + p.name) for name in names.collect(): print(name)
基本的概念和内容就这么多了,官方网站上还有Data Sources、性能优化、分布式SQL引擎等等。这些方面的内容用的时候在看官方文档
相关文章推荐
- Spark SQL, DataFrames and Datasets Guide
- 【Spark】Spark SQL, DataFrames and Datasets Guide(翻译文,持续更新)
- 学习spark:五、Spark SQL, DataFrames and Datasets Guide
- pyspark-Spark SQL, DataFrames and Datasets Guide
- Spark SQL,DataFrames and DataSets Guide官方文档翻译
- Apache Spark 2.2.0 中文文档 - Spark SQL, DataFrames and Datasets Guide | ApacheCN
- Spark SQL, DataFrames and Datasets Guide
- Apache Spark 2.2.0 中文文档 - Spark SQL, DataFrames and Datasets Guide | ApacheCN
- Spark SQL, DataFrames and Datasets Guide
- Spark SQL,DataFrames and Datasets Guide
- Apache Spark APIs 的三剑客,RDDs, DataFrames, and Datasets
- Spark -9:Spark SQL, DataFrames and Datasets 编程指南
- Spark 官方文档(5)——Spark SQL,DataFrames和Datasets 指南
- Spark SQL, DataFrames and Datasets(Spark-2.1.1)指南
- Spark SQL and DataFrame Guide(1.4.1)——之DataFrames
- spark2.2官方教程笔记-Spark SQL, DataFrames and Datasets向导
- rdds-dataframes-and-datasets
- Spark SQL and DataFrame Guide(1.4.1)——之DataFrames
- Spark 官方文档(5)——Spark SQL,DataFrames和Datasets 指南
- spark第六篇:Spark SQL, DataFrame and Dataset Guide