Skip to content

Commit

Permalink
More docs
Browse files Browse the repository at this point in the history
  • Loading branch information
devinrsmith committed Nov 15, 2023
1 parent 79e9e07 commit 4ce2e76
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,10 @@ public static Builder builder() {
public abstract Table table();

/**
* The default kafka topic to publish to.
* The default kafka topic to publish to. When {@code null}, {@link #topicColumn()} must be set.
*
* @return the default kafka topic
* @see #topicColumn()
*/
@Nullable
public abstract String topic();
Expand All @@ -50,6 +51,7 @@ public static Builder builder() {
* The default kafka partition to publish to.
*
* @return the default kafka partition
* @see #partitionColumn()
*/
public abstract OptionalInt partition();

Expand Down Expand Up @@ -108,25 +110,33 @@ public boolean publishInitial() {

/**
* The topic column. When set, uses the given {@link CharSequence}-compatible column from {@link #table()} as
* the first source for setting the kafka record topic.
* the first source for setting the kafka record topic. When not present, or the column value is null,
* {@link #topic()} will be used.
*
* @return the topic column name
*/
public abstract Optional<ColumnName> topicColumn();

/**
* The partition column. When set, uses the given {@code int} column from {@link #table()} as the first source
* for setting the kafka record partition.
* for setting the kafka record partition. When not present, or the column value is null, {@link #partition()} will
* be used if present. If a valid partition number is specified that partition will be used when sending the record.
* If no partition is specified but a key is present a partition will be chosen using a hash of the key. If neither
* key nor partition is present a partition will be assigned in a round-robin fashion.
*
* @return the partition column name
*/
public abstract Optional<ColumnName> partitionColumn();

/**
* The timestamp column. When set, uses the given {@link Instant} column from {@link #table()} as the first
* source for setting the kafka record timestamp.
* source for setting the kafka record timestamp. When not present, or the column value is null, the producer will
* stamp the record with its current time. The timestamp eventually used by Kafka depends on the timestamp type
* configured for the topic. If the topic is configured to use CreateTime, the timestamp in the producer record will
* be used by the broker. If the topic is configured to use LogAppendTime, the timestamp in the producer record will
* be overwritten by the broker with the broker local time when it appends the message to its log.
*
* @return the partition column name
* @return the timestamp column name
*/
public abstract Optional<ColumnName> timestampColumn();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,18 @@ public void ok() {
.build();
}

@Test
public void okPartition() {
    // Verifies that build() succeeds when an explicit default partition (123) is
    // supplied together with a default topic — i.e. the partition option alone does
    // not trip builder validation. No partitionColumn is set, so 123 would be the
    // fallback partition for every record.
    KafkaPublishOptions.builder()
            .table(TableTools.newTable(TD))
            .topic("HotTopic")
            .partition(123)
            .config(new Properties())
            .valueSpec(Produce.simpleSpec("MyValue"))
            .build();
}


@Test
public void checkNotBothIgnore() {
try {
Expand Down
19 changes: 14 additions & 5 deletions py/server/deephaven/stream/kafka/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def produce(
kafka_config (Dict): configuration for the associated kafka producer.
This is used to call the constructor of org.apache.kafka.clients.producer.KafkaProducer;
pass any KafkaProducer specific desired configuration here
topic (Optional[str]): the default topic name
topic (Optional[str]): the default topic name. When None, topic_column must be set. See topic_column for behavior.
key_spec (KeyValueSpec): specifies how to map table column(s) to the Key field in produced Kafka messages.
This should be the result of calling one of the functions simple_spec(), avro_spec() or json_spec() in this
module, or the constant KeyValueSpec.IGNORE
Expand All @@ -66,13 +66,22 @@ def produce(
aggregation on table grouped by the input columns of key_spec and publish to Kafka from the result.
publish_initial (bool): whether the initial data in table should be published. When False, table.is_refreshing
must be True. By default, is True.
partition (Optional[int]): the default partition, None by default.
partition (Optional[int]): the default partition, None by default. See partition_column for partition behavior.
topic_column (Optional[str]): the topic column, None by default. When set, uses the given string column from
table as the first source for setting the kafka record topic.
table as the first source for setting the kafka record topic. When None, or the column value is null, topic
will be used.
partition_column (Optional[str]): the partition column, None by default. When set, uses the given int column
from table as the first source for setting the kafka record partition.
from table as the first source for setting the kafka record partition. When None, or the column value is null,
partition will be used if present. If a valid partition number is specified that partition will be used when
sending the record. If no partition is specified but a key is present a partition will be chosen using a hash
of the key. If neither key nor partition is present a partition will be assigned in a round-robin fashion.
timestamp_column (Optional[str]): the timestamp column, None by default. When set, uses the given timestamp
column from table as the first source for setting the kafka record timestamp.
column from table as the first source for setting the kafka record timestamp. When None, or the column value
is null, the producer will stamp the record with its current time. The timestamp eventually used by Kafka
depends on the timestamp type configured for the topic. If the topic is configured to use CreateTime, the
timestamp in the producer record will be used by the broker. If the topic is configured to use LogAppendTime,
the timestamp in the producer record will be overwritten by the broker with the broker local time when it
appends the message to its log.
Returns:
a callback that, when invoked, stops publishing and cleans up subscriptions and resources.
Expand Down

0 comments on commit 4ce2e76

Please sign in to comment.