Skip to content

Commit

Permalink
Accepted RWC changes, additional improvements and DynamicWhereFilter …
Browse files Browse the repository at this point in the history
…tests passing.
  • Loading branch information
lbooker42 committed Dec 5, 2023
1 parent e1ef9ad commit b48fc91
Show file tree
Hide file tree
Showing 21 changed files with 505 additions and 282 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,24 @@ default ChunkType getChunkType() {
return ChunkType.fromElementType(dataType);
}

/**
* Return a {@link RowSet row set} where the values in the column source match the given keys.
*
* @param invertMatch Whether to invert the match, i.e. return the rows where the values do not match the given keys
* @param usePrev Whether to use the previous values for the ColumnSource
* @param caseInsensitive Whether to perform a case insensitive match
* @param dataIndex An optional data index that can be used to accelerate the match (the index table must be
* included in snapshot controls or otherwise guaranteed to be current)
* @param mapper Restrict results to this row set
* @param keys The keys to match in the column
*
* @return The rows that match the given keys
*/
WritableRowSet match(
boolean invertMatch,
boolean usePrev,
boolean caseInsensitive,
@NotNull final RowSet fullSet,
@Nullable final DataIndex dataIndex,
@NotNull RowSet mapper,
Object... keys);

Expand Down
72 changes: 68 additions & 4 deletions engine/api/src/main/java/io/deephaven/engine/table/DataIndex.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
*/
public interface DataIndex extends LivenessReferent {
/**
* Provides a lookup function from {@code key} to the position in the index table. Keys are specified as follows:
* Provides a lookup function from {@code key} to the position in the index table. Keys consist of reinterpreted
* values and are specified as follows:
* <dl>
* <dt>No key columns</dt>
* <dd>"Empty" keys are signified by any zero-length {@code Object[]}</dd>
Expand All @@ -37,8 +38,8 @@ interface PositionLookup {
}

/**
* Provides a lookup function from {@code key} to the {@link RowSet} containing the matching table rows. Keys are
* specified as follows:
* Provides a lookup function from {@code key} to the {@link RowSet} containing the matching table rows. Keys
* consist of reinterpreted values and are specified as follows:
* <dl>
* <dt>No key columns</dt>
* <dd>"Empty" keys are signified by any zero-length {@code Object[]}</dd>
Expand Down Expand Up @@ -68,6 +69,14 @@ interface RowSetLookup {
/** Get the output row set column name for this index. */
String rowSetColumnName();

/** Return the index table key sources in the order of the index table. */
@FinalDefault
default ColumnSource<?>[] indexKeyColumns() {
    // TODO-RWC: Should this be in a static helper instead of the interface?
    // Snapshot the indexed sources in the iteration order of the key column map, then delegate to the
    // overload that resolves index table key sources for an explicit ordering.
    final ColumnSource<?>[] indexedSources = keyColumnMap().keySet().toArray(ColumnSource[]::new);
    return indexKeyColumns(indexedSources);
}

/** Return the index table key sources in the relative order of the indexed sources supplied. **/
@FinalDefault
default ColumnSource<?>[] indexKeyColumns(@NotNull final ColumnSource<?>[] columnSources) {
Expand Down Expand Up @@ -96,14 +105,69 @@ default ColumnSource<RowSet> rowSetColumn() {
Table table();

/**
* Build a {@link RowSetLookup lookup} function of index row sets for this index. If {@link #isRefreshing()} is
* Return a {@link RowSetLookup lookup} function of index row sets for this index. If {@link #isRefreshing()} is
* true, this lookup function is guaranteed to be accurate only for the current cycle.
*
* @return a function that provides map-like lookup of matching rows from an index key.
*/
@NotNull
RowSetLookup rowSetLookup();

/**
 * Return a {@link RowSetLookup lookup} function of index row sets for this index. If {@link #isRefreshing()} is
 * true, this lookup function is guaranteed to be accurate only for the current cycle. The keys provided must be in
 * the order of the {@code lookupSources}.
 *
 * @param lookupSources The indexed column sources, in the order in which the caller will supply key values. When
 *        more than one source is supplied, they must be exactly the key columns of this index, in any order.
 * @return a function that provides map-like lookup of matching rows from an index key.
 * @throws IllegalArgumentException if more than one source is supplied and {@code lookupSources} does not contain
 *         exactly the key columns of this index
 */
@NotNull
@FinalDefault
default RowSetLookup rowSetLookup(@NotNull final ColumnSource<?>[] lookupSources) {
    if (lookupSources.length == 1) {
        // Trivially ordered.
        return rowSetLookup();
    }

    final ColumnSource<?>[] indexSourceColumns = keyColumnMap().keySet().toArray(ColumnSource[]::new);
    if (Arrays.equals(lookupSources, indexSourceColumns)) {
        // Order matches, so we can use the default lookup function.
        return rowSetLookup();
    }

    // Every index key column must be matched by exactly one lookup source. The search below only proves that
    // each index column is present, so extra or duplicated lookup sources must be rejected here; otherwise the
    // trailing mapping entries would silently default to 0 and the remapped keys would be malformed.
    if (lookupSources.length != indexSourceColumns.length) {
        throw new IllegalArgumentException("The provided columns must match the data index key columns (expected "
                + indexSourceColumns.length + " columns, found " + lookupSources.length + ")");
    }

    // We need to wrap the lookup function with a key remapping function.

    // Maps index key position -> position of the same column in the user-supplied key
    final int[] indexToUserMapping = new int[indexSourceColumns.length];

    // Build an intermediate map (N^2 loop but N is small and this is called rarely).
    for (int ii = 0; ii < indexSourceColumns.length; ++ii) {
        boolean found = false;
        for (int jj = 0; jj < lookupSources.length; ++jj) {
            // Identity comparison: the caller must pass the very same source instances the index was built on.
            if (indexSourceColumns[ii] == lookupSources[jj]) {
                indexToUserMapping[ii] = jj;
                found = true;
                break;
            }
        }
        if (!found) {
            throw new IllegalArgumentException("The provided columns must match the data index key columns");
        }
    }

    return (key, usePrev) -> {
        // This is the key provided by the caller.
        final Object[] keys = (Object[]) key;
        // This is the key we need to provide to the lookup function.
        final Object[] remappedKey = new Object[keys.length];

        for (int ii = 0; ii < remappedKey.length; ++ii) {
            remappedKey[ii] = keys[indexToUserMapping[ii]];
        }

        // NOTE(review): the underlying lookup is re-fetched on every invocation rather than captured once;
        // presumably this keeps the wrapper from holding a stale lookup across cycles -- confirm.
        return rowSetLookup().apply(remappedKey, usePrev);
    };
}

/**
* Build a {@link PositionLookup lookup} of positions for this index. If {@link #isRefreshing()} is true, this
* lookup function is guaranteed to be accurate only for the current cycle.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import io.deephaven.engine.table.impl.chunkfillers.ChunkFiller;
import io.deephaven.engine.table.impl.chunkfilter.ChunkFilter;
import io.deephaven.engine.table.impl.chunkfilter.ChunkMatchFilterFactory;
import io.deephaven.engine.table.impl.indexer.DataIndexer;
import io.deephaven.engine.table.impl.sources.UnboxedLongBackedColumnSource;
import io.deephaven.engine.table.iterators.ChunkedColumnIterator;
import io.deephaven.engine.updategraph.UpdateGraph;
Expand Down Expand Up @@ -125,14 +124,11 @@ public WritableRowSet match(
final boolean invertMatch,
final boolean usePrev,
final boolean caseInsensitive,
@NotNull final RowSet fullSet,
@Nullable final DataIndex dataIndex,
@NotNull final RowSet mapper,
final Object... keys) {

final DataIndexer dataIndexer = fullSet.isTracking() ? DataIndexer.of(fullSet.trackingCast()) : null;
if (dataIndexer != null && dataIndexer.hasDataIndex(this)) {
final DataIndex dataIndex = dataIndexer.getDataIndex(this);

if (dataIndex != null) {
final RowSetBuilderRandom allInMatchingGroups = RowSetFactory.builderRandom();

if (caseInsensitive && (type == String.class)) {
Expand Down Expand Up @@ -162,10 +158,10 @@ public WritableRowSet match(
}
}
} else {
// Use the lookup function
// Use the lookup function to get the matching RowSets intersected with the mapper
final DataIndex.RowSetLookup rowSetLookup = dataIndex.rowSetLookup();
for (Object key : keys) {
RowSet range = rowSetLookup.apply(key, usePrev);
final RowSet range = rowSetLookup.apply(key, usePrev);
if (range != null) {
allInMatchingGroups.addRowSet(range);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1220,8 +1220,18 @@ private QueryTable whereInternal(final WhereFilter... filters) {
}

return memoizeResult(MemoizedOperationKey.filter(filters), () -> {
final OperationSnapshotControl snapshotControl =
createSnapshotControlIfRefreshing(OperationSnapshotControl::new);
// Request the data indexes from the filters so we can include any index tables in the
// snapshot control.
final List<DataIndex> dataIndexList = new ArrayList<>();
Arrays.stream(filters).forEach(filter -> dataIndexList.addAll(filter.getDataIndexes(this)));
final NotificationStepSource[] dataIndexTables = dataIndexList.stream()
.map(di -> (NotificationStepSource) di.table())
.toArray(NotificationStepSource[]::new);

final OperationSnapshotControl snapshotControl = createSnapshotControlIfRefreshing(
(final BaseTable<?> parent) -> dataIndexTables.length > 0
? new OperationSnapshotControlEx(parent, dataIndexTables)
: new OperationSnapshotControl(parent));

final Mutable<QueryTable> result = new MutableObject<>();
initializeWithSnapshot("where", snapshotControl,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.deephaven.engine.rowset.WritableRowSet;
import io.deephaven.engine.rowset.chunkattributes.RowKeys;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.DataIndex;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.impl.sources.FillUnordered;
import io.deephaven.engine.updategraph.UpdateSourceRegistrar;
Expand Down Expand Up @@ -250,7 +251,7 @@ public WritableRowSet match(
final boolean invertMatch,
final boolean usePrev,
final boolean caseInsensitive,
@NotNull final RowSet fullSet,
@Nullable final DataIndex dataIndex,
@NotNull final RowSet selection,
final Object... keys) {
if (startTime == null) {
Expand Down Expand Up @@ -393,7 +394,7 @@ public WritableRowSet match(
final boolean invertMatch,
final boolean usePrev,
final boolean caseInsensitive,
@NotNull final RowSet fullSet,
@Nullable final DataIndex dataIndex,
@NotNull final RowSet selection,
final Object... keys) {
if (startTime == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ public WritableRowSet match(
final boolean invertMatch,
final boolean usePrev,
final boolean caseInsensitive,
@NotNull final RowSet fullSet,
@Nullable final DataIndex dataIndex,
@NotNull final RowSet mapper,
final Object... keys) {
boolean hasFalse = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.deephaven.engine.table.impl.OperationSnapshotControl;
import io.deephaven.engine.table.impl.QueryTable;
import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder;
import io.deephaven.engine.table.impl.sources.ReinterpretUtils;
import io.deephaven.engine.table.impl.sources.RowSetColumnSourceWrapper;
import io.deephaven.engine.table.iterators.ChunkedColumnIterator;
import io.deephaven.engine.table.iterators.ColumnIterator;
Expand Down Expand Up @@ -119,7 +120,7 @@ protected int hash(Object key) {
* @param rowSetColumn The name of the row set column to wrap
* @return The copied table
*/
protected static QueryTable wrappedRowSetTable(
protected static QueryTable indexTableWrapper(
@NotNull final QueryTable parent,
@NotNull final String rowSetColumn) {
// TODO-RWC/LAB: Use new assertions to assert that parent has a RowSet ColumnSource of name rowSetColumn.
Expand All @@ -133,9 +134,10 @@ protected static QueryTable wrappedRowSetTable(
if (columnName.equals(rowSetColumn)) {
resultColumnSourceMap.put(
columnName, RowSetColumnSourceWrapper.from(parent.getColumnSource(rowSetColumn)));
return;
} else {
// Convert the key columns to primitive column sources.
resultColumnSourceMap.put(columnName, ReinterpretUtils.maybeConvertToPrimitive(columnSource));
}
resultColumnSourceMap.put(columnName, columnSource);
});
final OperationSnapshotControl snapshotControl =
parent.createSnapshotControlIfRefreshing(OperationSnapshotControl::new);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package io.deephaven.engine.table.impl.dataindex;

import io.deephaven.hash.KeyedObjectHashSet;
import io.deephaven.hash.KeyedObjectKey;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.util.Arrays;
import java.util.Objects;
import java.util.function.Consumer;

/**
 * A set of data index keys backed by a {@link KeyedObjectHashSet}.
 * <p>
 * Keys that are {@code Object[]} are hashed and compared by content ({@link Arrays#hashCode(Object[])} and
 * {@link Arrays#equals(Object[], Object[])}); all other keys use {@link Objects#hashCode(Object)} and
 * {@link Objects#equals(Object, Object)}. A {@code null} key is represented inside the backing set by a private
 * sentinel object and is translated back to {@code null} whenever keys are handed to callers.
 */
public class DataIndexKeySet {
    /** Sentinel stored in the backing set in place of a {@code null} key. */
    private static final Object NULL_OBJECT_KEY = new Object();
    /** Shared key definition used by every backing set created by this class. */
    private static final KeyedObjectKey<Object, Object> KEY_ADAPTER = new KeyHashAdapter();

    private final KeyedObjectHashSet<Object, Object> set;

    /** Key definition in which each stored value is its own key, with content-based handling of arrays. */
    private static class KeyHashAdapter implements KeyedObjectKey<Object, Object> {
        private KeyHashAdapter() {}

        @Override
        public Object getKey(@NotNull final Object value) {
            // Values act as their own keys.
            return value;
        }

        @Override
        public int hashKey(@Nullable final Object key) {
            return key instanceof Object[]
                    ? Arrays.hashCode((Object[]) key)
                    : Objects.hashCode(key);
        }

        @Override
        public boolean equalKey(@Nullable final Object key, @NotNull final Object value) {
            final boolean bothArrays = key instanceof Object[] && value instanceof Object[];
            return bothArrays
                    ? Arrays.equals((Object[]) key, (Object[]) value)
                    : Objects.equals(key, value);
        }
    }

    /**
     * Create an empty key set.
     *
     * @param initialCapacity The initial capacity passed to the backing hash set
     */
    public DataIndexKeySet(final int initialCapacity) {
        set = new KeyedObjectHashSet<>(initialCapacity, KEY_ADAPTER);
    }

    /** Create an empty key set using the backing hash set's default sizing. */
    public DataIndexKeySet() {
        set = new KeyedObjectHashSet<>(KEY_ADAPTER);
    }

    /**
     * Remove a key from this set.
     *
     * @param o The key to remove; may be {@code null}
     * @return The result reported by the backing set's removal call
     */
    public boolean remove(Object o) {
        return o == null ? set.remove(NULL_OBJECT_KEY) : set.removeValue(o);
    }

    /**
     * Add a key to this set.
     *
     * @param o The key to add; may be {@code null}
     * @return The result reported by the backing set's {@code add}
     */
    public boolean add(Object o) {
        return set.add(o == null ? NULL_OBJECT_KEY : o);
    }

    /**
     * Test whether a key is present in this set.
     *
     * @param o The key to look for; may be {@code null}
     * @return The result reported by the backing set's {@code contains}
     */
    public boolean contains(Object o) {
        return set.contains(o == null ? NULL_OBJECT_KEY : o);
    }

    /**
     * Invoke {@code action} for every key in this set; the {@code null} key, if present, is delivered as
     * {@code null} rather than as the internal sentinel.
     *
     * @param action The consumer to invoke for each key
     */
    public void forEach(Consumer<? super Object> action) {
        set.forEach(stored -> {
            action.accept(stored == NULL_OBJECT_KEY ? null : stored);
        });
    }

    /**
     * Copy the keys of this set into a new array, with the internal sentinel translated back to {@code null}.
     *
     * @return An array of the keys in this set
     */
    public Object[] toArray() {
        final Object[] keys = set.toArray();
        for (int idx = 0; idx < keys.length; ++idx) {
            if (keys[idx] == NULL_OBJECT_KEY) {
                keys[idx] = null;
            }
        }
        return keys;
    }
}
Loading

0 comments on commit b48fc91

Please sign in to comment.