Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-17850: fix leaking internal exception in state manager #17711

Open
wants to merge 4 commits into
base: trunk
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.errors.internals.FailedProcessingException;
import org.apache.kafka.streams.processor.CommitCallback;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateRestoreListener;
Expand Down Expand Up @@ -538,13 +539,16 @@ public void flush() {
} catch (final RuntimeException exception) {
if (firstException == null) {
// do NOT wrap the error if it is actually caused by Streams itself
if (exception instanceof StreamsException)
// In case of FailedProcessingException Do not keep the failed processing exception in the stack trace
if (exception instanceof FailedProcessingException)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add unit tests to ProcessorStateManagerTest that verify the correct behavior?
You should also verify the behavior when a processing error handler that continues instead of fails is set.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added tests for flush and close in ProcessorStateManagerTest

For flushCache I added tests in ProcessingExceptionHandlerIntegrationTest

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why could you not also add a test for flushCache() to the unit tests?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done
good catch, I was not able to call flushCache() from the actual MockKeyValueStore.
So i added a new MockCacheKeyValueStore that also implements the CachedStateStore

firstException = new StreamsException(exception.getCause());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before we introduced the FailedProcessingException we did not wrap the exception into a plain StreamsException but we wrapped it into a ProcessorStateException in the else-branch. We should keep that behavior.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case of FailedProcessingException, wrap it into a ProcessorStateException

 if (exception instanceof FailedProcessingException) {
    firstException = new ProcessorStateException(exception.getCause());
}

else if (exception instanceof StreamsException)
firstException = exception;
else
firstException = new ProcessorStateException(
format("%sFailed to flush state store %s", logPrefix, store.name()), exception);
}
log.error("Failed to flush state store {}: ", store.name(), exception);
log.error("Failed to flush state store {}: ", store.name(), exception.getCause());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you change this? If a StreamsException that is NOT a FailedProcessingException is thrown, it would be perfectly fine to log the exception.
You could do something like:

log.error("Failed to flush state store {}: ", store.name(), exception instanceof FailedProcessingException ? exception.getCause() : exception);

or maybe it is cleaner to copy the log message inside the if (firstException == null) {.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I copied the log.error in the if (firstException == null):
log.error("Failed to flush cache of store {}: ", store.name(), firstException);
else
log.error("Failed to flush cache of store {}: ", store.name(), exception);

}
}
}
Expand Down Expand Up @@ -573,7 +577,10 @@ public void flushCache() {
} catch (final RuntimeException exception) {
if (firstException == null) {
// do NOT wrap the error if it is actually caused by Streams itself
if (exception instanceof StreamsException) {
// In case of FailedProcessingException Do not keep the failed processing exception in the stack trace
if (exception instanceof FailedProcessingException) {
firstException = new StreamsException(exception.getCause());
} else if (exception instanceof StreamsException) {
firstException = exception;
} else {
firstException = new ProcessorStateException(
Expand All @@ -582,7 +589,7 @@ public void flushCache() {
);
}
}
log.error("Failed to flush cache of store {}: ", store.name(), exception);
log.error("Failed to flush cache of store {}: ", store.name(), exception.getCause());
}
}
}
Expand Down Expand Up @@ -618,13 +625,16 @@ public void close() throws ProcessorStateException {
} catch (final RuntimeException exception) {
if (firstException == null) {
// do NOT wrap the error if it is actually caused by Streams itself
if (exception instanceof StreamsException)
// In case of FailedProcessingException Do not keep the failed processing exception in the stack trace
if (exception instanceof FailedProcessingException)
firstException = new StreamsException(exception.getCause());
else if (exception instanceof StreamsException)
firstException = exception;
else
firstException = new ProcessorStateException(
format("%sFailed to close state store %s", logPrefix, store.name()), exception);
}
log.error("Failed to close state store {}: ", store.name(), exception);
log.error("Failed to close state store {}: ", store.name(), exception.getCause());
}
}

Expand Down