springboot2.x +kafka使用和源码分析四(kafka事务)
kafka对于事务的支持(0.11.0.0客户端库开始添加了对事务的支持,kafka针对于事务机制新增名为 __transaction_state topic用以保存数据):
-
KafkaTransactionManager
:与spring提供的事务机制一起使用(@Transactional
,TransactionTemplate
等等)。 -
使用
KafkaMessageListenerContainer 事务性监听容器(消费者保证消费
Exactly Once仅消费处理一次)
-
使用KafkaTemplate
如果需要开启事务机制,使用默认配置需要在yml添加spring.kafka.producer.transaction-id-prefix配置。
或者自己初始化bean在上述KafkaProducerConfigure中添加
[code] /** * 构建生产者工厂类 */ @Bean public ProducerFactory<Integer, String> producerFactory() { Map<String, Object> configs = producerConfigs(); DefaultKafkaProducerFactory<Integer,String> producerFactory = new DefaultKafkaProducerFactory(configs,new IntegerSerializer(),new StringSerializer()); //设置事务Id前缀 开启事务 producerFactory.setTransactionIdPrefix("tx-"); return producerFactory; } @Bean public KafkaTransactionManager<Integer, String> kafkaTransactionManager(ProducerFactory<Integer, String> producerFactory) { return new KafkaTransactionManager<>(producerFactory); }
将KafkaTransactionManager注入到spring中。如果开启的事务,则后续发送消息必须使用@Transactional注解或者使用kafkaTemplate.executeInTransaction() ,否则抛出java.lang.IllegalStateException: No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record
源码分析:
[code] /** *信息的发送最终会执行此方法 */ protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) { //判断是否开启线程 if (this.transactional) { //判断当前消息是通过事务的方式发送 Assert.state(inTransaction(), "No transaction is in process; " + "possible solutions: run the template operation within the scope of a " + "template.executeInTransaction() operation, start a transaction with @Transactional " + "before invoking the template method, " + "run in a transaction started by a listener container when consuming a record"); } //获取Producer 对象用于发送消息 final Producer<K, V> producer = getTheProducer(); this.logger.trace(() -> "Sending: " + producerRecord); //定义发送结果回调对象 final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>(); producer.send(producerRecord, buildCallback(producerRecord, producer, future)); //是否开启自动刷新 if (this.autoFlush) { flush(); } this.logger.trace(() -> "Sent: " + producerRecord); return future; } /** *判断方法的执行是否在事务中 */ public boolean inTransaction() { return this.transactional && ( //当前执行线程在ThreadLocal中是否保存过producer如果有说明已在事务中 this.producers.get() != null || TransactionSynchronizationManager.getResource(this.producerFactory) != null || TransactionSynchronizationManager.isActualTransactionActive()); }
1:本地事务支持(不支持事务嵌套)
[code] /** * 测试kafka事务机制 */ @RequestMapping("sendSyncPersonInfoStrByTransaction") public void sendSyncPersonInfoStrByTransaction(){ JSONObject j = new JSONObject(); j.put("name","张三测试事务"); j.put("sex","男"); j.put("age",18); Integer key = new Random().nextInt(100); /** * 如果KafkaTransactionManager正在进行一个事务,则不使用它。而是使用新的“嵌套”事务。 */ boolean flag =kafkaTemplate.executeInTransaction(t->{ //如果在这里这些任何异常抛出 代表此次事务需要进行数据回滚 t.send("springboot_test_topic",key,j.toJSONString()) .get(5, TimeUnit.SECONDS); j.put("sex","女"); t.send("springboot_test_topic",key+10,j.toJSONString()) .get(5, TimeUnit.SECONDS); int i = 0/0; return true; }); System.out.println(flag); }
运行上述测试代码,当代码运行报错( int i = 0/0; 除零异常)时,不会发送任何信息到kafka中
查看executeInTransaction方法源码可以知道
[code]public <T> T executeInTransaction(KafkaOperations.OperationsCallback<K, V, T> callback) { Assert.notNull(callback, "'callback' cannot be null"); Assert.state(this.transactional, "Producer factory does not support transactions"); //在ThreadLocal保证线程安全 Producer<K, V> producer = this.producers.get(); Assert.state(producer == null, "Nested calls to 'executeInTransaction' are not allowed"); String transactionIdSuffix; if (this.producerFactory.isProducerPerConsumerPartition()) { transactionIdSuffix = TransactionSupport.getTransactionIdSuffix(); TransactionSupport.clearTransactionIdSuffix(); } else { transactionIdSuffix = null; } //创建producer 事务的操作由此对象处理 producer = this.producerFactory.createProducer(this.transactionIdPrefix); try { //开启事务 producer.beginTransaction(); } catch (Exception e) { //如果发送异常关闭producer 不占用资源 closeProducer(producer, false); throw e; } //将 this.producers.set(producer); try { //执行业务代码 T result = callback.doInOperations(this); try { //提交事务 producer.commitTransaction(); } catch (Exception e) { throw new KafkaTemplate.SkipAbortException(e); } return result; } catch (KafkaTemplate.SkipAbortException e) { // NOSONAR - exception flow control throw ((RuntimeException) e.getCause()); // NOSONAR - lost stack trace } catch (Exception e) { //发生异常 终止事务 producer.abortTransaction(); throw e; } finally { //设置事务id if (transactionIdSuffix != null) { TransactionSupport.setTransactionIdSuffix(transactionIdSuffix); } //在ThreadLocal移除Producer this.producers.remove(); //关闭资源 closeProducer(producer, false); } }
2:使用@Transactional(transactionManager = "kafkaTransactionManager",rollbackFor = Exception.class) 使用
[code]@RequestMapping("sendSyncPersonInfoStrByTransactionZJ") @Transactional(transactionManager = "kafkaTransactionManager",rollbackFor = Exception.class) public void sendSyncPersonInfoStrByTransactionZJ(){ JSONObject j = new JSONObject(); j.put("name","张三测试事务"); j.put("sex","男"); j.put("age",18); Integer key = new Random().nextInt(100); kafkaTemplate.send("transaction_test_topic",20,j.toJSONString()); j.put("sex","女"); kafkaTemplate.send("transaction_test_topic",10,j.toJSONString()); // int i = 0/0; }
3:嵌套事务
在业务系统可能会存在以下的需求,当发送一条消息时,需要记录一条日志到业务数据库(mysql)中,那么这里面存在两种数据源(kafka,mysql)。这也就是我们说的嵌套事务,那么如何保证去数据一致性。spring提供了自己的解决方案(需要结合Consumer的Listen,后续表明)
Demo项目github地址:https://github.com/fangyuan94/kafkaDemo
- 点赞 1
- 收藏
- 分享
- 文章举报
- springboot2.x +kafka使用和源码分析五(消费者配置使用)
- springboot2.x+kafka使用和源码分析七(消费者和生产者使用拦截器)
- springboot2.x +kafka使用和源码分析八(自定义分区器)
- springboot2.x +redis使用和源码分析三(序列化器)
- springboot2.x +redis使用和源码分析二(RedisTemplate)
- springboot2.x +redis使用和源码分析一(springboot自动装配源码分析)
- Springboot 2使用外部Tomcat源码分析
- springboot源码分析10-ApplicationContextInitializer使用
- springboot源码分析4-springboot之SpringFactoriesLoader使用
- springboot源码分析16-spring boot监听器使用
- 【原创】002 | 搭上SpringBoot事务源码分析专车
- SpringMVC @SessionAttributes 使用详解以及源码分析
- spring boot实战(第十篇)Spring boot Bean加载源码分析
- Spring Boot启动过程源码分析(二)事件监听器
- spring事务源码分析结合mybatis源码(一)
- 使用springboot+springsession实现分布式session以及源码解析
- Spring Boot 事务的使用
- Java分布式跟踪系统Zipkin(六):Brave源码分析-Brave和SpringBoot整合
- spring boot实战(第六篇)加载application资源文件源码分析
- Spring事务管理--多个ORM框架在使用时的情况分析