Kafka 生产消费实例
2017-06-12 20:28
381 查看
环境准备
创建topic
命令行模式
执行生产者消费者实例
客户端模式
运行消费者生产者
1.登录到kafka集群中节点,并切换到kafka管理员用户下
2.创建topic
注意创建topic时会指定分区数,zookerper集群名及topicname要换成自己的,topicname不要重复
3.给我自己的用户创建读写topic的权限
2.将kafka用户目录下的producer.properties、consumer.properties拷贝到自己目录下
生产消费者实例启动后,在生产者窗口中输入任意字符后,消费者窗口能接收到,则实例运行完成。
命令行的实例很简单,就一个收发功能,只是让我们先认识一下kafka的生产消费形式。实际项目中都是在代码中实现生产消费的。
当然,客户端的生产者启动后,命令行消费者同样可以收到消息。不过如果使用kerberos认证后,一定要注意客户端和服务端的时间,kerberos有个时间检验,若两端时间不一致,则消费者收不到消息。
创建topic
命令行模式
执行生产者消费者实例
客户端模式
运行消费者生产者
1. 环境准备
说明:kafka集群环境我比较懒直接使用公司现有的环境。安全起见,所有的操作都是在自己用户下完成的,若是自己的kafka环境,完全可以使用kafka管理员的用户。创建topic时需要在kafka管理员的用户下完成。1.登录到kafka集群中节点,并切换到kafka管理员用户下
ssh 172.16.150.xx 22 su - kafka
2.创建topic
创建topic命令: kafka-topics --zookeeper bdap-nn-1.cebbank.com,bdap-mn-1.cebbank.com,bdap-nn-2.cebbank.com:2181/kafka --create --topic topicname --partitions 4 --replication-factor 3 查询topic命令: kafka-topics --zookeeper bdap-nn-1.cebbank.com,bdap-mn-1.cebbank.com,bdap-nn-2.cebbank.com:2181/kafka –list
注意创建topic时会指定分区数,zookerper集群名及topicname要换成自己的,topicname不要重复
3.给我自己的用户创建读写topic的权限
写权限: kafka-acls --authorizer-properties zookeeper.connect=bdap-nn-1.cebbank.com,bdap-mn-1.cebbank.com,bdap-nn-2.cebbank.com:2181/kafka --add --allow-principal User:xx --operation Write --operation Describe --topic topicname 读权限: kafka-acls --authorizer-properties zookeeper.connect=bdap-nn-1.cebbank.com,bdap-mn-1.cebbank.com,bdap-nn-2.cebbank.com:2181/kafka --add --allow-principal User:xx --operation READ --topic topicname --group "*"
2. 命令行
1.需要切换到自己用户下2.将kafka用户目录下的producer.properties、consumer.properties拷贝到自己目录下
执行消费者实例
kafka-console-consumer --zookeeper bdap-nn-1.cebbank.com,bdap-mn-1.cebbank.com,bdap-nn-2.cebbank.com:2181/kafka -consumer.config /home/username/consumer.properties --topic topicname --new-consumer --bootstrap-server bdap-nn-1.cebbank.com:9092 --from-beginning
执行生产者实例
kafka-console-producer --broker-list bdap-nn-1.cebbank.com:9092, bdap-mn-1.cebbank.com:9092,bdap-nn-2.cebbank.com:9092 --topic topicname --producer.config /home/username/producer.properties
生产消费者实例启动后,在生产者窗口中输入任意字符后,消费者窗口能接收到,则实例运行完成。
命令行的实例很简单,就一个收发功能,只是让我们先认识一下kafka的生产消费形式。实际项目中都是在代码中实现生产消费的。
3. 客户端
消费者代码
package kafka.consumer; import java.util.Arrays; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class MyKafkaConsumer { private static final Logger log = LoggerFactory.getLogger(MyKafkaConsumer.class); public static void main(String[] args) throws InterruptedException { //kerberos配置,无认证时,不需要引入 System.setProperty("java.security.krb5.conf","D:/krb5.conf"); System.setProperty("java.security.auth.login.config","D:/lsz_jaas.conf"); Properties props = new Properties(); log.info("**********************************************"); props.put("security.protocol", "SASL_PLAINTEXT"); props.put("sasl.kerberos.service.name", "kafka"); props.put("bootstrap.servers", "172.16.150.xx:9092,172.16.150.xx1:9092,172.16.150.xx2:9092"); //消费者分组,如果一个topic有4个分区,并且一个消费者分组有2个消费者。 //每个消费者消费2个分组 props.put("group.id", "kafka_lsz_1"); //自动提交偏移量,改为false为手动控制偏移量 props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000");//增大poll的间隔,可以为消费者提供更多的时间去处理返回的消息,缺点是此值越大将会延迟组重新平衡 props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // props.put("auto.offset.reset", "earliest"); KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props); consumer.seekToBeginning(); consumer.subscribe(Arrays.asList("lsztopic3")); /**自动提交偏移量*/ while (true) { //消费者订阅topic后,调用poll方法,加入到组。 //要留在组中,必须持续调用poll方法 ConsumerRecords<byte[], byte[]> records = consumer.poll(100); for (ConsumerRecord<byte[], byte[]> record : records) { System.out.println(record.topic()+" --- "+ record.partition()); System.out.printf("offset = %d, key = %s", record.offset(),record.key()+" \r\n"); } } } }
生产者
package kafka.producer; import java.util.Properties; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class consume { private static final Logger LOG = LoggerFactory.getLogger(MyKafkaProducer.class); private static final String TOPIC = "lsztopic3"; public static void main(String[] args) throws Exception { System.setProperty("java.security.krb5.conf","D:/krb5.conf"); System.setProperty("java.security.auth.login.config","D:/lsz_jaas.conf"); Properties props = new Properties(); props.put("bootstrap.servers", "172.16.150.xx:9092,172.16.150.xx1:9092,172.16.150.xx2:9092"); props.put("producer.type", "async"); // 重试次数 props.put("message.send.max.retries", "3"); // 异步提交的时候(async),并发提交的记录数 props.put("batch.num.messages", "200"); //缓存池大小 props.put("batch.size", "16384"); // 设置缓冲区大小,默认10KB props.put("send.buffer.bytes", "102400"); props.put("request.required.acks", "1"); props.put("security.protocol", "SASL_PLAINTEXT"); props.put("sasl.kerberos.service.name", "kafka"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // props.put("partitioner.class", "kafka.producer.KafkaCustomPartitioner"); KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(props); String key = ""; String value = ""; ProducerRecord<String,String> records = new ProducerRecord<String,String>(TOPIC,key,value); kafkaProducer.send(records,new Callback(){ public void onCompletion(RecordMetadata metadata, Exception e) { if(e != null) e.printStackTrace(); System.out.println("The offset of the record we just sent is: "+metadata.partition() +" "+ metadata.offset()); } }); //Thread.sleep(5000); kafkaProducer.close(); } }
当然,客户端的生产者启动后,命令行消费者同样可以收到消息。不过如果使用kerberos认证后,一定要注意客户端和服务端的时间,kerberos有个时间检验,若两端时间不一致,则消费者收不到消息。
相关文章推荐
- c语言使用librdkafka库实现kafka的生产和消费实例
- c语言使用librdkafka库实现kafka的生产和消费实例(转)
- Kafka利用Java实现数据的生产和消费实例教程
- Kafka生产与消费实战
- kafka java 生产消费程序demo示例
- Linux qtcreator下kafka之librdkafka库的C++语言封装,实现生产和消费(★firecat推荐★)
- Kafka生产与消费实战
- Linux qtcreator下kafka之librdkafka库的C语言封装,实现生产和消费
- 生产消费模型实例C++11
- spring整合kafka项目生产和消费测试结果记录(一)
- Kafka生产与消费实战
- Kafka 使用Java实现数据的生产和消费demo
- kafka java 生产消费demo
- Kafka生产与消费实战
- Kafka 使用Java实现数据的生产和消费demo
- Kafka学习(4)——生产消费实践
- java线程的同步中notify和wait方法之生产消费实例讲解
- kafka0.8.2集群的环境搭建并实现基本的生产消费
- Kafka生产与消费实战
- Java API 生产和消费Kafka消息