Apache Flink源码解析之stream-transformation
2016-05-14 20:33
1156 查看
之前我们聊了Flink程序的
当然这还是从Flink编程API的角度来看的(编程视角)。所谓的
一些API操作,比如
属性如下:
name : 转换器的名称,这个主要用于可视化的目的
uid : 用户指定的uid,该uid的主要目的是用于在job重启时可以再次分配跟之前相同的uid,应该是用于持久保存状态的目的。
bufferTimeout :
parallelism : 并行度
id : 跟属性
outputType : 输出类型
slotSharingGroup : 给当前的
其中,
name
outputType
parallelism
核心的抽象方法:
setChainingStrategy : 设置
getTransitivePredecessors :返回中间过渡阶段的前置
绝大部分
如果没有特别说明,
根据实现,我们可以将它们分成两类:
I :输入输出相关,需要自行定义
属于该分类的有:
II :内置函数,
属于该分类的有:
除了
值得一提的是,其
因为其没有前置转换器,所以其返回只存储自身实例的集合对象。
该类有两个特别的属性:
stateKeySelector
stateKeyType
这两个属性的目的是因为sink的状态也可能是基于key分区的。
构造该转换器,同样也是依赖于其输入转换器(
构造
反馈转换器的固定名称为
input : 上游输入
waitTime :
实例化
某种程度上,你可以将其类比于pub-sub机制
上游的
它绝大部分实现跟
微信扫码关注公众号:Apache_Flink
source、
sink就差
transformation了。今天我们就来解读一下Flink的
transformation。它们三者的关系如下图:
当然这还是从Flink编程API的角度来看的(编程视角)。所谓的
transformation,用于转换一个或多个
DataStream从而形成一个新的
DataStream对象。Flink提供编程接口,允许你组合这些
transformation从而形成非常灵活的拓扑结构。
StreamTransformation
StreamTransformation是所有
transformation的抽象类,提供了实现
transformation的基础功能。每一个
DataStream都有一个与之对应的
StreamTransformation。
一些API操作,比如
DataStream#map,将会在底层创建一个
StreamTransformation树,而在程序的运行时,该拓扑结构会被翻译为
StreamGraph。
StreamTransformation无关运行时的执行,它只是逻辑上的概念。
属性如下:
name : 转换器的名称,这个主要用于可视化的目的
uid : 用户指定的uid,该uid的主要目的是用于在job重启时可以再次分配跟之前相同的uid,应该是用于持久保存状态的目的。
bufferTimeout :
buffer超时时间
parallelism : 并行度
id : 跟属性
uid无关,它的生成方式是基于一个静态累加器
outputType : 输出类型
slotSharingGroup : 给当前的
transformation设置slot共享组。
slot sharing group用于将并行执行的
operator“归拢”到相同的
TaskManager slot中(
slot概念基于资源的划分,因此这里的目的是让不同的
subtask共享
slot资源)
其中,
StreamTransformation构造器需要的参数是:
name
outputType
parallelism
核心的抽象方法:
setChainingStrategy : 设置
chaining策略
getTransitivePredecessors :返回中间过渡阶段的前置
StreamTransformation集合,该方法的可能的应用场景是用来决定在迭代中的
feedback edge(反馈边)最终是有前置
StreamTransformation。
内置的StreamTransformation
因为就一层继承关系的树形结构,所以这里类之间的关系图就不再暂时了绝大部分
StreamTransformation都需要依赖上游
StreamTransformation作为输入
SourceTransformation等少数特例除外;
如果没有特别说明,
getTransitivePredecessors的实现逻辑都是,由自身加input(上游
StreamTransformation)组成的集合。
根据实现,我们可以将它们分成两类:
I :输入输出相关,需要自行定义
name,都需要与之对应的
operator,
setChainingStrategy的实现都返回
operator#setChainingStrategy
属于该分类的有:
SourceTransformation SinkTransformation OneInputTransformation TwoInputTransformation
II :内置函数,
name内部固定,无法更改,无需
operator,
setChainingStrategy的实现都只是抛出
UnsupportedOperationException异常
属于该分类的有:
除了上面那些,其他所有的transformation
SourceTransformation
它表示一个sorce,它并不真正做转换工作,因为它没有输入,但它是任何拓扑的根StreamTransformation。
除了
StreamTransformation构造器需要的那三个参数,
SourceTransformation还需要
StreamSource类型的参数,它是真正执行转换的
operator。
值得一提的是,其
getTransitivePredecessors抽象方法的实现:
public Collection<StreamTransformation<?>> getTransitivePredecessors() { return Collections.<StreamTransformation<?>>singleton(this); }
因为其没有前置转换器,所以其返回只存储自身实例的集合对象。
SinkTransformation
它表示一个sink,创建的时候构造器需要operator它是
StreamSink的实例,是最终做转换的
operator。
getTransitivePredecessors方法的实现是将自身以及
input#getTransitivePredecessors的返回值(之前的
StreamTransformation集合)集合
该类有两个特别的属性:
stateKeySelector
stateKeyType
这两个属性的目的是因为sink的状态也可能是基于key分区的。
OneInputTransformation
接受一种输入的StreamTransformation(换句话说,只接收一个输入流)。跟上面的
SinkTransformation构造器类似,需要
input和
operator两个参数(只不过这里的
operator类型是对应的
OneInputStreamOperator)。
TwoInputTransformation
表示接收两种输入的StreamTransformation(接收两种流作为输入)。其他的实现同
OneInputTransformation。
SplitTransformation
可将其看作分流转换器,该转换用于将一个流拆分成多个流(通过OutputSelector来达到这个目的),当然这个操作只是逻辑上的拆分(它只影响上游的流如何跟下游的流连接)。
构造该转换器,同样也是依赖于其输入转换器(
input)以及一个输出选择器(
outputSelector),但在实例化其父类(
StreamTransformation,没有提供自定义的名称,而是固定的常量值
Split)
SelectTransformation
该选择转换器用于从上游流中筛选出特定的元素。它在使用时,必须跟随在SplitTransformation之后(
SplitTransformation通过指定的名称将元素分配到多个逻辑流中)。
构造
SelectTransformation需要前一个转换器作为输入,以及上游用于分流的
SplitTransformation所使用的名称。跟
SplitTransformation类似,这里也无需提供自定义的转换器名称,而是固定的常量值
Select。
UnionTransformation
合并转换器,该转换器用于将多个输入StreamTransformation进行合并。因此该转换器接收
StreamTransformation的集合。其名称也在内部被固定为
Union。
PartitionTransformation
该转换器用于改变输入元素的分区,其名称为:Partition。因此,工作时除了提供一个
StreamTransformation作为输入,还需要提供一个
StreamPartitioner的实例来进行分区。
FeedbackTransformation
该转换器用于表示Flink DAG中的一个反馈点(feedback point)。所谓反馈点,可用于连接一个或者多个
StreamTransformation,这些
StreamTransformation被称为反馈边(
feedback edges)。处于反馈点下游的operation将可以从反馈点和反馈边获得元素输入。
反馈转换器的固定名称为
Feedback,它的实例化需要两个参数:
input : 上游输入
StreamTransformation
waitTime :
feedback operator的等待时间,一旦超过该等待时间,将关闭并不再接收任何反馈元素。
实例化
FeedbackTransformation时,会自动创建一个用于存储反馈边的集合
feedbackEdges。那么反馈边如何收集呢?
FeedbackTransformation通过定义一个实例方法:
addFeedbackEdge来进行收集,而这里所谓的“收集”就是将下游
StreamTransformation的实例加入
feedbackEdges集合中(这里可以理解为将两个点建立连接关系,也就形成了边)。不过,这里加入的
StreamTransformation的实例有一个要求:也就是当前
FeedbackTransformation的实例跟待加入
StreamTransformation实例的并行度一致。
某种程度上,你可以将其类比于pub-sub机制
CoFeedbackTransformation
某种程度上跟FeedbackTransformation类似。
feedback元素的类型不需要跟上游的
StreamTransformation元素的类型一致,因为
CoFeedbackTransformation之后只允许跟
TwoInputTransformations。上游的
StreamTransformation将会连接到
TwoInputTransformations第一个输入,而
feedback edge将会连接到其第二个输入。因此上游的
StreamTransformation其实是跟
CoFeedbackTransformation无关的,它跟
TwoInputTransformation有关。
上游的
StreamTransformation跟
CoFeedbackTransformation无关,从
CoFeedbackTransformation构造器需要的参数就可以看出来。通常,其他的
StreamTransformation的实现都需要传入上游的
StreamTransformation作为其输入。但
CoFeedbackTransformation却没有,它只需要上游的并行度:
parallelism。另外一个需要的参数是
feedbackType。
它绝大部分实现跟
FeedbackTransformation区别在于
getTransitivePredecessors方法的实现。我们之前谈及
getTransitivePredecessors主要的应用场景就是用于
feedback,而它又不像
FeedbackTransformation跟其上游输入有关,所以它只返回了只有当前实例的单元素集合。
小结
本文剖析了Flink中的StreamTransformation实现。当然还没有谈到这些
transformation之间是如何串联起来,实现非常灵活的拓扑。这是我们后面会谈论的内容。
微信扫码关注公众号:Apache_Flink
相关文章推荐
- Web性能压力测试工具之ApacheBench(ab)详解
- apache-hama集群配置
- mac上安装Apache服务器
- Apache安装
- LR监控Apache性能
- Apache 配置使用小结
- Apache下error.log文件太大的处理
- ClassNotFoundException: org.apache.ws.commons.schema.resolver.URIResolver
- Apache Spark 1.5新特性介绍
- 使用百度云加速防apache的ab测试
- 使用百度云加速防apache的ab测试
- 使用百度云加速防apache的ab测试
- windows下 php、apache 版本选择 解惑说明
- 阿里云ubuntu12.04环境下配置Apache+PHP+PHPmyadmin+MySQL
- 启动mac自带的apache服务器,并打开支持的php模块
- mac下重启apache服务
- Could not get JDBC Connection; nested exception is org.apache.commons.dbcp.SQLNestedException: Canno
- 小Tips—为Apache 2.x添加压缩功能
- apache.commons.lang 包 StringUtils 的 isBank() 方法认知
- mac开机启动原理--apache