您的位置:首页 > 其它

spark1.6.1学习笔记01-spark调优

2017-05-09 10:46 190 查看
  spark调优主要在两个方面进行考虑,一个是数据序列化,另一个是内存管理。

  1、从数据序列化角度进行优化

  spark提供了两个序列化库。默认情况下Spark使用Java的ObjectOutputStream框架,它可以作用于任何实现了java.io.Serializable接口的类。通过实现java.io.Externalizable接口可以更精细地控制序列化过程。Java序列化非常的灵活, 但也非常慢。Spark还提供了Kryo库(version 2)来实现快速序列化。Kryo的速度是Java序列化的10倍以上,但是它不支持所有可序列化类型,它要求在程序中使用到的序列化类型需要提前被注册。

  在SparkConf中可以切换使用Kryo进行序列化,如:

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").
  如此一来,spark将会在shuffle操作时和将rdd写入硬盘时使用Kyro方式进行序列化。Spark自动注册了很多scala常用类,这些类来自Twitter chill库的AllScalaRegistrar。在Kryo中注册自己的类的方法是:
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)
  如果你的对象非常大,那么需要配置spark.kryoserializer.buffer,这个缓存需要大到足够存放你所需的最大的对象。即使不在kryo进行注册,spark依然会工作,只是它将会存储每个对象的全类名,这将会非常浪费。

  2、内存管理

  内存管理有三方面需要考虑,对象使用的内存、访问这些对象时消耗的内存,GC使用的内存。

  访问java对象需要消耗的内存大概是被访问对象占用内存的2-5倍。这是因为:

    (1)每个java对象都有一个header,它是一个指向类的指针,大小有16bytes

    (2)Java的String对象有大概40bytes的多余消耗,String对象存储在一个char的数组中,且需要额外的空间来存储长度。另外存储每个char需要2个bytes。这样10个字符的string对象需要消耗60bytes。

    (3)Collection类中很多类型不仅包含之前提到的header,还有pointer指向下一个对象。例如LinkedList,HashMap等等。

    (4)基本类型常常会转化为包装类型使用。

  Spark中的内存消耗主要来自于执行和存储。执行内存指用于shuffles、joins、sorts、aggregations的内存;存储内存指缓存cache和在集群中传播内部数据的内存。执行和存储共享统一的一块内存(M)。执行内存在必要时可能会挤占存储内存,直到存储内存减少到一个阈值(R)。R是M中永远不会被执行内存挤占的部分,它只属于存储内存。相反在执行消耗的内存很小时,存储内存却可以占用整个M的空间,单存储内存永远不会挤占执行内存。

  基于以上设计,不使用缓存的应用可以占用所有的硬盘空间用于执行,这样避免了不必要的硬盘溢出。其次,不使用缓存的应用保留一小部分内存空间(R)。最后这样的设计让使用者不必费心于内存分配。spark提供了两个相关的配置,但实际默认的配置就已经适用于大部分的工作了:

  spark.memory.fraction指定M占用JVM heap size的比例,默认值0.75。剩下的25%用于存储用户数据结构、spark内部元数据,并作为内存溢出时的安全防护。

  spark.memory.storageFraction指定R占用M的比例。

  Spark提供两种方式用来评估对象的内存大小。首先可以将其转化为rdd,并在spark驱动程序的storage页查看内存大小。另一种方式是使用SizeEstimator的estimate方法得到内存大小,然后在控制台打印。

  下面正式开始介绍如何进行内存管理优化了。。。。。。

  优化数据结构:

    总的原则是避免使用各种包装类,具体方法包括

    (1)尽量使用数组和基本类型,少用collection类。fastutil库为基本类型提供了便利的collection类,它们与标准java类库是兼容的。

    (2)避免使用包含很多小对象和指针的嵌套的数据结构。

    (3)用整数或枚举来做ID,避免使用String。

    (4)如果你的内存少于32GB,使用JVM参数
