feat: Add static getter for TableDefinition in KafkaTools
Backport of #5956
robbcamera authored and devinrsmith committed Aug 23, 2024
1 parent 4581ce5 commit c9d6575
Showing 1 changed file with 77 additions and 28 deletions.
105 changes: 77 additions & 28 deletions extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java
@@ -1218,33 +1218,30 @@ public Function<TopicPartition, KafkaRecordConsumer> visit(@NotNull final PerPar
}

/**
* Consume from Kafka to {@link StreamConsumer stream consumers} supplied by {@code streamConsumerRegistrar}.
*
* @param kafkaProperties Properties to configure this table and also to be passed to create the KafkaConsumer
* @param topic Kafka topic name
* @param partitionFilter A predicate returning true for the partitions to consume. The convenience constant
* {@code ALL_PARTITIONS} is defined to facilitate requesting all partitions.
* @param partitionToInitialOffset A function specifying the desired initial offset for each partition consumed
* @param keySpec Conversion specification for Kafka record keys
* @param valueSpec Conversion specification for Kafka record values
* @param streamConsumerRegistrarProvider A provider for a function to
* {@link StreamPublisher#register(StreamConsumer) register} {@link StreamConsumer} instances. The registered
* stream consumers must accept {@link ChunkType chunk types} that correspond to
* {@link StreamChunkUtils#chunkTypeForColumnIndex(TableDefinition, int)} for the supplied
* {@link TableDefinition}. See {@link StreamConsumerRegistrarProvider#single(SingleConsumerRegistrar)
* single} and {@link StreamConsumerRegistrarProvider#perPartition(PerPartitionConsumerRegistrar)
* per-partition}.
* @param consumerLoopCallback callback to inject logic into the ingester's consumer loop
* Basic holder structure used to pass multiple objects back to a calling method.
*/
public static void consume(
private static class ConsumeStruct {
final TableDefinition tableDefinition;
final KafkaStreamPublisher.Parameters publisherParameters;
final Deserializer<?> keyDeser;
final Deserializer<?> valueDeser;

private ConsumeStruct(
@NotNull final TableDefinition tableDefinition,
@NotNull final KafkaStreamPublisher.Parameters publisherParameters,
@NotNull final Deserializer<?> keyDeser,
@NotNull final Deserializer<?> valueDeser) {
this.tableDefinition = tableDefinition;
this.publisherParameters = publisherParameters;
this.keyDeser = keyDeser;
this.valueDeser = valueDeser;
}
}

private static ConsumeStruct getConsumeStruct(
@NotNull final Properties kafkaProperties,
@NotNull final String topic,
@NotNull final IntPredicate partitionFilter,
@NotNull final InitialOffsetLookup partitionToInitialOffset,
@NotNull final Consume.KeyOrValueSpec keySpec,
@NotNull final Consume.KeyOrValueSpec valueSpec,
@NotNull final StreamConsumerRegistrarProvider streamConsumerRegistrarProvider,
@Nullable final ConsumerLoopCallback consumerLoopCallback) {
@NotNull final Consume.KeyOrValueSpec valueSpec) {
if (Consume.isIgnore(keySpec) && Consume.isIgnore(valueSpec)) {
throw new IllegalArgumentException(
"can't ignore both key and value: keySpec and valueSpec can't both be ignore specs");
@@ -1297,12 +1294,64 @@ public static void consume(
.setValueToChunkObjectMapper(valueIngestData.toObjectChunkMapper);
}

final KafkaStreamPublisher.Parameters publisherParameters = publisherParametersBuilder.build();
return new ConsumeStruct(tableDefinition, publisherParametersBuilder.build(), keyDeser, valueDeser);
}

/**
* Construct a {@link TableDefinition} based on the input Properties and {@link Consume.KeyOrValueSpec} parameters.
* Given the same input Properties and Consume.KeyOrValueSpec parameters, the returned TableDefinition is the same
* as the TableDefinition of the table produced by
* {@link #consumeToTable(Properties, String, IntPredicate, IntToLongFunction, Consume.KeyOrValueSpec, Consume.KeyOrValueSpec, TableType)}
*
* @param kafkaProperties Properties to configure this table
* @param keySpec Conversion specification for Kafka record keys
* @param valueSpec Conversion specification for Kafka record values
* @return A TableDefinition derived from the input Properties and KeyOrValueSpec instances
*/
@SuppressWarnings("unused")
public static TableDefinition getTableDefinition(
@NotNull final Properties kafkaProperties,
@NotNull final Consume.KeyOrValueSpec keySpec,
@NotNull final Consume.KeyOrValueSpec valueSpec) {
return getConsumeStruct(kafkaProperties, keySpec, valueSpec).tableDefinition;
}

/**
* Consume from Kafka to {@link StreamConsumer stream consumers} supplied by {@code streamConsumerRegistrar}.
*
* @param kafkaProperties Properties to configure this table and also to be passed to create the KafkaConsumer
* @param topic Kafka topic name
* @param partitionFilter A predicate returning true for the partitions to consume. The convenience constant
* {@code ALL_PARTITIONS} is defined to facilitate requesting all partitions.
* @param partitionToInitialOffset A function specifying the desired initial offset for each partition consumed
* @param keySpec Conversion specification for Kafka record keys
* @param valueSpec Conversion specification for Kafka record values
* @param streamConsumerRegistrarProvider A provider for a function to
* {@link StreamPublisher#register(StreamConsumer) register} {@link StreamConsumer} instances. The registered
* stream consumers must accept {@link ChunkType chunk types} that correspond to
* {@link StreamChunkUtils#chunkTypeForColumnIndex(TableDefinition, int)} for the supplied
* {@link TableDefinition}. See {@link StreamConsumerRegistrarProvider#single(SingleConsumerRegistrar)
* single} and {@link StreamConsumerRegistrarProvider#perPartition(PerPartitionConsumerRegistrar)
* per-partition}.
* @param consumerLoopCallback callback to inject logic into the ingester's consumer loop
*/
public static void consume(
@NotNull final Properties kafkaProperties,
@NotNull final String topic,
@NotNull final IntPredicate partitionFilter,
@NotNull final InitialOffsetLookup partitionToInitialOffset,
@NotNull final Consume.KeyOrValueSpec keySpec,
@NotNull final Consume.KeyOrValueSpec valueSpec,
@NotNull final StreamConsumerRegistrarProvider streamConsumerRegistrarProvider,
@Nullable final ConsumerLoopCallback consumerLoopCallback) {
final ConsumeStruct consumeStruct = getConsumeStruct(kafkaProperties, keySpec, valueSpec);

final MutableObject<KafkaIngester> kafkaIngesterHolder = new MutableObject<>();

final Function<TopicPartition, KafkaRecordConsumer> kafkaRecordConsumerFactory =
streamConsumerRegistrarProvider.walk(
new KafkaRecordConsumerFactoryCreator(publisherParameters, kafkaIngesterHolder::getValue));
new KafkaRecordConsumerFactoryCreator(consumeStruct.publisherParameters,
kafkaIngesterHolder::getValue));

final KafkaIngester ingester = new KafkaIngester(
log,
@@ -1311,8 +1360,8 @@ public static void consume(
partitionFilter,
kafkaRecordConsumerFactory,
partitionToInitialOffset,
keyDeser,
valueDeser,
consumeStruct.keyDeser,
consumeStruct.valueDeser,
consumerLoopCallback);
kafkaIngesterHolder.setValue(ingester);
ingester.start();
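For context, a minimal usage sketch of the new getter, showing how the resulting schema can be inspected without starting any ingestion. The broker address, property values, and the ignoreSpec()/simpleSpec() spec choices below are illustrative assumptions, not part of this change:

import java.util.Properties;

import io.deephaven.engine.table.TableDefinition;
import io.deephaven.kafka.KafkaTools;

public class TableDefinitionPreview {
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        // Same (properties, keySpec, valueSpec) triple that a consumeToTable call would take,
        // so the returned definition matches the table that call would produce.
        final TableDefinition definition = KafkaTools.getTableDefinition(
                props,
                KafkaTools.Consume.ignoreSpec(),         // ignore record keys
                KafkaTools.Consume.simpleSpec("Value")); // one column parsed from the record value
        System.out.println(definition);
    }
}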

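A similarly hedged sketch of the higher-level consumeToTable entry point that the new getter mirrors: called with the same properties and key/value specs, it produces a table whose definition matches the getter's result. The topic, consumer group, and the blink() table type are assumptions for illustration:

import java.util.Properties;

import io.deephaven.engine.table.Table;
import io.deephaven.kafka.KafkaTools;

public class OrdersIngestExample {
    public static Table start() {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker
        props.put("group.id", "dh-example-consumer");      // hypothetical consumer group
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        // With the same (properties, keySpec, valueSpec) as the getTableDefinition sketch above,
        // the definition of the returned table matches the getter's result.
        return KafkaTools.consumeToTable(
                props,
                "orders",                            // hypothetical topic
                KafkaTools.ALL_PARTITIONS,           // consume every partition
                KafkaTools.ALL_PARTITIONS_DONT_SEEK, // do not seek; use the consumer's default offsets
                KafkaTools.Consume.ignoreSpec(),
                KafkaTools.Consume.simpleSpec("Value"),
                KafkaTools.TableType.blink());       // assumed table-type factory
    }
}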