您的位置:首页 > 其它

[spark streaming] 状态管理 updateStateByKey&mapWithState

2017-12-30 17:19 501 查看

前言

SparkStreaming 7*24 小时不间断的运行,有时需要管理一些状态,比如wordCount,每个batch的数据不是独立的而是需要累加的,这时就需要sparkStreaming来维护一些状态,目前有两种方案updateStateByKey&mapWithState,mapWithState是spark1.6新加入的保存状态的方案,官方声称有10倍性能提升。

updateStateByKey

先上一个示例:

def updateFunction(currValues:Seq[Int],preValue:Option[Int]): Option[Int] = {
val currValueSum = currValues.sum
//上面的Int类型都可以用对象类型替换
Some(currValueSum + preValue.getOrElse(0)) //当前值的和加上历史值
}
kafkaStream.map(r => (r._2,1)).updateStateByKey(updateFunction _)


这里的updateFunction方法就是需要我们自己去实现的状态跟新的逻辑,
currValues
就是当前批次的所有值,
preValue
是历史维护的状态,
updateStateByKey
返回的是包含历史所有状态信息的DStream,下面我们来看底层是怎么实现状态的管理的,通过跟踪源码看到最核心的实现方法:

private [this] def computeUsingPreviousRDD(
batchTime: Time,
parentRDD: RDD[(K, V)],
prevStateRDD: RDD[(K, S)]) = {
// Define the function for the mapPartition operation on cogrouped RDD;
// first map the cogrouped tuple to tuples of required type,
// and then apply the update function
val updateFuncLocal = updateFunc
val finalFunc = (iterator: Iterator[(K, (Iterable[V], Iterable[S]))]) => {
val i = iterator.map { t =>
val itr = t._2._2.iterator
val headOption = if (itr.hasNext) Some(itr.next()) else None
(t._1, t._2._1.toSeq, headOption)
}
updateFuncLocal(batchTime, i)
}
val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner)
val stateRDD = cogroupedRDD.mapPartitions(finalFunc, preservePartitioning)
Some(stateRDD)
}


可以看到是将
parentRDD
preStateRDD
进行
co-group
,然后将finalFunc方法作用于每个Partition,看到finalFunc方法的实现里面
(t._1, t._2._1.toSeq, headOption)
这样的形式,
(key,currValues,preValue)
这不就是和我们需要自己实现的updateFun类似的结构吗,是的没错,我们的方法已经被包装了一次:

def updateStateByKey[S: ClassTag](
updateFunc: (Seq[V], Option[S]) => Option[S],
partitioner: Partitioner
): DStream[(K, S)] = ssc.withScope {
val cleanedUpdateF = sparkContext.clean(updateFunc)
val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => {
iterator.flatMap(t => cleanedUpdateF(t._2, t._3).map(s => (t._1, s)))
}
updateStateByKey(newUpdateFunc, partitioner, true)
}


可以知道每次调用updateStateByKey都会将旧的状态RDD和当前batch的RDD进行co-group来得到一个新的状态RDD,即使真正需要跟新的数据只有1条也需要将两个RDD进行cogroup,所有的数据都会被计算一遍,而且随着状态的不断增加,运行速度会越来越慢。

为了解决这一问题,
mapWithState
应运而生。

mapWithState

先来个示例:

val initialRDD = ssc.sparkContext.parallelize(List[(String, Int)]())
//自定义mappingFunction,累加单词出现的次数并更新状态
val mappingFunc = (word: String, count: Option[Int], state: State[Int]) => {
val sum = count.getOrElse(0) + state.getOption.getOrElse(0)
val output = (word, sum)
state.update(sum)
output
}
//调用mapWithState进行管理流数据的状态
kafkaStream.map(r => (r._2,1)).mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD)).print()


这里的
initialRDD
就是初始化状态,
updateStateByKey
也有对应的API。这里的
mappingFun
也是需要我们自己实现的状态跟新逻辑,调用
state.update()
就是对状态的跟新,
output
就是通过
mapWithState
后返回的DStream中的数据形式。注意这里不是直接传入的
mappingFunc
函数,而是一个StateSpec 的对象,其实也是对函数的一个包装而已。接下来我们跟踪源码看看是怎么实现状态的管理的,会创建一个
MapWithStateDStreamImpl
实例:

def mapWithState[StateType: ClassTag, MappedType: ClassTag](
spec: StateSpec[K, V, StateType, MappedType]
): MapWithStateDStream[K, V, StateType, MappedType] = {
new MapWithStateDStreamImpl[K, V, StateType, MappedType](
self,
spec.asInstanceOf[StateSpecImpl[K, V, StateType, MappedType]]
)
}


当然是要看看其compute方法是怎么实现的:

private val internalStream =
new InternalMapWithStateDStream[KeyType, ValueType, StateType, MappedType](dataStream, spec)

override def compute(validTime: Time): Option[RDD[MappedType]] = {
internalStream.getOrCompute(validTime).map { _.flatMap[MappedType] { _.mappedData } }
}


compute方法又把处理逻辑给了
internalStream:InternalMapWithStateDStream
,继续看
InternalMapWithStateDStream
的compute方法主要处理逻辑:

override def compute(validTime: Time): Option[RDD[MapWithStateRDDRecord[K, S, E]]] = {
// Get the previous state or create a new empty state RDD
val prevStateRDD = getOrCompute(validTime - slideDuration) match {
case Some(rdd) =>
if (rdd.partitioner != Some(partitioner)) {
// If the RDD is not partitioned the right way, let us repartition it using the
// partition index as the key. This is to ensure that state RDD is always partitioned
// before creating another state RDD using it
MapWithStateRDD.createFromRDD[K, V, S, E](
rdd.flatMap { _.stateMap.getAll() }, partitioner, validTime)
} else {
rdd
}
case None =>
MapWithStateRDD.createFromPairRDD[K, V, S, E](
spec.getInitialStateRDD().getOrElse(new EmptyRDD[(K, S)](ssc.sparkContext)),
partitioner,
validTime
)
}

// Compute the new state RDD with previous state RDD and partitioned data RDD
// Even if there is no data RDD, use an empty one to create a new state RDD
val dataRDD = parent.getOrCompute(validTime).getOrElse {
context.sparkContext.emptyRDD[(K, V)]
}
val partitionedDataRDD = dataRDD.partitionBy(partitioner)
val timeoutThresholdTime = spec.getTimeoutInterval().map { interval =>
(validTime - interval).milliseconds
}
Some(new MapWithStateRDD(
prevStateRDD, partitionedDataRDD, mappingFunction, validTime, timeoutThresholdTime))
}


先后获取
prevStateRDD
parentRDD
,并且保证使用的是同样的partitioner,接着以两个rdd为参数、自定义的
mappingFunction
函数、以及key的超时时间等为参数又创建了
MapWithStateRDD
,该RDD继承了
RDD[MapWithStateRDDRecord[K, S, E]]
MapWithStateRDD
中的数据都是
MapWithStateRDDRecord
对象,每个分区对应一个对象来保存状态(这就是为什么两个RDD需要用同一个
Partitioner
),看看
MapWithStateRDD
的compute方法:

override def compute(
partition: Partition, context: TaskContext): Iterator[MapWithStateRDDRecord[K, S, E]] = {

val stateRDDPartition = partition.asInstanceOf[MapWithStateRDDPartition]
val prevStateRDDIterator = prevStateRDD.iterator(
stateRDDPartition.previousSessionRDDPartition, context)
val dataIterator = partitionedDataRDD.iterator(
stateRDDPartition.partitionedDataRDDPartition, context)

val prevRecord = if (prevStateRDDIterator.hasNext) Some(prevStateRDDIterator.next()) else None
val newRecord = MapWithStateRDDRecord.updateRecordWithData(
prevRecord,
dataIterator,
mappingFunction,
batchTime,
timeoutThresholdTime,
removeTimedoutData = doFullScan // remove timedout data only when full scan is enabled
)
Iterator(newRecord)
}


