您的位置:首页 > 理论基础 > 计算机网络

基于kafka的大量httpRequest队列控制及mergeRequest机制

2016-03-31 11:14 471 查看

基于kafka的大量httpRequest队列控制及mergeRequest机制

需求

为了支持高频数据写入更新,希望把数据写入的请求丢入队列,等用户触发update请求的时候,再把写入的请求全部拿下来处理,而不是每个写入请求都打开文件写入一次

调研kafka

kafka在生产数据这一块的调用是相当简单,默认写入就会产生一个topic,这个是比较便利的,便于我们管理所有table的队列

而在消费者端,提供了两种类型的Consumer方式供使用

* High Level

* SimpleConsumer

HighLevel是比较傻瓜式的消费者模型,保证每一个id表示的客户端至少能够消费一条信息一次,靠zookeeper去控制offset,而消费端目前只能生成stream也就是流式的读取方式,这个对我们要处理多个table队列明显是不够的,上述连接提供了使用场景

SimpleConsumer的方式从字面上比较simple,但是一点都不简单,要使用该模型必须用户自己视线以下条件

* 自己管理日志的offset,这一块我们可以借助redis等工具来记录每一个table的处理offset

* 针对于每一个topic以及partition,在fetch数据的时候必须找出他的lead broker

* 自己处理好transaction的概念,比如说处理日志出现异常的情况,offset该何去何从

Steps for using a SimpleConsumer

1. Find an active Broker and find out which Broker is the leader for your topic and partition

2. Determine who the replica Brokers are for your topic and partition

3. Build the request defining what data you are interested in

4. Fetch the data

5. Identify and recover from leader changes

可能有疑问的地方需要验证

日志过期后offset怎么变化, 设置日志过期时间验证

某一条记录有问题,我们在merge的时候怎么去定位并剔除

如何快速的fallback到我们旧的append逻辑

关于offset管理有一个文章写的很详细,建议看看A Closer Look at Kafka OffsetRequest

验证结果

针对上面的三条疑问的验证

日志过期之后,offset怎么变化

比如说又这个一个情况,我们往日志里写了100个记录,offset从0~99, 在这个日志还没有过期的时候,我们可以通过0~99取日志,但是一旦这个日志过期以后,我们再从这个offset里面取日志,返回就没有内容了,但是日志删除之后,新的日志的offset是会从100开始的,也就是说,从100开始取是ok的,这个我们可以通过把删除日志的阈值调大一点再删除

定位错误记录这个问题

我们可以在process里面做一些hack,或者在日志进队列的时候就保证日志的准确性

fallback这个涉及到我们系统自己的处理,这里不在这里补充

code sample

package com.leanken.kafka

import java.util
import java.util.Properties

import kafka.api._
import kafka.common.TopicAndPartition
import kafka.consumer.{Consumer, ConsumerConfig}
import kafka.javaapi
import kafka.javaapi.consumer.SimpleConsumer
import kafka.message.MessageAndOffset

/**
* Created by leanken on 16/3/30.
*/
object test extends App {

// client id 可以是我们定义的tableId之类的用于标志客户端
val client_id = "test-consumer-group"
val topic = "my-replicated-topic"
val partitions = 0
// 我设置了9093 9094 9095 三个broker, 这个9095端口是我通过工具查询到的lead, 正式代码需要写一个工具函数去查询对应topic partition的lead
val consumer = new SimpleConsumer("localhost", 9095, 100000, 64 * 1024, client_id)

// read 10000这个是指数据大小
val req : FetchRequest = new FetchRequestBuilder().clientId(client_id).addFetch(topic, partitions, 20, 10000).build()
val res: javaapi.FetchResponse = consumer.fetch(req);

val it = res.messageSet(topic, partitions).iterator()
while (it.hasNext()) {
val messageAndOffset: MessageAndOffset = it.next()
val payload = messageAndOffset.message.payload
val bytes = Array.ofDim[Byte](payload.limit())
payload.get(bytes)
// fetch 下来之后我们可以通过iterator去访问每一个message, 或者最后一个的offset
println(String.valueOf(messageAndOffset.offset) + ": " + new String(bytes, "UTF-8"))
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: