您的位置:首页 > 数据库

一起学spark(11) -- Spark SQL 和 DataFrame 操作

2018-02-06 19:19 459 查看
Spark SQL是在Spark 1.0 中新加入的spark 组件,并快速成为了Spark中教受欢迎的操作结构化和半结构化数据的方式。DataFrame 是由 ROW对象组成的rdd,每个ROW对象表示一条记录,类似我们的表结构。
(1)采用spark sql 执行数据查询和插入from pyspark import SparkContext,SparkConf
from pyspark.sql import HiveContext,Row #没有 HiveContext 就用 SQLContext
conf = SparkConf().setMaster("yarn-client").setAppName("my_app")
conf.set("spark.network.timeout",300)
conf.set("spark.shuffle.consolidateFiles","true") #shuffle 合并
sc = SparkContext(conf = conf)
hiveCtx = HiveContext(sc)
#Hive 配置动态分区
hiveCtx.setConf("hive.exec.dynamic.partition","true") # or hiveCtx.sql("set hive.exec.dynamic.partition = true")
hiveCtx.setConf("hive.exec.dynamic.partition.mode","nonstrict")

#hiveCtx.sql("xxxx") 里面可以写任何Hive语句执行,包括查询,插入等操作,查询返回的就是一个DataFrame
emp_no = hiveCtx.sql("select empno,empname from emp").first().empno #DataFrame 也是RDD,故前面介绍的算子也适用

#数据插入
hiveCtx.sql("insert overwrite table emp select ...") #直接执行 sql 插入
df = hiveCtx.sql("select ...")
df.write.insertInto("emp",True) #True 表示是否 overwrite
df.write.partitionBy("busi_date").insertInto("emp",True) #如果是分区表 并以 busi_date 分区的话,这种只能开启动态(2)一般rdd转化成DataFrame
#1.一般转化
def change2row(x):
return Row(empno = x[0],empname = x[1])
rdd.map(lambda x:change2row(x)).toDF() #将 rdd 转换成 DataFrame
#或者直接
rdd.map(lambda x:Row(**x))

#2.使用createDataFrame创建  applySchema 或 inferSchema 不建议采用
#假设一个RDD:my_rdd = [string,float,int]
from pyspark.sql.types import StructType,StructField,StringType,FloatType,IntegerType
schema = StructType([StructField("student_no",StringType(),True),
StructField("score",FloatType(),True),
StructField("rank",IntegerType(),True)])
df = hiveCtx.createDataFrame(my_rdd,schema)#我们就将一个数组rdd,创建成了一个学生号,分数,排名的一个表结构

#如果是字典类型的rdd 比如my_rdd = {"student_no":"xxx","score":xxx,"rank":xx},则可以直接创建
df = hiveCtx.createDataFrame(my_rdd)
(3)DataFrame 的操作
#1.两个 DataFrame 之间的连接
cond = [df1.id = df2.id] #如果多个条件 [(df1.id = df2.id) & (df1.name = df2.name)]
df = df1.join(df2,cond,'inner').select(df1.xxx,...) #如果有多个条件,可用 & 连接
#连接类型有 "inner","fullouter","leftouter","rightouter","leftsemi" 等,跟表连接差不多

#2.DataFrame 函数
from pyspark.sql.functions import sum,max,mean,min,avg,day,abs,to_date ...#根据需要选择
schemaRDD = hiveCtx.createDataFrame(my_rdd,schema).groupBy("student_no").\
agg(sum("score").alias("sum_score"),min("score").alias("min_score"))

#3.DataFrame 转化成临时表,以便我们可想像表一样操作DataFrame
df.registerTempTable("table_name") #我们就可以在sql 中使用 table_name 操作 df

#4.创建自定函数UDF 在spark sql 中可以使用
hiveCtx.registerFunction("strLenPython",lambda x:len(x),IntegerType())
lenthSchemaRdd = hiveCtx.sql("select strLenPython('text') from dual") #使用
def func_name(xx):
xxx
hiveCtx.registerFunction("fc",func_name)
fcSchemaRdd = hiveCtx.sql("select fc(name) from emp")
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark sql dataframe