您的位置:首页 > 产品设计 > UI/UE

Spark SQL, DataFrames and Datasets Guide

2017-09-14 16:33 495 查看


目录

概述 
SQL
DataFrames
Datasets

Getting Started 
Starting Point:SQLContext
DataFrame 
创建 DataFrames
DataFrame操作
执行SQL查询

Dataset 
创建Datasets
与RDDs交互操作
使用Reflection推断Schema
程序自动指定Schema


概述

Spark SQL是一个用作结构数据处理的一个模块。不像Spark RDD中的API,Spark SQL提供给Spark更多关于数据和计算的信息。从内部来说,Spark SQL提取信息的接口经过额外的优化。有很多方法使用Spark SQL,包括SQL, DataFrames的API和Datasets的API。Spark SQL的计算引擎与语言或者API是独立开的。这种统一意味着开发人员可以很容易在不同的APIs之间来回切换,这就提供了最自然的方式来表达一个给定的转换。


SQL

Spark SQL可以使用基础的SQL或者HiveQL执行SQL查询。Spark SQL也可以被用来从已存在的Hive数据库中读取数据。读取的数据库被返回为DataFrame。


DataFrames

如果用过R或者python的pandas库的话,对DataFrames就特别熟悉了。直观的角度,数据是存在类似excel表中。不理解的话可以百度一下R的DataFrame结构。


Datasets

Dataset是Spark 1,。6中新的一种接口,目前还在试验阶段,Dataset尝试提供类似于RDDS优点的数据存取,同时又兼顾SQL的引擎优化。一个Dataset可以从JVM对象中被构造,使用transformations对数据进行操作。


Getting Started


Starting Point:SQLContext

(下面的代码我全部使用python代码,首先我对python比较熟悉,再者python简洁好理解,可能工程上使用java的居多,但是目前阶段需要快速,优质的掌握SparkSQL的相关概念和理论。) 

Spark中SparkSQL的入口点就是SQL 

Context类,或者他的派生。当然在穿件一个基础的SQLContext之前,我们需要创建一个SparkContext。
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
#这里的sc是创建的SparkContext


除了使用SQLContext,我们也可以使用HiveContext,HiveContext比基础的SQLContext提供更多的功能。这些功能暴多:使用HiveQL解析器查询,使用Hive UDFs和从Hive tables中读取数据的能力。比较麻烦的是HiveContext是独立的包,有很多麻烦的依赖,如果能够搞定这个的话,哪使用HiveContext就不成问题了。


DataFrame


创建 DataFrames

使用SQLContext,应用可以从已存的RDD中,Hive table中或者其他的数据源中创建DataFrames。 

下面的例子是基于JSON文件创建一个数据框图。
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

df = sqlContext.read.json("examples/src/main/resources/people.json")

# Displays the content of the DataFrame to stdout
df.show()

'''这张表中的数据
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
'''


DataFrame操作

DataFrame提供了一系列的方法对结构化数据进行操作。下面列出使用DataFrames进行结构化数据操作的例子。 

在python中,可以使用属性df.age和索引进行访问。虽然前者很好用,但是极度推荐使用接口的形式。java、Scala也是使用接口来进行访问。
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

# 创建一个DataFrame
df = sqlContext.read.json("examples/src/main/resources/people.json")

# 显示
df.show()
## age  name
## null Michael
## 30   Andy
## 19   Justin

# 以树结构打印数据
df.printSchema()
## root
## |-- age: long (nullable = true)
## |-- name: string (nullable = true)

# 只选择“name”这一列
df.select("name").show()
## name
## Michael
## Andy
## Justin

# 选择每一个人,但是年龄加一显示出来
df.select(df['name'], df['age'] + 1).show()
## name    (age + 1)
## Michael null
## Andy    31
## Justin  20

# 选择年龄操作21岁的人
df.filter(df['age'] > 21).show()
## age name
## 30  Andy

# 按年龄计数,这个就类似SQL中的select count(*) groupby   age
df.groupBy("age").count().show()
## age  count
## null 1
## 19   1
## 30   1


更多的操作请参见API Documentation


执行SQL查询

这个Ssql功能能够确保这个程序自动运行SQL查询,以DataFrame结构返回查询结果。
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.sql("SELECT * FROM table")


Dataset


创建Datasets

Datasets与RDDs类似,但是不适用Java序列和Kryo,他们使用一种特殊的编码手段进行处理或者在网络中进行传输。虽然编码器和标准序列化都是讲对象转化为字节,编码器代码动态生成和进行很多的操作比如:过滤、排序和hash没有反序列化的字节回一个对象。


与RDDs交互操作

Spark SQL提供两种不同的方法将已经存在的RDDs转化成DataFrames。 

* 第一种方法使用反射来推断一个RDD包含对象的具体类型的图式。这种reflection是当编写Spark应用程序的时候就已经知道这些图式,为了让代码更简洁和有效时采用的方法。 

* 第二种方法是通过一种程序化的结构创建一个DataFrames,允许创建一个图式,并读取一个已存的RDD。这个方法更加的冗长,允许我们知道运行时才创建一个DataFrames。


使用Reflection推断Schema

Spark SQ可以转换行对象的RDD为一个DataFrame,推断数据类型。通过对键/值列表作为关键字参数传入Row类中的。这一列的关键字定义了这个表的列名,随后通过扫描第一行推断数据类型。因为我们只看第一行,所以一定要确保RDD中第一行没有缺损数据。在未来的版本中这一点会改变。
#sc是一件存在的SparkContext
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)

# 读取text文件,将每一行转换为一个Row
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

# 推测schema,并注册DataFrame为一个table
schemaPeople = sqlContext.createDataFrame(people)
schemaPeople.registerTempTable("people")

# SQL 可以在一件注册为table的DataFrames中执行
teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

# SQL查询的结果是RDD,并支持左右正常的RDD操作
teenNames = teenagers.map(lambda p: "Name: " + p.name)
for teenName in teenNames.collect():
print(teenName)


程序自动指定Schema

当关键字参数字典不能提前被定义(例如,记录的结构是一个字符串或者文本数据集会被解析,不同用户会有不同的项目),一个DataFrame可以通过以下三步创建: 

* 从原始的RDD中创建一个set或者是lists(在java中是raw) 

* 通过使用StructType匹配创建schema 

* 通过SQLContext类中创建DataFrame方法将schema转换为RDD 

比如:
# Import SQLContext and data types
from pyspark.sql import SQLContext
from pyspark.sql.types import *

# sc 是一个已存的SparkContext
sqlContext = SQLContext(sc)

# 读取
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: (p[0], p[1].strip()))

# The schema is encoded in a string.
schemaString = "name age"

fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schemaPeople = sqlContext.createDataFrame(people, schema)

# Register the DataFrame as a table.
schemaPeople.registerTempTable("people")

# SQL can be run over DataFrames that have been registered as a table.
results = sqlContext.sql("SELECT name FROM people")

# The results of SQL queries are RDDs and support all the normal RDD operations.
names = results.map(lambda p: "Name: " + p.name)
for name in names.collect():
print(name)


基本的概念和内容就这么多了,官方网站上还有Data Sources、性能优化、分布式SQL引擎等等。这些方面的内容用的时候在看官方文档
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: