diff --git a/engine/table/build.gradle b/engine/table/build.gradle index d72c1a39543..83f2b935e7f 100644 --- a/engine/table/build.gradle +++ b/engine/table/build.gradle @@ -34,6 +34,7 @@ dependencies { implementation 'com.tdunning:t-digest:3.2' implementation 'com.squareup:javapoet:1.13.0' implementation 'io.github.classgraph:classgraph:4.8.165' + implementation 'it.unimi.dsi:fastutil:8.5.13' implementation project(':plugin') implementation depCommonsLang3 diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java index 3a736ff098f..8e203d88c78 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java @@ -24,7 +24,12 @@ import java.util.List; /** - * This will filter a table for the most recent N nanoseconds (must be on a date time column). + * This will filter a table for the most recent N nanoseconds (must be on an {@link Instant} column). + * + *

+ * Note, this filter rescans the source table. You should prefer to use {@link io.deephaven.engine.util.WindowCheck} + * instead. + *

*/ public class TimeSeriesFilter extends WhereFilterLivenessArtifactImpl diff --git a/engine/table/src/main/java/io/deephaven/engine/util/WindowCheck.java b/engine/table/src/main/java/io/deephaven/engine/util/WindowCheck.java index 846a25a2fd7..08fdd21b532 100644 --- a/engine/table/src/main/java/io/deephaven/engine/util/WindowCheck.java +++ b/engine/table/src/main/java/io/deephaven/engine/util/WindowCheck.java @@ -11,7 +11,7 @@ import io.deephaven.chunk.WritableChunk; import io.deephaven.chunk.WritableObjectChunk; import io.deephaven.chunk.attributes.Values; -import io.deephaven.datastructures.util.CollectionUtil; +import io.deephaven.engine.primitive.iterator.CloseablePrimitiveIteratorOfLong; import io.deephaven.engine.rowset.*; import io.deephaven.engine.rowset.RowSetFactory; import io.deephaven.engine.rowset.chunkattributes.OrderedRowKeys; @@ -21,7 +21,9 @@ import io.deephaven.engine.table.Table; import io.deephaven.engine.table.TableUpdate; import io.deephaven.engine.table.impl.TableUpdateImpl; +import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder; import io.deephaven.engine.table.impl.sources.ReinterpretUtils; +import io.deephaven.engine.table.iterators.ChunkedLongColumnIterator; import io.deephaven.engine.updategraph.UpdateGraph; import io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph; import io.deephaven.time.DateTimeUtils; @@ -30,11 +32,12 @@ import io.deephaven.engine.table.ColumnSource; import io.deephaven.engine.table.impl.MutableColumnSourceGetDefaults; import io.deephaven.base.RAPriQueue; -import gnu.trove.map.hash.TLongObjectHashMap; import io.deephaven.util.QueryConstants; +import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.LongBidirectionalIterator; import org.jetbrains.annotations.NotNull; -import java.time.Instant; import java.util.*; /** @@ -58,6 +61,10 @@ private WindowCheck() {} * The resultant table ticks whenever the input table ticks, or modifies a row when it passes out of the window. *

* + *

+ * The timestamp column must be an Instant or a long value expressed as nanoseconds since the epoch. + *

+ * * @param table the input table * @param timestampColumn the timestamp column to monitor in table * @param windowNanos how many nanoseconds in the past a timestamp can be before it is out of the window @@ -67,7 +74,9 @@ private WindowCheck() {} @SuppressWarnings("unused") public static Table addTimeWindow(QueryTable table, String timestampColumn, long windowNanos, String inWindowColumn) { - return addTimeWindowInternal(null, table, timestampColumn, windowNanos, inWindowColumn, true).first; + return QueryPerformanceRecorder.withNugget("addTimeWindow(" + timestampColumn + ", " + windowNanos + ")", + table.sizeForInstrumentation(), + () -> addTimeWindowInternal(null, table, timestampColumn, windowNanos, inWindowColumn, true).first); } private static class WindowListenerRecorder extends ListenerRecorder { @@ -105,8 +114,10 @@ static Pair addTimeWindowInternal(Clock clock, QueryT final TimeWindowListener timeWindowListener = new TimeWindowListener(inWindowColumn, inWindowColumnSource, recorder, table, result); recorder.setMergedListener(timeWindowListener); - table.addUpdateListener(recorder); - timeWindowListener.addRowSequence(table.getRowSet()); + if (table.isRefreshing()) { + table.addUpdateListener(recorder); + } + timeWindowListener.addRowSequence(table.getRowSet(), false); result.addParentReference(timeWindowListener); result.manage(table); if (addToMonitor) { @@ -119,42 +130,67 @@ static Pair addTimeWindowInternal(Clock clock, QueryT * The TimeWindowListener maintains a priority queue of rows that are within a configured window, when they pass out * of the window, the InWindow column is set to false and a modification tick happens. * + *

* It implements {@link Runnable}, so that we can be inserted into the {@link PeriodicUpdateGraph}. + *

*/ static class TimeWindowListener extends MergedListener implements Runnable { private final InWindowColumnSource inWindowColumnSource; private final QueryTable result; - /** a priority queue of InWindow entries, with the least recent timestamps getting pulled out first. */ + /** + * A priority queue of entries within our window, with the least recent timestamps getting pulled out first. + */ private final RAPriQueue priorityQueue; - /** a map from table indices to our entries. */ - private final TLongObjectHashMap rowKeyToEntry; + /** + * A sorted map from the last row key in an entry, to our entries. + */ + private final Long2ObjectAVLTreeMap rowKeyToEntry; private final ModifiedColumnSet.Transformer mcsTransformer; - private final ModifiedColumnSet mcsNewColumns; - private final ModifiedColumnSet reusableModifiedColumnSet; + private final ModifiedColumnSet mcsResultWindowColumn; + private final ModifiedColumnSet mcsSourceTimestamp; private final Table source; private final ListenerRecorder recorder; /** - * An intrusive entry inside of indexToEntry and priorityQueue. + * An intrusive entry in priorityQueue, also stored in rowKeyToEntry (for tables with + * modifications/removes/shifts). + * + *

+ * Each entry contains a contiguous range of row keys, with non-descending timestamps. + *

*/ private static class Entry { - /** position in the priority queue */ + /** + * position in the priority queue + */ int pos; - /** the timestamp */ + /** + * the timestamp of the first row key + */ long nanos; - /** the row key within the source (and result) table */ - long rowKey; - Entry(long rowKey, long timestamp) { - this.rowKey = Require.geqZero(rowKey, "rowKey"); - this.nanos = timestamp; + /** + * the first row key within the source (and result) table + */ + long firstRowKey; + /** + * the last row key within the source (and result) table + */ + long lastRowKey; + + + Entry(final long firstRowKey, final long lastRowKey, final long firstTimestamp) { + this.firstRowKey = Require.geqZero(firstRowKey, "firstRowKey"); + this.lastRowKey = Require.geq(lastRowKey, "lastRowKey", firstRowKey, "firstRowKey"); + this.nanos = firstTimestamp; } @Override public String toString() { return "Entry{" + "nanos=" + nanos + - ", rowKey=" + rowKey + + ", firstRowKey=" + firstRowKey + + ", lastRowKey=" + lastRowKey + '}'; } } @@ -177,27 +213,31 @@ private TimeWindowListener(final String inWindowColumnName, final InWindowColumn // queue; we'll just depend on exponential doubling to get us there if need be this.priorityQueue = new RAPriQueue<>(4096, new RAPriQueue.Adapter<>() { @Override - public boolean less(Entry a, Entry b) { + public boolean less(final Entry a, final Entry b) { return a.nanos < b.nanos; } @Override - public void setPos(Entry el, int pos) { + public void setPos(final Entry el, final int pos) { el.pos = pos; } @Override - public int getPos(Entry el) { + public int getPos(final Entry el) { return el.pos; } }, Entry.class); - this.rowKeyToEntry = new TLongObjectHashMap<>(); + if (source.isAddOnly()) { + this.rowKeyToEntry = null; + } else { + this.rowKeyToEntry = new Long2ObjectAVLTreeMap<>(); + } this.mcsTransformer = source.newModifiedColumnSetTransformer(result, source.getDefinition().getColumnNamesArray()); - this.mcsNewColumns = result.newModifiedColumnSet(inWindowColumnName); - this.reusableModifiedColumnSet = new ModifiedColumnSet(this.mcsNewColumns); + this.mcsSourceTimestamp = source.newModifiedColumnSet(inWindowColumnSource.timeStampName); + this.mcsResultWindowColumn = result.newModifiedColumnSet(inWindowColumnName); } @Override @@ -205,58 +245,124 @@ protected void process() { if (recorder.recordedVariablesAreValid()) { final TableUpdate upstream = recorder.getUpdate(); - // remove the removed indices from the priority queue - removeIndex(upstream.removed()); + // remove the removed row keys from the priority queue + removeRowSet(upstream.removed(), true); // anything that was shifted needs to be placed in the proper slots - try (final RowSet preShiftRowSet = source.getRowSet().copyPrev()) { + try (final WritableRowSet preShiftRowSet = source.getRowSet().copyPrev()) { + preShiftRowSet.remove(upstream.removed()); upstream.shifted().apply((start, end, delta) -> { - final RowSet subRowSet = preShiftRowSet.subSetByKeyRange(start, end); - - final RowSet.SearchIterator it = - delta < 0 ? subRowSet.searchIterator() : subRowSet.reverseIterator(); - while (it.hasNext()) { - final long idx = it.nextLong(); - final Entry entry = rowKeyToEntry.remove(idx); - if (entry != null) { - entry.rowKey = idx + delta; - rowKeyToEntry.put(idx + delta, entry); - } + try (final RowSet subRowSet = preShiftRowSet.subSetByKeyRange(start, end)) { + shiftSubRowset(subRowSet, delta); } }); } - // TODO: improve performance with getChunk - // TODO: reinterpret inWindowColumnSource so that it compares longs instead of objects - // figure out for all the modified row keys if the timestamp or row key changed - upstream.forAllModified((oldIndex, newIndex) -> { - final long currentTimestamp = inWindowColumnSource.timeStampSource.getLong(newIndex); - final long prevTimestamp = inWindowColumnSource.timeStampSource.getPrevLong(oldIndex); - if (currentTimestamp != prevTimestamp) { - updateRow(newIndex, currentTimestamp); + if (upstream.modifiedColumnSet().containsAny(mcsSourceTimestamp)) { + final RowSetBuilderSequential changedTimestampRowsToRemovePost = RowSetFactory.builderSequential(); + final RowSetBuilderSequential changedTimestampRowsToAddPost = RowSetFactory.builderSequential(); + + final int chunkSize = (int) Math.min(upstream.modified().size(), 4096); + + try (final ChunkSource.GetContext prevContext = + inWindowColumnSource.timeStampSource.makeGetContext(chunkSize); + final ChunkSource.GetContext currContext = + inWindowColumnSource.timeStampSource.makeGetContext(chunkSize); + final RowSequence.Iterator prevIt = upstream.getModifiedPreShift().getRowSequenceIterator(); + final RowSequence.Iterator currIt = upstream.modified().getRowSequenceIterator()) { + while (currIt.hasMore()) { + final RowSequence prevRows = prevIt.getNextRowSequenceWithLength(chunkSize); + final RowSequence currRows = currIt.getNextRowSequenceWithLength(chunkSize); + final LongChunk chunkKeys = currRows.asRowKeyChunk(); + final LongChunk prevTimestamps = inWindowColumnSource.timeStampSource + .getPrevChunk(prevContext, prevRows).asLongChunk(); + final LongChunk currTimestamps = + inWindowColumnSource.timeStampSource.getChunk(currContext, currRows).asLongChunk(); + + for (int ii = 0; ii < prevTimestamps.size(); ++ii) { + final long prevTimestamp = prevTimestamps.get(ii); + final long currentTimestamp = currTimestamps.get(ii); + if (currentTimestamp != prevTimestamp) { + final boolean prevInWindow = prevTimestamp != QueryConstants.NULL_LONG + && inWindowColumnSource.computeInWindowUnsafePrev(prevTimestamp); + final boolean curInWindow = currentTimestamp != QueryConstants.NULL_LONG + && inWindowColumnSource.computeInWindowUnsafe(currentTimestamp); + final long rowKey = chunkKeys.get(ii); + if (prevInWindow && curInWindow) { + // we might not have actually reordered anything, if we can check that "easily" + // we should do it to avoid churn and reading from the column, first find the + // entry based on our row key + final LongBidirectionalIterator iterator = + rowKeyToEntry.keySet().iterator(rowKey - 1); + // we have to have an entry, otherwise we would not be in the window + Assert.assertion(iterator.hasNext(), "iterator.hasNext()"); + final Entry foundEntry = rowKeyToEntry.get(iterator.nextLong()); + Assert.neqNull(foundEntry, "foundEntry"); + + if (foundEntry.firstRowKey == rowKey + && foundEntry.lastRowKey == foundEntry.firstRowKey) { + // we should update the nanos for this entry + foundEntry.nanos = currentTimestamp; + priorityQueue.enter(foundEntry); + continue; + } + + /* + * If we want to get fancier, there are some more cases where we could determine + * that there is no need to re-read the data. In particular, we would have to + * know that we have both the previous and next values in our chunk; otherwise + * we would be re-reading data anyway. The counterpoint is that if we are + * actually in those cases, where we are modifying Timestamps that are in the + * window it seems unlikely that the table is going to have consecutive + * timestamp ranges. To encode that logic would be fairly complex, and I think + * not actually worth it. + */ + } + if (prevInWindow) { + changedTimestampRowsToRemovePost.appendKey(rowKey); + } + if (curInWindow) { + changedTimestampRowsToAddPost.appendKey(rowKey); + } + } + } + } } - }); + + // we should have shifted values where relevant above, so we only operate on the new row key + try (final RowSet changedTimestamps = changedTimestampRowsToRemovePost.build()) { + if (changedTimestamps.isNonempty()) { + removeRowSet(changedTimestamps, false); + } + } + try (final RowSet changedTimestamps = changedTimestampRowsToAddPost.build()) { + if (changedTimestamps.isNonempty()) { + addRowSequence(changedTimestamps, rowKeyToEntry != null); + } + } + } // now add the new timestamps - addRowSequence(upstream.added()); + addRowSequence(upstream.added(), rowKeyToEntry != null); + + final TableUpdateImpl downstream = TableUpdateImpl.copy(upstream); - final WritableRowSet downstreamModified = upstream.modified().copy(); try (final RowSet modifiedByTime = recomputeModified()) { if (modifiedByTime.isNonempty()) { - downstreamModified.insert(modifiedByTime); + downstream.modified.writableCast().insert(modifiedByTime); } } // everything that was added, removed, or modified stays added removed or modified - if (downstreamModified.isNonempty()) { - mcsTransformer.clearAndTransform(upstream.modifiedColumnSet(), reusableModifiedColumnSet); - reusableModifiedColumnSet.setAll(mcsNewColumns); + downstream.modifiedColumnSet = result.getModifiedColumnSetForUpdates(); + if (downstream.modified.isNonempty()) { + mcsTransformer.clearAndTransform(upstream.modifiedColumnSet(), downstream.modifiedColumnSet); + downstream.modifiedColumnSet.setAll(mcsResultWindowColumn); } else { - reusableModifiedColumnSet.clear(); + downstream.modifiedColumnSet.clear(); } - result.notifyListeners(new TableUpdateImpl(upstream.added().copy(), upstream.removed().copy(), - downstreamModified, upstream.shifted(), reusableModifiedColumnSet)); + result.notifyListeners(downstream); } else { final RowSet modifiedByTime = recomputeModified(); if (modifiedByTime.isNonempty()) { @@ -265,9 +371,9 @@ protected void process() { downstream.added = RowSetFactory.empty(); downstream.removed = RowSetFactory.empty(); downstream.shifted = RowSetShiftData.EMPTY; - downstream.modifiedColumnSet = reusableModifiedColumnSet; - downstream.modifiedColumnSet().clear(); - downstream.modifiedColumnSet().setAll(mcsNewColumns); + downstream.modifiedColumnSet = result.getModifiedColumnSetForUpdates(); + downstream.modifiedColumnSet.clear(); + downstream.modifiedColumnSet.setAll(mcsResultWindowColumn); result.notifyListeners(downstream); } else { modifiedByTime.close(); @@ -275,79 +381,367 @@ protected void process() { } } - /** - * Handles modified rowSets. If they are outside of the window, they need to be removed from the queue. If they - * are inside the window, they need to be (re)inserted into the queue. - */ - private void updateRow(final long rowKey, long currentTimestamp) { - Entry entry = rowKeyToEntry.remove(rowKey); - if (currentTimestamp == QueryConstants.NULL_LONG) { - if (entry != null) { - priorityQueue.remove(entry); - } - return; - } - if (inWindowColumnSource.computeInWindow(currentTimestamp, inWindowColumnSource.currentTime)) { - if (entry == null) { - entry = new Entry(rowKey, 0); - } - entry.nanos = currentTimestamp; - priorityQueue.enter(entry); - rowKeyToEntry.put(entry.rowKey, entry); - } else if (entry != null) { - priorityQueue.remove(entry); - } - } - /** * If the value of the timestamp is within the window, insert it into the queue and map. * * @param rowSequence the row sequence to insert into the table + * @param tryCombine try to combine newly added ranges with those already in the maps. For initial addition, + * there is nothing to combine with, so we do not spend the time on map lookups. For add-only tables, we + * do not maintain the rowKeyToEntry map, so cannot find adjacent ranges for combination. */ - private void addRowSequence(RowSequence rowSequence) { + private void addRowSequence(RowSequence rowSequence, boolean tryCombine) { final int chunkSize = (int) Math.min(rowSequence.size(), 4096); + Entry pendingEntry = null; + long lastNanos = Long.MAX_VALUE; + try (final ChunkSource.GetContext getContext = inWindowColumnSource.timeStampSource.makeGetContext(chunkSize); - final RowSequence.Iterator okit = rowSequence.getRowSequenceIterator()) { - while (okit.hasMore()) { - final RowSequence chunkOk = okit.getNextRowSequenceWithLength(chunkSize); - final LongChunk keyIndices = chunkOk.asRowKeyChunk(); + final RowSequence.Iterator rsit = rowSequence.getRowSequenceIterator()) { + while (rsit.hasMore()) { + final RowSequence chunkRows = rsit.getNextRowSequenceWithLength(chunkSize); + final LongChunk rowKeys = chunkRows.asRowKeyChunk(); final LongChunk timestampValues = - inWindowColumnSource.timeStampSource.getChunk(getContext, chunkOk).asLongChunk(); - for (int ii = 0; ii < keyIndices.size(); ++ii) { + inWindowColumnSource.timeStampSource.getChunk(getContext, chunkRows).asLongChunk(); + for (int ii = 0; ii < rowKeys.size(); ++ii) { + final long currentRowKey = rowKeys.get(ii); final long currentTimestamp = timestampValues.get(ii); if (currentTimestamp == QueryConstants.NULL_LONG) { + if (pendingEntry != null) { + enter(pendingEntry, lastNanos, tryCombine); + pendingEntry = null; + } continue; } - if (inWindowColumnSource.computeInWindowUnsafe( - currentTimestamp, inWindowColumnSource.currentTime)) { - final Entry el = new Entry(keyIndices.get(ii), currentTimestamp); - priorityQueue.enter(el); - rowKeyToEntry.put(el.rowKey, el); + if (pendingEntry != null && (currentTimestamp < lastNanos + || pendingEntry.lastRowKey + 1 != currentRowKey)) { + enter(pendingEntry, lastNanos, tryCombine); + pendingEntry = null; + } + if (inWindowColumnSource.computeInWindowUnsafe(currentTimestamp)) { + lastNanos = currentTimestamp; + if (pendingEntry == null) { + if (tryCombine) { + // see if this can be combined with the prior entry + final Entry priorEntry = rowKeyToEntry.get(currentRowKey - 1); + if (priorEntry != null && priorEntry.nanos <= currentTimestamp) { + Assert.eq(priorEntry.lastRowKey, "priorEntry.lastRowKey", currentRowKey - 1, + "currentRowKey - 1"); + final boolean canCombine; + if (priorEntry.firstRowKey != priorEntry.lastRowKey) { + final long priorEntryLastNanos = + inWindowColumnSource.timeStampSource.getLong(priorEntry.lastRowKey); + canCombine = priorEntryLastNanos <= currentTimestamp; + } else { + canCombine = true; + } + if (canCombine) { + rowKeyToEntry.remove(currentRowKey - 1); + // Since we might be combining this with an entry later, we should remove it + // so that we don't have extra entries + priorityQueue.remove(priorEntry); + priorEntry.lastRowKey = currentRowKey; + pendingEntry = priorEntry; + continue; + } + } + } + pendingEntry = new Entry(currentRowKey, currentRowKey, currentTimestamp); + } else { + Assert.eq(pendingEntry.lastRowKey, "pendingEntry.lastRowKey", currentRowKey - 1, + "currentRowKey - 1"); + pendingEntry.lastRowKey = currentRowKey; + } + } else { + Assert.eqNull(pendingEntry, "pendingEntry"); } } } + if (pendingEntry != null) { + enter(pendingEntry, lastNanos, tryCombine); + } + } + } + + /** + * Add an entry into the priority queue, and if applicable the reverse map + * + * @param pendingEntry the entry to insert + */ + void enter(@NotNull final Entry pendingEntry) { + priorityQueue.enter(pendingEntry); + if (rowKeyToEntry != null) { + rowKeyToEntry.put(pendingEntry.lastRowKey, pendingEntry); + } + } + + /** + * Insert pendingEntry into the queue and map (if applicable). + * + * @param pendingEntry the entry to insert into our queue and reverse map + * @param lastNanos the final nanosecond value of the pending entry to insert, used to determine if we may + * combine with the next entry + * @param tryCombine true if we should combine values with the next entry, previous entries would have been + * combined during addRowSequence + */ + void enter(@NotNull final Entry pendingEntry, final long lastNanos, final boolean tryCombine) { + if (tryCombine) { + final LongBidirectionalIterator it = rowKeyToEntry.keySet().iterator(pendingEntry.lastRowKey); + if (it.hasNext()) { + final long nextKey = it.nextLong(); + final Entry nextEntry = rowKeyToEntry.get(nextKey); + if (nextEntry.firstRowKey == pendingEntry.lastRowKey + 1 && nextEntry.nanos >= lastNanos) { + // we can combine ourselves into next entry, because it is contiguous and has a timestamp + // greater than or equal to our entries last timestamp + nextEntry.nanos = pendingEntry.nanos; + nextEntry.firstRowKey = pendingEntry.firstRowKey; + priorityQueue.enter(nextEntry); + return; + } + } } + enter(pendingEntry); } /** * If the keys are in the window, remove them from the map and queue. * * @param rowSet the row keys to remove + * @param previous whether to operate in previous space */ - private void removeIndex(final RowSet rowSet) { - rowSet.forAllRowKeys((final long key) -> { - final Entry e = rowKeyToEntry.remove(key); - if (e != null) { - priorityQueue.remove(e); + private void removeRowSet(final RowSet rowSet, final boolean previous) { + if (rowSet.isEmpty()) { + return; + } + Assert.neqNull(rowKeyToEntry, "rowKeyToEntry"); + + RANGE: for (final RowSet.RangeIterator rangeIterator = rowSet.rangeIterator(); rangeIterator.hasNext();) { + rangeIterator.next(); + long start = rangeIterator.currentRangeStart(); + final long end = rangeIterator.currentRangeEnd(); + + // We have some range in the rowSet that is removed. This range (or part thereof) may or may not exist + // in one or more entries. We process from the front of the range to the end of the range, possibly + // advancing the range start. + + while (start <= end) { + // we look for start - 1, so that we will find start if it exists + // https://fastutil.di.unimi.it/docs/it/unimi/dsi/fastutil/longs/LongSortedSet.html#iterator(long) + // "The next element of the returned iterator is the least element of the set that is greater than + // the starting point (if there are no elements greater than the starting point, hasNext() will + // return false)." + final LongBidirectionalIterator reverseMapIterator = rowKeyToEntry.keySet().iterator(start - 1); + // if there is no next, then the reverse map contains no values that are greater than or equal to + // start, we can actually break out of the entire loop + if (!reverseMapIterator.hasNext()) { + break RANGE; + } + + final long entryLastKey = reverseMapIterator.nextLong(); + final Entry entry = rowKeyToEntry.get(entryLastKey); + if (entry.firstRowKey > end) { + // there is nothing here for us + start = entry.lastRowKey + 1; + continue; + } + + // there is some part of our start to end range that could be present in this entry. + if (entry.firstRowKey >= start) { + // we have visually one of the following three situations when start == firstRowKey: + // @formatter:off + // [ RANGE ] + // [ ENTRY ] - the entry exceeds the range ( case a) + // [ ENTRY ] - the whole entry is contained (case b) + // [ ENTRY ] - the entry is a prefix - (case c) + // @formatter:on + + // we have visually one of the following three situations when start > firstRowKey: + // @formatter:off + // [ RANGE ] + // [ ENTRY ] - the entry starts in the middle and terminates after (case a); so we remove a prefix of the entry + // [ ENTRY ] - entry starts in the middle and terminates the at same value (case b); delete the entry + // [ ENTRY ] - this cannot happen based on the search (case c) + // @formatter:on + + if (entry.lastRowKey > end) { // (case a) + // slice off the beginning of the entry + entry.firstRowKey = end + 1; + entry.nanos = previous + ? inWindowColumnSource.timeStampSource.getPrevLong(entry.firstRowKey) + : inWindowColumnSource.timeStampSource.getLong(entry.firstRowKey); + priorityQueue.enter(entry); + } else { // (case b and c) + // we are consuming the entire entry, so can remove it from the queue + reverseMapIterator.remove(); + priorityQueue.remove(entry); + } + // and we look for the next entry after this one + start = entry.lastRowKey + 1; + } else { + // our entry is at least partially before end (because of the check after retrieving it), + // and is after start (because of how we searched in the map). + + // we have visually one of the following three situations: + // @formatter:off + // [ RANGE ] + // [ ENTRY ] - the entry exceeds the range ( case a), we must split into two entries + // [ ENTRY ] - the entry starts before the range but ends with the range (case b); so we remove a suffix of the entry + // [ ENTRY ] - the entry starts before the range and ends inside the range(case c); so we must remove a suffix of the entry + // @formatter:on + + if (entry.lastRowKey > end) { + final Entry frontEntry = new Entry(entry.firstRowKey, start - 1, entry.nanos); + enter(frontEntry); + + entry.firstRowKey = end + 1; + entry.nanos = previous + ? inWindowColumnSource.timeStampSource.getPrevLong(entry.firstRowKey) + : inWindowColumnSource.timeStampSource.getLong(entry.firstRowKey); + priorityQueue.enter(entry); + } else { // case b and c + entry.lastRowKey = start - 1; + reverseMapIterator.remove(); + rowKeyToEntry.put(entry.lastRowKey, entry); + } + } + } + } + } + + private void shiftSubRowset(final RowSet rowSet, final long delta) { + Assert.neqNull(rowKeyToEntry, "rowKeyToEntry"); + + // We need to be careful about reinserting entries into the correct order, if we are traversing forward, + // then we need to add the entries in opposite order to avoid overwriting another entry. We remove the + // entries + // in the loop, and if entriesToInsert is non-null add them to the list. If entriesToInsert is null, then + // we add them to the map. + final List entriesToInsert = delta > 0 ? new ArrayList<>() : null; + + RANGE: for (final RowSet.RangeIterator rangeIterator = rowSet.rangeIterator(); rangeIterator.hasNext();) { + rangeIterator.next(); + long start = rangeIterator.currentRangeStart(); + final long end = rangeIterator.currentRangeEnd(); + + // We have some range in the rowSet that has been moved about. This range (or part thereof) may or may + // not exist in one or more entries. We process from the front of the range to the end of the range, + // possibly advancing the range start. + + while (start <= end) { + // we look for start - 1, so that we will find start if it exists + // https://fastutil.di.unimi.it/docs/it/unimi/dsi/fastutil/longs/LongSortedSet.html#iterator(long) + // "The next element of the returned iterator is the least element of the set that is greater than + // the starting point (if there are no elements greater than the starting point, hasNext() will + // return false)." + final LongBidirectionalIterator reverseMapIterator = rowKeyToEntry.keySet().iterator(start - 1); + // if there is no next, then the reverse map contains no values that are greater than or equal to + // start, we can actually break out of the entire loop + if (!reverseMapIterator.hasNext()) { + break RANGE; + } + + final long entryLastKey = reverseMapIterator.nextLong(); + final Entry entry = rowKeyToEntry.get(entryLastKey); + if (entry.firstRowKey > end) { + // there is nothing here for us + start = entry.lastRowKey + 1; + continue; + } + + // there is some part of our start to end range that could be present in this entry. + if (entry.firstRowKey >= start) { + + // @formatter:off + // we have visually one of the following three situations when start == firstRowKey: + // [ RANGE ] + // [ ENTRY ] - the entry exceeds the range ( case a) + // [ ENTRY ] - the whole entry is contained (case b) + // [ ENTRY ] - the entry is a prefix - (case c) + + // we have visually one of the following three situations when start > firstRowKey: + // [ RANGE ] + // [ ENTRY ] - the entry starts in the middle and terminates after (case a) + // [ ENTRY ] - entry starts in the middle and terminates the at same value (case b) + // [ ENTRY ] - this cannot happen based on the search (case c) + // @formatter:on + + // we look for the next entry after this one, but need to make sure to keep that happening in + // pre-shift space + start = entry.lastRowKey + 1; + + if (entry.lastRowKey > end) { // (case a) + // slice off the beginning of the entry, creating a new entry for the shift + final Entry newEntry = new Entry(entry.firstRowKey + delta, end + delta, entry.nanos); + + entry.firstRowKey = end + 1; + entry.nanos = inWindowColumnSource.timeStampSource.getPrevLong(entry.firstRowKey); + priorityQueue.enter(entry); + priorityQueue.enter(newEntry); + + addOrDeferEntry(entriesToInsert, newEntry); + } else { // (case b and c) + // we are consuming the entire entry, so can leave it in the queue as is, but need to change + // its reverse mapping + entry.firstRowKey += delta; + entry.lastRowKey += delta; + reverseMapIterator.remove(); + addOrDeferEntry(entriesToInsert, entry); + } + } else { + // our entry is at least partially before end (because of the check after retrieving it), + // and is after start (because of how we searched in the map). + + // we have visually one of the following three situations: + // @formatter:off + // [ RANGE ] + // [ ENTRY ] - the entry exceeds the range ( case a), we must split into three entries; + // but we would be splatting over stuff, so this is not permitted in a reasonable shift + // [ ENTRY ] - the entry starts before the range but ends with the range (case b) + // [ ENTRY ] - the entry starts before the range and ends inside the range(case c) + // @formatter:on + + if (entry.lastRowKey > end) { + throw new IllegalStateException(); + } else { // case b and c + + final long backNanos = inWindowColumnSource.timeStampSource.getPrevLong(start); + final Entry backEntry = new Entry(start + delta, entry.lastRowKey + delta, backNanos); + priorityQueue.enter(backEntry); + + // the nanos stays the same, so entry just needs an adjust last rowSet and the reverse map + entry.lastRowKey = start - 1; + + // by reinserting, we preserve the things that we have not changed to enable us to find them + // in the rest of the processing + reverseMapIterator.remove(); + rowKeyToEntry.put(entry.lastRowKey, entry); + + addOrDeferEntry(entriesToInsert, backEntry); + } + } + } + } + if (entriesToInsert != null) { + for (int ii = entriesToInsert.size() - 1; ii >= 0; ii--) { + final Entry entry = entriesToInsert.get(ii); + rowKeyToEntry.put(entry.lastRowKey, entry); } - }); + } + } + + private void addOrDeferEntry(final List entriesToInsert, final Entry entry) { + if (entriesToInsert == null) { + rowKeyToEntry.put(entry.lastRowKey, entry); + } else { + entriesToInsert.add(entry); + } } /** * Pop elements out of the queue until we find one that is in the window. * + *

* Send a modification to the resulting table. + *

*/ @Override public void run() { @@ -364,14 +758,44 @@ private RowSet recomputeModified() { break; } - if (inWindowColumnSource.computeInWindowUnsafe(entry.nanos, inWindowColumnSource.currentTime)) { + if (inWindowColumnSource.computeInWindowUnsafe(entry.nanos)) { break; - } else { - // take it out of the queue, and mark it as modified - final Entry taken = priorityQueue.removeTop(); - Assert.equals(entry, "entry", taken, "taken"); - builder.addKey(entry.rowKey); - rowKeyToEntry.remove(entry.rowKey); + } + + // take it out of the queue, and mark it as modified + final Entry taken = priorityQueue.removeTop(); + Assert.equals(entry, "entry", taken, "taken"); + + + // now scan the rest of the entry, which requires reading from the timestamp source; + // this would ideally be done as a chunk, reusing the context + long newFirst = entry.firstRowKey + 1; + if (newFirst <= entry.lastRowKey) { + try (final RowSequence rowSequence = + RowSequenceFactory.forRange(entry.firstRowKey + 1, entry.lastRowKey); + final CloseablePrimitiveIteratorOfLong timestampIterator = + new ChunkedLongColumnIterator(inWindowColumnSource.timeStampSource, + rowSequence)) { + while (newFirst <= entry.lastRowKey) { + final long nanos = timestampIterator.nextLong(); + if (inWindowColumnSource.computeInWindowUnsafe(nanos)) { + // nothing more to do, we've passed out of the window, note the new nanos for this entry + entry.nanos = nanos; + break; + } + ++newFirst; + } + } + } + + builder.addRange(entry.firstRowKey, newFirst - 1); + + // if anything is left, we need to reinsert it into the priority queue + if (newFirst <= entry.lastRowKey) { + entry.firstRowKey = newFirst; + priorityQueue.enter(entry); + } else if (rowKeyToEntry != null) { + rowKeyToEntry.remove(entry.lastRowKey); } } @@ -384,16 +808,78 @@ void validateQueue() { final Entry[] entries = new Entry[priorityQueue.size()]; priorityQueue.dump(entries, 0); - Arrays.stream(entries).mapToLong(entry -> entry.rowKey).forEach(builder::addKey); + + if (rowKeyToEntry != null && entries.length != rowKeyToEntry.size()) { + dumpQueue(); + Assert.eq(entries.length, "entries.length", rowKeyToEntry.size(), "rowKeyToEntry.size()"); + } + + long entrySize = 0; + for (final Entry entry : entries) { + builder.addRange(entry.firstRowKey, entry.lastRowKey); + entrySize += (entry.lastRowKey - entry.firstRowKey + 1); + if (rowKeyToEntry != null) { + final Entry check = rowKeyToEntry.get(entry.lastRowKey); + if (check != entry) { + dumpQueue(); + Assert.equals(check, "check", entry, "entry"); + } + } + // validate that the entry is non-descending + if (entry.lastRowKey > entry.firstRowKey) { + long lastNanos = inWindowColumnSource.timeStampSource.getLong(entry.firstRowKey); + for (long rowKey = entry.firstRowKey + 1; rowKey <= entry.lastRowKey; ++rowKey) { + long nanos = inWindowColumnSource.timeStampSource.getLong(rowKey); + if (nanos < lastNanos) { + dumpQueue(); + Assert.geq(nanos, "nanos at " + rowKey, lastNanos, "lastNanos"); + } + lastNanos = nanos; + } + } + } final RowSet inQueue = builder.build(); - Assert.eq(inQueue.size(), "inQueue.size()", priorityQueue.size(), "priorityQueue.size()"); + Assert.eq(inQueue.size(), "inQueue.size()", entrySize, "entrySize"); + final boolean condition = inQueue.subsetOf(resultRowSet); if (!condition) { + dumpQueue(); // noinspection ConstantConditions Assert.assertion(condition, "inQueue.subsetOf(resultRowSet)", inQueue, "inQueue", resultRowSet, "resultRowSet", inQueue.minus(resultRowSet), "inQueue.minus(resultRowSet)"); } + + // Verify that the size of inQueue is equal to the number of values in the window + final RowSetBuilderSequential inWindowBuilder = RowSetFactory.builderSequential(); + try (final CloseablePrimitiveIteratorOfLong valueIt = + new ChunkedLongColumnIterator(inWindowColumnSource.timeStampSource, source.getRowSet())) { + source.getRowSet().forAllRowKeys(key -> { + long value = valueIt.nextLong(); + if (value != QueryConstants.NULL_LONG && inWindowColumnSource.computeInWindowUnsafe(value)) { + inWindowBuilder.appendKey(key); + } + }); + } + try (final RowSet rowsInWindow = inWindowBuilder.build()) { + Assert.equals(rowsInWindow, "rowsInWindow", inQueue, "inQueue"); + } + } + + void dumpQueue() { + final Entry[] entries = new Entry[priorityQueue.size()]; + priorityQueue.dump(entries, 0); + System.out.println("Queue size: " + entries.length); + for (final Entry entry : entries) { + System.out.println(entry); + } + + if (rowKeyToEntry != null) { + System.out.println("Map size: " + rowKeyToEntry.size()); + for (final Long2ObjectMap.Entry x : rowKeyToEntry.long2ObjectEntrySet()) { + System.out.println(x.getLongKey() + ": " + x.getValue()); + } + } } @Override @@ -422,24 +908,31 @@ private static class InWindowColumnSource extends AbstractColumnSource implements MutableColumnSourceGetDefaults.ForBoolean { private final long windowNanos; private final ColumnSource timeStampSource; + private final String timeStampName; - private long prevTime; - private long currentTime; + private long prevTime = 0; + private long currentTime = 0; private long clockStep; private final long initialStep; InWindowColumnSource(Table table, String timestampColumn, long windowNanos) { super(Boolean.class); this.windowNanos = windowNanos; + this.timeStampName = timestampColumn; clockStep = updateGraph.clock().currentStep(); initialStep = clockStep; - final ColumnSource timeStampSource = table.getColumnSource(timestampColumn); - if (!Instant.class.isAssignableFrom(timeStampSource.getType())) { - throw new IllegalArgumentException(timestampColumn + " is not of type Instant!"); + final ColumnSource timeStampSource = table.getColumnSource(timestampColumn); + final ColumnSource reinterpreted = ReinterpretUtils.maybeConvertToPrimitive(timeStampSource); + Class timestampType = reinterpreted.getType(); + if (timestampType == long.class) { + // noinspection unchecked + this.timeStampSource = (ColumnSource) reinterpreted; + } else { + throw new IllegalArgumentException("The timestamp column, " + timestampColumn + + ", cannot be interpreted as a long, it should be a supported time type (e.g. long, Instant, ZonedDateTime...)"); } - this.timeStampSource = ReinterpretUtils.instantToLongSource(timeStampSource); } /** @@ -480,6 +973,14 @@ private boolean computeInWindowUnsafe(long tableNanos, long time) { return (time - tableNanos) < windowNanos; } + private boolean computeInWindowUnsafe(long tableNanos) { + return computeInWindowUnsafe(tableNanos, currentTime); + } + + private boolean computeInWindowUnsafePrev(long tableNanos) { + return computeInWindowUnsafe(tableNanos, timeStampForPrev()); + } + @Override public boolean isImmutable() { return false; diff --git a/engine/table/src/test/java/io/deephaven/engine/util/TestWindowCheck.java b/engine/table/src/test/java/io/deephaven/engine/util/TestWindowCheck.java index 26d26b477d3..4d22e210975 100644 --- a/engine/table/src/test/java/io/deephaven/engine/util/TestWindowCheck.java +++ b/engine/table/src/test/java/io/deephaven/engine/util/TestWindowCheck.java @@ -5,7 +5,11 @@ import io.deephaven.base.Pair; import io.deephaven.engine.context.ExecutionContext; +import io.deephaven.engine.context.QueryScope; import io.deephaven.engine.rowset.RowSet; +import io.deephaven.engine.rowset.RowSetFactory; +import io.deephaven.engine.rowset.TrackingWritableRowSet; +import io.deephaven.engine.rowset.WritableRowSet; import io.deephaven.engine.table.ColumnSource; import io.deephaven.engine.table.Table; import io.deephaven.engine.table.TableUpdate; @@ -13,6 +17,7 @@ import io.deephaven.engine.table.impl.InstrumentedTableUpdateListener; import io.deephaven.engine.table.impl.QueryTable; import io.deephaven.engine.table.impl.TableUpdateValidator; +import io.deephaven.engine.table.impl.util.RuntimeMemory; import io.deephaven.engine.testutil.*; import io.deephaven.engine.testutil.generator.IntGenerator; import io.deephaven.engine.testutil.generator.UnsortedInstantGenerator; @@ -27,10 +32,12 @@ import org.junit.Test; import org.junit.experimental.categories.Category; +import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; import java.time.Instant; import java.util.Arrays; +import java.util.Collections; import java.util.Map; import java.util.Random; import java.util.stream.Stream; @@ -38,6 +45,7 @@ import static io.deephaven.engine.testutil.TstUtils.*; import static io.deephaven.engine.util.TableTools.col; import static io.deephaven.engine.util.TableTools.intCol; +import static org.junit.jupiter.api.Assertions.assertTrue; @Category(OutOfBandTest.class) public class TestWindowCheck { @@ -47,21 +55,34 @@ public class TestWindowCheck { @Test public void testWindowCheckIterative() { for (int seed = 0; seed < 1; ++seed) { - testWindowCheckIterative(seed); + testWindowCheckIterative(seed, true); + } + } + + @Test + public void testWindowCheckIterativeNoShifts() { + for (int seed = 0; seed < 1; ++seed) { + testWindowCheckIterative(seed, false); } } /** * Run a window check over the course of a simulated day. * + *

* We have a Timestamp column and a sentinel column. + *

* + *

* Time advances by one second per step, which randomly modifies the source table. + *

* + *

* The WindowEvalNugget verifies the original columns are unchanged and that the value of the InWindow column is * correct. A prev checker is added to ensure that getPrev works on the new table. + *

*/ - private void testWindowCheckIterative(int seed) { + private void testWindowCheckIterative(int seed, boolean withShifts) { final Random random = new Random(seed); final Random combinedRandom = new Random(seed); @@ -103,15 +124,21 @@ private void testWindowCheckIterative(int seed) { ++step; final boolean combined = combinedRandom.nextBoolean(); + final GenerateTableUpdates.SimulationProfile profile = + withShifts ? GenerateTableUpdates.DEFAULT_PROFILE : GenerateTableUpdates.NO_SHIFT_PROFILE; + if (combined) { + if (RefreshingTableTestCase.printTableUpdates) { + System.out.println("Combined Step " + step); + } updateGraph.runWithinUnitTestCycle(() -> { - advanceTime(clock, en); - GenerateTableUpdates.generateShiftAwareTableUpdates(GenerateTableUpdates.DEFAULT_PROFILE, size, + advanceTime(clock, en, 5 * DateTimeUtils.SECOND); + GenerateTableUpdates.generateShiftAwareTableUpdates(profile, size, random, table, columnInfo); }); TstUtils.validate("Step " + step, en); } else { - updateGraph.runWithinUnitTestCycle(() -> advanceTime(clock, en)); + updateGraph.runWithinUnitTestCycle(() -> advanceTime(clock, en, 5 * DateTimeUtils.SECOND)); if (RefreshingTableTestCase.printTableUpdates) { TstUtils.validate("Step = " + step + " time = " + DateTimeUtils.epochNanosToInstant(clock.now), en); } @@ -120,15 +147,17 @@ private void testWindowCheckIterative(int seed) { if (RefreshingTableTestCase.printTableUpdates) { System.out.println("Step " + step + "-" + ii); } - RefreshingTableTestCase.simulateShiftAwareStep(step + "-" + ii, stepSize, random, table, columnInfo, + RefreshingTableTestCase.simulateShiftAwareStep(profile, step + "-" + ii, stepSize, random, table, + columnInfo, en); + TstUtils.validate("Step " + step + "-" + ii, en); } } } } - private void advanceTime(TestClock clock, WindowEvalNugget[] en) { - clock.now += 5 * DateTimeUtils.SECOND; + private void advanceTime(final TestClock clock, final WindowEvalNugget[] en, final long nanosToAdvance) { + clock.now += nanosToAdvance; if (RefreshingTableTestCase.printTableUpdates) { System.out.println("Ticking time to " + DateTimeUtils.epochNanosToInstant(clock.now)); } @@ -154,7 +183,7 @@ public void testWindowCheckEmptyInitial() { () -> WindowCheck.addTimeWindowInternal(clock, tableToCheck, "Timestamp", DateTimeUtils.SECOND * 60, "InWindow", false)); - TableTools.showWithRowSet(windowed.first); + TableTools.showWithRowSet(windowed.first, 200); updateGraph.runWithinUnitTestCycle(windowed.second::run); @@ -374,7 +403,212 @@ public void validate(String msg) { @Override public void show() { - TableTools.show(windowed.first); + TableTools.showWithRowSet(windowed.first, 200); + windowed.second.dumpQueue(); } } + + @Test + public void testMemoryUsageInWindow() throws IOException { + final TestClock timeProvider = new TestClock(); + final Instant startTime = DateTimeUtils.parseInstant("2022-07-14T09:30:00 NY"); + timeProvider.now = DateTimeUtils.epochNanos(startTime); + + QueryScope.addParam("startTime", startTime); + + final QueryTable inputTable = + (QueryTable) TableTools.emptyTable(100_000_000).updateView("Timestamp = startTime"); + inputTable.setRefreshing(true); + System.gc(); + final RuntimeMemory.Sample sample = new RuntimeMemory.Sample(); + RuntimeMemory.getInstance().read(sample); + final long memStart = sample.totalMemory - sample.freeMemory; + System.out.println("Start Memory: " + memStart); + final Pair withCheck = WindowCheck.addTimeWindowInternal(timeProvider, + inputTable, + "Timestamp", + 60 * DateTimeUtils.SECOND, + "InLastXSeconds", + false); + System.gc(); + RuntimeMemory.getInstance().read(sample); + final long memEnd = sample.totalMemory - sample.freeMemory; + System.out.println("End Memory: " + memEnd); + final long memChange = memEnd - memStart; + System.out.println("Change: " + memChange); + assertTrue(memChange < 100_000_000); // this previously would require about 645MB, so we're doing better + assertTableEquals(inputTable.updateView("InLastXSeconds=true"), withCheck.first); + } + + @Test + public void testSequentialRanges() throws IOException { + final TestClock timeProvider = new TestClock(); + final Instant startTime = DateTimeUtils.parseInstant("2022-07-14T09:30:00 NY"); + // start about three minutes in so we'll take things off more directly + timeProvider.now = DateTimeUtils.epochNanos(startTime) + 180 * 1_000_000_000L; + + QueryScope.addParam("startTime", startTime); + + // each row is 10ms, we have 50_000 rows so the span of the table is 500 seconds + final Table inputRanges = TableTools.emptyTable(50_000).updateView("Timestamp = startTime + (ii * 10_000_000)"); + ((QueryTable) inputRanges).setRefreshing(true); + + final Table[] duplicated = new Table[10]; + Arrays.fill(duplicated, inputRanges); + final Table inputTable = TableTools.merge(duplicated); + + final WindowEvalNugget[] en; + final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); + updateGraph.exclusiveLock().lock(); + try { + en = new WindowEvalNugget[] { + new WindowEvalNugget(timeProvider, (QueryTable) inputTable) + }; + } finally { + updateGraph.exclusiveLock().unlock(); + } + + int step = 0; + while (timeProvider.now < DateTimeUtils.epochNanos(startTime) + 600 * DateTimeUtils.SECOND) { + step++; + updateGraph.runWithinUnitTestCycle(() -> advanceTime(timeProvider, en, 5 * DateTimeUtils.SECOND)); + TstUtils.validate("Step " + step, en); + } + } + + @Test + public void testSequentialRangesAddOnly() throws IOException { + final TestClock timeProvider = new TestClock(); + final Instant startTime = DateTimeUtils.parseInstant("2022-07-14T09:30:00 NY"); + // start about three minutes in so we'll take things off more directly + timeProvider.now = DateTimeUtils.epochNanos(startTime) + 180 * 1_000_000_000L; + final long regionSize = 1_000_000L; + + QueryScope.addParam("startTime", startTime); + QueryScope.addParam("regionSize", regionSize); + + final TrackingWritableRowSet inputRowSet = RowSetFactory.fromRange(0, 9999).toTracking(); + + inputRowSet.insertRange(regionSize, regionSize + 9_999); + final QueryTable rowsetTable = TstUtils.testRefreshingTable(inputRowSet); + // each chunk of 10_000 rows should account for one minute, or 60_000_000_000 / 10_000 = 6_000_000 nanos per row + // we start 3 minutes behind the start, so everything is in the five-minute window + final Table inputTable = rowsetTable.updateView("Timestamp = startTime + ((k % regionSize) * 6_000_000)") + .withAttributes(Collections.singletonMap(Table.ADD_ONLY_TABLE_ATTRIBUTE, true)); + + final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); + + final WindowEvalNugget[] en; + final PrintListener pl; + updateGraph.exclusiveLock().lock(); + try { + en = new WindowEvalNugget[] { + new WindowEvalNugget(timeProvider, (QueryTable) inputTable) + }; + pl = new PrintListener("windowed", en[0].windowed.first, 0); + } finally { + updateGraph.exclusiveLock().unlock(); + } + + for (int step = 1; step < 10; ++step) { + final int fstep = step; + updateGraph.runWithinUnitTestCycle(() -> { + final WritableRowSet added = + RowSetFactory.fromRange(fstep * 10_000, fstep * 10_000 + 9_999); + added.insertRange(fstep * 10_000 + regionSize, fstep * 10_000 + 9_999 + regionSize); + rowsetTable.getRowSet().writableCast().insert(added); + rowsetTable.notifyListeners(added, i(), i()); + advanceTime(timeProvider, en, fstep * DateTimeUtils.MINUTE); + }); + TstUtils.validate("Step " + step, en); + } + } + + @Test + public void testCombination() { + final TestClock timeProvider = new TestClock(); + final Instant startTime = DateTimeUtils.parseInstant("2024-02-28T09:29:01 NY"); + timeProvider.now = DateTimeUtils.epochNanos(startTime); + + final Instant[] initialValues = Arrays.stream( + new String[] {"2024-02-28T09:25:00 NY", "2024-02-28T09:27:00 NY"}) + .map(DateTimeUtils::parseInstant).toArray(Instant[]::new); + final QueryTable tableToCheck = testRefreshingTable(i(0, 1).toTracking(), + col("Timestamp", initialValues), + intCol("Sentinel", 1, 2)); + + final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); + final Pair windowed = updateGraph.sharedLock().computeLocked( + () -> WindowCheck.addTimeWindowInternal( + timeProvider, tableToCheck, "Timestamp", DateTimeUtils.MINUTE * 5, "InWindow", false)); + + TableTools.showWithRowSet(windowed.first); + + updateGraph.runWithinUnitTestCycle(windowed.second::run); + assertTableEquals(tableToCheck.updateView("InWindow = true"), windowed.first); + + updateGraph.runWithinUnitTestCycle(() -> { + final Instant[] newValue = new Instant[] {DateTimeUtils.parseInstant("2024-02-28T09:26:00 NY")}; + TstUtils.addToTable(tableToCheck, i(2), col("Timestamp", newValue), intCol("Sentinel", 3)); + tableToCheck.notifyListeners(i(2), i(), i()); + }); + TableTools.showWithRowSet(windowed.first); + windowed.second.validateQueue(); + assertTableEquals(tableToCheck.updateView("InWindow = true"), windowed.first); + + // advance the clock to make sure we actually work + timeProvider.now += DateTimeUtils.MINUTE; // 9:30:01, passing the first entry out of the window + updateGraph.runWithinUnitTestCycle(windowed.second::run); + TableTools.showWithRowSet(windowed.first); + assertTableEquals(tableToCheck.updateView("InWindow = Sentinel > 1"), windowed.first); + + timeProvider.now += DateTimeUtils.MINUTE; // 9:32:01, passing the second entry out of the window + updateGraph.runWithinUnitTestCycle(windowed.second::run); + TableTools.showWithRowSet(windowed.first); + assertTableEquals(tableToCheck.updateView("InWindow = Sentinel == 2"), windowed.first); + + timeProvider.now += DateTimeUtils.MINUTE; // 9:33:01, passing the all entries out of the window + updateGraph.runWithinUnitTestCycle(windowed.second::run); + TableTools.showWithRowSet(windowed.first); + assertTableEquals(tableToCheck.updateView("InWindow = false"), windowed.first); + } + + @Test + public void testCombination2() { + final TestClock timeProvider = new TestClock(); + final Instant startTime = DateTimeUtils.parseInstant("2024-02-28T09:29:01 NY"); + timeProvider.now = DateTimeUtils.epochNanos(startTime); + + final Instant[] initialValues = Arrays.stream( + new String[] {"2024-02-28T09:25:00 NY", "2024-02-28T09:27:00 NY", "2024-02-28T09:26:00 NY"}) + .map(DateTimeUtils::parseInstant).toArray(Instant[]::new); + final QueryTable tableToCheck = testRefreshingTable(i(0, 1, 2).toTracking(), + col("Timestamp", initialValues), + intCol("Sentinel", 1, 2, 3)); + + final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); + final Pair windowed = updateGraph.sharedLock().computeLocked( + () -> WindowCheck.addTimeWindowInternal( + timeProvider, tableToCheck, "Timestamp", DateTimeUtils.MINUTE * 5, "InWindow", false)); + + TableTools.showWithRowSet(windowed.first); + assertTableEquals(tableToCheck.updateView("InWindow = true"), windowed.first); + windowed.second.validateQueue(); + + // advance the clock to make sure we actually work + timeProvider.now += DateTimeUtils.MINUTE; // 9:30:01, passing the first entry out of the window + updateGraph.runWithinUnitTestCycle(windowed.second::run); + TableTools.showWithRowSet(windowed.first); + assertTableEquals(tableToCheck.updateView("InWindow = Sentinel > 1"), windowed.first); + + timeProvider.now += DateTimeUtils.MINUTE; // 9:32:01, passing the second entry out of the window + updateGraph.runWithinUnitTestCycle(windowed.second::run); + TableTools.showWithRowSet(windowed.first); + assertTableEquals(tableToCheck.updateView("InWindow = Sentinel == 2"), windowed.first); + + timeProvider.now += DateTimeUtils.MINUTE; // 9:33:01, passing the all entries out of the window + updateGraph.runWithinUnitTestCycle(windowed.second::run); + TableTools.showWithRowSet(windowed.first); + assertTableEquals(tableToCheck.updateView("InWindow = false"), windowed.first); + } } diff --git a/engine/test-utils/build.gradle b/engine/test-utils/build.gradle index 743e05a1dc5..bb3b8f172ca 100644 --- a/engine/test-utils/build.gradle +++ b/engine/test-utils/build.gradle @@ -18,7 +18,7 @@ dependencies { implementation depCommonsLang3 implementation depTrove3 - implementation 'it.unimi.dsi:fastutil:8.5.11' + implementation 'it.unimi.dsi:fastutil:8.5.13' Classpaths.inheritJUnitClassic(project, 'implementation') Classpaths.inheritJUnitPlatform(project, 'implementation') diff --git a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/GenerateTableUpdates.java b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/GenerateTableUpdates.java index 9e6daaea98c..b046fce019e 100644 --- a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/GenerateTableUpdates.java +++ b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/GenerateTableUpdates.java @@ -144,6 +144,15 @@ private void validateGroup(int... opts) { public static final SimulationProfile DEFAULT_PROFILE = new SimulationProfile(); + public static final SimulationProfile NO_SHIFT_PROFILE = + new SimulationProfile() { + { + SHIFT_10_PERCENT_KEY_SPACE = 0; + SHIFT_10_PERCENT_POS_SPACE = 0; + SHIFT_AGGRESSIVELY = 0; + } + }; + public static void generateShiftAwareTableUpdates(final SimulationProfile profile, final int targetUpdateSize, final Random random, final QueryTable table, final ColumnInfo[] columnInfo) { diff --git a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/QueryTableTestBase.java b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/QueryTableTestBase.java index 6121df0b38e..b345b4f8c8e 100644 --- a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/QueryTableTestBase.java +++ b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/QueryTableTestBase.java @@ -23,21 +23,13 @@ public abstract class QueryTableTestBase extends RefreshingTableTestCase { public final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd"); - private static final GenerateTableUpdates.SimulationProfile NO_SHIFT_PROFILE = - new GenerateTableUpdates.SimulationProfile() { - { - SHIFT_10_PERCENT_KEY_SPACE = 0; - SHIFT_10_PERCENT_POS_SPACE = 0; - SHIFT_AGGRESSIVELY = 0; - } - }; - public final JoinIncrement leftStep = new JoinIncrement() { @Override public void step(int leftSize, int rightSize, QueryTable leftTable, QueryTable rightTable, ColumnInfo[] leftColumnInfo, ColumnInfo[] rightColumnInfo, EvalNuggetInterface[] en, Random random) { - simulateShiftAwareStep(NO_SHIFT_PROFILE, toString(), leftSize, random, leftTable, leftColumnInfo, en); + simulateShiftAwareStep(GenerateTableUpdates.NO_SHIFT_PROFILE, toString(), leftSize, random, leftTable, + leftColumnInfo, en); } @Override @@ -63,7 +55,8 @@ public String toString() { public void step(int leftSize, int rightSize, QueryTable leftTable, QueryTable rightTable, ColumnInfo[] leftColumnInfo, ColumnInfo[] rightColumnInfo, EvalNuggetInterface[] en, Random random) { - simulateShiftAwareStep(NO_SHIFT_PROFILE, toString(), rightSize, random, rightTable, rightColumnInfo, en); + simulateShiftAwareStep(GenerateTableUpdates.NO_SHIFT_PROFILE, toString(), rightSize, random, rightTable, + rightColumnInfo, en); } @Override @@ -89,8 +82,10 @@ public String toString() { public void step(int leftSize, int rightSize, QueryTable leftTable, QueryTable rightTable, ColumnInfo[] leftColumnInfo, ColumnInfo[] rightColumnInfo, EvalNuggetInterface[] en, Random random) { - simulateShiftAwareStep(NO_SHIFT_PROFILE, toString(), leftSize, random, leftTable, leftColumnInfo, en); - simulateShiftAwareStep(NO_SHIFT_PROFILE, toString(), rightSize, random, rightTable, rightColumnInfo, en); + simulateShiftAwareStep(GenerateTableUpdates.NO_SHIFT_PROFILE, toString(), leftSize, random, leftTable, + leftColumnInfo, en); + simulateShiftAwareStep(GenerateTableUpdates.NO_SHIFT_PROFILE, toString(), rightSize, random, rightTable, + rightColumnInfo, en); } @Override diff --git a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/testcase/RefreshingTableTestCase.java b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/testcase/RefreshingTableTestCase.java index 813cd115d5e..44fdb6611cd 100644 --- a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/testcase/RefreshingTableTestCase.java +++ b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/testcase/RefreshingTableTestCase.java @@ -160,7 +160,7 @@ public static void simulateShiftAwareStep(final String ctxt, int targetUpdateSiz en); } - protected static void simulateShiftAwareStep(final GenerateTableUpdates.SimulationProfile simulationProfile, + public static void simulateShiftAwareStep(final GenerateTableUpdates.SimulationProfile simulationProfile, final String ctxt, int targetUpdateSize, Random random, QueryTable table, ColumnInfo[] columnInfo, EvalNuggetInterface[] en) { final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast();