Spark SQL and DataFrame Guide(1.4.1)——之DataFrames
2015-07-29 16:02
513 查看
Spark SQL是处理结构化数据的Spark模块。它提供了DataFrames这种编程抽象,同时也可以作为分布式SQL查询引擎使用。
1. 开始入口:
入口需要从SQLContext类或者它的子类开始,当然需要使用SparkContext创建SQLContext;这里我们使用pyspark(已经自带了SQLContext即sc):
还可以使用HiveContext,它可以提供比SQLContext更多的功能,例如可以使用更完整的HiveQL解析器写查询,使用Hive UDFs,从Hive表中读取数据等。使用HiveContext并不需要安装hive,Spark默认将HiveContext单独打包避免对hive过多的依赖
2.创建DataFrames
使用JSON文件创建:
注意:
这里你可能需要将文件存入HDFS(这里的文件在Spark安装目录中,1.4版本)
3.DataFrame操作
4.使用编程运行SQL查询
SQLContext可以使用编程运行SQL查询并返回DataFrame。
5.和RDD交互
将RDD转换成DataFrames有两种方法:
利用反射来推断包含特定类型对象的RDD的schema。这种方法会简化代码并且在你已经知道schema的时候非常适用。
使用编程接口,构造一个schema并将其应用在已知的RDD上。
一、利用反射推断Schema
Spark SQL能够将含Row对象的RDD转换成DataFrame,并推断数据类型。通过将一个键值对(key/value)列表作为kwargs传给Row类来构造Rows。key定义了表的列名,类型通过看第一列数据来推断。(所以这里RDD的第一列数据不能有缺失)未来版本中将会通过看更多数据来推断数据类型,像现在对JSON文件的处理一样。
二、编程指定Schema
通过编程指定Schema需要3步:
从原来的RDD创建一个元祖或列表的RDD。
用StructType 创建一个和步骤一中创建的RDD中元祖或列表的结构相匹配的Schema。
通过SQLContext提供的createDataFrame方法将schema 应用到RDD上。
DataFrames
DataFrame是一个带有列名的分布式数据集合。等同于一张关系型数据库中的表或者R/Python中的data frame,不过在底层做了很多优化;我们可以使用结构化数据文件、Hive tables,外部数据库或者RDDS来构造DataFrames。1. 开始入口:
入口需要从SQLContext类或者它的子类开始,当然需要使用SparkContext创建SQLContext;这里我们使用pyspark(已经自带了SQLContext即sc):
from pyspark.sql import SQLContext sqlContext = SQLContext(sc)
还可以使用HiveContext,它可以提供比SQLContext更多的功能,例如可以使用更完整的HiveQL解析器写查询,使用Hive UDFs,从Hive表中读取数据等。使用HiveContext并不需要安装hive,Spark默认将HiveContext单独打包避免对hive过多的依赖
2.创建DataFrames
使用JSON文件创建:
from pyspark.sql import SQLContext sqlContext = SQLContext(sc)
df = sqlContext.read.json("examples/src/main/resources/people.json")
# Displays the content of the DataFrame to stdout
df.show()
注意:
这里你可能需要将文件存入HDFS(这里的文件在Spark安装目录中,1.4版本)
hadoop fs -mkdir examples/src/main/resources/ hadoop fs -put /appcom/spark/examples/src/main/resources/* /user/hdpuser/examples/src/main/resources/
3.DataFrame操作
from pyspark.sql import SQLContext sqlContext = SQLContext(sc)
# Create the DataFrame
df = sqlContext.read.json("examples/src/main/resources/people.json")
# Show the content of the DataFrame
df.show()
## age name
## null Michael
## 30 Andy
## 19 Justin
# Print the schema in a tree format
df.printSchema()
## root
## |-- age: long (nullable = true)
## |-- name: string (nullable = true)
# Select only the "name" column
df.select("name").show()
## name
## Michael
## Andy
## Justin
# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()
## name (age + 1)
## Michael null
## Andy 31
## Justin 20
# Select people older than 21
df.filter(df['age'] > 21).show()
## age name
## 30 Andy
# Count people by age
df.groupBy("age").count().show()
## age count
## null 1
## 19 1
## 30 1
4.使用编程运行SQL查询
SQLContext可以使用编程运行SQL查询并返回DataFrame。
from pyspark.sql import SQLContext sqlContext = SQLContext(sc)
df = sqlContext.sql("SELECT * FROM table")
5.和RDD交互
将RDD转换成DataFrames有两种方法:
利用反射来推断包含特定类型对象的RDD的schema。这种方法会简化代码并且在你已经知道schema的时候非常适用。
使用编程接口,构造一个schema并将其应用在已知的RDD上。
一、利用反射推断Schema
Spark SQL能够将含Row对象的RDD转换成DataFrame,并推断数据类型。通过将一个键值对(key/value)列表作为kwargs传给Row类来构造Rows。key定义了表的列名,类型通过看第一列数据来推断。(所以这里RDD的第一列数据不能有缺失)未来版本中将会通过看更多数据来推断数据类型,像现在对JSON文件的处理一样。
# sc is an existing SparkContext. from pyspark.sql import SQLContext, Row sqlContext = SQLContext(sc) # Load a text file and convert each line to a Row. lines = sc.textFile("examples/src/main/resources/people.txt") parts = lines.map(lambda l: l.split(",")) people = parts.map(lambda p: Row(name=p[0], age=int(p[1]))) # Infer the schema, and register the DataFrame as a table. schemaPeople = sqlContext.createDataFrame(people) schemaPeople.registerTempTable("people") # SQL can be run over DataFrames that have been registered as a table. teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") # The results of SQL queries are RDDs and support all the normal RDD operations. teenNames = teenagers.map(lambda p: "Name: " + p.name) for teenName in teenNames.collect(): print teenName
二、编程指定Schema
通过编程指定Schema需要3步:
从原来的RDD创建一个元祖或列表的RDD。
用StructType 创建一个和步骤一中创建的RDD中元祖或列表的结构相匹配的Schema。
通过SQLContext提供的createDataFrame方法将schema 应用到RDD上。
# Import SQLContext and data types from pyspark.sql import SQLContext from pyspark.sql.types import * # sc is an existing SparkContext. sqlContext = SQLContext(sc) # Load a text file and convert each line to a tuple. lines = sc.textFile("examples/src/main/resources/people.txt") parts = lines.map(lambda l: l.split(",")) people = parts.map(lambda p: (p[0], p[1].strip())) # The schema is encoded in a string. schemaString = "name age" fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()] schema = StructType(fields) # Apply the schema to the RDD. schemaPeople = sqlContext.createDataFrame(people, schema) # Register the DataFrame as a table. schemaPeople.registerTempTable("people") # SQL can be run over DataFrames that have been registered as a table. results = sqlContext.sql("SELECT name FROM people") # The results of SQL queries are RDDs and support all the normal RDD operations. names = results.map(lambda p: "Name: " + p.name) for name in names.collect(): print name
相关文章推荐
- UIActionSheet
- 云盘+Git GUI云盘文件版本控制
- ios开发之实现长按UITableViewCell弹出UIMenuController
- Android Build属性介绍
- apue 第十一章 线程
- C#中Queue<T>类的使用以及部分方法的源码分析
- C#中Queue<T>类的使用以及部分方法的源码分析
- muduo::BlockingQueue、BoundedBlockingQueue分析
- SPOJ 1771 NQUEEN Yet Another N-Queen Problem
- 通用Key-Value存储系统的存储管理策略解析
- Architecture,Valid architectures,Build Active Architecture Only
- 配置文件读取
- UISearchBar 点击取消回到原来位置时会跳动的解决方法
- Maximum Subarray
- Qt 5.3 下OpenCV 2.4.11 开发(3)简单的GUI项目
- Daikon Forge GUI Library(dfgui)之Scroll Panel
- [UIScreen mainScreen].bounds.size获取设备长宽时的问题
- UILable 设置对齐方式扩展
- 用户选择一个value过滤数据
- 初学miniui之miniui的使用