Skip to content

Commit

Permalink
When Sorting with a Static Symbol Table use a RowKey or RowPosition ColumnSource (#5154)
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind committed Feb 18, 2024
1 parent 1ef397c commit 73cdc66
Show file tree
Hide file tree
Showing 15 changed files with 447 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,7 @@ RollupTable rollup(Collection<? extends Aggregation> aggregations, boolean inclu
/**
* Get a {@link Table} that contains a sub-set of the rows from {@code this}. The result will share the same
* {@link #getColumnSources() column sources} and {@link #getDefinition() definition} as this table.
*
* <p>
* The result will not update on its own. The caller must also establish an appropriate listener to update
* {@code rowSet} and propagate {@link TableUpdate updates}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3233,10 +3233,10 @@ public void propagateFlatness(QueryTable result) {
/**
* Get a {@link Table} that contains a sub-set of the rows from {@code this}. The result will share the same
* {@link #getColumnSources() column sources} and {@link #getDefinition() definition} as this table.
*
* <p>
* The result will not update on its own, the caller must also establish an appropriate listener to update
* {@code rowSet} and propagate {@link TableUpdate updates}.
*
* <p>
* No {@link QueryPerformanceNugget nugget} is opened for this table, to prevent operations that call this
* repeatedly from having an inordinate performance penalty. If callers require a nugget, they must create one in
* the enclosing operation.
Expand All @@ -3252,16 +3252,38 @@ public QueryTable getSubTable(@NotNull final TrackingRowSet rowSet) {
}
}

/**
 * Get a {@link Table} that adds, or overwrites, columns from {@code this}. The result will share the same
 * {@link #getRowSet() row set} as this table.
 * <p>
 * The result will not update on its own. The caller must also establish an appropriate listener to update the
 * provided column sources and propagate {@link TableUpdate updates}.
 * <p>
 * No attributes are propagated to the result table.
 *
 * @param additionalSources The additional columns to add or overwrite
 * @return A new table with the additional columns
 */
public QueryTable withAdditionalColumns(@NotNull final Map<String, ColumnSource<?>> additionalSources) {
    final UpdateGraph updateGraph = getUpdateGraph();
    try (final SafeCloseable ignored = ExecutionContext.getContext().withUpdateGraph(updateGraph).open()) {
        // Preserve the existing column order, then layer the new sources on top so that name collisions
        // resolve in favor of the caller-supplied columns.
        final LinkedHashMap<String, ColumnSource<?>> resultColumns = new LinkedHashMap<>(this.columns);
        resultColumns.putAll(additionalSources);
        // Derive the definition from the merged source map rather than reusing this table's definition,
        // since the additional sources may introduce new columns or replace existing ones with new types.
        return new QueryTable(TableDefinition.inferFrom(resultColumns), rowSet, resultColumns, null, null);
    }
}

/**
* Get a {@link Table} that contains a sub-set of the rows from {@code this}. The result will share the same
* {@link #getColumnSources() column sources} and {@link #getDefinition() definition} as this table.
*
* <p>
* The result will not update on its own, the caller must also establish an appropriate listener to update
* {@code rowSet} and propagate {@link TableUpdate updates}.
*
* <p>
* This method is intended to be used for composing alternative engine operations, in particular
* {@link #partitionBy(boolean, String...)}.
*
* <p>
* No {@link QueryPerformanceNugget nugget} is opened for this table, to prevent operations that call this
* repeatedly from having an inordinate performance penalty. If callers require a nugget, they must create one in
* the enclosing operation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ static SortMapping getSortedKeys(SortingOrder[] order, ColumnSource<Comparable<?
*/
static SortMapping getSortedKeys(SortingOrder[] order, ColumnSource<Comparable<?>>[] columnsToSortBy,
RowSet rowSetToSort, boolean usePrev, boolean allowSymbolTable) {
if (rowSetToSort.size() == 0) {
if (rowSetToSort.isEmpty()) {
return EMPTY_SORT_MAPPING;
}

Expand Down Expand Up @@ -300,7 +300,9 @@ private int doIntLookup(long symTabId) {
return lookupTable[region][id];
}

private static SparseSymbolMapping createMapping(LongChunk originalSymbol, IntChunk mappedIndex) {
private static SparseSymbolMapping createMapping(
@NotNull final LongChunk<Values> originalSymbol,
@NotNull final LongChunk<Values> mappedIndex) {
// figure out what the maximum region is, and determine how many bits of it there are
int maxUpperPart = 0;
int minTrailing = 32;
Expand Down Expand Up @@ -330,7 +332,7 @@ private static SparseSymbolMapping createMapping(LongChunk originalSymbol, IntCh
final long symTabId = originalSymbol.get(ii);
final int region = (int) (symTabId >> (32 + minTrailing));
final int id = (int) symTabId;
final int mappedId = mappedIndex.get(ii);
final int mappedId = Math.toIntExact(mappedIndex.get(ii));
maxMapping = Math.max(maxMapping, mappedId);
lookupTable[region][id] = mappedId;
}
Expand All @@ -340,37 +342,44 @@ private static SparseSymbolMapping createMapping(LongChunk originalSymbol, IntCh
}

private static final String SORTED_INDEX_COLUMN_NAME = "SortedIndex";
private static final String SORTED_INDEX_COLUMN_UPDATE = SORTED_INDEX_COLUMN_NAME + "=i";

private static SortMapping doSymbolTableMapping(SortingOrder order, ColumnSource<Comparable<?>> columnSource,
RowSet rowSet, boolean usePrev) {
final int sortSize = rowSet.intSize();

final ColumnSource<Long> reinterpreted = columnSource.reinterpret(long.class);
final Table symbolTable = ((SymbolTableSource) columnSource).getStaticSymbolTable(rowSet, true);
final Table symbolTable = ((SymbolTableSource<?>) columnSource).getStaticSymbolTable(rowSet, true);

if (symbolTable.size() >= sortSize) {
// the very first thing we will do is sort the symbol table, using a regular sort; if it is larger than the
// actual table we care to sort, then it is wasteful to use the symbol table sorting
return getSortMappingOne(order, columnSource, rowSet, usePrev);
}

final Table idMapping = symbolTable.sort(SymbolTableSource.SYMBOL_COLUMN_NAME)
.groupBy(SymbolTableSource.SYMBOL_COLUMN_NAME).update(SORTED_INDEX_COLUMN_UPDATE).ungroup()
final QueryTable groupedSymbols = (QueryTable) symbolTable.sort(SymbolTableSource.SYMBOL_COLUMN_NAME)
.groupBy(SymbolTableSource.SYMBOL_COLUMN_NAME).coalesce();
final Map<String, ColumnSource<?>> extraColumn;
if (groupedSymbols.isFlat()) {
extraColumn = Map.of(SORTED_INDEX_COLUMN_NAME, RowKeyColumnSource.INSTANCE);
} else {
extraColumn = Map.of(SORTED_INDEX_COLUMN_NAME, new RowPositionColumnSource(groupedSymbols.getRowSet()));
}
final Table idMapping = groupedSymbols.withAdditionalColumns(extraColumn)
.ungroup()
.view(SymbolTableSource.ID_COLUMN_NAME, SORTED_INDEX_COLUMN_NAME);

final int symbolEntries = idMapping.intSize();

final SparseSymbolMapping mapping;

try (final WritableLongChunk<Values> originalSymbol = WritableLongChunk.makeWritableChunk(symbolEntries);
final WritableIntChunk<Values> mappedIndex = WritableIntChunk.makeWritableChunk(symbolEntries)) {
final ColumnSource idSource = idMapping.getColumnSource(SymbolTableSource.ID_COLUMN_NAME);
final WritableLongChunk<Values> mappedIndex = WritableLongChunk.makeWritableChunk(symbolEntries)) {
final ColumnSource<?> idSource = idMapping.getColumnSource(SymbolTableSource.ID_COLUMN_NAME);
try (final ColumnSource.FillContext idContext = idSource.makeFillContext(symbolEntries)) {
idSource.fillChunk(idContext, originalSymbol, idMapping.getRowSet());
}

final ColumnSource sortedRowSetSource = idMapping.getColumnSource(SORTED_INDEX_COLUMN_NAME);
final ColumnSource<?> sortedRowSetSource = idMapping.getColumnSource(SORTED_INDEX_COLUMN_NAME);
try (final ColumnSource.FillContext sortedIndexContext =
sortedRowSetSource.makeFillContext(symbolEntries)) {
sortedRowSetSource.fillChunk(sortedIndexContext, mappedIndex, idMapping.getRowSet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ private QueryTable historicalSort(SortHelpers.SortMapping sortedKeys) {
}

@NotNull
private Result<QueryTable> streamSort(@NotNull final SortHelpers.SortMapping initialSortedKeys) {
private Result<QueryTable> blinkTableSort(@NotNull final SortHelpers.SortMapping initialSortedKeys) {
final LongChunkColumnSource initialInnerRedirectionSource = new LongChunkColumnSource();
if (initialSortedKeys.size() > 0) {
initialInnerRedirectionSource
Expand Down Expand Up @@ -173,7 +173,7 @@ public void onUpdate(@NotNull final TableUpdate upstream) {
}

final SortHelpers.SortMapping updateSortedKeys =
SortHelpers.getSortedKeys(sortOrder, sortColumns, upstream.added(), false);
SortHelpers.getSortedKeys(sortOrder, sortColumns, upstream.added(), false, false);
final LongChunkColumnSource recycled = recycledInnerRedirectionSource.getValue();
recycledInnerRedirectionSource.setValue(null);
final LongChunkColumnSource updateInnerRedirectSource =
Expand Down Expand Up @@ -220,7 +220,7 @@ public Result<QueryTable> initialize(boolean usePrev, long beforeClock) {
final RowSet indexToUse = usePrev ? prevIndex : parent.getRowSet();
final SortHelpers.SortMapping sortedKeys =
SortHelpers.getSortedKeys(sortOrder, sortColumns, indexToUse, usePrev);
return streamSort(sortedKeys);
return blinkTableSort(sortedKeys);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,14 @@ public void onFailureInternal(@NotNull final Throwable originalException, Entry
snapshotControl.setListenerAndResult(listener, result);
}

return ac.transformResult(result);
final QueryTable finalResult = ac.transformResult(result);
final boolean noInitialKeys = initialKeys == null || (!initialKeys.isRefreshing() && initialKeys.isEmpty());
if (!input.isRefreshing() && finalResult.getRowSet().isFlat()) {
finalResult.setFlat();
} else if ((input.isAddOnly() || input.isAppendOnly() || input.isBlink()) && (noInitialKeys || preserveEmpty)) {
finalResult.setFlat();
}
return finalResult;
}

private static OperatorAggregationStateManager makeStateManager(
Expand Down Expand Up @@ -1580,7 +1587,11 @@ private static QueryTable staticGroupedAggregation(QueryTable withView, String k
return keyToSlot::get;
});

return ac.transformResult(result);
final QueryTable finalResult = ac.transformResult(result);
if (finalResult.getRowSet().isFlat()) {
finalResult.setFlat();
}
return finalResult;
}

private static void doGroupedAddition(
Expand Down Expand Up @@ -2115,7 +2126,9 @@ public void onFailureInternal(@NotNull final Throwable originalException,

ac.supplyRowLookup(() -> key -> Arrays.equals((Object[]) key, EMPTY_KEY) ? 0 : DEFAULT_UNKNOWN_ROW);

return ac.transformResult(result);
final QueryTable finalResult = ac.transformResult(result);
finalResult.setFlat();
return finalResult;
}

private static void doNoKeyAddition(RowSequence index, AggregationContext ac,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,6 @@
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

import static io.deephaven.util.QueryConstants.NULL_LONG;

/**
* A ColumnSource backed by in-memory arrays of data.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import io.deephaven.chunk.Chunk;
import io.deephaven.chunk.ChunkType;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.table.ChunkSource;
import io.deephaven.engine.table.ColumnSource;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/**
* Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.engine.table.impl.sources;

import io.deephaven.chunk.LongChunk;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.WritableLongChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.rowset.chunkattributes.RowKeys;
import io.deephaven.engine.table.impl.AbstractColumnSource;
import io.deephaven.engine.table.impl.ImmutableColumnSourceGetDefaults;
import io.deephaven.util.QueryConstants;
import org.jetbrains.annotations.NotNull;

/**
* This is a column source that uses no additional memory and is an identity mapping from row key to row key.
*/
public class RowKeyColumnSource extends AbstractColumnSource<Long>
        implements ImmutableColumnSourceGetDefaults.ForLong, FillUnordered<Values> {

    /**
     * Shared stateless instance; the source holds no data, so a single instance may be reused freely.
     */
    public static final RowKeyColumnSource INSTANCE = new RowKeyColumnSource();

    public RowKeyColumnSource() {
        super(Long.class);
    }

    /**
     * Identity mapping: a valid row key maps to itself; negative (invalid) keys map to the null long.
     *
     * @param rowKey the row key to look up
     * @return {@code rowKey}, or {@link QueryConstants#NULL_LONG} when {@code rowKey < 0}
     */
    @Override
    public long getLong(long rowKey) {
        if (rowKey < 0) {
            return QueryConstants.NULL_LONG;
        }
        return rowKey;
    }

    @Override
    public void fillChunk(
            @NotNull final FillContext context,
            @NotNull final WritableChunk<? super Values> destination,
            @NotNull final RowSequence rowSequence) {
        doFillChunk(destination, rowSequence);
    }

    @Override
    public void fillPrevChunk(
            @NotNull final FillContext context,
            @NotNull final WritableChunk<? super Values> destination,
            @NotNull final RowSequence rowSequence) {
        // Row keys never change under this identity mapping, so previous values equal current values.
        doFillChunk(destination, rowSequence);
    }

    /**
     * Copy the row keys of {@code rowSequence} into {@code destination}.
     *
     * @param destination chunk to fill; will be sized to the row sequence's size
     * @param rowSequence the rows whose keys are to be written
     */
    static void doFillChunk(
            @NotNull final WritableChunk<? super Values> destination,
            @NotNull final RowSequence rowSequence) {
        final WritableLongChunk<? super Values> result = destination.asWritableLongChunk();
        if (!rowSequence.isContiguous()) {
            rowSequence.fillRowKeyChunk(result);
            return;
        }
        // Contiguous range: generate the keys arithmetically instead of iterating the sequence.
        final int count = rowSequence.intSize();
        final long base = rowSequence.firstRowKey();
        result.setSize(count);
        for (int offset = 0; offset < count; ++offset) {
            result.set(offset, base + offset);
        }
    }

    @Override
    public boolean providesFillUnordered() {
        return true;
    }

    @Override
    public void fillChunkUnordered(
            @NotNull final FillContext context,
            @NotNull final WritableChunk<? super Values> dest,
            @NotNull final LongChunk<? extends RowKeys> keys) {
        doFillUnordered(dest, keys);
    }

    @Override
    public void fillPrevChunkUnordered(
            @NotNull final FillContext context,
            @NotNull final WritableChunk<? super Values> dest,
            @NotNull final LongChunk<? extends RowKeys> keys) {
        // Identity mapping: previous values are identical to current values.
        doFillUnordered(dest, keys);
    }

    private void doFillUnordered(
            @NotNull final WritableChunk<? super Values> dest,
            @NotNull final LongChunk<? extends RowKeys> keys) {
        final WritableLongChunk<? super Values> result = dest.asWritableLongChunk();
        final int count = keys.size();
        result.setSize(count);
        for (int offset = 0; offset < count; ++offset) {
            // Same mapping as getLong: negative keys become the null long.
            result.set(offset, getLong(keys.get(offset)));
        }
    }
}
Loading

0 comments on commit 73cdc66

Please sign in to comment.