您的位置:首页 > 数据库

spark sql on hive笔记一

2016-12-09 20:39 453 查看
spark sql on hive非常方便,通过共享读取hive的元数据,我们可以直接使用spark sql访问hive的库和表,做更快的OLAP的分析。

spark 如果想直接能集成sql,最好自己编译下源码:
切换scala的版本为新版本
dev/change-scala-version.sh 2.11
编译支持hive
mvn -Pyarn -Phive  -Phive-thriftserver -Phadoop-2.7.3 -Dscala-2.11 -DskipTests clean package


注意,spark sql 可以直接在linux上使用,像执行hive命令一样,进入交互式终端,进行即席查询,进入spark-sql交互式终端命令,并指定以yarn的模式运行:
spark/bin/spark-sql  --master yarn


本次使用的spark2.0.2,进入交互式终端之后,可以进行任意的查询分析,但本文的笔记例子,不是基于终端的spark sql分析,而是在scala中使用spark sql on hive,在编程语言里面使用spark sql on hive 灵活性大大提供,能做更多的事情,比如说分析完的结果存储到mysql,hbase或者redis里面,或者分析的过程,需要外部存储的一些数据等等。

开发程序是在IDEA里面写的,项目风格是java+scala混搭采用maven管理,注意不是全scala项目,没有用sbt管理,sbt的国内下载非常慢,能翻墙的同学可以尝试一下。

功能: 使用spark sql读取hive的数据,然后根据某个字段分组,并收集分组结果,然后存储到redis里面。
def main(args: Array[String]): Unit = {

val t0=System.nanoTime();//开始时间
val spark=SparkSession
.builder()
.appName("spark on sql hive  ")
.enableHiveSupport().getOrCreate();//激活hive支持

import spark.implicits._
import spark.sql
sql(" use db")//切换db
//注意,collect_set 可以收集分组结果
val ds=sql("select q_id, collect_set(kp_id) as ids from ods_q_quest_kp_rel where kp_id!=0  group by q_id");
ds.cache();//cache起来,便于后续使用
println("size:",ds.collect().length)//打印长度
ds.select("q_id","ids").collect().foreach (
t =>
{
val key=t.getAs[String]("q_id");//获取上面的列映射
val value=t.getAs[Seq[String]]("ids").mkString(",");//获取上面的分组集合
//insert redis
}
)
val t1=System.nanoTime();

println("insert redis ok! Elapsed time: " + (t1 - t0)/1000/1000 + "ms")
//停止
spark.stop();

}


有什么问题可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。 技术债不能欠,健康债更不能欠, 求道之路,与君同行。

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark sql hive