您的位置:首页 > 其它

Spark-1.3.1与Hive整合实现查询分析

2017-03-29 18:44 369 查看
在大数据应用场景下,使用过Hive做查询统计分析的应该知道,计算的延迟性非常大,可能一个非常复杂的统计分析需求,需要运行1个小时以上,但是比之于使用MySQL之类关系数据库做分析,执行速度快很多很多。使用HiveQL写类似SQL的查询分析语句,最终经过Hive查询解析器,翻译成Hadoop平台上的MapReduce程序进行运行,这也是MapReduce计算引擎的特点带来的延迟问题:Map中间结果写文件。如果一个HiveQL语句非常复杂,会被翻译成多个MapReduce Job,那么就会有很多的Map输出中间结果数据到文件中,基本没有数据的共享。

如果使用Spark计算平台,基于Spark RDD数据集模型计算,可以减少计算过程中产生中间结果数据写文件的开销,Spark会把数据直接放到内存中供后续操作共享数据,减少了读写磁盘I/O操作带来的延时。另外,如果基于Spark on YARN部署模式,可以充分利用数据在Hadoop集群DataNode节点的本地性(Locality)特点,减少数据传输的通信开销。

软件准备

我把使用的相关软件的版本在这里列出来,以便测试验证,如下所示:

CentOS-6.6 (Final)

JDK-1.7.0_25

Maven-3.2.1

Hadoop-2.2.0

Spark-1.3.1

Hive-0.12.0

MySQL-Server-5.5.8

另外还要搭建好Hadoop集群,以及安装配置好Hive客户端,能够在Hive上正确执行查询分析,安装过程不再累述,可以参考网上很多文档。由于我们使用最新版本的Spark-1.3.1,为了使用我们现有2.2.0版本的Hadoop平台,所以需要重新编译构建Spark程序,接下来会做详细说明。

这里,给出使用的各个集群环境的结构拓扑,如下表所示:
Source节点服务名称说明
hadoop1Spark Master/Spark DriverSpark集群
hadoop2DataNode/NodeManagerHadoop集群
hadoop3DataNode/NodeManagerHadoop集群
hadoop4HiveHive客户端
hadoop5Spark WorkerSpark集群
hadoop6Spark Worker/NameNode/ResourceManager/Secondary NameNodeSpark集群/Hadoop集群
10.10.4.130MySQL用于存储Hive元数据
上述节点配置相同,因为是测试机,所以配置相对比较低。我们是分别将Spark集群和Hadoop集群的Worker和NodeManager/DataNode分开部署了,在使用Spark做计算的时候,就没有数据本地性(Locality)的特性,所以如果基于Spark on YARN的模式,可能会获得更好地计算性能的提升。

Spark编译安装配置

首先从官网下在Spark源码文件:

1
cd
~/
2
wget http://mirror.bit.edu.cn/apache/spark/spark-1.3.1/spark-1.3.1.tgz
3
tar
xvzf
spark-1.3.1.tgz
4
mv
spark-1.3.1
spark-1.3.1-bin-hadoop2.2
我的环境是JDK 1.7,使用Maven构建,执行如下命令行:

1
export
MAVEN_OPTS=
"-Xmx2g
-XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
2
cd
/home/spark/spark-1.3.1-bin-hadoop2.2/
3
mvn
-Pyarn -Dyarn.version=2.2.0 -Phadoop-2.2 -Dhadoop.version=2.2.0 -Phive -Phive-0.12.0 -Phive-thriftserver -DskipTests clean package
编译构建完成以后,可以看到如下内容:

