您的位置:首页 > 数据库

SparkSQL性能分析与优化及相关工具小结

2018-03-02 21:24 826 查看

简介

前段时间的工作是将内部一个OLAP系统Hxxx作为一个数据源接入到SparkSQL并进行优化。本文总结下调优过程当中一些可以借鉴与讨论的地方,鉴于本人水平有限,还请有这方面调优经验的同学不吝赐教^_^

优化主要是从两个方面来考虑,

1. 集群粒度的调优,包括CPU与内存分配,数据分布,shuffle等。数据存储在HDFS上,Hxxx接入SparkSQL时已经保证了Data Locality,所以数据分布这里就不考虑了。我们的环境中会使用YARN来跑Spark任务,所以需要考虑在YARN上面资源分配的问题。

2. 单个节点的调优。性能优化领域的一位大牛,Brendan D. Gregg,的一段话,

Performance issues can be categorized into one of two types:

- On-CPU: where busy processes and threads are spending time running on-CPU.

- Off-CPU: where time is spent waiting while blocked on I/O, locks, timers, paging/swapping, etc.

所以我们对单节点的优化可以分为On-CPU跟Off-CPU的优化,其中会用一些神器来帮助我们进行分析,包括Flame GraphJava Mission Control等。

集群粒度的调优

这方面的调优有时候还是要靠经验,因此参考了下面几篇文章,

1. How-to: Tune Your Apache Spark Jobs (Part 1)

2. How-to: Tune Your Apache Spark Jobs (Part 2)

3. 官方文档Tuning Spark

以及SparkSummitStrata+Hadoop会议上的几个分享,

1. Top 5 mistakes when writing Spark applications

2. Tuning and Debugging in Apache Spark

3. Everyday I’m Shuffling - Tips for Writing Better Spark Programs

总结一个checklist供参考,

1. CPU,内存资源分配问题

2. 数据本地性

3. 数据shuffle相关

4. 数据格式,cache level,序列化,压缩等问题

5. 计算并行度,straggler排查

根据How-to: Tune Your Apache Spark Jobs (Part 2)里面分享的经验,在CPU跟内存分配这方面的调优带来了30%的提升,其他几点在我们这个case中都没有太大问题的。怎么知道有没有问题的?通过Spark History Server的WebUI就可以判断了,



可以看到基本上时间全消耗在executor的计算(绿色)上。

CPU跟内存分配调优怎么做的呢?主要是从一条很有价值的经验出发,文章中提到,

I’ve noticed that the HDFS client has trouble with tons of concurrent threads. A rough guess is that at most five tasks per executor can achieve full write throughput

因此使用
spark.executor.cores    5
这样来配置。我们环境中机器是16核的,所以每台机器分配3个executor,剩下一个核留给OS以及其他进程。相比于一开始,每台机器只分配一个executor,每个executor使用15个核,性能提升了30%,性价比非常之高。

单节点优化

从History Server的WebUI我们可以看到现在瓶颈是在单个节点的计算上面。所以接下来就应该对单个节点,也就是executor,进行优化了。如之前所提到的,这部分分为两类问题,On-CPU与Off-CPU。On-CPU,也就是热点代码执行所消耗的时间;Off-CPU,就是一些在关键路径上等待I/O,等待锁之类的所消耗的时间。下面就介绍一些可以在这两方面帮助我们进行分析的神器。

On-CPU

要找到热点代码,需要借助采样工具,在一定的频率下对我们的进程进行采样,每次采样的时候记录下当前占用CPU的调用栈。最后根据采样数据的比例找出热点代码。

有很多工具都能帮我们找到并展示热点代码,个人比较喜欢火焰图的表现方式,非常直观。利用火焰图我们不仅可以看到Java程序的热点代码,还可以看到JVM自身的热点代码。下面来看下怎么生成mixed的(C++代码与Java代码)火焰图。

perf + perf-map-agent

参考Java in Flames这篇文章,我们可以使用perf来对executor进程进行采样,

$ jps | grep CoarseGrainedExecutorBackend | awk 'NF==2 && NR==1 {print $1}' | perf record -F 99 -p `xargs` -a -g -- sleep 60


