您的位置:首页 > 编程语言 > Java开发

springboot2.x +kafka使用和源码分析三(生产者配置)

2020-01-11 18:26 731 查看

上一章描述springboot支持对于topic进行配置管理,本章主要叙说springboot对于produce支持。

这里通过两种方式

第一种:由springboot框架来初始化基础bean,我们只需要在yml配置文件中编写配置即可。如下图所示(常规配置 具体所有配置可参考http://kafka.apache.org/documentation/ 的producer)

第二种:我们自定义初始化基础bean后交由框架管理(会有助于我们理解springboot对于kafka的支持)

话不多说直接上源码:

第一步:初始化配置

[code]/**
* 编写统一生产者配置类
* @author fangyuan
*/
@Configuration
public class KafkaProducerConfigure {

/**
*构建
* @return
*/
@Bean
public Map<String, Object> producerConfigs() {
//设置produce基础配置
//生产环境这一块的配置可以参考kafka官网对prodeucer支持
Map<String, Object> props = new HashMap<>(10);

//设置kafka服务器集群地址
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
//设置ack机制
props.put(ProducerConfig.ACKS_CONFIG, "-1");

return props;
}

/**
* 构建生产者工厂类 用于createProducer
*/
@Bean
public ProducerFactory<Integer, String> producerFactory() {
//这里可以传入对于key,value的序列化器
// key的作用是用于kafka对于value落定到不同partition分区,后面会对于kafka的partition机制进行描述 包括构建自定义partition器
ProducerFactory<Integer,String>  producerFactory =  new DefaultKafkaProducerFactory(producerConfigs(),new IntegerSerializer(),new StringSerializer());

return producerFactory;
}

/**
* 构建spring KafkaTemplate模版 用于发送消息
* @return
*/
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
return new KafkaTemplate<Integer, String>(producerFactory());
}

/**
* 构建另外一个kafkaTemplate
* @return
*/
@Bean
public KafkaTemplate<Integer,PersonInfo> personKafkaTemplate() {

IntegerSerializer keySerializer = new IntegerSerializer();
//自定义values序列化 用于序列化指定bean对象
//如何自定义序列化器可参考https://blog.csdn.net/F_Hello_World/article/details/103273861这边博文
PersonInfoSerializer valueSerializer = new PersonInfoSerializer();

ProducerFactory<Integer,PersonInfo>  producerPersonFactory =  new DefaultKafkaProducerFactory(producerConfigs(),keySerializer,valueSerializer);

KafkaTemplate<Integer,PersonInfo> kafkaTemplate = new KafkaTemplate(producerPersonFactory);

return kafkaTemplate;
}

第二步:使用template来是发送消息

目前template 发送消息由两种方式,一异步回调 二同步阻塞 

[code]
@RestController
public class DeomController {

@Autowired
private KafkaTemplate<Integer,String> kafkaTemplate;

@Autowired
private KafkaTemplate<Integer, PersonInfo> personKafkaTemplate;

@Autowired
private ProducerFactory<Integer, String> producerFactory;

/**
* 异步发送
*/
@RequestMapping("sendASyncPersonInfoStr")
public void sendASyncPersonInfoStr(){

//要发送内容
JSONObject j = new JSONObject();

j.put("name","张三异步");
j.put("sex","男");
j.put("age",18);

Integer key = new Random().nextInt(100);
//kafka发送消息
ListenableFuture<SendResult<Integer, String>>  future = kafkaTemplate.send("springboot_test_topic",key,j.toJSONString());
//异步回调确认
future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {

//成功调用
@Override
public void onSuccess(SendResult<Integer, String> integerStringSendResult) {

handleSuccess(integerStringSendResult);
}
//失败调用
@Override
public void onFailure(Throwable throwable) {
throwable.printStackTrace();
}
});
kafkaTemplate.flush();

}

/**
* 同步发送信息
*/
@RequestMapping("sendSyncPersonInfoStr")
public void sendSyncPersonInfoStr(){

JSONObject j = new JSONObject();

j.put("name","张三同步");
j.put("sex","男");
j.put("age",18);

Integer key = new Random().nextInt(100);
//kafka发送消息
try {
SendResult<Integer, String> integerStringSendResult = kafkaTemplate.send("springboot_test_topic",key,j.toJSONString())
.get(5, TimeUnit.SECONDS);

kafkaTemplate.flush();
handleSuccess(integerStringSendResult);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}

}

public void handleSuccess(SendResult<Integer, String> integerStringSendResult){

ProducerRecord<Integer, String> producerRecord = integerStringSendResult.getProducerRecord();

//消息消息
System.out.println("=======key=========>"+producerRecord.key());
System.out.println("=======value=========>"+producerRecord.value());
//元数据消息
RecordMetadata recordMetadata = integerStringSendResult.getRecordMetadata();
//查看保存成功偏移量
System.out.println("=======partition=========>"+recordMetadata.partition());
System.out.println("=======offset=========>"+recordMetadata.offset());
System.out.println("=======timestamp=========>"+recordMetadata.timestamp());

}

/**
* 异步发送Person信息
*/
@RequestMapping("sendPersonInfo")
public void sendPersonInfo(){

Integer key = new Random().nextInt(100);

PersonInfo person = PersonInfo.builder().name("丽丽").age(38).sex("女").build();

final ProducerRecord<Integer,PersonInfo> record = new  ProducerRecord("personTopic",key,person);

//kafka发送消息
ListenableFuture<SendResult<Integer, PersonInfo>>  future = personKafkaTemplate.send(record);
//异步回调确认
future.addCallback(new ListenableFutureCallback<SendResult<Integer, PersonInfo>>() {

//成功调用
@Override
public void onSuccess(SendResult<Integer, PersonInfo> integerStringSendResult) {

//发送成功
//元数据消息
RecordMetadata recordMetadata = integerStringSendResult.getRecordMetadata();

//查看保存成功偏移量
System.out.println("=======partition=========>"+recordMetadata.partition());
System.out.println("=======offset=========>"+recordMetadata.offset());
System.out.println("=======timestamp=========>"+recordMetadata.timestamp());
}
//失败调用
@Override
public void onFailure(Throwable throwable) {
throwable.printStackTrace();
}
});

kafkaTemplate.flush();

}
}

(异步回调) 通过http发送模拟产生消息:http://localhost:18082/sendASyncPersonInfoStr 

在控制台可以看到异步回调结果,会告知你最终保存的offset和partition

使用命令行bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic springboot_test_topic

验证是否由消息被产生

(同步阻塞)通过http发送模拟产生消息:http://localhost:18082/sendSyncPersonInfoStr

在控制台可以看到同步阻塞结果

通过命令行也可以观测到增加了一条消息。

验证使用自定义序列化器发送消息

(异步回调)通过http发送模拟产生消息:http://localhost:18082/sendPersonInfo

通过命令行观察信息

Demo项目github地址:https://github.com/fangyuan94/kafkaDemo

 

  • 点赞 1
  • 收藏
  • 分享
  • 文章举报
F_Hello_World 发布了38 篇原创文章 · 获赞 47 · 访问量 1059 私信 关注
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: