您的位置:首页 > 运维架构 > Linux

Linux qtcreator下kafka之librdkafka库的C语言封装,实现生产和消费

2018-01-30 17:28 851 查看
完整源码下载地址:http://download.csdn.net/download/libaineu2004/10235565
配置文件参考来源
Global configuration properties: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

源码参考\librdkafka\examples\rdkafka_example.c和rdkafka_simple_producer.c

1、调用librdkafka库(https://github.com/edenhill/librdkafka)自主编程编译用户自己的应用程序,编译选项要加上-lrdkafka -lz -lpthread -lrt这些选项。例如,我使用QtCreator之qmake模式,.pro文件如下:
QMAKE_LFLAGS += -lrdkafka -lrdkafka++ -lz -lpthread -lrt
#-lrdkafka 等价于 LIBS += /usr/local/lib/librdkafka.so
编译通过,但是运行时会报错:error while loading shared libraries: librdkafka.so.1: cannot open shared object file: No such file or directory
此时需要在/etc/ld.so.conf中加入librdkafka.so所在的目录:/usr/local/lib/
然后在终端执行命令,使之生效:
[root@localhost etc]# ldconfig
注意:/usr/local/lib/ 每次有库文件更新,都需要在终端重新运行一次 ldconfig 命令,使之生效。

cmake模式,CMakeLists.txt如下:
# Minimum required CMake version
cmake_minimum_required(VERSION 2.8)

# Project name
project(MY_KAFAKA_PRODUCER)

# Collect all source files in the current directory into DIR_SRCS
aux_source_directory(. DIR_SRCS)

# Compile/link flags, option 1
#ADD_DEFINITIONS(-lz -lpthread -lrt)

# Compile/link flags, option 2
#set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -lz -lpthread -lrt")

# Build target
add_executable(kafkaproducer ${DIR_SRCS})

# Link external libraries (equivalent to gcc's -l flag); resolves external-library dependencies
TARGET_LINK_LIBRARIES(kafkaproducer rdkafka)
TARGET_LINK_LIBRARIES(kafkaproducer rdkafka++)
# Any of these library-name spellings also work:
#TARGET_LINK_LIBRARIES(kafkaproducer -lrdkafka)
#TARGET_LINK_LIBRARIES(kafkaproducer -lrdkafka++)
# Minimum required CMake version
cmake_minimum_required(VERSION 2.8)

# Project name
project(MY_KAFAKA_CONSUMER)

# Collect all source files in the current directory into DIR_SRCS
aux_source_directory(. DIR_SRCS)

# Compile/link flags, option 1
#ADD_DEFINITIONS(-lz -lpthread -lrt)

# Compile/link flags, option 2
#set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -lz -lpthread -lrt")

# Build target
add_executable(kafkaconsumer ${DIR_SRCS})

# Link external libraries (equivalent to gcc's -l flag); resolves external-library dependencies
TARGET_LINK_LIBRARIES(kafkaconsumer rdkafka)
TARGET_LINK_LIBRARIES(kafkaconsumer rdkafka++)
# Any of these library-name spellings also work:
#TARGET_LINK_LIBRARIES(kafkaconsumer -lrdkafka)
#TARGET_LINK_LIBRARIES(kafkaconsumer -lrdkafka++)
其实,工程名写成这样更好:
# Build target
add_executable(${PROJECT_NAME} ${DIR_SRCS})
# Link external libraries (equivalent to gcc's -l flag); resolves external-library dependencies
TARGET_LINK_LIBRARIES(${PROJECT_NAME} rdkafka)
TARGET_LINK_LIBRARIES(${PROJECT_NAME} rdkafka++)

2、生产者源码
(1)main.cpp
#include <string.h>
#include "kafkaproducer.h"

// Producer demo entry point: sends one fixed message, then reads lines from
// stdin and publishes each one until the user types "end" or EOF is reached.
int main(int argc, char *argv[])
{
    CKafkaProducer kp;

    char topic[] = "test";
    char brokers[] = "172.16.6.161:9092";
    int partition = 0;
    // This buffer is reused for interactive input below. The original declared
    // it as char[13] ("Hello Kafka!"), silently truncating any typed message
    // longer than 12 characters; give it a realistic size instead.
    char str_msg[1024] = "Hello Kafka!";
    int ret = 0;

    ret = kp.init(topic, brokers, partition);
    if (ret != 0)
    {
        printf("Error: kp.init(): ret=%d;\n", ret);
        return 0;
    }

    ret = kp.sendMessage(str_msg, strlen(str_msg)); // publish one message to the kafka broker
    if (ret == 0)
    {
        printf("push data success: %s\n", str_msg);
    }
    else
    {
        printf("Error: kp.sendMessage(): ret=%d;\n", ret);
        return 0;
    }

    printf("Please input msg:\n");

    while (fgets(str_msg, sizeof(str_msg), stdin))
    {
        size_t len = strlen(str_msg);
        // Strip the trailing newline; guard len==0 so we never index str_msg[-1].
        if (len > 0 && str_msg[len - 1] == '\n')
        {
            str_msg[--len] = '\0';
        }

        if (strcmp(str_msg, "end") == 0)
        {
            break; // user asked to quit
        }

        if (0 == kp.sendMessage(str_msg, strlen(str_msg)))
        {
            printf("push data success: %s\n", str_msg);
        }
        else
        {
            printf("push data failed: %s\n", str_msg);
        }
    }

    return 0;
}
(2)kafkaproducer.h
#ifndef KAFKAPRODUCER_H
#define KAFKAPRODUCER_H

#include <librdkafka/rdkafka.h>

// Thin C++ wrapper around the librdkafka C producer API.
// Typical usage: call init() once, then sendMessage() for each message.
class CKafkaProducer
{
public:
CKafkaProducer();
~CKafkaProducer();

int init(char *topic, char *brokers, int partition); // e.g. topic="my_test"; brokers="192.168.1.42:9092"; partition=0;
int sendMessage(char *str, int len); // publish one message to the kafka broker

protected:
// Static callbacks registered with librdkafka; static so they match the C
// function-pointer signatures the library expects.
static void err_cb(rd_kafka_t *rk, int err, const char *reason, void *opaque);
static void throttle_cb(rd_kafka_t *rk, const char *broker_name, int32_t broker_id, int throttle_time_ms, void *opaque);
static void offset_commit_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *offsets, void *opaque);
static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque);

private:
rd_kafka_t * m_kafka_handle; // kafka producer handle
rd_kafka_topic_t * m_kafka_topic; // kafka topic handle
rd_kafka_conf_t * m_kafka_conf; // global configuration object
rd_kafka_topic_conf_t * m_kafka_topic_conf; // topic configuration object
rd_kafka_topic_partition_list_t * m_kafka_topic_partition_list; // topic/partition assignments

int m_partition; // partition supplied to init()
};

#endif // KAFKAPRODUCER_H
(3)kafkaproducer.cpp
#include "kafkaproducer.h"

// Start from an all-null state; the real setup happens in init().
CKafkaProducer::CKafkaProducer()
{
    m_partition = RD_KAFKA_PARTITION_UA; // unassigned until init() supplies one

    m_kafka_conf = NULL;
    m_kafka_topic_conf = NULL;
    m_kafka_topic_partition_list = NULL;
    m_kafka_topic = NULL;
    m_kafka_handle = NULL;
}

// Flush pending messages, then tear down the librdkafka objects.
// Every call is guarded: if init() failed (or was never called) the members
// are still NULL, and e.g. rd_kafka_flush(NULL, ...) would dereference NULL.
CKafkaProducer::~CKafkaProducer()
{
    if (m_kafka_handle)
    {
        rd_kafka_flush(m_kafka_handle, 10 * 1000); // wait for max 10 seconds
    }

    if (m_kafka_topic)
    {
        rd_kafka_topic_destroy(m_kafka_topic);
    }

    if (m_kafka_handle)
    {
        rd_kafka_destroy(m_kafka_handle);
    }

    if (m_kafka_topic_partition_list)
    {
        rd_kafka_topic_partition_list_destroy(m_kafka_topic_partition_list);
    }
}

int CKafkaProducer::init(char *topic, char *brokers, int partition)
{
int ret = 0;
rd_kafka_conf_res_t ret_conf = RD_KAFKA_CONF_OK;
char errstr[512] = {0};

m_kafka_conf = rd_kafka_conf_new();

rd_kafka_conf_set_error_cb(m_kafka_conf, err_cb);
rd_kafka_conf_set_throttle_cb(m_kafka_conf, throttle_cb);
rd_kafka_conf_set_offset_commit_cb(m_kafka_conf, offset_commit_cb);
rd_kafka_conf_set_stats_cb(m_kafka_conf, stats_cb);

//---------Producer config-------------------
ret_conf = rd_kafka_conf_set(m_kafka_conf, "queue.buffering.max.messages", "500000", errstr, sizeof(errstr));
if (ret_conf != RD_KAFKA_CONF_OK)
{
printf("Error: rd_kafka_conf_set() failed 1; ret_conf=%d; errstr:%s\n", ret_conf, errstr);
return -1;
}

ret_conf = rd_kafka_conf_set(m_kafka_conf, "message.send.max.retries", "3", errstr, sizeof(errstr));
if (ret_conf != RD_KAFKA_CONF_OK)
{
printf("Error: rd_kafka_conf_set() failed 2; ret_conf=%d; errstr:%s\n", ret_conf, errstr);
return -1;
}

ret_conf = rd_kafka_conf_set(m_kafka_conf, "retry.backoff.ms", "500", errstr, sizeof(errstr));
if (ret_conf != RD_KAFKA_CONF_OK)
{
printf("Error: rd_kafka_conf_set() failed 3; ret_conf=%d; errstr:%s\n", ret_conf, errstr);
return -1;
}

//---------Kafka topic config-------------------
m_kafka_topic_conf = rd_kafka_topic_conf_new();

ret_conf = rd_kafka_topic_conf_set(m_kafka_topic_conf, "auto.offset.reset", "earliest", errstr, sizeof(errstr));
if (ret_conf != RD_KAFKA_CONF_OK)
{
printf("Error: rd_kafka_conf_set() failed 4; ret_conf=%d; errstr:%s\n", ret_conf, errstr);
return -1;
}

m_kafka_topic_partition_list = rd_kafka_topic_partition_list_new(1);

rd_kafka_topic_partition_list_add(m_kafka_topic_partition_list, topic, partition); //可以add一个以上的topic

m_partition = partition;

//---------Create Kafka handle-------------------
m_kafka_handle = rd_kafka_new(RD_KAFKA_PRODUCER, m_kafka_conf, errstr, sizeof(errstr));

if (m_kafka_handle == NULL)
{
printf("Error: Failed to create Kafka producer: %s\n", errstr);
return -1;
}

//---------Add broker(s)-------------------
if (brokers && rd_kafka_brokers_add(m_kafka_handle, brokers) < 1)
{
printf("Error: No valid brokers specified\n");
return -2;
}

m_kafka_topic = rd_kafka_topic_new(m_kafka_handle, topic, m_kafka_topic_conf); //Explicitly create topic to avoid per-msg lookups

return ret;
}

int CKafkaProducer::sendMessage(char *str, int len)
{
int ret = 0;

if (str == NULL)
{
return -1;
}

if (len <= 0)
{
return -2;
}

char * topic = m_kafka_topic_partition_list->elems[0].topic;
int partition = m_kafka_topic_partition_list->elems[0].partition;

//------------向kafka服务器发送消息----------------
ret = rd_kafka_produce(m_kafka_topic, partition, RD_KAFKA_MSG_F_COPY | RD_KAFKA_MSG_F_FREE, str, len, NULL, 0, NULL);

if (ret == -1)
{
rd_kafka_resp_err_t err = rd_kafka_last_error();

if (err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION)
{
printf("Error: No such partition: %d\n", partition);
}
else
{
printf("Error: produce error: %s%s\n", rd_kafka_err2str(err), err == RD_KAFKA_RESP_ERR__QUEUE_FULL ? " (backpressure)" : "");
}

rd_kafka_poll(m_kafka_handle, 10); //Poll to handle delivery reports

ret = -2;
return ret;
}

/* Poll to handle delivery reports */
  rd_kafka_poll(m_kafka_handle, 0);

  /* Wait for messages to be delivered */
  while (rd_kafka_outq_len(m_kafka_handle) > 0)
     rd_kafka_poll(m_kafka_handle, 100);

return ret;
}

// Global error callback registered via rd_kafka_conf_set_error_cb().
void CKafkaProducer::err_cb(rd_kafka_t *rk, int err, const char *reason, void *opaque)
{
    const char * instance_name = rd_kafka_name(rk);
    const char * error_text = rd_kafka_err2str((rd_kafka_resp_err_t)err);

    printf("%% ERROR CALLBACK: %s: %s: %s\n", instance_name, error_text, reason);
}

// Broker throttle notification callback: logs how long the named broker
// throttled this client.
void CKafkaProducer::throttle_cb(rd_kafka_t *rk, const char *broker_name, int32_t broker_id, int throttle_time_ms, void *opaque)
{
    fprintf(stdout, "%% THROTTLED %dms by %s (%d)\n", throttle_time_ms, broker_name, broker_id);
}

// Offset-commit result callback: logs the overall commit status and any
// per-partition errors.
void CKafkaProducer::offset_commit_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *offsets, void *opaque)
{
    int i;
    int verbosity = 1; // raise to 2 to log every partition even on success

    if (err || verbosity >= 2)
    {
        printf("%% Offset commit of %d partition(s): %s\n", offsets->cnt, rd_kafka_err2str(err));
    }

    for (i = 0; i < offsets->cnt; i++)
    {
        rd_kafka_topic_partition_t * rktpar = &offsets->elems[i];

        if (rktpar->err || verbosity >= 2)
        {
            // offset is int64_t: print through a long long cast. The original
            // used "%d", which is undefined behavior for a 64-bit argument.
            // Also report the per-partition error (rktpar->err) here, not the
            // commit-level err already printed above.
            printf("%% %s [%d] @ %lld: %s\n", rktpar->topic, rktpar->partition, (long long)rktpar->offset, rd_kafka_err2str(rktpar->err));
        }
    }
}

// Statistics callback: dumps the JSON statistics blob emitted by librdkafka.
// Returning 0 tells librdkafka it may free the json buffer.
int CKafkaProducer::stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque)
{
    fprintf(stdout, "%s\n", json);
    return 0;
}

3、消费者源码
(1)main.cpp
#include "kafkaconsumer.h"

// User-supplied message callback: print each consumed message.
static void msg_consume(rd_kafka_message_t *rkmessage, void *opaque)
{
    // Error "messages" (including partition EOF) carry no usable payload;
    // the original printed them as if they were data.
    if (rkmessage->err)
    {
        printf("[MSG] error: %s\n", rd_kafka_message_errstr(rkmessage));
        return;
    }

    // The payload is not guaranteed to be NUL-terminated: bound the print by len.
    printf("[MSG] %d: %.*s\n", (int)rkmessage->len, (int)rkmessage->len, (char *)rkmessage->payload);
}

