-
Notifications
You must be signed in to change notification settings - Fork 14k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-17850: fix leaking internal exception in state manager #17711
base: trunk
Are you sure you want to change the base?
Conversation
Could you please add a description to the PR so that it is clear what it changes. Linking to the JIRA ticket is actually not really needed since the JIRA ticket number can be found in the title. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR, @sebastienviale !
Here my comments.
firstException = exception; | ||
else | ||
firstException = new ProcessorStateException( | ||
format("%sFailed to flush state store %s", logPrefix, store.name()), exception); | ||
} | ||
log.error("Failed to flush state store {}: ", store.name(), exception); | ||
log.error("Failed to flush state store {}: ", store.name(), exception.getCause()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why did you change this? If a StreamsException
that is NOT a FailedProcessingException
is thrown, it would be perfectly fine to log the exception.
You could do something like:
log.error("Failed to flush state store {}: ", store.name(), exception instanceof FailedProcessingException ? exception.getCause() : exception);
or maybe it is cleaner to copy the log message inside the if (firstException == null) {
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I copied the log.error in the if (firstException == null)
:
log.error("Failed to flush cache of store {}: ", store.name(), firstException);
else
log.error("Failed to flush cache of store {}: ", store.name(), exception);
if (exception instanceof StreamsException) | ||
// In case of FailedProcessingException Do not keep the failed processing exception in the stack trace | ||
if (exception instanceof FailedProcessingException) | ||
firstException = new StreamsException(exception.getCause()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Before we introduced the FailedProcessingException
we did not wrap the exception into a plain StreamsException
but we wrapped it into a ProcessorStateException
in the else
-branch. We should keep that behavior.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In case of FailedProcessingException, wrap it into a ProcessorStateException
if (exception instanceof FailedProcessingException) {
firstException = new ProcessorStateException(exception.getCause());
}
@@ -538,13 +539,16 @@ public void flush() { | |||
} catch (final RuntimeException exception) { | |||
if (firstException == null) { | |||
// do NOT wrap the error if it is actually caused by Streams itself | |||
if (exception instanceof StreamsException) | |||
// In case of FailedProcessingException Do not keep the failed processing exception in the stack trace | |||
if (exception instanceof FailedProcessingException) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please add unit tests to ProcessorStateManagerTest
that verify the correct behavior?
You should also verify the behavior when a processing error handler that continues instead of fails is set.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added tests for flush and close in ProcessorStateManagerTest
For flushCache I added tests in ProcessingExceptionHandlerIntegrationTest
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why could you not also add a test for flushCache()
to the unit tests?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
good catch, I was not able to call flushCache() from the actual MockKeyValueStore.
So i added a new MockCacheKeyValueStore that also implements the CachedStateStore
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates, @sebastienviale !
Here my feedback.
if (exception instanceof StreamsException) | ||
// In case of FailedProcessingException Do not keep the failed processing exception in the stack trace | ||
if (exception instanceof FailedProcessingException) | ||
firstException = new ProcessorStateException(exception.getCause()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO, we should keep it exactly the same as before the FailedProcessingException
was introduced:
firstException = new ProcessorStateException(exception.getCause()); | |
firstException = new ProcessorStateException( | |
format("%sFailed to flush state store %s", logPrefix, store.name()), exception.getCause()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
stateManager.registerStore(stateStore, stateStore.stateRestoreCallback, null); | ||
|
||
final ProcessorStateException thrown = assertThrows(ProcessorStateException.class, stateManager::flush); | ||
assertEquals(exception, thrown.getCause()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please also verify the message and that the stack trace does not contain the FailedProcessingException
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
stateManager.registerStore(stateStore, stateStore.stateRestoreCallback, null); | ||
|
||
final ProcessorStateException thrown = assertThrows(ProcessorStateException.class, stateManager::close); | ||
assertEquals(exception, thrown.getCause()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -538,13 +539,16 @@ public void flush() { | |||
} catch (final RuntimeException exception) { | |||
if (firstException == null) { | |||
// do NOT wrap the error if it is actually caused by Streams itself | |||
if (exception instanceof StreamsException) | |||
// In case of FailedProcessingException Do not keep the failed processing exception in the stack trace | |||
if (exception instanceof FailedProcessingException) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why could you not also add a test for flushCache()
to the unit tests?
I fixed all your comments @cadonna |
Following the KIP-1033 a FailedProcessingException is passed to the Streams-specific uncaught exception handler.
The goal of the PR is to unwrap a FailedProcessingException into a StreamsException when an exception occurs during the flushing or closing of a store