From 542c80126a16b87f91ebb4ffcdcff9b7770262a9 Mon Sep 17 00:00:00 2001 From: Bastien Bouclet Date: Thu, 26 Jul 2018 09:29:29 +0200 Subject: [PATCH] GH-753: Close transactional producer on error Fixes https://github.com/spring-projects/spring-kafka/issues/753 Improve exception handling for producer transaction commit / rollback * Close the producer if an exception is thrown while committing / rollbacking a transaction when synchronizing the Kafka transaction with another TransactionManager. * Don't reuse transactional producers if an exception is thrown when committing / rollbacking a transaction. Some of the exceptions are fatal and mean the producer cannot be reused. * Close the transactional producer if an exception occurs when calling beginTransaction when not using DefaultKafkaProducerFactory. --- .../core/DefaultKafkaProducerFactory.java | 40 ++++--- .../kafka/core/KafkaTemplate.java | 10 +- .../kafka/core/ProducerFactoryUtils.java | 25 +++-- .../core/KafkaTemplateTransactionTests.java | 54 +++++++++ .../ResourcelessTransactionManager.java | 105 ++++++++++++++++++ 5 files changed, 212 insertions(+), 22 deletions(-) create mode 100644 spring-kafka/src/test/java/org/springframework/kafka/support/transaction/ResourcelessTransactionManager.java diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java index ab2df4f703..a92606e102 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java @@ -299,14 +299,6 @@ public void beginTransaction() throws ProducerFencedException { } catch (RuntimeException e) { this.txFailed = true; - logger.error("Illegal transaction state; producer removed from cache; possible cause: " - + "broker restarted during transaction", e); - try { - this.delegate.close(); - } - catch (Exception ee) { - // empty - } throw e; } } @@ -319,20 +311,40 @@ public void sendOffsetsToTransaction(Map offs @Override public void commitTransaction() throws ProducerFencedException { - this.delegate.commitTransaction(); + try { + this.delegate.commitTransaction(); + } + catch (RuntimeException e) { + this.txFailed = true; + throw e; + } } @Override public void abortTransaction() throws ProducerFencedException { - this.delegate.abortTransaction(); + try { + this.delegate.abortTransaction(); + } + catch (RuntimeException e) { + this.txFailed = true; + throw e; + } } @Override public void close() { - if (this.cache != null && !this.txFailed) { - synchronized (this) { - if (!this.cache.contains(this)) { - this.cache.offer(this); + if (this.cache != null) { + if (this.txFailed) { + logger.warn("Error during transactional operation; producer removed from cache; possible cause: " + + "broker restarted during transaction"); + + this.delegate.close(); + } + else { + synchronized (this) { + if (!this.cache.contains(this)) { + this.cache.offer(this); + } } } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java index d4bb1dcaa7..4e8f1f3763 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java @@ -237,7 +237,15 @@ public T executeInTransaction(OperationsCallback callback) { Producer producer = this.producers.get(); Assert.state(producer == null, "Nested calls to 'executeInTransaction' are not allowed"); producer = this.producerFactory.createProducer(); - producer.beginTransaction(); + + try { + producer.beginTransaction(); + } + catch (Exception e) { + closeProducer(producer, false); + throw e; + } + this.producers.set(producer); T result = null; try { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactoryUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactoryUtils.java index 452d474732..3b84bdbf36 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactoryUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactoryUtils.java @@ -57,7 +57,15 @@ public static KafkaResourceHolder getTransactionalResourceHolder( .getResource(producerFactory); if (resourceHolder == null) { Producer producer = producerFactory.createProducer(); - producer.beginTransaction(); + + try { + producer.beginTransaction(); + } + catch (RuntimeException e) { + producer.close(); + throw e; + } + resourceHolder = new KafkaResourceHolder(producer); bindResourceToTransaction(resourceHolder, producerFactory); } @@ -128,14 +136,17 @@ protected boolean shouldReleaseBeforeCompletion() { @Override public void afterCompletion(int status) { - if (status == TransactionSynchronization.STATUS_COMMITTED) { - this.resourceHolder.commit(); + try { + if (status == TransactionSynchronization.STATUS_COMMITTED) { + this.resourceHolder.commit(); + } + else { + this.resourceHolder.rollback(); + } } - else { - this.resourceHolder.rollback(); + finally { + super.afterCompletion(status); } - - super.afterCompletion(status); } @Override diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java index 38089c3902..8b418b698b 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java @@ -37,6 +37,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; @@ -50,6 +51,7 @@ import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.support.transaction.ResourcelessTransactionManager; import org.springframework.kafka.test.rule.KafkaEmbedded; import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.kafka.transaction.KafkaTransactionManager; @@ -180,6 +182,58 @@ public void testNoTx() { .hasMessageContaining("No transaction is in process;"); } + @Test + public void testTransactionSynchronization() { + MockProducer producer = new MockProducer<>(); + producer.initTransactions(); + + @SuppressWarnings("unchecked") + ProducerFactory pf = mock(ProducerFactory.class); + given(pf.transactionCapable()).willReturn(true); + given(pf.createProducer()).willReturn(producer); + + KafkaTemplate template = new KafkaTemplate<>(pf); + template.setDefaultTopic(STRING_KEY_TOPIC); + + ResourcelessTransactionManager tm = new ResourcelessTransactionManager(); + + new TransactionTemplate(tm).execute(s -> { + template.sendDefault("foo", "bar"); + return null; + }); + + assertThat(producer.history()).containsExactly(new ProducerRecord<>(STRING_KEY_TOPIC, "foo", "bar")); + assertThat(producer.transactionCommitted()).isTrue(); + assertThat(producer.closed()).isTrue(); + } + + @Test + public void testTransactionSynchronizationExceptionOnCommit() { + MockProducer producer = new MockProducer<>(); + producer.initTransactions(); + + @SuppressWarnings("unchecked") + ProducerFactory pf = mock(ProducerFactory.class); + given(pf.transactionCapable()).willReturn(true); + given(pf.createProducer()).willReturn(producer); + + KafkaTemplate template = new KafkaTemplate<>(pf); + template.setDefaultTopic(STRING_KEY_TOPIC); + + ResourcelessTransactionManager tm = new ResourcelessTransactionManager(); + + new TransactionTemplate(tm).execute(s -> { + template.sendDefault("foo", "bar"); + + // Mark the mock producer as fenced so it throws when committing the transaction + producer.fenceProducer(); + return null; + }); + + assertThat(producer.transactionCommitted()).isFalse(); + assertThat(producer.closed()).isTrue(); + } + @Configuration @EnableTransactionManagement public static class DeclarativeConfig { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/transaction/ResourcelessTransactionManager.java b/spring-kafka/src/test/java/org/springframework/kafka/support/transaction/ResourcelessTransactionManager.java new file mode 100644 index 0000000000..7e88387468 --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/transaction/ResourcelessTransactionManager.java @@ -0,0 +1,105 @@ +/* + * Copyright 2017-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.support.transaction; + +import java.util.ArrayList; +import java.util.List; + +import org.springframework.transaction.TransactionDefinition; +import org.springframework.transaction.TransactionException; +import org.springframework.transaction.support.AbstractPlatformTransactionManager; +import org.springframework.transaction.support.DefaultTransactionStatus; +import org.springframework.transaction.support.TransactionSynchronizationManager; + +@SuppressWarnings("serial") +public class ResourcelessTransactionManager extends AbstractPlatformTransactionManager { + + @Override + protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException { + ((ResourcelessTransaction) transaction).begin(); + } + + @Override + protected void doCommit(DefaultTransactionStatus status) throws TransactionException { + if (logger.isDebugEnabled()) { + logger.debug("Committing resourceless transaction on [" + status.getTransaction() + "]"); + } + } + + @Override + protected Object doGetTransaction() throws TransactionException { + Object transaction = new ResourcelessTransaction(); + List resources; + if (!TransactionSynchronizationManager.hasResource(this)) { + resources = new ArrayList<>(); + TransactionSynchronizationManager.bindResource(this, resources); + } + else { + @SuppressWarnings("unchecked") + List stack = (List) TransactionSynchronizationManager.getResource(this); + resources = stack; + } + resources.add(transaction); + return transaction; + } + + @Override + protected void doRollback(DefaultTransactionStatus status) throws TransactionException { + if (logger.isDebugEnabled()) { + logger.debug("Rolling back resourceless transaction on [" + status.getTransaction() + "]"); + } + } + + @Override + protected boolean isExistingTransaction(Object transaction) throws TransactionException { + if (TransactionSynchronizationManager.hasResource(this)) { + List stack = (List) TransactionSynchronizationManager.getResource(this); + return stack.size() > 1; + } + return ((ResourcelessTransaction) transaction).isActive(); + } + + @Override + protected void doSetRollbackOnly(DefaultTransactionStatus status) throws TransactionException { + } + + @Override + protected void doCleanupAfterCompletion(Object transaction) { + List resources = (List) TransactionSynchronizationManager.getResource(this); + resources.clear(); + TransactionSynchronizationManager.unbindResource(this); + ((ResourcelessTransaction) transaction).clear(); + } + + private static class ResourcelessTransaction { + + private boolean active = false; + + public boolean isActive() { + return active; + } + + public void begin() { + active = true; + } + + public void clear() { + active = false; + } + + } +}