Flink从Kafka 0.8中读取多个Topic时的问题
2020-01-15 06:38
1031 查看
Flink提供了FlinkKafkaConsumer08,使用Kafka的High-level接口,从Kafka中读取指定Topic的数据,如果要从多个Topic读取数据,可以如下操作:
1.application.conf中配置
如果使用了配置管理库typesafe.config,可以在其application.conf按如下方式配置List类型的元素:
myToicList:["t1","t2","t3"]
2.读取配置文件
object MyFlinkConfig { import com.typesafe.config.{ Config, ConfigFactory } import net.ceedubs.ficus.Ficus._ def apply(): MyFlinkConfig = apply(ConfigFactory.load) def apply(applicationConfig: Config): MyFlinkConfig = { val config = applicationConfig.getConfig("MyFlinkConfig") new MyFlinkConfig (config.as[List[String]]("myTopicList")) } } case class MyFlinkConfig (myTopicList: List[String]) extends Serializable {}
3.读取多个Topic
因为FlinkKafkaConsumer08使用Java实现的,而MyFlinkConfig 中的List是Scala的List,所以要将Scala的List转为Java的List
val config =MyFlinkConfig()
import scala.collection.JavaConversions._
val kafkaConsumer=new FlinkKafkaConsumer08[MonitorDataRecord](config.myTopicList, new SimpleStringSchema(), kafkaProps)
4.遇到的问题
4.1 如果要读取的Topic不存在,则应用程序直接报错,因此Topic在配置文件中配置时一定要正确
4.2 如果要读取的Topic列表中,其中一个在Kafka中没有数据,而你又基于Event Time提取Timestamp并且设置Watermark,会导致整个Topic列表都没法基于时间窗口触发操作,解决方案:
先rebalance,然后再设置水位:
val monitorSampling = env .addSource(kafkaConsumer) .rebalance .assignTimestampsAndWatermarks(new MyWatermarkGenerator[MyRecord](Time.seconds(config.latencyDuration)))
转载于:https://www.cnblogs.com/liugh/p/7479515.html
- 点赞
- 收藏
- 分享
- 文章举报
相关文章推荐
- Flink从Kafka 0.8中读取多个Topic时的问题
- Structured Streaming从Kafka 0.8中读取数据的问题
- ERROR Error when sending message to topic test_topic with key: null, value: 3 bytes……:部署Kafka时遇到两个问题
- Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十三)Structured Streaming遇到问题:Set(TopicName-0) are gone. Some data may have been missed
- Kafka 如何读取指定topic中的offset -------------用来验证分区是不是均衡!!!(__consumer_offsets)(已验证!)
- 关于kafka的新的group无法订阅到topic中历史消息的问题
- 一个kafka创建topic失败的问题
- Kafka 0.8 宕机问题排查步骤
- SparkStream从kafka读取数据编码问题(Java)
- 使用Flink读取Kafka中的消息
- kafka 0.10.2 解决java无法生产消息到指定topic问题
- Kafka 0.8 如何创建topic
- Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十五)Structured Streaming:同一个topic中包含一组数据的多个部分,按照key它们拼接为一条记录(以及遇到的问题)。
- spark streaming 自定义kafka读取topic的offset(python)
- 构建一个flink程序,从kafka读取然后写入MYSQL
- Kafka读取数据是中文乱码问题
- 使用命令读取kafka的内部topic:__consumer_offsets
- Flink1.4.0连接Kafka0.10.2时遇到的问题
- 使用Flink时从Kafka中读取Array[Byte]类型的Schema
- storm-kafka数据读取问题