-XX:+UseCompressedOops
将pointer的长度由8bytes改到4bytes。可以在spark-env.sh中增加这些参数。

  序列化RDD存储:

    使用序列化方式存储rdd可以降低内存的使用。在rdd persistence API中指定序列化存储级别(例如MEMORY_ONLY_SER)。Spark将RDD的每个partition作为一个很大的byte数组进行存储。但这样会增加数据的访问时间,因为需要反序列化。因此我们也强烈推荐使用Kryo进行序列化,它比java序列化高效的多~

  GC优化:

    GC消耗往往和对象的数目成正相关,因此减少使用包装类可以减少GC消耗。不过最该尝试的还是序列化存储,因为这样数据只用一个对象保存了。

    GC的另一个问题来自计算节点工作内存(执行作业的内存)和保存RDD的内存之间的相互干扰。

    (1)评估GC的影响:

      首先收集GC的频率和时间,通过添加-verbose:gc -XX:+PrintGCDetails -XX:PrintGCTimeStamps参数可以达到这一目的。GC的相关信息将会出现在worker的日志中(stdout文件)。

    (2)高级GC优化

      在开始高级GC优化之前需要先熟悉JAVA的内存管理方式

      java heap space被划分为两块Young和Old。Young存放短期存在对象,Old存放长期存在的对象。

      Young又可以进一步划分为三块(Eden,Survivor1和Survivor2)

      GC的过程大致可以描述为:当Eden满了以后,一个微型的GC将会在Eden上执行,从Eden和Survivor1中幸存的对象会被复制到Survivor2.Survivors会进行一次调换(1变成2,2变成1)。若一个对象足够“老”,或者Survivor2满了,那么对象将会被复制到Old。最后如果Old满了,那么一个Full GC将会执行。

      GC优化的目的在于保证只有长期存在的RDD会进入Old,Young的大小足够保存短期存在的对象。这样就可以避免full GC会收集任务执行过程中产生的临时对象(它们只存在于YOUNG里)。一些有用的方法包括:

      <1>查看gc状态,如果某个任务完成前执行了多次full GC,说明内存不够执行任务。

      <2>如果Old接近于满负荷,可以降低缓存使用的内存,方法是减低参数spark.memory.storageFraction。

      <3>如果有很多微型GC但是很少有full GC,那么可以提升Eden的大小,使其超过每次任务所需的内存数。如果Eden的大小为E,那么Young的大小要设置为4/3E。命令行为-Xmn=4/3*E

      <4>若从HDFS读取数据,那么内存大小可以用HDFS的文件大小做评估。解压后的block大小大概是解压前的2-3倍。如果你希望有3-4个任务的工作空间,而且HDFS的block大小是64MB,可以评估Eden的大小大概是4*3*64MB。

      <5>监控GC状态以便调整配置。

    关于GC的优化的细节,可以参考这里

  3、其他优化方法

    (1)并行的程度

    在你设置每个操作的并行级别之前,spark集群将不会完全的利用计算资源。Spark根据读入文件的大小来决定map操作会被拆分为几个子任务(但是也可以通过SparkContext.textFile参数进行控制)。对于reduce操作,例如groupByKey和reduceByKey,Spark会使用最大的父辈RDD的partitions数目。可以在执行操作时将并行级别用第二个参数传递进去,或者配置参数spark.default.parallelism类修改默认值。我们推荐每个CPU执行2-3个任务。

    (2)Reduce任务中内存的使用

    有时会因为执行任务造成内存溢出,比如groupByKey这个reduce操作。Spark的shuffle操作(sortByKey、groupByKey,reduceByKey,join等等)会在每个任务中创建一个hash table来处理分组操作(group),这个hash table往往会很大。简单的处理方法是增大并行的程度,这样每个任务的输入将会较小。Spark可以高效的支持短于200毫秒的任务,因为它重用了一个执行的虚拟机,这样每次执行任务的开销是很低的。可以将并行的程度设置的高于集群的内核数。

    (3)广播大型变量

    使用SparkContext提供的广播方法可以极大地减少每个序列化任务的大小,和集群启动一个作业的消耗。如果你的任务使用到来自驱动程序的大型变量(例如一个静态的用于查找的表),可以考虑将它包装进广播变量。Spark会在主节点打印序列化后的任务的大小,你可以根据这个来判断任务是否过大。一般情况下,如果任务大于20KB就需要作出相应的调整了。

    (4)数据本地化

    如果数据和操作数据的代码之间的距离很近,那么执行将会很高效。否则将会需要通过网络进行一些转移操作。一般,对序列化后的代码进行转移要不对数据集进行转移要划算的多。Spark也是基于此来对数据本地化进行调度的。

    数据与操作数据的代码之间的“距离”分为几个级别,按照从近到远可以做如下的排序:

      <1>PROCESS_LOCAL,数据和操作数据的代码在同一个JVM。

      <2>NODE_LOCAL,数据和操作数据的代码在同一个节点。例如在同一个节点的HDFS中,或在同一个节点的另一个executor中。

      <3>NO_PREF,数据可以从别的地方很快的获取,但是不是本地。

      <4>RACK_LOCAL,数据存放在同一个机架上的其他服务器中。需要通过网路转移,典型的是通过交换。

      <5>ANY,数据存放在网络中的任何地方,且不在同一个机架上。

    Spark会尝试将所有的任务调度到里数据最近的地方执行,但这并不总能实现。有时一些空闲的executor没有未被操作的本地数据,这是Spark将会切换到更低的数据本地化级别。有两种可选的方法:1)在数据所在的本地进行等待,直到有可用的cpu来开启新的任务;2)立即在有空闲cpu的某处开启新的任务,并将数据转移到那个地方。

    Spark通常会等待一段时间以期数据所在的本地会出现空闲的CPU,如果超过这个时间,它会开始将数据转移到存在空闲CPU的某处。在各个不同的“距离”进行切换的等待时间是可以单独配置的,不过也可以统一配置一个时间间隔。详情参考spark.locality参数的配置。如果你的任务通常耗时很长,而且数据本地化程度很低,那么可以增加这些配置项。不过默认值一般都会比较合适。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark