您的位置:首页 > 数据库

SparkSQL入门_1

2015-10-30 17:18 369 查看
概述

DataFrame

SQL query

ReadWrite

Example

概述

先说说准备工作吧。

目前使用的是伪分布式模式,hadoop,spark都已经配置好了。

数据仓库采用的是hive,hive的metastore存储在mysql中。

现在的主要目的是想把spark和hive结合起来,也就是用spark读取hive中的数据。

所以就用到了sparksql。

sparksql的配置有点麻烦,需要将spark的源码编译获取assembly包,另外还需要mysql-connector的驱动包,另外再将hive-site.xml放到conf文件夹中就可以了。

目前存在的问题是sparksql创建表权限报错,解决的方法是用hive先创建了。

sparksql整体的逻辑是dataframe,df可以从Row形式的RDD转换。同时df还可以转换成表接着使用sql的语句进行查询操作。

DataFrame

HiveContext是SQLContext的超集,一般需要实例化它,也就是

from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)

#创建df
df = sqlContext.read.json("examples/src/main/resources/people.json")

#df的操作
df.show()
df.printSchema()
df.select("name").show()
df.select(df['name'], df['age'] + 1).show()
df.filter(df['age'] > 21).show()
df.groupBy("age").count().show()


SQL query

df = sqlContext.sql("SELECT * FROM table")


ReadWrite

#读写数据
#一般的读写操作
df = sqlContext.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
#指定格式的读写
df = sqlContext.read.load("examples/src/main/resources/people.json", format="json")
df.select("name", "age").write.save("namesAndAges.parquet", format="parquet")

#将df暂时保存,重启核后消失
DataFrame.saveAsTable("people3")
#将df直接保存到hive的metastore中,通过hive可以查询到
#df格式的数据registerTempTable到表中就可以使用sql语句查询了
DataFrame.registerTempTable ("people3")


Example

#创建一个表
# sc is an existing SparkContext.
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)

# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

# Infer the schema, and register the DataFrame as a table.
schemaPeople = sqlContext.createDataFrame(people)
schemaPeople.registerTempTable("people")

# SQL can be run over DataFrames that have been registered as a table.
teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

# The results of SQL queries are RDDs and support all the normal RDD operations.
teenNames = teenagers.map(lambda p: "Name: " + p.name)
for teenName in teenNames.collect():
print(teenName)


# hive的操作
# sc is an existing SparkContext.
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)

sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries can be expressed in HiveQL.
results = sqlContext.sql("FROM src SELECT key, value").collect()

#常用的操作
hiveql.table("student").show()
hiveql.tables().show()
hiveql.tableNames()
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark sparksql hive