int main(int argc, char *argv[])
{
CKafkaConsumer kc;

char topic[] = "test";
char brokers[] = "172.16.6.161:9092";
char partitions[] = "0";
char groupId[] = "my_group1";
consumer_callback consumer_cb = msg_consume; //注册消息回调函数,用户可以自定义此函数
void * param_cb = NULL; //param_cb=this;
int ret = 0;

ret = kc.init(topic, brokers, partitions, groupId, consumer_cb, param_cb);
if (ret != 0)
{
printf("Error: kc.init(): ret=%d;\n", ret);
return 0;
}

ret = kc.getMessage(RD_KAFKA_OFFSET_BEGINNING); //从kafka服务器接收消息,RD_KAFKA_OFFSET_BEGINNING | RD_KAFKA_OFFSET_END
if (ret != 0)
{
printf("Error: kc.getMessage(): ret=%d;\n", ret);
return 0;
}

return 0;
}
(2)kafkaconsumer.h
#ifndef KAFKACONSUMER_H
#define KAFKACONSUMER_H

#include <librdkafka/rdkafka.h>

// Signature of the user-supplied per-message callback.
typedef void (* consumer_callback)(rd_kafka_message_t *rkmessage, void *opaque);

// Thin C++ wrapper around the librdkafka C (legacy) consumer API.
// Typical usage: call init() once, then getMessage() to run the consume loop.
class CKafkaConsumer
{
public:
CKafkaConsumer();
~CKafkaConsumer();

int init(char *topic, char *brokers, char *partitions, char *groupId, consumer_callback consumer_cb, void * param_cb); // e.g. topic="my_test"; brokers="192.168.1.42:9092"; partitions="0,1,2"; groupId="my_group";
int getMessage(int64_t start_offset); // consume messages from the kafka broker (blocking loop)

protected:
// Static callbacks registered with librdkafka; static so they match the C
// function-pointer signatures the library expects.
static void err_cb(rd_kafka_t *rk, int err, const char *reason, void *opaque);
static void throttle_cb(rd_kafka_t *rk, const char *broker_name, int32_t broker_id, int throttle_time_ms, void *opaque);
static void offset_commit_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *offsets, void *opaque);
static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque);
static void logger(const rd_kafka_t *rk, int level, const char *fac, const char *buf);
static void msg_consume(rd_kafka_message_t *rkmessage, void *opaque);

private:
rd_kafka_t * m_kafka_handle; // kafka consumer handle
rd_kafka_topic_t * m_kafka_topic; // kafka topic handle
rd_kafka_conf_t * m_kafka_conf; // global configuration object
rd_kafka_topic_conf_t * m_kafka_topic_conf; // topic configuration object
rd_kafka_topic_partition_list_t * m_kafka_topic_partition_list; // topic/partition assignments
rd_kafka_queue_t * m_kafka_queue; // single queue all partitions are routed into
consumer_callback m_consumer_callback; // user message callback
void * m_consumer_callback_param; // opaque parameter forwarded to the callback
};

#endif // KAFKACONSUMER_H
(3)kafkaconsumer.cpp
#include <string.h>
#include<cstdlib>
#include "kafkaconsumer.h"

// Start from an all-null state; the real setup happens in init().
CKafkaConsumer::CKafkaConsumer()
{
    m_consumer_callback = NULL;
    m_consumer_callback_param = NULL;

    m_kafka_queue = NULL;
    m_kafka_topic_partition_list = NULL;
    m_kafka_topic_conf = NULL;
    m_kafka_conf = NULL;
    m_kafka_topic = NULL;
    m_kafka_handle = NULL;
}

// Tear down the librdkafka objects.
// Every call is guarded: if init() failed (or was never called) the members
// are still NULL, and e.g. rd_kafka_flush(NULL, ...) would dereference NULL.
CKafkaConsumer::~CKafkaConsumer()
{
    if (m_kafka_handle)
    {
        // NOTE(review): rd_kafka_flush() is a producer-side call; on a consumer
        // handle it is presumably a no-op. Kept for parity with the original.
        rd_kafka_flush(m_kafka_handle, 10 * 1000); // wait for max 10 seconds
    }

    if (m_kafka_queue)
    {
        rd_kafka_queue_destroy(m_kafka_queue);
    }

    if (m_kafka_topic)
    {
        rd_kafka_topic_destroy(m_kafka_topic);
    }

    if (m_kafka_handle)
    {
        rd_kafka_destroy(m_kafka_handle);
    }

    if (m_kafka_topic_partition_list)
    {
        rd_kafka_topic_partition_list_destroy(m_kafka_topic_partition_list);
    }
}

int CKafkaConsumer::init(char *topic, char *brokers, char *partitions, char *groupId, consumer_callback consumer_cb, void * param_cb)
{
int ret = 0;
rd_kafka_conf_res_t ret_conf = RD_KAFKA_CONF_OK;
char errstr[512] = {0};

if (topic == NULL){return -1;}
if (brokers == NULL){return -1;}
if (groupId == NULL){return -1;}

m_consumer_callback = consumer_cb;
m_consumer_callback_param = param_cb;

m_kafka_conf = rd_kafka_conf_new();

rd_kafka_conf_set_error_cb(m_kafka_conf, err_cb);
rd_kafka_conf_set_throttle_cb(m_kafka_conf, throttle_cb);
rd_kafka_conf_set_offset_commit_cb(m_kafka_conf, offset_commit_cb);
rd_kafka_conf_set_stats_cb(m_kafka_conf, stats_cb);
rd_kafka_conf_set_log_cb(m_kafka_conf, logger);

//---------Consumer config-------------------
ret_conf = rd_kafka_conf_set(m_kafka_conf, "queued.min.messages", "1000000", errstr, sizeof(errstr));
if(ret_conf != RD_KAFKA_CONF_OK){printf("Error: rd_kafka_conf_set() failed 1; ret_conf=%d; errstr:%s\n", ret_conf, errstr); return -1;}

ret_conf = rd_kafka_conf_set(m_kafka_conf, "session.timeout.ms", "6000", errstr, sizeof(errstr));
if(ret_conf != RD_KAFKA_CONF_OK){printf("Error: rd_kafka_conf_set() failed 2; ret_conf=%d; errstr:%s\n", ret_conf, errstr); return -1;}

// ret_conf = rd_kafka_conf_set(m_kafka_conf, "group.id", groupId, errstr, sizeof(errstr));
// if(ret_conf != RD_KAFKA_CONF_OK){printf("Error: rd_kafka_conf_set() failed 3; ret_conf=%d; errstr:%s\n", ret_conf, errstr); return -1;}

//---------Kafka topic config-------------------
m_kafka_topic_conf = rd_kafka_topic_conf_new();

ret_conf = rd_kafka_topic_conf_set(m_kafka_topic_conf, "auto.offset.reset", "earliest", errstr, sizeof(errstr));
if (ret_conf != RD_KAFKA_CONF_OK)
{
printf("Error: rd_kafka_topic_conf_set() failed 4; ret_conf=%d; errstr:%s\n", ret_conf, errstr);
return -1;
}

m_kafka_topic_partition_list = rd_kafka_topic_partition_list_new(1);

//------------解析json字符串------------------------
int cnt = 0;
int len = strlen(partitions);
char * pTemp = new char[len + 1];
char * pTemp2 = pTemp;
sprintf(pTemp, "%s", partitions); //partitions="0,1,2";

while (*pTemp != '\0')
{
char * s = strstr(pTemp, ",");
if(s != NULL)
{
*s = '\0';
}

int partition = atoi(pTemp);
rd_kafka_topic_partition_list_add(m_kafka_topic_partition_list, topic, partition); //可以add一个以上的topic

if (s != NULL)
{
pTemp = s + 1;
}
else
{
break;
}
}

if (pTemp2)
{
delete [] pTemp2;
pTemp2 = NULL;
}

//---------Create Kafka handle-------------------
m_kafka_handle = rd_kafka_new(RD_KAFKA_CONSUMER, m_kafka_conf, errstr, sizeof(errstr));

if (m_kafka_handle == NULL)
{
printf("Error: Failed to create Kafka producer: %s\n", errstr);
return -1;
}

rd_kafka_poll_set_consumer(m_kafka_handle); //Redirect rd_kafka_poll() to consumer_poll()

//---------Add broker(s)-------------------
if (brokers && rd_kafka_brokers_add(m_kafka_handle, brokers) < 1)
{
printf("Error: No valid brokers specified\n");
return -2;
}

// char * topic = m_kafka_topic_partition_list->elems[0].topic;
int partition = m_kafka_topic_partition_list->elems[0].partition;
int partition_cnt = m_kafka_topic_partition_list->cnt;

m_kafka_topic = rd_kafka_topic_new(m_kafka_handle, topic, m_kafka_topic_conf); //Explicitly create topic to avoid per-msg lookups

//-----------------------------------------
// int64_t seek_offset = RD_KAFKA_OFFSET_END; //RD_KAFKA_OFFSET_BEGINNING | RD_KAFKA_OFFSET_END | RD_KAFKA_OFFSET_STORED
// rd_kafka_resp_err_t err = rd_kafka_seek(m_kafka_topic, partition, seek_offset, 2000);

m_kafka_queue = rd_kafka_queue_new(m_kafka_handle);

return ret;
}

