您的位置:首页 > 数据库

Spark SQL and DataFrames

2016-09-06 11:26 381 查看

1.SparkSession

SparkSQL的操作都建立在SparkSession上,创建一个SparkSession叫spark,后面代码都基于此,不再提示

from pyspark.sql import SparkSession

spark = SparkSession\
.builder\
.appName("PythonSQL")\
.config("spark.some.config.option", "some-value")\
.getOrCreate()


2.创建SparkDataFrames

df = spark.read.json("examples/src/main/resources/people.json")
df.show()


3.DataFrame的基本操作

读取文件生成DataFrame
df = spark.read.json("examples/src/main/resources/people.json")


查看内容
df.show()


树结构打印表结构
df.printSchema()


选择一列
df.select("name").show()


选择两列,其中一列+1
df.select(df['name'],df['age']+1).show()


筛选
df.filter(df['age']>21).show()


分组聚合
df.groupBy("age").count().show()


4.由RDD转换

方式1:Row推断模式

from pyspark.sql import Row
sc = spark.sparkContext

lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l:l.split(','))
people = parts.map(lambda p:Row(name=p[0],age=int(p[1])))

schemaPeople = spark.createSchema(people)
schemaPeople.createOrReplaceTempView('people')


名为
spark
的SparkSession注册了名为
people
的table,可通过
spark.sql()
执行对注册的表的sql语句。

table由RDD转化来,由
Row()
建立列,用
createDataFrame()
注册table,用
createOrReplaceTempView()
建立表名。

teenagers = spark.sql('SELECT name FROM people WHERE age >= 13 AND age <= 19')
teenNames = teenagers.map(lambda p:'name:' + p.name)
for teenName in teenNames.collect():
print(teenName)


已经注册了表的SparkSession执行的sql语句可用RDD的操作。

方式2:StructType指定模式

from pyspark.sql.types import *
sc = spark.sparkContext

lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l:l.split(','))
people = parts.map(lambda p:(p[0],p[1].strip()))

schema = StructType().add('name','string',True).add('age','int',True)

schemaPeople = spark.createDataFrame(people, schema)
schemaPeople.createOrReplaceTempView("people")


RDD转换为DataFrame没有用Row时未识别模式,通过
StructType()
用添加add()方法加入
StructField('列名','数据类型',是否允许null)
建立表结构(即模式schema),注册table时候StructType()对象作为第二个参数以
createDataFrame
加入SparkSession。

5.Todo

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark