diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java index e51f44251d6..34cee93c488 100644 --- a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java +++ b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java @@ -1218,33 +1218,30 @@ public Function visit(@NotNull final PerPar } /** - * Consume from Kafka to {@link StreamConsumer stream consumers} supplied by {@code streamConsumerRegistrar}. - * - * @param kafkaProperties Properties to configure this table and also to be passed to create the KafkaConsumer - * @param topic Kafka topic name - * @param partitionFilter A predicate returning true for the partitions to consume. The convenience constant - * {@code ALL_PARTITIONS} is defined to facilitate requesting all partitions. - * @param partitionToInitialOffset A function specifying the desired initial offset for each partition consumed - * @param keySpec Conversion specification for Kafka record keys - * @param valueSpec Conversion specification for Kafka record values - * @param streamConsumerRegistrarProvider A provider for a function to - * {@link StreamPublisher#register(StreamConsumer) register} {@link StreamConsumer} instances. The registered - * stream consumers must accept {@link ChunkType chunk types} that correspond to - * {@link StreamChunkUtils#chunkTypeForColumnIndex(TableDefinition, int)} for the supplied - * {@link TableDefinition}. See {@link StreamConsumerRegistrarProvider#single(SingleConsumerRegistrar) - * single} and {@link StreamConsumerRegistrarProvider#perPartition(PerPartitionConsumerRegistrar) - * per-partition}. - * @param consumerLoopCallback callback to inject logic into the ingester's consumer loop + * Basic holder structure used to pass multiple objects back to a calling method. */ - public static void consume( + private static class ConsumeStruct { + final TableDefinition tableDefinition; + final KafkaStreamPublisher.Parameters publisherParameters; + final Deserializer keyDeser; + final Deserializer valueDeser; + + private ConsumeStruct( + @NotNull final TableDefinition tableDefinition, + @NotNull final KafkaStreamPublisher.Parameters publisherParameters, + @NotNull final Deserializer keyDeser, + @NotNull final Deserializer valueDeser) { + this.tableDefinition = tableDefinition; + this.publisherParameters = publisherParameters; + this.keyDeser = keyDeser; + this.valueDeser = valueDeser; + } + } + + private static ConsumeStruct getConsumeStruct( @NotNull final Properties kafkaProperties, - @NotNull final String topic, - @NotNull final IntPredicate partitionFilter, - @NotNull final InitialOffsetLookup partitionToInitialOffset, @NotNull final Consume.KeyOrValueSpec keySpec, - @NotNull final Consume.KeyOrValueSpec valueSpec, - @NotNull final StreamConsumerRegistrarProvider streamConsumerRegistrarProvider, - @Nullable final ConsumerLoopCallback consumerLoopCallback) { + @NotNull final Consume.KeyOrValueSpec valueSpec) { if (Consume.isIgnore(keySpec) && Consume.isIgnore(valueSpec)) { throw new IllegalArgumentException( "can't ignore both key and value: keySpec and valueSpec can't both be ignore specs"); @@ -1297,12 +1294,64 @@ public static void consume( .setValueToChunkObjectMapper(valueIngestData.toObjectChunkMapper); } - final KafkaStreamPublisher.Parameters publisherParameters = publisherParametersBuilder.build(); + return new ConsumeStruct(tableDefinition, publisherParametersBuilder.build(), keyDeser, valueDeser); + } + + /** + * Construct a {@link TableDefinition} based on the input Properties and {@link Consume.KeyOrValueSpec} parameters. + * Given the same input Properties and Consume.KeyOrValueSpec parameters, the returned TableDefinition is the same + * as the TableDefinition of the table produced by + * {@link #consumeToTable(Properties, String, IntPredicate, IntToLongFunction, Consume.KeyOrValueSpec, Consume.KeyOrValueSpec, TableType)} + * + * @param kafkaProperties Properties to configure this table + * @param keySpec Conversion specification for Kafka record keys + * @param valueSpec Conversion specification for Kafka record values + * @return A TableDefinition derived from the input Properties and KeyOrValueSpec instances + */ + @SuppressWarnings("unused") + public static TableDefinition getTableDefinition( + @NotNull final Properties kafkaProperties, + @NotNull final Consume.KeyOrValueSpec keySpec, + @NotNull final Consume.KeyOrValueSpec valueSpec) { + return getConsumeStruct(kafkaProperties, keySpec, valueSpec).tableDefinition; + } + + /** + * Consume from Kafka to {@link StreamConsumer stream consumers} supplied by {@code streamConsumerRegistrar}. + * + * @param kafkaProperties Properties to configure this table and also to be passed to create the KafkaConsumer + * @param topic Kafka topic name + * @param partitionFilter A predicate returning true for the partitions to consume. The convenience constant + * {@code ALL_PARTITIONS} is defined to facilitate requesting all partitions. + * @param partitionToInitialOffset A function specifying the desired initial offset for each partition consumed + * @param keySpec Conversion specification for Kafka record keys + * @param valueSpec Conversion specification for Kafka record values + * @param streamConsumerRegistrarProvider A provider for a function to + * {@link StreamPublisher#register(StreamConsumer) register} {@link StreamConsumer} instances. The registered + * stream consumers must accept {@link ChunkType chunk types} that correspond to + * {@link StreamChunkUtils#chunkTypeForColumnIndex(TableDefinition, int)} for the supplied + * {@link TableDefinition}. See {@link StreamConsumerRegistrarProvider#single(SingleConsumerRegistrar) + * single} and {@link StreamConsumerRegistrarProvider#perPartition(PerPartitionConsumerRegistrar) + * per-partition}. + * @param consumerLoopCallback callback to inject logic into the ingester's consumer loop + */ + public static void consume( + @NotNull final Properties kafkaProperties, + @NotNull final String topic, + @NotNull final IntPredicate partitionFilter, + @NotNull final InitialOffsetLookup partitionToInitialOffset, + @NotNull final Consume.KeyOrValueSpec keySpec, + @NotNull final Consume.KeyOrValueSpec valueSpec, + @NotNull final StreamConsumerRegistrarProvider streamConsumerRegistrarProvider, + @Nullable final ConsumerLoopCallback consumerLoopCallback) { + final ConsumeStruct consumeStruct = getConsumeStruct(kafkaProperties, keySpec, valueSpec); + final MutableObject kafkaIngesterHolder = new MutableObject<>(); final Function kafkaRecordConsumerFactory = streamConsumerRegistrarProvider.walk( - new KafkaRecordConsumerFactoryCreator(publisherParameters, kafkaIngesterHolder::getValue)); + new KafkaRecordConsumerFactoryCreator(consumeStruct.publisherParameters, + kafkaIngesterHolder::getValue)); final KafkaIngester ingester = new KafkaIngester( log, @@ -1311,8 +1360,8 @@ public static void consume( partitionFilter, kafkaRecordConsumerFactory, partitionToInitialOffset, - keyDeser, - valueDeser, + consumeStruct.keyDeser, + consumeStruct.valueDeser, consumerLoopCallback); kafkaIngesterHolder.setValue(ingester); ingester.start();