一起学spark(11) -- Spark SQL 和 DataFrame 操作
2018-02-06 19:19
381 查看
Spark SQL是在Spark 1.0 中新加入的spark 组件,并快速成为了Spark中教受欢迎的操作结构化和半结构化数据的方式。DataFrame 是由 ROW对象组成的rdd,每个ROW对象表示一条记录,类似我们的表结构。
(1)采用spark sql 执行数据查询和插入from pyspark import SparkContext,SparkConf
from pyspark.sql import HiveContext,Row #没有 HiveContext 就用 SQLContext
conf = SparkConf().setMaster("yarn-client").setAppName("my_app")
conf.set("spark.network.timeout",300)
conf.set("spark.shuffle.consolidateFiles","true") #shuffle 合并
sc = SparkContext(conf = conf)
hiveCtx = HiveContext(sc)
#Hive 配置动态分区
hiveCtx.setConf("hive.exec.dynamic.partition","true") # or hiveCtx.sql("set hive.exec.dynamic.partition = true")
hiveCtx.setConf("hive.exec.dynamic.partition.mode","nonstrict")
#hiveCtx.sql("xxxx") 里面可以写任何Hive语句执行,包括查询,插入等操作,查询返回的就是一个DataFrame
emp_no = hiveCtx.sql("select empno,empname from emp").first().empno #DataFrame 也是RDD,故前面介绍的算子也适用
#数据插入
hiveCtx.sql("insert overwrite table emp select ...") #直接执行 sql 插入
df = hiveCtx.sql("select ...")
df.write.insertInto("emp",True) #True 表示是否 overwrite
df.write.partitionBy("busi_date").insertInto("emp",True) #如果是分区表 并以 busi_date 分区的话,这种只能开启动态(2)一般rdd转化成DataFrame
(1)采用spark sql 执行数据查询和插入from pyspark import SparkContext,SparkConf
from pyspark.sql import HiveContext,Row #没有 HiveContext 就用 SQLContext
conf = SparkConf().setMaster("yarn-client").setAppName("my_app")
conf.set("spark.network.timeout",300)
conf.set("spark.shuffle.consolidateFiles","true") #shuffle 合并
sc = SparkContext(conf = conf)
hiveCtx = HiveContext(sc)
#Hive 配置动态分区
hiveCtx.setConf("hive.exec.dynamic.partition","true") # or hiveCtx.sql("set hive.exec.dynamic.partition = true")
hiveCtx.setConf("hive.exec.dynamic.partition.mode","nonstrict")
#hiveCtx.sql("xxxx") 里面可以写任何Hive语句执行,包括查询,插入等操作,查询返回的就是一个DataFrame
emp_no = hiveCtx.sql("select empno,empname from emp").first().empno #DataFrame 也是RDD,故前面介绍的算子也适用
#数据插入
hiveCtx.sql("insert overwrite table emp select ...") #直接执行 sql 插入
df = hiveCtx.sql("select ...")
df.write.insertInto("emp",True) #True 表示是否 overwrite
df.write.partitionBy("busi_date").insertInto("emp",True) #如果是分区表 并以 busi_date 分区的话,这种只能开启动态(2)一般rdd转化成DataFrame
#1.一般转化 def change2row(x): return Row(empno = x[0],empname = x[1]) rdd.map(lambda x:change2row(x)).toDF() #将 rdd 转换成 DataFrame #或者直接 rdd.map(lambda x:Row(**x)) #2.使用createDataFrame创建 applySchema 或 inferSchema 不建议采用 #假设一个RDD:my_rdd = [string,float,int] from pyspark.sql.types import StructType,StructField,StringType,FloatType,IntegerType schema = StructType([StructField("student_no",StringType(),True), StructField("score",FloatType(),True), StructField("rank",IntegerType(),True)]) df = hiveCtx.createDataFrame(my_rdd,schema)#我们就将一个数组rdd,创建成了一个学生号,分数,排名的一个表结构 #如果是字典类型的rdd 比如my_rdd = {"student_no":"xxx","score":xxx,"rank":xx},则可以直接创建 df = hiveCtx.createDataFrame(my_rdd)(3)DataFrame 的操作
#1.两个 DataFrame 之间的连接 cond = [df1.id = df2.id] #如果多个条件 [(df1.id = df2.id) & (df1.name = df2.name)] df = df1.join(df2,cond,'inner').select(df1.xxx,...) #如果有多个条件,可用 & 连接 #连接类型有 "inner","fullouter","leftouter","rightouter","leftsemi" 等,跟表连接差不多 #2.DataFrame 函数 from pyspark.sql.functions import sum,max,mean,min,avg,day,abs,to_date ...#根据需要选择 schemaRDD = hiveCtx.createDataFrame(my_rdd,schema).groupBy("student_no").\ agg(sum("score").alias("sum_score"),min("score").alias("min_score")) #3.DataFrame 转化成临时表,以便我们可想像表一样操作DataFrame df.registerTempTable("table_name") #我们就可以在sql 中使用 table_name 操作 df #4.创建自定函数UDF 在spark sql 中可以使用 hiveCtx.registerFunction("strLenPython",lambda x:len(x),IntegerType()) lenthSchemaRdd = hiveCtx.sql("select strLenPython('text') from dual") #使用 def func_name(xx): xxx hiveCtx.registerFunction("fc",func_name) fcSchemaRdd = hiveCtx.sql("select fc(name) from emp")
相关文章推荐
- 一起学spark(11) -- Spark SQL 和 DataFrame 操作
- 一起学spark(11) -- Spark SQL 和 DataFrame 操作
- 一起学spark(11) -- Spark SQL 和 DataFrame 操作
- 一起学spark(11) -- Spark SQL 和 DataFrame 操作
- 一起学spark(11) -- Spark SQL 和 DataFrame 操作
- 一起学spark(11) -- Spark SQL 和 DataFrame 操作
- 一起学spark(11) -- Spark SQL 和 DataFrame 操作
- 一起学spark(11) -- Spark SQL 和 DataFrame 操作
- 一起学spark(11) -- Spark SQL 和 DataFrame 操作
- 一起学spark(11) -- Spark SQL 和 DataFrame 操作
- 一起学spark(11) -- Spark SQL 和 DataFrame 操作
- 一起学spark(11) -- Spark SQL 和 DataFrame 操作
- 一起学spark(11) -- Spark SQL 和 DataFrame 操作
- 一起学spark(11) -- Spark SQL 和 DataFrame 操作
- 一起学spark(11) -- Spark SQL 和 DataFrame 操作
- 一起学spark(11) -- Spark SQL 和 DataFrame 操作
- 一起学spark(11) -- Spark SQL 和 DataFrame 操作
- 一起学spark(11) -- Spark SQL 和 DataFrame 操作
- sparkSQL里 sql语句,dataframe,Thrift Server JDBC都可以实现对数据的查询,过滤等操作, 哪这3种情况分别是什么情况下使用
- SparkSQL DataFrame常用操作(二)