您的位置:首页 > 运维架构 > 网站架构

第三课解密SparkStreaming运行机制和架构进阶

2016-05-24 01:10 441 查看
内容:

1. 解密Spark Streaming Job架构和运行机制

2. 机密Spark Streaming容错架构和运行机制

一、SparkStreaming Job架构与运行机制

为了更好的理解SparkStreaming Job架构,我们先来看一个例子:

package com.dt.spark.Streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
*使用foreachRDD将Spark Streaming处理的结果放到DataBase中
* @author DT大数据梦工厂
* 新浪微博:http://weibo.com/ilovepains/
* Created by pc-Hipparic on 2016/5/22.
*/
object OnlineForeachRDD2DB {
def main(args: Array[String]) {
val conf = new SparkConf()
conf.setAppName("OnlineforeachRDD2DB").setMaster(“local[6]”)
//设置batchDuration时间间隔来控制Job生成的频率并且创建Spark Streaming执行的入口
val ssc = new StreamingContext(conf, Seconds(5))
val lines = ssc.socketTextStream("Master",9999)
//设计处理逻辑
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x,1)).reduceByKey(_ + _)
//调用foreachRDD进行数据持久化操作
wordCounts.foreachRDD(rdd =>
rdd.foreachPartition{ partitionOfRecords => {
//调用简单实现的数据库连接线程池
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => {
val sql = "insert into streaming_itemcount(word,count) values('" + record._1 + "'," + record._2 + ")"
val stmt = connection.createStatement()
stmt.executeUpdate(sql);
})
ConnectionPool.returnConnection(connection)
}
})
/**
*在StreamingContext#start内部会调用JobScheduler#star,进行消息循环,在
*JobScheduler#start内部会构造JobGenerator和ReceiverTacker,并调用JobGenerator#start和
*ReceiverTacker#start(见2)
*/
ssc.start()
ssc.awaitTermination()
}
}


这里,我们将数据持久化到了mysql中。

结果如下:





为了更好了解每一步以及涉及的核心组件,来看下日志信息。

16/05/24 01:07:46 INFO scheduler.ReceiverTracker: All of the receivers have deregistered successfully
16/05/24 01:07:46 INFO scheduler.ReceiverTracker: ReceiverTracker stopped
16/05/24 01:07:46 INFO scheduler.JobGenerator: Stopping JobGenerator immediately
16/05/24 01:07:46 INFO util.RecurringTimer: Stopped timer for JobGenerator after time 1464023265000
16/05/24 01:07:46 INFO scheduler.JobGenerator: Stopped JobGenerator
16/05/24 01:07:46 INFO scheduler.JobScheduler: Stopped JobScheduler
16/05/24 01:07:46 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/streaming,null}
//省略部分日志


这里可以看见,在SparkStreaming程序被停止时,有Receviver被停掉,还有ReceiveTracker、JobGenerator、RecurringTime、Scheduler等组件一起控制Spark Streaming。

接下来我们再来看看上节课的SparkStreaming的运行图:





正如上节课所述,随着时间的流逝,SparkStreaming会不断的生成Job,那么问题来了,这些Job是怎么生成的呢?

在StreamingContext中,首先会调用重载构造器,然后在重载构造其中对一些成员进行初始化操作。

例如graph(DStream Graph):

private[streaming] val graph: DStreamGraph = {
//如果进行了Checkpoint操作,则从相应的目录进行恢复
if (isCheckpointPresent) {
cp_.graph.setContext(this)
cp_.graph.restoreCheckpointData()
cp_.graph
} else {
//否则构建DstreamGraph对象,并设置BatchDuration(时间间隔)
require(batchDur_ != null, "Batch duration for StreamingContext cannot be null")
val newGraph = new DStreamGraph()
newGraph.setBatchDuration(batchDur_)
newGraph
}
}


Scheduler(作业调度器)
//创建JobScheduler对象。
private[streaming] val scheduler = new JobScheduler(this)


progressListener(事件监听器)

private[streaming] val progressListener = new StreamingJobProgressListene

StreamingSource等等。

/* Initializing a streamingSource to register metrics */
private val streamingSource = new StreamingSource(this)

但是,我们在查看这里的源代码中没有发现与作业真正进行运行调度的代码,说明此事作业(Job)还未真正开始运行。

返回我们所写的代码,发现在StreamingContext初始化后,就是我们的业务逻辑的代码了,这是我们所书写的业务逻辑即数据处理方式步骤,这其中并没有action,它也未触发作业的运行。那么,作业的真正运行只能是最后一部分的代码了,即

// OnlineForeachRDD2DB#main
ssc.start()
ssc.awaitTermination()

为了验证这个假设,来看看ssc#start方法的源码:

//启动Spark Streaming的执行过程
def start(): Unit = synchronized {
state match {
case INITIALIZED =>
startSite.set(DStream.getCreationSite())
StreamingContext.ACTIVATION_LOCK.synchronized {//省略部分代码
//调用scheduler#start方法
scheduler.start()  //1
}
state = StreamingContextState.ACTIVE
} //省略部分代码
}