使用perf-map-agent生成采样数据中地址所对应的Java符号,最后解析采样数据并使用flamegraph.pl生成火焰图,

$ perf script -f comm,pid,tid,cpu,event,sym,trace | ./stackcollapse-perf.pl --pid | ./flamegraph.pl --color=java --hash > executor-flame.svg


如下,



可以看到在整个JVM进程当中,GC线程,JIT编译线程,Java主线程大概各占用了1/3的CPU。GC调优我们放到最后来看;JIT编译部分,我把分层编译打开(
-XX:+TieredCompilation
)之后性能有一点点提升;而Java主线程,由于perf-map-agent存在的问题,需要开启
-Xcomp
强制进行JIT编译才能得到正确的Java符号,否则如果是解释执行的字节码就只能看到如图所示的
Interpreter
。其实看下perf-map-agent的源码,perf-map-agent.c,就能看出问题,它的实现方式就是监听JVMTI事件,
CompiledMethodLoad
DynamicCodeGenerated
,当然也就无法得到解释执行的字节码对应的Java符号了。所以不推荐使用mixed的火焰图。

Java Flight Recorder

从Oracle JDK7u40开始,Oracle JRockit JVM的一个神器,Java Flight Recorder,被移植到了HotSpot VM(题外话,虽然是说移植到了HotSpot VM,但是同样使用了HotSpot VM的OpenJDK却不支持JFR也没有JMC,WTF?)。使用JFR我们也可以对Java进程进行采样,非常简单,只需要加几个option就可以了,

spark.executor.extraJavaOptions    -XX:+UnlockCommercialFeatures -XX:+FlightRecorder -XX:StartFlightRecording=filename=executor.jfr,dumponexit=true,settings=profile


dumponexit=true
表示进程结束时输出采样数据,也就是说要在整个运行过程中都进行采样,如果不需要的话可以指定
duration=Xs
只在X秒时间内采样。
settings=profile
将会使用
$JAVA_HOME/jre/lib/jfr
目录下的
profile.jfc
作为采样配置,后面我们会需要修改这个配置文件。

executor运行结束之后会生成一个数据文件
executor.jfr
,接下来可以借助jfr-flame-graph来解析,

$ ./flamegraph-output.sh folded -f executor.jfr -o executor.txt


再使用flamegraph.pl生成火焰图,

$ cat executor.txt | ./flamegraph.pl > executor-flame-java.svg


最终结果是svg图片,戳这里,可以搜索,放大。

可以看到,CPU主要消耗在两个地方,一个是左侧Hxxx读取文档(
HxxxSearchUtil.getDocValue
,可以到svg图里面搜索,有高亮),另一个是右侧SparkSQL的聚合操作(
agg_doAggregateWithKeys
是CodeGen生成的执行聚合操作的代码)。Hxxx的内部实现暂时还不熟悉,所以先不考虑这部分优化。聚合操作这边是有可以优化的代码的,



这两个地方对应的代码是这样的,
new HxxxInternalRow(it.asScala.toArray)
it
是一个
java.util.Iterator
,这地方为了简便使用了JavaConverters进行转换,再转成数组。虽然简便,但是看得出来还是比较耗CPU的,用麻烦点的方式优化一下,

val list = it.next()
val numFields = list.size()
val array = new Array[Any](numFields)
for(i <- 0 until numFields) {
val value = list.get(i)
array(i) = value
}
new HxxxInternalRow(array)


还有这个地方,
if(value.toString.equals("-2147482624")) null




增加了很多无谓的
toString
调用,改成
if(value.isInstanceOf[String] && value.asInstanceOf[String].equals("-2147482624")) null
就好了。

然后还有一个看得到的热点,



SparkSQL内部没有使用
String
而是自己搞了一个
UTF8String
,为了节省内存?而Hxxx返回的数据如果是字符串用的是
String
String
是使用
UTF-16
编码的,于是编码转换成了热点代码。这里只能先mark一下,等对SparkSQL内部实现更了解之后再来看能不能优化掉。

改完之后上面几个被优化的热点都消失了,



