您的位置:首页 > 其它

Spark Streaming 容错机制分析

2018-03-31 23:19 323 查看

Spark容错级别

Driver级别的容错

 
在Driver级别的容错具体为DAG生成的模板,即DStreamGraph,RecevierTracker中存储的元数据信息和JobScheduler中存储的Job进行的进度情况等信息,只要通过checkpoint就可以了,每个Job生成之前进行checkpoint,在Job生成之后再进行checkpoint,如果出错的话就从checkpoint中恢复。

Executor级别的容错

在Executor级别的容错具体为接收数据的安全性和任务执行的安全性。在接收数据安全性方面,一种方式是Spark Streaming接收到数据默认为MEMORY_AND_DISK_2的方式,在两台机器的内存中,如果一台机器上的Executor挂了,立即切换到另一台机器上的Executor,这种方式一般情况下非常可靠且没有切换时间。另外一种方式是WAL(Write Ahead Log),在数据到来时先通过WAL机制将数据进行日志记录,如果有问题则从日志记录中恢复,然后再把数据存到Executor中,再进行其他副本的复制。WAL这种方式对性能有影响,在生产环境中不常用,一般使用Kafka存储,Spark Streaming接收到数据丢失时可以从Kafka中回放。在任务执行的安全性方面,靠RDD的容错。 

Spark Streaming 容错机制

容错的概念

容错 指的是一个系统在部分模块出现故障时还能否持续的对外提供服务,一个高可用的系统应该具有很高的容错性;对于一个大的集群系统来说,机器故障、网络异常等都是很常见的Spark这样的大型分布式计算集群提供了很多的容错机制来提高整个系统的可用性
 

引入

面向大规模数据分析,数据检查点操作成本很高,需要通过数据中心的网络连接在机器之间复制庞大的数据集,而网络带宽往往比内存带宽低得多,同时还需要消耗更多的存储资源。 
因此,Spark选择记录更新的方式。但是,如果更新粒度太细太多,那么记录更新成本也不低。因此,RDD只支持粗粒度转换,即只记录单个块上执行的单个操作,然后将创建RDD的一系列变换序列(每个RDD都包含了他是如何由其他RDD变换过来的以及如何重建某一块数据的信息。因此RDD的容错机制又称“血统(Lineage)”容错)记录下来,以便恢复丢失的分区。
Lineage本质上很类似于数据库中的重做日志(Redo Log),只不过这个重做日志粒度很大,是对全局数据做同样的重做进而恢复数据。

容错原理

在容错机制中,如果一个节点死机了,而且运算窄依赖,则只要把丢失的父RDD分区重算即可,不依赖于其他节点。而宽依赖需要父RDD的所有分区都存在,重算就很昂贵了。可以这样理解开销的经济与否:在窄依赖中,在子RDD的分区丢失、重算父RDD分区时,父RDD相应分区的所有数据都是子RDD分区的数据,并不存在冗余计算。在宽依赖情况下,丢失一个子RDD分区重算的每个父RDD的每个分区的所有数据并不是都给丢失的子RDD分区用的,会有一部分数据相当于对应的是未丢失的子RDD分区中需要的数据,这样就会产生冗余计算开销,这也是宽依赖开销更大的原因。因此如果使用Checkpoint算子来做检查点,不仅要考虑Lineage是否足够长,也要考虑是否有宽依赖,对宽依赖加Checkpoint是最物有所值的,可以节约大量的系统资源。

基于RDD的容错机制

Spark Streaming的容错机制是基于RDD的容错机制
主要表现为:
  1.checkpoint
2 .基于血统(lineage)的高度容错机制
3 .出错了之后会从出错的位置从新计算,而不会导致重复计算

Lineage机制

利用内存加快数据加载,在众多的其它的In-Memory类数据库或Cache类系统中也有实现,Spark的主要区别在于它处理分布式运算环境下的数据容错性(节点实效/数据丢失)问题时采用的方案。
为了保证RDD中数据的鲁棒性,RDD数据集通过所谓的血统关系(Lineage)记住了它是如何从其它RDD中演变过来的。
相比其它系统的细颗粒度的内存数据更新级别的备份或者LOG机制,RDD的Lineage记录的是粗颗粒度的特定数据转换(Transformation)操作(filter, map, join etc.)行为。当这个RDD的部分分区数据丢失时,它可以通过Lineage获取足够的信息来重新运算和恢复丢失的数据分区。这种粗颗粒的数据模型,限制了Spark的运用场合,但同时相比细颗粒度的数据模型,也带来了性能的提升。
 

