您的位置:首页 > 运维架构 > Apache

Apache Flink源码解析之stream-transformation

2016-05-14 20:33 1156 查看
之前我们聊了Flink程序的
source
sink
就差
transformation
了。今天我们就来解读一下Flink的
transformation
。它们三者的关系如下图:



当然这还是从Flink编程API的角度来看的(编程视角)。所谓的
transformation
,用于转换一个或多个
DataStream
从而形成一个新的
DataStream
对象。Flink提供编程接口,允许你组合这些
transformation
从而形成非常灵活的拓扑结构。

StreamTransformation

StreamTransformation
是所有
transformation
的抽象类,提供了实现
transformation
的基础功能。每一个
DataStream
都有一个与之对应的
StreamTransformation


一些API操作,比如
DataStream#map
,将会在底层创建一个
StreamTransformation
树,而在程序的运行时,该拓扑结构会被翻译为
StreamGraph


StreamTransformation
无关运行时的执行,它只是逻辑上的概念。

属性如下:

name : 转换器的名称,这个主要用于可视化的目的

uid : 用户指定的uid,该uid的主要目的是用于在job重启时可以再次分配跟之前相同的uid,应该是用于持久保存状态的目的。

bufferTimeout :
buffer
超时时间

parallelism : 并行度

id : 跟属性
uid
无关,它的生成方式是基于一个静态累加器

outputType : 输出类型

slotSharingGroup : 给当前的
transformation
设置slot共享组。
slot sharing group
用于将并行执行的
operator
“归拢”到相同的
TaskManager slot
中(
slot
概念基于资源的划分,因此这里的目的是让不同的
subtask
共享
slot
资源)

其中,
StreamTransformation
构造器需要的参数是:

name

outputType

parallelism

核心的抽象方法:

setChainingStrategy : 设置
chaining
策略

getTransitivePredecessors :返回中间过渡阶段的前置
StreamTransformation
集合,该方法的可能的应用场景是用来决定在迭代中的
feedback edge
(反馈边)最终是有前置
StreamTransformation


内置的StreamTransformation

因为就一层继承关系的树形结构,所以这里类之间的关系图就不再暂时了

绝大部分
StreamTransformation
都需要依赖上游
StreamTransformation
作为输入
SourceTransformation
等少数特例除外;

如果没有特别说明,
getTransitivePredecessors
的实现逻辑都是,由自身加input(上游
StreamTransformation
)组成的集合。

根据实现,我们可以将它们分成两类:

I :输入输出相关,需要自行定义
name
,都需要与之对应的
operator
setChainingStrategy
的实现都返回
operator#setChainingStrategy


属于该分类的有:

SourceTransformation
SinkTransformation
OneInputTransformation
TwoInputTransformation


II :内置函数
name
内部固定,无法更改,无需
operator
setChainingStrategy
的实现都只是抛出
UnsupportedOperationException
异常

属于该分类的有:

除了上面那些,其他所有的transformation


SourceTransformation

它表示一个sorce,它并不真正做转换工作,因为它没有输入,但它是任何拓扑的根
StreamTransformation


除了
StreamTransformation
构造器需要的那三个参数,
SourceTransformation
还需要
StreamSource
类型的参数,它是真正执行转换的
operator


值得一提的是,其
getTransitivePredecessors
抽象方法的实现:

public Collection<StreamTransformation<?>> getTransitivePredecessors() {
return Collections.<StreamTransformation<?>>singleton(this);
}


因为其没有前置转换器,所以其返回只存储自身实例的集合对象。

SinkTransformation

它表示一个sink,创建的时候构造器需要
operator
它是
StreamSink
的实例,是最终做转换的
operator


getTransitivePredecessors
方法的实现是将自身以及
input#getTransitivePredecessors
的返回值(之前的
StreamTransformation
集合)集合

该类有两个特别的属性:

stateKeySelector

stateKeyType

这两个属性的目的是因为sink的状态也可能是基于key分区的。

OneInputTransformation

接受一种输入的
StreamTransformation
(换句话说,只接收一个输入流)。跟上面的
SinkTransformation
构造器类似,需要
input
operator
两个参数(只不过这里的
operator
类型是对应的
OneInputStreamOperator
)。

