
Spark Streaming case study: NetworkWordCount -- startReceiver() in ReceiverSupervisorImpl, and how a Receiver stores data into an RDD

2018-02-24 16:11
Continuing from the previous post, "How ReceiverSupervisorImpl.onStart() gets the Receiver's data written into Spark's BlockManager", this post analyzes the startReceiver() method.
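For context, the driver program of this case study is essentially the NetworkWordCount shipped with Spark's examples (the host, port, and batch interval below are illustrative):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NetworkWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))
    // socketTextStream creates a SocketInputDStream; its SocketReceiver is the
    // receiver whose startup path is analyzed in this post
    val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
    lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```

To feed it data, start a socket source first, e.g. `nc -lk 9999`.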
1. supervisor.start() is the entry point that starts the Receiver receiving data on the Executor. The start() method is implemented in ReceiverSupervisor, the parent class of ReceiverSupervisorImpl:

/** Start the supervisor */
def start() {
  onStart()
  startReceiver()
}

2. ReceiverSupervisor.startReceiver() eventually calls the onStart() method of the concrete Receiver implementation, e.g. the onStart() implemented in SocketReceiver:

/** Start receiver */
def startReceiver(): Unit = synchronized {
  try {
    // Updates the Receiver's ReceiverTrackingInfo on the driver; returns true on success
    if (onReceiverStart()) {
      logInfo("Starting receiver")
      receiverState = Started
      receiver.onStart() // actually calls onStart() of the concrete Receiver, e.g. the SocketReceiver created by SocketInputDStream
      logInfo("Called receiver onStart")
    } else {
      // The driver refused us
      stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)
    }
  } catch {
    case NonFatal(t) =>
      stop("Error starting receiver " + streamId, Some(t))
  }
}

a. The onReceiverStart() method. It packs the streamId, the Receiver's class name, the Executor's host name, the executorId, and a reference to the current anonymous RpcEndpoint into a RegisterReceiver message, and sends it to the receiveAndReply method of the ReceiverTrackerEndpoint on the driver:

override protected def onReceiverStart(): Boolean = {
  val msg = RegisterReceiver(
    streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
  // Updates the Receiver's ReceiverTrackingInfo on the driver; returns true on success
  trackerEndpoint.askWithRetry[Boolean](msg)
}

b. On the driver side, look at how ReceiverTrackerEndpoint (defined inside ReceiverTracker) handles RegisterReceiver in receiveAndReply:

private class ReceiverTrackerEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {
  ...
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    // Remote messages
    // Receives the streamId of the ReceiverInputDStream, the Receiver's class name, the Executor's
    // host name, the executorId, and the Executor-side RpcEndpoint reference carried by RegisterReceiver
    case RegisterReceiver(streamId, typ, host, executorId, receiverEndpoint) =>
      // registerReceiver updates the Receiver's registration info in
      // receiverTrackingInfos: HashMap[Int, ReceiverTrackingInfo] (keyed by receiverId), and posts a
      // StreamingListenerReceiverStarted event to the StreamingListenerBus.
      // If everything is fine, it replies true to the Executor's RpcEndpoint.
      val successful =
        registerReceiver(streamId, typ, host, executorId, receiverEndpoint, context.senderAddress)
      context.reply(successful)
    case AddBlock(receivedBlockInfo) =>
      ...
    case DeregisterReceiver(streamId, message, error) =>
      ...
  }
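As a usage-side illustration: since registering a Receiver posts a StreamingListenerReceiverStarted event to the StreamingListenerBus, an application can observe these registrations with a StreamingListener. A sketch, assuming an already-created StreamingContext named ssc:

```scala
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerReceiverStarted}

// Sketch: subscribe to receiver lifecycle events on the driver side.
// watchReceivers is a hypothetical helper name, not part of Spark's API.
def watchReceivers(ssc: StreamingContext): Unit = {
  ssc.addStreamingListener(new StreamingListener {
    override def onReceiverStarted(started: StreamingListenerReceiverStarted): Unit = {
      // fired after registerReceiver succeeds and posts the event
      println(s"Receiver ${started.receiverInfo.name} started at ${started.receiverInfo.location}")
    }
  })
}
```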
c. Now look at how registerReceiver updates ReceiverTrackingInfo (the class that records a Receiver's state):

/** Register a receiver
  * Updates the Receiver's registration info into ReceiverTrackingInfo, and
  * posts a StreamingListenerReceiverStarted event to the StreamingListenerBus.
  * */
private def registerReceiver(
    streamId: Int,
    typ: String,
    host: String,
    executorId: String,
    receiverEndpoint: RpcEndpointRef,
    senderAddress: RpcAddress
  ): Boolean = {

  if (!receiverInputStreamIds.contains(streamId)) {
    throw new SparkException("Register received for unexpected id " + streamId)
  }
  if (isTrackerStopping || isTrackerStopped) {
    return false
  }
  // receiverTrackingInfos: HashMap[Int, ReceiverTrackingInfo] already has entries, because when
  // ReceiverTracker first calls launchReceivers(), the StartAllReceivers message sent to
  // ReceiverTrackerEndpoint registers every Receiver via updateReceiverScheduledExecutors.
  // ReceiverTrackingInfo.scheduledLocations returns e.g. Some(ArrayBuffer("executor_localhost_driver"))
  val scheduledLocations = receiverTrackingInfos(streamId).scheduledLocations
  val acceptableExecutors = if (scheduledLocations.nonEmpty) {
      // This receiver is registering and it's scheduled by
      // ReceiverSchedulingPolicy.scheduleReceivers. So use "scheduledLocations" to check it.
      scheduledLocations.get  // e.g. ArrayBuffer("executor_localhost_driver")
    } else {
      // This receiver is scheduled by "ReceiverSchedulingPolicy.rescheduleReceiver", so calling
      // "ReceiverSchedulingPolicy.rescheduleReceiver" again to check it.
      scheduleReceiver(streamId)
    }
  // In this case acceptableExecutors holds ExecutorCacheTaskLocation entries. exists is a Scala
  // collection method: it returns true as soon as any element satisfies the predicate, like a
  // logical OR over the collection (forall is the AND counterpart).
  def isAcceptable: Boolean = acceptableExecutors.exists {
    case loc: ExecutorCacheTaskLocation => loc.executorId == executorId
    case loc: TaskLocation => loc.host == host
  }

  if (!isAcceptable) {
    // Refuse it since it's scheduled to a wrong executor
    false
  } else {
    val name = s"${typ}-${streamId}"
    // ReceiverTrackingInfo records the Receiver's state
    val receiverTrackingInfo = ReceiverTrackingInfo(
      streamId,
      ReceiverState.ACTIVE,
      scheduledLocations = None,
      runningExecutor = Some(ExecutorCacheTaskLocation(host, executorId)),
      name = Some(name),
      endpoint = Some(receiverEndpoint))
    // Update the stored ReceiverTrackingInfo: mark the ReceiverState ACTIVE and record the Executor's endpoint
    receiverTrackingInfos.put(streamId, receiverTrackingInfo)
    // Post StreamingListenerReceiverStarted to all listeners (see how Spark's ListenerBus
    // implements a JS-style event-listener mechanism)
    listenerBus.post(StreamingListenerReceiverStarted(receiverTrackingInfo.toReceiverInfo))
    logInfo("Registered receiver for stream " + streamId + " from " + senderAddress)
    true
  }
}
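A quick aside on the exists call in isAcceptable above: exists behaves like an OR over the collection, forall like an AND. A plain Scala snippet shows the difference (the location strings are made up for the demo):

```scala
// exists returns true as soon as one element matches (OR);
// forall requires every element to match (AND).
val locations = Seq("executor_localhost_1", "executor_localhost_2")

val anyMatch = locations.exists(_.endsWith("_2"))   // true: one match suffices
val allMatch = locations.forall(_.endsWith("_2"))   // false: the first element fails

println(s"exists: $anyMatch, forall: $allMatch")
```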
3. Back in ReceiverSupervisor.startReceiver() (shown in step 2 above): once onReceiverStart() has successfully registered the Receiver with the driver, receiver.onStart() invokes the onStart() of the concrete Receiver implementation.

===> Look at SocketReceiver's onStart() method, which launches a daemon thread to receive the data:

private[streaming]
class SocketReceiver[T: ClassTag](
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends Receiver[T](storageLevel) with Logging {
  // Receiver subclasses must implement onStart()
  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      setDaemon(true)
      // after data is read from the socket, receive() uses store() to hand it to Spark
      override def run() { receive() }
    }.start()
  }

===> The thread then calls SocketReceiver's receive() method:

/** Create a socket connection and receive data until receiver is stopped */
def receive() {
  var socket: Socket = null
  try {
    logInfo("Connecting to " + host + ":" + port)
    socket = new Socket(host, port)
    logInfo("Connected to " + host + ":" + port)
    val iterator = bytesToObjects(socket.getInputStream())
    // iterator.hasNext keeps returning true as long as the socket stream has data, so records are
    // pulled out and stored into Blocks continuously until the receiver stops; each batch interval,
    // ReceiverInputDStream.compute() then takes the Blocks of that interval to build an RDD.
    // In other words, the receiver does not read from the socket once per batch: whenever the
    // socket stream has data, it is stored into Spark.
    while(!isStopped && iterator.hasNext) {
      store(iterator.next)
    }
    ...
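The same onStart()/receive()/store() pattern applies when writing your own receiver. A minimal line-based receiver in the spirit of SocketReceiver, adapted from the pattern in Spark's custom-receiver guide (the class and thread names here are illustrative):

```scala
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// onStart() spawns a daemon thread, and every record read from the socket is
// handed to store(), exactly as SocketReceiver does above.
class LineReceiver(host: String, port: Int)
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) {

  def onStart(): Unit = {
    new Thread("Line Receiver") {
      setDaemon(true)
      override def run(): Unit = receive()
    }.start()
  }

  def onStop(): Unit = { /* the receive loop exits once isStopped is true */ }

  private def receive(): Unit = {
    try {
      val socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped && line != null) {
        store(line)            // each record flows to supervisor.pushSingle()
        line = reader.readLine()
      }
      reader.close()
      socket.close()
      restart("Trying to connect again")
    } catch {
      case e: java.net.ConnectException => restart("Could not connect", e)
      case t: Throwable => restart("Error receiving data", t)
    }
  }
}
```

It would be used from the driver via `ssc.receiverStream(new LineReceiver("localhost", 9999))`.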
4. Finally, look at the store() method to see how data is handed to Spark:

/**
 * Store a single item of received data to Spark's memory.
 * These single items will be aggregated together into data blocks before
 * being pushed into Spark's memory.
 */
def store(dataItem: T) {
  supervisor.pushSingle(dataItem)
}

===> pushSingle is implemented by ReceiverSupervisorImpl:

// createBlockGenerator is called at initialization to create a BlockGenerator, which chops the data
// the Receiver collects into blocks; each call creates a new BlockGenerator that is both added to
// the registeredBlockGenerators collection and returned
private val defaultBlockGenerator = createBlockGenerator(defaultBlockGeneratorListener)
/** Push a single record of received data into block generator.
  * Called when a Receiver implementation such as SocketReceiver stores a single record via store()
  * */
def pushSingle(data: Any) {
  // defaultBlockGenerator is the BlockGenerator reference created above
  defaultBlockGenerator.addData(data)
}
===> addData appends each record to currentBuffer, an in-memory ArrayBuffer[Any]. A RecurringTimer periodically calls updateCurrentBuffer to turn the buffer into a Block and put it into blocksForPushing, an ArrayBlockingQueue[Block]; the keepPushingBlocks thread then notices blocks in the queue and calls pushBlock(block) on each.
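The buffer/timer/queue mechanism just described can be sketched without Spark: records accumulate in a buffer, and a periodic call (standing in for the RecurringTimer) swaps the buffer out as a "block" into a bounded queue. MiniBlockGenerator and all its names are hypothetical simplifications, not Spark's API:

```scala
import java.util.concurrent.ArrayBlockingQueue
import scala.collection.mutable.ArrayBuffer

// Simplified sketch of the BlockGenerator pattern: addData fills currentBuffer,
// and updateCurrentBuffer (driven by a timer in Spark, at
// spark.streaming.blockInterval) moves the buffer into blocksForPushing, from
// which a pusher thread would drain blocks toward the BlockManager.
class MiniBlockGenerator[T](queueCapacity: Int) {
  private var currentBuffer = new ArrayBuffer[T]
  val blocksForPushing = new ArrayBlockingQueue[ArrayBuffer[T]](queueCapacity)

  def addData(data: T): Unit = synchronized { currentBuffer += data }

  def updateCurrentBuffer(): Unit = synchronized {
    if (currentBuffer.nonEmpty) {
      blocksForPushing.put(currentBuffer)   // blocks if the queue is full
      currentBuffer = new ArrayBuffer[T]
    }
  }
}
```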
==> For the details of how blocks end up in Spark's BlockManager, see "Spark Streaming case study: NetworkWordCount -- how ReceiverSupervisorImpl.onStart() gets the Receiver's data written into Spark's BlockManager", starting from the analysis of BlockGenerator.start().

/**
 * Push a single data item into the buffer.
 * Called when a Receiver implementation such as SocketReceiver stores a record via store(),
 * which ReceiverSupervisorImpl forwards here through pushSingle()
 */
def addData(data: Any): Unit = {
  if (state == Active) {
    waitToPush() // limits the rate at which the receiver accepts data; if data arrives faster than the configured rate, the calling thread blocks here
    synchronized {
      if (state == Active) {
        currentBuffer += data
      } else {
        throw new SparkException(
          "Cannot add data as BlockGenerator has not been started or has been stopped")
      }
    }
  } else {
    throw new SparkException(
      "Cannot add data as BlockGenerator has not been started or has been stopped")
  }
}

 
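The waitToPush() call can be pictured as a token-bucket-style limiter. Spark's actual implementation delegates to a Guava RateLimiter configured by spark.streaming.receiver.maxRate; the Semaphore-based sketch below is a hypothetical analogue that only conveys the blocking behavior:

```scala
import java.util.concurrent.Semaphore

// Hypothetical analogue of BlockGenerator's waitToPush(): the producer blocks
// once it has used up the permits for the current interval; a timer (here,
// whatever calls refill()) tops the bucket up once per rate interval.
class SimpleRateLimiter(permitsPerInterval: Int) {
  private val tokens = new Semaphore(permitsPerInterval)
  def waitToPush(): Unit = tokens.acquire()   // blocks when no permits remain
  def refill(): Unit = synchronized {
    tokens.drainPermits()
    tokens.release(permitsPerInterval)
  }
  def available: Int = tokens.availablePermits()
}
```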