您的位置:首页 > 运维架构

Flink从Kafka 0.8中读取多个Topic时的问题

2020-01-15 06:38 1031 查看

    Flink提供了FlinkKafkaConsumer08,使用Kafka的High-level接口,从Kafka中读取指定Topic的数据,如果要从多个Topic读取数据,可以如下操作:

1.application.conf中配置

   如果使用了配置管理库typesafe.config,可以在其application.conf按如下方式配置List类型的元素:

myToicList:["t1","t2","t3"]

2.读取配置文件

object MyFlinkConfig {
import com.typesafe.config.{ Config, ConfigFactory }
import net.ceedubs.ficus.Ficus._

def apply(): MyFlinkConfig = apply(ConfigFactory.load)

def apply(applicationConfig: Config): MyFlinkConfig = {

val config = applicationConfig.getConfig("MyFlinkConfig")

new MyFlinkConfig (config.as[List[String]]("myTopicList"))
}
}

case class MyFlinkConfig (myTopicList: List[String]) extends Serializable {}

3.读取多个Topic

 因为FlinkKafkaConsumer08使用Java实现的,而MyFlinkConfig 中的List是Scala的List,所以要将Scala的List转为Java的List

val config =MyFlinkConfig()
import scala.collection.JavaConversions._
val kafkaConsumer=new FlinkKafkaConsumer08[MonitorDataRecord](config.myTopicList, new SimpleStringSchema(), kafkaProps)

 

4.遇到的问题

4.1 如果要读取的Topic不存在,则应用程序直接报错,因此Topic在配置文件中配置时一定要正确

4.2 如果要读取的Topic列表中,其中一个在Kafka中没有数据,而你又基于Event Time提取Timestamp并且设置Watermark,会导致整个Topic列表都没法基于时间窗口触发操作,解决方案:

    先rebalance,然后再设置水位:   

val monitorSampling = env
.addSource(kafkaConsumer)
.rebalance
.assignTimestampsAndWatermarks(new MyWatermarkGenerator[MyRecord](Time.seconds(config.latencyDuration)))

 

转载于:https://www.cnblogs.com/liugh/p/7479515.html

  • 点赞
  • 收藏
  • 分享
  • 文章举报
an7800666 发布了0 篇原创文章 · 获赞 1 · 访问量 2022 私信 关注
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: