Review response
devinrsmith committed Nov 16, 2023 · 1 parent 564530c · commit 3f5e5a1
Showing 5 changed files with 54 additions and 53 deletions.
@@ -505,8 +505,8 @@ public void describeDifferences(@NotNull List<String> differences, @NotNull final
     }
 
     /**
-     * Checks if {@link #getDataType() dataType} can be cast to {@code destDataType}. If not, this throws a
-     * {@link ClassCastException}.
+     * Checks if objects of type {@link #getDataType() dataType} can be cast to {@code destDataType} (equivalent to
+     * {@code destDataType.isAssignableFrom(dataType)}). If not, this throws a {@link ClassCastException}.
      *
      * @param destDataType the destination data type
      */
@@ -515,9 +515,11 @@ public final void checkCastTo(Class<?> destDataType) {
     }
 
     /**
-     * Checks if {@link #getDataType() dataType} can be cast to {@code destDataType} and checks that
-     * {@link #getComponentType() componentType} can be cast to {@code destComponentType} (both component types must be
-     * present and cast-able, or both must be {@code null}). If not, this throws a {@link ClassCastException}.
+     * Checks if objects of type {@link #getDataType() dataType} can be cast to {@code destDataType} (equivalent to
+     * {@code destDataType.isAssignableFrom(dataType)}) and checks that objects of type {@link #getComponentType()
+     * componentType} can be cast to {@code destComponentType} (both component types must be present and cast-able, or
+     * both must be {@code null}; when both present, is equivalent to
+     * {@code destComponentType.isAssignableFrom(componentType)}). If not, this throws a {@link ClassCastException}.
      *
      * @param destDataType the destination data type
      * @param destComponentType the destination component type, may be {@code null}
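For intuition, the assignability rule spelled out in the new javadoc can be sketched in a few lines of Python, where issubclass plays the role of Java's Class.isAssignableFrom. This is a hypothetical illustration, not part of the Deephaven API:

    # Hypothetical analogue of checkCastTo: Java's destDataType.isAssignableFrom(dataType)
    # corresponds to issubclass(data_type, dest_data_type) in Python.
    def check_cast_to(data_type: type, dest_data_type: type) -> None:
        if not issubclass(data_type, dest_data_type):
            raise TypeError(f"Cannot cast {data_type.__name__} to {dest_data_type.__name__}")

    check_cast_to(bool, int)      # passes: every bool is an int
    try:
        check_cast_to(str, int)   # raises: a str is not an int
    except TypeError as e:
        print(e)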
@@ -314,8 +314,8 @@ public final void checkHasColumn(@NotNull String columnName) {
     }
 
     /**
-     * Checks if {@code columnName} exists and if supports {@link ColumnDefinition#checkCastTo(Class)} with
-     * {@code clazz}. Otherwise, throws a {@link NoSuchColumnException} or a {@link ClassCastException}.
+     * Checks if {@code columnName} exists and supports {@link ColumnDefinition#checkCastTo(Class)} with {@code clazz}.
+     * Otherwise, throws a {@link NoSuchColumnException} or a {@link ClassCastException}.
      *
      * @param columnName the column name
      * @param clazz the data type
@@ -330,7 +330,7 @@ public final void checkHasColumn(@NotNull String columnName, @NotNull Class<?> c
     }
 
     /**
-     * Checks if {@code columnName} exists and if supports {@link ColumnDefinition#checkCastTo(Class, Class)} with
+     * Checks if {@code columnName} exists and supports {@link ColumnDefinition#checkCastTo(Class, Class)} with
      * {@code clazz} and {@code componentType}. Otherwise, throws a {@link NoSuchColumnException} or a
      * {@link ClassCastException}.
      *
@@ -357,7 +357,6 @@ public final void checkHasColumns(@NotNull Collection<String> columns) {
         NoSuchColumnException.throwIf(getColumnNameSet(), columns);
     }
 
-
     /**
      * Tests mutual-compatibility of {@code this} and {@code other}. To be mutually compatible, they must have the same
      * number of columns, each matched up with {@link ColumnDefinition#isCompatible}. As such, this method has an
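The checkHasColumns contract quoted above (NoSuchColumnException.throwIf over the whole collection) can likewise be sketched. A hypothetical illustration, not Deephaven code, showing why the check runs over the full collection rather than column by column:

    # Hypothetical sketch: existence is checked for all requested names at once so
    # the error can report every missing column, mirroring NoSuchColumnException.throwIf.
    def check_has_columns(available: set, requested) -> None:
        missing = [c for c in requested if c not in available]
        if missing:
            raise KeyError(f"Unknown column names {missing}, available: {sorted(available)}")

    check_has_columns({"Sym", "Price"}, ["Price"])      # ok
    # check_has_columns({"Sym"}, ["Price", "Size"])     # KeyError naming both missing columns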
@@ -39,26 +39,26 @@ public static Builder builder() {
     public abstract Table table();
 
     /**
-     * The default kafka topic to publish to. When {@code null}, {@link #topicColumn()} must be set.
+     * The default Kafka topic to publish to. When {@code null}, {@link #topicColumn()} must be set.
      *
-     * @return the default kafka topic
+     * @return the default Kafka topic
      * @see #topicColumn()
      */
     @Nullable
     public abstract String topic();
 
     /**
-     * The default kafka partition to publish to.
+     * The default Kafka partition to publish to.
      *
-     * @return the default kafka partition
+     * @return the default Kafka partition
      * @see #partitionColumn()
      */
     public abstract OptionalInt partition();
 
     /**
-     * The kafka configuration properties.
+     * The Kafka configuration properties.
      *
-     * @return the kafka configuration
+     * @return the Kafka configuration
      */
     public abstract Properties config();
 
@@ -110,7 +110,7 @@ public boolean publishInitial() {
     /**
      * The topic column. When set, uses the the given {@link CharSequence}-compatible column from {@link #table()} as
-     * the first source for setting the kafka record topic. When not present, or the column value is null,
+     * the first source for setting the Kafka record topic. When not present, or if the column value is null,
      * {@link #topic()} will be used.
      *
      * @return the topic column name
      */
@@ -119,19 +119,19 @@ public boolean publishInitial() {
     /**
      * The partition column. When set, uses the the given {@code int} column from {@link #table()} as the first source
-     * for setting the kafka record partition. When not present, or the column value is null, {@link #partition()} will
-     * be used if present. If a valid partition number is specified, that partition will be used when sending the
-     * record. If no partition is specified but a key is present, a partition will be chosen using a hash of the key. If
-     * neither key nor partition is present, a partition will be assigned in a round-robin fashion.
+     * for setting the Kafka record partition. When not present, or if the column value is null, {@link #partition()}
+     * will be used if present. If a valid partition number is specified, that partition will be used when sending the
+     * record. Otherwise, Kafka will choose a partition using a hash of the key if the key is present, or will assign a
+     * partition in a round-robin fashion if the key is not present.
      *
      * @return the partition column name
      */
     public abstract Optional<ColumnName> partitionColumn();
 
     /**
      * The timestamp column. When set, uses the the given {@link Instant} column from {@link #table()} as the first
-     * source for setting the kafka record timestamp. When not present, or the column value is null, the producer will
-     * stamp the record with its current time. The timestamp eventually used by Kafka depends on the timestamp type
+     * source for setting the Kafka record timestamp. When not present, or if the column value is null, the producer
+     * will stamp the record with its current time. The timestamp eventually used by Kafka depends on the timestamp type
      * configured for the topic. If the topic is configured to use CreateTime, the timestamp in the producer record will
      * be used by the broker. If the topic is configured to use LogAppendTime, the timestamp in the producer record will
      * be overwritten by the broker with the broker local time when it appends the message to its log.
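The topic precedence these javadocs describe (per-row topic column first, then the default topic) is exactly what the Python wrapper changed below exposes. A usage sketch, to be run inside a Deephaven session and assuming a broker at localhost:9092; the table contents are illustrative:

    from deephaven import new_table
    from deephaven.column import double_col, string_col
    from deephaven.stream.kafka import producer as pk
    from deephaven.stream.kafka.producer import KeyValueSpec

    # The first row goes to 'orders_eu'; the null Topic row falls back to the
    # default topic passed positionally ('orders').
    source = new_table([
        string_col("Topic", ["orders_eu", None]),
        double_col("Price", [10.0, 10.5]),
    ])
    cancel = pk.produce(
        source,
        {"bootstrap.servers": "localhost:9092"},
        "orders",
        key_spec=KeyValueSpec.IGNORE,
        value_spec=pk.simple_spec("Price"),
        topic_col="Topic",
    )
    cancel()  # stop publishing when done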
py/server/deephaven/stream/kafka/producer.py (21 additions, 21 deletions)
@@ -43,18 +43,18 @@ def produce(
     last_by_key_columns: bool = False,
     publish_initial: bool = True,
     partition: Optional[int] = None,
-    topic_column: Optional[str] = None,
-    partition_column: Optional[str] = None,
-    timestamp_column: Optional[str] = None,
+    topic_col: Optional[str] = None,
+    partition_col: Optional[str] = None,
+    timestamp_col: Optional[str] = None,
 ) -> Callable[[], None]:
     """Produce to Kafka from a Deephaven table.
 
     Args:
         table (Table): the source table to publish to Kafka
-        kafka_config (Dict): configuration for the associated kafka producer.
+        kafka_config (Dict): configuration for the associated Kafka producer.
             This is used to call the constructor of org.apache.kafka.clients.producer.KafkaProducer;
             pass any KafkaProducer specific desired configuration here
-        topic (Optional[str]): the default topic name. When None, topic_column must be set. See topic_column for behavior.
+        topic (Optional[str]): the default topic name. When None, topic_col must be set. See topic_col for behavior.
         key_spec (KeyValueSpec): specifies how to map table column(s) to the Key field in produced Kafka messages.
             This should be the result of calling one of the functions simple_spec(), avro_spec() or json_spec() in this
             module, or the constant KeyValueSpec.IGNORE
@@ -66,17 +66,17 @@ def produce(
             aggregation on table grouped by the input columns of key_spec and publish to Kafka from the result.
         publish_initial (bool): whether the initial data in table should be published. When False, table.is_refreshing
             must be True. By default, is True.
-        partition (Optional[int]): the default partition, None by default. See partition_column for partition behavior.
-        topic_column (Optional[str]): the topic column, None by default. When set, uses the the given string column from
-            table as the first source for setting the kafka record topic. When None, or the column value is null, topic
+        partition (Optional[int]): the default partition, None by default. See partition_col for partition behavior.
+        topic_col (Optional[str]): the topic column, None by default. When set, uses the the given string column from
+            table as the first source for setting the Kafka record topic. When None, or if the column value is null, topic
             will be used.
-        partition_column (Optional[str]): the partition column, None by default. When set, uses the the given int column
-            from table as the first source for setting the kafka record partition. When None, or the column value is null,
-            partition will be used if present. If a valid partition number is specified, that partition will be used when
-            sending the record. If no partition is specified but a key is present, a partition will be chosen using a hash
-            of the key. If neither key nor partition is present, a partition will be assigned in a round-robin fashion.
-        timestamp_column (Optional[str]): the timestamp column, None by default. When set, uses the the given timestamp
-            column from table as the first source for setting the kafka record timestamp. When None, or the column value
+        partition_col (Optional[str]): the partition column, None by default. When set, uses the the given int column
+            from table as the first source for setting the Kafka record partition. When None, or if the column value is null,
+            partition will be used if present. If a valid partition number is specified, that partition will be used
+            when sending the record. Otherwise, Kafka will choose a partition using a hash of the key if the key is present,
+            or will assign a partition in a round-robin fashion if the key is not present.
+        timestamp_col (Optional[str]): the timestamp column, None by default. When set, uses the the given timestamp
+            column from table as the first source for setting the Kafka record timestamp. When None, or if the column value
            is null, the producer will stamp the record with its current time. The timestamp eventually used by Kafka
            depends on the timestamp type configured for the topic. If the topic is configured to use CreateTime, the
            timestamp in the producer record will be used by the broker. If the topic is configured to use LogAppendTime,
@@ -111,12 +111,12 @@ def produce(
         options_builder.topic(topic)
     if partition:
         options_builder.partition(partition)
-    if topic_column:
-        options_builder.topicColumn(_JColumnName.of(topic_column))
-    if partition_column:
-        options_builder.partitionColumn(_JColumnName.of(partition_column))
-    if timestamp_column:
-        options_builder.timestampColumn(_JColumnName.of(timestamp_column))
+    if topic_col:
+        options_builder.topicColumn(_JColumnName.of(topic_col))
+    if partition_col:
+        options_builder.partitionColumn(_JColumnName.of(partition_col))
+    if timestamp_col:
+        options_builder.timestampColumn(_JColumnName.of(timestamp_col))
 
     with auto_locking_ctx(table):
         runnable = _JKafkaTools.produceFromTable(options_builder.build())
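A companion sketch for the renamed partition_col and timestamp_col arguments, under the same assumptions (Deephaven session, local broker, illustrative data). A null Timestamp cell falls through to the producer's send time, and the per-row Partition value takes precedence over any default partition:

    from deephaven import new_table
    from deephaven.column import datetime_col, double_col, int_col
    from deephaven.stream.kafka import producer as pk
    from deephaven.stream.kafka.producer import KeyValueSpec
    from deephaven.time import dh_now

    now = dh_now()
    source = new_table([
        double_col("Price", [10.0, 10.5]),
        int_col("Partition", [0, 1]),            # per-row partition override
        datetime_col("Timestamp", [now, None]),  # None -> producer stamps send time
    ])
    cancel = pk.produce(
        source,
        {"bootstrap.servers": "localhost:9092"},
        "orders",
        key_spec=KeyValueSpec.IGNORE,
        value_spec=pk.simple_spec("Price"),
        partition_col="Partition",
        timestamp_col="Timestamp",
    )
    cancel()  # stop publishing when done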
py/server/tests/test_kafka_producer.py (10 additions, 10 deletions)
@@ -51,7 +51,7 @@ def test_simple_spec(self):
         self.assertIsNotNone(cleanup)
         cleanup()
 
-    def test_simple_spec_topic_column_no_default_topic(self):
+    def test_simple_spec_topic_col_no_default_topic(self):
         """
         Check a simple Kafka producer works with a topic column but no default topic
         """
@@ -65,13 +65,13 @@ def test_simple_spec_topic_column_no_default_topic(self):
             None,
             key_spec=KeyValueSpec.IGNORE,
             value_spec=pk.simple_spec('Price'),
-            topic_column='Topic'
+            topic_col='Topic'
         )
 
         self.assertIsNotNone(cleanup)
         cleanup()
 
-    def test_simple_spec_topic_column_default_topic(self):
+    def test_simple_spec_topic_col_default_topic(self):
         """
         Check a simple Kafka producer works with a topic column and a default topic
         """
@@ -85,7 +85,7 @@ def test_simple_spec_topic_column_default_topic(self):
             'orders',
             key_spec=KeyValueSpec.IGNORE,
             value_spec=pk.simple_spec('Price'),
-            topic_column='Topic'
+            topic_col='Topic'
         )
 
         self.assertIsNotNone(cleanup)
@@ -110,7 +110,7 @@ def test_simple_spec_default_partition(self):
         self.assertIsNotNone(cleanup)
         cleanup()
 
-    def test_simple_spec_partition_column_no_default_partition(self):
+    def test_simple_spec_partition_col_no_default_partition(self):
         """
         Check a simple Kafka producer works with a partition column
         """
@@ -124,13 +124,13 @@ def test_simple_spec_partition_column_no_default_partition(self):
             "orders",
             key_spec=KeyValueSpec.IGNORE,
             value_spec=pk.simple_spec('Price'),
-            partition_column='Partition'
+            partition_col='Partition'
         )
 
         self.assertIsNotNone(cleanup)
         cleanup()
 
-    def test_simple_spec_partition_column_default_partition(self):
+    def test_simple_spec_partition_col_default_partition(self):
         """
         Check a simple Kafka producer works with a partition column and default partition
         """
@@ -145,13 +145,13 @@ def test_simple_spec_partition_column_default_partition(self):
             key_spec=KeyValueSpec.IGNORE,
             value_spec=pk.simple_spec('Price'),
             partition=0,
-            partition_column='Partition'
+            partition_col='Partition'
         )
 
         self.assertIsNotNone(cleanup)
         cleanup()
 
-    def test_simple_spec_timestamp_column(self):
+    def test_simple_spec_timestamp_col(self):
         """
         Check a simple Kafka producer works with a timestamp column
         """
@@ -165,7 +165,7 @@ def test_simple_spec_timestamp_column(self):
             "orders",
             key_spec=KeyValueSpec.IGNORE,
             value_spec=pk.simple_spec('Price'),
-            timestamp_column='Timestamp'
+            timestamp_col='Timestamp'
         )
 
         self.assertIsNotNone(cleanup)