//JobScheduler#start            1处被调用
def start(): Unit = synchronized {
//若消息循环器不为空,表示SparkStreaming已启动,直接返回
if (eventLoop != null) return
logDebug("Starting JobScheduler")
//创建消息循环器,用来接收消息
eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
}
//启动消息循环器,开始接收消息
eventLoop.start()

// 控制批量接收数据,来更新完成的Batch
for {
inputDStream <- ssc.graph.getInputStreams
rateController <- inputDStream.rateController
} ssc.addStreamingListener(rateController)
//启动监听器
listenerBus.start(ssc.sparkContext)
receiverTracker = new ReceiverTracker(ssc)
inputInfoTracker = new InputInfoTracker(ssc)
//调用receiveTracker#start启动receiverTracker
//首先它会在Spark Cluster中启动Receiver(其实是在Executor中先启动ReceiverSupervisor来
//启动Receiver)
receiverTracker.start()     //2
//启动JobGenerator,JobGenerator启动后会不断根据batchDuration生成一个个Job
jobGenerator.start()
logInfo("Started JobScheduler")
}


从这里开始,作业被启动了,此时Spark Streaming便可以依据时间间隔(Batch Interval)来产生作业了。

通过我们对案例的深入研究发现,对于Spark Streaming来讲,需要关注以下几方面(组件):

(1)JobGenerator

它启动后,会不断的产生Job,处理流入的数据;

(2)receiverTracker

receiverTracker启动的时候会启动receiver,receiver会接收流入的数据;

(3)JobScheduler

SparkStreaming最为重要的一个组件,JobGenerator和receiverTracker都是由JobScheduler产生的。所以要了解Spark Streaming的核心就必须要掌握这个组件。

运行流程:

JobScheduler在启动的时候启动了JobGenerator,通过JobGenerator来不断产生Job。Job产生时需要输入数据(RDD),而JobScheduler也会在启动时启动receiverTracker,然后Tracker会在Spark 集群中各个worker上Executor中启动相应的receiver,receiver在接收到外界输入的数据时,会通过receiversupervise存储到Executor,并将Metedata信息发送给Driver中的receiverTracker。(receiverTracker负责启动receiver并且负责接收元数据)。ReceiverTracker内部会通过ReceivedBlockTracker来管理接收到的元数据信息。Drive在进行作业调度时会通过ReceiverBlockTracker的元数据信息,来指定Job处理的数据。这样流入的数据就可以存储到Spark底层的存储系统了,此时便可以基于这些数据来产生RDD了。

接下来就是要将处理逻辑作用于数据上进行数据的处理,也就是定时产生Job了。当每个BatchInterval会产生一个具体的Job,其实这里的Job不是Spark Core中所指的Job,它只是基于DStreamGraph而生成的RDD的DAG而已(从Java的角度来讲,就相当Runnable接口的实例),此时想要运行Job需要提交给JobScheduler中通过线程池的方式找到一个单独的线程来提交Job到集群运行(其实是在线程中基于RDD的Action触发真正的作业的运行)。

1. 为什么使用线程?

1)因为作业不断产生,所以为了提升效率,需要线程池,这和在Executor中通过线程池执行由异曲同工之妙。

2)有可能设置Job的FAIR公平调度的方式,这个时候也需要多线程的支持。

在JobGenerator中会有一个定时器timer,都过开发者设置的BatchInterval不断的生成Job。

//JobGenerator#timer
// RecurringTimer不断地将接收的数据进行分配来生成Job(基于DstreamGraph产生Job)
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")




二 容错

1.Executor级别容错

a)接受数据的安全性

SparkStreaming默认情况下,接受到的数据都采用MEMORY_AND_DISK_TWO的方式。即默认情况下,数据会存储在两台机器的内存中,如果有一台机器挂掉,则框架会立即切换到另一台机器上来运行。在大多数情况下这种策略是比较可靠的,并且框架的切换基本上有切换时间;另一种方式是WAL(writer ahead log)方式,在数据到来时,先把数据通过WAL机制做一个日志记录,以后有问题便可以基于日志恢复,然后在存入Executor中,然后在进行其他机器副本的复制。这种方式是先写日志在进行其他操作,如果数据丢失便可通过WAL机制把数据恢复回来。(生产环境下,采用Kafka的回放来实现数据恢复。)

b)执行的安全性

任务(Job)执行的安全性则依赖于RDD的容错。

2.Driver级别的容错

Driver级别的容错需要考虑DStreamGraph(DAG生成的模板)、 ReceiverTracker(接收的数据处理了哪些数据)、JobGenerator(生成作业的进度)等等方面,对于这些容错,只需要在每个作业生成前进行checkpoint,生成之后再做一次checkpoint即可。如果出错,则直接从checkpoint中恢复即可。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: