From 76f27bec85364d1b397ff7764d5ed7299ca465fb Mon Sep 17 00:00:00 2001 From: Nate Bauernfeind Date: Wed, 21 Aug 2024 14:30:18 -0600 Subject: [PATCH] fix: Parallel Where should not NPE on a Big TableUpdate With Both Adds and Mods (#5961) --- .../table/impl/AbstractFilterExecution.java | 4 +- .../impl/QueryTableWhereParallelTest.java | 48 +++++++++++++++++++ 2 files changed, 50 insertions(+), 2 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java index 3659e8bb530..6c388b3ee7d 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java @@ -190,12 +190,12 @@ private void doFilterParallel( // Clean up the row sets created by the filter. try (final RowSet ignored = adds; final RowSet ignored2 = mods) { - if (addedResult != null) { + if (addedResult != null && adds != null) { synchronized (addedResult) { addedResult.insert(adds); } } - if (modifiedResult != null) { + if (modifiedResult != null && mods != null) { synchronized (modifiedResult) { modifiedResult.insert(mods); } diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereParallelTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereParallelTest.java index 65b27a6b380..0659f11a139 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereParallelTest.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereParallelTest.java @@ -5,12 +5,23 @@ import gnu.trove.list.TLongList; import gnu.trove.list.array.TLongArrayList; +import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.rowset.RowSet; +import io.deephaven.engine.rowset.RowSetFactory; +import io.deephaven.engine.rowset.RowSetShiftData; import io.deephaven.engine.rowset.WritableRowSet; +import io.deephaven.engine.table.ModifiedColumnSet; import io.deephaven.engine.table.Table; import io.deephaven.engine.table.TableDefinition; +import io.deephaven.engine.table.TableUpdate; import io.deephaven.engine.table.impl.select.WhereFilter; import io.deephaven.engine.table.impl.select.WhereFilterImpl; +import io.deephaven.engine.table.impl.sources.RowKeyColumnSource; +import io.deephaven.engine.table.impl.sources.RowPositionColumnSource; +import io.deephaven.engine.testutil.ControlledUpdateGraph; +import io.deephaven.engine.testutil.EvalNugget; +import io.deephaven.engine.testutil.EvalNuggetInterface; +import io.deephaven.engine.testutil.TstUtils; import io.deephaven.engine.util.TableTools; import io.deephaven.test.types.OutOfBandTest; import org.jetbrains.annotations.NotNull; @@ -21,6 +32,7 @@ import java.util.Collections; import java.util.List; +import java.util.Map; import static org.junit.Assert.assertEquals; @@ -119,4 +131,40 @@ public WhereFilter copy() { QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT = oldSize; } } + + @Test + public void testParallelExecutionViaTableUpdate() { + final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); + + final long oldSize = QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT; + try { + QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT = 1_000; + final QueryTable table = TstUtils.testRefreshingTable(RowSetFactory.flat(1500).toTracking()) + .withAdditionalColumns(Map.of("K", new RowKeyColumnSource())); + table.setRefreshing(true); + table.setAttribute(BaseTable.TEST_SOURCE_TABLE_ATTRIBUTE, true); + final Table source = table.updateView("J = ii % 2 == 0 ? K : 0"); + + final EvalNuggetInterface[] en = new EvalNuggetInterface[] { + EvalNugget.from(() -> source.where("K == J")), + }; + + updateGraph.runWithinUnitTestCycle(() -> { + final RowSet added = RowSetFactory.fromRange(1500, 2999); + final RowSet modified = RowSetFactory.fromRange(0, 1499); + table.getRowSet().writableCast().insert(added); + + final TableUpdate upstream = new TableUpdateImpl( + added, RowSetFactory.empty(), modified, RowSetShiftData.EMPTY, ModifiedColumnSet.ALL); + + table.notifyListeners(upstream); + }); + + // Ensure the table is as expected. + TstUtils.validate(en); + + } finally { + QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT = oldSize; + } + } }