您的位置:首页 > 其它

Spark视频王家林大神第2课:解密spark第二代tungsten引擎测试数据和引擎实现内幕

2018-01-25 08:56 393 查看
Spark视频王家林大神第2课:解密spark第二代tungsten引擎测试数据和引擎实现内幕

本节探讨spark第二代tungsten引擎测试数据和引擎实现内幕。第二代tungsten钨丝计划的推出,Spark官方的数据表明Spark的性能提升了5到10倍,大多数的代码不经修改,直接放在Spark2.x上运行,会比在Spark1.6上运行,速度会快5到10倍。注意这里的大多数代码是指例如基本的filter操作、基本的HashJoin等操作,如果进行全局排序,这本身就很复杂,第二代tungsten引擎在这方面还没做更多的努力。大多数代码如一些耗CPU的操作,性能在Spark2.x中会有显著的提升。获得最大性能提升的就在于数据规模特别大且特别耗CPU,这正是平时性能优化的难点。
         Databricks公司的官方博客(https://databricks.com/blog/2016/05/11/apache-spark-2-0-technical-preview-easier-faster-and-smarter.html)提及Spark采用了第二代引擎,里面有个关键的技术全阶段代码生成(whole-stagecode generation)。在一台机器上运行,每10亿条记录数据的执行情况,在Spark1.6版本和Spark2.0版本上运行对比,Spark2.0的性能得到非常显著的提升。 
primitive
Spark 1.6
Spark 2.0
filter
15ns
1.1ns
sum w/o group
14ns
0.9ns
sum w/ group
79ns
10.7ns
hash join
115ns
4.0ns
sort (8-bit entropy)
620ns
5.3ns
sort (64-bit entropy)
620ns
40ns
sort-merge join
750ns
700ns
                                
        看一下Databricks公司提供的全阶段代码生成(whole-stagecode generation)的相关对比代码(http://cdn2.hubspot.net/hubfs/438089/notebooks/spark2.0/Whole-stage%20code%20generation.html),这里的测试集群只有1个节点,分配3个Cores,CPU 型号是Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz。
Databricks公司编写的基准测试代码benchmark,以秒为单位:
1.          // Define a simple benchmark utilfunction
2.         def benchmark(name: String)(f: => Unit) {
3.           val startTime = System.nanoTime
4.           f
5.           val endTime = System.nanoTime
6.           println(s"Time taken in$name: " + (endTime - startTime).toDouble / 1000000000 + "seconds")
7.         }
 
在求和计算中,Spark 1.6版本没有开启全阶段代码生成(whole-stagecode generation)功能,累加求和计算10亿条记录花费约8秒。
1.          // This config turns off wholestage code generation, effectively changing the execution path
2.         // to be similar to Spark 1.6.
3.         spark.conf.set("spark.sql.codegen.wholeStage", false)
4.          
5.         benchmark("Spark 1.6") {
6.           spark.range(1000L * 1000 *1000).selectExpr("sum(id)").show()
7.         }
 
在求和计算中,Spark2.0版本开启全阶段代码生成(whole-stagecode generation)功能,累加求和计算10亿条记录花费约0.7秒。
1.          spark.conf.set("spark.sql.codegen.wholeStage",true)
2.          
3.         benchmark("Spark2.0") {
4.           spark.range(1000L * 1000 *1000).selectExpr("sum(id)").show()
5.         }
 
在join操作中,Spark 1.6版本没有开启全阶段代码生成(whole-stagecode generation)功能,join操作计算10亿条记录花费约67秒。
1.          // This config turns off whole stage codegeneration, effectively changing the execution path
2.         // to be similar to Spark 1.6.
3.         spark.conf.set("spark.sql.codegen.wholeStage",false)
4.          
5.         benchmark("Spark1.6") {
6.           spark.range(1000L * 1000 *1000).join(spark.range(1000L).toDF(), "id").count()
7.         }
8.         Time taken in Spark 1.6:67.949386495 seconds
在join操作中,Spark2.0版本开启全阶段代码生成(whole-stagecode generation)功能,join操作计算10亿条记录花费约0.8秒。
1.          spark.conf.set("spark.sql.codegen.wholeStage",true)
2.          
3.         benchmark("Spark2.0") {
4.           spark.range(1000L * 1000 *1005).join(spark.range(1040L).toDF(), "id").count()
5.         }
6.         Time taken in Spark 2.0:0.864557455 seconds
 
   Join等相关操作的物理计划、优化的逻辑计划、已分析的逻辑计划、已解析的逻辑计划如下:
1.          spark.range(1000L * 1000 *1000).join(spark.range(1000L).toDF(),
2.          "id").selectExpr("count(*)").explain(true)
3.         == Parsed Logical Plan ==
4.         'Project[unresolvedalias('count(1), Some(count(1)))]
5.         +- Project [id#782L]
6.            +- Join Inner, Some((id#782L = id#785L))
7.               :- Range 0, 1000000000, 1, 3, [id#782L]
8.               +- Range 0, 1000, 1, 3, [id#785L]
9.          
10.      == Analyzed Logical Plan ==
11.      count(1): bigint
12.      Aggregate[(count(1),mode=Complete,isDistinct=false) AS count(1)#797L]
13.      +- Project [id#782L]
14.         +- Join Inner, Some((id#782L = id#785L))
15.            :- Range 0, 1000000000, 1, 3, [id#782L]
16.            +- Range 0, 1000, 1, 3, [id#785L]
17.       
18.      == Optimized Logical Plan ==
19.      Aggregate[(count(1),mode=Complete,isDistinct=false) AS count(1)#797L]
20.      +- Project
21.         +- Join Inner, Some((id#782L = id#785L))
22.            :- Range 0, 1000000000, 1, 3, [id#782L]
23.            +-Range 0, 1000, 1, 3, [id#785L]
24.       
25.      == Physical Plan ==
26.      TungstenAggregate(key=[],functions=[(count(1),mode=Final,isDistinct=false)], output=[count(1)#797L])
27.      +- Exchange SinglePartition,None
28.         +- TungstenAggregate(key=[],functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#799L])
29.            +- Project
30.               +- BroadcastHashJoin [id#782L],[id#785L], Inner, BuildRight, None
31.                  :- Range 0, 1, 3, 1000000000,[id#782L]
32.                  +- BroadcastExchangeHashedRelationBroadcastMode(List(input[0, bigint]))
33.                     +- Range 0, 1, 3, 1000,[id#785L]
 Spark 1.6、Spark 2.0性能测试对比显著提升,背后的原因是Spark2.0做了全阶段代码生成(whole-stagecode generation)。Spark 2.0中,如果物理计划中有*,则会启用whole-stage code generation的机制,Range,Filter等等都有这个*。shuffle操作会产生2个Stage,在第一个Stage中,Range,Filter, Aggregat三个操作合并成一个操作,Range、Filter、Aggregates操作都会通过whole-stagecode generation的方式执行,从whole-stage code generation这个名字可以看出来,全阶段代码生成是针对一个stage的,Exchange不具有whole-stagecode generation,因为whole-stage code generation在一个Stage内部,不能在2台机器上,在第二个Stage中,只有一个操作Aggregate。这说明第二代Spark tungsten的精髓所在,在一个Stage内部多个算子,原先我们自己的操作都是基于一个Iterator,然后执行Next、Next、Next......的方式,Next的操作方式是把数据放到内存中,处理一下数据,然后再放到内存中,再处理一下数据, 但现在是进行整个Stage的whole-stage code generation,就是一个Function,不需要很多Function。
Spark 2.x中的第二代Tungsten性能之所以能够提升10倍左右的原因:
1)      去掉了虚函数的调用,极大的减少了CPU指令的无用的消耗!Spark的计算模
型以前都是基于RDD的Iterator进行迭代,Iterator不断进行的Next操作,不同类型不同算子的组合带来大量虚操作,这些操作消耗时间,如果能把一个查询的操作封装成一个for循环,就极大的提升性能。
2)      数据直接放在寄存器中,至少提升了一个数量级的数据读写速度!假设对SQL
进行查询操作,与其写一个SQL Select语句,不如把这个Select语句翻译写成一个for循环,从一个集合中遍历数据,不用使用Next。原先的方式每Next执行一次,Next是一个函数调用;现在是一个for循环,最原始的数据集的计算方式,就是一个函数,不存在所谓的虚函数了。现在的CPU都能对产生的中间数据进行优化,数据不保存在内存,而是直接存储在寄存器中。
3)      现在的CPU等硬件架构对基本的条件语句,循环语句等进行了极大的优化。并
且可以使用硬件加速。例如写一个SQL语句,Spark 2.x中第二代Tungsten能够将SQL语句直接翻译成一个函数,for循环计算大多数的中间数据都可放到寄存器中,因此Range,Filter等操作能极大的提高速度。而不是以前每条数据都通过Iterator的Next进行虚函数、函数的调用,每条数据都有一个函数的调用。
4)       对于复杂的数据操作,采用Vectorzation的方式,采用列式的方式读写数据。
原来的数据例如DataFrameRow Format是一行一行的数据,需要Next、Next等操作;数据很复杂,有不同类型,如使用列式存储构建了向量,只需要Next1次就行了,Next读取一行数据,但是读取的是整列的数据。例如,100亿条数据都有3列:ID,姓名,年龄,原先的循环需要100亿次,而现在采用列式存储,只需要循环3次,因为以列的方式存储,现在的一行就是原来的一列,去掉了原来的虚函数。如果数据规模特别大,即使100亿条数据只有1列姓名,数据也是放在内存中,Tungsten计划可以极大的改善这个过程。
Spark 2.x的优点:擅长CPU密集型的计算。
Spark2.x的弱点:对I/O没有进行太多优化。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