sparkstreaming优化
2017-08-27 18:10
24 查看
一、合理的Kafka拉取量:设置Spark Streaming最大数据接收率 - 如果运行Streaming应用程序的资源不是很多,数据处理能力跟不上接收数据的速率,可以为应用程序设置一个每秒最大接收记录数进行限制。对于Receiver模式的应用,设置spark.streaming.receiver.maxRate,对于Direct Kafka模式,设置spark.streaming.kafka.maxRatePerPartition限制从每个Kafka的分区读取数据的速率。假如某个Topic有8个分区,spark.streaming.kafka.maxRatePerpartition=100,那么每个batch最大接收记录为800。从Spark-1.5版本开始,引入了一个backpressure的机制来避免设置这个限制阈值。Spark
Streaming会自动算出当前的速率限制,并且动态调整这个阈值。通过将spark.streaming.backpressure.enabled为true开启backpressure功能。
需要注意一个参数spark.streaming.blockInterval。对于Receiver来说,接收到的数据在保存到Spark内存中之前,会以block的形式汇聚到一起。每个Batch中block的个数决定了程序运行时处理这些数据的task的个数。每一个receiver的每一个batck对应的task个数大致为(batch时间间隔 / block时间间隔)。比如说对于一个2m的batch,如果block时间间隔为200ms那么,将会有10个task。如果task的数量太少,对数据的处理就不会很高效。在batch时间固定的情况下,如果想增大task个数,那么就需要降低blockInterval参数了,这个参数默认值为200ms,官方建议的该参数下限为50ms,如果低于50ms可能会引起其他问题。
另一个提高数据并发处理能力的方法是显式的对接收数据重新分区,inputStream.repartition(<number of partitions>)
二、设置合理的批处理时间
batchDuration所产生的Job并不能在这期间完成处理,那么就会造成数据不断堆积,最终导致Spark Streaming发生阻塞。而且,一般对于batchDuration的设置不会小于500ms,因为过小会导致SparkStreaming频繁的提交作业,对整个streaming造成额外的负担。在平时的应用中,根据不同的应用场景和硬件配置,我设在1~10s之间,我们可以根据SparkStreaming的可视化监控界面,观察Total Delay来进行batchDuration的调整
三、设置合理的cpu资源数
CPU的core数量,每个executor可以占用一个或多个core,可以通过观察CPU的使用率变化来了解计算资源的使用情况,例如,很常见的一种浪费是一个executor占用了多个core,但是总的CPU使用率却不高(因为一个executor并不总能充分利用多核的能力),这个时候可以考虑让么个executor占用更少的core,同时worker下面增加更多的executor,或者一台host上面增加更多的worker来增加并行执行的executor的数量,从而增加CPU利用率。但是增加executor的时候需要考虑好内存消耗,因为一台机器的内存分配给越多的executor,每个executor的内存就越小,以致出现过多的数据spill
over甚至out of memory的情况。
四、设置合理的parallelism
partition和parallelism,partition指的就是数据分片的数量,每一次task只能处理一个partition的数据,这个值太小了会导致每片数据量太大,导致内存压力,或者诸多executor的计算能力无法利用充分;但是如果太大了则会导致分片太多,执行效率降低。在执行action类型操作的时候(比如各种reduce操作),partition的数量会选择parent RDD中最大的那一个。而parallelism则指的是在RDD进行reduce类操作的时候,默认返回数据的paritition数量(而在进行map类操作的时候,对于reduceByKey和reduceByKeyAndWindow操作来说,并行task个数由参数spark.default.parallelism来控制。partition数量通常取自parent
RDD中较大的一个,而且也不会涉及shuffle,因此这个parallelism的参数没有影响)。所以说,这两个概念密切相关,都是涉及到数据分片的,作用方式其实是统一的。通过spark.default.parallelism可以设置默认的分片数量,而很多RDD的操作都可以指定一个partition参数来显式控制具体的分片数量。
在SparkStreaming+kafka的使用中,我们采用了Direct连接方式,前文阐述过Spark中的partition和Kafka中的Partition是一一对应的,我们一般默认设置为Kafka中Partition的数量。
五、缓存经常使用的数据
对一些经常使用到的数据,我们可以显式地调用rdd.cache()来缓存数据,这样也可以加快数据的处理,但是我们需要更多的内存资源。
六、清除旧数据
配置选项spark.streaming.unpersist为true(默认就是true),减少Spark RDD的内存使用,也可能改善垃圾回收的行为。
默认情况下,所有输入数据和DStream通过不同的transformations持久化的数据都会自动进行清理。Spark Streaming根据transformations的不同来决定哪些数据需要被清理掉。例如,当使用10分钟的窗口函数时,Spark Streaming会保存最少10分钟的数据。想要数据保存更长时间,可以设置streamingContext.remenber参数。
七、尽量保证每个work节点中的数据不要落盘,以提升执行效率。
八、设置合理GC
GC是程序中最难调的一块,不合理的GC行为会给程序带来很大的影响。在集群环境下,我们可以使用并行Mark-Sweep垃圾回收机制,虽然这个消耗更多的资源,但是我们还是建议开启。可以如下配置:
spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC
Streaming会自动算出当前的速率限制,并且动态调整这个阈值。通过将spark.streaming.backpressure.enabled为true开启backpressure功能。
需要注意一个参数spark.streaming.blockInterval。对于Receiver来说,接收到的数据在保存到Spark内存中之前,会以block的形式汇聚到一起。每个Batch中block的个数决定了程序运行时处理这些数据的task的个数。每一个receiver的每一个batck对应的task个数大致为(batch时间间隔 / block时间间隔)。比如说对于一个2m的batch,如果block时间间隔为200ms那么,将会有10个task。如果task的数量太少,对数据的处理就不会很高效。在batch时间固定的情况下,如果想增大task个数,那么就需要降低blockInterval参数了,这个参数默认值为200ms,官方建议的该参数下限为50ms,如果低于50ms可能会引起其他问题。
另一个提高数据并发处理能力的方法是显式的对接收数据重新分区,inputStream.repartition(<number of partitions>)
二、设置合理的批处理时间
batchDuration所产生的Job并不能在这期间完成处理,那么就会造成数据不断堆积,最终导致Spark Streaming发生阻塞。而且,一般对于batchDuration的设置不会小于500ms,因为过小会导致SparkStreaming频繁的提交作业,对整个streaming造成额外的负担。在平时的应用中,根据不同的应用场景和硬件配置,我设在1~10s之间,我们可以根据SparkStreaming的可视化监控界面,观察Total Delay来进行batchDuration的调整
三、设置合理的cpu资源数
CPU的core数量,每个executor可以占用一个或多个core,可以通过观察CPU的使用率变化来了解计算资源的使用情况,例如,很常见的一种浪费是一个executor占用了多个core,但是总的CPU使用率却不高(因为一个executor并不总能充分利用多核的能力),这个时候可以考虑让么个executor占用更少的core,同时worker下面增加更多的executor,或者一台host上面增加更多的worker来增加并行执行的executor的数量,从而增加CPU利用率。但是增加executor的时候需要考虑好内存消耗,因为一台机器的内存分配给越多的executor,每个executor的内存就越小,以致出现过多的数据spill
over甚至out of memory的情况。
四、设置合理的parallelism
partition和parallelism,partition指的就是数据分片的数量,每一次task只能处理一个partition的数据,这个值太小了会导致每片数据量太大,导致内存压力,或者诸多executor的计算能力无法利用充分;但是如果太大了则会导致分片太多,执行效率降低。在执行action类型操作的时候(比如各种reduce操作),partition的数量会选择parent RDD中最大的那一个。而parallelism则指的是在RDD进行reduce类操作的时候,默认返回数据的paritition数量(而在进行map类操作的时候,对于reduceByKey和reduceByKeyAndWindow操作来说,并行task个数由参数spark.default.parallelism来控制。partition数量通常取自parent
RDD中较大的一个,而且也不会涉及shuffle,因此这个parallelism的参数没有影响)。所以说,这两个概念密切相关,都是涉及到数据分片的,作用方式其实是统一的。通过spark.default.parallelism可以设置默认的分片数量,而很多RDD的操作都可以指定一个partition参数来显式控制具体的分片数量。
在SparkStreaming+kafka的使用中,我们采用了Direct连接方式,前文阐述过Spark中的partition和Kafka中的Partition是一一对应的,我们一般默认设置为Kafka中Partition的数量。
五、缓存经常使用的数据
对一些经常使用到的数据,我们可以显式地调用rdd.cache()来缓存数据,这样也可以加快数据的处理,但是我们需要更多的内存资源。
六、清除旧数据
配置选项spark.streaming.unpersist为true(默认就是true),减少Spark RDD的内存使用,也可能改善垃圾回收的行为。
默认情况下,所有输入数据和DStream通过不同的transformations持久化的数据都会自动进行清理。Spark Streaming根据transformations的不同来决定哪些数据需要被清理掉。例如,当使用10分钟的窗口函数时,Spark Streaming会保存最少10分钟的数据。想要数据保存更长时间,可以设置streamingContext.remenber参数。
七、尽量保证每个work节点中的数据不要落盘,以提升执行效率。
八、设置合理GC
GC是程序中最难调的一块,不合理的GC行为会给程序带来很大的影响。在集群环境下,我们可以使用并行Mark-Sweep垃圾回收机制,虽然这个消耗更多的资源,但是我们还是建议开启。可以如下配置:
spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC
相关文章推荐
- Spark视频王家林第119课: Spark Streaming性能优化:如何在生产环境下应对流数据峰值巨变?
- spark调优(五)【Cigna优化Spark Streaming实时处理应用】
- Spark Streaming实践和优化
- 第116课: Spark Streaming性能优化:如何在毫秒内处理处理大吞吐量的和数据波动比较大 的程序
- Spark Streaming实践和优化
- spark Streaming的Receiver和Direct的优化对比
- 大数据IMF传奇行动绝密课程第120课:Spark Streaming性能优化:如何在End-to-End生产环境下安全高效地把结果数据存入HBase中
- streaming优化:spark.default.parallelism调整处理并行度
- 第121课: Spark Streaming性能优化:通过摄像头图像处理案例来说明Spark流处理性能评估新方法及性能调优参数测试
- 第121课:Spark Streaming性能优化:通过摄像头图像处理案例来说明Spark流处理性能评估新方法及性能调优参数调试
- Spark Streaming 流计算优化记录(5)-分区与内存的优化
- Spark Streaming 流计算优化记录(6)-GC优化与shuffle service
- Spark Streaming 流计算优化记录(1)-背景介绍
- 第122课:Spark Streaming性能优化:Spark Streaming处理分布式拒绝服务器案例及性能优化
- spark streaming性能优化
- Spark Streaming 流计算优化记录(2)-不同时间片数据流的Join
- Spark Streaming 数据产生与导入相关的内存分析及优化
- 第123课:Spark Streaming性能优化:通过Spark Streaming发现botnet及性能优化
- spark streaming性能优化
- Spark Streaming 流计算优化记录(4)-时间都去哪儿了,关于调度与空转