我们接下来的一个工作是要将聚合操作下推到Hxxx来做,完成之后应该可以直接消除掉右边聚合操作的热点。在这个case中,单个executor对将近两千万条数据进行了聚合,还是比较耗CPU的。

Off-CPU

想要分析Off-CPU的时间都耗在哪,与On-CPU不同,不能再通过采样来实现了。需要通过工具(Perf,DTrace,SystemTap等)收集相关的事件,例如进程上下文切换,再通过事件的时间差来计算出我们所关心的指标。

这时候Java Flight Recorder这神器又可以出场了,我们需要的这些事件,它也都帮我们搞定了。具体的事件类型可以参考官方文档。实际上在上面我们生成的
executor.jfr
文件里面已经包含了所有事件的数据,而之前我们使用jfr-flame-graph来解析jfr文件时,其实也仅仅是读取了
Method Profiling Sample
事件,可以看下源码,JFRToFlameGraphWriterCommand.java

既然现在我们已经有了这些数据,那么接下来就需要一个类似火焰图的工具将这些数据展示出来,这时候另一大神器出场了,Java Mission Control

Java Mission Control

JMC的使用非常简单,直接打开
%JAVA_HOME%/bin/jmc.exe
就可以了。然后用它来打开
executor.jfr
文件。打开之后左侧会有好几个tab,这个大家自己把玩一下就明白了,或者可以看下官方文档介绍

我们所关心的Off-CPU事件,在这,



但是下面的堆栈跟踪并没有展示总计的时间,这样不好找出瓶颈,我们可以结合事件tab里的堆栈跟踪来分析,按总计排序,



接下来就一个一个去看那些占大头的时间是不是在我们的关键路径上,也就是说是不是在关键路径上阻塞了。在我们的case里,触发
Java Thread Park
Java Thread Sleep
的代码,要嘛不在关键路径,要嘛是一些等待初始化等待计算的操作,基本没有优化的空间了,





有一个地方可以提下,如下图,像这样用来进行初始化的线程池,
corePoolSize
keepAliveTime
这些配置是可以用来优化线程资源的,mark下,这里先不展开了。



来看下
Java Monitor Wait
Java Monitor Blocked






HxxxSearcher.init
的时候,因为调用
DFSClient.getFileInfo
需要等待2秒多的时间,这个需要确定下具体的原因,看上去像是HDFS响应比较慢引起的(UPDATE:后来这个时间变成了几十毫秒,是环境不稳定导致的?)。

接下来是另一趴重头戏,I/O等待时间,也就是
File Read
File Write
Socket Read
Socket Write
事件。这里就需要修改上面我们提到的配置文件
profile.jfc
了,比如
file_read
file_write
的配置,

<event path="java/file_read">
<setting name="enabled">true</setting>
<setting name="stackTrace">true</setting>
<setting name="threshold" control="http://www.oracle.com/hotspot/jvm/file-io-threshold">10 ms</setting>
</event>

<event path="java/file_write">
<setting name="enabled">true</setting>
<setting name="stackTrace">true</setting>
<setting name="threshold" control="http://www.oracle.com/hotspot/jvm/file-io-threshold">10 ms</setting>
</event>


在我们这个case里需要将
10 ms
改成
10 us
,这个阈值是指事件超过多长时间才记录下来,改成
10 us
也就是说一次文件读写只要超过10us就会记录下来。Hxxx用来做实时检索,在文件读取上面要消耗不少时间,



可以看到在程序运行过程中执行了上万次文件读取操作(
FileChannelImpl#read
),总计等待了6秒多的时间,而且每次读取的文件大小都不超过1MB。这里就可以想到一种优化思路,是不是可以每次读取更多的数据,从而减少读取次数减少等待时间。当然这也需要对Hxxx更熟悉才能做,mark先。

GC调优

GC调优,首先要根据应用场景来选择collector,参考官方文档,如果是需要high throughput,选择Parallel GC(parallel collector又称为throughput collector);如果是需要low latency,对响应时间有要求,选择CMS GC;而G1 GC,相比CMS GC,可以达到更高的throughput。我们这个case,在YARN上面跑个几十秒,主要看的是throughput,于是选择用Parallel GC。根据HotSpot VM的Ergonomics,我们机器上默认使用的就是Parallel GC。

