您的位置:首页 > 大数据

6大数据实战系列-sparkSql实战

2017-10-12 14:21 190 查看
sparkSql两个最重要的类SqlContext、DataFrame,DataFrame功能强大,能够与rdd互转换、支持sql操作如sql().where.order.join.groupBy.limit等。

SparkSql的查询响应性能是hive的几何级倍数,并且SparkSql支持多种数据源操作包括hive、hdfs、rdd、json、mysql,本文先讲解hive、hdfs、rdd、json4种数据源操作。

1 基础环境

1.1 版本预览

Cnetos 6.5    已安装
Hadoop 2.8   已安装集群
Hive 2.3      待安装
Mysql 5.6     已安装
Spark 2.1.1    已安装


1.2 机器环境

192.168.0.251 slave
192.168.0.252 master
Hadoop:hadoop已做双机无密码登录


1.3 工作路径

Hadoop:/home/data/app/hadoop/hadoop-2.8.0/etc/hadoop
Spark:/home/data/app/hadoop/spark-2.1.1-bin-hadoop2.7
Hive数据路径: /user/hive/warehouse/


2 初始化配置

2.1 spark连接hive

节点Spark conf下增加hive-site.xml

<configuration>
<property>
<name>hive.metastore.uris</name>
<value>thrift://shulaibao2:9083</value>
<description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>
</property>
</configuration>


2.2 启动hive支持metastore

nohup hive --service metastore > metastore.log 2>&1 &


2.3 spark集群重启

./stop-all.sh
./start-all.sh


3 sparkSql - hive数据源

3.1 sparkSql操作

./spark-sql --master spark://shulaibao2:7077 --executor-memory 1g


按年统计交易订单数量、交易金额
select c.theyear,count(distinct a.ordernumber),sum(b.amount) from tbStock a join tbStockDetail  b on a.ordernumber=b.ordernumber
join tbDate c on a.dateid=c.dateid
group by c.theyear order by c.theyear;

计算每年销售额最大的订单
select c.theyear,max(d.sumofamount) from tbDate c join (select a.dateid,a.ordernumber,sum(b.amount) as sumofamount from tbStock a join tbStockDetail  b on a.ordernumber=b.ordernumber group by a.dateid,a.ordernumber ) d  on c.dateid=d.dateid group by c.theyear sort by c.theyear;


3.2 spark shell编码

val hiveQuery = sql("select * from hive_data.tbstock limit 10")

hiveQuery.collect()
res14: Array[org.apache.spark.sql.Row] = Array([BYSL00000893,ZHAO,2007-8-23], [BYSL00000897,ZHAO,2007-8-24], [BYSL00000898,ZHAO,2007-8-25], [BYSL00000899,ZHAO,2007-8-26], [BYSL00000900,ZHAO,2007-8-26], [BYSL00000901,ZHAO,2007-8-27], [BYSL00000902,ZHAO,2007-8-27], [BYSL00000904,ZHAO,2007-8-28], [BYSL00000905,ZHAO,2007-8-28], [BYSL00000906,ZHAO,2007-8-28])


4 sparkSql - RDD数据源

4.1 hdfs数据源

import spark.implicits._
case class Person(name: String, age: Int)
val peopleDF =
spark.sparkContext.textFile("hdfs://shulaibao2:9010/home/hadoop/upload/test/people.txt").map(_.split(",")).map(attributes => Person(attributes(0), attributes(1).trim.toInt)).toDF()

peopleDF.createOrReplaceTempView("people") : registerTempTable  - deprecation
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 24 AND 40")

teenagersDF.map(teenager => "Name: " + teenager(0)).show()
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()


4.2 RDD数据源

import spark.implicits._
case class Person(name:String, age:Int, state:String)
sc.parallelize(Person("Michael",29,"CA")::Person("Andy",30,"NY")::Person("Justin",19,"CA")::Person("Justin",25,"CA")::Nil).toDF().registerTempTable("people")

val query= sql("select * from people") : @return dataFrame

查询的schem
query.printSchema

query.collect() : @return Array[org.apache.spark.sql.Row]

查看整个运行计划:
query.queryExecution


5 json 数据源

hadoop fs -put /data/software/sougou/jsonPerson.json /home/hadoop/upload/test/

spark.sqlContext.jsonFile("/home/hadoop/upload/test/jsonPerson.json").registerTempTable("jsonPerson")
val jsonQuery = sql("select * from jsonPerson")
查看结构:
jsonQuery.printSchema
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  大数据 spark-sql hive
相关文章推荐