您的位置:首页 > 其它

spark streaming性能优化

2017-11-12 10:32 337 查看

一 数据接收并行度调优

通过网络接收数据的时候,比如kafka或者flume,会将数据反序列化,并存储在在Spark内存中。如果数据接收成为系统的瓶颈,那么可以考虑并行化接收数据。

1.1除了创建更多输入DStream和Receiver

每一个InputDStream都会在某个Worker上的Executor上启动一个Receiver,该Receiver接收一个数据流。因此可以通过创建多个InputDStream,并且配置他们接收数据源不同的分区数据,达到接收多个数据流的效果。比如说,一个接收两个kafka topic的InputDStream,可以拆分成2个InputDStream,每一个分别接收一个topic的数据。这样就会创建2个Receiver,从而并行的接收数据,进而提升吞吐量。多个DStream可以使用union算子进行联合,从而形成一个新的DStream,后续的操作都可以基于联合的DStream.

intnumStreams
= 5;
List<JavaPairDStream<String,String>>
kafkaStreams= new
ArrayList<JavaPairDStream<String,String>>(numStreams);
for (int i=
0; i <
numStreams; i++) {

    kafkaStreams.add(KafkaUtils.createStream(...));

}
JavaPairDStream<String,String>
unifiedStream= streamingContext.union(kafkaStreams.get(0),kafkaStreams.subList(1,kafkaStreams.size()));
unifiedStream.print();
 

1.2 调整block 时间间隔

我们可以通过调整spark.streaming.blockInterval参数来设置产生一个block的时间间隔,默认是200ms。对于大多数Receiver来说,在将接收到的数据保存到Spark的BlockManager之前,都会将数据根据设置的时间间隔分成构造成不同的block,然后推送给BlockManager存储。每一个批次中block的数量决定了该批次对应的RDD的partition数量,以及针对该RDD执行转换操作的时候创建的task数量,每一个batch对应的task数量大约是
= (batch 时间间隔)/(block时间间隔) ,即batch 时间间隔为1s,block时间间隔为200ms,相当于一个batch会包括5个block,即batch对应的RDD就有5个分区,也就决定task的数量是5.

如何认定batch对应的task数量太少呢?

如果每个batch的task数量低于每台机器的CPU Core数量,那么就说明batch的task数量是不够的,因为所有的CPU资源无法完全被利用起来。

要为batch增加block的数量,那么就减小block interval。然而,推荐的block interval最小值是50ms,如果低于这个数值,那么大量task的启动时间,可能会变成一个性能开销点。

 

1.3 调大task数量

如果并行的task数量不是很多,那么集群资源无法充分利用,举例来说对于分布式的reduce操作,比如reduceByKeyAndWindow或者reduceByKey等操作,默认的并行度是由spark.default. parallelism参数决定的,你可以在reduceByKey操作中传入第二个参数,手动指定并行度,也可以调整全局的spark.default.parallelism参数

如果输入的数据流,如果你觉得分区数目太少,也可以对输入的数据流重新分区,显示对输入数据流执行repartition操作,这样调大分区数目,那么task的数量也就大了

 

 

 

二 任务启动优化

如果每一秒钟启动的task数量太多,假设50个,即1s的batch时间间隔和50ms的block时间间隔。如果发送这些task去worker上的executor,那么性能开销会比较大,这是很难到到毫秒级的延迟了。

2.1task序列化

使用Kryo序列化类库来序列化task,可以减小task的大小从而减少driver发送这些task到各个executor的发送时间,即节省网络资源

 

2.2 执行模式

在standalone模式下运行spark,可以达到更少的task启动时间

 

三 数据序列化优化

数据序列化造成的系统开销可以由序列化格式的优化来减小。在流式场景下,有两种类型的数据需要序列化:

1 输入数据:默认情况下,接收到的输入数据,是存储在Executor内存中,使用的持久化级别是MEMORY_AND_DISK_2.这就意味着数据被序列化字节从而减小GC开销,并且还会复制到其他节点已进行容错。因此Receiver必须反序列化接收到的数据,然后在使用Spark的序列化格式序列化数据

 

2 流式计算操作生成持久化的RDD: 流式计算生成持久化RDD,可能会持久化到内存,这里默认持久化级别就是MEMORY_ONLY_SER,默认就会减小GC开销。

 

四 batch时间间隔优化

batch应该在生成之后就尽可能块的处理掉,对于一个应用来说,可以通过观察Spark UI上batch的处理时间来定。batch的处理时间必须小于batch 时间间隔,假设batch 时间间隔1s, 那么这个批次的处理时间不应超过1s

 

为应用计算正确batch比较好的办法:

给定一个很保守的batch interval,比如5s-10s,以很慢的数据接受速率进行测试,要检查应用是否你跟的上这个数据接收速率,可以检查每一个batch的处理时间的延迟,如果处理时间与batch interval基本吻合,那么应用就是稳定的,否则如果batch调度延迟持续增长,那么就意味着应用无法跟得上这个速率,也就是不稳定的。记住,由于临时性的数据增长导致的暂时的延迟增长,可以合理的,只要延迟情况可以在短时间内恢复即可。

五 内存调优

如果想要使用一个窗口长度为10分钟的window操作,那么集群就必须有足够的内存来保存10分钟内的数据。如果想要使用updateStateByKey来维护许多key的state,那么你的内存资源就必须足够大。反过来说,如果想要做一个简单的map-filter-store操作,那么需要使用的内存就很少。

通常来说,通过Receiver接收到的数据,会使用MEMORY_AND_DISK_SER_2持久化级别来进行存储,因此无法保存在内存中的数据会溢写到磁盘上。而溢写到磁盘上,是会降低应用的性能的。因此,通常是建议为应用提供它需要的足够的内存资源。建议在一个小规模的场景下测试内存的使用量,并进行评估。

 

内存调优的另外一个方面是垃圾回收。对于流式应用来说,如果要获得低延迟,肯定不想要有因为JVM垃圾回收导致的长时间延迟。有很多参数可以帮助降低内存使用和GC开销:

 

5.1DStream的持久化

默认持久化的时候会序列化为字节,与非序列化的方式相比,这会降低内存和GC开销。使用Kryo序列化机制可以进一步减少内存使用和GC开销。如果还要进一步降低内库存是用了,可以进行数据压缩,spark.rdd.compress参数控制(默认false)。

但是CPU资源的消耗可能就大了。

5.2 清理旧数据

默认情况下,所有输入数据和通过DStreamtransformation操作生成的持久化RDD,会自动被清理。Spark Streaming会决定何时清理这些数据,取决于transformation操作类型。

例如,你在使用窗口长度为10分钟内的window操作,Spark会保持10分钟以内的数据,时间过了以后就会清理旧数据。但是在某些特殊场景下,比如Spark SQL和Spark Streaming整合使用时,在异步开启的线程中,使用Spark SQL针对batch RDD进行执行查询。那么就需要让Spark保存更长时间的数据,直到Spark SQL查询结束。可以使用streamingContext.remember()方法来实现。

5.3CMS垃圾回收器

使用并行的mark-sweep垃圾回收机制,被推荐使用,用来保持GC低开销。虽然并行的GC会降低吞吐量,但是还是建议使用它,来减少batch的处理时间(降低处理过程中的GC开销)。如果要使用,那么要在driver端和executor端都开启。

在spark-submit中使用--driver-java-options设置使spark.executor.extra

JavaOptions参数设置-XX:+UseConcMarkSweepGC。

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