kafkaAPI使用以及常用配置介绍
这篇文章主要介绍kafka中JAVA API的使用,这里面为了介绍配置,所以使用的是原生的javaapi操作,kafka可以与spring通过xml配置集成,或者更加简单通过spring boot引入starter,通过(AutoConfiguration)自动配置完成集成。但其实无论何种使用方式,其根本都是使用原生pai进行操作。
使用maven依赖管理,引入kafka依赖
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>1.1.0</version> </dependency>
然后是一个生产者的demo,代码如下
public class KafkaProducerDemo extends Thread{ private KafkaProducer<Integer, String> kafkaProducer; private String topic; private boolean isAsync; public KafkaProducerDemo(String topic, boolean isAysnc) { //配置kafka生产者的属性配置 Properties properties = new Properties(); //集群broker地址,多个broker地址逗号隔开 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.15:9092"); //设置生产者id properties.put(ProducerConfig.CLIENT_ID_CONFIG,"KafkaProducerDemo"); //设置发送消息ack模式 properties.put(ProducerConfig.ACKS_CONFIG,"-1"); //key序列化类 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer"); //value序列化类 properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); //设置批量发送消息的size properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 5); //延迟发送的时间,延迟时间内的消息一起发送到broker' properties.put(ProducerConfig.LINGER_MS_CONFIG, 1000); //每次请求最大的字节数 properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024); kafkaProducer=new KafkaProducer<Integer, String>(properties); this.topic=topic; this.isAsync=isAysnc; } public KafkaProducerDemo(){ } @Override public void run() { int count = 50; if (isAsync){ do { kafkaProducer.send(new ProducerRecord<Integer, String>(topic, count,"isAsyncSend" + count), new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { //异步发送回调函数,异步发送过程是类似队列消费过程,先将消息放到列表,然后有一个线程扫描这个列表,发现有消息则进行发送消费 if(recordMetadata!=null){ System.out.println("分区"+recordMetadata.partition()+ "\n 偏移"+recordMetadata.offset()); } } }); count--; }while (count >0); }else { //同步发送消息,get是阻塞进行的 do { try { RecordMetadata recordMetadata = kafkaProducer.send(new ProducerRecord<Integer, String>(topic, count,"isAsyncSend" + count)).get(); System.out.println("分区"+recordMetadata.partition()+ "\n 偏移"+recordMetadata.offset()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } count--; }while (count >0); } } public void init() { } public static void main(String[] args) { KafkaProducerDemo kafkaProducerDemo = new KafkaProducerDemo("test", true); kafkaProducerDemo.start(); } }
拿出比较重要的配置详细介绍,其实上面的配置常量,在对应的源码里面都有很详细的作用介绍,有兴趣可以自己去看。
ProducerConfig.ACKS_CONFIG
0:表示producer不会发送消息后等待任何broker的响应,无法保证消息是否成功发送到broker中。
1:producer只需要得到分区副本中leader的确认就可以。
all:producer需要等到分区副本中所有的副本对消息确认,才可以进行确认。
ProducerConfig.BATCH_SIZE_CONFIG
属于一种优化策略,批量发送消息
为了减少网络请求次数,采取批量发送消息的策略。同时批量发送时消息里面可能有发送到不同分区的消息,而分区也可能落在不同的broker,所以发送时时按不同分区来分发的。
ProducerConfig.LINGER_MS_CONFIG
同样属于一种优化策略,表示延时时间,在延时时间内进行一次消息批量发送,可以配合batch_size使用
kafka生产者的同步发送与异步发送
这里的生产者发送消息存在两种模式:同步和异步。
同步发送的时候,调用send方法后利用futrue阻塞等待返回结果,get()获得发送结果。
异步发送时,先把要发送的消息放到一个列表,然后有一个工作线程负责扫描列表,发现有待发送消息的话就进行消息发送,同时获得发送结果后调用回调函数。
然后是kafka消费者api使用demo,代码如下。
public class KafkaConsumerDemo implements Runnable{ private KafkaConsumer kafkaConsumer = null; public KafkaConsumerDemo(String topic) { Properties properties=new Properties(); //连接broker集群地址,多个broker逗号隔开 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.15:9092"); //配置消费者所属的分组id properties.put(ConsumerConfig.GROUP_ID_CONFIG,"KafkaConsumerDemo"); //是否自动ack properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false"); //若果是自动ack,那么自动ack的频率是多长 properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000"); //反序列化key properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer"); //反序列化value properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); //消费者加入group时,消费offset设置策略,earliest重置offset为最早的偏移地址,latest重置ofsset为已经消费的最大偏移,none没有offset时抛异常 properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); //每次允许拉取最大的消息数量 properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5); kafkaConsumer=new KafkaConsumer(properties); kafkaConsumer.subscribe(Collections.singletonList(topic)); } public KafkaConsumerDemo() { } @Override public void run() { while(true){ //拉取消息,暂时不确认消息 ConsumerRecords<Integer,String> consumerRecord=kafkaConsumer.poll(1000); for(ConsumerRecord record:consumerRecord){ System.out.println("-------------message receive:"+record.value()); kafkaConsumer.commitAsync(); } } } public static void main(String[] args) { new Thread(new KafkaConsumerDemo("test")).start(); } }
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
跟别的mq中间件类似,消费者消费消息也有消息确认机制,这个配置true表示开启自动消息确认机制,否则需要手动确认消息
ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG
假如开启了自动ack的话,表示多久进行一次批量ack,批量ack也是一种优化策略。类似 activeMQ的 optimizeAcknowledge批量优化。
ConsumerConfig.MAX_POLL_RECORDS_CONFIG
表示一次可以最多拉取多少条消息进行消费,这个属于批量获取消息,类似activeMQ的prefecthSize.
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
这个配置是针对新加入消费者组的消费者配置的,配置新消费者从哪个offset开始消费消息。
public static final String AUTO_OFFSET_RESET_DOC = "What to do when there is no initial offset in been deleted): <ul><li>earliest: automatically reset the offset to the earliest offset<li>latest: automatically reset the offset to the latest offset</li><li>none: throw exception to the consumer if consumer.</li></ul>";
看了上面一段解释,可以知道这个设置是为了没有offset(初始化时或者对应 的offset消息被删了)的comsumer重置offset的策略
earliest:重置offset为最小的未被消费的offset
latest:重置offset为最新的offset
none:没有offset时直接抛出异常
- Kafka 0.9+Zookeeper3.4.6集群搭建、配置,新Client API的使用要点,高可用性測试,以及各种坑
- Kafka 0.9+Zookeeper3.4.6集群搭建、配置,新Client API的使用要点,高可用性测试,以及各种坑 (转载)
- Kafka 0.9+Zookeeper3.4.6集群搭建、配置,新Client API的使用要点,高可用性测试,以及各种坑
- Kafka 0.9+Zookeeper3.4.6集群搭建、配置,新Client API的使用要点,高可用性测试,以及各种坑
- Kafka 0.9+Zookeeper3.4.6集群搭建、配置,新Client API的使用要点,高可用性测试,以及各种坑 (转载)
- Kafka 0.9+Zookeeper3.4.6集群搭建、配置,新Client API的使用要点,高可用性测试,以及各种坑
- 03_dbcp数据源依赖jar包,DBCP中API介绍,不同过dbcp方式使用dbcp数据库连接池,通过配置文件使用dbcp数据库连接池
- 解析PHPExcel使用的常用说明以及把PHPExcel整合进CI框架的介绍
- mongodb java api常用方法的使用以及和spring的集成使用
- GCD常用基本API介绍以及创建单例的两种方式
- 使用html5中video自定义播放器必备知识点总结以及JS全屏API介绍
- JAVAWEB开发之Struts2详解(一)——Struts2框架介绍与快速入门、流程分析与工具配置以及Struts2的配置以及Action和Result的详细使用
- SSH SecureCRT介绍以及相关使用配置
- SQlite数据库简介&介绍以及使用API调用
- Elasticsearch集群配置以及REST API使用
- Tomcat服务器的介绍以及配置、使用
- struts2的常量配置以及常用常量的相关介绍
- _00017 Kafka的体系结构介绍以及Kafka入门案例(初级案例+Java API的使用)
- Spark Streaming 中使用kafka低级api+zookeeper 保存 offset 并重用 以及 相关代码整合
- STL中的常用算法介绍以及使用