Skip to content

Commit

Permalink
Adding logic to delete lineage data.
Browse files Browse the repository at this point in the history
  • Loading branch information
ritwiksahani committed Aug 12, 2024
1 parent cbcb4a9 commit 70e64fc
Show file tree
Hide file tree
Showing 13 changed files with 230 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.cdap.cdap.spi.data.table.field.Range;
import io.cdap.cdap.store.StoreDefinition;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -66,6 +67,41 @@ public static LineageTable create(StructuredTableContext context) {
return new LineageTable(context);
}

/**
* Deletes all completed run records with a start time before {@code timeUpperBound}, throwing
* {@code IOException} if the delete operation fails.
* <p>
* Assumption is that lineage records are added only after run is completed, so we don't need to
* check completed run statuses.
* </p>
*
* <p>
* This function will not work with the in memory no-sql DB and will return
* {@code InvalidFieldException}. This is because of the way we need to query and will only be
* used in the managed instances controlled by a flag.
* </p>
*/
public void deleteCompletedLineageRecordsStartedBefore(Instant timeUpperBound)
throws IOException {
long maxTimeEpoch = timeUpperBound.getEpochSecond();
if (maxTimeEpoch == Long.MAX_VALUE) {
// We don't want to blanket delete all records in case of incorrect values.
return;
}
// Data should be deleted from both the lineage tables.
getDatasetTable()
.deleteAll(createStartTimeEndRange(maxTimeEpoch));
getProgramTable()
.deleteAll(createStartTimeEndRange(maxTimeEpoch));
}

private Range createStartTimeEndRange(long endTime) {
ImmutableList<Field<?>> end = ImmutableList.of(
Fields.longField(StoreDefinition.LineageStore.START_TIME_FIELD, invertTime(endTime)));
return Range.to(end, Range.Bound.EXCLUSIVE);
}


private LineageTable(StructuredTableContext structuredTableContext) {
this.structuredTableContext = structuredTableContext;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,25 @@ public void deleteAll(Range keyRange) throws InvalidFieldException, IOException
}
}

@Override
public void scanDeleteAll(Range keyRange)
throws InvalidFieldException, UnsupportedOperationException, IOException {
try {
if (!emitTimeMetrics) {
structuredTable.scanDeleteAll(keyRange);
} else {
long curTime = System.nanoTime();
structuredTable.scanDeleteAll(keyRange);
long duration = System.nanoTime() - curTime;
metricsCollector.increment(metricPrefix + "scanDeleteAll.time", duration);
}
metricsCollector.increment(metricPrefix + "scanDeleteAll.count", 1L);
} catch (Exception e) {
metricsCollector.increment(metricPrefix + "scanDeleteAll.error", 1L);
throw e;
}
}

@Override
public void updateAll(Range keyRange, Collection<Field<?>> fields)
throws InvalidFieldException, IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,17 @@ public void deleteAll(Range keyRange) throws InvalidFieldException, IOException
}
}

@Override
public void scanDeleteAll(Range keyRange)
throws InvalidFieldException, UnsupportedOperationException, IOException {
// For No SQL tables we are not supporting deletes on random columns because of performance
// concerns.
throw new UnsupportedOperationException(
String.format("scanDeleteAll operation not supported for NoSQL Table: %s",
schema.getTableId()
.getName()));
}

