您的位置:首页 > 其它

kafkaAPI使用以及常用配置介绍

2018-08-14 01:03 295 查看

这篇文章主要介绍kafka中JAVA API的使用,这里面为了介绍配置,所以使用的是原生的javaapi操作,kafka可以与spring通过xml配置集成,或者更加简单通过spring boot引入starter,通过(AutoConfiguration)自动配置完成集成。但其实无论何种使用方式,其根本都是使用原生pai进行操作。

使用maven依赖管理,引入kafka依赖

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.0</version>
</dependency>

然后是一个生产者的demo,代码如下

public class KafkaProducerDemo extends Thread{
private KafkaProducer<Integer, String> kafkaProducer;

private String topic;

private boolean isAsync;
public KafkaProducerDemo(String topic, boolean isAysnc) {
//配置kafka生产者的属性配置
Properties properties = new Properties();
//集群broker地址,多个broker地址逗号隔开
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.15:9092");
//设置生产者id
properties.put(ProducerConfig.CLIENT_ID_CONFIG,"KafkaProducerDemo");
//设置发送消息ack模式
properties.put(ProducerConfig.ACKS_CONFIG,"-1");
//key序列化类
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.IntegerSerializer");
//value序列化类
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
//设置批量发送消息的size
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 5);
//延迟发送的时间,延迟时间内的消息一起发送到broker'
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1000);
//每次请求最大的字节数
properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024);
kafkaProducer=new KafkaProducer<Integer, String>(properties);
this.topic=topic;
this.isAsync=isAysnc;
}

public KafkaProducerDemo(){

}
@Override
public void run() {
int count = 50;
if (isAsync){
do {
kafkaProducer.send(new ProducerRecord<Integer, String>(topic, count,"isAsyncSend" + count), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
//异步发送回调函数,异步发送过程是类似队列消费过程,先将消息放到列表,然后有一个线程扫描这个列表,发现有消息则进行发送消费
if(recordMetadata!=null){
System.out.println("分区"+recordMetadata.partition()+
"\n 偏移"+recordMetadata.offset());
}
}
});
count--;
}while (count >0);
}else {
//同步发送消息,get是阻塞进行的
do {
try {
RecordMetadata recordMetadata = kafkaProducer.send(new ProducerRecord<Integer, String>(topic, count,"isAsyncSend" + count)).get();
System.out.println("分区"+recordMetadata.partition()+
"\n 偏移"+recordMetadata.offset());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
count--;
}while (count >0);
}
}

public void init() {

}

public static void main(String[] args) {
KafkaProducerDemo kafkaProducerDemo = new KafkaProducerDemo("test", true);
kafkaProducerDemo.start();
}
}

拿出比较重要的配置详细介绍,其实上面的配置常量,在对应的源码里面都有很详细的作用介绍,有兴趣可以自己去看。
ProducerConfig.ACKS_CONFIG
0:表示producer不会发送消息后等待任何broker的响应,无法保证消息是否成功发送到broker中。
1:producer只需要得到分区副本中leader的确认就可以。
all:producer需要等到分区副本中所有的副本对消息确认,才可以进行确认。

ProducerConfig.BATCH_SIZE_CONFIG
属于一种优化策略,批量发送消息
为了减少网络请求次数,采取批量发送消息的策略。同时批量发送时消息里面可能有发送到不同分区的消息,而分区也可能落在不同的broker,所以发送时时按不同分区来分发的。
ProducerConfig.LINGER_MS_CONFIG
同样属于一种优化策略,表示延时时间,在延时时间内进行一次消息批量发送,可以配合batch_size使用

kafka生产者的同步发送与异步发送
这里的生产者发送消息存在两种模式:同步和异步。
同步发送的时候,调用send方法后利用futrue阻塞等待返回结果,get()获得发送结果。
异步发送时,先把要发送的消息放到一个列表,然后有一个工作线程负责扫描列表,发现有待发送消息的话就进行消息发送,同时获得发送结果后调用回调函数。

然后是kafka消费者api使用demo,代码如下。

public class KafkaConsumerDemo implements Runnable{
private KafkaConsumer kafkaConsumer = null;

public KafkaConsumerDemo(String topic) {
Properties properties=new Properties();
//连接broker集群地址,多个broker逗号隔开
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.15:9092");
//配置消费者所属的分组id
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"KafkaConsumerDemo");
//是否自动ack
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
//若果是自动ack,那么自动ack的频率是多长
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
//反序列化key
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.IntegerDeserializer");
//反序列化value
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
//消费者加入group时,消费offset设置策略,earliest重置offset为最早的偏移地址,latest重置ofsset为已经消费的最大偏移,none没有offset时抛异常
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
//每次允许拉取最大的消息数量
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5);
kafkaConsumer=new KafkaConsumer(properties);
kafkaConsumer.subscribe(Collections.singletonList(topic));
}

public KafkaConsumerDemo() {
}

@Override
public void run() {
while(true){
//拉取消息,暂时不确认消息
ConsumerRecords<Integer,String> consumerRecord=kafkaConsumer.poll(1000);
for(ConsumerRecord record:consumerRecord){
System.out.println("-------------message receive:"+record.value());
kafkaConsumer.commitAsync();
}
}
}

public static void main(String[] args) {
new Thread(new KafkaConsumerDemo("test")).start();
}
}

ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
跟别的mq中间件类似,消费者消费消息也有消息确认机制,这个配置true表示开启自动消息确认机制,否则需要手动确认消息

ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG
假如开启了自动ack的话,表示多久进行一次批量ack,批量ack也是一种优化策略。类似 activeMQ的 optimizeAcknowledge批量优化。

ConsumerConfig.MAX_POLL_RECORDS_CONFIG
表示一次可以最多拉取多少条消息进行消费,这个属于批量获取消息,类似activeMQ的prefecthSize.

ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
这个配置是针对新加入消费者组的消费者配置的,配置新消费者从哪个offset开始消费消息。

public static final String AUTO_OFFSET_RESET_DOC = "What to do when there is no initial offset in
been deleted): <ul><li>earliest: automatically reset the offset to the earliest offset<li>latest: automatically reset the offset to the latest offset</li><li>none: throw exception to the consumer if
consumer.</li></ul>";

看了上面一段解释,可以知道这个设置是为了没有offset(初始化时或者对应 的offset消息被删了)的comsumer重置offset的策略
earliest:重置offset为最小的未被消费的offset
latest:重置offset为最新的offset
none:没有offset时直接抛出异常

阅读更多
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