拿到
prevStateRDD
parentRDD
对应分区的迭代器,接着获取了
prevStateRDD
的一条数据,这个分区也只有一条
MapWithStateRDDRecord
类型的数据,维护了对应分区所有数据状态,接着调用了最核心的方法来跟新状态,最后返回了只包含一条数据的迭代器,我们来看看是怎么这个核心的计算逻辑:

def updateRecordWithData[K: ClassTag, V: ClassTag, S: ClassTag, E: ClassTag](
prevRecord: Option[MapWithStateRDDRecord[K, S, E]],
dataIterator: Iterator[(K, V)],
mappingFunction: (Time, K, Option[V], State[S]) => Option[E],
batchTime: Time,
timeoutThresholdTime: Option[Long],
removeTimedoutData: Boolean
): MapWithStateRDDRecord[K, S, E] = {
// Create a new state map by cloning the previous one (if it exists) or by creating an empty one
val newStateMap = prevRecord.map { _.stateMap.copy() }. getOrElse { new EmptyStateMap[K, S]() }

val mappedData = new ArrayBuffer[E]
val wrappedState = new StateImpl[S]()

// Call the mapping function on each record in the data iterator, and accordingly
// update the states touched, and collect the data returned by the mapping function
dataIterator.foreach { case (key, value) =>
wrappedState.wrap(newStateMap.get(key))
val returned = mappingFunction(batchTime, key, Some(value), wrappedState)
if (wrappedState.isRemoved) {
newStateMap.remove(key)
} else if (wrappedState.isUpdated
|| (wrappedState.exists && timeoutThresholdTime.isDefined)) {
newStateMap.put(key, wrappedState.get(), batchTime.milliseconds)
}
mappedData ++= returned
}

// Get the timed out state records, call the mapping function on each and collect the
// data returned
if (removeTimedoutData && timeoutThresholdTime.isDefined) {
newStateMap.getByTime(timeoutThresholdTime.get).foreach { case (key, state, _) =>
wrappedState.wrapTimingOutState(state)
val returned = mappingFunction(batchTime, key, None, wrappedState)
mappedData ++= returned
newStateMap.remove(key)
}
}

MapWithStateRDDRecord(newStateMap, mappedData)
}


先copy了原来的状态,接着定义了两个变量,
mappedData
是最终要返回的结果,
wrappedState
可以看成是对state的包装,添加了一些额外的方法。

接着遍历当前批次的数据,从状态中取出key对应的原来的state,并根据自定义的函数来对state进行跟新,这里涉及到state的
remove&update&timeout
来对
newStateMap
进行跟新操作,并将有跟新的状态加入到了
mappedData
中。

若有设置超时时间,则还会对超时了的key进行移除,也会加入到
mappedData
中,最终通过新的状态对象
newStateMap
和需返回的
mappedData
数组构建了
MapWithStateRDDRecord
对象来返回。

而在前面提到的
MapWithStateDStreamImpl
实例的compute方法中:

override def compute(validTime: Time): Option[RDD[MappedType]] = {
internalStream.getOrCompute(validTime).map { _.flatMap[MappedType] { _.mappedData } }
}


调用的就是这个
mappedData
数据。

我们发现返回的都是有update的数据,若要获取所有的状态在
mapWithState
之后调用
stateSnapshots
即可。若要清除某个key的状态,可在自定义的方法中调用
state.remove()


总结

updateStateByKey底层是将preSateRDD和parentRDD进行co-group,然后对所有数据都将经过自定义的mapFun函数进行一次计算,即使当前batch只有一条数据也会进行这么复杂的计算,大大的降低了性能,并且计算时间会随着维护的状态的增加而增加。

mapWithstate底层是创建了一个MapWithStateRDD,存的数据是MapWithStateRDDRecord对象,一个Partition对应一个MapWithStateRDDRecord对象,该对象记录了对应Partition所有的状态,每次只会对当前batch有的数据进行跟新,而不会像updateStateByKey一样对所有数据计算。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