rdkafka 保存offset到本地文件
2016-06-05 18:40
417 查看
为了支持断点续传功能,需要将offset保存在一个地方,下次从这个offset开始。librdkafka提供了本地文件保存的方式。
下面的代码演示了
1. 要用topic config对象设置 offset.store.path和offset.store.method
2. start函数接受参数OFFSET_STORED
std::unique_ptr<RdKafka::Consumer> consumer(RdKafka::Consumer::create(global_conf_.get(), err_));
if (!consumer) {
throw KafkaError("Failed to create consumer");
}
stringstream stream;
stream << topic_name_ << "-" << partition_idx << ".txt";
string file_path = stream.str();
string lasterr;
topic_conf_->set("offset.store.path", file_path, lasterr);
topic_conf_->set("offset.store.method", "file", lasterr);
std::unique_ptr<RdKafka::Topic> topic(RdKafka::Topic::create(consumer.get(), topic_name_, topic_conf_.get(), err_));
// RdKafka::ErrorCode resp = consumer->start(topic.get(), partition_idx, RdKafka::Topic::OFFSET_BEGINNING);
RdKafka::ErrorCode resp = consumer->start(topic.get(), partition_idx, RdKafka::Topic::OFFSET_STORED);
if (resp != RdKafka::ERR_NO_ERROR) {
throw KafkaError(RdKafka::err2str(resp));
}
注意:
1. 当前进程目录下会出现$topic-$partiton-idx.txt文件
2. 运行后正常读取数据后需要过一会儿才会写入数值
这个写入时间可以设置:offset.store.sync.interval.ms
3. 程序启动时,可以先手动将offset写入文件,然后再启动
详细配置参考github https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
下面的代码演示了
1. 要用topic config对象设置 offset.store.path和offset.store.method
2. start函数接受参数OFFSET_STORED
std::unique_ptr<RdKafka::Consumer> consumer(RdKafka::Consumer::create(global_conf_.get(), err_));
if (!consumer) {
throw KafkaError("Failed to create consumer");
}
stringstream stream;
stream << topic_name_ << "-" << partition_idx << ".txt";
string file_path = stream.str();
string lasterr;
topic_conf_->set("offset.store.path", file_path, lasterr);
topic_conf_->set("offset.store.method", "file", lasterr);
std::unique_ptr<RdKafka::Topic> topic(RdKafka::Topic::create(consumer.get(), topic_name_, topic_conf_.get(), err_));
// RdKafka::ErrorCode resp = consumer->start(topic.get(), partition_idx, RdKafka::Topic::OFFSET_BEGINNING);
RdKafka::ErrorCode resp = consumer->start(topic.get(), partition_idx, RdKafka::Topic::OFFSET_STORED);
if (resp != RdKafka::ERR_NO_ERROR) {
throw KafkaError(RdKafka::err2str(resp));
}
注意:
1. 当前进程目录下会出现$topic-$partiton-idx.txt文件
2. 运行后正常读取数据后需要过一会儿才会写入数值
这个写入时间可以设置:offset.store.sync.interval.ms
3. 程序启动时,可以先手动将offset写入文件,然后再启动
详细配置参考github https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
相关文章推荐
- Kafka 之 中级
- Linux下Kafka单机安装配置方法(图文)
- Kafka使用入门教程第1/2页
- Logstash 与Elasticsearch整合使用示例
- 大数据实验室(大数据基础培训)——Kafka的安装、配置及基础使用
- 大数据实验室(大数据基础培训)——概要
- kafka-manager 的编译和使用(附安装包)
- Kafka+Log4j实现日志集中管理
- Kafka深度解析
- Kafka设计解析(三)- Kafka High Availability (下)
- kafka+storm初探
- storm集群 + kafka单机性能测试
- flume、kafka、storm常用命令
- kafka集群的安装
- kafka 一些基本知识
- Kafka入门经典教程
- Kafka初步学习总结
- 自研轻量级分布式实时计算框架light_drtc
- windows下kafka环境搭建