From 4ce2e7601389cfa22205614079b0fbe9d3d70381 Mon Sep 17 00:00:00 2001
From: Devin Smith
Date: Wed, 15 Nov 2023 10:55:35 -0800
Subject: [PATCH] More docs

---
 .../deephaven/kafka/KafkaPublishOptions.java  | 20 +++++++++++++++-----
 .../kafka/KafkaPublishOptionsTest.java        | 12 ++++++++++++
 py/server/deephaven/stream/kafka/producer.py  | 19 ++++++++++++++-----
 3 files changed, 41 insertions(+), 10 deletions(-)

diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaPublishOptions.java b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaPublishOptions.java
index ad4e42e225d..74acab261f0 100644
--- a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaPublishOptions.java
+++ b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaPublishOptions.java
@@ -39,9 +39,10 @@ public static Builder builder() {
     public abstract Table table();
 
     /**
-     * The default kafka topic to publish to.
+     * The default kafka topic to publish to. When {@code null}, {@link #topicColumn()} must be set.
      *
      * @return the default kafka topic
+     * @see #topicColumn()
      */
     @Nullable
     public abstract String topic();
@@ -50,6 +51,7 @@
      * The default kafka partition to publish to.
      *
      * @return the default kafka partition
+     * @see #partitionColumn()
      */
     public abstract OptionalInt partition();
 
@@ -108,7 +110,8 @@ public boolean publishInitial() {
 
     /**
      * The topic column. When set, uses the the given {@link CharSequence}-compatible column from {@link #table()} as
-     * the first source for setting the kafka record topic.
+     * the first source for setting the kafka record topic. When not present, or the column value is null,
+     * {@link #topic()} will be used.
      *
      * @return the topic column name
      */
@@ -116,7 +119,10 @@ public boolean publishInitial() {
 
     /**
      * The partition column. When set, uses the the given {@code int} column from {@link #table()} as the first source
-     * for setting the kafka record partition.
+     * for setting the kafka record partition. When not present, or the column value is null, {@link #partition()} will
+     * be used if present. If a valid partition number is specified that partition will be used when sending the record.
+     * If no partition is specified but a key is present a partition will be chosen using a hash of the key. If neither
+     * key nor partition is present a partition will be assigned in a round-robin fashion.
      *
      * @return the partition column name
      */
@@ -124,9 +130,13 @@ public boolean publishInitial() {
 
     /**
      * The timestamp column. When set, uses the the given {@link Instant} column from {@link #table()} as the first
-     * source for setting the kafka record timestamp.
+     * source for setting the kafka record timestamp. When not present, or the column value is null, the producer will
+     * stamp the record with its current time. The timestamp eventually used by Kafka depends on the timestamp type
+     * configured for the topic. If the topic is configured to use CreateTime, the timestamp in the producer record will
+     * be used by the broker. If the topic is configured to use LogAppendTime, the timestamp in the producer record will
+     * be overwritten by the broker with the broker local time when it appends the message to its log.
      *
-     * @return the partition column name
+     * @return the timestamp column name
      */
     public abstract Optional timestampColumn();
 
diff --git a/extensions/kafka/src/test/java/io/deephaven/kafka/KafkaPublishOptionsTest.java b/extensions/kafka/src/test/java/io/deephaven/kafka/KafkaPublishOptionsTest.java
index a4a200ec5a4..b6d351e7d33 100644
--- a/extensions/kafka/src/test/java/io/deephaven/kafka/KafkaPublishOptionsTest.java
+++ b/extensions/kafka/src/test/java/io/deephaven/kafka/KafkaPublishOptionsTest.java
@@ -34,6 +34,18 @@ public void ok() {
                 .build();
     }
 
+    @Test
+    public void okPartition() {
+        KafkaPublishOptions.builder()
+                .table(TableTools.newTable(TD))
+                .topic("HotTopic")
+                .partition(123)
+                .config(new Properties())
+                .valueSpec(Produce.simpleSpec("MyValue"))
+                .build();
+    }
+
+
     @Test
     public void checkNotBothIgnore() {
         try {
diff --git a/py/server/deephaven/stream/kafka/producer.py b/py/server/deephaven/stream/kafka/producer.py
index 645f8a6760f..0f530a041c9 100644
--- a/py/server/deephaven/stream/kafka/producer.py
+++ b/py/server/deephaven/stream/kafka/producer.py
@@ -54,7 +54,7 @@ def produce(
         kafka_config (Dict): configuration for the associated kafka producer.
             This is used to call the constructor of org.apache.kafka.clients.producer.KafkaProducer;
             pass any KafkaProducer specific desired configuration here
-        topic (Optional[str]): the default topic name
+        topic (Optional[str]): the default topic name. When None, topic_column must be set. See topic_column for behavior.
         key_spec (KeyValueSpec): specifies how to map table column(s) to the Key field in produced Kafka messages.
             This should be the result of calling one of the functions simple_spec(), avro_spec() or json_spec() in this
             module, or the constant KeyValueSpec.IGNORE
@@ -66,13 +66,22 @@ def produce(
             aggregation on table grouped by the input columns of key_spec and publish to Kafka from the result.
         publish_initial (bool): whether the initial data in table should be published. When False, table.is_refreshing
             must be True. By default, is True.
-        partition (Optional[int]): the default partition, None by default.
+        partition (Optional[int]): the default partition, None by default. See partition_column for partition behavior.
         topic_column (Optional[str]): the topic column, None by default. When set, uses the the given string column from
-            table as the first source for setting the kafka record topic.
+            table as the first source for setting the kafka record topic. When None, or the column value is null, topic
+            will be used.
         partition_column (Optional[str]): the partition column, None by default. When set, uses the the given int column
-            from table as the first source for setting the kafka record partition.
+            from table as the first source for setting the kafka record partition. When None, or the column value is null,
+            partition will be used if present. If a valid partition number is specified that partition will be used when
+            sending the record. If no partition is specified but a key is present a partition will be chosen using a hash
+            of the key. If neither key nor partition is present a partition will be assigned in a round-robin fashion.
         timestamp_column (Optional[str]): the timestamp column, None by default. When set, uses the the given timestamp
-            column from table as the first source for setting the kafka record timestamp.
+            column from table as the first source for setting the kafka record timestamp. When None, or the column value
+            is null, the producer will stamp the record with its current time. The timestamp eventually used by Kafka
+            depends on the timestamp type configured for the topic. If the topic is configured to use CreateTime, the
+            timestamp in the producer record will be used by the broker. If the topic is configured to use LogAppendTime,
+            the timestamp in the producer record will be overwritten by the broker with the broker local time when it
+            appends the message to its log.
 
     Returns:
         a callback that, when invoked, stops publishing and cleans up subscriptions and resources.
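For readers of this patch, a minimal Java sketch of how the documented fallbacks compose. The table()/topic()/partition()/config()/valueSpec() calls mirror KafkaPublishOptionsTest above; the topicColumn()/partitionColumn()/timestampColumn() builder setters, the ColumnName argument type, the import paths, and the column names ("Topic", "Part", "Ts") are illustrative assumptions that this patch does not confirm.

import java.util.Properties;

import io.deephaven.api.ColumnName;
import io.deephaven.engine.table.Table;
import io.deephaven.kafka.KafkaPublishOptions;
import io.deephaven.kafka.KafkaTools.Produce;

public class PublishOptionsSketch {

    // Hypothetical: builds publish options for a table carrying per-row overrides in
    // "Topic" (String), "Part" (int), and "Ts" (Instant) columns, plus a "MyValue"
    // column consumed by the value spec.
    public static KafkaPublishOptions withColumnOverrides(Table table) {
        return KafkaPublishOptions.builder()
                .table(table)
                .topic("HotTopic")                        // default topic, used when the Topic cell is null
                .partition(123)                           // default partition, used when the Part cell is null
                .config(new Properties())                 // KafkaProducer configuration would go here
                .valueSpec(Produce.simpleSpec("MyValue"))
                .topicColumn(ColumnName.of("Topic"))      // first source for the record topic
                .partitionColumn(ColumnName.of("Part"))   // first source for the record partition
                .timestampColumn(ColumnName.of("Ts"))     // first source for the record timestamp
                .build();
    }
}

Per the javadoc above, each record would take its topic, partition, and timestamp from those columns first, falling back to topic(), partition(), and the producer's current time when a cell is null or the corresponding column is not configured.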