Flink应用程序结构开发介绍
Flink程序遵循一定的编程模式。DataStream API 和 DataSet API 基本具有相同的程序结构。以下为一个流式程序的示例代码来对文本文件进行词频统计。
package com.realtime.flink.streaming import org.apache.flink.apijava.utils.ParameterTool import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _} object WordCount { def main(args: Array[String]) { //第一步:设定执行环境 val env = SreamExecutionEnvironment.getExecutionEnvironment //第二步:指定数据源地址,开始读取数据 val text = env.readTextFile("file:///path/file") //第三步:对数据集指定转换操作逻辑 val counts : DataStream[(String, int)] = text .flatMap(_.toLowerCase.split(" ")) .fliter(_.nonEmpty) .map(_, 1) .sum(1) //第四步:指定计算结果输出位置 if (params.has("output")) { counts.writeAsText(params.get("output")) } else { println("Printing resule to stdout. Use --output to specify output path.") counts.print() } //第五步:指定名称并触发流式任务 env.execute("Streaming WordCount") } }
整个Flink 程序一共分为5步:
1. Flink执行环境
不同的执行环境决定了应用的类型:
StreamExecutionEnvironmen用来流式处理,ExecutionEnvironment是批量数据处理环境.
获取环境的三种方式:
流处理:
//设定Flink运行环境,如果在本地启动则创建本地环境,如果在集群启动就创建集群环境 StreamExecutionEnvironment.getExecutionEnvironment //指定并行度创建本地执行环境 StreamExecutionEnvironment.createLocalEnvironment(5) //指定远程JobManager ip和RPC 端口以及运行程序所在的jar包和及其依赖包 StreamExecutionEnvironment.createRemoteEnvironment("JobManagerHost", 6021, 5, "/user/application.jar")
第三种方式直接从本地代码创建与远程集群的JobManager的RPC连接,指定jar将运行程序远程拷贝到JobManager节点上,Flink应用程序运行在远程的环境中,本地程序相当于一个客户端.
批处理:
//设定Flink运行环境,如果在本地启动则创建本地环境,如果在集群启动就创建集群环境 ExecutionEnvironment.getExecutionEnvironment //指定并行度创建本地执行环境 ExecutionEnvironment.createLocalEnvironment(5) //指定远程JobManager ip和RPC 端口以及运行程序所在的jar包和及其依赖包 ExecutionEnvironment.createRemoteEnvironment("JobManagerHost", 6021, 5, "/user/application.jar")
注意不同的语言开发Flink应用的时候需要引入不同环境对应的执行环境
2. 初始化数据
创建完执行环境, ExecutionEnvironment 需要提供不同的数据接入接口完成数据初始化,将外部数据转换成DataStream 或DataSet数据集.
Flink提供了多种从外部读取数据的连接器,包括批量和实时的数据连接器,能够将Flink系统与其他第三方系统进行连接,直接获取外部数据
以下代码通过readTextFile()方法读取flle://pathfile路径中的数据并转换成DataStream数据集.
val text: DataStream[String] = env.readTextFlie("flle://pathfile")
读取文件转换为DataStream[String]数据集,完成了从本地文件到分布式数据集的转换
3. 执行转换操作
对数据集的各种Transformation操作通过不同的Operator来实现,每个Operator来实现,每个Operator内部通过实现Function接口完成数据处理逻辑的定义.
DataStream API 和 DataSet API 提供了很多转换算子, 如: map, flatMap, filter, keyBy, 用户只需要定义每个算子执行的函数逻辑,然后应用在数据转换操作Operator 接口即可.
val counts: DataStream[String, Int] = text .flatMap(_.toLowerCase.split(" ")) //执行flatMap操作 .filter(_.nonEmpty) //过滤空字段 .map((_, 1) //执行map转换操作,转换成key - value 接口 .keyBy(0) // 按照指定key对数据重分区 .sum(1) /执行求和运算操作
flink 定义Function的计算逻辑可以通过以下几种方式完成定义:
1. 通过创建Class 实现Function接口
//实现MapFunction接口 class MyMapFunction extends MapFunction[String, String] { override def map(t: String): String { t.toUpperCase() } } val dataStream: DataStream[String] = env.fromElements("hello", flink) //将MyMapFunction实现类传入进去 dataStream.map(new MyMapFunction)
完成对实现将数据集中的字符串转换成大写的数据处理
2. 通过创建匿名类实现Function接口
val dataStream: DataStream[String] = env.fromElements("hello", flink) //通过创建MapFunction匿名实现类来定义map函数的计算逻辑 dataStream.map(new MapFunction[String, String] { //实现对输入字符串大写转换 override def map(t: String): String{ t.toUpperCase() } })
3. 通过实现RichFunction接口
Flink提供了RichFunction接口,用于比较高级的数据处理场景,RichFunction接口中有open、close、getRuntimeContext 以及setRuntimeContext来获取状态、缓存等系统内部数据. 与MapFunction类似,RichFunction子类也有RichMapFunction.
//定义匿名类实现RichMapFunction接口,完成对字符串到整形数字的转换 dataStream.map(new RichMapFunction[String, Int] { //实现对输入字符串大写转换 override def map(in: String):Int = (in.toInt) })
4.分区key指定
某些算子需要指定的key进行转换,常见的算子有: join 、coGroup、groupBy.需要将DataStream或DataSet数据集转换成对应KeyedStream 和GroupDataSet ,主要是将相同key的数据路由到相同的Pipeline中
1.根据字段位置指定
//DataStream API聚合计算 val dataStream : DataStream[(String,Int)] = env.fromElements(("a", 1),("c", 2)) //根据第一个字段重新分区,然后对第二个字段进行求和计算 val result = dataStream.keyBy(0).sum(1)
//DataSet API 聚合计算 val dataSet = env.fromElements(("a", 1),("c", 2)) //根据第一个字段进行数据重分区 val groupDataSet : GroupDataSet[(String , Int)] = dataSet.groupBy(0) //求取相同key值第二个字段的最大值 groupDataSet.max(1)
2.根据字段名称指定
使用字段名称需要DataStream 中的数据结构类型必须是Tuple类或者POJOs类
val personDataSet = env.fromElements(new Person("Alex", 18), new Person("Peter", 43)) //指定name字段名称来确定groupBy 字段 personDataSet.groupBy("name").max(1)
如果程序中使用Tuple数据类型,通常情况下字段名称从1开始计算,字段位置索引从0开始计算
val personDataStream = env.fromElements(new Person("Alex", 18), new Person("Peter", 43)) //通过名称指定第一个字段 personDataStream.keyBy("_1") //通过位置指定第一个字段 personDataStream.keyBy(0)
使用嵌套的复杂数据结构:
class NestedClass { var id: int, tuples: (Long, Long, String)){ def this() { this(0, (0, 0, " ")) } } class CompelexClass(var nested: NestedClass, var tag: String) { def this() { this(null, " ") } }
通过“nested”获取整个NestedClass对象所有字段,调用“tag”获取 CompelexClass中tag字段,调用“nested.id”获取NestedClass的id字段,调用“nested.tuples._1”获取NestedClass中tuple元祖第一个字段
3. 通过Key选择器指定
定义KeySelector,然后复写getKey方法,从Person对象中获取name为指定的Key.
case class Person(name: String, age: Int) var person = env.fromElements(Person("hello", 1), Person("Flink", 3) ) // val keyed: KeyedStream[WC] = person.keyBy(new KeySelector[Person, String](){ override def getKey(person: Person): String = person.name })
5.输出结果
数据进行转换操作之后,一般会输出到外部系统或者控制台上.Flink 除了基本的数据输出方法,在系统中还定义了很多Connector,用户通过调用addSink()添加输出系统定义的DataSink类算子,这样就可以将数据输出到外部系统.
//将数据输出到文件中 counts.writeAsText("file://path/to/savefile") //将数据输出控制台 counts.print()
程序触发
计算逻辑全部操作定义好后,需要调ExecutionEnvironment的execute()方法来触发程序的执行,execute()方法返回的结果类型为JobExecutionResult,JobExecutionResult包含了程序执行的时间和累加器等指标.
注意: DataStream流式应用需要显示调用execute()方法,否则Flink应用程序不会执行.但对于DataSet API 输出算子已经包含对execute()方法的调用,不再需要显示调用了,否则会出现程序异常.
//调StreamExecutionEnvironment的execute()方法来执行流式应用程序 env.execute("App Name")
总结
本文主要介绍了Flink应用程序开发的5步:获取执行环境;初始化数据;执行转换操作;分区key指定;输出结果以及程序的触发等开发模式以及内部的一些实现细节.
- [转贴]用ASP.NET开发三层结构应用程序(一)--结构介绍
- 【Android开发】Android应用程序目录结构
- 使用.NET语言开发Silverlight应用程序入门(一):了解项目结构
- 用MS.NET开发三层结构应用程序
- [零基础学软件开发9]选择结构if语句介绍之2
- [零基础学软件开发10]选择结构if语句介绍之3
- android基础之应用程序开发目录介绍
- Android SVN开发实战之目录结构介绍
- 人间风景开发日志 – 1 <介绍,文件结构>
- 基于DDD的.NET开发框架ABP实例,多租户 (Sass)应用程序,采用.NET MVC, Angularjs, EntityFramework-介绍
- ZZ:使用.NET语言开发Silverlight应用程序入门(一):了解项目结构
- ZZ:使用.NET语言开发Silverlight应用程序入门(一):了解项目结构
- 怎样用VS2005进行三层结构应用程序的开发
- 转:基于NHibernate的三层结构应用程序开发初步
- Android开发入门教程--Android应用程序结构分析
- 用MS.NET开发三层结构应用程序
- ZZ:使用.NET语言开发Silverlight应用程序入门(一):了解项目结构
- [零基础学软件开发8]选择结构if语句介绍之1
- 应用程序开发--所用过的技术与工具介绍(一)
- 用MS.NET开发三层结构应用程序