Stricter GenericRecordObjectFieldCopier to catch errors early #3608
base: main
Conversation
Note: this stronger error checking is causing an issue where the Kafka table stops, but I don't get any indication of failure besides an inconspicuous log line:
By hacking in some explicit logging at this exception site, I'm able to see this example:
Looking at this trace, it looks like there are a few different layers trying to handle "acceptFailure", and they may be in conflict with each other. The first is in `KafkaIngester.pollOnce` (KafkaIngester.java:332):

```java
try {
    streamConsumer.accept(partitionRecords);
} catch (Throwable ex) {
    ++messagesWithErr;
    log.error().append(logPrefix).append("Exception while processing Kafka message:").append(ex).endl();
    if (messagesWithErr > MAX_ERRS) {
        streamConsumer.acceptFailure(ex);
        log.error().append(logPrefix)
                .append("Max number of errors exceeded, aborting " + this + " consumer thread.")
                .endl();
        return false;
    }
    continue;
}
```

The second is in `KafkaTools$SimpleKafkaStreamConsumer.accept` (KafkaTools.java:2176):

```java
@Override
public void accept(List<? extends ConsumerRecord<?, ?>> consumerRecords) {
    try {
        adapter.consumeRecords(consumerRecords);
    } catch (Exception e) {
        acceptFailure(e);
    }
}
```
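If I'm reading these two snippets correctly, the conflict is that `SimpleKafkaStreamConsumer.accept` catches any `Exception` and calls `acceptFailure` itself without rethrowing, so the outer `catch (Throwable ex)` in `pollOnce` only ever sees non-`Exception` throwables and its `MAX_ERRS` tolerance never applies: the stream is failed on the first error. Here is a minimal, self-contained sketch of that interaction (the class and method names below are stand-ins, not the actual Deephaven types):

```java
import java.util.List;

interface StreamConsumer {
    void accept(List<String> records);

    void acceptFailure(Throwable t); // terminally fails the stream
}

// Stand-in for SimpleKafkaStreamConsumer: swallows the exception and fails
// the stream immediately, so callers never see the error.
class SwallowingConsumer implements StreamConsumer {
    @Override
    public void accept(List<String> records) {
        try {
            process(records);
        } catch (Exception e) {
            acceptFailure(e); // first error => stream is done
        }
    }

    @Override
    public void acceptFailure(Throwable t) {
        System.out.println("stream failed: " + t);
    }

    private void process(List<String> records) {
        throw new IllegalStateException("published object incompatible with column");
    }
}

// Stand-in for the pollOnce loop: its error budget is effectively dead code,
// because accept() above never lets an Exception propagate this far.
public class IngesterSketch {
    static final int MAX_ERRS = 500;

    public static void main(String[] args) {
        StreamConsumer consumer = new SwallowingConsumer();
        int messagesWithErr = 0;
        try {
            consumer.accept(List.of("record"));
        } catch (Throwable ex) {
            if (++messagesWithErr > MAX_ERRS) {
                consumer.acceptFailure(ex);
            }
        }
        System.out.println("errors seen by outer layer: " + messagesWithErr); // prints 0
    }
}
```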
Note: the above issues need to be resolved before we can consider merging this.
We should prefer to error out sooner rather than later when we know that the published objects are not compatible with the column definition.
Related to #3602
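For context, here is a minimal sketch of the kind of eager type check a stricter copier enables. The names and structure below are hypothetical stand-ins: the real `GenericRecordObjectFieldCopier` copies Avro `GenericRecord` fields into Deephaven's chunk types, which are elided here.

```java
import org.apache.avro.generic.GenericRecord;

// Hypothetical stand-in for a stricter field copier: validate each published
// object against the column definition's type at ingestion time, so an
// incompatible record fails loudly and immediately instead of surfacing
// later as an opaque table failure.
final class StrictObjectFieldCopierSketch {
    private final String fieldName;
    private final Class<?> expectedType; // the column definition's data type

    StrictObjectFieldCopierSketch(String fieldName, Class<?> expectedType) {
        this.fieldName = fieldName;
        this.expectedType = expectedType;
    }

    Object copy(GenericRecord record) {
        final Object value = record.get(fieldName);
        if (value != null && !expectedType.isInstance(value)) {
            throw new IllegalArgumentException(
                    "Field '" + fieldName + "' has type " + value.getClass().getName()
                            + ", which is not compatible with column type " + expectedType.getName());
        }
        return value;
    }
}
```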