RDD在Lineage依赖方面分类

Narrow Dependencies与Wide Dependencies用来解决数据容错时的高效性。

Narrow Dependencies

某个具体的RDD,其分区partition a最多子Rdd中一个分区partition b依赖,此种情况只有Map任务, 是不需要发送shuffle过程的, 窄依赖又分为1:1和N:1两种。
 
Narrow Dependencies是指父RDD的每一个分区最多被一个子RDD的分区所用,表现为一个父RDD的分区对应于一个子RDD的分区或多个父RDD的分区对应于一个子RDD的分区,也就是说一个父RDD的一个分区不可能对应一个子RDD的多个分区。

Wide Dependencies

或称为为ShuffleDependency,与Hadoop MR的Shuffle的数据依赖相似,宽依赖需要计算所有父RDD对应分区的数据,然后在节点之间进行shuffle。
 
Wide Dependencies是指子RDD的分区依赖于父RDD的多个分区或所有分区,也就是说存在一个父RDD的一个分区对应一个子RDD的多个分区。对与Wide Dependencies,这种计算的输入和输出在不同的节点上,lineage方法对与输入节点完好,而输出节点宕机时,通过重新计算,这种情况下,这种方法容错是有效的,否则无效,因为无法重试,需要向上其祖先追溯看是否可以重试(这就是lineage,血统的意思),Narrow Dependencies对于数据的重算开销要远小于Wide Dependencies的数据重算开销。
如图
 




分布式数据集容错方式

在RDD计算,通过checkpoint进行容错,做checkpoint有两种方式:
数据检查点(checkpoint data)和记录数据的更新(logging the updates)
用户可以控制采用哪种方式来实现容错,默认是logging the updates方式,通过记录跟踪所有生成RDD的转换(transformations)也就是记录每个RDD的lineage(血统)来重新计算生成丢失的分区数据。

Tast执行任务失败解决方案

 


开始Checkpoint 详解

检查点机制-checkpoint

什么是检查点机制

Spark Streaming 周期性地把应用数据存储到诸如HDFS 或Amazon S3 这样的可靠存储系统中以供恢复时使用的机制叫做检查点机制

举例

·      数据库 checkpoint 过程中一般把内存中的变化进行持久化到物理页, 这时候就可以斩断依赖链, 就可以把 redo 日志删掉了, 然后更新下检查点,
·      hdfs namenode 的元数据 editlog, Secondary namenode 会把 edit log 应用到 fsimage, 然后刷到磁盘上, 也相当于做了一次 checkpoint, 就可以把老的 edit log 删除了。
·     spark streaming 中对于一些 有状态的操作, 这在某些 stateful 转换中是需要的,在这种转换中,生成 RDD 需要依赖前面的 batches,会导致依赖链随着时间而变长。为了避免这种没有尽头的变长,要定期将中间生成的 RDDs 保存到可靠存储来切断依赖链, 必须隔一段时间进行一次进行一次 checkpoint。
· 
·     cache 和 checkpoint 是有显著区别的, 缓存把 RDD 计算出来然后放在内存中, 但是RDD 的依赖链(相当于数据库中的redo 日志), 也不能丢掉, 当某个点某个 executor 宕了, 上面cache 的RDD就会丢掉, 需要通过 依赖链重放计算出来, 不同的是, checkpoint 是把 RDD 保存在 HDFS中, 是多副本可靠存储,所以依赖链就可以丢掉了,就斩断了依赖链, 是通过复制实现的高容错。但是有一点要注意, 因为checkpoint是需要把 job 重新从头算一遍, 最好先cache一下, checkpoint就可以直接保存缓存中的 RDD 了, 就不需要重头计算一遍了, 对性能有极大的提升。

检查点机制的作用

检查点本质:通过将RDD写入Disk做检查点。
控制发生失败时需要重算的状态数
Spark  Streaming通过lineage重算,检查点机制则可以控制需要在lineage中回溯多远提供驱动器程序容错
如果流计算应用中的驱动器程序崩溃了,你可以重启驱动器程序,并让驱动器程序从检查点   恢复,这样SparkStreaming就可以读取之前运行的程序处理数据的进度,并从那里继续。
即是 是为了通过Lineage做容错的辅助,lineage过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果之后节点出现问题而丢失分区,从做检查点的RDD开始重做lineage,就会减少开销
 

