kafka学习笔记:一、Centos7安装kafka及生产者消费者创建示例
2019-11-24 07:05
1876 查看
一、环境说明
centos7(vm) + JDK1.8 + zookeeper3.5.5 + kafka2.11-2.3.1
下载JDK 8解压并安装,假设安装之后的目录为/usr/local/java/jdk1.8.0_231,以下的操作以此为前提
二、安装zookeeper
-
下载zookeeper并解压
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.5.5/apache-zookeeper-3.5.5-bin.tar.gz tar -zxf apache-zookeeper-3.5.5-bin.tar.gz mv apache-zookeeper-3.5.5-bin zookeeper
-
配置
创建数据存放目录:mkdir -p /var/lib/zookeeper
使用基本的配置参数创建zoo.cfgcat > /usr/local/zookeeper/conf/zoo.cfg << EOF tickTime=2000 dataDir=/var/lib/zookeeper clientPort=2181 EOF
-
启动zookeeper
export JAVA_HOME=/usr/local/java/jdk1.8.0_231 /usr/local/zookeeper/bin/zkServer.sh start
三、安装kafka
可以在官网的下载页面 http://kafka.apache.org/downloads.html 下载需要安装的版本;这里使用的是kafka2.11-2.3.1
- 解压
tar -zxf kafka_2.11-2.3.1.tgz mv kafka_2.11-0.9.0.1 /usr/local/kafka mkdir /tmp/kafka-logs
- 启动
export JAVA_HOME=/usr/java/jdk1.8.0_231 /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
如果想查看启动情况,可以把-daemon去掉,但去掉该参数之后,命令终端如果关闭,kafka也会随之关闭。 - 关闭
kafka启动后如果需要关闭,可以通过以下命令关闭/usr/local/kafka/bin/kafka-server-stop.sh
四、生产者示例
新建一个maven工程,引入如下依赖:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.1.1</version> </dependency>
创建生产者:
public class MessageProducer { private static Properties kafkaProps; private static Producer<String, String> kafkaProducer; static{ kafkaProps = new Properties(); kafkaProps.put("bootstrap.servers", "192.168.254.131:9092"); kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); kafkaProducer = new KafkaProducer<String, String>(kafkaProps); } /** * 一、发送并忘记(不关心消息是否正常到达) * @param producerRecord */ public void sendMsgAndForget(ProducerRecord<String, String> producerRecord){ kafkaProducer.send(producerRecord); } /** * 二、同步发送(等待返回Future对象) * @param producerRecord * @return * @throws ExecutionException * @throws InterruptedException */ public RecordMetadata sendSynMsg(ProducerRecord<String, String> producerRecord) throws ExecutionException, InterruptedException { RecordMetadata metaData = kafkaProducer.send(producerRecord).get(); return metaData; } /** * 三、异步发送(指定回调函数,服务器在返回响应时调用该函数) * @param producerRecord */ public void sendAsynMsg(ProducerRecord<String, String> producerRecord){ kafkaProducer.send(producerRecord, new ProducerCallback()); } public static void main(String[] args) throws ExecutionException, InterruptedException { MessageProducer messageProducer = new MessageProducer(); ExecutorService executorService = Executors.newFixedThreadPool(10); for(int i=0; i < 10; i++){ executorService.submit(new Runnable() { @Override public void run() { while(true){ Random random = new Random(); int randNum = random.nextInt(3)%3 + 1; ProducerRecord<String, String> record = null; switch (randNum){ case 1 : record = new ProducerRecord<String, String>("test.topic", "smaf", "send and forget"); messageProducer.sendMsgAndForget(record); break; case 2 : record = new ProducerRecord<String, String>("test.topic", "send", "send"); try { messageProducer.sendSynMsg(record); } catch (ExecutionException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } break; case 3: record = new ProducerRecord<String, String>("test.topic", "sendAsyn", "send asyn"); messageProducer.sendAsynMsg(record); break; } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }); } } } /** * 回调处理类 */ class ProducerCallback implements Callback { public void onCompletion(RecordMetadata recordMetadata, Exception e) { //回调处理逻辑 if(null != e){ e.printStackTrace(); } } }
五、消费者示例
创建消费者
public class MessageConsumer { private static Properties kafkaProps; private static Consumer<String, String> kafkaConsumer; static{ kafkaProps = new Properties(); kafkaProps.put("bootstrap.servers", "192.168.254.131:9092"); kafkaProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); kafkaProps.put("group.id", "testGroup"); kafkaProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); kafkaConsumer = new KafkaConsumer<String, String>(kafkaProps); } public void consumeMessage(String topic) throws InterruptedException { kafkaConsumer.subscribe(Collections.singletonList(topic)); Duration duration = Duration.ofSeconds(10l); while(true){ ConsumerRecords<String, String> records = kafkaConsumer.poll(duration); System.out.println("new messages:"); if(records.count()==0) System.out.println("empty"); for(ConsumerRecord<String, String> record : records){ System.out.printf("topic=%s,partition=%s,key=%s,value=%s\n",record.topic(), record.partition(), record.key(), record.value()); } } } public static void main(String[] args) throws InterruptedException { MessageConsumer messageConsumer = new MessageConsumer(); messageConsumer.consumeMessage("test.topic"); } }
六、注意
如果遇到连接超时的问题:
Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Topic test.topic not present in metadata after 60000 ms. at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1269) at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:933) at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:856) at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:743) Caused by: org.apache.kafka.common.errors.TimeoutException: Topic test.topic not present in metadata after 60000 ms.
可以通过以下方式处理:
查看防火墙是否开启 firewall-cmd --state 查看9092端口是否对外开放 firewall-cmd --list-ports 如果没有对外开放,使用命令开放端口 firewall-cmd --zone=public --add-port=9092/tcp --permanent firewall-cmd --reload
相关文章推荐
- java多线程总结及示例(线程创建、后台线程、volatile、线程池、生产者消费者)(转)...
- java个人学习笔记19(多生产者多消费者+循环判断标记+notifyAll()+Lock+Condition)
- Linux学习笔记(四)---centos7系统安装后的一些简单操作
- 学习笔记之----生产者-消费者模型的实现
- java并发学习笔记(一):wait() notifyAll() 生产者 消费者
- Django学习笔记(一)--安装与创建工程
- (四)Kafka 学习笔记之消费者
- 学习笔记--Git安装 创建版本库 图文详解
- 第116讲:Hadoop集群之安装Java、创建Hadoop用户、配置SSH等实战学习笔记
- vue.js学习笔记(一)安装及项目的创建和运行
- Maven学习笔记(一):Maven介绍、安装及简单示例
- Django框架学习笔记(1.安装创建初识)
- 学习笔记:从0开始学习大数据-1.centos7安装
- Centos7安装gitlab11 学习笔记之备份恢复及邮箱配置
- Kafka集群安装部署、Kafka生产者、Kafka消费者
- Python学习笔记1:安装,创建,执行
- Git学习笔记(1)——安装,配置,创建库,文件添加到库
- Kafka生产者消费者java示例(包含Avro序列化)
- Android开发学习笔记(三)——基于Eclipse开发环境的搭建及HelloAndroid示例程序的创建
- ExtJS 学习笔记 示例1-点击按钮弹出一个新窗体,避免重复创建