Apache Flink Client生成StreamGraph
2016-07-23 21:14
555 查看
概述
上文我们分析提交流程时,RemoteStreamEnvironment类的
execute方法的第一步就是生成
StreamGraph。
StreamGraph是用于表示流的拓扑结构的数据结构,它包含了生成
JobGraph的必要信息。它的类继承关系图如下:
如果你按照
StreamGraph的继承链向上追溯,最终会发现它实现了接口
FlinkPlan。Flink在这里效仿的是数据库的执行SQL是产生执行计划的机制,
FlinkPlan定义在Flink的优化器相关的包中,针对流应用的计划是
StreamingPlan。
针对Batch类的应用的计划类是OptimizedPlan。Flink会对Batch类的应用进行优化(这点我们后面会分析),而当前针对Streaming类的应用没有优化措施。
StreamGraph的形象化表示如下图:
Flink官方提供了一个计划可视化器来图形化执行计划
节点和边
上面的图是由“节点”和“边”组成的。节点在Flink中对应的数据结构是StreamNode,而边在Flink中对应的数据结构是
StreamEdge。
StreamNode和
StreamEdge之间存在着组合的依赖关系,依赖关系可见下图:
StreamEdge包含了其连接的源节点
sourceVertex和目的节点
targetVertex,而
StreamNode中包含了与其连接的入边集合
inEdges和出边集合
outEdges。
StreamEdge和
StreamNode都有唯一的编号进行标识,但是各自编号的生成规则并不相同。
StreamNode的编号
id的生成是通过调用
StreamTransformation的静态方法
getNewNodeId获得的,其实现是一个静态计数器:
// This is used to assign a unique ID to every StreamTransformation protected static Integer idCounter = 0; public static int getNewNodeId() { idCounter++; return idCounter; }
StreamEdge的编号
edgeId是字符串类型,其生成的规则为:
this.edgeId = sourceVertex + "_" + targetVertex + "_" + typeNumber + "_" + selectedNames + "_" + outputPartitioner;
它是由多个段连接起来的,语义的文字表述如下:
源顶点_目的顶点_输入类型数量_输出选择器的名称_输出分区器
edgeId除了用来实现StreamEdge的hashCode及equals方法之外并没有其他实际意义。
StreamNode其实是表示operator的数据结构,了解这一点很重要。从Flink开始生成StreamGraph开始,source、sink都是图中的一个节点都是operator,都通过StreamNode这一数据结构来表示,我们常将它们单独拎出来讲是因为它们是流的的输入和输出,但在数据结构层面上它们是一致的。
StreamNode除了存储了输入端和输出端的
StreamEdge集合,还封装了
operator的其他关键属性,基于这不是我们关注的重点,所以不再赘述。
回过头来我们看
JobGraph就不是那么难理解了。它包含了表述整个流拓扑的所有必要信息(比如所有的节点集合、所有的
source集合、所有的
sink集合、虚拟输出选择节点、虚拟分区节点)。同时还包含了大量操作这些信息的方法。
生成StreamGraph
了解了基础的数据结构之后,我们来分析如何生成JobGraph。定位到
getStreamGraph的实现:
public StreamGraph getStreamGraph() { if (transformations.size() <= 0) { throw new IllegalStateException("No operators defined in streaming topology. Cannot execute."); } return StreamGraphGenerator.generate(this, transformations); }
它依赖于
transformations集合,该集合中存储着一个
Streaming程序中所有的转换操作对应的
StreamTransformation对象。
每当在
DataStream对象上调用
transform方法或者调用已经被实现了的一些转换操作(如map、flter等,这些转换操作在内部也调用了
transform方法),这些调用都会被加入到
transformations集合中。
StreamTransformation表示创建DataStream的操作,其实每个DataStream底层都对应着一个StreamTransformation。DataStream持有执行环境对象的引用,当调用transform方法时,它会调用执行环境对象的addOperator方法,将特定的StreamTransformation对象加入到transformations集合中去,这就是transformations集合中元素的来源。
到目前为止我们提到了多个名词,它们之前拥有着强依赖关系,为了避免混淆,我们以flatMap转换操作为例图示各种对象之间的构建关系:
在源码中,其实Flink自身的命名也并不是那么准确,比如上图中的SingleOutputStreamOperator其实是一种DataStream,但却以Operator结尾,让人匪夷所思。这种情况下,鉴定它们类型的方式可以通过查看它们的继承链来进行识别。
StreamGraph的生成依赖于生成器
StreamGraphGenerator,每调用一次静态方法
generate才会在内部创建一个
StreamGraphGenerator的实例,一个实例对应着一个
StreamGraph对象。
StreamGraphGenerator调用内部的实例方法
generateInternal来遍历
transformations集合的每个对象:
private StreamGraph generateInternal(List<StreamTransformation<?>> transformations) { for (StreamTransformation<?> transformation: transformations) { transform(transformation); } return streamGraph; }
在
transform方法中,它枚举了Flink中每一种转换类型,并对当前传入的转换类型进行判断,然后将其分发给特定的转换方法进行转换,最终返回当前
StreamGraph对象中跟该转换有关的节点编号集合。
你可以将整个过程看作是玩拼图游戏,每遍历完一个转换对象,就离构建完整的
StreamGraph更近一步。所有类型各异的转换操作各自持有整个
StreamGraph的一部分小图片,根据不同的转换操作类型,它们为
StreamGraph提供的“部件”并不完全相同,有的转换只构建节点(如
SourceTransformation),有的转换除了构建节点还构建边(如
SinkTransformation),有的只构建虚拟节点(如
PartitionTransformation、
SplitTransformation、
SelectTransformation)。
关于虚拟节点,这里需要说明的是并非所有转换操作都具有实际的物理意义(即物理上对应
operator)。有些转换操作只具有逻辑概念,例如
union,
split,
select,
partition。这些转换操作不会构建真实的
StreamNode对象。比如某个流处理应用对应的转换树如下图:
但在运行时,其生成的执行计划,这里也就等同于
StreamGraph却是下图这种形式:
从图中可以看到,转换图中对应的一些逻辑操作在产生的执行计划时并不存在,Flink将这些逻辑转换操作转换成了虚拟节点,它们的信息会被绑定到从
source到
map转换的这条边上。
在给
StreamGraph创建并添加一个
operator时,需要给该
operator指定
slotSharingGroup,这时需要调用方法
determineSlotSharingGroup来获得SlotSharingGroup的名称:
private String determineSlotSharingGroup(String specifiedGroup, Collection<Integer> inputIds) { if (specifiedGroup != null) { return specifiedGroup; } else { String inputGroup = null; for (int id: inputIds) { String inputGroupCandidate = streamGraph.getSlotSharingGroup(id); if (inputGroup == null) { inputGroup = inputGroupCandidate; } else if (!inputGroup.equals(inputGroupCandidate)) { return "default"; } } return inputGroup == null ? "default" : inputGroup; } }
当用户指定了组名,则直接使用用户指定的名称。如果用户没有指定特定的名称,则需要结合输入节点来做决定:第一种情况如果所有的输入节点都拥有相同的
slotSharingGroup名称,那么就使用该组名;否则组名将被命名为
default。
Flink当前对于流处理的应用是不作优化的,所以其执行计划就是
StreamGraph。Flink提供了一个执行计划的可视化器,它将客户端生成的执行计划以图形的方式展示出来,就像本节开始我们展示的那幅图就是可视化器生成的。那么我们怎么来查看我们自己编写的程序的执行计划呢?其实很简单,我们以Flink的flink-examples-streaming包中的
SocketTextStreamWordCount为例,来看一下如何生成执行计划。
我们将
SocketTextStreamWordCount最后一行代码注释掉:
env.execute("WordCount from SocketTextStream Example");
然后将其替换成下面这句:
System.out.println(env.getExecutionPlan());
这行语句的作用是打印当前这个程序的执行计划,它将在控制台产生该执行计划的JSON格式表示:
{"nodes":[{"id":1,"type":"Source: Socket Stream","pact":"Data Source","contents":"Source: Socket Stream", "parallelism":1},{"id":2,"type":"Flat Map","pact":"Operator","contents":"Flat Map","parallelism":2, "predecessors":[{"id":1,"ship_strategy":"REBALANCE","side":"second"}]},{"id":4,"type":"Keyed Aggregation", "pact":"Operator","contents":"Keyed Aggregation","parallelism":2,"predecessors":[{"id":2, "ship_strategy":"HASH","side":"second"}]},{"id":5,"type":"Sink: Unnamed","pact":"Data Sink", "contents":"Sink: Unnamed","parallelism":2,"predecessors":[{"id":4,"ship_strategy":"FORWARD", "side":"second"}]}]}System.out.println(env.getExecutionPlan());
把上面这段JSON复制到Flink的执行计划可视化器,点击下方的
Draw按钮,即可生成。
小结
本文我们谈论了StreamGraph的数据结构以及
StreamGraphGenerator如何生成
StreamGraph。鉴于
StreamEdge和
StreamNode是组成
StreamGraph不可或缺的部分,我们还对这两个数据结构进行了简单的分析。当然,
StreamGraph还有一个关键的实例方法:
getJobGraph,它用于获取流处理程序的
JobGraph(该方法继承自
StreamingPlan)。至于什么是
JobGraph以及如何获取它,我们将在下文进行讨论。
微信扫码关注公众号:Apache_Flink
QQ扫码关注QQ群:Apache Flink学习交流群(123414680)
相关文章推荐
- Apache Parquet 与Apache ORC简介
- Spring之整合Apache CXF框架实现WebServices远程调用
- apache开启虚拟主机VHOST后 localhost无法访问,出现Forbidden 403
- 关于org.apache.hadoop.hbase.DoNotRetryIOException: java.lang.IllegalAccessError
- Apache服务器下载、安装、配置
- 向HDFS上传文件时报错16/07/23 01:13:30 WARN hdfs.DFSClient: DataStreamer Exception: org.apache.hadoop.ipc.Rem
- windows 下wamp环境1 配置之apache的安装
- 搭建简单的Web服务器
- Web服务器 Apache URL重写
- Apache Maven 入门篇(下)
- Apache Maven 入门篇 ( 上 )
- 解决编译apache出现的问题:configure: error: APR not found . Please read the documentation
- Apache FileUpload详细介绍
- Apache FileUpload详细介绍
- Web---文件上传-用apache的工具处理、打散目录、简单文件上传进度
- Web---文件上传-用apache的工具处理、打散目录、简单文件上传进度
- ubuntu apache 启动报错
- apache ab压力测试报错
- javaEE:day7-上传文件(Apache包)、目录打散、文件上传进度条、纯前台进度条
- Window 7安装和配置Apache2.4服务器(by 星空武哥)