您的位置:首页 > 运维架构 > Apache

【云星数据---Apache Flink实战系列(精品版)】:Apache Flink高级特性与高级应用010-Slot和Parallelism的深入分析005

2017-11-19 16:01 916 查看

六、设置parallelism的方法

1.在操作符级别上设置parallelism

val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = [...]
val wordCounts = text
.flatMap{ _.split(" ") map { (_, 1) } }
.keyBy(0)
.timeWindow(Time.seconds(5))

//设置parallelism为5
.sum(1).setParallelism(5)
wordCounts.print()
env.execute("Word Count Example")


2.在运行环境级别上设置parallelism

val env = StreamExecutionEnvironment.getExecutionEnvironment

//设置parallelism为5
env.setParallelism(3)

val text = [...]
val wordCounts = text
.flatMap{ _.split(" ") map { (_, 1) } }
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1)
wordCounts.print()

env.execute("Word Count Example")


3.在客户端级别上设置parallelism

3.1通过p参数设置p
c032
arallelism

//设置parallelism为10
./bin/flink run -p 10 ../examples/*WordCount-java*.jar


3.1通过ClientAPI设置parallelism

try {
PackagedProgram program = new PackagedProgram(file, args)
InetSocketAddress jobManagerAddress =RemoteExecutor.getInetFromHostport("localhost:6123")
Configuration config = new Configuration()

Client client=new Client(jobManagerAddress,new Configuration(),program.getUserCodeClassLoader())

//设置parallelism为10
client.run(program, 10, true)

} catch {
case e: Exception => e.printStackTrace
}


4.在系统级别上设置parallelism

1.配置文件
$FLINK_HOME/conf/flink-conf.yaml
2.配置属性
parallelism.default


5.实战总结

1.系统级别的设置是全局的,对所有的job有效。
2.其他级别的设置是局部的,对当前的job有效。
3.多个级别上混合设置,高优先级的设置会覆盖低优先级的设置。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