SparkStreaming case study: NetworkWordCount -- startReceiver() in ReceiverSupervisorImpl, and how a Receiver stores data into RDDs
2018-02-24 16:11
Continuing from the previous post on how ReceiverSupervisorImpl.onStart() gets the Receiver's data written into Spark's BlockManager, this post analyzes the startReceiver() method.
1. supervisor.start() is the entry point that starts the Receiver receiving data on the Executor.
start() is implemented in ReceiverSupervisor, the parent class of ReceiverSupervisorImpl:

/** Start the supervisor */
def start() {
  onStart()
  startReceiver()
}
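start() fixes the order of the two steps while leaving their bodies to hooks, i.e. the template-method pattern. A minimal, hypothetical sketch of that shape (the class names only mirror Spark's, this is not its code):

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch of the supervisor lifecycle ordering; not Spark's actual classes.
abstract class MiniSupervisor {
  val calls = ArrayBuffer.empty[String]   // records the order of lifecycle steps

  protected def onStart(): Unit           // subclass hook: e.g. start the BlockGenerator
  protected def startReceiver(): Unit     // subclass hook: register with driver, call receiver.onStart()

  // The parent fixes the order: helper machinery first, then the receiver itself.
  final def start(): Unit = {
    onStart()
    startReceiver()
  }
}

class MiniSupervisorImpl extends MiniSupervisor {
  protected def onStart(): Unit = calls += "onStart"
  protected def startReceiver(): Unit = calls += "startReceiver"
}

object LifecycleDemo {
  def run(): List[String] = {
    val s = new MiniSupervisorImpl
    s.start()
    s.calls.toList
  }
}
```

Whatever a subclass does in its hooks, onStart() is guaranteed to complete before startReceiver() runs, which is why the BlockGenerator is already running when the receiver starts storing data.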
2. ReceiverSupervisor.startReceiver() is what ends up invoking the onStart() of the concrete Receiver implementation, e.g. SocketReceiver:

/** Start receiver */
def startReceiver(): Unit = synchronized {
  try {
    // Registers with the driver, which updates this receiver's ReceiverTrackingInfo; returns true on success
    if (onReceiverStart()) {
      logInfo("Starting receiver")
      receiverState = Started
      receiver.onStart() // calls the concrete Receiver's onStart, e.g. the SocketReceiver used by SocketInputDStream
      logInfo("Called receiver onStart")
    } else {
      // The driver refused us
      stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)
    }
  } catch {
    case NonFatal(t) =>
      stop("Error starting receiver " + streamId, Some(t))
  }
}

a. onReceiverStart() packs the streamId, the receiver's class name, the Executor's host name, the executorId, and a reference to the supervisor's anonymous RpcEndpoint into a RegisterReceiver message, and sends it to the ReceiverTrackerEndpoint on the driver, where receiveAndReply handles it:

override protected def onReceiverStart(): Boolean = {
  val msg = RegisterReceiver(
    streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
  // Updates this receiver's ReceiverTrackingInfo on the driver; returns true on success
  trackerEndpoint.askWithRetry[Boolean](msg)
}
b. On the driver, ReceiverTrackerEndpoint (an inner class of ReceiverTracker) handles RegisterReceiver in receiveAndReply. It takes the streamId, the receiver class name, the Executor host, the executorId, and the Executor-side RpcEndpoint reference from the message, then calls registerReceiver(), which updates receiverTrackingInfos: HashMap[Int, ReceiverTrackingInfo] (keyed by receiver id) and posts a StreamingListenerReceiverStarted event to the StreamingListenerBus. On success it replies true to the Executor's RpcEndpoint:

private class ReceiverTrackerEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {
  ...
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    // Remote messages
    case RegisterReceiver(streamId, typ, host, executorId, receiverEndpoint) =>
      val successful =
        registerReceiver(streamId, typ, host, executorId, receiverEndpoint, context.senderAddress)
      context.reply(successful)
    case AddBlock(receivedBlockInfo) => ...
    case DeregisterReceiver(streamId, message, error) => ...
  }
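receiveAndReply returns a PartialFunction[Any, Unit], so the endpoint dispatches purely by pattern-matching on the message's case class. A toy version of the same dispatch style (the case classes and the reply callback here are simplified, hypothetical stand-ins, not Spark's RpcCallContext):

```scala
// Hypothetical, simplified message types mirroring the tracker's protocol.
case class RegisterReceiver(streamId: Int, typ: String, host: String, executorId: String)
case class DeregisterReceiver(streamId: Int, message: String)

object MiniEndpoint {
  // Returns the handler the same way receiveAndReply does: a PartialFunction
  // matching on message type, answering through a callback (stand-in for context.reply).
  def receiveAndReply(reply: Any => Unit): PartialFunction[Any, Unit] = {
    case RegisterReceiver(id, _, _, _) => reply(id >= 0)   // accept only non-negative stream ids
    case DeregisterReceiver(_, _)      => reply(true)
  }
}
```

A message type with no matching case simply falls outside the partial function; in Spark the RPC layer decides what happens to unhandled messages.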
c. How registerReceiver() updates ReceiverTrackingInfo (the class that records a receiver's state):

/** Register a receiver.
 * Updates the receiver's registration info in ReceiverTrackingInfo and posts a
 * StreamingListenerReceiverStarted event to the StreamingListenerBus. */
private def registerReceiver(
    streamId: Int,
    typ: String,
    host: String,
    executorId: String,
    receiverEndpoint: RpcEndpointRef,
    senderAddress: RpcAddress
  ): Boolean = {
  if (!receiverInputStreamIds.contains(streamId)) {
    throw new SparkException("Register received for unexpected id " + streamId)
  }
  if (isTrackerStopping || isTrackerStopped) {
    return false
  }
  // receiverTrackingInfos: HashMap[Int, ReceiverTrackingInfo] already has an entry for this
  // streamId: when ReceiverTracker first ran launchReceivers(), the ReceiverTrackerEndpoint
  // handled StartAllReceivers and filled the map via updateReceiverScheduledExecutors.
  // scheduledLocations returns e.g. Some(ArrayBuffer("executor_localhost_driver"))
  val scheduledLocations = receiverTrackingInfos(streamId).scheduledLocations
  val acceptableExecutors = if (scheduledLocations.nonEmpty) {
    // This receiver is registering and it's scheduled by
    // ReceiverSchedulingPolicy.scheduleReceivers. So use "scheduledLocations" to check it.
    scheduledLocations.get // e.g. ArrayBuffer("executor_localhost_driver")
  } else {
    // This receiver is scheduled by "ReceiverSchedulingPolicy.rescheduleReceiver", so calling
    // "ReceiverSchedulingPolicy.rescheduleReceiver" again to check it.
    scheduleReceiver(streamId)
  }

  // In this case acceptableExecutors holds ExecutorCacheTaskLocation values. exists is a
  // Scala collection method: it returns true as soon as one element matches, like a logical
  // OR (its counterpart forall behaves like AND).
  def isAcceptable: Boolean = acceptableExecutors.exists {
    case loc: ExecutorCacheTaskLocation => loc.executorId == executorId
    case loc: TaskLocation => loc.host == host
  }

  if (!isAcceptable) {
    // Refuse it since it's scheduled to a wrong executor
    false
  } else {
    val name = s"${typ}-${streamId}"
    // ReceiverTrackingInfo records everything the tracker knows about this receiver
    val receiverTrackingInfo = ReceiverTrackingInfo(
      streamId,
      ReceiverState.ACTIVE,
      scheduledLocations = None,
      runningExecutor = Some(ExecutorCacheTaskLocation(host, executorId)),
      name = Some(name),
      endpoint = Some(receiverEndpoint))
    // Update the tracking info: mark the receiver ACTIVE and record the Executor's endpoint
    receiverTrackingInfos.put(streamId, receiverTrackingInfo)
    // Notify all listeners that the receiver has started (Spark uses a ListenerBus,
    // similar in spirit to JavaScript event listeners)
    listenerBus.post(StreamingListenerReceiverStarted(receiverTrackingInfo.toReceiverInfo))
    logInfo("Registered receiver for stream " + streamId + " from " + senderAddress)
    true
  }
}
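The isAcceptable check above leans on exists, which is a logical OR over the collection (true as soon as one element matches), as opposed to forall, the logical AND. A self-contained sketch of the same location-matching logic, with simplified case classes standing in for Spark's TaskLocation hierarchy (these names are hypothetical):

```scala
// Simplified stand-ins for Spark's TaskLocation types.
sealed trait Loc
case class ExecLoc(host: String, executorId: String) extends Loc
case class HostLoc(host: String) extends Loc

object AcceptableDemo {
  // Mirrors isAcceptable: the receiver's actual (host, executorId) must match
  // at least one scheduled location. An executor-level location is matched by
  // executorId, a host-level location only by host.
  def isAcceptable(scheduled: Seq[Loc], host: String, executorId: String): Boolean =
    scheduled.exists {
      case ExecLoc(_, id) => id == executorId
      case HostLoc(h)     => h == host
    }
}
```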
3. Back in ReceiverSupervisor.startReceiver() (shown in full above): once onReceiverStart() has registered the receiver successfully, receiverState is set to Started and receiver.onStart() runs the onStart() of the concrete Receiver implementation; for SocketInputDStream that is SocketReceiver.onStart().
===> SocketReceiver.onStart() starts a new daemon thread to receive the data:

private[streaming]
class SocketReceiver[T: ClassTag](
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends Receiver[T](storageLevel) with Logging {

  // Every Receiver subclass must implement onStart
  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      setDaemon(true)
      // Each record read from the socket is handed to store(), which puts it into Spark
      override def run() { receive() }
    }.start()
  }
===> That thread calls SocketReceiver.receive():

/** Create a socket connection and receive data until receiver is stopped */
def receive() {
  var socket: Socket = null
  try {
    logInfo("Connecting to " + host + ":" + port)
    socket = new Socket(host, port)
    logInfo("Connected to " + host + ":" + port)
    val iterator = bytesToObjects(socket.getInputStream())
    // As long as the socket stream has data, iterator.hasNext stays true, so records keep
    // being stored into blocks until the receiver stops. The receiver does NOT pull from
    // the socket once per batch interval: data is stored into Spark as soon as it arrives,
    // and each DStream interval ReceiverInputDStream.compute() builds an RDD from the
    // blocks stored during that interval.
    while(!isStopped && iterator.hasNext) {
      store(iterator.next)
    }
    ...
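bytesToObjects is the function that turns the socket's InputStream into the Iterator the loop above consumes. A sketch of a line-oriented version (a hypothetical helper in the spirit of Spark's text handling, not its exact implementation):

```scala
import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets

object BytesToLines {
  // Converts a raw InputStream into an Iterator[String] of lines,
  // the shape expected by SocketReceiver's bytesToObjects parameter.
  // The iterator is lazy: a line is read only when next() is called,
  // and it ends when readLine() returns null (stream closed).
  def apply(in: InputStream): Iterator[String] = {
    val reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))
    Iterator.continually(reader.readLine()).takeWhile(_ != null)
  }
}
```

Because the iterator is lazy, hasNext blocks on the socket when no data is available, which is exactly why the while loop above runs until the connection closes or the receiver is stopped.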
4. How does store() get the data into Spark?

/**
 * Store a single item of received data to Spark's memory.
 * These single items will be aggregated together into data blocks before
 * being pushed into Spark's memory.
 */
def store(dataItem: T) {
  supervisor.pushSingle(dataItem)
}

===> pushSingle is implemented by ReceiverSupervisorImpl. At initialization, createBlockGenerator builds the BlockGenerator that chops the receiver's incoming records into blocks; each call creates one BlockGenerator, which is both added to the registeredBlockGenerators set and returned:

private val defaultBlockGenerator = createBlockGenerator(defaultBlockGeneratorListener)

/** Push a single record of received data into block generator.
 * Reached each time a Receiver implementation such as SocketReceiver calls store() on a record. */
def pushSingle(data: Any) {
  // defaultBlockGenerator is the BlockGenerator created above
  defaultBlockGenerator.addData(data)
}

===> addData appends each record to currentBuffer, an in-memory ArrayBuffer[Any]. A RecurringTimer then calls updateCurrentBuffer to turn the buffer into a Block and put it on blocksForPushing, an ArrayBlockingQueue[Block]; keepPushingBlocks drains that queue and calls pushBlock(block) for each entry. How the block then lands in Spark's BlockManager is covered in the previous post on ReceiverSupervisorImpl.onStart(), starting from the analysis of BlockGenerator.start().
/**
 * Push a single data item into the buffer.
 * Reached when a Receiver implementation such as SocketReceiver calls store() for each
 * record, which ReceiverSupervisorImpl forwards here via pushSingle().
 */
def addData(data: Any): Unit = {
  if (state == Active) {
    // waitToPush() rate-limits the receiver: if data arrives faster than the configured
    // rate, it blocks this thread
    waitToPush()
    synchronized {
      if (state == Active) {
        currentBuffer += data
      } else {
        throw new SparkException(
          "Cannot add data as BlockGenerator has not been started or has been stopped")
      }
    }
  } else {
    throw new SparkException(
      "Cannot add data as BlockGenerator has not been started or has been stopped")
  }
}
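The Active checks above protect a simple buffer-and-swap scheme: records accumulate in currentBuffer, and a timer periodically swaps the buffer out as one block for pushing. A stripped-down sketch of that mechanism (a hypothetical class; no timer thread, rate limiting, or listeners, unlike Spark's real BlockGenerator):

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical miniature of BlockGenerator's buffer/swap logic.
class MiniBlockGenerator {
  private var active = true
  private var currentBuffer = ArrayBuffer.empty[Any]
  val blocks = ArrayBuffer.empty[Seq[Any]]   // stands in for blocksForPushing

  // Mirrors addData: append only while active, otherwise fail loudly.
  def addData(data: Any): Unit = synchronized {
    if (!active) throw new IllegalStateException("generator stopped")
    currentBuffer += data
  }

  // Mirrors updateCurrentBuffer: swap the buffer out as one block.
  // In Spark a RecurringTimer invokes this every block interval.
  def cutBlock(): Unit = synchronized {
    if (currentBuffer.nonEmpty) {
      blocks += currentBuffer.toList
      currentBuffer = ArrayBuffer.empty[Any]
    }
  }

  def stop(): Unit = synchronized { active = false }
}
```

Replacing currentBuffer with a fresh ArrayBuffer on every cut (rather than clearing it) is the same trick Spark uses: the old buffer becomes an immutable snapshot that can be pushed as a block while new records keep arriving.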