1
/home/spark/spark-1.3.1-bin-hadoop2.2/assembly/target/scala-2.10/spark-assembly-1.3.1-hadoop2.2.0.jar
2
/home/spark/spark-1.3.1-bin-hadoop2.2/lib_managed/*.jar
如果网络状况不好,可能无法构建成功。

另外,也可以使用sbt构建,执行如下命令:

1
cd
/home/spark/spark-1.3.1-bin-hadoop2.2/
2
build/sbt
-Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 -Phive -Phive-0.12.0 -Phive-thriftserver assembly
如果失败,多试几次可能会以成功。

使用Maven构建与使用sbt构建,都要耗费很长时间,而且最终生成的文件可能会有所不同。

下面,我们配置Spark集群,首先在Spark Master节点上配置,修改配置文件conf/slaves,将Worker节点主机名加入进去,一行一个,内容如下所示:

1
hadoop5
2
hadoop6
修改Spark环境变量配置文件conf/spark-env.sh,增加如下配置行:

1
SPARK_MASTER_IP=hadoop1
修改配置文件spark-defaults.conf,内容如下所示:

1
spark.eventLog.enabled
true
2
spark.eventLog.dir hdfs://hadoop6:8020/spark/logs/events
登录到Hive安装的节点,将Hive的配置文件拷贝到Spark安装目录下的conf目录下面,执行如下命令:

1
scp
/usr/
local
/hive/conf/hive-site.xml
spark@hadoop1:/home/spark/spark-1.3.1-bin-hadoop2.2/conf/
最后分发Spark安装文件到Spark Worker节点上:

1
sudo
scp
-r
/home/spark/spark-1.3.1-bin-hadoop2.2 spark@hadoop5:/home/spark/spark-1.3.1-bin-hadoop2.2/
2
sudo
scp
-r
/home/spark/spark-1.3.1-bin-hadoop2.2 spark@hadoop6:/home/spark/spark-1.3.1-bin-hadoop2.2/
为了方便启动Spark集群,可以配置Spark Master到Workers的ssh免密码登录,然后只需要在Master中执行如下脚本即可:

1
sbin/start-all.sh
可以查看Spark各个节点的服务启动情况,也可以通过Spark UI链接进入页面查看http://hadoop1:8080/,默认是8080端口,如果8080端口已经被占用,Spark会自动选择端口号数字加1,如http://hadoop1:8081/

Spark+Hive整合

我们知道,在使用Hive进行查询的时候,到底层MapReduce计算层会将HiveQL翻译成MapReduce程序,在Hadoop平台上执行计算,这使得计算的延迟比较大。我们整合Spark和Hive,就是通过Spark平台来计算Hive查询,也就是Hive不再使用它默认的MapReduce计算引擎,Spark会直接读取Hive的元数据存储,将Hive数据转换成Spark RDD数据,通过Spark提供的计算操作来实现(Transformation和Action)。

我们首先在Hive中创建一个数据库event_db,执行如下命令:

1
CREATE
DATABASE
event_db;
在创建一个Hive外部表user_event,执行DDL脚本:

01
CREATE
EXTERNAL
TABLE
event_db.user_event(
02
appid
string,
03
event_code
string,
04
udid
string,
05
uid
string,
06
install_id
string,
07
session_id
string,
08
play_id
string,
09
page
string,
10
timestamp
string,
11
action
string,
12
network
string,
13
operator
string,
14
lon
string,
15
lat
string,
16
imsi
string,
17
speed
string,
18
event_id
string,
19
type
string,
20
result
string,
21
refer
string,
22
radio_id
bigint
,
23
audio_id
bigint
,
24
play_time
bigint
,
25
duration
bigint
,
26
start_time
string,
27
end_time
string,
28
request_agent
string,
29
request_referer
string,
30
device_id
string,
31
model_id
string,
32
area_tag
string,
33
remarks4
string,
34
remarks5
string,
35
ip
bigint
,
36
area_code
int
,
37
create_time
string)
38
PARTITIONED
BY
(
39
create_date
string)
40
ROW
FORMAT DELIMITED
41
FIELDS
TERMINATED
BY
'\t'
42
STORED
AS
INPUTFORMAT
43
'org.apache.hadoop.mapred.TextInputFormat'
44
OUTPUTFORMAT
45
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
46
LOCATION
47
'hdfs://hadoop6:8020/hive/event_db/user_event'
;
我选择了一天的用户事件数据(大概有5G左右,13824560条记录),将数据加载到Hive的分区中,执行如下LOAD命令行:

1
LOAD
DATA
LOCAL
INPATH
'/home/shirdrn/data/user_event_20150511.log'
OVERWRITE
INTO
TABLE
event_db.user_event
PARTITION (create_date=
'2015-05-11'
);
Standalone模式

我们可以通过指定SPARK_CLASSPATH变量,将需要访问Hive的元数据存储MySQL的驱动包加入进去,然后直接启动Spark SQL Shell即可。这里,使用Spark默认的集群管理模式Standalone,启动Shell时需要指定master选项为Spark Master服务连接:

1
SPARK_CLASSPATH=
"$SPARK_CLASSPATH:/home/spark/spark-1.3.0-bin-hadoop2.2/lib_managed/jars/mysql-connector-java-5.1.34.jar"
2
bin/spark-sql
--master spark://hadoop1:7077
这样我们可以直接在Spark SQL Shell上输入Hive查询语句就可以执行查询分析计算。

另外,还可以通过Spark Shell进行操作,不过需要了解Spark SQL支持的Scala API,启动Spark Shell,执行如下命令:

1
SPARK_CLASSPATH=
"$SPARK_CLASSPATH:/home/spark/spark-1.3.0-bin-hadoop2.2/lib_managed/jars/mysql-connector-java-5.1.34.jar"
2
bin/spark-shell
--master spark://hadoop1:7077
然后,创建一个org.apache.spark.sql.hive.HiveContext对象,用来执行Hive查询:

1
scala>
val
sqlContext
=
new
org.apache.spark.sql.hive.HiveContext(sc)
2
sqlContext
:
org.apache.spark.sql.hive.HiveContext
=
org.apache.spark.sql.hive.HiveContext
@
6
dcc
664
b
接着可以执行查询:

1
scala>
sqlContext.sql(
"SELECT
area_code,event_code,COUNT(udid)AS user_cntFROM event_db.user_eventWHERE create_date='2015-05-11' GROUP BY area_code,event_codeLIMIT 10"
).collect().foreach(println)
可以看到查询结果。

yarn-client模式

如果基于YARN模式运行(与Hive整合只支持yarn-client模式,不支持yarn-cluster),需要指定Hadoop集群的环境变量(在当前Driver节点上必须有Hadoop的安装文件),如下所示:

1
export
HADOOP_HOME=/usr/
local
/hadoop-2.2.0
2
export
HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
然后启动Spark SQL Shell,执行如下命令:

1
SPARK_CLASSPATH=
"$SPARK_CLASSPATH:/home/spark/spark-1.3.0-bin-hadoop2.2/lib_managed/jars/mysql-connector-java-5.1.34.jar"
2
bin/spark-sql
--master yarn-client
查询结果耗时比较

我们使用Hive,以及上面提到的两种模式分别执行如下HiveQL查询统计语句:

1
SELECT
area_code,event_code,
COUNT
(
DISTINCT
udid)
AS
user_cnt
FROM
event_db.user_event
WHERE
create_date=
'2015-05-11'
AND
(create_time
BETWEEN
'2015-05-11
17:00:00'
AND
'2015-05-11
23:30:00'
)
GROUP
BY
area_code,event_code
ORDER
BY
user_cnt
DESC
LIMIT
10
可以看到查询结果,结果如下所示:

01
156000000
100003 8290
02
110000
100003 7832
03
440100
100003 4956
04
110000
100010 3850
05
440300
100003 3709
06
320100
100003 3683
07
410100
100003 3669
08
110000
101014 3479
09
110000
200004 3455
10
110000
100011 3423
对比耗时,如下表所示:
运行模式花费时间(秒)
Hive189.695
Spark Standalone82.895
Spark yarn-client104.259
可见,无论是Spark Standalone模式还是Spark yarn-client模式,耗时都比直接执行Hive查询要少得多。我们执行Spark计算,2个Worker节点上各用了一个Executor,每个Executor使用512M内存,如果增加Executor个数,或者调大内存,应该比上面运行耗时更少,例如,启动Spark SQL Shell并指定相关参数:

1
bin/spark-sql
--master spark://hadoop1:7077 --driver-memory 1G --driver-cores 2 --executor-memory 4G
或者:

1
bin/spark-sql
--master yarn-client --driver-memory 1G --driver-cores 2 --executor-cores 4 --num-executors 8 --executor-memory 4G
总结

根据上面我们实践的整合Spark+Hive,在执行复杂统计分析时,完全可以使用Spark SQL来替代Hive,至少会提高几倍的速度,对于一些基于Hive统计应用,可能每天晚上要执行6个小时以上的统计计算,导致第二天结果数据都无法出来,如果统计需求再次增加,可能时间还会更长。除了对Hive查询语句进行优化之外,应该说优化空间不大,所以这个时候可以考虑使用Spark平台来实现统计分析,而且,Spark集群可以线性扩展,对于一些调优也更容易一些。

另外,Spark的发展超级迅猛,新版本频繁发布,而且在后期的版本中还会在性能方面进行大幅改进。Tungsten项目将是Spark自诞生以来内核级别的最大改动,以大幅度提升Spark应用程序的内存和CPU利用率为目标,旨在最大程度上压榨新时代硬件性能。Tungsten项目包括了3个方面的努力:

Memory Management和Binary Processing:利用应用的语义(Application Semantics)来更明确地管理内存,同时消除JVM对象模型和垃圾回收开销。

Cache-aware Computation(缓存友好的计算):使用算法和数据结构来实现内存分级结构(Memory Hierarchy)。

代码生成(Code Generation):使用代码生成来利用新型编译器和CPU。

Tungsten将大幅度提升Spark的核心引擎,在Spark 1.4版本,会包括Dataframe API中聚合操作的内存管理,以及定制化序列化器。在Spark 1.5版本中,会有部分项目(基于DataFrame模型)包括二进制内存管理的扩展和Cache-aware数据结构。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: