SparkSQL入门_1
2015-10-30 17:18
369 查看
概述
DataFrame
SQL query
ReadWrite
Example
目前使用的是伪分布式模式,hadoop,spark都已经配置好了。
数据仓库采用的是hive,hive的metastore存储在mysql中。
现在的主要目的是想把spark和hive结合起来,也就是用spark读取hive中的数据。
所以就用到了sparksql。
sparksql的配置有点麻烦,需要将spark的源码编译获取assembly包,另外还需要mysql-connector的驱动包,另外再将hive-site.xml放到conf文件夹中就可以了。
目前存在的问题是sparksql创建表权限报错,解决的方法是用hive先创建了。
sparksql整体的逻辑是dataframe,df可以从Row形式的RDD转换。同时df还可以转换成表接着使用sql的语句进行查询操作。
DataFrame
SQL query
ReadWrite
Example
概述
先说说准备工作吧。目前使用的是伪分布式模式,hadoop,spark都已经配置好了。
数据仓库采用的是hive,hive的metastore存储在mysql中。
现在的主要目的是想把spark和hive结合起来,也就是用spark读取hive中的数据。
所以就用到了sparksql。
sparksql的配置有点麻烦,需要将spark的源码编译获取assembly包,另外还需要mysql-connector的驱动包,另外再将hive-site.xml放到conf文件夹中就可以了。
目前存在的问题是sparksql创建表权限报错,解决的方法是用hive先创建了。
sparksql整体的逻辑是dataframe,df可以从Row形式的RDD转换。同时df还可以转换成表接着使用sql的语句进行查询操作。
DataFrame
HiveContext是SQLContext的超集,一般需要实例化它,也就是from pyspark.sql import HiveContext sqlContext = HiveContext(sc) #创建df df = sqlContext.read.json("examples/src/main/resources/people.json") #df的操作 df.show() df.printSchema() df.select("name").show() df.select(df['name'], df['age'] + 1).show() df.filter(df['age'] > 21).show() df.groupBy("age").count().show()
SQL query
df = sqlContext.sql("SELECT * FROM table")
ReadWrite
#读写数据 #一般的读写操作 df = sqlContext.read.load("examples/src/main/resources/users.parquet") df.select("name", "favorite_color").write.save("namesAndFavColors.parquet") #指定格式的读写 df = sqlContext.read.load("examples/src/main/resources/people.json", format="json") df.select("name", "age").write.save("namesAndAges.parquet", format="parquet") #将df暂时保存,重启核后消失 DataFrame.saveAsTable("people3") #将df直接保存到hive的metastore中,通过hive可以查询到 #df格式的数据registerTempTable到表中就可以使用sql语句查询了 DataFrame.registerTempTable ("people3")
Example
#创建一个表 # sc is an existing SparkContext. from pyspark.sql import SQLContext, Row sqlContext = SQLContext(sc) # Load a text file and convert each line to a Row. lines = sc.textFile("examples/src/main/resources/people.txt") parts = lines.map(lambda l: l.split(",")) people = parts.map(lambda p: Row(name=p[0], age=int(p[1]))) # Infer the schema, and register the DataFrame as a table. schemaPeople = sqlContext.createDataFrame(people) schemaPeople.registerTempTable("people") # SQL can be run over DataFrames that have been registered as a table. teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") # The results of SQL queries are RDDs and support all the normal RDD operations. teenNames = teenagers.map(lambda p: "Name: " + p.name) for teenName in teenNames.collect(): print(teenName)
# hive的操作 # sc is an existing SparkContext. from pyspark.sql import HiveContext sqlContext = HiveContext(sc) sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") # Queries can be expressed in HiveQL. results = sqlContext.sql("FROM src SELECT key, value").collect() #常用的操作 hiveql.table("student").show() hiveql.tables().show() hiveql.tableNames()
相关文章推荐
- Spark RDD API详解(一) Map和Reduce
- 使用spark和spark mllib进行股票预测
- 分享Hive的一份胶片资料
- Spark随谈——开发指南(译)
- Spark,一种快速数据分析替代方案
- Hadoop生态上几个技术的关系与区别:hive、pig、hbase 关系与区别
- eclipse 开发 spark Streaming wordCount
- Spark初探
- Spark Streaming初探
- 搭建hadoop/spark集群环境
- 将Hive的默认数据库Derby改为Postgresql
- kettle中对hive操作时需要知道的东西
- #Note# Analyzing Twitter Data with Apache Hadoo...
- [翻译]Hive wiki GettingStarted
- 整合Kafka到Spark Streaming——代码示例和挑战
- Spark 性能相关参数配置详解-任务调度篇
- 基于spark1.3.1的spark-sql实战-01
- 基于spark1.3.1的spark-sql实战-02
- 在 Databricks 可获得 Spark 1.5 预览版
- 启动hive命令报错 “Metastore contains multiple versions”