sparkSQL、dataframe
2016-11-17 15:28
316 查看
http://www.aboutyun.com/forum.php?mod=viewthread&tid=12358&page=1
空值填充:http://spark.apache.org/docs/1.5.0/api/python/_modules/pyspark/sql/dataframe.html
spark 将dataframe数据写入Hive分区表:http://www.cnblogs.com/longjshz/p/5414051.html
#df22.select("pkg","cnt01").sort("cnt01",ascending=False).show(100) #按照某一个字段进行排序,降序
#从数据表读取数据,把数据读为数据框
df=sqlContext.sql("select * from zhangb.gedeng limit 2")
#把整张数据表变成数据框
df1=sqlContext.table("zhangb.gedeng")
#把数据框转成rdd形式
dfrdd=df1.rdd #不正规
dfrdd1 =df1.rdd.map(tuple)
dfrdd2 =df1.rdd.map(list)
#把数据框注册为表
df1.registerTempTable("people")
# 将普通RDD转变为DataFrame
rdd = sparkContext.textFile("sex") \
.flatMap(lambda line: line.split()) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a + b) \
wordCounts = sqlContext.createDataFrame(rdd, ["word", "count"]
#实际数据练习rdd转换成df
rdd = sc.textFile("sex").map(lambda p :p.strip().split('\t')).\
filter(lambda p:len(p)==3).map(lambda p:((re.split(";|,",p[2])),int(p[1]))).\
flatMap(lambda p:[(p[0][i],p[1])for i in range(len(p[0])) if i%2==0]).filter(lambda p:p[0]!='')
pkg1 = sqlContext.createDataFrame(rdd, ["pkg", "sex"])
# 将本地数据容器转变为DataFrame
da = [("Alice", 21), ("Bob", 24)]
people = sqlContext.createDataFrame(da, ["name", "age"])
db=[("Alice", 100,46), ("Bob", 39,47),("cele", 89,30)]
score=sqlContext.createDataFrame(db,['name','math','eng'])
#join
dc=people.join(score,people.name==score.name,"left_outer")
# 将Pandas DataFrame转变为Spark DataFrame(Python API特有功能)
sparkDF = sqlContext.createDataFrame(pandasDF)
#=========对数据框进行查看操作
# 创建一个只包含"年轻"用户的DataFrame
young = users.filter(users.age < 21)
# 也可以使用Pandas风格的语法
young = users[users.age < 21]
# 将所有人的年龄加1
young2=young.select(young.name, young.age + 1)
# 统计年轻用户中各性别人数
young.groupBy("gender").count().show()
# 将所有年轻用户与另一个名为logs的DataFrame联接起来(合并)
young.join(logs, logs.userId == users.userId, "left_outer")
# 除DSL以外,我们当然也可以像以往一样,用SQL来处理DataFrame:
df1.registerTempTable("dd")
#==============保存输出
#最后,当数据分析逻辑编写完毕后,我们便可以将最终结果保存下来或展现出来:
# 保存为SQL表
young.saveAsTable(tableName="young", source="parquet" mode="overwrite")
# 转换为Pandas DataFrame(Python API特有功能)
pandasDF = young.toPandas()
#追加至HDFS上的Parquet文件
young.save(path="hdfs://path/to/data.parquet",
source="parquet",
mode="append")
#覆写S3上的JSON文件
young.save(path="s3n://path/to/data.json",
source="json",
mode="append")
#空值填充
空值填充:http://spark.apache.org/docs/1.5.0/api/python/_modules/pyspark/sql/dataframe.html
spark 将dataframe数据写入Hive分区表:http://www.cnblogs.com/longjshz/p/5414051.html
#df22.select("pkg","cnt01").sort("cnt01",ascending=False).show(100) #按照某一个字段进行排序,降序
#从数据表读取数据,把数据读为数据框
df=sqlContext.sql("select * from zhangb.gedeng limit 2")
#把整张数据表变成数据框
df1=sqlContext.table("zhangb.gedeng")
#把数据框转成rdd形式
dfrdd=df1.rdd #不正规
dfrdd1 =df1.rdd.map(tuple)
dfrdd2 =df1.rdd.map(list)
#把数据框注册为表
df1.registerTempTable("people")
# 将普通RDD转变为DataFrame
rdd = sparkContext.textFile("sex") \
.flatMap(lambda line: line.split()) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a + b) \
wordCounts = sqlContext.createDataFrame(rdd, ["word", "count"]
#实际数据练习rdd转换成df
rdd = sc.textFile("sex").map(lambda p :p.strip().split('\t')).\
filter(lambda p:len(p)==3).map(lambda p:((re.split(";|,",p[2])),int(p[1]))).\
flatMap(lambda p:[(p[0][i],p[1])for i in range(len(p[0])) if i%2==0]).filter(lambda p:p[0]!='')
pkg1 = sqlContext.createDataFrame(rdd, ["pkg", "sex"])
# 将本地数据容器转变为DataFrame
da = [("Alice", 21), ("Bob", 24)]
people = sqlContext.createDataFrame(da, ["name", "age"])
db=[("Alice", 100,46), ("Bob", 39,47),("cele", 89,30)]
score=sqlContext.createDataFrame(db,['name','math','eng'])
#join
dc=people.join(score,people.name==score.name,"left_outer")
# 将Pandas DataFrame转变为Spark DataFrame(Python API特有功能)
sparkDF = sqlContext.createDataFrame(pandasDF)
#=========对数据框进行查看操作
# 创建一个只包含"年轻"用户的DataFrame
young = users.filter(users.age < 21)
# 也可以使用Pandas风格的语法
young = users[users.age < 21]
# 将所有人的年龄加1
young2=young.select(young.name, young.age + 1)
# 统计年轻用户中各性别人数
young.groupBy("gender").count().show()
# 将所有年轻用户与另一个名为logs的DataFrame联接起来(合并)
young.join(logs, logs.userId == users.userId, "left_outer")
# 除DSL以外,我们当然也可以像以往一样,用SQL来处理DataFrame:
df1.registerTempTable("dd")
#==============保存输出
#最后,当数据分析逻辑编写完毕后,我们便可以将最终结果保存下来或展现出来:
# 保存为SQL表
young.saveAsTable(tableName="young", source="parquet" mode="overwrite")
# 转换为Pandas DataFrame(Python API特有功能)
pandasDF = young.toPandas()
#追加至HDFS上的Parquet文件
young.save(path="hdfs://path/to/data.parquet",
source="parquet",
mode="append")
#覆写S3上的JSON文件
young.save(path="s3n://path/to/data.json",
source="json",
mode="append")
#空值填充
1 pyspark --master yarn-client --executor-memory 5G --num-executors 50 2 import os 3 import copy 4 import codecs 5 import operator 6 import re 7 from math import log 8 from pyspark.sql import SQLContext,Row 9 from pyspark.mllib.regression import LabeledPoint 10 from pyspark import SparkContext, SparkConf 11 12 #从数据表读取数据,把数据读为数据框 13 df=sqlContext.sql("select * from zhangb.gedeng limit 2") 14 15 #把整张数据表变成数据框 16 df1=sqlContext.table("zhangb.gedeng") 17 18 #把数据框注册为表 19 df1.registerTempTable("people") 20 21 # 将普通RDD转变为DataFrame 22 rdd = sparkContext.textFile("sex") \ 23 .flatMap(lambda line: line.split()) \ 24 .map(lambda word: (word, 1)) \ 25 .reduceByKey(lambda a, b: a + b) \ 26 wordCounts = sqlContext.createDataFrame(rdd, ["word", "count"] 27 28 #实际数据练习rdd转换成df 29 30 rdd = sc.textFile("sex").map(lambda p :p.strip().split('\t')).\ 31 filter(lambda p:len(p)==3).map(lambda p:((re.split(";|,",p[2])),int(p[1]))).\ 32 flatMap(lambda p:[(p[0][i],p[1])for i in range(len(p[0])) if i%2==0]).filter(lambda p:p[0]!='') 33 34 pkg1 = sqlContext.createDataFrame(rdd, ["pkg", "sex"]) 35 36 # 将本地数据容器转变为DataFrame 37 da = [("Alice", 21), ("Bob", 24)] 38 people = sqlContext.createDataFrame(da, ["name", "age"]) 39 40 db=[("Alice", 100,46), ("Bob", 39,47),("cele", 89,30)] 41 score=sqlContext.createDataFrame(db,['name','math','eng']) 42 43 #join 44 dc=people.join(score,people.name==score.name,"left_outer") 45 46 # 将Pandas DataFrame转变为Spark DataFrame(Python API特有功能) 47 sparkDF = sqlContext.createDataFrame(pandasDF) 48 49 #=========对数据框进行查看操作 50 51 # 创建一个只包含"年轻"用户的DataFrame 52 young = users.filter(users.age < 21) 53 54 # 也可以使用Pandas风格的语法 55 young = users[users.age < 21] 56 57 # 将所有人的年龄加1 58 59 young2=young.select(young.name, young.age + 1) 60 61 # 统计年轻用户中各性别人数 62 young.groupBy("gender").count().show() 63 64 # 将所有年轻用户与另一个名为logs的DataFrame联接起来 65 young.join(logs, logs.userId == users.userId, "left_outer") 66 67 # 除DSL以外,我们当然也可以像以往一样,用SQL来处理DataFrame: 68 69 df1.registerTempTable("dd") 70 71 #==============保存输出 72 73 #最后,当数据分析逻辑编写完毕后,我们便可以将最终结果保存下来或展现出来: 74 # 保存为SQL表 75 young.saveAsTable(tableName="young", source="parquet" mode="overwrite") 76 77 # 转换为Pandas DataFrame(Python API特有功能) 78 pandasDF = young.toPandas() 79 80 #追加至HDFS上的Parquet文件 81 young.save(path="hdfs://path/to/data.parquet", 82 source="parquet", 83 mode="append") 84 85 #覆写S3上的JSON文件 86 87 young.save(path="s3n://path/to/data.json", 88 source="json", 89 mode="append")
相关文章推荐
- 1.Spark SQL:DataFrame的使用
- 3.Spark SQL:使用反射方式、编程方式,将RDD转换为DataFrame
- Spark SQL概述,DataFrames,创建DataFrames的案例,DataFrame常用操作(DSL风格语法),sql风格语法
- 一起学spark(11) -- Spark SQL 和 DataFrame 操作
- 1.Spark SQL:DataFrame的使用
- 3.Spark SQL:使用反射方式、编程方式,将RDD转换为DataFrame
- API。Spark SQL 1.3.0 DataFrame介绍、使用及提供了些完整的数据写入
- 一起学spark(11) -- Spark SQL 和 DataFrame 操作
- Spark 2.1 -- spark SQL , Dataframe 和DataSet 指南
- 第56课:解密Spark SQL与DataFrame的本质
- 转】Spark SQL 之 DataFrame
- 第56课:Spark SQL和DataFrame的本质
- 1.Spark SQL:DataFrame的使用
- 3.Spark SQL:使用反射方式、编程方式,将RDD转换为DataFrame
- 一起学spark(11) -- Spark SQL 和 DataFrame 操作
- SparkSQL---DataFrame
- 1.Spark SQL:DataFrame的使用
- 3.Spark SQL:使用反射方式、编程方式,将RDD转换为DataFrame
- 一起学spark(11) -- Spark SQL 和 DataFrame 操作
- Spark SQL与DataFrame