Skip to content

Commit

Permalink
Respond to review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
jmao-denver committed Nov 29, 2023
1 parent 7909f0c commit a170d55
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 14 deletions.
16 changes: 8 additions & 8 deletions py/server/deephaven/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -2673,8 +2673,8 @@ def transform(self, func: Callable[[Table], Table],
func (Callable[[Table], Table]): a function which takes a Table as input and returns a new Table
dependencies (Optional[Sequence[Union[Table, PartitionedTable]]]): additional dependencies that must be
satisfied before applying the provided transform function to added or modified constituents during
update processing, use this when the transform function uses additional Table or Partitioned Table
inputs besides the constituents of this PartitionedTable. Defaults to None.
update processing. If the transform function uses any other refreshing Table or refreshing Partitioned
Table, they must be included in this argument. Defaults to None.
Returns:
a PartitionedTable
Expand All @@ -2685,8 +2685,8 @@ def transform(self, func: Callable[[Table], Table],
try:
j_operator = j_unary_operator(func, dtypes.from_jtype(Table.j_object_type.jclass))
dependencies = to_sequence(dependencies, wrapped=True)
j_dependencies = [d.j_table for d in dependencies if isinstance(d, Table)]
j_dependencies.extend([d.table.j_table for d in dependencies if isinstance(d, PartitionedTable)])
j_dependencies = [d.j_table for d in dependencies if isinstance(d, Table) and d.is_refreshing]
j_dependencies.extend([d.table.j_table for d in dependencies if isinstance(d, PartitionedTable) and d.is_refreshing])
with auto_locking_ctx(self, *dependencies):
j_pt = self.j_partitioned_table.transform(j_operator, j_dependencies)
return PartitionedTable(j_partitioned_table=j_pt)
Expand All @@ -2710,8 +2710,8 @@ def partitioned_transform(self, other: PartitionedTable, func: Callable[[Table,
func (Callable[[Table, Table], Table]): a function which takes two Tables as input and returns a new Table
dependencies (Optional[Sequence[Union[Table, PartitionedTable]]]): additional dependencies that must be
satisfied before applying the provided transform function to added, modified, or newly-matched
constituents during update processing, use this when the transform function uses additional Table or
Partitioned Table inputs besides the constituents of this PartitionedTable. Defaults to None.
constituents during update processing. If the transform function uses any other refreshing Table or
refreshing Partitioned Table, they must be included in this argument. Defaults to None.
Returns:
a PartitionedTable
Expand All @@ -2722,8 +2722,8 @@ def partitioned_transform(self, other: PartitionedTable, func: Callable[[Table,
try:
j_operator = j_binary_operator(func, dtypes.from_jtype(Table.j_object_type.jclass))
dependencies = to_sequence(dependencies, wrapped=True)
j_dependencies = [d.j_table for d in dependencies if isinstance(d, Table)]
j_dependencies.extend([d.table.j_table for d in dependencies if isinstance(d, PartitionedTable)])
j_dependencies = [d.j_table for d in dependencies if isinstance(d, Table) and d.is_refreshing]
j_dependencies.extend([d.table.j_table for d in dependencies if isinstance(d, PartitionedTable) and d.is_refreshing])
with auto_locking_ctx(self, other, *dependencies):
j_pt = self.j_partitioned_table.partitionedTransform(other.j_partitioned_table, j_operator,
j_dependencies)
Expand Down
19 changes: 13 additions & 6 deletions py/server/tests/test_partitioned_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,12 @@ def test_transform(self):
pt = self.partitioned_table.transform(Transformer)
self.assertIn("f", [col.name for col in pt.constituent_table_columns])

with shared_lock(self.test_table):
pt = self.partitioned_table.transform(Transformer, dependencies=[self.test_table])
self.assertIn("f", [col.name for col in pt.constituent_table_columns])
ticking_t = time_table("PT00:00:01")
pt = self.partitioned_table.transform(Transformer, dependencies=[ticking_t])
self.assertIn("f", [col.name for col in pt.constituent_table_columns])

pt = self.partitioned_table.transform(Transformer, dependencies=[self.test_table])
self.assertIn("f", [col.name for col in pt.constituent_table_columns])

with self.assertRaises(DHError) as cm:
pt = self.partitioned_table.transform(lambda t, t1: t.join(t1))
Expand All @@ -146,9 +149,13 @@ def test_partitioned_transform(self):
pt = self.partitioned_table.partitioned_transform(other_pt, PartitionedTransformer())
self.assertIn("f", [col.name for col in pt.constituent_table_columns])

with shared_lock(other_pt):
pt = self.partitioned_table.partitioned_transform(other_pt, PartitionedTransformer(), dependencies=[other_pt])
self.assertIn("f", [col.name for col in pt.constituent_table_columns])
ticking_pt = time_table("PT00:00:01").update(["X= i % 10", "Y = String.valueOf(i)"]).partition_by("X")
pt = self.partitioned_table.partitioned_transform(other_pt, PartitionedTransformer(),
dependencies=[ticking_pt])
self.assertIn("f", [col.name for col in pt.constituent_table_columns])

pt = self.partitioned_table.partitioned_transform(other_pt, PartitionedTransformer(), dependencies=[other_pt])
self.assertIn("f", [col.name for col in pt.constituent_table_columns])

def test_partition_agg(self):
with update_graph.shared_lock(self.test_update_graph):
Expand Down

0 comments on commit a170d55

Please sign in to comment.