TwoInputTransformation

表示接收两种输入的
StreamTransformation
(接收两种流作为输入)。其他的实现同
OneInputTransformation


SplitTransformation

可将其看作分流转换器,该转换用于将一个流拆分成多个流(通过
OutputSelector
来达到这个目的),当然这个操作只是逻辑上的拆分(它只影响上游的流如何跟下游的流连接)。

构造该转换器,同样也是依赖于其输入转换器(
input
)以及一个输出选择器(
outputSelector
),但在实例化其父类(
StreamTransformation
,没有提供自定义的名称,而是固定的常量值
Split


SelectTransformation

该选择转换器用于从上游流中筛选出特定的元素。它在使用时,必须跟随在
SplitTransformation
之后(
SplitTransformation
通过指定的名称将元素分配到多个逻辑流中)。

构造
SelectTransformation
需要前一个转换器作为输入,以及上游用于分流的
SplitTransformation
所使用的名称。跟
SplitTransformation
类似,这里也无需提供自定义的转换器名称,而是固定的常量值
Select


UnionTransformation

合并转换器,该转换器用于将多个输入
StreamTransformation
进行合并。因此该转换器接收
StreamTransformation
的集合。其名称也在内部被固定为
Union


PartitionTransformation

该转换器用于改变输入元素的分区,其名称为:
Partition
。因此,工作时除了提供一个
StreamTransformation
作为输入,还需要提供一个
StreamPartitioner
的实例来进行分区。

FeedbackTransformation

该转换器用于表示Flink DAG中的一个反馈点
feedback point
)。所谓反馈点,可用于连接一个或者多个
StreamTransformation
,这些
StreamTransformation
被称为反馈边(
feedback edges
)。处于反馈点下游的operation将可以从反馈点和反馈边获得元素输入。

反馈转换器的固定名称为
Feedback
,它的实例化需要两个参数:

input : 上游输入
StreamTransformation


waitTime :
feedback operator
的等待时间,一旦超过该等待时间,将关闭并不再接收任何反馈元素。

实例化
FeedbackTransformation
时,会自动创建一个用于存储反馈边的集合
feedbackEdges
。那么反馈边如何收集呢?
FeedbackTransformation
通过定义一个实例方法:
addFeedbackEdge
来进行收集,而这里所谓的“收集”就是将下游
StreamTransformation
的实例加入
feedbackEdges
集合中(这里可以理解为将两个点建立连接关系,也就形成了边)。不过,这里加入的
StreamTransformation
的实例有一个要求:也就是当前
FeedbackTransformation
的实例跟待加入
StreamTransformation
实例的并行度一致

某种程度上,你可以将其类比于pub-sub机制

CoFeedbackTransformation

某种程度上跟
FeedbackTransformation
类似。
feedback
元素的类型不需要跟上游的
StreamTransformation
元素的类型一致,因为
CoFeedbackTransformation
之后只允许跟
TwoInputTransformations
。上游的
StreamTransformation
将会连接到
TwoInputTransformations
第一个输入,而
feedback edge
将会连接到其第二个输入。因此上游的
StreamTransformation
其实是跟
CoFeedbackTransformation
无关的,它跟
TwoInputTransformation
有关。

上游的
StreamTransformation
CoFeedbackTransformation
无关,从
CoFeedbackTransformation
构造器需要的参数就可以看出来。通常,其他的
StreamTransformation
的实现都需要传入上游的
StreamTransformation
作为其输入。但
CoFeedbackTransformation
却没有,它只需要上游的并行度:
parallelism
。另外一个需要的参数是
feedbackType


它绝大部分实现跟
FeedbackTransformation
区别在于
getTransitivePredecessors
方法的实现。我们之前谈及
getTransitivePredecessors
主要的应用场景就是用于
feedback
,而它又不像
FeedbackTransformation
跟其上游输入有关,所以它只返回了只有当前实例的单元素集合。

小结

本文剖析了Flink中的
StreamTransformation
实现。当然还没有谈到这些
transformation
之间是如何串联起来,实现非常灵活的拓扑。这是我们后面会谈论的内容。

微信扫码关注公众号:Apache_Flink

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: