您的位置:首页 > 其它

sparkstreaming优化

2017-08-27 18:10 24 查看
  一、合理的Kafka拉取量:设置Spark Streaming最大数据接收率 - 如果运行Streaming应用程序的资源不是很多,数据处理能力跟不上接收数据的速率,可以为应用程序设置一个每秒最大接收记录数进行限制。对于Receiver模式的应用,设置spark.streaming.receiver.maxRate,对于Direct Kafka模式,设置spark.streaming.kafka.maxRatePerPartition限制从每个Kafka的分区读取数据的速率。假如某个Topic有8个分区,spark.streaming.kafka.maxRatePerpartition=100,那么每个batch最大接收记录为800。从Spark-1.5版本开始,引入了一个backpressure的机制来避免设置这个限制阈值。Spark
Streaming会自动算出当前的速率限制,并且动态调整这个阈值。通过将spark.streaming.backpressure.enabled为true开启backpressure功能。

需要注意一个参数spark.streaming.blockInterval。对于Receiver来说,接收到的数据在保存到Spark内存中之前,会以block的形式汇聚到一起。每个Batch中block的个数决定了程序运行时处理这些数据的task的个数。每一个receiver的每一个batck对应的task个数大致为(batch时间间隔 / block时间间隔)。比如说对于一个2m的batch,如果block时间间隔为200ms那么,将会有10个task。如果task的数量太少,对数据的处理就不会很高效。在batch时间固定的情况下,如果想增大task个数,那么就需要降低blockInterval参数了,这个参数默认值为200ms,官方建议的该参数下限为50ms,如果低于50ms可能会引起其他问题。 

  另一个提高数据并发处理能力的方法是显式的对接收数据重新分区,inputStream.repartition(<number of partitions>)

二、设置合理的批处理时间

batchDuration所产生的Job并不能在这期间完成处理,那么就会造成数据不断堆积,最终导致Spark Streaming发生阻塞。而且,一般对于batchDuration的设置不会小于500ms,因为过小会导致SparkStreaming频繁的提交作业,对整个streaming造成额外的负担。在平时的应用中,根据不同的应用场景和硬件配置,我设在1~10s之间,我们可以根据SparkStreaming的可视化监控界面,观察Total Delay来进行batchDuration的调整

三、设置合理的cpu资源数

CPU的core数量,每个executor可以占用一个或多个core,可以通过观察CPU的使用率变化来了解计算资源的使用情况,例如,很常见的一种浪费是一个executor占用了多个core,但是总的CPU使用率却不高(因为一个executor并不总能充分利用多核的能力),这个时候可以考虑让么个executor占用更少的core,同时worker下面增加更多的executor,或者一台host上面增加更多的worker来增加并行执行的executor的数量,从而增加CPU利用率。但是增加executor的时候需要考虑好内存消耗,因为一台机器的内存分配给越多的executor,每个executor的内存就越小,以致出现过多的数据spill
over甚至out of memory的情况。

四、设置合理的parallelism

partition和parallelism,partition指的就是数据分片的数量,每一次task只能处理一个partition的数据,这个值太小了会导致每片数据量太大,导致内存压力,或者诸多executor的计算能力无法利用充分;但是如果太大了则会导致分片太多,执行效率降低。在执行action类型操作的时候(比如各种reduce操作),partition的数量会选择parent RDD中最大的那一个。而parallelism则指的是在RDD进行reduce类操作的时候,默认返回数据的paritition数量(而在进行map类操作的时候,对于reduceByKey和reduceByKeyAndWindow操作来说,并行task个数由参数spark.default.parallelism来控制。partition数量通常取自parent
RDD中较大的一个,而且也不会涉及shuffle,因此这个parallelism的参数没有影响)。所以说,这两个概念密切相关,都是涉及到数据分片的,作用方式其实是统一的。通过spark.default.parallelism可以设置默认的分片数量,而很多RDD的操作都可以指定一个partition参数来显式控制具体的分片数量。

在SparkStreaming+kafka的使用中,我们采用了Direct连接方式,前文阐述过Spark中的partition和Kafka中的Partition是一一对应的,我们一般默认设置为Kafka中Partition的数量。

五、缓存经常使用的数据

对一些经常使用到的数据,我们可以显式地调用rdd.cache()来缓存数据,这样也可以加快数据的处理,但是我们需要更多的内存资源。

六、清除旧数据

配置选项spark.streaming.unpersist为true(默认就是true),减少Spark RDD的内存使用,也可能改善垃圾回收的行为。

默认情况下,所有输入数据和DStream通过不同的transformations持久化的数据都会自动进行清理。Spark Streaming根据transformations的不同来决定哪些数据需要被清理掉。例如,当使用10分钟的窗口函数时,Spark Streaming会保存最少10分钟的数据。想要数据保存更长时间,可以设置streamingContext.remenber参数。

七、尽量保证每个work节点中的数据不要落盘,以提升执行效率。






八、设置合理GC

GC是程序中最难调的一块,不合理的GC行为会给程序带来很大的影响。在集群环境下,我们可以使用并行Mark-Sweep垃圾回收机制,虽然这个消耗更多的资源,但是我们还是建议开启。可以如下配置:

spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