Skip to content

Commit

Permalink
Adding logic to delete lineage data.
Browse files Browse the repository at this point in the history
  • Loading branch information
ritwiksahani committed Aug 12, 2024
1 parent cbcb4a9 commit 91eb602
Show file tree
Hide file tree
Showing 13 changed files with 256 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.cdap.cdap.spi.data.table.field.Range;
import io.cdap.cdap.store.StoreDefinition;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -66,6 +67,42 @@ public static LineageTable create(StructuredTableContext context) {
return new LineageTable(context);
}

/**
* Deletes all completed run records with a start time before {@code timeUpperBound}, throwing
* {@code IOException} if the delete operation fails.
* <p>
* Assumption is that lineage records are added only after run is completed, so we don't need to
* check completed run statuses.
* </p>
*
* <p>
* This function will not work with the in memory no-sql DB and will return
* {@code InvalidFieldException}. This is because of the way we need to query and will only be
* used in the managed instances controlled by a flag.
* </p>
*/
public void deleteCompletedLineageRecordsStartedBefore(Instant timeUpperBound)
throws IOException {
long maxTimeEpoch = timeUpperBound.getEpochSecond();
if (maxTimeEpoch == Long.MAX_VALUE) {
// We don't want to blanket delete all records in case of incorrect values.
return;
}
// Data should be deleted from both the lineage tables.
getDatasetTable()
.scanDeleteAll(createStartTimeEndRange(maxTimeEpoch));
getProgramTable()
.scanDeleteAll(createStartTimeEndRange(maxTimeEpoch));
}

private Range createStartTimeEndRange(long endTime) {
ImmutableList<Field<?>> end = ImmutableList.of(
Fields.longField(StoreDefinition.LineageStore.START_TIME_FIELD, invertTime(endTime)));
// Since the times are inverted the end time will be the start of the range.
return Range.from(end, Range.Bound.EXCLUSIVE);
}


private LineageTable(StructuredTableContext structuredTableContext) {
this.structuredTableContext = structuredTableContext;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,25 @@ public void deleteAll(Range keyRange) throws InvalidFieldException, IOException
}
}

@Override
public void scanDeleteAll(Range keyRange)
throws InvalidFieldException, UnsupportedOperationException, IOException {
try {
if (!emitTimeMetrics) {
structuredTable.scanDeleteAll(keyRange);
} else {
long curTime = System.nanoTime();
structuredTable.scanDeleteAll(keyRange);
long duration = System.nanoTime() - curTime;
metricsCollector.increment(metricPrefix + "scanDeleteAll.time", duration);
}
metricsCollector.increment(metricPrefix + "scanDeleteAll.count", 1L);
} catch (Exception e) {
metricsCollector.increment(metricPrefix + "scanDeleteAll.error", 1L);
throw e;
}
}

@Override
public void updateAll(Range keyRange, Collection<Field<?>> fields)
throws InvalidFieldException, IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,17 @@ public void deleteAll(Range keyRange) throws InvalidFieldException, IOException
}
}

@Override
public void scanDeleteAll(Range keyRange)
throws InvalidFieldException, UnsupportedOperationException, IOException {
// For No SQL tables we are not supporting deletes on random columns because of performance
// concerns.
throw new UnsupportedOperationException(
String.format("scanDeleteAll operation not supported for NoSQL Table: %s",
schema.getTableId()
.getName()));
}

