springboot2.x+kafka使用和源码分析七(消费者和生产者使用拦截器)
2020-01-11 18:26
666 查看
Apache Kafka提供了一种向生产者和消费者添加拦截器的机制,此拦截器由kafka进行管理和spring无关。所以无法使用Spring依赖注入功能。但是我们可以使用拦截器提供的config()
方法来手动来获取这些bean依赖。
第一步:自定义拦截器:
[code]/** * 自定义生产者拦截器 * @author fangyuan */ public class PersonInfoProducerInterceptor implements ProducerInterceptor<Integer,String> { private SomeBean someBean; @Override public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) { someBean.execute("producer interceptor"); return record; } /** * 被手动提交后执行代码 * @param metadata * @param exception */ @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { } /** * 关闭需要执行代码 */ @Override public void close() { } /** * 可以从配置文件获取一些配置信息 * @param configs */ @Override public void configure(Map<String, ?> configs) { someBean = (SomeBean) configs.get("someBean"); } }
[code]/** * 自定义消费者拦截器 * @author fangyuan */ public class PersonInfoConsumerInterceptor implements ConsumerInterceptor<Integer,String> { private SomeBean someBean; @Override public ConsumerRecords<Integer, String> onConsume(ConsumerRecords<Integer, String> records) { someBean.execute("consumer interceptor"); return records; } /** * 提交offset 时需要处理的 * @param offsets */ @Override public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) { } /** * 关闭需要执行代码 */ @Override public void close() { } /** * 可以从配置文件获取一些配置信息 * @param configs */ @Override public void configure(Map<String, ?> configs) { someBean = (SomeBean) configs.get("someBean"); } }
[code]/** *拦截器中依赖Bean * @author fangyuan */ public class SomeBean { public void execute(String nr){ System.out.println(nr); } }
第二步:消费者生产者绑定拦截器
[code]消费者: props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, PersonInfoConsumerInterceptor.class.getName()); SomeBean someBean = new SomeBean(); props.put("someBean",someBean); 生产者: props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, PersonInfoProducerInterceptor.class.getName()); SomeBean someBean = new SomeBean(); props.put("someBean",someBean);
测试结果:
Demo项目github地址:https://github.com/fangyuan94/kafkaDemo
- 点赞 1
- 收藏
- 分享
- 文章举报
相关文章推荐
- springboot2.x +kafka使用和源码分析八(自定义分区器)
- springboot2.x +redis使用和源码分析二(RedisTemplate)
- springboot2.x +redis使用和源码分析一(springboot自动装配源码分析)
- springboot2.x +redis使用和源码分析三(序列化器)
- spring boot 集成kafka (多线程,消费者使用kafka的原生api实现,因为@KakfkaListener修改groupId无效)
- 从源码分析如何优雅的使用 Kafka 生产者
- springboot源码分析10-ApplicationContextInitializer使用
- Springboot 整各 kafka(生产者--消费者)
- 分布式消息系统:Kafka(九)应用Spring Boot实现消费者和生产者
- Spring-Boot使用Spring-Kafka生产者
- springboot源码分析4-springboot之SpringFactoriesLoader使用
- 从源码分析如何优雅的使用 Kafka 生产者
- springboot源码分析16-spring boot监听器使用
- Springboot 2使用外部Tomcat源码分析
- springboot2.x整合rabbitMQ:简单的生产者和消费者
- spring boot的拦截器简单使用
- kafka集群搭建和使用Java写kafka生产者消费者
- SpringBoot-Loader源码分析系列2:启动 new JarLauncher().launch(args)的.launch(args)部分
- spring boot + kafka 使用详细步骤
- kafka集群搭建和使用Java写kafka生产者消费者