(场景)kafka的topic多分区的情况,如何保证跨区的消息消费的顺序性
2015-03-24 11:49
721 查看
这个问题严格来说是肯定有的,kafka只能保证分区内的有序性。
下面是kafka作者Jay
Kreps的blog中介绍kafka设计思想的一段话。
Each partition is a totally ordered log, but there is no global ordering between partitions (other than perhaps some
wall-clock time you might include in your messages). The assignment of the messages to a particular partition is controllable by the writer, with most users choosing to partition by some kind of key (e.g. user id). Partitioning allows log appends to occur
without co-ordination between shards and allows the throughput of the system to scale linearly with the Kafka cluster size.
针对部分消息有序(message.key相同的message要保证消费顺序)场景,可以在producer往kafka插入数据时控制,同一key分发到同一partition上面。
kafka源码如下,支持该方式
在kafka-storm中,如果one partition -> one consumer instance 的话,就没这样的问题,但失去了并行。
如果N1 partitions -> N2 consumer instances的话 ,
1)N1<N2,这种情况会造成部分consumer空转,资源浪费。
2)N1>N2(N2>1),这种情况,每个kafka-spout实例会消费固定的1个或者几个partition,msg不会被不同consumer重复消费。
3)N1=N2,这种情况,实际操作发现,1个consumer instance都对应消费1个partition。1个partition只会有1个consumer实例,否则需要加锁等操作,这样减少了消费控制的复杂性。
应用系统-》日志文件sftp服务器-》数据采集层-》kafka-》storm实时数据清洗处理层-》redis、hbase-》定时任务、mapreduce
在集成测试期间,由于没有实际的日志,所以在采集层模拟往kafka插入数据(特别在发送频率模拟的很粗糙),发现在实时处理层,计算出来用户在某个位置滞留时间计算出来为负数,原因如下,
1)采集层模拟不真实(同一用户往kafka插入的位置的时间是随机生成),但要考虑目前的日志文件sftp服务器 或者 采集层 是否会有这种情况,如果有,可以从业务层面规避,过滤掉该条无效数据。
2)就是storm中tuple处理失败,重发,kafka-storm中就使offset回到失败的那个位置,但之前位置信息可能已经缓存到了redis(为了减少hbase访问次数,用户的最近一条位置信息放在了redis中),这样offset之后的所有消息会重新被消费,这样以来滞留时间为负数,可以过滤掉该条记录,不存到redis中。
真实数据:U1 T1 A1->U1 T2 A2
fail重发 :U1 T1 A1->U1 T2 A2 -> 前两条都失败,重发 -> U1 T1 A1(负数的滞留时间) -> U1 T2 A2
由于采用的是失败重发,是at least once,如果是only once的话,就会没有这样的情况,
PS:一些原理性问题,可以参考“kafka消费原理”介绍。
下面是kafka作者Jay
Kreps的blog中介绍kafka设计思想的一段话。
Each partition is a totally ordered log, but there is no global ordering between partitions (other than perhaps some
wall-clock time you might include in your messages). The assignment of the messages to a particular partition is controllable by the writer, with most users choosing to partition by some kind of key (e.g. user id). Partitioning allows log appends to occur
without co-ordination between shards and allows the throughput of the system to scale linearly with the Kafka cluster size.
针对部分消息有序(message.key相同的message要保证消费顺序)场景,可以在producer往kafka插入数据时控制,同一key分发到同一partition上面。
kafka源码如下,支持该方式
在kafka-storm中,如果one partition -> one consumer instance 的话,就没这样的问题,但失去了并行。
如果N1 partitions -> N2 consumer instances的话 ,
1)N1<N2,这种情况会造成部分consumer空转,资源浪费。
2)N1>N2(N2>1),这种情况,每个kafka-spout实例会消费固定的1个或者几个partition,msg不会被不同consumer重复消费。
3)N1=N2,这种情况,实际操作发现,1个consumer instance都对应消费1个partition。1个partition只会有1个consumer实例,否则需要加锁等操作,这样减少了消费控制的复杂性。
具体应用场景:
计算用户在某个位置的滞留时间,日志内容可以抽象成用户ID、时间点、位置。应用系统-》日志文件sftp服务器-》数据采集层-》kafka-》storm实时数据清洗处理层-》redis、hbase-》定时任务、mapreduce
在集成测试期间,由于没有实际的日志,所以在采集层模拟往kafka插入数据(特别在发送频率模拟的很粗糙),发现在实时处理层,计算出来用户在某个位置滞留时间计算出来为负数,原因如下,
1)采集层模拟不真实(同一用户往kafka插入的位置的时间是随机生成),但要考虑目前的日志文件sftp服务器 或者 采集层 是否会有这种情况,如果有,可以从业务层面规避,过滤掉该条无效数据。
2)就是storm中tuple处理失败,重发,kafka-storm中就使offset回到失败的那个位置,但之前位置信息可能已经缓存到了redis(为了减少hbase访问次数,用户的最近一条位置信息放在了redis中),这样offset之后的所有消息会重新被消费,这样以来滞留时间为负数,可以过滤掉该条记录,不存到redis中。
真实数据:U1 T1 A1->U1 T2 A2
fail重发 :U1 T1 A1->U1 T2 A2 -> 前两条都失败,重发 -> U1 T1 A1(负数的滞留时间) -> U1 T2 A2
由于采用的是失败重发,是at least once,如果是only once的话,就会没有这样的情况,
PS:一些原理性问题,可以参考“kafka消费原理”介绍。
相关文章推荐
- kafka的topic多分区的情况,如何保证跨区的消息消费的顺序性
- 消费RabbitMQ时的注意事项,如何禁止大量的消息涌到Consumer,保证线程安全
- 京东JMQ如何保证消息顺序消费
- 如何保证kafka 的消息机制
- Kafka创建Topic时如何将分区放置到不同的Broker中
- 高并发情况下如何保证消息的顺序
- spring boot / cloud (十九) 并发消费消息,如何保证入库的数据是最新的?
- kafka系列-kafka多分区的情况下保证数据的有序性
- kafka的topic和分区策略——log entry和消息id索引文件
- Kafka Producer是如何动态感知Topic分区数变化
- spring boot / cloud (十九) 并发消费消息,如何保证入库的数据是最新的?
- 如何保证kafka 的消息机制
- kafka消费者如何才能从头开始消费某个topic的全量数据
- HadoopConsumer——消费kafka中若干topic的消息,追加存储至hdfs的不同文件内
- 京东JMQ如何保证消息顺序消费
- 如何保证kafka 的消息机制
- 如何确定Kafka的分区数,key和consumer线程数,以及不消费问题解决
- 如何确定Kafka的分区数、key和consumer线程数、以及不消费问题解决
- Kafka消息保证不丢失和重复消费问题
- 查看当前服务器中的所有的topic,创建topic,删除topic,通过shell命令发送消息,通过shell消费消息,查看topic详情,对分区数进行修改