您的位置:首页 > 产品设计 > UI/UE

Flink流计算编程--如何实现基于KEY/VALUE的List State

2016-08-17 12:26 597 查看

1、ListState简介

Flink提供了3种基于KEY/VALUE的state的实现方式,分别是:

ValueState<T>
ListState<T>
ReducingState<T>


官方文档中关于state的使用可以参考这里:Working with State

之前文章中,有一个使用ValueState的例子:ValueState,而ValueState适用于拿到上一条记录,或者上一个窗口中的记录,即ValueState存放的数据仅仅是一条记录,这在很多情况下都很有用。例如上一个窗口中的某些值要传递到下一个窗口,或者上一条记录中的某个值要传递到下一个记录等等。

而ListState则是将需要某些值存到一个List中(Iterable),即缓存的数据不仅仅是1个值,而是多个值。这在很多情况下也很有用,例如计算的数值要包含全天的每一个记录,那么此时只有将每一个记录的值存成一个列表,才可以计算。

2、ListState例子

如何获取ListState?

先定义ListState,并override RichFunction的open方法:

var state : ListState[TransactionListState] = null


override def open(config : Configuration) : Unit = {

state = getRuntimeContext.getListState[TransactionListState](new ListStateDescriptor[TransactionListState]("VWAP List State",classOf[TransactionListState]))
}


其中,ListStateDescriptor类提供了几种不同的定义方式:



这里我选择了第一种,2个参数分别是ListStateDescriptor的名字以及typeClass。

查询列表与添加数据到列表:

state.add(。。。)
listState = state.get()


完整的代码如下:

package toptrade.ComputeClass

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

/**
* 计算VWAP标准差
* 利用Flink提供的基于key/vale的ListState来完成
* 将每一分钟的分钟VOLUME以及分钟VWAP放入一个列表,每分钟的计算都是基于此
* 在每天盘前,都要清空ListState列表,并且盘前的VWAP是0(没有比较)
*/
object TransactionListStateFunction {

var state : ListState[TransactionListState] = null

var volume = BigDecimal.valueOf(0.0)
var VWAP = BigDecimal.valueOf(0.0)

var listState : java.lang.Iterable[TransactionListState] = null

class TransactionListStateFunction extends RichFlatMapFunction[(String,Int,String,String,BigDecimal,BigDecimal,BigDecimal,BigDecimal,BigDecimal,BigDecimal,BigDecimal,BigDecimal,BigDecimal,BigDecimal,BigDecimal,BigDecimal,BigDecimal,BigDecimal,BigDecimal,BigDecimal,BigDecimal), (String, Int, String, String, BigDecimal, BigDecimal, BigDecimal, BigDecimal, BigDecimal, BigDecimal, BigDecimal, BigDecimal, BigDecimal, BigDecimal, BigDecimal, BigDecimal, BigDecimal, BigDecimal, BigDecimal, BigDecimal, BigDecimal)]{

override def open(config : Configuration) : Unit = { state = getRuntimeContext.getListState[TransactionListState](new ListStateDescriptor[TransactionListState]("VWAP List State",classOf[TransactionListState])) }

override def flatMap(in: (String, Int, String, String, BigDecimal, BigDecimal, BigDecimal, BigDecimal, BigDecimal, BigDecimal, BigDecimal, BigDecimal, BigDecimal, BigDecimal, BigDecimal, BigDecimal, BigDecimal, BigDecimal, BigDecimal, BigDecimal, BigDecimal), out: Collector[(String, Int, String, String, BigDecimal, BigDecimal, BigDecimal, BigDecimal, BigDecimal, BigDecimal, BigDecimal, BigDecimal, BigDecimal, BigDecimal, BigDecimal, BigDecimal, BigDecimal, BigDecimal, BigDecimal, BigDecimal, BigDecimal)]): Unit = {

/**
* init
*/
if(state.get() == null){
volume = BigDecimal.valueOf(0.0)
VWAP = BigDecimal.valueOf(0.0)

state.add(TransactionListState(volume,VWAP))
}else{
//get Iterable
listState = state.get()
}

var VWAP_SD : BigDecimal = BigDecimal.valueOf(0.0)

/**
* 盘前,需要将state清空
*/
if(in._3 < "0930"){
state.clear()
state.add(TransactionListState(in._5,in._7))
listState = state.get()

VWAP_SD = BigDecimal.valueOf(0.0)
}else{

val volume = in._5
val VWAP = in._7
val vwap_accum = in._14
/**
* 每次先添加当前分钟的VOLUME与VWAP到列表中
*/
state.add(TransactionListState(volume,VWAP))
listState = state.get()

import scala.collection.JavaConverters._
val scalaState = listState.asScala.toList

//计算累计VOLUME
var sum_volume = BigDecimal.valueOf(0.0)
for(elem <- scalaState){
sum_volume = sum_volume.+(elem.volume)
}

//开始计算
var SD = BigDecimal.valueOf(0.0)
for(elem <- scalaState){
val first = elem.volume./(sum_volume)
val second = BigDecimal.valueOf(Math.pow(elem.VWAP.-(vwap_accum).toDouble,2))
SD = SD.+(first.*(second))
}
VWAP_SD = BigDecimal.valueOf(Math.sqrt(SD.toDouble)).setScale(4,BigDecimal.RoundingMode.HALF_UP)
}

out.collect(in._1,in._2,in._3,in._4,in._5,in._6,in._7,in._8,in._9,in._10,in._11,in._12,in._13,in._14,VWAP_SD,in._16,in._17,in._18,in._19,in._20,in._21)

}
}

case class TransactionListState(volume : BigDecimal, VWAP : BigDecimal)
}


3、总结

Flink提供了基于key/value的3种State接口,其中ListState接口适用于缓存多个值用于之后的计算。在具体实现时,由于state必须基于key,且必须获取getRuntimeContext,因此,使用state必须同时满足2个条件:

1、直接基于keyedStream或者由keyedStream转换的windowedStream
2、必须继承RichFunction


在实际的实现时,由于windowedStream在scala中不能实现RichWindowFunction,因此我在main中使用Flatmap间接实现了windowFunction中的功能:

val fromTransactionDataStream = watermarkTransaction
.keyBy(_.code)
.window(TumblingEventTimeWindows.of(Time.seconds(60)))


val onlyTransaction = fromTransactionDataStream
.apply(new StockTransactionApply)
.keyBy(_._3)
.flatMap(new TransactionStateFlatMapFunction)


引用

https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html

http://mail-archives.apache.org/mod_mbox/flink-user/201606.mbox/%3CCAGco–Y-UxKWoJwKDJQ5GQt_Mjr_V8VqOEeGdusc97PWmE0qyg@mail.gmail.com%3E
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Flink DataStream ListState