// Start consuming every configured partition into the shared queue and
// dispatch messages through msg_consume().
// NOTE(review): the while(1) loop below never exits, so this call blocks
// forever and the stop/cleanup code after it is unreachable; kept for parity
// with the original example (add a break condition if a clean shutdown is needed).
int CKafkaConsumer::getMessage(int64_t start_offset)
{
    int ret = 0;
    int partition_cnt = m_kafka_topic_partition_list->cnt;

    //------------start consuming from the kafka broker----------------
    for (int i = 0; i < partition_cnt; i++)
    {
        int partition = m_kafka_topic_partition_list->elems[i].partition;

        int r = rd_kafka_consume_start_queue(m_kafka_topic, partition, start_offset, m_kafka_queue);

        if (r == -1)
        {
            printf("Error: creating queue: %s\n", rd_kafka_err2str(rd_kafka_last_error()));
            return -1;
        }
    }

    while (1)
    {
        int r = rd_kafka_consume_callback_queue(m_kafka_queue, 1000, msg_consume, this); // Queue mode
        if (r <= 0)
        {
            rd_kafka_poll(m_kafka_handle, 1000);
            continue;
        }

        rd_kafka_poll(m_kafka_handle, 0); // Poll to handle stats callbacks
    }

    //----------Stop consuming------------------------------
    for (int i = 0; i < partition_cnt; i++)
    {
        // Stop the actual partition id taken from the list. The original passed
        // the loop index i, which is wrong whenever the configured partitions
        // are not exactly 0..n-1.
        int partition = m_kafka_topic_partition_list->elems[i].partition;

        int r = rd_kafka_consume_stop(m_kafka_topic, (int32_t)partition);
        if (r == -1)
        {
            printf("Error: in consume_stop: %s\n", rd_kafka_err2str(rd_kafka_last_error()));
        }
    }

    return ret;
}

// Global error callback registered via rd_kafka_conf_set_error_cb().
void CKafkaConsumer::err_cb(rd_kafka_t *rk, int err, const char *reason, void *opaque)
{
    const char * instance_name = rd_kafka_name(rk);
    const char * error_text = rd_kafka_err2str((rd_kafka_resp_err_t)err);

    printf("%% ERROR CALLBACK: %s: %s: %s\n", instance_name, error_text, reason);
}

// Broker throttle notification callback: logs how long the named broker
// throttled this client.
void CKafkaConsumer::throttle_cb(rd_kafka_t *rk, const char *broker_name, int32_t broker_id, int throttle_time_ms, void *opaque)
{
    fprintf(stdout, "%% THROTTLED %dms by %s (%d)\n", throttle_time_ms, broker_name, broker_id);
}

// Offset-commit result callback: logs the overall commit status and any
// per-partition errors.
void CKafkaConsumer::offset_commit_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *offsets, void *opaque)
{
    int i;
    int verbosity = 1; // raise to 2 to log every partition even on success

    if (err || verbosity >= 2)
    {
        printf("%% Offset commit of %d partition(s): %s\n", offsets->cnt, rd_kafka_err2str(err));
    }

    for (i = 0; i < offsets->cnt; i++)
    {
        rd_kafka_topic_partition_t * rktpar = &offsets->elems[i];

        if (rktpar->err || verbosity >= 2)
        {
            // offset is int64_t: print through a long long cast. The original
            // used "%d", which is undefined behavior for a 64-bit argument.
            // Also report the per-partition error (rktpar->err) here, not the
            // commit-level err already printed above.
            printf("%% %s [%d] @ %lld: %s\n", rktpar->topic, rktpar->partition, (long long)rktpar->offset, rd_kafka_err2str(rktpar->err));
        }
    }
}

// Statistics callback: dumps the JSON statistics blob emitted by librdkafka.
// Returning 0 tells librdkafka it may free the json buffer.
int CKafkaConsumer::stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque)
{
    fprintf(stdout, "%s\n", json);
    return 0;
}

// Log callback registered via rd_kafka_conf_set_log_cb(): prefixes each
// librdkafka log line with its level and facility.
void CKafkaConsumer::logger(const rd_kafka_t *rk, int level, const char *fac, const char *buf)
{
    const char * instance_name = rd_kafka_name(rk);
    fprintf(stdout, "RDKAFKA-%i-%s: %s: %s\n", level, fac, instance_name, buf);
}

// Internal dispatcher: delegates to the user callback registered in init()
// when present; otherwise logs the message (or its error) itself.
void CKafkaConsumer::msg_consume(rd_kafka_message_t *rkmessage, void *opaque)
{
    CKafkaConsumer * p = (CKafkaConsumer *)opaque;

    if (p && p->m_consumer_callback)
    {
        p->m_consumer_callback(rkmessage, p->m_consumer_callback_param);
        return;
    }

    if (rkmessage->err)
    {
        if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF)
        {
            // offset is int64_t: print through a long long cast; the original
            // "%d" is undefined behavior for a 64-bit argument.
            printf("[INFO] Consumer reached end of %s [%d] message queue at offset %lld\n", rd_kafka_topic_name(rkmessage->rkt), rkmessage->partition, (long long)rkmessage->offset);
            return;
        }

        printf("Error: Consume error for topic \"%s\" [%d] offset %lld: %s\n", rkmessage->rkt ? rd_kafka_topic_name(rkmessage->rkt) : "", rkmessage->partition, (long long)rkmessage->offset, rd_kafka_message_errstr(rkmessage));

        return;
    }

    if (rkmessage->key_len)
    {
        // Key is not guaranteed to be NUL-terminated: bound the print by key_len.
        printf("Key: %d: %.*s\n", (int)rkmessage->key_len, (int)rkmessage->key_len, (char *)rkmessage->key);
    }

    // Payload is not guaranteed to be NUL-terminated either: bound by len.
    printf("%d: %.*s\n", (int)rkmessage->len, (int)rkmessage->len, (char *)rkmessage->payload);
}


---
参考文章:
对librdkafka的C++封装
kafka C++ 生产者 消费者
c语言使用librdkafka库实现kafka的生产和消费实例
c++(11)使用librdkafka库实现kafka的消费实例
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  kafka librdkafka qtcreator