Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add topic, partition, and timestamp column kafka publishing support #4771

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,18 @@
package io.deephaven.kafka;

import io.deephaven.annotations.BuildableStyle;
import io.deephaven.api.ColumnName;
import io.deephaven.engine.table.Table;
import io.deephaven.kafka.KafkaTools.Produce;
import io.deephaven.kafka.KafkaTools.Produce.KeyOrValueSpec;
import org.immutables.value.Value.Check;
import org.immutables.value.Value.Default;
import org.immutables.value.Value.Immutable;

import javax.annotation.Nullable;
import java.time.Instant;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Properties;

/**
Expand All @@ -34,12 +39,20 @@ public static Builder builder() {
public abstract Table table();

/**
* The kafka topic to publish to.
* The default kafka topic to publish to.
*
* @return the kafka topic
* @return the default kafka topic
*/
@Nullable
rcaudy marked this conversation as resolved.
Show resolved Hide resolved
public abstract String topic();

/**
* The default kafka partition to publish to.
*
* @return the default kafka partition
*/
public abstract OptionalInt partition();

/**
* The kafka configuration properties.
*
Expand Down Expand Up @@ -93,6 +106,30 @@ public boolean publishInitial() {
return true;
}

/**
* The topic column. When set, uses the the given {@link CharSequence}-compatible column from {@link #table()} as
devinrsmith marked this conversation as resolved.
Show resolved Hide resolved
* the first source for setting the kafka record topic.
*
* @return the topic column name
*/
public abstract Optional<ColumnName> topicColumn();

/**
* The partition column. When set, uses the the given {@code int} column from {@link #table()} as the first source
* for setting the kafka record partition.
*
* @return the partition column name
*/
public abstract Optional<ColumnName> partitionColumn();

/**
* The timestamp column. When set, uses the the given {@link Instant} column from {@link #table()} as the first
* source for setting the kafka record timestamp.
*
* @return the partition column name
*/
public abstract Optional<ColumnName> timestampColumn();

@Check
final void checkNotBothIgnore() {
if (Produce.isIgnore(keySpec()) && Produce.isIgnore(valueSpec())) {
Expand All @@ -114,12 +151,42 @@ final void checkLastBy() {
}
}

@Check
final void checkTopic() {
if (topic() == null && topicColumn().isEmpty()) {
throw new IllegalArgumentException("Must set topic or topicColumn (or both)");
}
}

@Check
final void checkTopicColumn() {
if (topicColumn().isPresent()) {
table().getColumnSource(topicColumn().get().name(), CharSequence.class);
devinrsmith marked this conversation as resolved.
Show resolved Hide resolved
}
}

@Check
final void checkPartitionColumn() {
if (partitionColumn().isPresent()) {
table().getColumnSource(partitionColumn().get().name(), int.class);
}
}

@Check
final void checkTimestampColumn() {
if (timestampColumn().isPresent()) {
table().getColumnSource(timestampColumn().get().name(), Instant.class);
}
}

public interface Builder {

Builder table(Table table);

Builder topic(String topic);

Builder partition(int partition);

Builder config(Properties config);

Builder keySpec(KeyOrValueSpec keySpec);
Expand All @@ -130,6 +197,12 @@ public interface Builder {

Builder publishInitial(boolean publishInitial);

Builder topicColumn(ColumnName columnName);

Builder partitionColumn(ColumnName columnName);

Builder timestampColumn(ColumnName columnName);

KafkaPublishOptions build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1442,12 +1442,16 @@ public static Runnable produceFromTable(KafkaPublishOptions options) {
options.config(),
effectiveTable,
options.topic(),
options.partition().isEmpty() ? null : options.partition().getAsInt(),
keyColumns,
keySpecSerializer,
keySerializer,
valueColumns,
valueSpecSerializer,
valueSerializer,
options.topicColumn().orElse(null),
options.partitionColumn().orElse(null),
options.timestampColumn().orElse(null),
options.publishInitial());
}
return publisherScope::release;
Expand Down
Loading
Loading