Skip to content

Commit

Permalink
feat: Add add_only_to_blink() function (#6204)
Browse files Browse the repository at this point in the history
Fixes #6185

---------

Co-authored-by: Ryan Caudy <[email protected]>
Co-authored-by: Chip Kent <[email protected]>
  • Loading branch information
3 people authored Oct 22, 2024
1 parent d25bfd6 commit 79b8d8a
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
import org.jetbrains.annotations.NotNull;

import java.util.List;
import java.util.Map;

import static io.deephaven.engine.table.Table.APPEND_ONLY_TABLE_ATTRIBUTE;
import static io.deephaven.engine.table.Table.BLINK_TABLE_ATTRIBUTE;

/**
Expand All @@ -42,14 +44,17 @@ public static Table toBlink(@NotNull final Table table) {
return table;
}

final Table addOnlyTable;
if (!Boolean.TRUE.equals(table.getAttribute(Table.ADD_ONLY_TABLE_ATTRIBUTE))
&& !Boolean.TRUE.equals(table.getAttribute(Table.APPEND_ONLY_TABLE_ATTRIBUTE))) {
throw new IllegalArgumentException("Argument table is neither add-only nor append-only");
&& !Boolean.TRUE.equals(table.getAttribute(APPEND_ONLY_TABLE_ATTRIBUTE))) {
addOnlyTable = table.withAttributes(Map.of(Table.ADD_ONLY_TABLE_ATTRIBUTE, Boolean.TRUE));
} else {
addOnlyTable = table;
}

final MutableObject<QueryTable> resultHolder = new MutableObject<>();
final MutableObject<AddOnlyToBlinkListener> listenerHolder = new MutableObject<>();
final BaseTable<?> coalesced = (BaseTable<?>) table.coalesce();
final BaseTable<?> coalesced = (BaseTable<?>) addOnlyTable.coalesce();
final OperationSnapshotControl snapshotControl =
coalesced.createSnapshotControlIfRefreshing(OperationSnapshotControl::new);

Expand All @@ -58,14 +63,14 @@ public static Table toBlink(@NotNull final Table table) {
(final boolean usePrev, final long beforeClockValue) -> {
// Start with the same rows as the original table
final TrackingRowSet resultRowSet = usePrev
? table.getRowSet().copyPrev().toTracking()
: table.getRowSet().copy().toTracking();
final QueryTable result = new QueryTable(resultRowSet, table.getColumnSourceMap());
? addOnlyTable.getRowSet().copyPrev().toTracking()
: addOnlyTable.getRowSet().copy().toTracking();
final QueryTable result = new QueryTable(resultRowSet, addOnlyTable.getColumnSourceMap());
result.setRefreshing(true);
result.setAttribute(BLINK_TABLE_ATTRIBUTE, true);

final ListenerRecorder recorder =
new ListenerRecorder("AddOnlyToBlinkListenerRecorder", table, null);
new ListenerRecorder("AddOnlyToBlinkListenerRecorder", addOnlyTable, null);
final AddOnlyToBlinkListener listener = new AddOnlyToBlinkListener(recorder, result);
recorder.setMergedListener(listener);
result.addParentReference(listener);
Expand Down
24 changes: 24 additions & 0 deletions py/server/deephaven/stream/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,30 @@
from deephaven.table import Table

_JBlinkTableTools = jpy.get_type("io.deephaven.engine.table.impl.BlinkTableTools")
_JAddOnlyToBlinkTableAdapter = jpy.get_type("io.deephaven.engine.table.impl.AddOnlyToBlinkTableAdapter")

def add_only_to_blink(table: Table) -> Table:
""" Creates a blink table from the given add-only table. The blink table contains the rows added in the latest
update cycle.
Note that the use of this function should be limited to add-only tables that are not fully in-memory, or when
blink table specific aggregation semantics are desired. If the table is fully in-memory, creating a downstream blink
table is not recommended because it doesn't achieve the main benefit of blink tables, which is to reduce memory usage
but instead increases memory usage.
Args:
table (Table): the source table
Returns:
a blink table
Raises:
DHError
"""
try:
return Table(j_table=_JAddOnlyToBlinkTableAdapter.toBlink(table.j_table))
except Exception as e:
raise DHError(e, "failed to create a blink table.") from e

def blink_to_append_only(table: Table) -> Table:
""" Creates an 'append only' table from the blink table.
Expand Down
13 changes: 12 additions & 1 deletion py/server/tests/test_table_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from deephaven.table_factory import DynamicTableWriter, InputTable, ring_table
from tests.testbase import BaseTestCase
from deephaven.table import Table
from deephaven.stream import blink_to_append_only, stream_to_append_only
from deephaven.stream import blink_to_append_only, stream_to_append_only, add_only_to_blink

JArrayList = jpy.get_type("java.util.ArrayList")
_JBlinkTableTools = jpy.get_type("io.deephaven.engine.table.impl.BlinkTableTools")
Expand Down Expand Up @@ -409,6 +409,17 @@ def test_ring_table(self):
self.assertTrue(ring_t.is_refreshing)
self.wait_ticking_table_update(ring_t, 6, 5)

def test_add_only_to_blink(self):
t = time_table("PT00:00:01")
bt = add_only_to_blink(t)
self.assertTrue(bt.is_refreshing)
self.assertTrue(bt.is_blink)

t = empty_table(0).update("Timestamp=nowSystem()")
with self.assertRaises(DHError) as cm:
add_only_to_blink(t)
self.assertIn("failed to create a blink table", str(cm.exception))

def test_blink_to_append_only(self):
_JTimeTable = jpy.get_type("io.deephaven.engine.table.impl.TimeTable")
_JBaseTable = jpy.get_type("io.deephaven.engine.table.impl.BaseTable")
Expand Down

0 comments on commit 79b8d8a

Please sign in to comment.