@Override
public void updateAll(Range keyRange, Collection<Field<?>> fields) throws InvalidFieldException {
LOG.trace("Table {}: Update fields {} in range {}", schema.getTableId(), fields, keyRange);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,19 @@ public void delete(Collection<Field<?>> keys) throws InvalidFieldException, IOEx
public void deleteAll(Range keyRange) throws InvalidFieldException, IOException {
LOG.trace("Table {}: DeleteAll with range {}", tableSchema.getTableId(), keyRange);
fieldValidator.validateScanRange(keyRange);
executeDeleteAll(keyRange);
}

@Override
public void scanDeleteAll(Range keyRange)
throws InvalidFieldException, UnsupportedOperationException, IOException {
LOG.trace("Table {}: ScanDeleteAll with range {}", tableSchema.getTableId(), keyRange);
keyRange.getBegin().forEach(fieldValidator::validateField);
keyRange.getEnd().forEach(fieldValidator::validateField);
executeDeleteAll(keyRange);
}

private void executeDeleteAll(Range keyRange) throws IOException {
String sql = getDeleteAllStatement(keyRange);
try (PreparedStatement statement = connection.prepareStatement(sql)) {
setStatementFieldByRange(keyRange, statement);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public void testMultipleRelations() {
}

@SafeVarargs
private static <T> Set<T> toSet(T... elements) {
protected static <T> Set<T> toSet(T... elements) {
return ImmutableSet.copyOf(elements);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,22 @@ public static void beforeClass() throws IOException, TableAlreadyExistsException
StoreDefinition.createAllTables(structuredTableAdmin);
}

// @Test(expected = InvalidFieldException.class)
// public void testDeleteCompletedLineageRecordsThrowsException() {
// final Instant currentTime = Instant.now();
// final RunId runId = RunIds.generate(10000);
// final DatasetId datasetInstance = new DatasetId("default", "dataset1");
// final ProgramId program = new ProgramId("default", "app1", ProgramType.SERVICE, "service1");
// final ProgramRunId run = program.run(runId.getId());
//
// final long accessTimeMillis = System.currentTimeMillis();
// TransactionRunners.run(transactionRunner, context -> {
// LineageTable lineageTable = LineageTable.create(context);
// lineageTable.addAccess(run, datasetInstance, AccessType.READ, accessTimeMillis);
// Set<NamespacedEntityId> entitiesForRun = lineageTable.getEntitiesForRun(run);
// // Since no-SQL DBs are not supported for time based deletes, this should throw an
// // InvalidFieldException.
// lineageTable.deleteCompletedLineageRecordsStartedBefore(currentTime);
// });
// }
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.junit.rules.TemporaryFolder;

public class SqlLineageTableTest extends LineageTableTest {

@ClassRule
public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();

Expand Down Expand Up @@ -67,6 +68,27 @@ protected void configure() {
StoreDefinition.createAllTables(injector.getInstance(StructuredTableAdmin.class));
}


// @Test
// public void testDeleteOutOfRangeCompletedRuns() {
// final Instant currentTime = Instant.now();
// final RunId runId = RunIds.generate(10000);
// final DatasetId datasetInstance = new DatasetId("default", "dataset1");
// final ProgramId program = new ProgramId("default", "app1", ProgramType.SERVICE, "service1");
// final ProgramRunId run = program.run(runId.getId());
//
// final long accessTimeMillis = System.currentTimeMillis();
// TransactionRunners.run(transactionRunner, context -> {
// LineageTable lineageTable = LineageTable.create(context);
// lineageTable.addAccess(run, datasetInstance, AccessType.READ, accessTimeMillis);
//
// lineageTable.deleteCompletedLineageRecordsStartedBefore(currentTime);
// Set<NamespacedEntityId> entitiesForRun = lineageTable.getEntitiesForRun(run);
// // Asserts that the records are deleted.
// Assert.assertTrue(entitiesForRun.isEmpty());
// Assert.assertEquals(ImmutableList.of(accessTimeMillis), lineageTable.getAccessTimesForRun(run));
// });
// }
@AfterClass
public static void teardown() throws IOException {
pg.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.cdap.cdap.spi.data.table.StructuredTableSchema;
import io.cdap.cdap.spi.data.table.field.Field;
import io.cdap.cdap.spi.data.table.field.Fields;
import io.cdap.cdap.spi.data.transaction.TransactionException;
import io.cdap.cdap.spi.data.transaction.TransactionRunner;
import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -202,6 +203,11 @@ public void testFilterByIndexIteratorMultiMatch() {
Assert.assertEquals(actual.get(2), filterIndex.getValue());
}

@Test(expected = TransactionException.class)
@Override
public void testScanDeleteAll() throws Exception{
super.testScanDeleteAll();
}
private static class MockScanner implements Scanner {
private final Iterator<Integer> iterator;
private boolean closed;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.cdap.cdap.spi.data.table.field.FieldValidator;
import io.cdap.cdap.spi.data.table.field.Fields;
import io.cdap.cdap.spi.data.table.field.Range;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -388,6 +389,16 @@ public void deleteAll(Range range) throws InvalidFieldException {
transactionContext.executeUpdate(builder.build());
}

@Override
public void scanDeleteAll(Range keyRange)
throws InvalidFieldException, UnsupportedOperationException, IOException {
// At this point we do not plan to support such operations in spanner, we may look to add this
// at a later time post performance testing.
throw new UnsupportedOperationException(
String.format("scanDeleteAll operation not supported for Spanner Table: %s",
schema.getTableId().getName()));
}

@Override
public void updateAll(Range keyRange, Collection<Field<?>> fields) throws InvalidFieldException {
// validate that the range is strictly a primary key prefix
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.cdap.cdap.spi.data.StructuredTable;
import io.cdap.cdap.spi.data.StructuredTableAdmin;
import io.cdap.cdap.spi.data.StructuredTableTest;
import io.cdap.cdap.spi.data.transaction.TransactionException;
import io.cdap.cdap.spi.data.transaction.TransactionRunner;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -30,6 +31,7 @@
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

/**
* Unit tests for GCP spanner implementation of the {@link StructuredTable}. This test needs the following
Expand Down Expand Up @@ -93,6 +95,11 @@ protected TransactionRunner getTransactionRunner() {
return storageProvider.getTransactionRunner();
}

@Test(expected = TransactionException.class)
@Override
public void testScanDeleteAll() throws Exception{
super.testScanDeleteAll();
}
private static final class MockStorageProviderContext implements StorageProviderContext {

private final Map<String, String> config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,11 +274,29 @@ void increment(Collection<Field<?>> keys, String column, long amount)
*
* @param keyRange key range of the rows to delete
* @throws InvalidFieldException if any of the keys are not part of table schema, or their
* types do not match the schema
* types do not match the schema or the first key in range does not match the first field of
* the primary key.
* @throws IOException if there is an error reading or deleting from the table
*/
void deleteAll(Range keyRange) throws InvalidFieldException, IOException;

/**
* Delete a range of rows from the table based on a potentially non indexed column.
*
* <p>
* This is a potentially slow operation because it may query a non primary key or a non indexed
* column.
* </p>
*
* @param keyRange key range of the rows to delete
* @throws InvalidFieldException if any of the keys are not part of table schema, or their
* types do not match the schema
* @throws UnsupportedOperationException if this method is not supported in the database
* implementation.
* @throws IOException if there is an error reading or deleting from the table
*/
void scanDeleteAll(Range keyRange) throws InvalidFieldException, UnsupportedOperationException, IOException;

/**
* Updates the specific fields in a range of rows from the table.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1037,6 +1037,72 @@ public void testDeleteAll() throws Exception {
Assert.assertEquals(expected, scanSimpleStructuredRows(Range.all(), max));
}

@Test
public void testScanDeleteAll() throws Exception {
int max = 10;
List<Collection<Field<?>>> expected = writeSimpleStructuredRows(max, "");
Assert.assertEquals(max, expected.size());

// Delete 7-9 (both inclusive) using a column which is part of PK but not the first PK.
expected.subList(7, 10).clear();
getTransactionRunner().run(context -> {
StructuredTable table = context.getTable(SIMPLE_TABLE);
Range range = Range.create(Arrays.asList(Fields.longField(KEY2, 7L)),
Range.Bound.INCLUSIVE,
Arrays.asList(Fields.longField(KEY2, 9L)),
Range.Bound.INCLUSIVE);
table.scanDeleteAll(range);
table.close();
});
// Verify the deletion
Assert.assertEquals(expected, scanSimpleStructuredRows(Range.all(), max));

// Delete 6 using a random column
expected.subList(6, 7).clear();
getTransactionRunner().run(context -> {
StructuredTable table = context.getTable(SIMPLE_TABLE);
Range range = Range.create(Arrays.asList(Fields.doubleField(DOUBLE_COL, 6.0)),
Range.Bound.INCLUSIVE,
Arrays.asList(Fields.doubleField(DOUBLE_COL, 6.0)),
Range.Bound.INCLUSIVE);
table.scanDeleteAll(range);
});

// Verify the deletion
Assert.assertEquals(expected, scanSimpleStructuredRows(Range.all(), max));

// Delete 2-5 (end exclusive) using the first key only
expected.subList(2, 5).clear();
getTransactionRunner().run(context -> {
StructuredTable table = context.getTable(SIMPLE_TABLE);
Range range = Range.create(Collections.singletonList(Fields.intField(KEY, 2)), Range.Bound.INCLUSIVE,
Collections.singletonList(Fields.intField(KEY, 5)), Range.Bound.EXCLUSIVE);
table.scanDeleteAll(range);
});
// Verify the deletion
Assert.assertEquals(expected, scanSimpleStructuredRows(Range.all(), max));

// Use a range outside the element list, nothing should get deleted
getTransactionRunner().run(context -> {
StructuredTable table = context.getTable(SIMPLE_TABLE);
Range range = Range.create(Collections.singletonList(Fields.intField(KEY, max + 1)), Range.Bound.INCLUSIVE,
Collections.singletonList(Fields.intField(KEY, max + 5)), Range.Bound.EXCLUSIVE);
table.scanDeleteAll(range);
});
// Verify the deletion
Assert.assertEquals(expected, scanSimpleStructuredRows(Range.all(), max));

// Delete all the remaining
expected.clear();
getTransactionRunner().run(context -> {
StructuredTable table = context.getTable(SIMPLE_TABLE);
table.scanDeleteAll(Range.all());
});
// Verify the deletion
Assert.assertEquals(expected, scanSimpleStructuredRows(Range.all(), max));
}


@Test
public void testIndexScanIndexStringFieldType() throws Exception {
int num = 5;
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1752,7 +1752,7 @@
<dependency>
<groupId>io.zonky.test.postgres</groupId>
<artifactId>embedded-postgres-binaries-bom</artifactId>
<version>11.14.0</version>
<version>14.2.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
Expand Down

0 comments on commit 70e64fc

Please sign in to comment.