
Flink Quick Learning: Operators

2020-04-10 19:09

Operators

DataStream Transformations

DataStream → DataStream

Map
Takes one element and produces one element. A map function that doubles the values of the input stream:

dataStream.map { x => x * 2 }

FlatMap
Takes one element and produces zero, one, or more elements. A flatMap function that splits sentences into words:

dataStream.flatMap { str => str.split(" ") }

Filter
Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values:

dataStream.filter { _ != 0 }

DataStream* → DataStream

Union
Union of two or more data streams, creating a new stream containing all the elements from all the streams. Note: if you union a data stream with itself, you will get each element twice in the resulting stream.

dataStream.union(otherStream1, otherStream2, …)
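As a quick sketch (reusing the CentOS socket sources from the other examples in this post), two text streams of the same element type merged and counted together:

val env = StreamExecutionEnvironment.getExecutionEnvironment
val s1 = env.socketTextStream("CentOS", 9999)
val s2 = env.socketTextStream("CentOS", 8888)
// union requires both streams to carry the same element type
s1.union(s2)
  .flatMap(_.split("\\s+"))
  .map((_, 1))
  .keyBy(0)
  .sum(1)
  .print()
env.execute("Union Demo")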

DataStream, DataStream → ConnectedStreams

Connect
"Connects" two data streams, retaining their types and allowing for shared state between the two streams:

val someStream: DataStream[Int] = ...
val otherStream: DataStream[String] = ...
val connectedStreams = someStream.connect(otherStream)

ConnectedStreams → DataStream

CoMap, CoFlatMap
Similar to map and flatMap, but on a connected data stream, with one function per input stream:

connectedStreams.map(
  (_: Int) => true,
  (_: String) => false
)
// the flatMap variant must return a collection of elements per input
connectedStreams.flatMap(
  (num: Int) => List(num.toString),
  (str: String) => str.split(" ")
)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text1 = env.socketTextStream("CentOS", 9999)
val text2 = env.socketTextStream("CentOS", 8888)
text1.connect(text2)
  .flatMap((line: String) => line.split("\\s+"), (line: String) => line.split("\\s+"))
  .map((_, 1))
  .keyBy(0)
  .sum(1)
  .print("total")
env.execute("Stream WordCount")

DataStream → SplitStream

Split
Splits the stream into two or more streams according to some criterion:

val split = someDataStream.split(
  (num: Int) => (num % 2) match {
    case 0 => List("even")
    case 1 => List("odd")
  }
)

SplitStream → DataStream

Select
Selects one or more streams from a split stream:

val even = split.select("even")
val odd = split.select("odd")
val all = split.select("even","odd")
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text1 = env.socketTextStream("CentOS", 9999)
val splitStream = text1.split(line => {
  if (line.contains("error")) {
    List("error")
  } else {
    List("info")
  }
})
splitStream.select("error").printToErr("error")
splitStream.select("info").print("info")
splitStream.select("error", "info").print("All")
env.execute("Stream WordCount")

ProcessFunction
In general, ProcessFunction with side outputs is the more common way to split a stream:

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object Operators {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val text = env.readTextFile("hdfs://SparkTwo:9000/demo/words")
    val errorTag = new OutputTag[String]("a")
    val allTag = new OutputTag[String]("all")
    // lines containing "a" go to the errorTag side output, everything else to
    // the main output; every line is additionally copied to the allTag side output
    val mainStream = text.process(new ProcessFunction[String, String] {
      override def processElement(value: String,
                                  ctx: ProcessFunction[String, String]#Context,
                                  out: Collector[String]): Unit = {
        if (value.contains("a")) {
          ctx.output(errorTag, value) // side output
        } else {
          out.collect(value)          // main output
        }
        ctx.output(allTag, value)     // every element also lands here
      }
    })
    mainStream.getSideOutput(errorTag).print("A=")
    mainStream.getSideOutput(allTag).print("ALL=")
    mainStream.print("NORMAL=")
    env.execute("Stream WordCount")
  }
}


DataStream → KeyedStream

KeyBy
Logically partitions a stream into disjoint partitions, each partition containing elements of the same key. Internally, this is implemented with hash partitioning. See keys for how to specify keys. This transformation returns a KeyedStream:

dataStream.keyBy("someKey") // 以字段 "someKey" 为键
dataStream.keyBy(0) // 由元组的第一个元素作为键

KeyedStream → DataStream

Reduce
A "rolling" reduce on a keyed data stream: combines the current element with the last reduced value and emits the new value. A reduce function that creates a stream of partial sums:

keyedStream.reduce(_ + _)

lines.flatMap(_.split("\\s+"))
  .map((_, 1))
  .keyBy("_1")
  .reduce((v1, v2) => (v1._1, v1._2 + v2._2))
  .print()

Fold
A "rolling" fold on a keyed data stream with an initial value: combines the current element with the last folded value and emits the new value. A fold function that, when applied to the sequence (1,2,3,4,5), emits the sequence "start-1", "start-1-2", "start-1-2-3", ...

val result: DataStream[String] =
  keyedStream.fold("start")((str, i) => { str + "-" + i })

text.flatMap(_.split("\\s+"))
  .map((_, 1))
  .keyBy("_1")
  .fold((null: String, 0: Int))((z, v) => (v._1, v._2 + z._2))
  .print()

Aggregations
Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in that field (the same holds for max and maxBy):

keyedStream.sum(0)
keyedStream.sum("key")
keyedStream.min(0)
keyedStream.min("key")
keyedStream.max(0)
keyedStream.max("key")
keyedStream.minBy(0)
keyedStream.minBy("key")
keyedStream.maxBy(0)
keyedStream.maxBy("key")

// records have the form: name dept salary, e.g.
// zhangsan 研发部 1000
// lisi 研发部 5000
// ww 销售部 9000
case class Emp(name: String, dept: String, salary: Double)

val env = StreamExecutionEnvironment.getExecutionEnvironment
val lines = env.socketTextStream("CentOS", 9999)
lines.map(line => line.split(" "))
  .map(ts => Emp(ts(0), ts(1), ts(2).toDouble))
  .keyBy("dept")
  .maxBy("salary") // emits Emp(lisi,研发部,5000.0)
  .print()
env.execute("Stream WordCount")

If max("salary") is used instead, the result is Emp(zhangsan,研发部,5000.0): max only updates the aggregated field, while the remaining fields keep the values of the first element seen for that key.
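For comparison, a sketch of the same pipeline with max in place of maxBy:

lines.map(line => line.split(" "))
  .map(ts => Emp(ts(0), ts(1), ts(2).toDouble))
  .keyBy("dept")
  .max("salary") // emits Emp(zhangsan,研发部,5000.0)
  .print()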

Physical partitioning

Flink also provides the following functions for explicitly partitioning a transformed DataStream when needed.

Rebalancing (Round-robin partitioning):
Partitions elements round-robin, creating an equal load per partition. Useful for performance optimization in the presence of data skew:

dataStream.rebalance()

Random partitioning
Partitions elements randomly according to a uniform distribution:

dataStream.shuffle()

Rescaling
Like round-robin partitioning, rescaling redistributes data cyclically to rebalance load. The difference is that round-robin partitioning rebalances globally, shipping data across the network to any other node, whereas rescaling only rebalances between directly connected upstream and downstream operator instances. How the instances pair up depends on the parallelism of the two operators: for example, with an upstream parallelism of 2 and a downstream parallelism of 4, each upstream subtask round-robins its elements to a fixed pair of downstream subtasks; see the sketch after the snippet below.

dataStream.rescale()
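A minimal sketch of the 2-to-4 scenario just described (the map bodies are placeholders):

env.socketTextStream("CentOS", 9999)
  .map(_.toUpperCase).setParallelism(2) // upstream operator: 2 subtasks
  .rescale                              // each upstream subtask feeds a fixed pair of downstream subtasks
  .map((_, 1)).setParallelism(4)        // downstream operator: 4 subtasks
  .print()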

Broadcasting
Broadcasts elements to every partition, i.e. every element is sent to every parallel instance of the next operator:

dataStream.broadcast
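A sketch (the port and the idea of a "rule" stream are hypothetical): every line read from the socket reaches all four parallel instances of the downstream map:

val env = StreamExecutionEnvironment.getExecutionEnvironment
val controlStream = env.socketTextStream("CentOS", 7777)
controlStream.broadcast
  .map(rule => s"got rule: $rule").setParallelism(4) // each of the 4 subtasks sees every element
  .print()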

Custom partitioning
Uses a user-defined Partitioner to select the target task for each element:

dataStream.partitionCustom(partitioner, "someKey")
dataStream.partitionCustom(partitioner, 0)

import org.apache.flink.api.common.functions.Partitioner

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("CentOS", 9999)
  .map((_, 1))
  .partitionCustom(new Partitioner[String] {
    override def partition(key: String, numPartitions: Int): Int = {
      // mask the sign bit, then take the modulus by the partition count
      (key.hashCode & Integer.MAX_VALUE) % numPartitions
    }
  }, _._1)
  .print()
  .setParallelism(4)
println(env.getExecutionPlan)
env.execute("Stream WordCount")

Task chaining and resource groups

Chaining two operators means placing them in the same thread, which avoids unnecessary thread overhead and improves performance. By default, Flink chains operators whenever possible. Users can call:

StreamExecutionEnvironment.disableOperatorChaining()

to disable chaining for the whole job, but this is not recommended.

startNewChain

someStream.filter(...).map(...).startNewChain().map(...)

This starts a new chain at the first map: the two map operators are chained to each other, but the filter is not chained to the first map.

disableChaining

someStream.map(...).disableChaining()

No operator will be chained to this map, either upstream or downstream.

slotSharingGroup
Sets the slot sharing group of an operation. Flink places operators with the same slot sharing group into the same task slot, while keeping operators without a slot sharing group in other slots. This can be used to isolate task slots. A downstream operator automatically inherits the slot sharing group of its inputs. By default, all operators belong to the slot sharing group named "default", so when the user does not partition resources, the number of slots a job needs equals the highest parallelism used by any of its tasks.

someStream.filter(...).slotSharingGroup("name")
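A sketch (the group name "isolated" is arbitrary): the source and filter stay in the "default" group, while the map and everything downstream of it inherit the "isolated" group and therefore occupy separate slots:

env.socketTextStream("CentOS", 9999)
  .filter(_.nonEmpty)                              // remains in the "default" sharing group
  .map(_.toUpperCase).slotSharingGroup("isolated") // this and downstream operators use "isolated"
  .print()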
