From 3f5e5a1f5e330e67c4ed1537ed7b406049ed7006 Mon Sep 17 00:00:00 2001
From: Devin Smith
Date: Thu, 16 Nov 2023 15:17:07 -0800
Subject: [PATCH] Review response

---
 .../engine/table/ColumnDefinition.java       | 12 +++---
 .../engine/table/TableDefinition.java        |  7 ++--
 .../deephaven/kafka/KafkaPublishOptions.java | 26 ++++++------
 py/server/deephaven/stream/kafka/producer.py | 42 +++++++++----------
 py/server/tests/test_kafka_producer.py       | 20 ++++-----
 5 files changed, 54 insertions(+), 53 deletions(-)

diff --git a/engine/api/src/main/java/io/deephaven/engine/table/ColumnDefinition.java b/engine/api/src/main/java/io/deephaven/engine/table/ColumnDefinition.java
index 914333c9908..e729a627bf8 100644
--- a/engine/api/src/main/java/io/deephaven/engine/table/ColumnDefinition.java
+++ b/engine/api/src/main/java/io/deephaven/engine/table/ColumnDefinition.java
@@ -505,8 +505,8 @@ public void describeDifferences(@NotNull List<String> differences, @NotNull fina
     }
 
     /**
-     * Checks if {@link #getDataType() dataType} can be cast to {@code destDataType}. If not, this throws a
-     * {@link ClassCastException}.
+     * Checks if objects of type {@link #getDataType() dataType} can be cast to {@code destDataType} (equivalent to
+     * {@code destDataType.isAssignableFrom(dataType)}). If not, this throws a {@link ClassCastException}.
      *
      * @param destDataType the destination data type
      */
@@ -515,9 +515,11 @@ public final void checkCastTo(Class<?> destDataType) {
     }
 
     /**
-     * Checks if {@link #getDataType() dataType} can be cast to {@code destDataType} and checks that
-     * {@link #getComponentType() componentType} can be cast to {@code destComponentType} (both component types must be
-     * present and cast-able, or both must be {@code null}). If not, this throws a {@link ClassCastException}.
+     * Checks if objects of type {@link #getDataType() dataType} can be cast to {@code destDataType} (equivalent to
+     * {@code destDataType.isAssignableFrom(dataType)}) and checks that objects of type {@link #getComponentType()
+     * componentType} can be cast to {@code destComponentType} (both component types must be present and cast-able, or
+     * both must be {@code null}; when both present, is equivalent to
+     * {@code destComponentType.isAssignableFrom(componentType)}). If not, this throws a {@link ClassCastException}.
      *
      * @param destDataType the destination data type
      * @param destComponentType the destination component type, may be {@code null}
diff --git a/engine/api/src/main/java/io/deephaven/engine/table/TableDefinition.java b/engine/api/src/main/java/io/deephaven/engine/table/TableDefinition.java
index 79e002887f6..45fe5a33a37 100644
--- a/engine/api/src/main/java/io/deephaven/engine/table/TableDefinition.java
+++ b/engine/api/src/main/java/io/deephaven/engine/table/TableDefinition.java
@@ -314,8 +314,8 @@ public final void checkHasColumn(@NotNull String columnName) {
     }
 
     /**
-     * Checks if {@code columnName} exists and if supports {@link ColumnDefinition#checkCastTo(Class)} with
-     * {@code clazz}. Otherwise, throws a {@link NoSuchColumnException} or a {@link ClassCastException}.
+     * Checks if {@code columnName} exists and supports {@link ColumnDefinition#checkCastTo(Class)} with {@code clazz}.
+     * Otherwise, throws a {@link NoSuchColumnException} or a {@link ClassCastException}.
      *
      * @param columnName the column name
      * @param clazz the data type
@@ -330,7 +330,7 @@ public final void checkHasColumn(@NotNull String columnName, @NotNull Class<?> c
     }
 
     /**
-     * Checks if {@code columnName} exists and if supports {@link ColumnDefinition#checkCastTo(Class, Class)} with
+     * Checks if {@code columnName} exists and supports {@link ColumnDefinition#checkCastTo(Class, Class)} with
      * {@code clazz} and {@code componentType}. Otherwise, throws a {@link NoSuchColumnException} or a
      * {@link ClassCastException}.
      *
@@ -357,7 +357,6 @@ public final void checkHasColumns(@NotNull Collection<String> columns) {
         NoSuchColumnException.throwIf(getColumnNameSet(), columns);
     }
 
-
     /**
      * Tests mutual-compatibility of {@code this} and {@code other}. To be mutually compatible, they must have the same
      * number of columns, each matched up with {@link ColumnDefinition#isCompatible}. As such, this method has an
diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaPublishOptions.java b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaPublishOptions.java
index 79d9d436e2b..7e6a43d4013 100644
--- a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaPublishOptions.java
+++ b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaPublishOptions.java
@@ -39,26 +39,26 @@ public static Builder builder() {
     public abstract Table table();
 
     /**
-     * The default kafka topic to publish to. When {@code null}, {@link #topicColumn()} must be set.
+     * The default Kafka topic to publish to. When {@code null}, {@link #topicColumn()} must be set.
      *
-     * @return the default kafka topic
+     * @return the default Kafka topic
      * @see #topicColumn()
      */
     @Nullable
     public abstract String topic();
 
     /**
-     * The default kafka partition to publish to.
+     * The default Kafka partition to publish to.
      *
-     * @return the default kafka partition
+     * @return the default Kafka partition
      * @see #partitionColumn()
      */
     public abstract OptionalInt partition();
 
     /**
-     * The kafka configuration properties.
+     * The Kafka configuration properties.
      *
-     * @return the kafka configuration
+     * @return the Kafka configuration
      */
     public abstract Properties config();
 
@@ -110,7 +110,7 @@ public boolean publishInitial() {
 
     /**
      * The topic column. When set, uses the given {@link CharSequence}-compatible column from {@link #table()} as
-     * the first source for setting the kafka record topic. When not present, or the column value is null,
+     * the first source for setting the Kafka record topic. When not present, or if the column value is null,
      * {@link #topic()} will be used.
      *
      * @return the topic column name
      */
@@ -119,10 +119,10 @@ public boolean publishInitial() {
 
     /**
      * The partition column. When set, uses the given {@code int} column from {@link #table()} as the first source
-     * for setting the kafka record partition. When not present, or the column value is null, {@link #partition()} will
-     * be used if present. If a valid partition number is specified, that partition will be used when sending the
-     * record. If no partition is specified but a key is present, a partition will be chosen using a hash of the key. If
-     * neither key nor partition is present, a partition will be assigned in a round-robin fashion.
+     * for setting the Kafka record partition. When not present, or if the column value is null, {@link #partition()}
+     * will be used if present. If a valid partition number is specified, that partition will be used when sending the
+     * record. Otherwise, Kafka will choose a partition using a hash of the key if the key is present, or will assign a
+     * partition in a round-robin fashion if the key is not present.
      *
      * @return the partition column name
      */
@@ -130,8 +130,8 @@ public boolean publishInitial() {
 
     /**
      * The timestamp column. When set, uses the given {@link Instant} column from {@link #table()} as the first
-     * source for setting the kafka record timestamp. When not present, or the column value is null, the producer will
-     * stamp the record with its current time. The timestamp eventually used by Kafka depends on the timestamp type
+     * source for setting the Kafka record timestamp. When not present, or if the column value is null, the producer
+     * will stamp the record with its current time. The timestamp eventually used by Kafka depends on the timestamp type
      * configured for the topic. If the topic is configured to use CreateTime, the timestamp in the producer record will
      * be used by the broker. If the topic is configured to use LogAppendTime, the timestamp in the producer record will
      * be overwritten by the broker with the broker local time when it appends the message to its log.
diff --git a/py/server/deephaven/stream/kafka/producer.py b/py/server/deephaven/stream/kafka/producer.py
index 77ff9ee5c02..0b814ee8e8d 100644
--- a/py/server/deephaven/stream/kafka/producer.py
+++ b/py/server/deephaven/stream/kafka/producer.py
@@ -43,18 +43,18 @@ def produce(
     last_by_key_columns: bool = False,
     publish_initial: bool = True,
     partition: Optional[int] = None,
-    topic_column: Optional[str] = None,
-    partition_column: Optional[str] = None,
-    timestamp_column: Optional[str] = None,
+    topic_col: Optional[str] = None,
+    partition_col: Optional[str] = None,
+    timestamp_col: Optional[str] = None,
 ) -> Callable[[], None]:
     """Produce to Kafka from a Deephaven table.
 
     Args:
         table (Table): the source table to publish to Kafka
-        kafka_config (Dict): configuration for the associated kafka producer.
+        kafka_config (Dict): configuration for the associated Kafka producer.
             This is used to call the constructor of org.apache.kafka.clients.producer.KafkaProducer;
             pass any KafkaProducer specific desired configuration here
-        topic (Optional[str]): the default topic name. When None, topic_column must be set. See topic_column for behavior.
+        topic (Optional[str]): the default topic name. When None, topic_col must be set. See topic_col for behavior.
         key_spec (KeyValueSpec): specifies how to map table column(s) to the Key field in produced Kafka messages.
             This should be the result of calling one of the functions simple_spec(), avro_spec() or json_spec() in this
             module, or the constant KeyValueSpec.IGNORE
@@ -66,17 +66,17 @@ def produce(
             aggregation on table grouped by the input columns of key_spec and publish to Kafka from the result.
         publish_initial (bool): whether the initial data in table should be published. When False,
             table.is_refreshing must be True. By default, is True.
-        partition (Optional[int]): the default partition, None by default. See partition_column for partition behavior.
-        topic_column (Optional[str]): the topic column, None by default. When set, uses the given string column from
-            table as the first source for setting the kafka record topic. When None, or the column value is null, topic
+        partition (Optional[int]): the default partition, None by default. See partition_col for partition behavior.
+        topic_col (Optional[str]): the topic column, None by default. When set, uses the given string column from
+            table as the first source for setting the Kafka record topic. When None, or if the column value is null, topic
             will be used.
-        partition_column (Optional[str]): the partition column, None by default. When set, uses the given int column
-            from table as the first source for setting the kafka record partition. When None, or the column value is null,
-            partition will be used if present. If a valid partition number is specified, that partition will be used when
-            sending the record. If no partition is specified but a key is present, a partition will be chosen using a hash
-            of the key. If neither key nor partition is present, a partition will be assigned in a round-robin fashion.
-        timestamp_column (Optional[str]): the timestamp column, None by default. When set, uses the given timestamp
-            column from table as the first source for setting the kafka record timestamp. When None, or the column value
+        partition_col (Optional[str]): the partition column, None by default. When set, uses the given int column
+            from table as the first source for setting the Kafka record partition. When None, or if the column value is null,
+            partition will be used if present. If a valid partition number is specified, that partition will be used
+            when sending the record. Otherwise, Kafka will choose a partition using a hash of the key if the key is present,
+            or will assign a partition in a round-robin fashion if the key is not present.
+        timestamp_col (Optional[str]): the timestamp column, None by default. When set, uses the given timestamp
+            column from table as the first source for setting the Kafka record timestamp. When None, or if the column value
             is null, the producer will stamp the record with its current time. The timestamp eventually used by Kafka
             depends on the timestamp type configured for the topic. If the topic is configured to use CreateTime,
             the timestamp in the producer record will be used by the broker.
             If the topic is configured to use LogAppendTime, the timestamp in the producer record will be overwritten
             by the broker with the broker local time when it appends the message to its log.
@@ -111,12 +111,12 @@ def produce(
         options_builder.topic(topic)
     if partition:
         options_builder.partition(partition)
-    if topic_column:
-        options_builder.topicColumn(_JColumnName.of(topic_column))
-    if partition_column:
-        options_builder.partitionColumn(_JColumnName.of(partition_column))
-    if timestamp_column:
-        options_builder.timestampColumn(_JColumnName.of(timestamp_column))
+    if topic_col:
+        options_builder.topicColumn(_JColumnName.of(topic_col))
+    if partition_col:
+        options_builder.partitionColumn(_JColumnName.of(partition_col))
+    if timestamp_col:
+        options_builder.timestampColumn(_JColumnName.of(timestamp_col))
 
     with auto_locking_ctx(table):
         runnable = _JKafkaTools.produceFromTable(options_builder.build())
diff --git a/py/server/tests/test_kafka_producer.py b/py/server/tests/test_kafka_producer.py
index a4b61f70ea7..d71b8ca8960 100644
--- a/py/server/tests/test_kafka_producer.py
+++ b/py/server/tests/test_kafka_producer.py
@@ -51,7 +51,7 @@ def test_simple_spec(self):
         self.assertIsNotNone(cleanup)
         cleanup()
 
-    def test_simple_spec_topic_column_no_default_topic(self):
+    def test_simple_spec_topic_col_no_default_topic(self):
         """
         Check a simple Kafka producer works with a topic column but no default topic
         """
@@ -65,13 +65,13 @@ def test_simple_spec_topic_column_no_default_topic(self):
             None,
             key_spec=KeyValueSpec.IGNORE,
             value_spec=pk.simple_spec('Price'),
-            topic_column='Topic'
+            topic_col='Topic'
         )
 
         self.assertIsNotNone(cleanup)
         cleanup()
 
-    def test_simple_spec_topic_column_default_topic(self):
+    def test_simple_spec_topic_col_default_topic(self):
         """
         Check a simple Kafka producer works with a topic column and a default topic
         """
@@ -85,7 +85,7 @@ def test_simple_spec_topic_column_default_topic(self):
             'orders',
             key_spec=KeyValueSpec.IGNORE,
             value_spec=pk.simple_spec('Price'),
-            topic_column='Topic'
+            topic_col='Topic'
         )
 
         self.assertIsNotNone(cleanup)
@@ -110,7 +110,7 @@ def test_simple_spec_default_partition(self):
         self.assertIsNotNone(cleanup)
         cleanup()
 
-    def test_simple_spec_partition_column_no_default_partition(self):
+    def test_simple_spec_partition_col_no_default_partition(self):
         """
         Check a simple Kafka producer works with a partition column
         """
@@ -124,13 +124,13 @@ def test_simple_spec_partition_column_no_default_partition(self):
             "orders",
             key_spec=KeyValueSpec.IGNORE,
             value_spec=pk.simple_spec('Price'),
-            partition_column='Partition'
+            partition_col='Partition'
         )
 
         self.assertIsNotNone(cleanup)
         cleanup()
 
-    def test_simple_spec_partition_column_default_partition(self):
+    def test_simple_spec_partition_col_default_partition(self):
         """
         Check a simple Kafka producer works with a partition column and default partition
         """
@@ -145,13 +145,13 @@ def test_simple_spec_partition_column_default_partition(self):
             key_spec=KeyValueSpec.IGNORE,
             value_spec=pk.simple_spec('Price'),
             partition=0,
-            partition_column='Partition'
+            partition_col='Partition'
         )
 
         self.assertIsNotNone(cleanup)
         cleanup()
 
-    def test_simple_spec_timestamp_column(self):
+    def test_simple_spec_timestamp_col(self):
         """
         Check a simple Kafka producer works with a timestamp column
         """
@@ -165,7 +165,7 @@ def test_simple_spec_timestamp_column(self):
             "orders",
             key_spec=KeyValueSpec.IGNORE,
             value_spec=pk.simple_spec('Price'),
-            timestamp_column='Timestamp'
+            timestamp_col='Timestamp'
        )
 
        self.assertIsNotNone(cleanup)
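
Usage sketch (supplementary to the patch, not part of the diff): the snippet below exercises the renamed topic_col, partition_col, and timestamp_col arguments end to end. It assumes a Kafka broker reachable at localhost:9092; the source table, topic name, and column values are illustrative only.

    # A minimal sketch of the renamed producer arguments, assuming a Kafka
    # broker at localhost:9092; the ticking table and its values are
    # illustrative only.
    from deephaven import time_table
    from deephaven.stream.kafka import producer as pk
    from deephaven.stream.kafka.producer import KeyValueSpec

    # Hypothetical source table: one row per second with per-row topic and
    # partition values. time_table supplies the Instant column "Timestamp"
    # that timestamp_col refers to below.
    source = time_table("PT1s").update([
        "Topic = `orders`",     # string column consumed via topic_col
        "Partition = (int) 0",  # int column consumed via partition_col
        "Price = 10.5",
    ])

    cleanup = pk.produce(
        source,
        {"bootstrap.servers": "localhost:9092"},
        "orders",                # default topic, used when Topic is null
        key_spec=KeyValueSpec.IGNORE,
        value_spec=pk.simple_spec("Price"),
        topic_col="Topic",
        partition_col="Partition",
        timestamp_col="Timestamp",
    )

    # The returned callable stops publishing when invoked.
    cleanup()

As in the tests above, passing None in place of "orders" is also valid once topic_col is set, since the per-row topic then fully determines the destination.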