您的位置:首页 > 编程语言 > Java开发

springboot2.x+kafka使用和源码分析七(消费者和生产者使用拦截器)

2020-01-11 18:26 666 查看

Apache Kafka提供了一种向生产者和消费者添加拦截器的机制,此拦截器由kafka进行管理和spring无关。所以无法使用Spring依赖注入功能。但是我们可以使用拦截器提供的
config()
方法来手动来获取这些bean依赖。

第一步:自定义拦截器:

[code]/**
* 自定义生产者拦截器
* @author fangyuan
*/
public class PersonInfoProducerInterceptor implements ProducerInterceptor<Integer,String> {

private SomeBean someBean;

@Override
public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {

someBean.execute("producer interceptor");

return record;
}

/**
* 被手动提交后执行代码
* @param metadata
* @param exception
*/
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

}

/**
* 关闭需要执行代码
*/
@Override
public void close() {

}

/**
* 可以从配置文件获取一些配置信息
* @param configs
*/
@Override
public void configure(Map<String, ?> configs) {

someBean = (SomeBean) configs.get("someBean");
}
}
[code]/**
* 自定义消费者拦截器
* @author fangyuan
*/
public class PersonInfoConsumerInterceptor implements ConsumerInterceptor<Integer,String> {

private SomeBean someBean;

@Override
public ConsumerRecords<Integer, String> onConsume(ConsumerRecords<Integer, String> records) {

someBean.execute("consumer interceptor");

return records;
}

/**
* 提交offset 时需要处理的
* @param offsets
*/
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {

}

/**
* 关闭需要执行代码
*/
@Override
public void close() {

}

/**
* 可以从配置文件获取一些配置信息
* @param configs
*/
@Override
public void configure(Map<String, ?> configs) {

someBean = (SomeBean) configs.get("someBean");
}
}
[code]/**
*拦截器中依赖Bean
* @author fangyuan
*/
public class SomeBean {

public  void execute(String nr){
System.out.println(nr);

}
}

 

 

第二步:消费者生产者绑定拦截器

[code]消费者:
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, PersonInfoConsumerInterceptor.class.getName());
SomeBean someBean = new SomeBean();
props.put("someBean",someBean);

生产者:
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, PersonInfoProducerInterceptor.class.getName());
SomeBean someBean = new SomeBean();
props.put("someBean",someBean);

测试结果:

Demo项目github地址:https://github.com/fangyuan94/kafkaDemo

  • 点赞 1
  • 收藏
  • 分享
  • 文章举报
F_Hello_World 发布了38 篇原创文章 · 获赞 47 · 访问量 1056 私信 关注
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: