From b48fc91b3ee0b1baae1687ac934f7d5275f4f209 Mon Sep 17 00:00:00 2001 From: Larry Booker Date: Tue, 5 Dec 2023 15:44:27 -0800 Subject: [PATCH] Accepted RWC changes, additional improvements and DynamicWhereFilter tests passing. --- .../deephaven/engine/table/ColumnSource.java | 15 +- .../io/deephaven/engine/table/DataIndex.java | 72 ++++- .../table/impl/AbstractColumnSource.java | 12 +- .../engine/table/impl/QueryTable.java | 14 +- .../engine/table/impl/TimeTable.java | 5 +- .../table/impl/WouldMatchOperation.java | 2 +- .../table/impl/dataindex/BaseDataIndex.java | 8 +- .../impl/dataindex/DataIndexBuilder.java | 24 -- .../table/impl/dataindex/DataIndexKeySet.java | 94 ++++++ .../dataindex/PartitioningIndexProvider.java | 64 ---- .../dataindex/StorageBackedDataIndexImpl.java | 2 +- .../dataindex/TableBackedDataIndexImpl.java | 12 +- .../table/impl/indexer/DataIndexer.java | 101 ++++++- .../table/impl/select/DynamicWhereFilter.java | 277 ++++++++++-------- .../engine/table/impl/select/MatchFilter.java | 35 ++- .../engine/table/impl/select/WhereFilter.java | 20 +- .../impl/sources/DelegatingColumnSource.java | 6 +- .../sources/RowSetColumnSourceWrapper.java | 11 +- .../regioned/RegionedColumnSource.java | 3 - .../regioned/RegionedColumnSourceBase.java | 2 - .../deephaven/engine/util/TimeTableTest.java | 8 +- 21 files changed, 505 insertions(+), 282 deletions(-) delete mode 100644 engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/DataIndexBuilder.java create mode 100644 engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/DataIndexKeySet.java delete mode 100644 engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/PartitioningIndexProvider.java diff --git a/engine/api/src/main/java/io/deephaven/engine/table/ColumnSource.java b/engine/api/src/main/java/io/deephaven/engine/table/ColumnSource.java index 4a690dc86c9..91202d07945 100644 --- a/engine/api/src/main/java/io/deephaven/engine/table/ColumnSource.java +++ b/engine/api/src/main/java/io/deephaven/engine/table/ColumnSource.java @@ -39,11 +39,24 @@ default ChunkType getChunkType() { return ChunkType.fromElementType(dataType); } + /** + * Return a {@link RowSet row set} where the values in the column source match the given keys. + * + * @param invertMatch Whether to invert the match, i.e. return the rows where the values do not match the given keys + * @param usePrev Whether to use the previous values for the ColumnSource + * @param caseInsensitive Whether to perform a case insensitive match + * @param dataIndex An optional data index that can be used to accelerate the match (the index table must be + * included in snapshot controls or otherwise guaranteed to be current) + * @param mapper Restrict results to this row set + * @param keys The keys to match in the column + * + * @return The rows that match the given keys + */ WritableRowSet match( boolean invertMatch, boolean usePrev, boolean caseInsensitive, - @NotNull final RowSet fullSet, + @Nullable final DataIndex dataIndex, @NotNull RowSet mapper, Object... keys); diff --git a/engine/api/src/main/java/io/deephaven/engine/table/DataIndex.java b/engine/api/src/main/java/io/deephaven/engine/table/DataIndex.java index f183b6c4d5d..b8740447c34 100644 --- a/engine/api/src/main/java/io/deephaven/engine/table/DataIndex.java +++ b/engine/api/src/main/java/io/deephaven/engine/table/DataIndex.java @@ -15,7 +15,8 @@ */ public interface DataIndex extends LivenessReferent { /** - * Provides a lookup function from {@code key} to the position in the index table. Keys are specified as follows: + * Provides a lookup function from {@code key} to the position in the index table. Keys consist of reinterpreted + * values and are specified as follows: *
*
No key columns
*
"Empty" keys are signified by any zero-length {@code Object[]}
@@ -37,8 +38,8 @@ interface PositionLookup { } /** - * Provides a lookup function from {@code key} to the {@link RowSet} containing the matching table rows. Keys are - * specified as follows: + * Provides a lookup function from {@code key} to the {@link RowSet} containing the matching table rows. Keys + * consist of reinterpreted values and are specified as follows: *
*
No key columns
*
"Empty" keys are signified by any zero-length {@code Object[]}
@@ -68,6 +69,14 @@ interface RowSetLookup { /** Get the output row set column name for this index. */ String rowSetColumnName(); + /** Return the index table key sources in the order of the index table. **/ + @FinalDefault + default ColumnSource[] indexKeyColumns() { + final ColumnSource[] columnSources = keyColumnMap().keySet().toArray(new ColumnSource[0]); + return indexKeyColumns(columnSources); + // TODO-RWC: Should this be in a static helper instead of the interface? + } + /** Return the index table key sources in the relative order of the indexed sources supplied. **/ @FinalDefault default ColumnSource[] indexKeyColumns(@NotNull final ColumnSource[] columnSources) { @@ -96,7 +105,7 @@ default ColumnSource rowSetColumn() { Table table(); /** - * Build a {@link RowSetLookup lookup} function of index row sets for this index. If {@link #isRefreshing()} is + * Return a {@link RowSetLookup lookup} function of index row sets for this index. If {@link #isRefreshing()} is * true, this lookup function is guaranteed to be accurate only for the current cycle. * * @return a function that provides map-like lookup of matching rows from an index key. @@ -104,6 +113,61 @@ default ColumnSource rowSetColumn() { @NotNull RowSetLookup rowSetLookup(); + /** + * Return a {@link RowSetLookup lookup} function of index row sets for this index. If {@link #isRefreshing()} is + * true, this lookup function is guaranteed to be accurate only for the current cycle. The keys provided must be in + * the order of the {@code lookupSources}. + * + * @return a function that provides map-like lookup of matching rows from an index key. + */ + @NotNull + @FinalDefault + default RowSetLookup rowSetLookup(@NotNull final ColumnSource[] lookupSources) { + if (lookupSources.length == 1) { + // Trivially ordered. + return rowSetLookup(); + } + + final ColumnSource[] indexSourceColumns = keyColumnMap().keySet().toArray(ColumnSource[]::new); + if (Arrays.equals(lookupSources, indexSourceColumns)) { + // Order matches, so we can use the default lookup function. + return rowSetLookup(); + } + + // We need to wrap the lookup function with a key remapping function. + + // Maps index keys -> user-supplied keys + final int[] indexToUserMapping = new int[lookupSources.length]; + + // Build an intermediate map (N^2 loop but N is small and this is called rarely). + for (int ii = 0; ii < indexSourceColumns.length; ++ii) { + boolean found = false; + for (int jj = 0; jj < lookupSources.length; ++jj) { + if (indexSourceColumns[ii] == lookupSources[jj]) { + indexToUserMapping[ii] = jj; + found = true; + break; + } + } + if (!found) { + throw new IllegalArgumentException("The provided columns must match the data index key columns"); + } + } + + return (key, usePrev) -> { + // This is the key provided by the caller. + final Object[] keys = (Object[]) key; + // This is the key we need to provide to the lookup function. + final Object[] remappedKey = new Object[keys.length]; + + for (int ii = 0; ii < remappedKey.length; ++ii) { + remappedKey[ii] = keys[indexToUserMapping[ii]]; + } + + return rowSetLookup().apply(remappedKey, usePrev); + }; + } + /** * Build a {@link PositionLookup lookup} of positions for this index. If {@link #isRefreshing()} is true, this * lookup function is guaranteed to be accurate only for the current cycle. diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractColumnSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractColumnSource.java index 6b386ee4655..80fa2a82da3 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractColumnSource.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractColumnSource.java @@ -17,7 +17,6 @@ import io.deephaven.engine.table.impl.chunkfillers.ChunkFiller; import io.deephaven.engine.table.impl.chunkfilter.ChunkFilter; import io.deephaven.engine.table.impl.chunkfilter.ChunkMatchFilterFactory; -import io.deephaven.engine.table.impl.indexer.DataIndexer; import io.deephaven.engine.table.impl.sources.UnboxedLongBackedColumnSource; import io.deephaven.engine.table.iterators.ChunkedColumnIterator; import io.deephaven.engine.updategraph.UpdateGraph; @@ -125,14 +124,11 @@ public WritableRowSet match( final boolean invertMatch, final boolean usePrev, final boolean caseInsensitive, - @NotNull final RowSet fullSet, + @Nullable final DataIndex dataIndex, @NotNull final RowSet mapper, final Object... keys) { - final DataIndexer dataIndexer = fullSet.isTracking() ? DataIndexer.of(fullSet.trackingCast()) : null; - if (dataIndexer != null && dataIndexer.hasDataIndex(this)) { - final DataIndex dataIndex = dataIndexer.getDataIndex(this); - + if (dataIndex != null) { final RowSetBuilderRandom allInMatchingGroups = RowSetFactory.builderRandom(); if (caseInsensitive && (type == String.class)) { @@ -162,10 +158,10 @@ public WritableRowSet match( } } } else { - // Use the lookup function + // Use the lookup function to get the matching RowSets intersected with the mapper final DataIndex.RowSetLookup rowSetLookup = dataIndex.rowSetLookup(); for (Object key : keys) { - RowSet range = rowSetLookup.apply(key, usePrev); + final RowSet range = rowSetLookup.apply(key, usePrev); if (range != null) { allInMatchingGroups.addRowSet(range); } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java index 8f77ffecfae..c960f39cae2 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java @@ -1220,8 +1220,18 @@ private QueryTable whereInternal(final WhereFilter... filters) { } return memoizeResult(MemoizedOperationKey.filter(filters), () -> { - final OperationSnapshotControl snapshotControl = - createSnapshotControlIfRefreshing(OperationSnapshotControl::new); + // Request the data indexes from the filters so we can include any index tables in the + // snapshot control. + final List dataIndexList = new ArrayList<>(); + Arrays.stream(filters).forEach(filter -> dataIndexList.addAll(filter.getDataIndexes(this))); + final NotificationStepSource[] dataIndexTables = dataIndexList.stream() + .map(di -> (NotificationStepSource) di.table()) + .toArray(NotificationStepSource[]::new); + + final OperationSnapshotControl snapshotControl = createSnapshotControlIfRefreshing( + (final BaseTable parent) -> dataIndexTables.length > 0 + ? new OperationSnapshotControlEx(parent, dataIndexTables) + : new OperationSnapshotControl(parent)); final Mutable result = new MutableObject<>(); initializeWithSnapshot("where", snapshotControl, diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/TimeTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/TimeTable.java index 64110b34eb7..239497ae9cd 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/TimeTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/TimeTable.java @@ -17,6 +17,7 @@ import io.deephaven.engine.rowset.WritableRowSet; import io.deephaven.engine.rowset.chunkattributes.RowKeys; import io.deephaven.engine.table.ColumnSource; +import io.deephaven.engine.table.DataIndex; import io.deephaven.engine.table.Table; import io.deephaven.engine.table.impl.sources.FillUnordered; import io.deephaven.engine.updategraph.UpdateSourceRegistrar; @@ -250,7 +251,7 @@ public WritableRowSet match( final boolean invertMatch, final boolean usePrev, final boolean caseInsensitive, - @NotNull final RowSet fullSet, + @Nullable final DataIndex dataIndex, @NotNull final RowSet selection, final Object... keys) { if (startTime == null) { @@ -393,7 +394,7 @@ public WritableRowSet match( final boolean invertMatch, final boolean usePrev, final boolean caseInsensitive, - @NotNull final RowSet fullSet, + @Nullable final DataIndex dataIndex, @NotNull final RowSet selection, final Object... keys) { if (startTime == null) { diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/WouldMatchOperation.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/WouldMatchOperation.java index 89508974820..17750ad2898 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/WouldMatchOperation.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/WouldMatchOperation.java @@ -317,7 +317,7 @@ public WritableRowSet match( final boolean invertMatch, final boolean usePrev, final boolean caseInsensitive, - @NotNull final RowSet fullSet, + @Nullable final DataIndex dataIndex, @NotNull final RowSet mapper, final Object... keys) { boolean hasFalse = false; diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/BaseDataIndex.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/BaseDataIndex.java index 4c53e6e2615..b62588c7b61 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/BaseDataIndex.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/BaseDataIndex.java @@ -14,6 +14,7 @@ import io.deephaven.engine.table.impl.OperationSnapshotControl; import io.deephaven.engine.table.impl.QueryTable; import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder; +import io.deephaven.engine.table.impl.sources.ReinterpretUtils; import io.deephaven.engine.table.impl.sources.RowSetColumnSourceWrapper; import io.deephaven.engine.table.iterators.ChunkedColumnIterator; import io.deephaven.engine.table.iterators.ColumnIterator; @@ -119,7 +120,7 @@ protected int hash(Object key) { * @param rowSetColumn The name of the row set column to wrap * @return The copied table */ - protected static QueryTable wrappedRowSetTable( + protected static QueryTable indexTableWrapper( @NotNull final QueryTable parent, @NotNull final String rowSetColumn) { // TODO-RWC/LAB: Use new assertions to assert that parent has a RowSet ColumnSource of name rowSetColumn. @@ -133,9 +134,10 @@ protected static QueryTable wrappedRowSetTable( if (columnName.equals(rowSetColumn)) { resultColumnSourceMap.put( columnName, RowSetColumnSourceWrapper.from(parent.getColumnSource(rowSetColumn))); - return; + } else { + // Convert the key columns to primitive column sources. + resultColumnSourceMap.put(columnName, ReinterpretUtils.maybeConvertToPrimitive(columnSource)); } - resultColumnSourceMap.put(columnName, columnSource); }); final OperationSnapshotControl snapshotControl = parent.createSnapshotControlIfRefreshing(OperationSnapshotControl::new); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/DataIndexBuilder.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/DataIndexBuilder.java deleted file mode 100644 index 12073d081e4..00000000000 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/DataIndexBuilder.java +++ /dev/null @@ -1,24 +0,0 @@ -package io.deephaven.engine.table.impl.dataindex; - -import io.deephaven.engine.rowset.RowSet; -import io.deephaven.engine.table.ColumnDefinition; -import io.deephaven.engine.table.impl.locations.ColumnLocation; -import org.jetbrains.annotations.NotNull; - -public interface DataIndexBuilder { - - void addSource(final int regionIndex, - @NotNull final ColumnLocation columnLocation, - @NotNull final RowSet locationRowSetInTable); - - ColumnDefinition getColumnDefinition(); - - - static PartitioningIndexProvider makeBuilder( - @NotNull final ColumnDefinition columnDefinition) { - if (columnDefinition.isPartitioning()) { - return new PartitioningIndexProvider<>(columnDefinition); - } - return new PartitioningIndexProvider<>(columnDefinition); - } -} diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/DataIndexKeySet.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/DataIndexKeySet.java new file mode 100644 index 00000000000..b7d86cfcb93 --- /dev/null +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/DataIndexKeySet.java @@ -0,0 +1,94 @@ +package io.deephaven.engine.table.impl.dataindex; + +import io.deephaven.hash.KeyedObjectHashSet; +import io.deephaven.hash.KeyedObjectKey; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import java.util.Arrays; +import java.util.Objects; +import java.util.function.Consumer; + +public class DataIndexKeySet { + private static final KeyedObjectKey INSTANCE = new KeyHashAdapter(); + private static final Object NULL_OBJECT_KEY = new Object(); + + private final KeyedObjectHashSet set; + + private static class KeyHashAdapter implements KeyedObjectKey { + private KeyHashAdapter() {} + + @Override + public Object getKey(@NotNull final Object value) { + return value; + } + + @Override + public int hashKey(@Nullable final Object key) { + if (key instanceof Object[]) { + return Arrays.hashCode((Object[]) key); + } + return Objects.hashCode(key); + } + + @Override + public boolean equalKey(@Nullable final Object key, @NotNull final Object value) { + if (key instanceof Object[] && value instanceof Object[]) { + return Arrays.equals((Object[]) key, (Object[]) value); + } + return Objects.equals(key, value); + } + } + + + + public DataIndexKeySet(final int initialCapacity) { + set = new KeyedObjectHashSet<>(initialCapacity, INSTANCE); + } + + public DataIndexKeySet() { + set = new KeyedObjectHashSet<>(INSTANCE); + } + + public boolean remove(Object o) { + if (o == null) { + return set.remove(NULL_OBJECT_KEY); + } + return set.removeValue(o); + } + + public boolean add(Object o) { + if (o == null) { + return set.add(NULL_OBJECT_KEY); + } + return set.add(o); + } + + public boolean contains(Object o) { + if (o == null) { + return set.contains(NULL_OBJECT_KEY); + } + return set.contains(o); + } + + public void forEach(Consumer action) { + set.forEach(key -> { + if (key == NULL_OBJECT_KEY) { + action.accept(null); + } else { + action.accept(key); + } + }); + } + + public Object[] toArray() { + final Object[] result = set.toArray(); + for (int ii = 0; ii < result.length; ii++) { + if (result[ii] == NULL_OBJECT_KEY) { + result[ii] = null; + } + } + return result; + } + +} diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/PartitioningIndexProvider.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/PartitioningIndexProvider.java deleted file mode 100644 index 24a82cf7e25..00000000000 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/PartitioningIndexProvider.java +++ /dev/null @@ -1,64 +0,0 @@ -package io.deephaven.engine.table.impl.dataindex; - -import io.deephaven.engine.rowset.RowSet; -import io.deephaven.engine.rowset.RowSetFactory; -import io.deephaven.engine.table.ColumnDefinition; -import io.deephaven.engine.table.ColumnSource; -import io.deephaven.engine.table.Table; -import io.deephaven.engine.table.WritableColumnSource; -import io.deephaven.engine.table.impl.QueryTable; -import io.deephaven.engine.table.impl.locations.ColumnLocation; -import io.deephaven.engine.table.impl.sources.ArrayBackedColumnSource; -import io.deephaven.engine.table.impl.sources.ObjectArraySource; -import org.jetbrains.annotations.NotNull; - -import java.util.LinkedHashMap; -import java.util.Map; - -import static io.deephaven.engine.table.impl.dataindex.BaseDataIndex.INDEX_COL_NAME; - -/** - * This class will provide methods to build a deferred single-column index (i.e. grouping) for a partitioning - * storage-backed table. - */ -public class PartitioningIndexProvider implements DataIndexBuilder { - private final ColumnDefinition columnDefinition; - - private final WritableColumnSource valueSource; - private final ObjectArraySource rowSetSource; - - private final Table locationTable; - - public PartitioningIndexProvider(@NotNull final ColumnDefinition columnDefinition) { - this.columnDefinition = columnDefinition; - valueSource = ArrayBackedColumnSource.getMemoryColumnSource(10, columnDefinition.getDataType(), null); - rowSetSource = - (ObjectArraySource) ArrayBackedColumnSource.getMemoryColumnSource(10, RowSet.class, null); - - final Map> columnSourceMap = new LinkedHashMap<>(); - columnSourceMap.put(columnDefinition.getName(), valueSource); - columnSourceMap.put(INDEX_COL_NAME, rowSetSource); - locationTable = new QueryTable(RowSetFactory.empty().toTracking(), columnSourceMap); - } - - @Override - public void addSource(final int regionIndex, - @NotNull final ColumnLocation columnLocation, - @NotNull final RowSet locationRowSetInTable) { - valueSource.ensureCapacity(regionIndex + 1); - rowSetSource.ensureCapacity(regionIndex + 1); - final DATA_TYPE columnPartitionValue = - columnLocation.getTableLocation().getKey().getPartitionValue(columnDefinition.getName()); - valueSource.set(regionIndex, columnPartitionValue); - rowSetSource.set(regionIndex, locationRowSetInTable); - } - - @Override - public ColumnDefinition getColumnDefinition() { - return columnDefinition; - } - - public Table table() { - return locationTable; - } -} diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/StorageBackedDataIndexImpl.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/StorageBackedDataIndexImpl.java index 1bb9c836c40..b35211cfdd8 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/StorageBackedDataIndexImpl.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/StorageBackedDataIndexImpl.java @@ -290,7 +290,7 @@ public Table table() { final Table mergedOutput = transformed.merge(); final QueryTable result = - wrappedRowSetTable((QueryTable) mergedOutput.select(), INDEX_COL_NAME); + indexTableWrapper((QueryTable) mergedOutput.select(), INDEX_COL_NAME); result.setRefreshing(columnSourceManager.locationTable().isRefreshing()); return result; diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/TableBackedDataIndexImpl.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/TableBackedDataIndexImpl.java index f357f9dcd66..29f467307a7 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/TableBackedDataIndexImpl.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/TableBackedDataIndexImpl.java @@ -102,7 +102,7 @@ public Table table() { Collections .singleton(Pair.of(EXPOSED_GROUP_ROW_SETS, ColumnName.of(INDEX_COL_NAME)))); - return wrappedRowSetTable(renamed, INDEX_COL_NAME); + return indexTableWrapper(renamed, INDEX_COL_NAME); }); } return indexTable; @@ -112,17 +112,17 @@ public Table table() { public @Nullable RowSetLookup rowSetLookup() { final ColumnSource rowSetColumnSource = rowSetColumn(); return (Object key, boolean usePrev) -> { - // Pass the object to the position lookup, then return the row set at that position. + // Pass the object to the position lookup and get the resulting position. final int position = lookupFunction.get(key); if (position == AggregationRowLookup.DEFAULT_UNKNOWN_ROW) { return null; } + + // Aggregations return a dense result, so this position can be used directly as a row key. if (usePrev) { - final long prevRowKey = table().getRowSet().prev().get(position); - return rowSetColumnSource.getPrev(prevRowKey); + return rowSetColumnSource.getPrev(position); } else { - final long rowKey = table().getRowSet().get(position); - return rowSetColumnSource.get(rowKey); + return rowSetColumnSource.get(position); } }; } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/indexer/DataIndexer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/indexer/DataIndexer.java index 38fe3162ac1..c8a87911e09 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/indexer/DataIndexer.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/indexer/DataIndexer.java @@ -3,6 +3,7 @@ */ package io.deephaven.engine.table.impl.indexer; +import com.google.common.collect.Sets; import io.deephaven.engine.rowset.RowSet; import io.deephaven.engine.rowset.TrackingRowSet; import io.deephaven.engine.table.ColumnSource; @@ -106,14 +107,7 @@ public boolean canMakeDataIndex(final Table table, final Collection keyC * @return the DataIndex, or null if one does not exist */ public DataIndex getDataIndex(final ColumnSource... keyColumns) { - final WeakHashMap, DataIndexCache> localRoot = root; - if (localRoot == EMPTY_ROOT) { - return null; - } - // noinspection SynchronizationOnLocalVariableOrMethodParameter - synchronized (localRoot) { - return findIndex(localRoot, Arrays.asList(keyColumns)); - } + return getDataIndexInternal(Arrays.asList(keyColumns)); } /** @@ -137,17 +131,98 @@ public DataIndex getDataIndex(final Table sourceTable, final String... keyColumn final Collection> keyColumns = Arrays.stream(keyColumnNames) .map(columnSourceMap::get).collect(Collectors.toList()); - // If we don't have an index, return null. - if (!hasDataIndex(keyColumns)) { + return getDataIndexInternal(keyColumns); + } + + @Nullable + private DataIndex getDataIndexInternal(final Collection> keyColumns) { + // Return null if there are no indexes. + final WeakHashMap, DataIndexCache> localRoot = root; + if (keyColumns.isEmpty() || localRoot == EMPTY_ROOT) { return null; } - // Return an index if one exists. - final WeakHashMap, DataIndexCache> localRoot = root; // noinspection SynchronizationOnLocalVariableOrMethodParameter synchronized (localRoot) { - return findIndex(ensureRoot(), keyColumns); + // Only return a valid index. + final DataIndex dataIndex = findIndex(localRoot, keyColumns); + if (dataIndex == null || !((BaseDataIndex) dataIndex).validate()) { + return null; + } + // Add this index to the current liveness scope so it isn't released while in use. + ((BaseDataIndex) dataIndex).manageWithCurrentScope(); + return dataIndex; + } + } + + /** + * Return a {@link DataIndex} for a strict subset of the given key columns, or null if no such index exists. Will + * choose the data index that results in the largest index table, following the assumption that the largest index + * table will divide the source table into the most specific partitions. + * + * @param sourceTable The table that is indexed + * @param keyColumnNames The column names to use for the index + * + * @return The optimal partial index, or null if no such index exists + */ + @Nullable + public DataIndex getOptimalPartialIndex(final Table sourceTable, final String... keyColumnNames) { + final Map> columnSourceMap = sourceTable.getColumnSourceMap(); + // Verify all the key columns belong to the source table. + final Collection missingKeys = Arrays.stream(keyColumnNames) + .filter(key -> !columnSourceMap.containsKey(key)).collect(Collectors.toList()); + if (!missingKeys.isEmpty()) { + throw new IllegalArgumentException( + "The following columns were not found in the provide table: " + missingKeys); } + + // Create a collection of the table key columns. + final Collection> keyColumns = Arrays.stream(keyColumnNames) + .map(columnSourceMap::get).collect(Collectors.toList()); + + return getOptimalPartialIndex(keyColumns); + } + + /** + * Return a {@link DataIndex} for a strict subset of the given key columns, or null if no such index exists. Will + * choose the data index that results in the largest index table, following the assumption that the largest index + * table will divide the source table into the most specific partitions. + * + * @param keyColumns The column sources to consider for the index + * @return The optimal partial index, or null if no such index exists + */ + @Nullable + public DataIndex getOptimalPartialIndex(final Collection> keyColumns) { + // Return null if there are no indexes. + final WeakHashMap, DataIndexCache> localRoot = root; + if (keyColumns.isEmpty() || localRoot == EMPTY_ROOT) { + return null; + } + + // Create a power set of the key columns. + final Set> keyColumnSet = new HashSet<>(keyColumns); + Set>> keyColumnPowerSet = Sets.powerSet(keyColumnSet); + + DataIndex optimalIndex = null; + + for (Set> keyColumnSubset : keyColumnPowerSet) { + if (keyColumnSubset.isEmpty() || keyColumnSubset.size() == keyColumnSet.size()) { + // Won't consider the empty or full set. + continue; + } + + // noinspection SynchronizationOnLocalVariableOrMethodParameter + synchronized (localRoot) { + final DataIndex partialIndex = findIndex(localRoot, keyColumnSubset); + // The winner is index with the most rows. + if (optimalIndex == null || + (partialIndex != null && partialIndex.table().size() > optimalIndex.table().size())) { + optimalIndex = partialIndex; + } + } + } + + return optimalIndex; } /** diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/DynamicWhereFilter.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/DynamicWhereFilter.java index 8fd64a1c7cc..a8694f587d9 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/DynamicWhereFilter.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/DynamicWhereFilter.java @@ -5,26 +5,23 @@ import io.deephaven.base.log.LogOutput; import io.deephaven.base.verify.Assert; +import io.deephaven.chunk.*; import io.deephaven.chunk.attributes.Values; -import io.deephaven.datastructures.util.CollectionUtil; import io.deephaven.engine.primitive.iterator.CloseableIterator; import io.deephaven.engine.rowset.*; -import io.deephaven.engine.rowset.RowSetFactory; +import io.deephaven.engine.rowset.chunkattributes.OrderedRowKeys; import io.deephaven.engine.table.*; +import io.deephaven.engine.table.impl.*; +import io.deephaven.engine.table.impl.dataindex.DataIndexUtils; +import io.deephaven.engine.table.impl.dataindex.DataIndexKeySet; import io.deephaven.engine.table.impl.indexer.DataIndexer; +import io.deephaven.engine.table.impl.select.setinclusion.SetInclusionKernel; import io.deephaven.engine.table.iterators.ChunkedColumnIterator; -import io.deephaven.engine.updategraph.NotificationQueue; import io.deephaven.engine.updategraph.DynamicNode; -import io.deephaven.engine.table.impl.*; -import io.deephaven.engine.table.impl.select.setinclusion.SetInclusionKernel; -import io.deephaven.chunk.Chunk; -import io.deephaven.chunk.WritableBooleanChunk; -import io.deephaven.chunk.WritableLongChunk; -import io.deephaven.engine.table.impl.TupleSourceFactory; +import io.deephaven.engine.updategraph.NotificationQueue; import io.deephaven.engine.updategraph.UpdateGraph; -import io.deephaven.engine.rowset.chunkattributes.OrderedRowKeys; -import io.deephaven.util.SafeCloseable; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import java.util.*; @@ -38,10 +35,10 @@ public class DynamicWhereFilter extends WhereFilterLivenessArtifactImpl implemen private final boolean setRefreshing; private final MatchPair[] matchPairs; - private final TupleSource setTupleSource; + private final ChunkSource.WithPrev setKeySource; private final boolean inclusion; - private final HashSet liveValues = new HashSet<>(); + private final DataIndexKeySet liveValues; private boolean liveValuesArrayValid = false; private boolean kernelValid = false; private Object[] liveValuesArray = null; @@ -54,6 +51,10 @@ public class DynamicWhereFilter extends WhereFilterLivenessArtifactImpl implemen // this reference must be maintained for reachability private final InstrumentedTableUpdateListener setUpdateListener; + /** Stores the optimal data index for this filter. */ + @Nullable + private DataIndex sourceDataIndex; + private RecomputeListener listener; private QueryTable resultTable; @@ -66,15 +67,17 @@ public DynamicWhereFilter(final QueryTable setTable, final boolean inclusion, fi this.matchPairs = setColumnsNames; this.inclusion = inclusion; + liveValues = new DataIndexKeySet(); + final ColumnSource[] setColumns = Arrays.stream(matchPairs) .map(mp -> setTable.getColumnSource(mp.rightColumn())).toArray(ColumnSource[]::new); if (setRefreshing) { this.setTable = setTable; - setTupleSource = TupleSourceFactory.makeTupleSource(setColumns); + setKeySource = DataIndexUtils.makeBoxedKeySource(setColumns); if (setTable.getRowSet().isNonempty()) { try (final CloseableIterator initialKeysIterator = ChunkedColumnIterator.make( - setTupleSource, setTable.getRowSet(), getChunkSize(setTable.getRowSet()))) { + setKeySource, setTable.getRowSet(), getChunkSize(setTable.getRowSet()))) { initialKeysIterator.forEachRemaining(this::addKey); } } @@ -98,7 +101,7 @@ public void onUpdate(final TableUpdate upstream) { // Remove removed keys if (hasRemoves) { try (final CloseableIterator removedKeysIterator = ChunkedColumnIterator.make( - setTupleSource.getPrevSource(), upstream.removed(), getChunkSize(upstream.removed()))) { + setKeySource.getPrevSource(), upstream.removed(), getChunkSize(upstream.removed()))) { removedKeysIterator.forEachRemaining(DynamicWhereFilter.this::removeKey); } } @@ -108,10 +111,10 @@ public void onUpdate(final TableUpdate upstream) { if (hasModifies) { // @formatter:off try (final CloseableIterator preModifiedKeysIterator = ChunkedColumnIterator.make( - setTupleSource.getPrevSource(), upstream.getModifiedPreShift(), + setKeySource.getPrevSource(), upstream.getModifiedPreShift(), getChunkSize(upstream.getModifiedPreShift())); final CloseableIterator postModifiedKeysIterator = ChunkedColumnIterator.make( - setTupleSource, upstream.modified(), + setKeySource, upstream.modified(), getChunkSize(upstream.modified()))) { // @formatter:on while (preModifiedKeysIterator.hasNext()) { @@ -133,7 +136,7 @@ public void onUpdate(final TableUpdate upstream) { // Add added keys if (hasAdds) { try (final CloseableIterator addedKeysIterator = ChunkedColumnIterator.make( - setTupleSource, upstream.added(), getChunkSize(upstream.added()))) { + setKeySource, upstream.added(), getChunkSize(upstream.added()))) { addedKeysIterator.forEachRemaining(DynamicWhereFilter.this::addKey); } } @@ -170,7 +173,7 @@ public void onFailureInternal(Throwable originalException, Entry sourceEntry) { manage(setUpdateListener); } else { this.setTable = null; - setTupleSource = null; + setKeySource = null; if (setTable.getRowSet().isNonempty()) { final TupleSource temporaryTupleSource = TupleSourceFactory.makeTupleSource(setColumns); try (final CloseableIterator initialKeysIterator = ChunkedColumnIterator.make( @@ -191,7 +194,7 @@ public UpdateGraph getUpdateGraph() { private void removeKey(Object key) { final boolean removed = liveValues.remove(key); - if (!removed) { + if (!removed && key != null) { throw new RuntimeException("Inconsistent state, key not found in set: " + key); } kernelValid = liveValuesArrayValid = false; @@ -211,6 +214,20 @@ private void addKeyUnchecked(Object key) { liveValues.add(key); } + @Nullable + private DataIndex optimalIndex(final Table inputTable) { + final String[] keyColumnNames = MatchPair.getLeftColumns(matchPairs); + + final DataIndexer dataIndexer = DataIndexer.of(inputTable.getRowSet()); + final DataIndex fullIndex = dataIndexer.getDataIndex(inputTable, keyColumnNames); + if (fullIndex != null) { + return fullIndex; + } else { + final DataIndex partialIndex = dataIndexer.getOptimalPartialIndex(inputTable, keyColumnNames); + return partialIndex; + } + } + @Override public List getColumns() { return Arrays.asList(MatchPair.getLeftColumns(matchPairs)); @@ -221,6 +238,17 @@ public List getColumnArrays() { return Collections.emptyList(); } + @Override + public List getDataIndexes(final Table sourceTable) { + if (sourceDataIndex == null) { + sourceDataIndex = optimalIndex(sourceTable); + } + if (sourceDataIndex == null) { + return Collections.emptyList(); + } + return List.of(sourceDataIndex); + } + @Override public void init(TableDefinition tableDefinition) {} @@ -235,58 +263,49 @@ public WritableRowSet filter( throw new PreviousFilteringNotSupported(); } - final ColumnSource[] keyColumns = Arrays.stream(matchPairs) - .map(mp -> table.getColumnSource(mp.leftColumn())).toArray(ColumnSource[]::new); - final TupleSource tupleSource = TupleSourceFactory.makeTupleSource(keyColumns); - final TrackingRowSet trackingSelection = selection.isTracking() ? selection.trackingCast() : null; - if (matchPairs.length == 1) { - // this is just a single column filter so it will actually be exactly right + // Single column filter, delegate to the column source. if (!liveValuesArrayValid) { - liveValuesArray = liveValues.toArray(CollectionUtil.ZERO_LENGTH_OBJECT_ARRAY); + liveValuesArray = liveValues.toArray(); liveValuesArrayValid = true; } return table.getColumnSource(matchPairs[0].leftColumn()) - .match(!inclusion, false, false, fullSet, selection, liveValuesArray); + .match(!inclusion, false, false, sourceDataIndex, selection, liveValuesArray); } - // pick something sensible - if (trackingSelection != null) { - final DataIndexer dataIndexer = DataIndexer.of(trackingSelection); + final ColumnSource[] keyColumns = Arrays.stream(matchPairs) + .map(mp -> table.getColumnSource(mp.leftColumn())).toArray(ColumnSource[]::new); + final ChunkSource keySource = DataIndexUtils.makeBoxedKeySource(keyColumns); - // Do we have an index exactly matching the key columns? - if (dataIndexer.hasDataIndex(keyColumns)) { - final DataIndex dataIndex = dataIndexer.getDataIndex(keyColumns); - final Table indexTable = dataIndex.table(); + if (sourceDataIndex != null) { + // Does our index contain every key column? - if (selection.size() > (indexTable.size() * 2L)) { - return filterFullIndex(selection, dataIndex); + if (sourceDataIndex.keyColumnMap().keySet().containsAll(Arrays.asList(keyColumns))) { + // Even if we have an index, we may be better off with a linear search. + if (selection.size() > (sourceDataIndex.table().size() * 2L)) { + return filterFullIndex(selection, sourceDataIndex, keyColumns); } else { - return filterLinear(selection, tupleSource); + return filterLinear(selection, keyColumns); } } - // Do we have any indexes that partially match the key columns? - final ColumnSource[] indexedSources = Arrays.stream(keyColumns) - .filter(dataIndexer::hasDataIndex).toArray(ColumnSource[]::new); - final ColumnSource[] notIndexedSources = Arrays.stream(keyColumns) - .filter(col -> !dataIndexer.hasDataIndex(col)).toArray(ColumnSource[]::new); - - final OptionalInt minCount = Arrays.stream(indexedSources) - .mapToInt(x -> dataIndexer.getDataIndex(x).table().intSize()).min(); - - if (minCount.isPresent() && (minCount.getAsInt() * 4L) < selection.size()) { - return filterPartialIndexes(trackingSelection, dataIndexer, tupleSource); + // We have a partial index, should we use it? + if (selection.size() > (sourceDataIndex.table().size() * 4L)) { + return filterPartialIndex(selection, sourceDataIndex, keyColumns); } } - return filterLinear(selection, tupleSource); + return filterLinear(selection, keyColumns); } @NotNull - private WritableRowSet filterFullIndex(@NotNull final RowSet selection, final DataIndex dataIndex) { - // Use the RowSetLookup to create a combined row set of matching rows. + private WritableRowSet filterFullIndex( + @NotNull final RowSet selection, + final DataIndex dataIndex, + final ColumnSource[] keyColumns) { + // Use the index RowSetLookup to create a combined row set of matching rows. final RowSetBuilderRandom rowSetBuilder = RowSetFactory.builderRandom(); - final DataIndex.RowSetLookup rowSetLookup = dataIndex.rowSetLookup(); + final DataIndex.RowSetLookup rowSetLookup = dataIndex.rowSetLookup(keyColumns); + liveValues.forEach(key -> { final RowSet rowSet = rowSetLookup.apply(key, false); if (rowSet != null) { @@ -294,128 +313,127 @@ private WritableRowSet filterFullIndex(@NotNull final RowSet selection, final Da } }); + try (final RowSet matchingKeys = rowSetBuilder.build()) { return (inclusion ? matchingKeys.copy() : selection.minus(matchingKeys)); } } @NotNull - private WritableRowSet filterPartialIndexes( + private WritableRowSet filterPartialIndex( @NotNull final RowSet selection, - final DataIndexer dataIndexer, - final TupleSource tupleSource) { - - List> sourceList = tupleSource.getColumnSources(); + final DataIndex dataIndex, + final ColumnSource[] keyColumns) { List> indexedSourceList = new ArrayList<>(); - List> notIndexSourceList = new ArrayList<>(); List indexedSourceIndices = new ArrayList<>(); - List notIndexedSourceIndices = new ArrayList<>(); - for (int ii = 0; ii < sourceList.size(); ++ii) { - final ColumnSource source = sourceList.get(ii); - if (dataIndexer.hasDataIndex(source)) { + final Set> indexSourceSet = dataIndex.keyColumnMap().keySet(); + for (int ii = 0; ii < keyColumns.length; ++ii) { + final ColumnSource source = keyColumns[ii]; + if (indexSourceSet.contains(source)) { indexedSourceList.add(source); indexedSourceIndices.add(ii); - } else { - notIndexSourceList.add(source); - notIndexedSourceIndices.add(ii); } } Assert.geqZero(indexedSourceList.size(), "indexedSourceList.size()"); - final ColumnSource[] indexedSources = indexedSourceList.toArray(new ColumnSource[0]); - final TupleSource indexedTupleSource = TupleSourceFactory.makeTupleSource(indexedSources); - - // Get the data indexes for each of the indexed sources. - final DataIndex.RowSetLookup[] indexLookupArr = Arrays.stream(indexedSources) - .map(source -> dataIndexer.getDataIndex(source).rowSetLookup()).toArray(DataIndex.RowSetLookup[]::new); - - final Map indexKeyRowSetMap = new LinkedHashMap<>(); + final List indexRowSets = new ArrayList<>(indexedSourceList.size()); + final DataIndex.RowSetLookup rowSetLookup = dataIndex.rowSetLookup(); if (indexedSourceIndices.size() == 1) { - // Only one indexed source, so we can use the RowSetLookup directly and return the row set. + // Only one indexed source, so we can use the RowSetLookup directly. + final int keyIndex = indexedSourceIndices.get(0); liveValues.forEach(key -> { - final RowSet rowSet = indexLookupArr[0].apply(key, false); + final Object[] keys = (Object[]) key; + final RowSet rowSet = rowSetLookup.apply(keys[keyIndex], false); if (rowSet != null) { - // Make a copy of the row set. - indexKeyRowSetMap.put(key, rowSet.copy()); + indexRowSets.add(rowSet); } }); } else { - // Intersect the retrieved row sets to get the final row set for this key. + final Object[] partialKey = new Object[indexedSourceList.size()]; + liveValues.forEach(key -> { - RowSet result = null; - for (int ii = 0; ii < indexedSourceIndices.size(); ++ii) { - final int tupleIndex = indexedSourceIndices.get(ii); - // noinspection unchecked - final Object singleKey = indexedTupleSource.exportElementReinterpreted(key, tupleIndex); - final RowSet rowSet = indexLookupArr[ii].apply(singleKey, false); - if (rowSet != null) { - result = result == null ? rowSet.copy() : result.intersect(rowSet); - } + final Object[] keys = (Object[]) key; + + // Build the partial lookup key for the supplied key. + int pos = 0; + for (int keyIndex : indexedSourceIndices) { + partialKey[pos++] = keys[keyIndex]; } - if (result != null) { - indexKeyRowSetMap.put(key, result); + + // Perform the lookup using the partial key. + final RowSet rowSet = rowSetLookup.apply(partialKey, false); + if (rowSet != null) { + indexRowSets.add(rowSet); } }); } - if (notIndexSourceList.size() == 0) { - // Combine the indexed answers and return the result. - final RowSetBuilderRandom resultBuilder = RowSetFactory.builderRandom(); - for (final RowSet rowSet : indexKeyRowSetMap.values()) { - try (final SafeCloseable ignored = rowSet) { - resultBuilder.addRowSet(rowSet); + // We have some non-indexed sources, so we need to filter them manually. Iterate through the indexed + // row sets and build a new row set where all keys match. + final ChunkSource indexKeySource = + DataIndexUtils.makeBoxedKeySource(indexedSourceList.toArray(new ColumnSource[0])); + + final List builders = new ArrayList<>(); + + final int CHUNK_SIZE = 1 << 10; // 1024 + try (final ColumnSource.GetContext keyGetContext = indexKeySource.makeGetContext(CHUNK_SIZE)) { + for (final RowSet resultRowSet : indexRowSets) { + if (resultRowSet.isEmpty()) { + continue; } - } - return resultBuilder.build(); - } else { - // We have some non-indexed sources, so we need to filter them manually. Iterate through the indexed - // row sets and build a new row set where all keys match. - final Map keyRowSetBuilder = new LinkedHashMap<>(); - - for (final Map.Entry entry : indexKeyRowSetMap.entrySet()) { - try (final RowSet resultRowSet = entry.getValue()) { - if (resultRowSet.isEmpty()) { - continue; - } - // Iterate through the index-restricted row set for matches. - for (final RowSet.Iterator iterator = resultRowSet.iterator(); iterator.hasNext();) { - final long rowKey = iterator.nextLong(); - final Object key = tupleSource.createTuple(rowKey); + try (final RowSequence.Iterator rsIt = resultRowSet.getRowSequenceIterator()) { + final RowSequence rsChunk = rsIt.getNextRowSequenceWithLength(CHUNK_SIZE); + final ObjectChunk valueChunk = + indexKeySource.getChunk(keyGetContext, rsChunk).asObjectChunk(); + LongChunk keyChunk = rsChunk.asRowKeyChunk(); + RowSetBuilderSequential builder = null; + + final int chunkSize = rsChunk.intSize(); + for (int ii = 0; ii < chunkSize; ++ii) { + final Object key = valueChunk.get(ii); if (!liveValues.contains(key)) { continue; } - - final RowSetBuilderSequential rowSetForKey = - keyRowSetBuilder.computeIfAbsent(key, k -> RowSetFactory.builderSequential()); - rowSetForKey.appendKey(rowKey); + if (builder == null) { + builder = RowSetFactory.builderSequential(); + builders.add(builder); + } + builder.appendKey(keyChunk.get(ii)); } } } + } - // Combine the final answers and return the result. - final RowSetBuilderRandom resultBuilder = RowSetFactory.builderRandom(); - for (final RowSetBuilderSequential builder : keyRowSetBuilder.values()) { - try (final RowSet ignored = builder.build()) { - resultBuilder.addRowSet(ignored); - } + // Combine the final answers and return the result. + final RowSetBuilderRandom resultBuilder = RowSetFactory.builderRandom(); + for (final RowSetBuilderSequential builder : builders) { + try (final RowSet ignored = builder.build()) { + resultBuilder.addRowSet(ignored); } - return resultBuilder.build(); } + return resultBuilder.build(); } - private WritableRowSet filterLinear(RowSet selection, TupleSource tupleSource) { + private WritableRowSet filterLinear(final RowSet selection, final ColumnSource[] keyColumns) { if (selection.isEmpty()) { return RowSetFactory.empty(); } + final ChunkSource keySource = DataIndexUtils.makeBoxedKeySource(keyColumns); + if (!kernelValid) { - setInclusionKernel = SetInclusionKernel.makeKernel(tupleSource.getChunkType(), liveValues, inclusion); + if (!liveValuesArrayValid) { + liveValuesArray = liveValues.toArray(); + liveValuesArrayValid = true; + } + setInclusionKernel = + SetInclusionKernel.makeKernel(keySource.getChunkType(), List.of(liveValuesArray), inclusion); kernelValid = true; } @@ -423,7 +441,7 @@ private WritableRowSet filterLinear(RowSet selection, TupleSource tupleSource final int maxChunkSize = getChunkSize(selection); // @formatter:off - try (final ColumnSource.GetContext keyGetContext = tupleSource.makeGetContext(maxChunkSize); + try (final ChunkSource.GetContext keyGetContext = keySource.makeGetContext(maxChunkSize); final RowSequence.Iterator selectionIterator = selection.getRowSequenceIterator(); final WritableLongChunk selectionRowKeyChunk = WritableLongChunk.makeWritableChunk(maxChunkSize); @@ -433,7 +451,7 @@ private WritableRowSet filterLinear(RowSet selection, TupleSource tupleSource while (selectionIterator.hasMore()) { final RowSequence selectionChunk = selectionIterator.getNextRowSequenceWithLength(maxChunkSize); - final Chunk keyChunk = Chunk.downcast(tupleSource.getChunk(keyGetContext, selectionChunk)); + final Chunk keyChunk = Chunk.downcast(keySource.getChunk(keyGetContext, selectionChunk)); final int thisChunkSize = keyChunk.size(); setInclusionKernel.matchValues(keyChunk, matches); @@ -482,7 +500,8 @@ public DynamicWhereFilter copy() { @Override public boolean satisfied(final long step) { - return setUpdateListener == null || setUpdateListener.satisfied(step); + final boolean indexSatisfied = sourceDataIndex == null || sourceDataIndex.table().satisfied(step); + return indexSatisfied && (setUpdateListener == null || setUpdateListener.satisfied(step)); } @Override diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/MatchFilter.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/MatchFilter.java index 42ac46dca7d..953379ee939 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/MatchFilter.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/MatchFilter.java @@ -7,14 +7,12 @@ import io.deephaven.base.string.cache.CompressedString; import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.rowset.WritableRowSet; -import io.deephaven.engine.table.ColumnDefinition; -import io.deephaven.engine.table.Table; -import io.deephaven.engine.table.TableDefinition; +import io.deephaven.engine.table.*; +import io.deephaven.engine.table.impl.indexer.DataIndexer; import io.deephaven.engine.table.impl.preview.DisplayWrapper; import io.deephaven.engine.context.QueryScope; import io.deephaven.time.DateTimeUtils; import io.deephaven.util.type.ArrayTypeUtils; -import io.deephaven.engine.table.ColumnSource; import io.deephaven.engine.rowset.RowSet; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -43,6 +41,10 @@ static MatchFilter ofLiterals( private final String[] strValues; private final boolean invertMatch; private final boolean caseInsensitive; + + /** The data index for the source table, if any. */ + @Nullable + private DataIndex sourceDataIndex; private boolean initialized = false; public enum MatchType { @@ -132,6 +134,19 @@ public List getColumnArrays() { return Collections.emptyList(); } + @Override + public List getDataIndexes(final Table sourceTable) { + if (sourceDataIndex == null) { + // The outside world knows we would like to use this index, we can rely on it being in the correct state + // for this cycle only. + sourceDataIndex = DataIndexer.of(sourceTable.getRowSet()).getDataIndex(sourceTable, columnName); + } + if (sourceDataIndex == null) { + return Collections.emptyList(); + } + return List.of(sourceDataIndex); + } + @Override public synchronized void init(TableDefinition tableDefinition) { if (initialized) { @@ -186,7 +201,11 @@ public synchronized void init(TableDefinition tableDefinition) { public WritableRowSet filter( @NotNull RowSet selection, @NotNull RowSet fullSet, @NotNull Table table, boolean usePrev) { final ColumnSource columnSource = table.getColumnSource(columnName); - return columnSource.match(invertMatch, usePrev, caseInsensitive, fullSet, selection, values); + final WritableRowSet result = + columnSource.match(invertMatch, usePrev, caseInsensitive, sourceDataIndex, selection, values); + // We cannot rely on the data index being in the correct state for the next cycle, so we clear it. + sourceDataIndex = null; + return result; } @NotNull @@ -194,7 +213,11 @@ public WritableRowSet filter( public WritableRowSet filterInverse( @NotNull RowSet selection, @NotNull RowSet fullSet, @NotNull Table table, boolean usePrev) { final ColumnSource columnSource = table.getColumnSource(columnName); - return columnSource.match(!invertMatch, usePrev, caseInsensitive, fullSet, selection, values); + final WritableRowSet result = + columnSource.match(!invertMatch, usePrev, caseInsensitive, sourceDataIndex, selection, values); + // We cannot rely on the data index being in the correct state for the next cycle, so we clear it. + sourceDataIndex = null; + return result; } @Override diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/WhereFilter.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/WhereFilter.java index e62154dc016..f4caf8fb534 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/WhereFilter.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/WhereFilter.java @@ -8,6 +8,7 @@ import io.deephaven.engine.context.QueryCompiler; import io.deephaven.engine.rowset.RowSet; import io.deephaven.engine.rowset.WritableRowSet; +import io.deephaven.engine.table.DataIndex; import io.deephaven.engine.table.Table; import io.deephaven.engine.table.TableDefinition; import io.deephaven.engine.table.impl.BaseTable; @@ -114,6 +115,15 @@ default void validateSafeForRefresh(final BaseTable sourceTable) { // nothing to validate by default } + /** + * Get all the {@link DataIndex data indexes} that will be used by this filter when executed. + * + * @param sourceTable the table to filter + */ + default List getDataIndexes(final Table sourceTable) { + return List.of(); + } + /** * Filter selection to only matching rows. * @@ -131,7 +141,10 @@ default void validateSafeForRefresh(final BaseTable sourceTable) { */ @NotNull WritableRowSet filter( - @NotNull RowSet selection, @NotNull RowSet fullSet, @NotNull Table table, boolean usePrev); + @NotNull RowSet selection, + @NotNull RowSet fullSet, + @NotNull Table table, + boolean usePrev); /** * Filter selection to only non-matching rows. @@ -164,7 +177,10 @@ WritableRowSet filter( */ @NotNull default WritableRowSet filterInverse( - @NotNull RowSet selection, @NotNull RowSet fullSet, @NotNull Table table, boolean usePrev) { + @NotNull RowSet selection, + @NotNull RowSet fullSet, + @NotNull Table table, + boolean usePrev) { try (final WritableRowSet regular = filter(selection, fullSet, table, usePrev)) { return selection.minus(regular); } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/DelegatingColumnSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/DelegatingColumnSource.java index 79314ac55b3..05e5db3818a 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/DelegatingColumnSource.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/DelegatingColumnSource.java @@ -10,9 +10,11 @@ import io.deephaven.engine.rowset.RowSet; import io.deephaven.engine.rowset.WritableRowSet; import io.deephaven.engine.table.ColumnSource; +import io.deephaven.engine.table.DataIndex; import io.deephaven.engine.table.SharedContext; import io.deephaven.engine.table.impl.AbstractColumnSource; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import javax.annotation.OverridingMethodsMustInvokeSuper; @@ -29,10 +31,10 @@ public WritableRowSet match( boolean invertMatch, boolean usePrev, boolean caseInsensitive, - @NotNull RowSet fullSet, + @Nullable final DataIndex dataIndex, @NotNull RowSet mapper, Object... keys) { - return delegate.match(invertMatch, usePrev, caseInsensitive, fullSet, mapper, keys); + return delegate.match(invertMatch, usePrev, caseInsensitive, dataIndex, mapper, keys); } @Override diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/RowSetColumnSourceWrapper.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/RowSetColumnSourceWrapper.java index 6bb03620830..a27c5fcc3d4 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/RowSetColumnSourceWrapper.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/RowSetColumnSourceWrapper.java @@ -10,10 +10,7 @@ import io.deephaven.engine.rowset.RowSet; import io.deephaven.engine.rowset.TrackingRowSet; import io.deephaven.engine.rowset.WritableRowSet; -import io.deephaven.engine.table.ChunkSource; -import io.deephaven.engine.table.ColumnSource; -import io.deephaven.engine.table.SharedContext; -import io.deephaven.engine.table.Table; +import io.deephaven.engine.table.*; import io.deephaven.engine.table.impl.AbstractColumnSource; import io.deephaven.engine.table.impl.by.AggregationProcessor; import org.apache.commons.lang3.NotImplementedException; @@ -142,7 +139,11 @@ public Class getComponentType() { } @Override - public WritableRowSet match(boolean invertMatch, boolean usePrev, boolean caseInsensitive, @NotNull RowSet fullSet, + public WritableRowSet match( + boolean invertMatch, + boolean usePrev, + boolean caseInsensitive, + @Nullable final DataIndex dataIndex, @NotNull RowSet mapper, Object... keys) { throw new NotImplementedException("RowSetColumnSourceWrapper.match"); } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/RegionedColumnSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/RegionedColumnSource.java index a159150acd3..30ec7c53adb 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/RegionedColumnSource.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/RegionedColumnSource.java @@ -6,9 +6,6 @@ import io.deephaven.engine.table.ColumnDefinition; import io.deephaven.engine.table.ColumnSource; import io.deephaven.engine.table.Table; -import io.deephaven.engine.table.impl.ColumnSourceManager; -import io.deephaven.engine.table.impl.dataindex.DataIndexBuilder; -import io.deephaven.engine.table.impl.dataindex.PartitioningIndexProvider; import io.deephaven.engine.table.impl.locations.ColumnLocation; import io.deephaven.engine.table.impl.ImmutableColumnSource; import io.deephaven.engine.rowset.RowSet; diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/RegionedColumnSourceBase.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/RegionedColumnSourceBase.java index c046337c7a1..5d408e3c3c3 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/RegionedColumnSourceBase.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/RegionedColumnSourceBase.java @@ -8,8 +8,6 @@ import io.deephaven.engine.table.impl.AbstractColumnSource; import io.deephaven.chunk.WritableChunk; import io.deephaven.engine.rowset.RowSequence; -import io.deephaven.engine.table.impl.ColumnSourceManager; -import io.deephaven.engine.table.impl.dataindex.PartitioningIndexProvider; import io.deephaven.util.annotations.TestUseOnly; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; diff --git a/engine/table/src/test/java/io/deephaven/engine/util/TimeTableTest.java b/engine/table/src/test/java/io/deephaven/engine/util/TimeTableTest.java index 902ab160adf..1590eaa4d75 100644 --- a/engine/table/src/test/java/io/deephaven/engine/util/TimeTableTest.java +++ b/engine/table/src/test/java/io/deephaven/engine/util/TimeTableTest.java @@ -221,23 +221,23 @@ public void testColumnSourceMatch() { .map(l -> l == null ? null : DateTimeUtils.epochNanosToInstant(l)) .toArray(Instant[]::new); try (final RowSet match = - dtColumn.match(false, false, false, timeTable.getRowSet(), RowSetFactory.fromRange(100, 110), + dtColumn.match(false, false, false, null, RowSetFactory.fromRange(100, 110), (Object[]) keys)) { Assert.assertEquals(match, RowSetFactory.fromKeys(100, 105, 110)); } try (final RowSet match = - column.match(false, false, false, timeTable.getRowSet(), RowSetFactory.fromRange(100, 110), + column.match(false, false, false, null, RowSetFactory.fromRange(100, 110), (Object[]) longKeys)) { Assert.assertEquals(match, RowSetFactory.fromKeys(100, 105, 110)); } // inverted try (final RowSet match = - dtColumn.match(true, false, false, timeTable.getRowSet(), RowSetFactory.fromRange(100, 110), + dtColumn.match(true, false, false, null, RowSetFactory.fromRange(100, 110), (Object[]) keys)) { Assert.assertEquals(match, RowSetFactory.fromKeys(101, 102, 103, 104, 106, 107, 108, 109)); } try (final RowSet match = - column.match(true, false, false, timeTable.getRowSet(), RowSetFactory.fromRange(100, 110), + column.match(true, false, false, null, RowSetFactory.fromRange(100, 110), (Object[]) longKeys)) { Assert.assertEquals(match, RowSetFactory.fromKeys(101, 102, 103, 104, 106, 107, 108, 109)); }