
Spark 1.2.0 source code analysis: the Spark Streaming execution workflow

2015-01-11 15:11
Starting from the example code provided in the official documentation, let's walk through the entire execution flow of Spark Streaming:

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
// Split each line into words
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate
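To run this example locally, something must be writing to port 9999 first, e.g. `nc -lk 9999` as in the official quick start; every line typed into netcat then shows up as a line in the `lines` DStream, and its word counts are printed once per one-second batch.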


When `new StreamingContext(conf, Seconds(1))` is constructed, the following auxiliary constructor is invoked:

def this(conf: SparkConf, batchDuration: Duration) = {
  this(StreamingContext.createNewSparkContext(conf), null, batchDuration)  // create a SparkContext from the given conf
}
which delegates to the primary constructor:

class StreamingContext private[streaming] (
  sc_ : SparkContext,
  cp_ : Checkpoint,
  batchDur_ : Duration
)
Next, look at the call `ssc.socketTextStream("localhost", 9999)`:

def socketTextStream(
  hostname: String,
  port: Int,
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2  // default storage level
): ReceiverInputDStream[String] = {
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}


ReceiverInputDStream is an abstract class; concrete implementations exist for socket, Kafka, Twitter, Flume and other input sources. In this example the concrete type is SocketInputDStream, created by socketStream:

def socketStream[T: ClassTag](
  hostname: String,
  port: Int,
  converter: (InputStream) => Iterator[T],
  storageLevel: StorageLevel
): ReceiverInputDStream[T] = {
  new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
}
The converter parameter, of type `(InputStream) => Iterator[T]`, wraps the raw input stream and exposes it as an iterator. The converter actually passed in is SocketReceiver.bytesToLines:

def bytesToLines(inputStream: InputStream): Iterator[String] = {
  val dataInputStream = new BufferedReader(new InputStreamReader(inputStream, "UTF-8"))  // reader over the input stream
  new NextIterator[String] {
    protected override def getNext() = {
      val nextValue = dataInputStream.readLine()  // read one line at a time
      if (nextValue == null) {
        finished = true
      }
      nextValue
    }

    protected override def close() {
      dataInputStream.close()
    }
  }
}
SocketInputDStream itself is fairly simple:

private[streaming]
class SocketInputDStream[T: ClassTag](
  @transient ssc_ : StreamingContext,
  host: String,
  port: Int,
  bytesToObjects: InputStream => Iterator[T],
  storageLevel: StorageLevel
) extends ReceiverInputDStream[T](ssc_) {

  def getReceiver(): Receiver[T] = {
    new SocketReceiver(host, port, bytesToObjects, storageLevel)  // the receiver for this stream
  }
}


Back to the rest of the example code:

val words = lines.flatMap(_.split(" "))

val pairs = words.map(word => (word, 1))

val wordCounts = pairs.reduceByKey(_ + _)

wordCounts.print()

These are all DStream transformations, analogous to the corresponding RDD operations; their internals are not the focus of this post, but a rough sketch of what they do is given below.
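For orientation only, here is a paraphrase (not the exact 1.2.0 sources) of what such a transformation does: it merely wraps the parent DStream in a new DStream subclass that remembers the user function; no data is touched until the JobGenerator later produces a job for each batch.

// Sketch, paraphrased from DStream.scala in Spark 1.x: a transformation
// just records the parent stream and the (cleaned) user function.
def map[U: ClassTag](mapFunc: T => U): DStream[U] = {
  new MappedDStream(this, context.sparkContext.clean(mapFunc))
}

def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U] = {
  new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
}

reduceByKey goes through PairDStreamFunctions and ultimately produces a ShuffledDStream, while print() registers a ForEachDStream as an output stream on the DStreamGraph, which is what later drives job generation.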

Next comes the call to `ssc.start()`:

def start(): Unit = synchronized {
  if (state == Started) {
    throw new SparkException("StreamingContext has already been started")
  }
  if (state == Stopped) {
    throw new SparkException("StreamingContext has already been stopped")
  }
  validate()
  sparkContext.setCallSite(DStream.getCreationSite())
  scheduler.start()  // the real scheduling work happens in JobScheduler
  state = Started
}
Now look at the start() method in the JobScheduler class:

def start(): Unit = synchronized {
  if (eventActor != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
    def receive = {
      case event: JobSchedulerEvent => processEvent(event)  // an actor that handles JobSchedulerEvent messages
    }
  }), "JobScheduler")

  listenerBus.start()  // start the listener bus
  receiverTracker = new ReceiverTracker(ssc)  // tracker for the receiver input streams
  receiverTracker.start()  // start it
  jobGenerator.start()   // start the job generator
  logInfo("Started JobScheduler")
}
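jobGenerator.start() deserves a quick note as well. Roughly (paraphrasing JobGenerator.scala; details may differ slightly from 1.2.0), the JobGenerator creates its own actor plus a RecurringTimer that fires once per batchDuration and posts a GenerateJobs event; handling that event asks the DStreamGraph to generate the jobs for that batch time and submits them back to the JobScheduler for execution.

// Paraphrased from JobGenerator.scala: the timer that drives the batch cycle
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventActor ! GenerateJobs(new Time(longTime)), "JobGenerator")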
First, the receiverTracker.start() method:

def start() = synchronized {
  if (actor != null) {
    throw new SparkException("ReceiverTracker already started")
  }

  if (!receiverInputStreams.isEmpty) {  // there must be at least one receiver input stream
    actor = ssc.env.actorSystem.actorOf(Props(new ReceiverTrackerActor),  // create a ReceiverTrackerActor
      "ReceiverTracker")
    if (!skipReceiverLaunch) receiverExecutor.start()  // launch the receivers on the cluster
    logInfo("ReceiverTracker started")
  }
}
The receiver input streams come from `private val receiverInputStreams = ssc.graph.getReceiverInputStreams()`. Following that call:

def getReceiverInputStreams() = this.synchronized {
  inputStreams.filter(_.isInstanceOf[ReceiverInputDStream[_]])  // the key is inputStreams, an ArrayBuffer[InputDStream[_]]
    .map(_.asInstanceOf[ReceiverInputDStream[_]])
    .toArray
}
When the SocketInputDStream object is created, a key line in its parent class InputDStream runs, registering the stream with the DStreamGraph:

ssc.graph.addInputStream(this)

def addInputStream(inputStream: InputDStream[_]) {
  this.synchronized {
    inputStream.setGraph(this)
    inputStreams += inputStream
  }
}


So receiverInputStreams already contains the input stream created earlier.

Back in receiverTracker.start(), the other key call is receiverExecutor.start(), which does the following:

@transient val env = ssc.env
@transient val thread  = new Thread() {
  override def run() {
    try {
      SparkEnv.set(env)
      startReceivers()  // the actual work happens here
    } catch {
      case ie: InterruptedException => logInfo("ReceiverLauncher interrupted")
    }
  }
}

def start() {
  thread.start()  // start the launcher thread
}


Continuing into the startReceivers method:

/**
 * Get the receivers from the ReceiverInputDStreams, distributes them to the
 * worker nodes as a parallel collection, and runs them.
 */
private def startReceivers() {
  val receivers = receiverInputStreams.map(nis => {  // collect all the receivers
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  })

  // Right now, we only honor preferences if all receivers have them
  val hasLocationPreferences = receivers.map(_.preferredLocation.isDefined).reduce(_ && _)  // preferred locations (worker nodes) are only honored if every receiver has one

  // Create the parallel collection of receivers to distribute them on the worker nodes
  val tempRDD =
    if (hasLocationPreferences) {
      val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
      ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)  // a parallel RDD that wraps the receivers
    } else {
      ssc.sc.makeRDD(receivers, receivers.size)
    }

  val checkpointDirOption = Option(ssc.checkpointDir)  // checkpoint directory, possibly None
  val serializableHadoopConf = new SerializableWritable(ssc.sparkContext.hadoopConfiguration)

  // Function to start the receiver on the worker node
  val startReceiver = (iterator: Iterator[Receiver[_]]) => {  // runs on a worker node
    if (!iterator.hasNext) {
      throw new SparkException(
        "Could not start receiver as object not found.")
    }
    val receiver = iterator.next()
    val supervisor = new ReceiverSupervisorImpl(
      receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)  // the concrete supervisor implementation
    supervisor.start()  // this does several things, see below
    supervisor.awaitTermination()
  }
  // Run the dummy Spark job to ensure that all slaves have registered.
  // This avoids all the receivers to be scheduled on the same node.
  if (!ssc.sparkContext.isLocal) {
    ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()  // make sure all worker nodes have registered, so the receivers do not all land on one node
  }

  // Distribute the receivers and start them
  logInfo("Starting " + receivers.length + " receivers")
  ssc.sparkContext.runJob(tempRDD, ssc.sparkContext.clean(startReceiver))  // the job that actually starts the receivers
  logInfo("All of the receivers have been terminated")
}
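Note that runJob here is a blocking call: each receiver runs inside a long-running task on a worker node, so the "All of the receivers have been terminated" line is only logged after the receivers exit (normally when the streaming context is stopped).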


Next, the important supervisor.start() method:

def start() {
  onStart()
  startReceiver()
}
Tracing both calls further:

override protected def onStart() {
  blockGenerator.start()  // start the block generator
}

/** Start receiver */
def startReceiver(): Unit = synchronized {
  try {
    logInfo("Starting receiver")
    receiver.onStart()   // this is where the actual receiver starts; here it is SocketReceiver.onStart
    logInfo("Called receiver onStart")
    onReceiverStart()
    receiverState = Started
  } catch {
    case t: Throwable =>
      stop("Error starting receiver " + streamId, Some(t))
  }
}
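A word on the blockGenerator started in onStart() above: it is what turns the individual records handed to store() into blocks. Roughly, a timer coalesces everything buffered during each spark.streaming.blockInterval (200 ms by default) into one block, and a separate thread pushes the finished blocks to the BlockManager through the supervisor, which also reports them to the driver.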


SocketReceiver.onStart() simply starts a daemon thread that calls receive(); a rough paraphrase of the 1.x source:
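def onStart() {
  // Start a daemon thread that receives data over the socket connection
  new Thread("Socket Receiver") {
    setDaemon(true)
    override def run() { receive() }
  }.start()
}

receive() then opens the socket connection and stores every line it reads: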

/** Create a socket connection and receive data until receiver is stopped */
def receive() {
  var socket: Socket = null
  try {
    logInfo("Connecting to " + host + ":" + port)
    socket = new Socket(host, port)
    logInfo("Connected to " + host + ":" + port)
    val iterator = bytesToObjects(socket.getInputStream())  // wrap the socket's input stream
    while(!isStopped && iterator.hasNext) {
      store(iterator.next)   // hand each record to the supervisor for storage
    }
    logInfo("Stopped receiving")
    restart("Retrying connecting to " + host + ":" + port)
  } catch {
    case e: java.net.ConnectException =>
      restart("Error connecting to " + host + ":" + port, e)
    case t: Throwable =>
      restart("Error receiving data", t)
  } finally {
    if (socket != null) {
      socket.close()
      logInfo("Closed socket to " + host + ":" + port)
    }
  }
}


onReceiverStart() is implemented as follows:

override protected def onReceiverStart() {
  val msg = RegisterReceiver(
    streamId, receiver.getClass.getSimpleName, Utils.localHostName(), actor)  // register this receiver with the tracker
  val future = trackerActor.ask(msg)(askTimeout)  // trackerActor lives on the driver
  Await.result(future, askTimeout)
}
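On the driver side, the ReceiverTrackerActor handles this RegisterReceiver message and records the receiver. From then on, every block the supervisor stores is reported back to the ReceiverTracker, and when the JobGenerator generates the jobs for a batch, those blocks are what the resulting RDDs read, closing the loop from receiving data to running jobs on it.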


********** The End **********