Skip to content

Commit

Permalink
fix: Parallel Where should not NPE on a Big TableUpdate With Both Add…
Browse files Browse the repository at this point in the history
…s and Mods (#5961)
  • Loading branch information
nbauernfeind committed Aug 21, 2024
1 parent b04b54b commit 76f27be
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -190,12 +190,12 @@ private void doFilterParallel(
// Clean up the row sets created by the filter.
try (final RowSet ignored = adds;
final RowSet ignored2 = mods) {
if (addedResult != null) {
if (addedResult != null && adds != null) {
synchronized (addedResult) {
addedResult.insert(adds);
}
}
if (modifiedResult != null) {
if (modifiedResult != null && mods != null) {
synchronized (modifiedResult) {
modifiedResult.insert(mods);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,23 @@

import gnu.trove.list.TLongList;
import gnu.trove.list.array.TLongArrayList;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.RowSetShiftData;
import io.deephaven.engine.rowset.WritableRowSet;
import io.deephaven.engine.table.ModifiedColumnSet;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.table.TableUpdate;
import io.deephaven.engine.table.impl.select.WhereFilter;
import io.deephaven.engine.table.impl.select.WhereFilterImpl;
import io.deephaven.engine.table.impl.sources.RowKeyColumnSource;
import io.deephaven.engine.table.impl.sources.RowPositionColumnSource;
import io.deephaven.engine.testutil.ControlledUpdateGraph;
import io.deephaven.engine.testutil.EvalNugget;
import io.deephaven.engine.testutil.EvalNuggetInterface;
import io.deephaven.engine.testutil.TstUtils;
import io.deephaven.engine.util.TableTools;
import io.deephaven.test.types.OutOfBandTest;
import org.jetbrains.annotations.NotNull;
Expand All @@ -21,6 +32,7 @@

import java.util.Collections;
import java.util.List;
import java.util.Map;

import static org.junit.Assert.assertEquals;

Expand Down Expand Up @@ -119,4 +131,40 @@ public WhereFilter copy() {
QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT = oldSize;
}
}

@Test
public void testParallelExecutionViaTableUpdate() {
final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast();

final long oldSize = QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT;
try {
QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT = 1_000;
final QueryTable table = TstUtils.testRefreshingTable(RowSetFactory.flat(1500).toTracking())
.withAdditionalColumns(Map.of("K", new RowKeyColumnSource()));
table.setRefreshing(true);
table.setAttribute(BaseTable.TEST_SOURCE_TABLE_ATTRIBUTE, true);
final Table source = table.updateView("J = ii % 2 == 0 ? K : 0");

final EvalNuggetInterface[] en = new EvalNuggetInterface[] {
EvalNugget.from(() -> source.where("K == J")),
};

updateGraph.runWithinUnitTestCycle(() -> {
final RowSet added = RowSetFactory.fromRange(1500, 2999);
final RowSet modified = RowSetFactory.fromRange(0, 1499);
table.getRowSet().writableCast().insert(added);

final TableUpdate upstream = new TableUpdateImpl(
added, RowSetFactory.empty(), modified, RowSetShiftData.EMPTY, ModifiedColumnSet.ALL);

table.notifyListeners(upstream);
});

// Ensure the table is as expected.
TstUtils.validate(en);

} finally {
QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT = oldSize;
}
}
}

0 comments on commit 76f27be

Please sign in to comment.