KAFKA-17850: fix leaking internal exception in state manager #17711

Open
wants to merge 4 commits into
base: trunk

Contributor

sebastienviale commented Nov 7, 2024

Following KIP-1033, a FailedProcessingException is passed to the Streams-specific uncaught exception handler.

The goal of this PR is to unwrap a FailedProcessingException into a StreamsException when an exception occurs while flushing or closing a store.

github-actions bot added the streams and small (Small PRs) labels Nov 7, 2024

Contributor

cadonna commented Nov 13, 2024

Could you please add a description to the PR so that it is clear what it changes? Linking to the JIRA ticket is not really needed, since the ticket number can be found in the title.

Contributor

cadonna left a comment

Thanks for the PR, @sebastienviale !

Here are my comments.

firstException = exception;
else
firstException = new ProcessorStateException(
format("%sFailed to flush state store %s", logPrefix, store.name()), exception);
}
- log.error("Failed to flush state store {}: ", store.name(), exception);
+ log.error("Failed to flush state store {}: ", store.name(), exception.getCause());
Contributor

Why did you change this? If a StreamsException that is NOT a FailedProcessingException is thrown, it would be perfectly fine to log the exception.
You could do something like:

log.error("Failed to flush state store {}: ", store.name(), exception instanceof FailedProcessingException ? exception.getCause() : exception);

Or maybe it is cleaner to copy the log message into the if (firstException == null) block.

Contributor Author

I copied the log.error in the if (firstException == null):
log.error("Failed to flush cache of store {}: ", store.name(), firstException);
else
log.error("Failed to flush cache of store {}: ", store.name(), exception);

if (exception instanceof StreamsException)
// In case of FailedProcessingException Do not keep the failed processing exception in the stack trace
if (exception instanceof FailedProcessingException)
firstException = new StreamsException(exception.getCause());
Contributor

Before we introduced the FailedProcessingException, we did not wrap the exception into a plain StreamsException; we wrapped it into a ProcessorStateException in the else-branch. We should keep that behavior.

Contributor Author

In case of a FailedProcessingException, its cause is now wrapped into a ProcessorStateException:

 if (exception instanceof FailedProcessingException) {
    firstException = new ProcessorStateException(exception.getCause());
}

@@ -538,13 +539,16 @@ public void flush() {
} catch (final RuntimeException exception) {
if (firstException == null) {
// do NOT wrap the error if it is actually caused by Streams itself
if (exception instanceof StreamsException)
// In case of FailedProcessingException Do not keep the failed processing exception in the stack trace
if (exception instanceof FailedProcessingException)
Contributor

Could you please add unit tests to ProcessorStateManagerTest that verify the correct behavior?
You should also verify the behavior when a processing error handler that continues instead of failing is set.

Contributor Author

I added tests for flush and close in ProcessorStateManagerTest

For flushCache I added tests in ProcessingExceptionHandlerIntegrationTest

Contributor

Why could you not also add a test for flushCache() to the unit tests?

Contributor Author

done
Good catch. I was not able to call flushCache() on the existing MockKeyValueStore,
so I added a new MockCacheKeyValueStore that also implements CachedStateStore.
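
For readers following along, a test double of the kind described here might look roughly like the sketch below. It is only an illustration: `CacheFlushable` is a hypothetical, simplified stand-in for the cache-flushing part of CachedStateStore, and `ThrowingCacheStoreSketch` is an invented name, not the MockCacheKeyValueStore added in this PR.

```java
// Hypothetical, simplified stand-in for the cache-flushing part of CachedStateStore.
// The real interface lives in Kafka Streams internals and has more methods.
interface CacheFlushable {
    void flushCache();
}

// Test double (invented name) whose flushCache() throws a preconfigured exception, if any.
final class ThrowingCacheStoreSketch implements CacheFlushable {
    private final String name;
    private final RuntimeException exceptionOnFlushCache;
    private int flushCacheCalls = 0;

    ThrowingCacheStoreSketch(final String name, final RuntimeException exceptionOnFlushCache) {
        this.name = name;
        this.exceptionOnFlushCache = exceptionOnFlushCache;
    }

    String name() {
        return name;
    }

    // Lets a test assert that the state manager actually reached flushCache().
    int flushCacheCalls() {
        return flushCacheCalls;
    }

    @Override
    public void flushCache() {
        flushCacheCalls++;
        if (exceptionOnFlushCache != null) {
            throw exceptionOnFlushCache;
        }
    }
}
```

Passing `null` as the exception gives a store that flushes its cache without failing, which is handy for the non-error path of the same test class.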

github-actions bot removed the small (Small PRs) label Nov 13, 2024

Contributor

cadonna left a comment

Thanks for the updates, @sebastienviale !

Here is my feedback.

if (exception instanceof StreamsException)
// In case of FailedProcessingException Do not keep the failed processing exception in the stack trace
if (exception instanceof FailedProcessingException)
firstException = new ProcessorStateException(exception.getCause());
Contributor

IMO, we should keep it exactly the same as before the FailedProcessingException was introduced:

Suggested change
- firstException = new ProcessorStateException(exception.getCause());
+ firstException = new ProcessorStateException(
+ format("%sFailed to flush state store %s", logPrefix, store.name()), exception.getCause());

Contributor Author

done
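
The three branches discussed in this thread can be summarized in a small, self-contained sketch. This is not the ProcessorStateManager code: the exception classes below are simplified stand-ins with assumed constructors, and `FlushExceptionSketch` and `firstExceptionFor` are hypothetical names used only for illustration.

```java
// Stand-ins for the Kafka Streams exception types named in this thread.
// They are simplified assumptions, not the real classes or constructors.
class StreamsException extends RuntimeException {
    StreamsException(final String message, final Throwable cause) {
        super(message, cause);
    }
}

class ProcessorStateException extends StreamsException {
    ProcessorStateException(final String message, final Throwable cause) {
        super(message, cause);
    }
}

class FailedProcessingException extends StreamsException {
    FailedProcessingException(final Throwable cause) {
        super("internal wrapper", cause);
    }
}

final class FlushExceptionSketch {
    // Decides which exception to remember as the first failure of a flush.
    static RuntimeException firstExceptionFor(final String logPrefix,
                                              final String storeName,
                                              final RuntimeException exception) {
        final String message = String.format("%sFailed to flush state store %s", logPrefix, storeName);
        if (exception instanceof FailedProcessingException) {
            // Drop the internal wrapper and keep only its cause.
            return new ProcessorStateException(message, exception.getCause());
        }
        if (exception instanceof StreamsException) {
            // Already a Streams error: do NOT wrap it again.
            return exception;
        }
        // Any other runtime exception is wrapped, as before KIP-1033.
        return new ProcessorStateException(message, exception);
    }

    public static void main(final String[] args) {
        final RuntimeException userError = new IllegalStateException("boom");
        final RuntimeException result =
            firstExceptionFor("task [0_0] ", "store-a", new FailedProcessingException(userError));
        System.out.println(result.getClass().getSimpleName() + ": " + result.getMessage());
        System.out.println("cause is the user error: " + (result.getCause() == userError));
    }
}
```

The FailedProcessingException check has to come before the generic StreamsException check, because in this sketch (as in the diff above) the wrapper is itself a StreamsException and would otherwise be passed through unchanged.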

stateManager.registerStore(stateStore, stateStore.stateRestoreCallback, null);

final ProcessorStateException thrown = assertThrows(ProcessorStateException.class, stateManager::flush);
assertEquals(exception, thrown.getCause());
Contributor

Please also verify the message and that the stack trace does not contain the FailedProcessingException.

Contributor Author

sebastienviale Nov 15, 2024

done
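
One possible way to check that a thrown exception no longer carries the internal wrapper is to walk its cause chain and, additionally, to inspect the printed stack trace. The helper below is a sketch under assumptions: `CauseChainCheck` and `FakeFailedProcessingException` are hypothetical names, and the latter merely stands in for the real FailedProcessingException.

```java
// Hypothetical stand-in for the internal wrapper; not the real Kafka Streams class.
class FakeFailedProcessingException extends RuntimeException {
    FakeFailedProcessingException(final Throwable cause) {
        super(cause);
    }
}

final class CauseChainCheck {
    // Returns true if the exception itself or any of its causes is an instance of the given type.
    static boolean chainContains(final Throwable thrown, final Class<? extends Throwable> type) {
        for (Throwable current = thrown; current != null; current = current.getCause()) {
            if (type.isInstance(current)) {
                return true;
            }
        }
        return false;
    }

    // Renders the full stack trace, including "Caused by:" sections, as a string.
    static String stackTraceOf(final Throwable thrown) {
        final java.io.StringWriter buffer = new java.io.StringWriter();
        thrown.printStackTrace(new java.io.PrintWriter(buffer, true));
        return buffer.toString();
    }
}
```

In a unit test this could be combined with the assertions already shown, for example by asserting that `chainContains(thrown, FakeFailedProcessingException.class)` is false and that `thrown.getMessage()` matches the expected "Failed to flush state store" text.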

stateManager.registerStore(stateStore, stateStore.stateRestoreCallback, null);

final ProcessorStateException thrown = assertThrows(ProcessorStateException.class, stateManager::close);
assertEquals(exception, thrown.getCause());
Contributor

Same as above

Contributor Author

done

@sebastienviale
Contributor Author

I addressed all your comments, @cadonna.
