springboot2.x +kafka使用和源码分析三(生产者配置)
2020-01-11 18:26
731 查看
上一章描述springboot支持对于topic进行配置管理,本章主要叙说springboot对于produce支持。
这里通过两种方式
第一种:由springboot框架来初始化基础bean,我们只需要在yml配置文件中编写配置即可。如下图所示(常规配置 具体所有配置可参考http://kafka.apache.org/documentation/ 的producer)
第二种:我们自定义初始化基础bean后交由框架管理(会有助于我们理解springboot对于kafka的支持)
话不多说直接上源码:
第一步:初始化配置
[code]/** * 编写统一生产者配置类 * @author fangyuan */ @Configuration public class KafkaProducerConfigure { /** *构建 * @return */ @Bean public Map<String, Object> producerConfigs() { //设置produce基础配置 //生产环境这一块的配置可以参考kafka官网对prodeucer支持 Map<String, Object> props = new HashMap<>(10); //设置kafka服务器集群地址 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); //设置ack机制 props.put(ProducerConfig.ACKS_CONFIG, "-1"); return props; } /** * 构建生产者工厂类 用于createProducer */ @Bean public ProducerFactory<Integer, String> producerFactory() { //这里可以传入对于key,value的序列化器 // key的作用是用于kafka对于value落定到不同partition分区,后面会对于kafka的partition机制进行描述 包括构建自定义partition器 ProducerFactory<Integer,String> producerFactory = new DefaultKafkaProducerFactory(producerConfigs(),new IntegerSerializer(),new StringSerializer()); return producerFactory; } /** * 构建spring KafkaTemplate模版 用于发送消息 * @return */ @Bean public KafkaTemplate<Integer, String> kafkaTemplate() { return new KafkaTemplate<Integer, String>(producerFactory()); } /** * 构建另外一个kafkaTemplate * @return */ @Bean public KafkaTemplate<Integer,PersonInfo> personKafkaTemplate() { IntegerSerializer keySerializer = new IntegerSerializer(); //自定义values序列化 用于序列化指定bean对象 //如何自定义序列化器可参考https://blog.csdn.net/F_Hello_World/article/details/103273861这边博文 PersonInfoSerializer valueSerializer = new PersonInfoSerializer(); ProducerFactory<Integer,PersonInfo> producerPersonFactory = new DefaultKafkaProducerFactory(producerConfigs(),keySerializer,valueSerializer); KafkaTemplate<Integer,PersonInfo> kafkaTemplate = new KafkaTemplate(producerPersonFactory); return kafkaTemplate; }
第二步:使用template来是发送消息
目前template 发送消息由两种方式,一异步回调 二同步阻塞
[code] @RestController public class DeomController { @Autowired private KafkaTemplate<Integer,String> kafkaTemplate; @Autowired private KafkaTemplate<Integer, PersonInfo> personKafkaTemplate; @Autowired private ProducerFactory<Integer, String> producerFactory; /** * 异步发送 */ @RequestMapping("sendASyncPersonInfoStr") public void sendASyncPersonInfoStr(){ //要发送内容 JSONObject j = new JSONObject(); j.put("name","张三异步"); j.put("sex","男"); j.put("age",18); Integer key = new Random().nextInt(100); //kafka发送消息 ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send("springboot_test_topic",key,j.toJSONString()); //异步回调确认 future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() { //成功调用 @Override public void onSuccess(SendResult<Integer, String> integerStringSendResult) { handleSuccess(integerStringSendResult); } //失败调用 @Override public void onFailure(Throwable throwable) { throwable.printStackTrace(); } }); kafkaTemplate.flush(); } /** * 同步发送信息 */ @RequestMapping("sendSyncPersonInfoStr") public void sendSyncPersonInfoStr(){ JSONObject j = new JSONObject(); j.put("name","张三同步"); j.put("sex","男"); j.put("age",18); Integer key = new Random().nextInt(100); //kafka发送消息 try { SendResult<Integer, String> integerStringSendResult = kafkaTemplate.send("springboot_test_topic",key,j.toJSONString()) .get(5, TimeUnit.SECONDS); kafkaTemplate.flush(); handleSuccess(integerStringSendResult); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } public void handleSuccess(SendResult<Integer, String> integerStringSendResult){ ProducerRecord<Integer, String> producerRecord = integerStringSendResult.getProducerRecord(); //消息消息 System.out.println("=======key=========>"+producerRecord.key()); System.out.println("=======value=========>"+producerRecord.value()); //元数据消息 RecordMetadata recordMetadata = integerStringSendResult.getRecordMetadata(); //查看保存成功偏移量 System.out.println("=======partition=========>"+recordMetadata.partition()); System.out.println("=======offset=========>"+recordMetadata.offset()); System.out.println("=======timestamp=========>"+recordMetadata.timestamp()); } /** * 异步发送Person信息 */ @RequestMapping("sendPersonInfo") public void sendPersonInfo(){ Integer key = new Random().nextInt(100); PersonInfo person = PersonInfo.builder().name("丽丽").age(38).sex("女").build(); final ProducerRecord<Integer,PersonInfo> record = new ProducerRecord("personTopic",key,person); //kafka发送消息 ListenableFuture<SendResult<Integer, PersonInfo>> future = personKafkaTemplate.send(record); //异步回调确认 future.addCallback(new ListenableFutureCallback<SendResult<Integer, PersonInfo>>() { //成功调用 @Override public void onSuccess(SendResult<Integer, PersonInfo> integerStringSendResult) { //发送成功 //元数据消息 RecordMetadata recordMetadata = integerStringSendResult.getRecordMetadata(); //查看保存成功偏移量 System.out.println("=======partition=========>"+recordMetadata.partition()); System.out.println("=======offset=========>"+recordMetadata.offset()); System.out.println("=======timestamp=========>"+recordMetadata.timestamp()); } //失败调用 @Override public void onFailure(Throwable throwable) { throwable.printStackTrace(); } }); kafkaTemplate.flush(); } }
(异步回调) 通过http发送模拟产生消息:http://localhost:18082/sendASyncPersonInfoStr
在控制台可以看到异步回调结果,会告知你最终保存的offset和partition
使用命令行bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic springboot_test_topic
验证是否由消息被产生
(同步阻塞)通过http发送模拟产生消息:http://localhost:18082/sendSyncPersonInfoStr
在控制台可以看到同步阻塞结果
通过命令行也可以观测到增加了一条消息。
验证使用自定义序列化器发送消息
(异步回调)通过http发送模拟产生消息:http://localhost:18082/sendPersonInfo
通过命令行观察信息
Demo项目github地址:https://github.com/fangyuan94/kafkaDemo
- 点赞 1
- 收藏
- 分享
- 文章举报
相关文章推荐
- springboot2.x +kafka使用和源码分析五(消费者配置使用)
- springboot2.x+kafka使用和源码分析七(消费者和生产者使用拦截器)
- springboot2.x +kafka使用和源码分析八(自定义分区器)
- springboot2.x +kafka使用和源码分析四(kafka事务)
- springboot2.x +redis使用和源码分析三(序列化器)
- springboot2.x +redis使用和源码分析一(springboot自动装配源码分析)
- springboot2.x +redis使用和源码分析二(RedisTemplate)
- SpringBoot系列三:SpringBoot基本概念(统一父 pom 管理、SpringBoot 代码测试、启动注解分析、配置访问路径、使用内置对象、项目打包发布)
- 为什么Spring Boot推荐使用logback-spring.xml来替代logback.xml来配置logback日志的问题分析
- SpringBoot源码分析之BeanDefinitionLoader注册主Configuration的Java配置类
- SpringBoot系列:自动配置源码分析
- Spring Boot下Druid连接池的使用配置分析
- 详解Spring Boot下Druid连接池的使用配置分析
- springboot源码分析16-spring boot监听器使用
- 从源码分析如何优雅的使用 Kafka 生产者
- SpringBoot源码分析之环境和配置文件的加载
- spring boot 2.x 使用druid+ mybatis 配置多数据源
- springboot源码分析10-ApplicationContextInitializer使用
- 通过实例及源码分析关于SpringBoot启动类启动时自动配置问题
- Spring Boot下Druid连接池的使用配置分析