checkpoint 两种类型的数据

Metadata(元数据) checkpointing - 保存定义了 Streaming 计算逻辑至类似 HDFS 的支持容错的存储系统。用来恢复 driver,元数据包括:
配置 - 用于创建该 streaming application 的所有配置
DStream 操作 - DStream 一些列的操作
未完成的 batches - 那些提交了 job 但尚未执行或未完成的 batches RDD Data checkpointing - 保存已生成的RDDs至可靠的存储
metadata checkpointing 主要用来恢复 driver;而 RDD数据的 checkpointing 对于
stateful 转换操作是必要的。
对于window和stateful操作必须checkpoint(Spark Streaming会检查并给出提示)
通过StreamingContext的checkpoint来指定目录,默认按照batch Duration来做checkpoint
通过DStream的checkpoint指定当前DStream的间隔时间间隔必须是slide interval的倍数。

检查点机制-checkpoint的形式

checkpoint  的形式是将类  Checkpoint的实例序列化后写入外部存储,值得一提的是,有专门的一个线程来做将序列化后的 checkpoint 写入外部存储的操作。类 Checkpoint 包含以下数据:
除了 Checkpoint 类,还有 CheckpointWriter 类用来导出 checkpoint,
CheckpointReader 用来导入



checkpoint 检查点机制-checkpoint的局限

Spark Streaming 的 checkpoint 机制看起来很美好,却有一个硬伤。
前面提到最终刷到外部存储的是类 Checkpoint 对象序列化后的数据。那么在 Spark Streaming application 重新编译后,再去反序列化 checkpoint 数据就会失败。这个时候就必须新建 StreamingContext。
针对这种情况,在我们结合 Spark Streaming + kafka 的应用中,需要自行维护了消费的
offsets,这样一来即使重新编译 application,还是可以从需要的 offsets 来消费数据。对于其他的情况需要大家根据实际需求自行处理

Driver节点容错

如果你想让你的  application能从driver失败中恢复,你的application要满足: 若 application 为首次重启,将创建一个新的 StreamContext 实例
如果 application 是从失败中重启,将会从 checkpoint 目录导入 checkpoint 数据来重新创建 StreamingContext 实例
 


除调用 getOrCreate 外, 你还需要编写在驱动器程序崩溃时重启驱动器进程的代码。
在 yarn 模式下,driver 是运行在 ApplicationMaster 中,若 ApplicationMaster 挂掉,
yarn 会自动在另一个节点上启动一个新的 ApplicationMaster。Spark standalone模式下:
./bin/spark-submit --deploy-mode cluster --supervise --master spark://... App.jar

Worker节点容错

为了应对工作节点失败的问题,Spark Streaming 使用与Spark 的容错机制相同的方法,根据不同的输入源,分两种情况:

使用可靠输入源—如HDFS

由于输入数据是可靠的,所有数据都可以重新计算,因此不会丢失数据 使用基于网络接收的输入源—例如Kafka、Flume等
接收到的数据会在集群的不同节点间复制(默认复本数为2)
一个工作节点失效,在恢复时可以从另一个工作节点的数据中重新计算
如果是接收数据的工作节点失效,那就可能丢失数据(数据已经收到但是还未复制到其他节   点,也没有处理完是失效了)
 
处理保证所有转换操作为精确一次保证(Spark Streaming 工作节点的容错保障)
输出操作在把结果保存到外部存储时,写结果的任务可能因故障而执行多次,一些数据可能   也就被写了多次
可以使用事务操作来写入外部系统(即原子化地将一个RDD分区一次写入),或者设计幂等   的更新操作(即多次运行同一个更新操作仍生成相同的结果)。比如Spark Streaming 的saveAs...File 操作会在一个文件写完时自动将其原子化地移动到最终位置上,以此确保每个输出文件只存在一份。
 

checkpoint 正确使用简单举例



使用很简单, 就是设置一下 checkpoint 目录,然后再rdd上调用 checkpoint 方法, action 的时候就对数据进行了 checkpoint。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