Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-3227: Implement handleOne() in CommonDelegatingErrorHandler #3228

Merged
merged 3 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
* @author Gary Russell
* @author Adrian Chlebosz
* @author Antonin Arquey
* @author Dan Blackney
* @since 2.8
*
*/
Expand Down Expand Up @@ -181,6 +182,19 @@ public void handleOtherException(Exception thrownException, Consumer<?, ?> consu
}
}

@Override
public boolean handleOne(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer,
MessageListenerContainer container) {

CommonErrorHandler handler = findDelegate(thrownException);
if (handler != null) {
return handler.handleOne(thrownException, record, consumer, container);
}
else {
return this.defaultErrorHandler.handleOne(thrownException, record, consumer, container);
}
}

@Nullable
private CommonErrorHandler findDelegate(Throwable thrownException) {
Throwable cause = findCause(thrownException);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
Expand All @@ -29,6 +30,7 @@
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.junit.jupiter.api.Test;

Expand All @@ -42,13 +44,14 @@
* @author Gary Russell
* @author Adrian Chlebosz
* @author Antonin Arquey
* @author Dan Blackney
* @since 2.8
*
*/
public class CommonDelegatingErrorHandlerTests {

@Test
void testRecordDelegates() {
void testHandleRemainingDelegates() {
var def = mock(CommonErrorHandler.class);
var one = mock(CommonErrorHandler.class);
var two = mock(CommonErrorHandler.class);
Expand All @@ -72,7 +75,7 @@ void testRecordDelegates() {
}

@Test
void testBatchDelegates() {
void testHandleBatchDelegates() {
var def = mock(CommonErrorHandler.class);
var one = mock(CommonErrorHandler.class);
var two = mock(CommonErrorHandler.class);
Expand All @@ -95,6 +98,54 @@ void testBatchDelegates() {
verify(one).handleBatch(any(), any(), any(), any(), any());
}

@Test
void testHandleOtherExceptionDelegates() {
var def = mock(CommonErrorHandler.class);
var one = mock(CommonErrorHandler.class);
var two = mock(CommonErrorHandler.class);
var three = mock(CommonErrorHandler.class);
var eh = new CommonDelegatingErrorHandler(def);
eh.setErrorHandlers(Map.of(IllegalStateException.class, one, IllegalArgumentException.class, two));
eh.addDelegate(RuntimeException.class, three);

eh.handleOtherException(wrap(new IOException()), mock(Consumer.class),
mock(MessageListenerContainer.class), true);
verify(def).handleOtherException(any(), any(), any(), anyBoolean());
eh.handleOtherException(wrap(new KafkaException("test")), mock(Consumer.class),
mock(MessageListenerContainer.class), true);
verify(three).handleOtherException(any(), any(), any(), anyBoolean());
eh.handleOtherException(wrap(new IllegalArgumentException()), mock(Consumer.class),
mock(MessageListenerContainer.class), true);
verify(two).handleOtherException(any(), any(), any(), anyBoolean());
eh.handleOtherException(wrap(new IllegalStateException()), mock(Consumer.class),
mock(MessageListenerContainer.class), true);
verify(one).handleOtherException(any(), any(), any(), anyBoolean());
}

@Test
void testHandleOneDelegates() {
var def = mock(CommonErrorHandler.class);
var one = mock(CommonErrorHandler.class);
var two = mock(CommonErrorHandler.class);
var three = mock(CommonErrorHandler.class);
var eh = new CommonDelegatingErrorHandler(def);
eh.setErrorHandlers(Map.of(IllegalStateException.class, one, IllegalArgumentException.class, two));
eh.addDelegate(RuntimeException.class, three);

eh.handleOne(wrap(new IOException()), mock(ConsumerRecord.class), mock(Consumer.class),
mock(MessageListenerContainer.class));
verify(def).handleOne(any(), any(), any(), any());
eh.handleOne(wrap(new KafkaException("test")), mock(ConsumerRecord.class), mock(Consumer.class),
mock(MessageListenerContainer.class));
verify(three).handleOne(any(), any(), any(), any());
eh.handleOne(wrap(new IllegalArgumentException()), mock(ConsumerRecord.class), mock(Consumer.class),
mock(MessageListenerContainer.class));
verify(two).handleOne(any(), any(), any(), any());
eh.handleOne(wrap(new IllegalStateException()), mock(ConsumerRecord.class), mock(Consumer.class),
mock(MessageListenerContainer.class));
verify(one).handleOne(any(), any(), any(), any());
}

@Test
void testDelegateForThrowableIsAppliedWhenCauseTraversingIsEnabled() {
var defaultHandler = mock(CommonErrorHandler.class);
Expand Down