Spark SQL and DataFrames
2016-09-06 11:26
381 查看
1.SparkSession
SparkSQL的操作都建立在SparkSession上,创建一个SparkSession叫spark,后面代码都基于此,不再提示from pyspark.sql import SparkSession spark = SparkSession\ .builder\ .appName("PythonSQL")\ .config("spark.some.config.option", "some-value")\ .getOrCreate()
2.创建SparkDataFrames
df = spark.read.json("examples/src/main/resources/people.json") df.show()
3.DataFrame的基本操作
读取文件生成DataFramedf = spark.read.json("examples/src/main/resources/people.json")
查看内容
df.show()
树结构打印表结构
df.printSchema()
选择一列
df.select("name").show()
选择两列,其中一列+1
df.select(df['name'],df['age']+1).show()
筛选
df.filter(df['age']>21).show()
分组聚合
df.groupBy("age").count().show()
4.由RDD转换
方式1:Row推断模式
from pyspark.sql import Row sc = spark.sparkContext lines = sc.textFile("examples/src/main/resources/people.txt") parts = lines.map(lambda l:l.split(',')) people = parts.map(lambda p:Row(name=p[0],age=int(p[1]))) schemaPeople = spark.createSchema(people) schemaPeople.createOrReplaceTempView('people')
名为
spark的SparkSession注册了名为
people的table,可通过
spark.sql()执行对注册的表的sql语句。
table由RDD转化来,由
Row()建立列,用
createDataFrame()注册table,用
createOrReplaceTempView()建立表名。
teenagers = spark.sql('SELECT name FROM people WHERE age >= 13 AND age <= 19') teenNames = teenagers.map(lambda p:'name:' + p.name) for teenName in teenNames.collect(): print(teenName)
已经注册了表的SparkSession执行的sql语句可用RDD的操作。
方式2:StructType指定模式
from pyspark.sql.types import * sc = spark.sparkContext lines = sc.textFile("examples/src/main/resources/people.txt") parts = lines.map(lambda l:l.split(',')) people = parts.map(lambda p:(p[0],p[1].strip())) schema = StructType().add('name','string',True).add('age','int',True) schemaPeople = spark.createDataFrame(people, schema) schemaPeople.createOrReplaceTempView("people")
RDD转换为DataFrame没有用Row时未识别模式,通过
StructType()用添加add()方法加入
StructField('列名','数据类型',是否允许null)建立表结构(即模式schema),注册table时候StructType()对象作为第二个参数以
createDataFrame加入SparkSession。
5.Todo
相关文章推荐
- Spark RDD API详解(一) Map和Reduce
- 使用spark和spark mllib进行股票预测
- Spark随谈——开发指南(译)
- Spark,一种快速数据分析替代方案
- eclipse 开发 spark Streaming wordCount
- Understanding Spark Caching
- ClassNotFoundException:scala.PreDef$
- Windows 下Spark 快速搭建Spark源码阅读环境
- Spark中将对象序列化存储到hdfs
- 使用java代码提交Spark的hive sql任务,run as java application
- Spark机器学习(一) -- Machine Learning Library (MLlib)
- Spark机器学习(二) 局部向量 Local-- Data Types - MLlib
- Spark机器学习(三) Labeled point-- Data Types
- Spark初探
- Spark Streaming初探
- Spark本地开发环境搭建
- zeppelin 提交spark 任务异常:.JsonMappingException: Could not find creator property with name zeppelin
- 搭建hadoop/spark集群环境
- Spark Infomation
- Spark HA部署方案