Skip to content

Commit

Permalink
Port changes from legacy.
Browse files Browse the repository at this point in the history
  • Loading branch information
cpwright committed Feb 26, 2024
1 parent 759c0f0 commit c1782f0
Show file tree
Hide file tree
Showing 7 changed files with 688 additions and 136 deletions.
1 change: 1 addition & 0 deletions engine/table/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ dependencies {
implementation 'com.tdunning:t-digest:3.2'
implementation 'com.squareup:javapoet:1.13.0'
implementation 'io.github.classgraph:classgraph:4.8.165'
implementation 'it.unimi.dsi:fastutil:8.5.13'

implementation project(':plugin')
implementation depCommonsLang3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import java.util.List;

/**
* This will filter a table for the most recent N nanoseconds (must be on a date time column).
* This will filter a table for the most recent N nanoseconds (must be on an {@link Instant} column).
*
* <p>Note, this filter rescans the source table. You should prefer to use {@link io.deephaven.engine.util.WindowCheck} instead.</p.
*/
public class TimeSeriesFilter
extends WhereFilterLivenessArtifactImpl
Expand Down
638 changes: 528 additions & 110 deletions engine/table/src/main/java/io/deephaven/engine/util/WindowCheck.java

Large diffs are not rendered by default.

151 changes: 141 additions & 10 deletions engine/table/src/test/java/io/deephaven/engine/util/TestWindowCheck.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@

import io.deephaven.base.Pair;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.context.QueryScope;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.TrackingWritableRowSet;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableUpdate;
import io.deephaven.engine.table.TableUpdateListener;
import io.deephaven.engine.table.impl.InstrumentedTableUpdateListener;
import io.deephaven.engine.table.impl.QueryTable;
import io.deephaven.engine.table.impl.TableUpdateValidator;
import io.deephaven.engine.table.impl.util.RuntimeMemory;
import io.deephaven.engine.testutil.*;
import io.deephaven.engine.testutil.generator.IntGenerator;
import io.deephaven.engine.testutil.generator.UnsortedInstantGenerator;
Expand All @@ -27,17 +31,20 @@
import org.junit.Test;
import org.junit.experimental.categories.Category;

import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Random;
import java.util.stream.Stream;

import static io.deephaven.engine.testutil.TstUtils.*;
import static io.deephaven.engine.util.TableTools.col;
import static io.deephaven.engine.util.TableTools.intCol;
import static org.junit.jupiter.api.Assertions.assertTrue;

@Category(OutOfBandTest.class)
public class TestWindowCheck {
Expand All @@ -46,8 +53,15 @@ public class TestWindowCheck {

@Test
public void testWindowCheckIterative() {
for (int seed = 0; seed < 10; ++seed) {
testWindowCheckIterative(seed, true);
}
}

@Test
public void testWindowCheckIterativeNoShifts() {
for (int seed = 0; seed < 1; ++seed) {
testWindowCheckIterative(seed);
testWindowCheckIterative(seed, false);
}
}

Expand All @@ -61,12 +75,12 @@ public void testWindowCheckIterative() {
* The WindowEvalNugget verifies the original columns are unchanged and that the value of the InWindow column is
* correct. A prev checker is added to ensure that getPrev works on the new table.
*/
private void testWindowCheckIterative(int seed) {
private void testWindowCheckIterative(int seed, boolean withShifts) {
final Random random = new Random(seed);
final Random combinedRandom = new Random(seed);

final ColumnInfo<?, ?>[] columnInfo;
final int size = 100;
final int size = 10;
final Instant startTime = DateTimeUtils.parseInstant("2018-02-23T09:30:00 NY");
final Instant endTime;
if (SHORT_TESTS) {
Expand Down Expand Up @@ -103,15 +117,17 @@ private void testWindowCheckIterative(int seed) {
++step;
final boolean combined = combinedRandom.nextBoolean();

final GenerateTableUpdates.SimulationProfile profile = withShifts ? GenerateTableUpdates.DEFAULT_PROFILE : GenerateTableUpdates.NO_SHIFT_PROFILE;

if (combined) {
updateGraph.runWithinUnitTestCycle(() -> {
advanceTime(clock, en);
GenerateTableUpdates.generateShiftAwareTableUpdates(GenerateTableUpdates.DEFAULT_PROFILE, size,
advanceTime(clock, en, 5 * DateTimeUtils.SECOND);
GenerateTableUpdates.generateShiftAwareTableUpdates(profile, size,
random, table, columnInfo);
});
TstUtils.validate("Step " + step, en);
} else {
updateGraph.runWithinUnitTestCycle(() -> advanceTime(clock, en));
updateGraph.runWithinUnitTestCycle(() -> advanceTime(clock, en, 5 * DateTimeUtils.SECOND));
if (RefreshingTableTestCase.printTableUpdates) {
TstUtils.validate("Step = " + step + " time = " + DateTimeUtils.epochNanosToInstant(clock.now), en);
}
Expand All @@ -120,15 +136,16 @@ private void testWindowCheckIterative(int seed) {
if (RefreshingTableTestCase.printTableUpdates) {
System.out.println("Step " + step + "-" + ii);
}
RefreshingTableTestCase.simulateShiftAwareStep(step + "-" + ii, stepSize, random, table, columnInfo,
RefreshingTableTestCase.simulateShiftAwareStep(profile, step + "-" + ii, stepSize, random, table, columnInfo,
en);
TstUtils.validate("Step " + step + "-" + ii, en);
}
}
}
}

private void advanceTime(TestClock clock, WindowEvalNugget[] en) {
clock.now += 5 * DateTimeUtils.SECOND;
private void advanceTime(TestClock clock, WindowEvalNugget[] en, long nanosToAdvance) {
clock.now += nanosToAdvance;
if (RefreshingTableTestCase.printTableUpdates) {
System.out.println("Ticking time to " + DateTimeUtils.epochNanosToInstant(clock.now));
}
Expand Down Expand Up @@ -374,7 +391,121 @@ public void validate(String msg) {

@Override
public void show() {
TableTools.show(windowed.first);
TableTools.showWithRowSet(windowed.first);
}
}

@Test
public void testMemoryUsageInWindow() throws IOException {
final TestClock timeProvider = new TestClock();
final Instant startTime = DateTimeUtils.parseInstant("2022-07-14T09:30:00 NY");
timeProvider.now = DateTimeUtils.epochNanos(startTime);

QueryScope.addParam("startTime", startTime);

final QueryTable inputTable = (QueryTable)TableTools.emptyTable(100_000_000).updateView("Timestamp = startTime");
inputTable.setRefreshing(true);
System.gc();
final RuntimeMemory.Sample sample = new RuntimeMemory.Sample();
RuntimeMemory.getInstance().read(sample);
final long memStart = sample.totalMemory - sample.freeMemory;
System.out.println("Start Memory: " + memStart);
final Pair<Table, WindowCheck.TimeWindowListener> withCheck = WindowCheck.addTimeWindowInternal(timeProvider,
inputTable,
"Timestamp",
60 * DateTimeUtils.SECOND,
"InLastXSeconds",
false
);
System.gc();
RuntimeMemory.getInstance().read(sample);
final long memEnd = sample.totalMemory - sample.freeMemory;
System.out.println("End Memory: " + memEnd);
final long memChange = memEnd - memStart;
System.out.println("Change: " + memChange);
assertTrue(memChange < 100_000_000); // this previously would require about 645MB, so we're doing better
assertTableEquals(inputTable.updateView("InLastXSeconds=true"), withCheck.first);
}

@Test
public void testSequentialRanges() throws IOException {
final TestClock timeProvider = new TestClock();
final Instant startTime = DateTimeUtils.parseInstant("2022-07-14T09:30:00 NY");
// start about three minutes in so we'll take things off more directly
timeProvider.now = DateTimeUtils.epochNanos(startTime) + 180 * 1_000_000_000L;

QueryScope.addParam("startTime", startTime);

// each row is 10ms, we have 50_000 rows so the span of the table is 500 seconds
final Table inputRanges = TableTools.emptyTable(50_000).updateView("Timestamp = startTime + (ii * 10_000_000)");
((QueryTable)inputRanges).setRefreshing(true);

final Table [] duplicated = new Table[10];
Arrays.fill(duplicated, inputRanges);
final Table inputTable = TableTools.merge(duplicated);

final WindowEvalNugget [] en;
final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast();
updateGraph.exclusiveLock().lock();
try {
en = new WindowEvalNugget[]{
new WindowEvalNugget(timeProvider, (QueryTable)inputTable)
};
} finally {
updateGraph.exclusiveLock().unlock();
}

int step = 0;
while (timeProvider.now < DateTimeUtils.epochNanos(startTime) + 600 * DateTimeUtils.SECOND) {
step++;
updateGraph.runWithinUnitTestCycle(() -> advanceTime(timeProvider, en, 5 * DateTimeUtils.SECOND));
TstUtils.validate("Step " + step, en);
}
}

@Test
public void testSequentialRangesAddOnly() throws IOException {
final TestClock timeProvider = new TestClock();
final Instant startTime = DateTimeUtils.parseInstant("2022-07-14T09:30:00 NY");
// start about three minutes in so we'll take things off more directly
timeProvider.now = DateTimeUtils.epochNanos(startTime) + 180 * 1_000_000_000L;
final long regionSize = 1_000_000L;

QueryScope.addParam("startTime", startTime);
QueryScope.addParam("regionSize", regionSize);

final TrackingWritableRowSet inputRowSet = RowSetFactory.fromRange(0, 9999).toTracking();

inputRowSet.insertRange(regionSize, regionSize + 9_999);
final QueryTable indexTable = TstUtils.testRefreshingTable(inputRowSet);
// each chunk of 10_000 rows should account for one minute, or 60_000_000_000 / 10_000 = 6_000_000 nanos per row
// we start 3 minutes behind the start, so everything is in the five-minute window
final Table inputTable = indexTable.updateView("Timestamp = startTime + ((k % regionSize) * 6_000_000)").withAttributes(Collections.singletonMap(Table.ADD_ONLY_TABLE_ATTRIBUTE, true));

final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast();

final WindowEvalNugget [] en;
final PrintListener pl;
updateGraph.exclusiveLock().lock();
try {
en = new WindowEvalNugget[]{
new WindowEvalNugget(timeProvider, (QueryTable)inputTable)
};
pl = new PrintListener("windowed", en[0].windowed.first, 0);
} finally {
updateGraph.exclusiveLock().unlock();
}

for (int step = 1; step < 10; ++step) {
final int fstep = step;
updateGraph.runWithinUnitTestCycle(() -> {
final TrackingWritableRowSet added = RowSetFactory.fromRange(fstep * 10_000, fstep * 10_000 + 9_999).toTracking();
added.insertRange(fstep * 10_000 + regionSize, fstep * 10_000 + 9_999 + regionSize);
indexTable.getRowSet().writableCast().insert(added);
indexTable.notifyListeners(added, i(), i());
advanceTime(timeProvider, en, fstep * DateTimeUtils.MINUTE);
});
TstUtils.validate("Step " + step, en);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
public class GenerateTableUpdates {

public static void generateTableUpdates(int size, Random random, QueryTable table,
ColumnInfo<?, ?>[] columnInfo) {
ColumnInfo<?, ?>[] columnInfo) {
final RowSet[] result = computeTableUpdates(size, random, table, columnInfo);
table.notifyListeners(result[0], result[1], result[2]);
}
Expand Down Expand Up @@ -144,6 +144,15 @@ private void validateGroup(int... opts) {

public static final SimulationProfile DEFAULT_PROFILE = new SimulationProfile();

public static final SimulationProfile NO_SHIFT_PROFILE =
new SimulationProfile() {
{
SHIFT_10_PERCENT_KEY_SPACE = 0;
SHIFT_10_PERCENT_POS_SPACE = 0;
SHIFT_AGGRESSIVELY = 0;
}
};

public static void generateShiftAwareTableUpdates(final SimulationProfile profile, final int targetUpdateSize,
final Random random, final QueryTable table,
final ColumnInfo<?, ?>[] columnInfo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,12 @@
public abstract class QueryTableTestBase extends RefreshingTableTestCase {
public final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");

private static final GenerateTableUpdates.SimulationProfile NO_SHIFT_PROFILE =
new GenerateTableUpdates.SimulationProfile() {
{
SHIFT_10_PERCENT_KEY_SPACE = 0;
SHIFT_10_PERCENT_POS_SPACE = 0;
SHIFT_AGGRESSIVELY = 0;
}
};

public final JoinIncrement leftStep = new JoinIncrement() {
@Override
public void step(int leftSize, int rightSize, QueryTable leftTable, QueryTable rightTable,
ColumnInfo<?, ?>[] leftColumnInfo, ColumnInfo<?, ?>[] rightColumnInfo, EvalNuggetInterface[] en,
Random random) {
simulateShiftAwareStep(NO_SHIFT_PROFILE, toString(), leftSize, random, leftTable, leftColumnInfo, en);
simulateShiftAwareStep(GenerateTableUpdates.NO_SHIFT_PROFILE, toString(), leftSize, random, leftTable, leftColumnInfo, en);
}

@Override
Expand All @@ -63,7 +54,7 @@ public String toString() {
public void step(int leftSize, int rightSize, QueryTable leftTable, QueryTable rightTable,
ColumnInfo<?, ?>[] leftColumnInfo, ColumnInfo<?, ?>[] rightColumnInfo, EvalNuggetInterface[] en,
Random random) {
simulateShiftAwareStep(NO_SHIFT_PROFILE, toString(), rightSize, random, rightTable, rightColumnInfo, en);
simulateShiftAwareStep(GenerateTableUpdates.NO_SHIFT_PROFILE, toString(), rightSize, random, rightTable, rightColumnInfo, en);
}

@Override
Expand All @@ -89,8 +80,8 @@ public String toString() {
public void step(int leftSize, int rightSize, QueryTable leftTable, QueryTable rightTable,
ColumnInfo<?, ?>[] leftColumnInfo, ColumnInfo<?, ?>[] rightColumnInfo, EvalNuggetInterface[] en,
Random random) {
simulateShiftAwareStep(NO_SHIFT_PROFILE, toString(), leftSize, random, leftTable, leftColumnInfo, en);
simulateShiftAwareStep(NO_SHIFT_PROFILE, toString(), rightSize, random, rightTable, rightColumnInfo, en);
simulateShiftAwareStep(GenerateTableUpdates.NO_SHIFT_PROFILE, toString(), leftSize, random, leftTable, leftColumnInfo, en);
simulateShiftAwareStep(GenerateTableUpdates.NO_SHIFT_PROFILE, toString(), rightSize, random, rightTable, rightColumnInfo, en);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public static void simulateShiftAwareStep(final String ctxt, int targetUpdateSiz
en);
}

protected static void simulateShiftAwareStep(final GenerateTableUpdates.SimulationProfile simulationProfile,
public static void simulateShiftAwareStep(final GenerateTableUpdates.SimulationProfile simulationProfile,
final String ctxt, int targetUpdateSize, Random random, QueryTable table, ColumnInfo[] columnInfo,
EvalNuggetInterface[] en) {
final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast();
Expand Down

0 comments on commit c1782f0

Please sign in to comment.