From 70e64fc3ad243c4018179a7593f9b2e83003207f Mon Sep 17 00:00:00 2001 From: ritwiksahani Date: Tue, 6 Aug 2024 15:58:53 +0530 Subject: [PATCH] Adding logic to delete lineage data. --- .../data2/metadata/lineage/LineageTable.java | 36 ++++++++++ .../data/common/MetricStructuredTable.java | 19 ++++++ .../spi/data/nosql/NoSqlStructuredTable.java | 11 ++++ .../data/sql/PostgreSqlStructuredTable.java | 13 ++++ .../metadata/lineage/LineageTableTest.java | 2 +- .../lineage/NoSqlLineageTableTest.java | 18 +++++ .../metadata/lineage/SqlLineageTableTest.java | 22 +++++++ .../data/nosql/NoSqlStructuredTableTest.java | 6 ++ .../spanner/SpannerStructuredTable.java | 11 ++++ .../spanner/SpannerStructuredTableTest.java | 7 ++ .../cdap/cdap/spi/data/StructuredTable.java | 20 +++++- .../cdap/spi/data/StructuredTableTest.java | 66 +++++++++++++++++++ pom.xml | 2 +- 13 files changed, 230 insertions(+), 3 deletions(-) diff --git a/cdap-data-fabric/src/main/java/io/cdap/cdap/data2/metadata/lineage/LineageTable.java b/cdap-data-fabric/src/main/java/io/cdap/cdap/data2/metadata/lineage/LineageTable.java index 86867dd5963c..17ef8275f653 100644 --- a/cdap-data-fabric/src/main/java/io/cdap/cdap/data2/metadata/lineage/LineageTable.java +++ b/cdap-data-fabric/src/main/java/io/cdap/cdap/data2/metadata/lineage/LineageTable.java @@ -35,6 +35,7 @@ import io.cdap.cdap.spi.data.table.field.Range; import io.cdap.cdap.store.StoreDefinition; import java.io.IOException; +import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.Set; @@ -66,6 +67,41 @@ public static LineageTable create(StructuredTableContext context) { return new LineageTable(context); } + /** + * Deletes all completed run records with a start time before {@code timeUpperBound}, throwing + * {@code IOException} if the delete operation fails. + *

+ * Assumption is that lineage records are added only after run is completed, so we don't need to + * check completed run statuses. + *

+ * + *

+ * This function will not work with the in memory no-sql DB and will return + * {@code InvalidFieldException}. This is because of the way we need to query and will only be + * used in the managed instances controlled by a flag. + *

+ */ + public void deleteCompletedLineageRecordsStartedBefore(Instant timeUpperBound) + throws IOException { + long maxTimeEpoch = timeUpperBound.getEpochSecond(); + if (maxTimeEpoch == Long.MAX_VALUE) { + // We don't want to blanket delete all records in case of incorrect values. + return; + } + // Data should be deleted from both the lineage tables. + getDatasetTable() + .deleteAll(createStartTimeEndRange(maxTimeEpoch)); + getProgramTable() + .deleteAll(createStartTimeEndRange(maxTimeEpoch)); + } + + private Range createStartTimeEndRange(long endTime) { + ImmutableList> end = ImmutableList.of( + Fields.longField(StoreDefinition.LineageStore.START_TIME_FIELD, invertTime(endTime))); + return Range.to(end, Range.Bound.EXCLUSIVE); + } + + private LineageTable(StructuredTableContext structuredTableContext) { this.structuredTableContext = structuredTableContext; } diff --git a/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/common/MetricStructuredTable.java b/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/common/MetricStructuredTable.java index 0d595aa5743c..29b6b45fcecf 100644 --- a/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/common/MetricStructuredTable.java +++ b/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/common/MetricStructuredTable.java @@ -315,6 +315,25 @@ public void deleteAll(Range keyRange) throws InvalidFieldException, IOException } } + @Override + public void scanDeleteAll(Range keyRange) + throws InvalidFieldException, UnsupportedOperationException, IOException { + try { + if (!emitTimeMetrics) { + structuredTable.scanDeleteAll(keyRange); + } else { + long curTime = System.nanoTime(); + structuredTable.scanDeleteAll(keyRange); + long duration = System.nanoTime() - curTime; + metricsCollector.increment(metricPrefix + "scanDeleteAll.time", duration); + } + metricsCollector.increment(metricPrefix + "scanDeleteAll.count", 1L); + } catch (Exception e) { + metricsCollector.increment(metricPrefix + "scanDeleteAll.error", 1L); + throw e; + } + } + @Override public void updateAll(Range keyRange, Collection> fields) throws InvalidFieldException, IOException { diff --git a/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/nosql/NoSqlStructuredTable.java b/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/nosql/NoSqlStructuredTable.java index 5b86f3cb9eae..1aa40adbf95a 100644 --- a/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/nosql/NoSqlStructuredTable.java +++ b/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/nosql/NoSqlStructuredTable.java @@ -456,6 +456,17 @@ public void deleteAll(Range keyRange) throws InvalidFieldException, IOException } } + @Override + public void scanDeleteAll(Range keyRange) + throws InvalidFieldException, UnsupportedOperationException, IOException { + // For No SQL tables we are not supporting deletes on random columns because of performance + // concerns. + throw new UnsupportedOperationException( + String.format("scanDeleteAll operation not supported for NoSQL Table: %s", + schema.getTableId() + .getName())); + } + @Override public void updateAll(Range keyRange, Collection> fields) throws InvalidFieldException { LOG.trace("Table {}: Update fields {} in range {}", schema.getTableId(), fields, keyRange); diff --git a/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/sql/PostgreSqlStructuredTable.java b/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/sql/PostgreSqlStructuredTable.java index 392d4dca14aa..0a2a55943af9 100644 --- a/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/sql/PostgreSqlStructuredTable.java +++ b/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/sql/PostgreSqlStructuredTable.java @@ -556,6 +556,19 @@ public void delete(Collection> keys) throws InvalidFieldException, IOEx public void deleteAll(Range keyRange) throws InvalidFieldException, IOException { LOG.trace("Table {}: DeleteAll with range {}", tableSchema.getTableId(), keyRange); fieldValidator.validateScanRange(keyRange); + executeDeleteAll(keyRange); + } + + @Override + public void scanDeleteAll(Range keyRange) + throws InvalidFieldException, UnsupportedOperationException, IOException { + LOG.trace("Table {}: ScanDeleteAll with range {}", tableSchema.getTableId(), keyRange); + keyRange.getBegin().forEach(fieldValidator::validateField); + keyRange.getEnd().forEach(fieldValidator::validateField); + executeDeleteAll(keyRange); + } + + private void executeDeleteAll(Range keyRange) throws IOException { String sql = getDeleteAllStatement(keyRange); try (PreparedStatement statement = connection.prepareStatement(sql)) { setStatementFieldByRange(keyRange, statement); diff --git a/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/LineageTableTest.java b/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/LineageTableTest.java index 3e660a606d6a..0006f2d60272 100644 --- a/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/LineageTableTest.java +++ b/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/LineageTableTest.java @@ -136,7 +136,7 @@ public void testMultipleRelations() { } @SafeVarargs - private static Set toSet(T... elements) { + protected static Set toSet(T... elements) { return ImmutableSet.copyOf(elements); } } diff --git a/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/NoSqlLineageTableTest.java b/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/NoSqlLineageTableTest.java index 0c3c80da1386..7d657775f991 100644 --- a/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/NoSqlLineageTableTest.java +++ b/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/NoSqlLineageTableTest.java @@ -41,4 +41,22 @@ public static void beforeClass() throws IOException, TableAlreadyExistsException StoreDefinition.createAllTables(structuredTableAdmin); } +// @Test(expected = InvalidFieldException.class) +// public void testDeleteCompletedLineageRecordsThrowsException() { +// final Instant currentTime = Instant.now(); +// final RunId runId = RunIds.generate(10000); +// final DatasetId datasetInstance = new DatasetId("default", "dataset1"); +// final ProgramId program = new ProgramId("default", "app1", ProgramType.SERVICE, "service1"); +// final ProgramRunId run = program.run(runId.getId()); +// +// final long accessTimeMillis = System.currentTimeMillis(); +// TransactionRunners.run(transactionRunner, context -> { +// LineageTable lineageTable = LineageTable.create(context); +// lineageTable.addAccess(run, datasetInstance, AccessType.READ, accessTimeMillis); +// Set entitiesForRun = lineageTable.getEntitiesForRun(run); +// // Since no-SQL DBs are not supported for time based deletes, this should throw an +// // InvalidFieldException. +// lineageTable.deleteCompletedLineageRecordsStartedBefore(currentTime); +// }); +// } } diff --git a/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/SqlLineageTableTest.java b/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/SqlLineageTableTest.java index aa011651bd6f..bc7661729abc 100644 --- a/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/SqlLineageTableTest.java +++ b/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/SqlLineageTableTest.java @@ -40,6 +40,7 @@ import org.junit.rules.TemporaryFolder; public class SqlLineageTableTest extends LineageTableTest { + @ClassRule public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder(); @@ -67,6 +68,27 @@ protected void configure() { StoreDefinition.createAllTables(injector.getInstance(StructuredTableAdmin.class)); } + +// @Test +// public void testDeleteOutOfRangeCompletedRuns() { +// final Instant currentTime = Instant.now(); +// final RunId runId = RunIds.generate(10000); +// final DatasetId datasetInstance = new DatasetId("default", "dataset1"); +// final ProgramId program = new ProgramId("default", "app1", ProgramType.SERVICE, "service1"); +// final ProgramRunId run = program.run(runId.getId()); +// +// final long accessTimeMillis = System.currentTimeMillis(); +// TransactionRunners.run(transactionRunner, context -> { +// LineageTable lineageTable = LineageTable.create(context); +// lineageTable.addAccess(run, datasetInstance, AccessType.READ, accessTimeMillis); +// +// lineageTable.deleteCompletedLineageRecordsStartedBefore(currentTime); +// Set entitiesForRun = lineageTable.getEntitiesForRun(run); +// // Asserts that the records are deleted. +// Assert.assertTrue(entitiesForRun.isEmpty()); +// Assert.assertEquals(ImmutableList.of(accessTimeMillis), lineageTable.getAccessTimesForRun(run)); +// }); +// } @AfterClass public static void teardown() throws IOException { pg.close(); diff --git a/cdap-data-fabric/src/test/java/io/cdap/cdap/spi/data/nosql/NoSqlStructuredTableTest.java b/cdap-data-fabric/src/test/java/io/cdap/cdap/spi/data/nosql/NoSqlStructuredTableTest.java index 861f7948cdfd..a11bd0d7f9ff 100644 --- a/cdap-data-fabric/src/test/java/io/cdap/cdap/spi/data/nosql/NoSqlStructuredTableTest.java +++ b/cdap-data-fabric/src/test/java/io/cdap/cdap/spi/data/nosql/NoSqlStructuredTableTest.java @@ -29,6 +29,7 @@ import io.cdap.cdap.spi.data.table.StructuredTableSchema; import io.cdap.cdap.spi.data.table.field.Field; import io.cdap.cdap.spi.data.table.field.Fields; +import io.cdap.cdap.spi.data.transaction.TransactionException; import io.cdap.cdap.spi.data.transaction.TransactionRunner; import java.io.IOException; import java.util.ArrayList; @@ -202,6 +203,11 @@ public void testFilterByIndexIteratorMultiMatch() { Assert.assertEquals(actual.get(2), filterIndex.getValue()); } + @Test(expected = TransactionException.class) + @Override + public void testScanDeleteAll() throws Exception{ + super.testScanDeleteAll(); + } private static class MockScanner implements Scanner { private final Iterator iterator; private boolean closed; diff --git a/cdap-storage-ext-spanner/src/main/java/io/cdap/cdap/storage/spanner/SpannerStructuredTable.java b/cdap-storage-ext-spanner/src/main/java/io/cdap/cdap/storage/spanner/SpannerStructuredTable.java index 96c42968c5eb..47c150816ce3 100644 --- a/cdap-storage-ext-spanner/src/main/java/io/cdap/cdap/storage/spanner/SpannerStructuredTable.java +++ b/cdap-storage-ext-spanner/src/main/java/io/cdap/cdap/storage/spanner/SpannerStructuredTable.java @@ -37,6 +37,7 @@ import io.cdap.cdap.spi.data.table.field.FieldValidator; import io.cdap.cdap.spi.data.table.field.Fields; import io.cdap.cdap.spi.data.table.field.Range; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -388,6 +389,16 @@ public void deleteAll(Range range) throws InvalidFieldException { transactionContext.executeUpdate(builder.build()); } + @Override + public void scanDeleteAll(Range keyRange) + throws InvalidFieldException, UnsupportedOperationException, IOException { + // At this point we do not plan to support such operations in spanner, we may look to add this + // at a later time post performance testing. + throw new UnsupportedOperationException( + String.format("scanDeleteAll operation not supported for Spanner Table: %s", + schema.getTableId().getName())); + } + @Override public void updateAll(Range keyRange, Collection> fields) throws InvalidFieldException { // validate that the range is strictly a primary key prefix diff --git a/cdap-storage-ext-spanner/src/test/java/io/cdap/cdap/storage/spanner/SpannerStructuredTableTest.java b/cdap-storage-ext-spanner/src/test/java/io/cdap/cdap/storage/spanner/SpannerStructuredTableTest.java index e0ed8b37fa2f..79cc1fb38af8 100644 --- a/cdap-storage-ext-spanner/src/test/java/io/cdap/cdap/storage/spanner/SpannerStructuredTableTest.java +++ b/cdap-storage-ext-spanner/src/test/java/io/cdap/cdap/storage/spanner/SpannerStructuredTableTest.java @@ -21,6 +21,7 @@ import io.cdap.cdap.spi.data.StructuredTable; import io.cdap.cdap.spi.data.StructuredTableAdmin; import io.cdap.cdap.spi.data.StructuredTableTest; +import io.cdap.cdap.spi.data.transaction.TransactionException; import io.cdap.cdap.spi.data.transaction.TransactionRunner; import java.util.Collections; import java.util.HashMap; @@ -30,6 +31,7 @@ import org.junit.Assume; import org.junit.BeforeClass; import org.junit.Ignore; +import org.junit.Test; /** * Unit tests for GCP spanner implementation of the {@link StructuredTable}. This test needs the following @@ -93,6 +95,11 @@ protected TransactionRunner getTransactionRunner() { return storageProvider.getTransactionRunner(); } + @Test(expected = TransactionException.class) + @Override + public void testScanDeleteAll() throws Exception{ + super.testScanDeleteAll(); + } private static final class MockStorageProviderContext implements StorageProviderContext { private final Map config; diff --git a/cdap-storage-spi/src/main/java/io/cdap/cdap/spi/data/StructuredTable.java b/cdap-storage-spi/src/main/java/io/cdap/cdap/spi/data/StructuredTable.java index df05e525b7d6..e4272b3c9292 100644 --- a/cdap-storage-spi/src/main/java/io/cdap/cdap/spi/data/StructuredTable.java +++ b/cdap-storage-spi/src/main/java/io/cdap/cdap/spi/data/StructuredTable.java @@ -274,11 +274,29 @@ void increment(Collection> keys, String column, long amount) * * @param keyRange key range of the rows to delete * @throws InvalidFieldException if any of the keys are not part of table schema, or their - * types do not match the schema + * types do not match the schema or the first key in range does not match the first field of + * the primary key. * @throws IOException if there is an error reading or deleting from the table */ void deleteAll(Range keyRange) throws InvalidFieldException, IOException; + /** + * Delete a range of rows from the table based on a potentially non indexed column. + * + *

+ * This is a potentially slow operation because it may query a non primary key or a non indexed + * column. + *

+ * + * @param keyRange key range of the rows to delete + * @throws InvalidFieldException if any of the keys are not part of table schema, or their + * types do not match the schema + * @throws UnsupportedOperationException if this method is not supported in the database + * implementation. + * @throws IOException if there is an error reading or deleting from the table + */ + void scanDeleteAll(Range keyRange) throws InvalidFieldException, UnsupportedOperationException, IOException; + /** * Updates the specific fields in a range of rows from the table. * diff --git a/cdap-storage-spi/src/test/java/io/cdap/cdap/spi/data/StructuredTableTest.java b/cdap-storage-spi/src/test/java/io/cdap/cdap/spi/data/StructuredTableTest.java index 81c876486809..34bae3a3ec70 100644 --- a/cdap-storage-spi/src/test/java/io/cdap/cdap/spi/data/StructuredTableTest.java +++ b/cdap-storage-spi/src/test/java/io/cdap/cdap/spi/data/StructuredTableTest.java @@ -1037,6 +1037,72 @@ public void testDeleteAll() throws Exception { Assert.assertEquals(expected, scanSimpleStructuredRows(Range.all(), max)); } + @Test + public void testScanDeleteAll() throws Exception { + int max = 10; + List>> expected = writeSimpleStructuredRows(max, ""); + Assert.assertEquals(max, expected.size()); + + // Delete 7-9 (both inclusive) using a column which is part of PK but not the first PK. + expected.subList(7, 10).clear(); + getTransactionRunner().run(context -> { + StructuredTable table = context.getTable(SIMPLE_TABLE); + Range range = Range.create(Arrays.asList(Fields.longField(KEY2, 7L)), + Range.Bound.INCLUSIVE, + Arrays.asList(Fields.longField(KEY2, 9L)), + Range.Bound.INCLUSIVE); + table.scanDeleteAll(range); + table.close(); + }); + // Verify the deletion + Assert.assertEquals(expected, scanSimpleStructuredRows(Range.all(), max)); + + // Delete 6 using a random column + expected.subList(6, 7).clear(); + getTransactionRunner().run(context -> { + StructuredTable table = context.getTable(SIMPLE_TABLE); + Range range = Range.create(Arrays.asList(Fields.doubleField(DOUBLE_COL, 6.0)), + Range.Bound.INCLUSIVE, + Arrays.asList(Fields.doubleField(DOUBLE_COL, 6.0)), + Range.Bound.INCLUSIVE); + table.scanDeleteAll(range); + }); + + // Verify the deletion + Assert.assertEquals(expected, scanSimpleStructuredRows(Range.all(), max)); + + // Delete 2-5 (end exclusive) using the first key only + expected.subList(2, 5).clear(); + getTransactionRunner().run(context -> { + StructuredTable table = context.getTable(SIMPLE_TABLE); + Range range = Range.create(Collections.singletonList(Fields.intField(KEY, 2)), Range.Bound.INCLUSIVE, + Collections.singletonList(Fields.intField(KEY, 5)), Range.Bound.EXCLUSIVE); + table.scanDeleteAll(range); + }); + // Verify the deletion + Assert.assertEquals(expected, scanSimpleStructuredRows(Range.all(), max)); + + // Use a range outside the element list, nothing should get deleted + getTransactionRunner().run(context -> { + StructuredTable table = context.getTable(SIMPLE_TABLE); + Range range = Range.create(Collections.singletonList(Fields.intField(KEY, max + 1)), Range.Bound.INCLUSIVE, + Collections.singletonList(Fields.intField(KEY, max + 5)), Range.Bound.EXCLUSIVE); + table.scanDeleteAll(range); + }); + // Verify the deletion + Assert.assertEquals(expected, scanSimpleStructuredRows(Range.all(), max)); + + // Delete all the remaining + expected.clear(); + getTransactionRunner().run(context -> { + StructuredTable table = context.getTable(SIMPLE_TABLE); + table.scanDeleteAll(Range.all()); + }); + // Verify the deletion + Assert.assertEquals(expected, scanSimpleStructuredRows(Range.all(), max)); + } + + @Test public void testIndexScanIndexStringFieldType() throws Exception { int num = 5; diff --git a/pom.xml b/pom.xml index 7a6be191a069..2a78c9eddba9 100644 --- a/pom.xml +++ b/pom.xml @@ -1752,7 +1752,7 @@ io.zonky.test.postgres embedded-postgres-binaries-bom - 11.14.0 + 14.2.0 pom import