Parallel GC可以调整的参数不多,一开始我只设置了
-XX:ParallelGCThreads=5
,将并行GC线程数设置成与
spark.executor.cores
相同。借助JMC来看下GC情况,



可以看到,居然出现了两次FullGC(ParallelOld),GC的原因是Ergonomics,很明显不是因为内存不足引起(Allocation Failure)的,应该是可以避免的。那这个GC原因具体到底是个什么鬼?参考这篇文档,找到gcCause.cpp

case _adaptive_size_policy:
return "Ergonomics";


所以是因为adaptive size policy引起的,adaptive size policy是用来提升性能的,官方文档提到,

The heap size parameters selected by ergonomics plus the features of the adaptive size policy are meant to provide good performance for server applications.

具体来说就是会在JVM运行过程中动态调整堆的大小,我试了下不使用该policy,
-XX:-UseAdaptiveSizePolicy
,发现committed heap的大小就被固定死了(而且这个policy对CMS GC也有效,G1 GC没试),而这个committed heap,参考官方文档,就固定为
-Xms
指定的大小,

If the value of the -Xms parameter is smaller than the value of the -Xmx parameter, than not all of the space that is reserved is immediately committed to the virtual machine.

我没有指定
-Xms
,根据Ergonomics,初始堆的大小是1G。所以看上去是堆的动态增大(commit更多的堆)会触发FullGC?通过显式设置
-Xms8G
来看看,



两次由于Ergonomics引起的FullGC消失掉了,但是也可以看到committed heap仍然是在动态增大的,却没有再触发FullGC,因此可以确定,adaptive size policy引起的堆大小调整,不一定会触发FullGC,至于什么情况下会触发,恐怕得去看HotSpot VM的源码了。anyway,FullGC的问题解决掉了,而且因为Parallel GC是stop-the-world的,因此优化掉多少时间,executor的执行时间就减少掉多少。

大头优化掉之后,再看看其他还能优化的点。试了下
-XX:MaxGCPauseMillis=<N>
-XX:GCTimeRatio=<N>
并没有什么提升,看来这部分的Ergonomics并不适用于短进程?回到上图,现在能优化的也就是减少MinorGC(ParallelScavenge)了,首先能想到的就是把YoungGen调大一些,试了下
-XX:NewRatio=1
基本上可以再减少掉一次MinorGC。

调到这里已经不知道还有哪些option可以使用了(有经验的同学可以指点一下哈),接下来就只能从程序入手,看看内存都分配到哪去了。这时候JMC又可以登场了,看下内存tab里面的分配这一栏,



理解这里的数据需要参考下官方文档介绍

Small objects in Java are allocated in a TLAB (Thread Local Area Buffer). TLAB is a small memory area where new objects are allocated. Once a TLAB is full, the thread gets a new one. Logging all memory allocations gives an overhead; therefore, all allocations that triggered a new TLAB are logged. Larger objects are allocated outside TLAB, which are also logged.

因此这里所展示的是触发TLAB申请(已申请的TLAB满了,线程向JVM申请新的TLAB,TLAB的大小可以通过
-XX:TLABSize=<N>
指定)的内存分配以及大对象的内存分配(直接分配到堆上),而不是所有的内存分配

上图所展示的
byte[]
的分配,很大一部分都是前面提到的,
String
对象转换成Spark内部的
UTF8String
所引起的,还有排第四的
sun.nio.cs.UTF_8$Encoder
对象的分配也是,可见这部分如果优化掉可以减少很多内存分配。

再看下大对象的分配,



发现Hxxx在
GlobalInfo
里面分配了将近1G的
byte[]
,这是可以优化的地方,改成200M之后,基本上又减少了一次MinorGC,



TODO

最后,记录下几个待深入和优化的点,

1. Spark 聚合操作下推

2. Spark UTF8String 优化

3. Hxxx读取文档优化,包括I/O部分提高单次读取大小

alright,今天就先到这了,have fun^_^
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark 火焰图 jmc jfr