Stricter GenericRecordObjectFieldCopier to catch errors early #3608

Open
wants to merge 2 commits into
base: main

devinrsmith
Member

We should prefer to error out sooner rather than later when we know that the published objects are not compatible with the column definition.

Related to #3602

@devinrsmith devinrsmith added this to the Apr 2023 milestone Mar 27, 2023
@devinrsmith devinrsmith self-assigned this Mar 27, 2023
@devinrsmith
Member Author

Note: this stricter error checking causes the Kafka table to stop, and the only indication of failure I get is this inconspicuous log line:

2023-03-27T21:39:49.469Z | Ingestermy_topic:ALL |  INFO | .d.s.StreamToTableAdapter | Deregistering StreamToTableAdapter-Kafka-my_topic-ALL

By hacking in some explicit logging at this exception site, I'm able to see this example:

java.lang.IllegalStateException: Unexpected value type class java.util.HashMap, expected instance of interface org.apache.avro.generic.GenericRecord
        at io.deephaven.kafka.ingest.GenericRecordObjectFieldCopier.copyField(GenericRecordObjectFieldCopier.java:38)
        at io.deephaven.kafka.ingest.MultiFieldChunkAdapter.handleChunk(MultiFieldChunkAdapter.java:74)
        at io.deephaven.kafka.ingest.KafkaStreamPublisher.flushValueChunk(KafkaStreamPublisher.java:324)
        at io.deephaven.kafka.ingest.KafkaStreamPublisher.doConsumeRecords(KafkaStreamPublisher.java:275)
        at io.deephaven.kafka.ingest.KafkaStreamPublisher.lambda$consumeRecords$0(KafkaStreamPublisher.java:153)
        at io.deephaven.kafka.StreamPublisherImpl.doLocked(StreamPublisherImpl.java:70)
        at io.deephaven.kafka.ingest.KafkaStreamPublisher.consumeRecords(KafkaStreamPublisher.java:153)
        at io.deephaven.kafka.KafkaTools$SimpleKafkaStreamConsumer.accept(KafkaTools.java:2176)
        at io.deephaven.kafka.KafkaTools$SimpleKafkaStreamConsumer.accept(KafkaTools.java:2163)
        at io.deephaven.kafka.ingest.KafkaIngester.pollOnce(KafkaIngester.java:332)
        at io.deephaven.kafka.ingest.KafkaIngester.consumerLoop(KafkaIngester.java:273)
        at java.base/java.lang.Thread.run(Thread.java:829)
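
For readers without the diff at hand, the kind of fail-fast check that would produce a message shaped like the one above can be sketched as follows. This is only an illustration, not the code in this PR: `RecordLike` is a hypothetical stand-in for `org.apache.avro.generic.GenericRecord` (so the sketch has no Avro dependency), `StrictFieldCheck` and `requireRecord` are made-up names, and letting `null` pass through is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for org.apache.avro.generic.GenericRecord.
interface RecordLike {
    Object get(String fieldName);
}

final class StrictFieldCheck {
    private StrictFieldCheck() {}

    // Fails fast when a published value is not the expected record type,
    // instead of letting an incompatible object flow further downstream.
    // Assumption: null values are allowed through unchanged.
    static RecordLike requireRecord(Object value) {
        if (value == null) {
            return null;
        }
        if (!(value instanceof RecordLike)) {
            throw new IllegalStateException(
                    "Unexpected value type " + value.getClass()
                            + ", expected instance of " + RecordLike.class);
        }
        return (RecordLike) value;
    }

    public static void main(String[] args) {
        Map<String, Object> notARecord = new HashMap<>();
        try {
            requireRecord(notARecord);
        } catch (IllegalStateException e) {
            // The message names both the offending class and the expected interface.
            System.out.println(e.getMessage());
        }
    }
}
```

The point of checking at copy time is that the mismatch surfaces on the first bad record rather than somewhere further downstream.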

Looking at this trace, it looks like a few different layers are each trying to handle "acceptFailure", and maybe they are in conflict with each other?

            try {
                streamConsumer.accept(partitionRecords);
            } catch (Throwable ex) {
                ++messagesWithErr;
                log.error().append(logPrefix).append("Exception while processing Kafka message:").append(ex).endl();
                if (messagesWithErr > MAX_ERRS) {
                    streamConsumer.acceptFailure(ex);
                    log.error().append(logPrefix)
                            .append("Max number of errors exceeded, aborting " + this + " consumer thread.")
                            .endl();
                    return false;
                }
                continue;
            }

at io.deephaven.kafka.ingest.KafkaIngester.pollOnce(KafkaIngester.java:332)

vs

        @Override
        public void accept(List<? extends ConsumerRecord<?, ?>> consumerRecords) {
            try {
                adapter.consumeRecords(consumerRecords);
            } catch (Exception e) {
                acceptFailure(e);
            }
        }

at io.deephaven.kafka.KafkaTools$SimpleKafkaStreamConsumer.accept(KafkaTools.java:2176)
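
To make the suspected interaction concrete, here is a minimal, self-contained sketch of the two layers. It only mirrors the shape of the snippets quoted above. `LayeredFailureSketch`, `StreamConsumer`, `SwallowingConsumer` and `pollLoop` are hypothetical names, and the real classes do considerably more. Assuming the inner `accept` catches every `Exception` and routes it to `acceptFailure`, the outer `catch (Throwable ex)` would never see those exceptions, so the outer error log and the `MAX_ERRS` accounting would not be reached.

```java
import java.util.ArrayList;
import java.util.List;

final class LayeredFailureSketch {
    // Hypothetical, simplified consumer interface with the two entry points.
    interface StreamConsumer {
        void accept(List<String> records);
        void acceptFailure(Throwable cause);
    }

    // Inner layer: same shape as the quoted accept(), which catches and reroutes.
    static final class SwallowingConsumer implements StreamConsumer {
        final List<Throwable> failures = new ArrayList<>();

        @Override
        public void accept(List<String> records) {
            try {
                // Stand-in for adapter.consumeRecords(...) hitting the strict check.
                throw new IllegalStateException("Unexpected value type");
            } catch (Exception e) {
                acceptFailure(e);
            }
        }

        @Override
        public void acceptFailure(Throwable cause) {
            failures.add(cause);
        }
    }

    // Outer layer: same shape as the quoted try/catch in the poll loop.
    // Returns how many errors the outer layer itself observed.
    static int pollLoop(StreamConsumer consumer, int batches) {
        int messagesWithErr = 0;
        for (int i = 0; i < batches; i++) {
            try {
                consumer.accept(List.of("record-" + i));
            } catch (Throwable ex) {
                ++messagesWithErr; // never reached while the inner layer swallows
            }
        }
        return messagesWithErr;
    }

    public static void main(String[] args) {
        SwallowingConsumer consumer = new SwallowingConsumer();
        int outerErrors = pollLoop(consumer, 3);
        // prints "outer saw 0 errors; inner recorded 3"
        System.out.println("outer saw " + outerErrors
                + " errors; inner recorded " + consumer.failures.size());
    }
}
```

In this sketch the failure is only visible through whatever `acceptFailure` does, which would be consistent with seeing nothing but the deregistration log line.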

@devinrsmith devinrsmith changed the title from "Stronger record checking" to "Stricter GenericRecordObjectFieldCopier to catch errors early" Mar 27, 2023
@devinrsmith devinrsmith marked this pull request as ready for review April 19, 2023 20:30
@devinrsmith devinrsmith removed the request for review from jcferretti April 19, 2023 20:31
@devinrsmith
Member Author

Note: the above issues need to be resolved first before we can consider merging this.