@Override
public void updateAll(Range keyRange, Collection<Field<?>> fields) throws InvalidFieldException {
LOG.trace("Table {}: Update fields {} in range {}", schema.getTableId(), fields, keyRange);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,19 @@ public void delete(Collection<Field<?>> keys) throws InvalidFieldException, IOEx
public void deleteAll(Range keyRange) throws InvalidFieldException, IOException {
LOG.trace("Table {}: DeleteAll with range {}", tableSchema.getTableId(), keyRange);
fieldValidator.validateScanRange(keyRange);
executeDeleteAll(keyRange);
}

@Override
public void scanDeleteAll(Range keyRange)
throws InvalidFieldException, UnsupportedOperationException, IOException {
LOG.trace("Table {}: ScanDeleteAll with range {}", tableSchema.getTableId(), keyRange);
keyRange.getBegin().forEach(fieldValidator::validateField);
keyRange.getEnd().forEach(fieldValidator::validateField);
executeDeleteAll(keyRange);
}

private void executeDeleteAll(Range keyRange) throws IOException {
String sql = getDeleteAllStatement(keyRange);
try (PreparedStatement statement = connection.prepareStatement(sql)) {
setStatementFieldByRange(keyRange, statement);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public void testMultipleRelations() {
}

@SafeVarargs
private static <T> Set<T> toSet(T... elements) {
protected static <T> Set<T> toSet(T... elements) {
return ImmutableSet.copyOf(elements);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,27 @@

package io.cdap.cdap.data2.metadata.lineage;

import io.cdap.cdap.common.app.RunIds;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.data2.dataset2.DatasetFrameworkTestUtil;
import io.cdap.cdap.proto.ProgramType;
import io.cdap.cdap.proto.id.DatasetId;
import io.cdap.cdap.proto.id.NamespacedEntityId;
import io.cdap.cdap.proto.id.ProgramId;
import io.cdap.cdap.proto.id.ProgramRunId;
import io.cdap.cdap.spi.data.StructuredTableAdmin;
import io.cdap.cdap.spi.data.TableAlreadyExistsException;
import io.cdap.cdap.spi.data.transaction.TransactionRunner;
import io.cdap.cdap.spi.data.transaction.TransactionRunners;
import io.cdap.cdap.store.StoreDefinition;
import java.io.IOException;
import java.time.Instant;
import java.util.Set;
import org.apache.twill.api.RunId;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;

public class NoSqlLineageTableTest extends LineageTableTest {
@ClassRule
Expand All @@ -41,4 +52,22 @@ public static void beforeClass() throws IOException, TableAlreadyExistsException
StoreDefinition.createAllTables(structuredTableAdmin);
}

@Test(expected = UnsupportedOperationException.class)
public void testDeleteCompletedLineageRecordsThrowsException() {
final Instant currentTime = Instant.now();
final RunId runId = RunIds.generate(10000);
final DatasetId datasetInstance = new DatasetId("default", "dataset1");
final ProgramId program = new ProgramId("default", "app1", ProgramType.SERVICE, "service1");
final ProgramRunId run = program.run(runId.getId());

final long accessTimeMillis = System.currentTimeMillis();
TransactionRunners.run(transactionRunner, context -> {
LineageTable lineageTable = LineageTable.create(context);
lineageTable.addAccess(run, datasetInstance, AccessType.READ, accessTimeMillis);
Set<NamespacedEntityId> entitiesForRun = lineageTable.getEntitiesForRun(run);
// Since no-SQL DBs are not supported for time based deletes, this should throw an
// InvalidFieldException.
lineageTable.deleteCompletedLineageRecordsStartedBefore(currentTime);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,44 @@

package io.cdap.cdap.data2.metadata.lineage;

import com.google.common.collect.ImmutableList;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Scopes;
import io.cdap.cdap.api.metrics.MetricsCollectionService;
import io.cdap.cdap.common.app.RunIds;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.guice.ConfigModule;
import io.cdap.cdap.common.guice.LocalLocationModule;
import io.cdap.cdap.common.metrics.NoOpMetricsCollectionService;
import io.cdap.cdap.data.runtime.StorageModule;
import io.cdap.cdap.data.runtime.SystemDatasetRuntimeModule;
import io.cdap.cdap.proto.ProgramType;
import io.cdap.cdap.proto.id.DatasetId;
import io.cdap.cdap.proto.id.NamespacedEntityId;
import io.cdap.cdap.proto.id.ProgramId;
import io.cdap.cdap.proto.id.ProgramRunId;
import io.cdap.cdap.spi.data.StructuredTableAdmin;
import io.cdap.cdap.spi.data.TableAlreadyExistsException;
import io.cdap.cdap.spi.data.sql.PostgresInstantiator;
import io.cdap.cdap.spi.data.transaction.TransactionRunner;
import io.cdap.cdap.spi.data.transaction.TransactionRunners;
import io.cdap.cdap.store.StoreDefinition;
import io.zonky.test.db.postgres.embedded.EmbeddedPostgres;
import java.io.IOException;
import java.time.Instant;
import java.util.Set;
import org.apache.twill.api.RunId;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class SqlLineageTableTest extends LineageTableTest {

@ClassRule
public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();

Expand Down Expand Up @@ -67,6 +81,28 @@ protected void configure() {
StoreDefinition.createAllTables(injector.getInstance(StructuredTableAdmin.class));
}


@Test
public void testDeleteOutOfRangeCompletedRuns() {
final Instant currentTime = Instant.now();
// The time we are saving is inverted.
final RunId runId = RunIds.generate(10000);
final DatasetId datasetInstance = new DatasetId("default", "dataset1");
final ProgramId program = new ProgramId("default", "app1", ProgramType.SERVICE, "service1");
final ProgramRunId run = program.run(runId.getId());

final long accessTimeMillis = System.currentTimeMillis();
TransactionRunners.run(transactionRunner, context -> {
LineageTable lineageTable = LineageTable.create(context);
lineageTable.addAccess(run, datasetInstance, AccessType.READ, accessTimeMillis);

lineageTable.deleteCompletedLineageRecordsStartedBefore(currentTime);
Set<NamespacedEntityId> entitiesForRun = lineageTable.getEntitiesForRun(run);
// Asserts that the records are deleted.
Assert.assertTrue(entitiesForRun.isEmpty());
});
}

@AfterClass
public static void teardown() throws IOException {
pg.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.cdap.cdap.spi.data.table.StructuredTableSchema;
import io.cdap.cdap.spi.data.table.field.Field;
import io.cdap.cdap.spi.data.table.field.Fields;
import io.cdap.cdap.spi.data.transaction.TransactionException;
import io.cdap.cdap.spi.data.transaction.TransactionRunner;
import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -202,6 +203,11 @@ public void testFilterByIndexIteratorMultiMatch() {
Assert.assertEquals(actual.get(2), filterIndex.getValue());
}

@Test(expected = TransactionException.class)
@Override
public void testScanDeleteAll() throws Exception{
super.testScanDeleteAll();
}
private static class MockScanner implements Scanner {
private final Iterator<Integer> iterator;
private boolean closed;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.cdap.cdap.spi.data.table.field.FieldValidator;
import io.cdap.cdap.spi.data.table.field.Fields;
import io.cdap.cdap.spi.data.table.field.Range;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -388,6 +389,16 @@ public void deleteAll(Range range) throws InvalidFieldException {
transactionContext.executeUpdate(builder.build());
}

@Override
public void scanDeleteAll(Range keyRange)
throws InvalidFieldException, UnsupportedOperationException, IOException {
// At this point we do not plan to support such operations in spanner, we may look to add this
// at a later time post performance testing.
throw new UnsupportedOperationException(
String.format("scanDeleteAll operation not supported for Spanner Table: %s",
schema.getTableId().getName()));
}

@Override
public void updateAll(Range keyRange, Collection<Field<?>> fields) throws InvalidFieldException {
// validate that the range is strictly a primary key prefix
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.cdap.cdap.spi.data.StructuredTable;
import io.cdap.cdap.spi.data.StructuredTableAdmin;
import io.cdap.cdap.spi.data.StructuredTableTest;
import io.cdap.cdap.spi.data.transaction.TransactionException;
import io.cdap.cdap.spi.data.transaction.TransactionRunner;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -30,6 +31,7 @@
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

/**
* Unit tests for GCP spanner implementation of the {@link StructuredTable}. This test needs the following
Expand Down Expand Up @@ -93,6 +95,11 @@ protected TransactionRunner getTransactionRunner() {
return storageProvider.getTransactionRunner();
}

@Test(expected = TransactionException.class)
@Override
public void testScanDeleteAll() throws Exception{
super.testScanDeleteAll();
}
private static final class MockStorageProviderContext implements StorageProviderContext {

private final Map<String, String> config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,11 +274,29 @@ void increment(Collection<Field<?>> keys, String column, long amount)
*
* @param keyRange key range of the rows to delete
* @throws InvalidFieldException if any of the keys are not part of table schema, or their
* types do not match the schema
* types do not match the schema or the first key in range does not match the first field of
* the primary key.
* @throws IOException if there is an error reading or deleting from the table
*/
void deleteAll(Range keyRange) throws InvalidFieldException, IOException;

/**
* Delete a range of rows from the table based on a potentially non indexed column.
*
* <p>
* This is a potentially slow operation because it may query a non primary key or a non indexed
* column.
* </p>
*
* @param keyRange key range of the rows to delete
* @throws InvalidFieldException if any of the keys are not part of table schema, or their
* types do not match the schema
* @throws UnsupportedOperationException if this method is not supported in the database
* implementation.
* @throws IOException if there is an error reading or deleting from the table
*/
void scanDeleteAll(Range keyRange) throws InvalidFieldException, UnsupportedOperationException, IOException;

/**
* Updates the specific fields in a range of rows from the table.
*
Expand Down
Loading

0 comments on commit 91eb602

Please sign in to comment.