Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve WindowCheck memory usage. #5197

Merged
merged 15 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions engine/table/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ dependencies {
implementation 'com.tdunning:t-digest:3.2'
implementation 'com.squareup:javapoet:1.13.0'
implementation 'io.github.classgraph:classgraph:4.8.165'
implementation 'it.unimi.dsi:fastutil:8.5.13'
cpwright marked this conversation as resolved.
Show resolved Hide resolved

implementation project(':plugin')
implementation depCommonsLang3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,12 @@
import java.util.List;

/**
* This will filter a table for the most recent N nanoseconds (must be on a date time column).
* This will filter a table for the most recent N nanoseconds (must be on an {@link Instant} column).
*
* <p>
* Note, this filter rescans the source table. You should prefer to use {@link io.deephaven.engine.util.WindowCheck}
* instead.
* </p>
*/
public class TimeSeriesFilter
extends WhereFilterLivenessArtifactImpl
Expand Down
686 changes: 575 additions & 111 deletions engine/table/src/main/java/io/deephaven/engine/util/WindowCheck.java

Large diffs are not rendered by default.

155 changes: 145 additions & 10 deletions engine/table/src/test/java/io/deephaven/engine/util/TestWindowCheck.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@

import io.deephaven.base.Pair;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.context.QueryScope;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.TrackingWritableRowSet;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableUpdate;
import io.deephaven.engine.table.TableUpdateListener;
import io.deephaven.engine.table.impl.InstrumentedTableUpdateListener;
import io.deephaven.engine.table.impl.QueryTable;
import io.deephaven.engine.table.impl.TableUpdateValidator;
import io.deephaven.engine.table.impl.util.RuntimeMemory;
import io.deephaven.engine.testutil.*;
import io.deephaven.engine.testutil.generator.IntGenerator;
import io.deephaven.engine.testutil.generator.UnsortedInstantGenerator;
Expand All @@ -27,17 +31,20 @@
import org.junit.Test;
import org.junit.experimental.categories.Category;

import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Random;
import java.util.stream.Stream;

import static io.deephaven.engine.testutil.TstUtils.*;
import static io.deephaven.engine.util.TableTools.col;
import static io.deephaven.engine.util.TableTools.intCol;
import static org.junit.jupiter.api.Assertions.assertTrue;

@Category(OutOfBandTest.class)
public class TestWindowCheck {
Expand All @@ -46,8 +53,15 @@ public class TestWindowCheck {

/**
* Runs the randomized iterative window-check test with shifts enabled ({@code withShifts = true}), once for each
* seed in {@code [0, 10)}.
*/
@Test
public void testWindowCheckIterative() {
for (int seed = 0; seed < 10; ++seed) {
testWindowCheckIterative(seed, true);
}
}

@Test
public void testWindowCheckIterativeNoShifts() {
for (int seed = 0; seed < 1; ++seed) {
testWindowCheckIterative(seed);
testWindowCheckIterative(seed, false);
}
}

Expand All @@ -61,12 +75,12 @@ public void testWindowCheckIterative() {
* The WindowEvalNugget verifies the original columns are unchanged and that the value of the InWindow column is
* correct. A prev checker is added to ensure that getPrev works on the new table.
*/
private void testWindowCheckIterative(int seed) {
private void testWindowCheckIterative(int seed, boolean withShifts) {
final Random random = new Random(seed);
final Random combinedRandom = new Random(seed);

final ColumnInfo<?, ?>[] columnInfo;
final int size = 100;
final int size = 10;
rcaudy marked this conversation as resolved.
Show resolved Hide resolved
final Instant startTime = DateTimeUtils.parseInstant("2018-02-23T09:30:00 NY");
final Instant endTime;
if (SHORT_TESTS) {
Expand Down Expand Up @@ -103,15 +117,18 @@ private void testWindowCheckIterative(int seed) {
++step;
final boolean combined = combinedRandom.nextBoolean();

final GenerateTableUpdates.SimulationProfile profile =
withShifts ? GenerateTableUpdates.DEFAULT_PROFILE : GenerateTableUpdates.NO_SHIFT_PROFILE;

if (combined) {
updateGraph.runWithinUnitTestCycle(() -> {
advanceTime(clock, en);
GenerateTableUpdates.generateShiftAwareTableUpdates(GenerateTableUpdates.DEFAULT_PROFILE, size,
advanceTime(clock, en, 5 * DateTimeUtils.SECOND);
GenerateTableUpdates.generateShiftAwareTableUpdates(profile, size,
random, table, columnInfo);
});
TstUtils.validate("Step " + step, en);
} else {
updateGraph.runWithinUnitTestCycle(() -> advanceTime(clock, en));
updateGraph.runWithinUnitTestCycle(() -> advanceTime(clock, en, 5 * DateTimeUtils.SECOND));
if (RefreshingTableTestCase.printTableUpdates) {
TstUtils.validate("Step = " + step + " time = " + DateTimeUtils.epochNanosToInstant(clock.now), en);
}
Expand All @@ -120,15 +137,17 @@ private void testWindowCheckIterative(int seed) {
if (RefreshingTableTestCase.printTableUpdates) {
System.out.println("Step " + step + "-" + ii);
}
RefreshingTableTestCase.simulateShiftAwareStep(step + "-" + ii, stepSize, random, table, columnInfo,
RefreshingTableTestCase.simulateShiftAwareStep(profile, step + "-" + ii, stepSize, random, table,
columnInfo,
en);
TstUtils.validate("Step " + step + "-" + ii, en);
}
}
}
}

private void advanceTime(TestClock clock, WindowEvalNugget[] en) {
clock.now += 5 * DateTimeUtils.SECOND;
private void advanceTime(TestClock clock, WindowEvalNugget[] en, long nanosToAdvance) {
cpwright marked this conversation as resolved.
Show resolved Hide resolved
clock.now += nanosToAdvance;
if (RefreshingTableTestCase.printTableUpdates) {
System.out.println("Ticking time to " + DateTimeUtils.epochNanosToInstant(clock.now));
}
Expand Down Expand Up @@ -374,7 +393,123 @@ public void validate(String msg) {

@Override
public void show() {
TableTools.show(windowed.first);
TableTools.showWithRowSet(windowed.first);
cpwright marked this conversation as resolved.
Show resolved Hide resolved
}
}

/**
* Checks that adding a time window to a 100,000,000-row table whose rows all carry the same timestamp grows the
* used heap (as sampled via {@link RuntimeMemory}) by less than 100_000_000, and that every row is reported as
* in-window.
*/
@Test
public void testMemoryUsageInWindow() throws IOException {
final TestClock timeProvider = new TestClock();
final Instant startTime = DateTimeUtils.parseInstant("2022-07-14T09:30:00 NY");
// the clock is set to exactly the timestamp that every row will carry
timeProvider.now = DateTimeUtils.epochNanos(startTime);

// expose startTime to the updateView formula below
QueryScope.addParam("startTime", startTime);

final QueryTable inputTable =
(QueryTable) TableTools.emptyTable(100_000_000).updateView("Timestamp = startTime");
inputTable.setRefreshing(true);
// request a collection before sampling so the baseline is less polluted by garbage; System.gc() is only a hint
System.gc();
final RuntimeMemory.Sample sample = new RuntimeMemory.Sample();
RuntimeMemory.getInstance().read(sample);
// used memory = total - free (presumably bytes, given the 645MB remark below -- confirm against RuntimeMemory)
final long memStart = sample.totalMemory - sample.freeMemory;
System.out.println("Start Memory: " + memStart);
// 60-second window on "Timestamp", result column "InLastXSeconds"
// NOTE(review): the meaning of the trailing boolean argument is not visible here -- see addTimeWindowInternal
final Pair<Table, WindowCheck.TimeWindowListener> withCheck = WindowCheck.addTimeWindowInternal(timeProvider,
inputTable,
"Timestamp",
60 * DateTimeUtils.SECOND,
"InLastXSeconds",
false);
// second sample, taken after the window has been added (the same Sample object is reused)
System.gc();
RuntimeMemory.getInstance().read(sample);
final long memEnd = sample.totalMemory - sample.freeMemory;
System.out.println("End Memory: " + memEnd);
final long memChange = memEnd - memStart;
System.out.println("Change: " + memChange);
assertTrue(memChange < 100_000_000); // this previously would require about 645MB, so we're doing better
// the clock equals every row's timestamp, so the expected result is the input with InLastXSeconds=true everywhere
assertTableEquals(inputTable.updateView("InLastXSeconds=true"), withCheck.first);
}

/**
* Validates the window check while only the clock moves: the input is ten merged copies of a table of sequential
* timestamps, and each cycle advances the clock by five seconds and re-validates, with no row changes generated
* by this test.
*/
@Test
public void testSequentialRanges() throws IOException {
final TestClock timeProvider = new TestClock();
final Instant startTime = DateTimeUtils.parseInstant("2022-07-14T09:30:00 NY");
// start about three minutes in so we'll take things off more directly
timeProvider.now = DateTimeUtils.epochNanos(startTime) + 180 * 1_000_000_000L;

// expose startTime to the updateView formula below
QueryScope.addParam("startTime", startTime);

// each row is 10ms, we have 50_000 rows so the span of the table is 500 seconds
final Table inputRanges = TableTools.emptyTable(50_000).updateView("Timestamp = startTime + (ii * 10_000_000)");
((QueryTable) inputRanges).setRefreshing(true);

// every slot holds the same Table reference, so the merge is ten copies of the same sequential range
final Table[] duplicated = new Table[10];
Arrays.fill(duplicated, inputRanges);
final Table inputTable = TableTools.merge(duplicated);

final WindowEvalNugget[] en;
final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast();
// the nugget is constructed while holding the update graph's exclusive lock
updateGraph.exclusiveLock().lock();
try {
en = new WindowEvalNugget[] {
new WindowEvalNugget(timeProvider, (QueryTable) inputTable)
};
} finally {
updateGraph.exclusiveLock().unlock();
}

int step = 0;
// tick the clock in 5-second increments until it reaches startTime + 600 seconds, validating after each cycle
while (timeProvider.now < DateTimeUtils.epochNanos(startTime) + 600 * DateTimeUtils.SECOND) {
step++;
updateGraph.runWithinUnitTestCycle(() -> advanceTime(timeProvider, en, 5 * DateTimeUtils.SECOND));
TstUtils.validate("Step " + step, en);
}
}

/**
* Validates the window check on a source table flagged with {@link Table#ADD_ONLY_TABLE_ATTRIBUTE}: each cycle
* inserts two new 10,000-row key ranges (one low, one offset by {@code regionSize}), advances the clock, and
* re-validates.
*/
@Test
public void testSequentialRangesAddOnly() throws IOException {
final TestClock timeProvider = new TestClock();
final Instant startTime = DateTimeUtils.parseInstant("2022-07-14T09:30:00 NY");
// start about three minutes in so we'll take things off more directly
timeProvider.now = DateTimeUtils.epochNanos(startTime) + 180 * 1_000_000_000L;
final long regionSize = 1_000_000L;

// expose startTime and regionSize to the updateView formula below
QueryScope.addParam("startTime", startTime);
QueryScope.addParam("regionSize", regionSize);

// initial keys: [0, 9999] plus the same range offset by regionSize
final TrackingWritableRowSet inputRowSet = RowSetFactory.fromRange(0, 9999).toTracking();

inputRowSet.insertRange(regionSize, regionSize + 9_999);
final QueryTable indexTable = TstUtils.testRefreshingTable(inputRowSet);
cpwright marked this conversation as resolved.
Show resolved Hide resolved
// each chunk of 10_000 rows should account for one minute, or 60_000_000_000 / 10_000 = 6_000_000 nanos per row
// the clock starts three minutes after startTime (see timeProvider.now above) and the initial rows cover the
// first minute, so everything is in the five-minute window
// NOTE(review): the five-minute window size comes from WindowEvalNugget, which is not visible here -- confirm
// (k % regionSize) gives both key regions the same timestamps
final Table inputTable = indexTable.updateView("Timestamp = startTime + ((k % regionSize) * 6_000_000)")
.withAttributes(Collections.singletonMap(Table.ADD_ONLY_TABLE_ATTRIBUTE, true));

final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast();

final WindowEvalNugget[] en;
// pl is not referenced again in this method; presumably it exists to print updates of the windowed table -- confirm
final PrintListener pl;
// the nugget and listener are constructed while holding the update graph's exclusive lock
updateGraph.exclusiveLock().lock();
try {
en = new WindowEvalNugget[] {
new WindowEvalNugget(timeProvider, (QueryTable) inputTable)
};
pl = new PrintListener("windowed", en[0].windowed.first, 0);
} finally {
updateGraph.exclusiveLock().unlock();
}

for (int step = 1; step < 10; ++step) {
// effectively-final copy of the loop variable for use inside the lambda
final int fstep = step;
updateGraph.runWithinUnitTestCycle(() -> {
// next 10,000 keys in the low region ...
final TrackingWritableRowSet added =
RowSetFactory.fromRange(fstep * 10_000, fstep * 10_000 + 9_999).toTracking();
cpwright marked this conversation as resolved.
Show resolved Hide resolved
// ... plus the matching 10,000 keys in the region offset by regionSize
added.insertRange(fstep * 10_000 + regionSize, fstep * 10_000 + 9_999 + regionSize);
indexTable.getRowSet().writableCast().insert(added);
// publish an add-only update: empty removed and modified row sets
indexTable.notifyListeners(added, i(), i());
// the clock advance grows with the step number (fstep minutes), not a fixed increment
advanceTime(timeProvider, en, fstep * DateTimeUtils.MINUTE);
});
TstUtils.validate("Step " + step, en);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,15 @@ private void validateGroup(int... opts) {

public static final SimulationProfile DEFAULT_PROFILE = new SimulationProfile();

/**
* A {@link SimulationProfile} with {@code SHIFT_10_PERCENT_KEY_SPACE}, {@code SHIFT_10_PERCENT_POS_SPACE} and
* {@code SHIFT_AGGRESSIVELY} all set to zero, for tests that use the name to request updates without shifts.
*/
public static final SimulationProfile NO_SHIFT_PROFILE =
new SimulationProfile() {
{
// instance initializer of the anonymous subclass: zero the three shift settings
// NOTE(review): presumably these are percentages/weights consumed by the update generator -- confirm
SHIFT_10_PERCENT_KEY_SPACE = 0;
SHIFT_10_PERCENT_POS_SPACE = 0;
SHIFT_AGGRESSIVELY = 0;
}
};

public static void generateShiftAwareTableUpdates(final SimulationProfile profile, final int targetUpdateSize,
final Random random, final QueryTable table,
final ColumnInfo<?, ?>[] columnInfo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,13 @@
public abstract class QueryTableTestBase extends RefreshingTableTestCase {
public final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");

private static final GenerateTableUpdates.SimulationProfile NO_SHIFT_PROFILE =
new GenerateTableUpdates.SimulationProfile() {
{
SHIFT_10_PERCENT_KEY_SPACE = 0;
SHIFT_10_PERCENT_POS_SPACE = 0;
SHIFT_AGGRESSIVELY = 0;
}
};

public final JoinIncrement leftStep = new JoinIncrement() {
@Override
public void step(int leftSize, int rightSize, QueryTable leftTable, QueryTable rightTable,
ColumnInfo<?, ?>[] leftColumnInfo, ColumnInfo<?, ?>[] rightColumnInfo, EvalNuggetInterface[] en,
Random random) {
simulateShiftAwareStep(NO_SHIFT_PROFILE, toString(), leftSize, random, leftTable, leftColumnInfo, en);
simulateShiftAwareStep(GenerateTableUpdates.NO_SHIFT_PROFILE, toString(), leftSize, random, leftTable,
leftColumnInfo, en);
}

@Override
Expand All @@ -63,7 +55,8 @@ public String toString() {
public void step(int leftSize, int rightSize, QueryTable leftTable, QueryTable rightTable,
ColumnInfo<?, ?>[] leftColumnInfo, ColumnInfo<?, ?>[] rightColumnInfo, EvalNuggetInterface[] en,
Random random) {
simulateShiftAwareStep(NO_SHIFT_PROFILE, toString(), rightSize, random, rightTable, rightColumnInfo, en);
simulateShiftAwareStep(GenerateTableUpdates.NO_SHIFT_PROFILE, toString(), rightSize, random, rightTable,
rightColumnInfo, en);
}

@Override
Expand All @@ -89,8 +82,10 @@ public String toString() {
public void step(int leftSize, int rightSize, QueryTable leftTable, QueryTable rightTable,
ColumnInfo<?, ?>[] leftColumnInfo, ColumnInfo<?, ?>[] rightColumnInfo, EvalNuggetInterface[] en,
Random random) {
simulateShiftAwareStep(NO_SHIFT_PROFILE, toString(), leftSize, random, leftTable, leftColumnInfo, en);
simulateShiftAwareStep(NO_SHIFT_PROFILE, toString(), rightSize, random, rightTable, rightColumnInfo, en);
simulateShiftAwareStep(GenerateTableUpdates.NO_SHIFT_PROFILE, toString(), leftSize, random, leftTable,
leftColumnInfo, en);
simulateShiftAwareStep(GenerateTableUpdates.NO_SHIFT_PROFILE, toString(), rightSize, random, rightTable,
rightColumnInfo, en);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public static void simulateShiftAwareStep(final String ctxt, int targetUpdateSiz
en);
}

protected static void simulateShiftAwareStep(final GenerateTableUpdates.SimulationProfile simulationProfile,
public static void simulateShiftAwareStep(final GenerateTableUpdates.SimulationProfile simulationProfile,
final String ctxt, int targetUpdateSize, Random random, QueryTable table, ColumnInfo[] columnInfo,
EvalNuggetInterface[] en) {
final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast();
Expand Down
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionSha256Sum=97a52d145762adc241bad7fd18289bf7f6801e08ece6badf80402fe2b9f250b1
distributionSha256Sum=85719317abd2112f021d4f41f09ec370534ba288432065f4b477b6a3b652910d
cpwright marked this conversation as resolved.
Show resolved Hide resolved
distributionUrl=https\://services.gradle.org/distributions/gradle-8.6-all.zip
networkTimeout=10000
validateDistributionUrl=true
Expand Down
Loading