Apache Flink源码解析之stream-window
2016-05-10 23:05
886 查看
window(窗口)是Flink流处理中非常重要的概念,本篇我们来对窗口相关的概念以及关联的实现进行解析。本篇的内容主要集中在package
Flink的根窗口对象是一个抽象类,只提供了一个抽象方法:
用于获取最大的时间戳。Flink提供了两个窗口的具体实现。在实现
该类内部有一个静态类定义了
start : 时间间隔的起始
end : 时间间隔的截止
其equals的实现中,除了常规比较(比较引用,比较Class的实例),还会比较
assignWindows :将某个带有时间戳
getDefaultTrigger :返回跟
getWindowSerializer :返回
下面会谈到很多基于时间的窗口,这里有两个概念,分别是
时间类型:
eventTime :用户赋予的自定义的时间戳(事件时间戳)
processingTime : 执行当前task的subtask主机的本地时间戳(系统时间戳)
窗口类型:
Sliding:滑动窗口,可能会重叠(某个元素可能会身处多个窗口中)
Tumbling:非重叠窗口(在
方法实现:
assignWindows :方法的实现即返回存放
getDefaultTrigger :的实现是返回一个不做任何动作的
在Sliding窗口中,
它剔除元素的时机是:在触发器触发之后,在窗口被处理(apply windowFunction)之前
该接口只定义了一个方法:
接口的返回值即表示要剔除元素的个数。
TimeEvitor
CountEvitor
DeltaEvitor
大致的逻辑是,先取出最后一个元素的时间戳作为“当前”时间,然后减去期望中的“窗口大小”,得到一个基准时间戳(只需要比基准时间戳大的元素)。
然后从第一个元素开始循环比较每一个元素,如果比基准时间戳小,则累加剔除统计数,一旦发现某个元素的时间戳大于基准时间戳,则直接跳出循环,不再累加了(因为本地窗口中元素是基于时间有序的,这一点由Flink运行时来保证,如果从某个元素开始其时间戳大于基准时间戳,则后续的所有元素都满足这一条件,因此也就没必要再循环下去了)。
size : 时间间隔的大小(数值)
unit :
该类提供的很多静态方法提供对不同unit的设置。
以粗粒度来看,Flink主要提供了三种形式的触发方式:
按元素
按系统时间
按事件时间
这体现为Trigger的三个主要的抽象方法:
onElement :针对每个元素触发,这主要针对于那些基于元素的触发器,比如后面我们将看到的
onProcessingTime :被processing-time(Flink系统时间时间戳)定时器触发
onEventTime :被event-time(事件时间戳)定时器触发
以上这些方法中都有一个共同的参数:
getCurrentWatermark :返回当前的
registerProcessingTimeTimer :注册一个系统时间的定时器,触发
registerEventTimeTimer :注册一个事件时间的定时器,触发
deleteProcessingTimeTimer :删除系统时间的定时器
deleteEventTimeTimer :删除事件时间的定时器
getPartitionedState :用于失败恢复的获取状态的接口
其中,registerXXX/deleteXXX模式对主要针对上面两种基于时间的触发器。而最后一个方法
FIRE :window将会被应用window Function进行计算,然后将结果emit出去,但元素并没有被清洗,仍然在window中
PURGE :清除window中的元素
CONTINUE :不做任何操作
这些触发器都具有一些共性,这里一并说明:
由于Flink在
因为有不少触发类型依赖于上下文的某些状态值(比如下文典型的ContinuousXXXTrigger),这些状态值将通过
持续的触发依赖于在
微信扫码关注公众号:Apache_Flink
org.apache.flink.streaming.api.windowing下。
Window
一个Window代表有限对象的集合。一个窗口有一个最大的时间戳,该时间戳意味着在其代表的某时间点——所有应该进入这个窗口的元素都已经到达。
Flink的根窗口对象是一个抽象类,只提供了一个抽象方法:
public abstract long maxTimestamp();
用于获取最大的时间戳。Flink提供了两个窗口的具体实现。在实现
Window时,子类应该override
equals和
hashCode这两个方法,以使得在逻辑上两个相等的window被认为是同一个。
GlobalWindow
GlobalWindow是一个全局窗口,被实现为单例模式。其
maxTimestamp被设置为
Long.MAX_VALUE。
该类内部有一个静态类定义了
GlobalWindow的序列化器:
Serializer。
TimeWindow
TimeWindow表示一个时间间隔窗口,这体现在其构造器需要注入的两个属性:
start : 时间间隔的起始
end : 时间间隔的截止
TimeWindow表示的时间间隔为[start, end)。其
maxTimestamp的实现为:
public long maxTimestamp() { return end - 1; }
其equals的实现中,除了常规比较(比较引用,比较Class的实例),还会比较
start,
end这两个属性。
TimeWindow也在内部实现了序列化器,该序列化器主要针对
start和
end两个属性。
WindowAssigner
元素的窗口分配器。用于将元素分配给一个或者多个窗口。该抽象类定义了三个抽象方法:assignWindows :将某个带有时间戳
timestamp的元素
element分配给一个或多个窗口,并返回窗口集合
getDefaultTrigger :返回跟
WindowAssigner关联的默认触发器
getWindowSerializer :返回
WindowAssigner分配的窗口的序列化器
内置的WindowAssigner
整个类型继承图如下:下面会谈到很多基于时间的窗口,这里有两个概念,分别是
时间类型和
窗口类型:
时间类型:
eventTime :用户赋予的自定义的时间戳(事件时间戳)
processingTime : 执行当前task的subtask主机的本地时间戳(系统时间戳)
窗口类型:
Sliding:滑动窗口,可能会重叠(某个元素可能会身处多个窗口中)
Tumbling:非重叠窗口(在
assignWindows方法中返回的一般都是
Collections.singletonList())
GlobalWindows
该分配器对应于窗口GlobalWindow,它将所有的元素分配给同一个
GlobalWindow(本质上而言,
GlobalWindow也只有一个实例)。跟
GlobalWindow的实现方式一样,
GlobalWindows也被实现为单例模式。
方法实现:
assignWindows :方法的实现即返回存放
GlobalWindow单实例的集合对象
getDefaultTrigger :的实现是返回一个不做任何动作的
NerverTrigger
TumblingEventTimeWindows
依据给定的窗口大小,结合event-time,返回存储TimeWindow单实例的集合。
getDefaultTrigger方法返回
EventTimeTrigger类型的实例。
TumblingProcessingTimeWindows
依据给定窗口的大小,结合processing-time,返回存储TimeWindow单实例的集合。需要注意的是,这里依据的是运行当前任务所在主机的本地时间戳。
getDefaultTrigger方法返回的是
ProcessingTimeTrigger类型的实例。
SlidingProcessingTimeWindows
Sliding窗口不同于Tumbling窗口,它除了指定窗口的大小,还要指定一个滑动值,即
slide。所谓的滑动窗口可以这么理解,比如:一分钟里每十秒钟。这里一分钟是窗口大小,每十秒即为滑动值。
在Sliding窗口中,
assignWindows方法返回的就不再是单个窗口了,而是窗口的集合。首先计算出窗口的个数:
size/slide,然后循环初始化给定的
size内不同
slide的窗口对象。
SlidingEventTimeWindows
类似SlidingProcessingTimeWindows只不过窗口的
start参数的计算方式依赖于系统时间戳。
Evictor
evitor : 中文译为驱逐者;顾名思义其用于剔除窗口中的某些元素它剔除元素的时机是:在触发器触发之后,在窗口被处理(apply windowFunction)之前
该接口只定义了一个方法:
int evict(Iterable<StreamRecord<T>> elements, int size, W window);
接口的返回值即表示要剔除元素的个数。
内置的Evitor
Flink内置实现了三个Evitor:
TimeEvitor
CountEvitor
DeltaEvitor
TimeEvitor
这个Evitor基于给定的保留时间(keep time)作为剔除规则,大致的实现如下:public int evict(Iterable<StreamRecord<Object>> elements, int size, W window) { int toEvict = 0; long currentTime = Iterables.getLast(elements).getTimestamp(); long evictCutoff = currentTime - windowSize; for (StreamRecord<Object> record: elements) { if (record.getTimestamp() > evictCutoff) { break; } toEvict++; } return toEvict; }
大致的逻辑是,先取出最后一个元素的时间戳作为“当前”时间,然后减去期望中的“窗口大小”,得到一个基准时间戳(只需要比基准时间戳大的元素)。
然后从第一个元素开始循环比较每一个元素,如果比基准时间戳小,则累加剔除统计数,一旦发现某个元素的时间戳大于基准时间戳,则直接跳出循环,不再累加了(因为本地窗口中元素是基于时间有序的,这一点由Flink运行时来保证,如果从某个元素开始其时间戳大于基准时间戳,则后续的所有元素都满足这一条件,因此也就没必要再循环下去了)。
CountEvictor
基于容量的Evictor,它通过比对evict方法的第二个参数
size来判断应该剔除多少个元素。具体的实现:
public int evict(Iterable<StreamRecord<Object>> elements, int size, W window) { if (size > maxCount) { return (int) (size - maxCount); } else { return 0; } }
DeltaEvictor
基于给定的阈值threshold和
deltaFunction来进行判断。也是拿当前元素跟最后一个元素一起计算delta跟阈值做对比。
Time
Flink中仅有一个类Time来定义窗口的时间间隔。该时间默认指执行环境下的时间。创建一个
Time对象,需要两个参数:
size : 时间间隔的大小(数值)
unit :
TimeUnit的实例,表示时间间隔的单位
该类提供的很多静态方法提供对不同unit的设置。
Trigger
Trigger(触发器)用于决定某个窗口的元素集合什么时候触发计算以及结果什么时候被emit。以粗粒度来看,Flink主要提供了三种形式的触发方式:
按元素
按系统时间
按事件时间
这体现为Trigger的三个主要的抽象方法:
onElement :针对每个元素触发,这主要针对于那些基于元素的触发器,比如后面我们将看到的
CountTrigger
onProcessingTime :被processing-time(Flink系统时间时间戳)定时器触发
onEventTime :被event-time(事件时间戳)定时器触发
以上这些方法中都有一个共同的参数:
TriggerContext。
TriggerContext
顾名思义,它提供触发器执行时的上下文信息,但它只是Trigger的内部接口:
getCurrentWatermark :返回当前的
watermark
registerProcessingTimeTimer :注册一个系统时间的定时器,触发
onProcessingTime
registerEventTimeTimer :注册一个事件时间的定时器,触发
onEventTime
deleteProcessingTimeTimer :删除系统时间的定时器
deleteEventTimeTimer :删除事件时间的定时器
getPartitionedState :用于失败恢复的获取状态的接口
其中,registerXXX/deleteXXX模式对主要针对上面两种基于时间的触发器。而最后一个方法
getKeyValueState也是非常重要的,因为它用于获取窗口相关的状态,比如后面谈到的一些触发器是依赖于一些上下文状态的,那些状态的获取就是依靠这个方法。
TrigerResult
Trigger中定义的三个触发方法被调用后,最终要返回一个结果以决定触发之后产生的行为(比如是调用window function还是将窗口丢弃),这个定义触发器触发结果行为是通过
TriggerResult来表达的。它是一个枚举类型,有这么几个枚举值:
FIRE :window将会被应用window Function进行计算,然后将结果emit出去,但元素并没有被清洗,仍然在window中
PURGE :清除window中的元素
FIRE_AND_PURGE:同时具备
FIRE和
PURGE两种属性产生的行为
CONTINUE :不做任何操作
内置的Trigger
Flink内置实现了很多触发器,完整的类图如下:这些触发器都具有一些共性,这里一并说明:
由于Flink在
Trigger中已事先将各种触发器类型的回调封装为不同的方法(onXXX),所以后续各种不同的触发器类型的核心逻辑将主要在其特定相关的onXXX方法中,而无关的onXXX方法将直接返回
TriggerResult.CONTINUE(其实个人认为这种设计方式有欠妥当,因为不利于扩展)
因为有不少触发类型依赖于上下文的某些状态值(比如下文典型的ContinuousXXXTrigger),这些状态值将通过
TriggerContext的
getPartitionedState方法进行存取
EventTimeTrigger
基于事件时间的触发器,对应onEventTime
ProcessingTimeTrigger
基于当前系统时间的触发器,对应onProcessingTime
ContinuousEventTimeTrigger
该触发器是基于事件时间的按照指定时间间隔持续触发的触发器,它的首次触发取决于Watermark。首次触发的判断位于
onElement中,它注册下一次(也是首次)触发eventTime 定时器的时机,然后将其
first状态标识为false。具体实现如下:
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception { ValueState<Boolean> first = ctx.getPartitionedState(stateDesc); if (first.value()) { long start = timestamp - (timestamp % interval); long nextFireTimestamp = start + interval; ctx.registerEventTimeTimer(nextFireTimestamp); first.update(false); return TriggerResult.CONTINUE; } return TriggerResult.CONTINUE; }
持续的触发依赖于在
onEventTime中不断注册下一次触发的定时器:
public TriggerResult onEventTime(long time, W window, TriggerContext ctx) { ctx.registerEventTimeTimer(time + interval); return TriggerResult.FIRE; }
ContinuousProcessingTimeTrigger
基于系统时间的按照指定时间间隔持续触发的触发器,它也是基于保存的状态值fire-timestamp来判断是否需要触发,不过它的循环注册过程是在
onElement中。
CountTrigger
基于一个给定的累加值触发,由于累加值不是基于时间而是基于元素的,所有其触发机制实现在onElement中,逻辑很简单,先累加如果大于给定的阈值则触发:
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws IOException { ValueState<Long> count = ctx.getPartitionedState(stateDesc); long currentCount = count.value() + 1; count.update(currentCount); if (currentCount >= maxCount) { count.update(0L); return TriggerResult.FIRE; } return TriggerResult.CONTINUE; }
PurgingTrigger
该触发器类似于一个包装器,用于将任何给定的触发器转变成purging触发器。它的实现机制是,它接收一个trigger实例,然后在各个onXXX回调上执行该实例的相应的onXXX并获得TriggerResult的实例,进行相应的判断,最后返回
FIRE_AND_PURGE枚举值。
DeltaTrigger
基于DeltaFunction和一个给定的阈值触发,该触发器在最后一个到达元素和当前元素之间计算一个delta值跟给定的阈值比较,如果高于给定的阈值,则触发。因为是基于元素的,所以主要逻辑实现在
onElement中。
小结
本篇还是侧重于分析跟窗口有关的概念,就目前来看它们并没有太多的关联性,这一点我们在后续会剖析它们如何关联起来实现完整的窗口机制的。微信扫码关注公众号:Apache_Flink
相关文章推荐
- apachebench的简单使用1
- Apache Spark探秘:多进程模型还是多线程模型?
- apache中禁止一般用户访问后台特定目录
- 【常用类库之一—org.apache.commons.beanUtils】
- Ubuntu下配置LAMP环境
- Shiro学习总结(3)——Apache Shiro身份认证
- Shiro学习总结(3)——Apache Shiro身份认证
- Shiro学习总结(1)——Apache Shiro简介
- Shiro学习总结(1)——Apache Shiro简介
- 记一次 win2008R2 APACHE+PHP环境的搭建
- 4000 Mac自带Apache搭建PHP开发环境(一)phpinfo
- Apache提示You don't have permission to access / on this server问题解决
- 通哥运维笔记之apache开机启动脚本
- myeclipse中Could not find the main class: org.apache.catalina.startup.Boostrap. Program will exit
- apache-common-pool
- Apache commons类库阅读笔记
- Cause: org.apache.ibatis.reflection.ReflectionException: Could not set property 'orderdetails' of 'class com.luchao.mybatis.first.po.Orders' with value 'Orderdetail [id=null, ordersId=3, itemsId=1, it
- apache设置默认虚拟主机
- Apache Thrift - 可伸缩的跨语言服务开发框架
- CentOS 6.5环境下使用HAProxy+apache实现web服务的动静分离