diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/AppFabricServer.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/AppFabricServer.java
index 40cda348e4ce..59bce1162b35 100644
--- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/AppFabricServer.java
+++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/AppFabricServer.java
@@ -83,7 +83,7 @@ public class AppFabricServer extends AbstractIdleService {
private final ProgramNotificationSubscriberService programNotificationSubscriberService;
private final ProgramStopSubscriberService programStopSubscriberService;
private final RunRecordCorrectorService runRecordCorrectorService;
- private final RunRecordTimeToLiveService runRecordTimeToLiveService;
+ private final RecordTimeToLiveService recordTimeToLiveService;
private final ProgramRunStatusMonitorService programRunStatusMonitorService;
private final RunRecordMonitorService runRecordCounterService;
private final CoreSchedulerService coreSchedulerService;
@@ -131,7 +131,7 @@ public AppFabricServer(CConfiguration cConf, SConfiguration sConf,
TransactionRunner transactionRunner,
RunRecordMonitorService runRecordCounterService,
CommonNettyHttpServiceFactory commonNettyHttpServiceFactory,
- RunRecordTimeToLiveService runRecordTimeToLiveService,
+ RecordTimeToLiveService recordTimeToLiveService,
SourceControlOperationRunner sourceControlOperationRunner,
RepositoryCleanupService repositoryCleanupService,
OperationNotificationSubscriberService operationNotificationSubscriberService) {
@@ -158,7 +158,7 @@ public AppFabricServer(CConfiguration cConf, SConfiguration sConf,
this.systemAppManagementService = systemAppManagementService;
this.transactionRunner = transactionRunner;
this.runRecordCounterService = runRecordCounterService;
- this.runRecordTimeToLiveService = runRecordTimeToLiveService;
+ this.recordTimeToLiveService = recordTimeToLiveService;
this.commonNettyHttpServiceFactory = commonNettyHttpServiceFactory;
this.sourceControlOperationRunner = sourceControlOperationRunner;
this.repositoryCleanupService = repositoryCleanupService;
@@ -191,7 +191,7 @@ protected void startUp() throws Exception {
coreSchedulerService.start(),
credentialProviderService.start(),
runRecordCounterService.start(),
- runRecordTimeToLiveService.start(),
+ recordTimeToLiveService.start(),
sourceControlOperationRunner.start(),
repositoryCleanupService.start(),
operationNotificationSubscriberService.start()
@@ -250,7 +250,7 @@ protected void shutDown() throws Exception {
programRunStatusMonitorService.stopAndWait();
provisioningService.stopAndWait();
runRecordCounterService.stopAndWait();
- runRecordTimeToLiveService.stopAndWait();
+ recordTimeToLiveService.stopAndWait();
sourceControlOperationRunner.stopAndWait();
repositoryCleanupService.stopAndWait();
credentialProviderService.stopAndWait();
diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/RecordTimeToLiveService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/RecordTimeToLiveService.java
new file mode 100644
index 000000000000..ec36e77f1388
--- /dev/null
+++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/RecordTimeToLiveService.java
@@ -0,0 +1,190 @@
+/*
+ * Copyright © 2023 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.cdap.internal.app.services;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.inject.Inject;
+import io.cdap.cdap.common.conf.CConfiguration;
+import io.cdap.cdap.common.conf.Constants;
+import io.cdap.cdap.data2.metadata.lineage.LineageTable;
+import io.cdap.cdap.data2.metadata.lineage.field.FieldLineageTable;
+import io.cdap.cdap.internal.app.store.AppMetadataStore;
+import io.cdap.cdap.spi.data.transaction.TransactionException;
+import io.cdap.cdap.spi.data.transaction.TransactionRunner;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Service which periodically scans the database tables for records which should be deleted per the
+ * global time to live value.
+ *
+ *
Does not run if no TTL is configured or a TTL of 0 is specified.
+ *
+ * This service would invoke other TTL based clean up services which implement the interface
+ * {@link CleanupService}. Those services would be called in sequence.
+ *
+ */
+public final class RecordTimeToLiveService extends AbstractIdleService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RecordTimeToLiveService.class);
+
+ private final TransactionRunner transactionRunner;
+ private final boolean isEnabled;
+ private final Duration ttlMaxAge;
+ private final Duration checkFrequency;
+ private final Duration initialDelay;
+ private final Clock clock;
+
+ private ScheduledExecutorService service;
+ private List cleanupServiceList;
+
+ @Inject
+ RecordTimeToLiveService(CConfiguration cConf, TransactionRunner transactionRunner) {
+ // Negative TTLs do not make sense, treat as 0.
+ this.ttlMaxAge =
+ Duration.ofDays(Math.max(cConf.getInt(Constants.AppFabric.RUN_DATA_CLEANUP_TTL_DAYS), 0));
+ this.isEnabled = !this.ttlMaxAge.isZero();
+ // Delay should be at least 1 hour to ensure it isn't infinitely running.
+ this.checkFrequency =
+ Duration.ofHours(
+ Math.max(cConf.getInt(Constants.AppFabric.RUN_DATA_CLEANUP_TTL_FREQUENCY_HOURS), 1));
+ // Negative delays do not make sense, treat as 0.
+ this.initialDelay =
+ Duration.ofMinutes(
+ Math.max(
+ cConf.getInt(Constants.AppFabric.RUN_DATA_CLEANUP_TTL_INITIAL_DELAY_MINUTES), 0));
+
+ this.transactionRunner = transactionRunner;
+ this.clock = Clock.systemUTC();
+ this.cleanupServiceList = ImmutableList.of(
+ new RunRecordsCleanupService(),
+ new LineageCleanupService(),
+ new FieldLineageCleanupService());
+ }
+
+ @Override
+ protected void startUp() {
+ if (!isEnabled) {
+ LOG.info("No TTL configured, skipping starting RecordTimeToLiveService");
+ return;
+ }
+
+ service =
+ Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder().setNameFormat("Records TTL janitor").build());
+
+ service.scheduleAtFixedRate(
+ () -> doCleanup(),
+ initialDelay.getSeconds(),
+ checkFrequency.getSeconds(),
+ TimeUnit.SECONDS);
+ }
+
+ @Override
+ protected void shutDown() {
+ if (!isEnabled) {
+ // no-op because no services were started.
+ return;
+ }
+ LOG.info("Stopping RecordTimeToLiveService");
+
+ service.shutdownNow();
+ }
+
+ private void doCleanup() {
+ Instant endDate = Instant.now(clock).minus(ttlMaxAge);
+ // Perform cleanup together for all services with a fixed end time. This is currently a
+ // sequential call and may be executed parallelly in future if required.
+ this.cleanupServiceList.forEach(service -> {
+ long startTime = System.currentTimeMillis();
+ service.doCleanup(endDate);
+ LOG.info("{} cleanup completed in {} seconds",service.getClass().getName(),(System.currentTimeMillis()-startTime)/1000.0);
+ });
+ }
+
+ private interface CleanupService {
+
+ void doCleanup(Instant endDate);
+ }
+
+ private class RunRecordsCleanupService implements CleanupService {
+
+ @Override
+ public void doCleanup(Instant endDate) {
+ LOG.info("Doing scheduled cleanup, deleting all run records before {}", endDate);
+
+ try {
+ transactionRunner.run(
+ context -> {
+ AppMetadataStore appMetadataStore = AppMetadataStore.create(context);
+
+ appMetadataStore.deleteCompletedRunsStartedBefore(endDate);
+ });
+ } catch (TransactionException e) {
+ LOG.error("Failed to clean up old run records", e);
+ }
+ }
+ }
+
+ private class LineageCleanupService implements CleanupService {
+
+ @Override
+ public void doCleanup(Instant endDate) {
+ LOG.info("Doing scheduled cleanup, deleting all run lineage records before {}", endDate);
+
+ try {
+ transactionRunner.run(
+ context -> {
+ LineageTable lineageTable = LineageTable.create(context);
+
+ lineageTable.deleteCompletedLineageRecordsStartedBefore(endDate);
+ });
+ } catch (TransactionException e) {
+ LOG.error("Failed to clean up old lineage records", e);
+ }
+ }
+ }
+
+
+ private class FieldLineageCleanupService implements CleanupService {
+
+ @Override
+ public void doCleanup(Instant endDate) {
+ LOG.info("Doing scheduled cleanup, deleting all field lineage records before {}", endDate);
+
+ try {
+ transactionRunner.run(
+ context -> {
+ FieldLineageTable fieldLineageTable = FieldLineageTable.create(context);
+
+ fieldLineageTable.deleteFieldRecordsBefore(endDate);
+ });
+ } catch (TransactionException e) {
+ LOG.error("Failed to clean up old field lineage records", e);
+ }
+ }
+ }
+}
diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/RunRecordTimeToLiveService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/RunRecordTimeToLiveService.java
deleted file mode 100644
index bf13c44a5219..000000000000
--- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/RunRecordTimeToLiveService.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Copyright © 2023 Cask Data, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package io.cdap.cdap.internal.app.services;
-
-import com.google.common.util.concurrent.AbstractIdleService;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.inject.Inject;
-import io.cdap.cdap.common.conf.CConfiguration;
-import io.cdap.cdap.common.conf.Constants;
-import io.cdap.cdap.internal.app.store.AppMetadataStore;
-import io.cdap.cdap.spi.data.transaction.TransactionException;
-import io.cdap.cdap.spi.data.transaction.TransactionRunner;
-import java.time.Clock;
-import java.time.Duration;
-import java.time.Instant;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Service which periodically scans the database tables for run records which should be deleted per
- * the global time to live value.
- *
- * Does not run if no TTL is configured or a TTL of 0 is specified.
- */
-public final class RunRecordTimeToLiveService extends AbstractIdleService {
-
- private static final Logger LOG = LoggerFactory.getLogger(RunRecordTimeToLiveService.class);
-
- private final TransactionRunner transactionRunner;
- private final boolean isEnabled;
- private final Duration ttlMaxAge;
- private final Duration checkFrequency;
- private final Duration initialDelay;
- private final Clock clock;
-
- private ScheduledExecutorService service;
-
- @Inject
- RunRecordTimeToLiveService(CConfiguration cConf, TransactionRunner transactionRunner) {
- // Negative TTLs do not make sense, treat as 0.
- this.ttlMaxAge =
- Duration.ofDays(Math.max(cConf.getInt(Constants.AppFabric.RUN_DATA_CLEANUP_TTL_DAYS), 0));
- this.isEnabled = !this.ttlMaxAge.isZero();
- // Delay should be at least 1 hour to ensure it isn't infinitely running.
- this.checkFrequency =
- Duration.ofHours(
- Math.max(cConf.getInt(Constants.AppFabric.RUN_DATA_CLEANUP_TTL_FREQUENCY_HOURS), 1));
- // Negative delays do not make sense, treat as 0.
- this.initialDelay =
- Duration.ofMinutes(
- Math.max(
- cConf.getInt(Constants.AppFabric.RUN_DATA_CLEANUP_TTL_INITIAL_DELAY_MINUTES), 0));
-
- this.transactionRunner = transactionRunner;
- this.clock = Clock.systemUTC();
- }
-
- @Override
- protected void startUp() {
- if (!isEnabled) {
- LOG.info("No TTL configured, skipping starting RunRecordTimeToLiveService");
- return;
- }
-
- service =
- Executors.newSingleThreadScheduledExecutor(
- new ThreadFactoryBuilder().setNameFormat("Run Record TTL janitor").build());
-
- service.scheduleAtFixedRate(
- () -> doCleanup(),
- initialDelay.getSeconds(),
- checkFrequency.getSeconds(),
- TimeUnit.SECONDS);
- }
-
- @Override
- protected void shutDown() {
- if (!isEnabled) {
- // no-op because no services were started.
- return;
- }
- LOG.info("Stopping RunRecordTimeToLiveService");
-
- service.shutdownNow();
- }
-
- private void doCleanup() {
- Instant endDate = Instant.now(clock).minus(ttlMaxAge);
- LOG.info("Doing scheduled cleanup, deleting all run records before {}", endDate);
-
- try {
- transactionRunner.run(
- context -> {
- AppMetadataStore appMetadataStore = AppMetadataStore.create(context);
-
- appMetadataStore.deleteCompletedRunsStartedBefore(endDate);
- });
- } catch (TransactionException e) {
- LOG.error("Failed to clean up old records", e);
- }
- }
-}
diff --git a/cdap-data-fabric/src/main/java/io/cdap/cdap/data2/metadata/lineage/LineageTable.java b/cdap-data-fabric/src/main/java/io/cdap/cdap/data2/metadata/lineage/LineageTable.java
index 86867dd5963c..eb9aa018f224 100644
--- a/cdap-data-fabric/src/main/java/io/cdap/cdap/data2/metadata/lineage/LineageTable.java
+++ b/cdap-data-fabric/src/main/java/io/cdap/cdap/data2/metadata/lineage/LineageTable.java
@@ -35,6 +35,7 @@
import io.cdap.cdap.spi.data.table.field.Range;
import io.cdap.cdap.store.StoreDefinition;
import java.io.IOException;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
@@ -66,6 +67,46 @@ public static LineageTable create(StructuredTableContext context) {
return new LineageTable(context);
}
+ /**
+ * Deletes all completed run records with a start time before {@code timeUpperBound}, throwing
+ * {@code IOException} if the delete operation fails.
+ *
+ * Assumption is that lineage records are added only after run is completed, so we don't need to
+ * check completed run statuses.
+ *
+ *
+ *
+ * This function will not work with the in memory no-sql DB and will return
+ * {@code InvalidFieldException}. This is because of the way we need to query and will only be
+ * used in the managed instances controlled by a flag.
+ *
+ *
+ * @param timeUpperBound is the end time before which all records should be deleted.
+ */
+ public void deleteCompletedLineageRecordsStartedBefore(Instant timeUpperBound)
+ throws IOException {
+ // While converting from Run we are using Millis hence we need to get epoch millis.
+ long maxTimeEpoch = timeUpperBound.toEpochMilli();
+ if (maxTimeEpoch == Long.MAX_VALUE) {
+ // We don't want to blanket delete all records in case of incorrect values.
+ LOG.warn("Passed invalid start time to for lineage deletion, this would delete all entries");
+ return;
+ }
+ // Data should be deleted from both the lineage tables.
+ getDatasetTable()
+ .scanDeleteAll(createStartTimeEndRange(maxTimeEpoch));
+ getProgramTable()
+ .scanDeleteAll(createStartTimeEndRange(maxTimeEpoch));
+ }
+
+ private Range createStartTimeEndRange(long endTime) {
+ ImmutableList> end = ImmutableList.of(
+ Fields.longField(StoreDefinition.LineageStore.START_TIME_FIELD, invertTime(endTime)));
+ // Since the times are inverted the end time will be the start of the range.
+ return Range.from(end, Range.Bound.EXCLUSIVE);
+ }
+
+
private LineageTable(StructuredTableContext structuredTableContext) {
this.structuredTableContext = structuredTableContext;
}
diff --git a/cdap-data-fabric/src/main/java/io/cdap/cdap/data2/metadata/lineage/field/FieldLineageTable.java b/cdap-data-fabric/src/main/java/io/cdap/cdap/data2/metadata/lineage/field/FieldLineageTable.java
index 7a3f5cc4dff6..6b4d22eca13e 100644
--- a/cdap-data-fabric/src/main/java/io/cdap/cdap/data2/metadata/lineage/field/FieldLineageTable.java
+++ b/cdap-data-fabric/src/main/java/io/cdap/cdap/data2/metadata/lineage/field/FieldLineageTable.java
@@ -41,6 +41,7 @@
import io.cdap.cdap.store.StoreDefinition;
import java.io.IOException;
import java.lang.reflect.Type;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -164,6 +165,37 @@ public void deleteAll() throws IOException {
getSummaryFieldsTable().deleteAll(Range.all());
}
+ /**
+ * Delete the field lineage records that started before the {@param endTime}.
+ *
+ *
+ * This method deletes all field record entries entries from the
+ * {@link StoreDefinition.FieldLineageStore} tables. Currently only the parent table i.e.
+ * {@code fields_lineage} is being deleted.
+ *
+ *
+ * @param endTime is the end time before which all records should be deleted.
+ */
+ public void deleteFieldRecordsBefore(Instant endTime) throws IOException {
+ // While converting from Run we are using Millis hence we need to get epoch millis.
+ long maxTimeEpoch = endTime.toEpochMilli();
+ if (maxTimeEpoch == Long.MAX_VALUE) {
+ // We don't want to blanket delete all records in case of incorrect values.
+ LOG.warn("Passed invalid start time to for lineage deletion, this would delete all entries");
+ return;
+ }
+ // Start time is only available in the parent Field lineage table and has the maximum amount of
+ // entries. We are only deleting from the parent table. Child tables are essentially nested and
+ // would need to be handled separately.
+ getEndpointChecksumTable().scanDeleteAll(createStartTimeEndRange(maxTimeEpoch));
+ }
+
+ private Range createStartTimeEndRange(long endTime) {
+ ImmutableList> end = ImmutableList.of(
+ Fields.longField(StoreDefinition.FieldLineageStore.START_TIME_FIELD, invertTime(endTime)));
+ // Since the times are inverted the end time will be the start of the range.
+ return Range.from(end, Range.Bound.EXCLUSIVE);
+ }
@Nullable
private Set readOperations(long checksum) throws IOException {
List> fields = getOperationsKey(checksum);
diff --git a/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/common/MetricStructuredTable.java b/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/common/MetricStructuredTable.java
index 0d595aa5743c..29b6b45fcecf 100644
--- a/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/common/MetricStructuredTable.java
+++ b/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/common/MetricStructuredTable.java
@@ -315,6 +315,25 @@ public void deleteAll(Range keyRange) throws InvalidFieldException, IOException
}
}
+ @Override
+ public void scanDeleteAll(Range keyRange)
+ throws InvalidFieldException, UnsupportedOperationException, IOException {
+ try {
+ if (!emitTimeMetrics) {
+ structuredTable.scanDeleteAll(keyRange);
+ } else {
+ long curTime = System.nanoTime();
+ structuredTable.scanDeleteAll(keyRange);
+ long duration = System.nanoTime() - curTime;
+ metricsCollector.increment(metricPrefix + "scanDeleteAll.time", duration);
+ }
+ metricsCollector.increment(metricPrefix + "scanDeleteAll.count", 1L);
+ } catch (Exception e) {
+ metricsCollector.increment(metricPrefix + "scanDeleteAll.error", 1L);
+ throw e;
+ }
+ }
+
@Override
public void updateAll(Range keyRange, Collection> fields)
throws InvalidFieldException, IOException {
diff --git a/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/nosql/NoSqlStructuredTable.java b/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/nosql/NoSqlStructuredTable.java
index 5b86f3cb9eae..1aa40adbf95a 100644
--- a/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/nosql/NoSqlStructuredTable.java
+++ b/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/nosql/NoSqlStructuredTable.java
@@ -456,6 +456,17 @@ public void deleteAll(Range keyRange) throws InvalidFieldException, IOException
}
}
+ @Override
+ public void scanDeleteAll(Range keyRange)
+ throws InvalidFieldException, UnsupportedOperationException, IOException {
+ // For No SQL tables we are not supporting deletes on random columns because of performance
+ // concerns.
+ throw new UnsupportedOperationException(
+ String.format("scanDeleteAll operation not supported for NoSQL Table: %s",
+ schema.getTableId()
+ .getName()));
+ }
+
@Override
public void updateAll(Range keyRange, Collection> fields) throws InvalidFieldException {
LOG.trace("Table {}: Update fields {} in range {}", schema.getTableId(), fields, keyRange);
diff --git a/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/sql/PostgreSqlStructuredTable.java b/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/sql/PostgreSqlStructuredTable.java
index 392d4dca14aa..0a2a55943af9 100644
--- a/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/sql/PostgreSqlStructuredTable.java
+++ b/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/sql/PostgreSqlStructuredTable.java
@@ -556,6 +556,19 @@ public void delete(Collection> keys) throws InvalidFieldException, IOEx
public void deleteAll(Range keyRange) throws InvalidFieldException, IOException {
LOG.trace("Table {}: DeleteAll with range {}", tableSchema.getTableId(), keyRange);
fieldValidator.validateScanRange(keyRange);
+ executeDeleteAll(keyRange);
+ }
+
+ @Override
+ public void scanDeleteAll(Range keyRange)
+ throws InvalidFieldException, UnsupportedOperationException, IOException {
+ LOG.trace("Table {}: ScanDeleteAll with range {}", tableSchema.getTableId(), keyRange);
+ keyRange.getBegin().forEach(fieldValidator::validateField);
+ keyRange.getEnd().forEach(fieldValidator::validateField);
+ executeDeleteAll(keyRange);
+ }
+
+ private void executeDeleteAll(Range keyRange) throws IOException {
String sql = getDeleteAllStatement(keyRange);
try (PreparedStatement statement = connection.prepareStatement(sql)) {
setStatementFieldByRange(keyRange, statement);
diff --git a/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/LineageTableTest.java b/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/LineageTableTest.java
index 3e660a606d6a..0006f2d60272 100644
--- a/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/LineageTableTest.java
+++ b/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/LineageTableTest.java
@@ -136,7 +136,7 @@ public void testMultipleRelations() {
}
@SafeVarargs
- private static Set toSet(T... elements) {
+ protected static Set toSet(T... elements) {
return ImmutableSet.copyOf(elements);
}
}
diff --git a/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/NoSqlLineageTableTest.java b/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/NoSqlLineageTableTest.java
index 0c3c80da1386..c58f04bb8524 100644
--- a/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/NoSqlLineageTableTest.java
+++ b/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/NoSqlLineageTableTest.java
@@ -16,16 +16,27 @@
package io.cdap.cdap.data2.metadata.lineage;
+import io.cdap.cdap.common.app.RunIds;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.data2.dataset2.DatasetFrameworkTestUtil;
+import io.cdap.cdap.proto.ProgramType;
+import io.cdap.cdap.proto.id.DatasetId;
+import io.cdap.cdap.proto.id.NamespacedEntityId;
+import io.cdap.cdap.proto.id.ProgramId;
+import io.cdap.cdap.proto.id.ProgramRunId;
import io.cdap.cdap.spi.data.StructuredTableAdmin;
import io.cdap.cdap.spi.data.TableAlreadyExistsException;
import io.cdap.cdap.spi.data.transaction.TransactionRunner;
+import io.cdap.cdap.spi.data.transaction.TransactionRunners;
import io.cdap.cdap.store.StoreDefinition;
import java.io.IOException;
+import java.time.Instant;
+import java.util.Set;
+import org.apache.twill.api.RunId;
import org.junit.BeforeClass;
import org.junit.ClassRule;
+import org.junit.Test;
public class NoSqlLineageTableTest extends LineageTableTest {
@ClassRule
@@ -41,4 +52,22 @@ public static void beforeClass() throws IOException, TableAlreadyExistsException
StoreDefinition.createAllTables(structuredTableAdmin);
}
+ @Test(expected = UnsupportedOperationException.class)
+ public void testDeleteCompletedLineageRecordsThrowsException() {
+ final Instant currentTime = Instant.now();
+ final RunId runId = RunIds.generate(10000);
+ final DatasetId datasetInstance = new DatasetId("default", "dataset1");
+ final ProgramId program = new ProgramId("default", "app1", ProgramType.SERVICE, "service1");
+ final ProgramRunId run = program.run(runId.getId());
+
+ final long accessTimeMillis = System.currentTimeMillis();
+ TransactionRunners.run(transactionRunner, context -> {
+ LineageTable lineageTable = LineageTable.create(context);
+ lineageTable.addAccess(run, datasetInstance, AccessType.READ, accessTimeMillis);
+ Set entitiesForRun = lineageTable.getEntitiesForRun(run);
+ // Since no-SQL DBs are not supported for time based deletes, this should throw an
+ // UnsupportedOperationException.
+ lineageTable.deleteCompletedLineageRecordsStartedBefore(currentTime);
+ });
+ }
}
diff --git a/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/SqlLineageTableTest.java b/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/SqlLineageTableTest.java
index aa011651bd6f..f244f10bb9bc 100644
--- a/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/SqlLineageTableTest.java
+++ b/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/SqlLineageTableTest.java
@@ -16,30 +16,45 @@
package io.cdap.cdap.data2.metadata.lineage;
+import com.google.common.collect.ImmutableList;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Scopes;
import io.cdap.cdap.api.metrics.MetricsCollectionService;
+import io.cdap.cdap.common.app.RunIds;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.guice.ConfigModule;
import io.cdap.cdap.common.guice.LocalLocationModule;
import io.cdap.cdap.common.metrics.NoOpMetricsCollectionService;
import io.cdap.cdap.data.runtime.StorageModule;
import io.cdap.cdap.data.runtime.SystemDatasetRuntimeModule;
+import io.cdap.cdap.proto.ProgramType;
+import io.cdap.cdap.proto.id.DatasetId;
+import io.cdap.cdap.proto.id.NamespacedEntityId;
+import io.cdap.cdap.proto.id.ProgramId;
+import io.cdap.cdap.proto.id.ProgramRunId;
import io.cdap.cdap.spi.data.StructuredTableAdmin;
import io.cdap.cdap.spi.data.TableAlreadyExistsException;
import io.cdap.cdap.spi.data.sql.PostgresInstantiator;
import io.cdap.cdap.spi.data.transaction.TransactionRunner;
+import io.cdap.cdap.spi.data.transaction.TransactionRunners;
import io.cdap.cdap.store.StoreDefinition;
import io.zonky.test.db.postgres.embedded.EmbeddedPostgres;
import java.io.IOException;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.Set;
+import org.apache.twill.api.RunId;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
+import org.junit.Test;
import org.junit.rules.TemporaryFolder;
public class SqlLineageTableTest extends LineageTableTest {
+
@ClassRule
public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
@@ -67,6 +82,55 @@ protected void configure() {
StoreDefinition.createAllTables(injector.getInstance(StructuredTableAdmin.class));
}
+
+ @Test
+ public void testDeleteOutOfRangeCompletedRuns() {
+ final Instant currentTime = Instant.now();
+ final RunId runId = RunIds.generate(10000);
+ final DatasetId datasetInstance = new DatasetId("default", "dataset1");
+ final ProgramId program = new ProgramId("default", "app1", ProgramType.SERVICE, "service1");
+ final ProgramRunId run = program.run(runId.getId());
+ final long accessTimeMillis = System.currentTimeMillis();
+ TransactionRunners.run(transactionRunner, context -> {
+ LineageTable lineageTable = LineageTable.create(context);
+ lineageTable.addAccess(run, datasetInstance, AccessType.READ, accessTimeMillis);
+
+ lineageTable.deleteCompletedLineageRecordsStartedBefore(currentTime);
+
+ Set entitiesForRun = lineageTable.getEntitiesForRun(run);
+ // Asserts that the records are deleted.
+ Assert.assertTrue(entitiesForRun.isEmpty());
+ });
+ }
+
+ @Test
+ public void testDeleteOutOfRangeCompletedRunsDoesNotDeleteLatestRuns() {
+ final Instant currentTime = Instant.now();
+ final RunId runId = RunIds.generate(currentTime.toEpochMilli());
+ final DatasetId datasetInstance = new DatasetId("default", "dataset1");
+ final ProgramId program = new ProgramId("default", "app1", ProgramType.SERVICE, "service1");
+ final ProgramRunId run = program.run(runId.getId());
+
+
+ final long accessTimeMillis = System.currentTimeMillis();
+ TransactionRunners.run(transactionRunner, context -> {
+ LineageTable lineageTable = LineageTable.create(context);
+ lineageTable.addAccess(run, datasetInstance, AccessType.READ, accessTimeMillis);
+
+ Instant deleteStartTime = currentTime.minus(10, ChronoUnit.HOURS);
+ lineageTable.deleteCompletedLineageRecordsStartedBefore(deleteStartTime);
+
+ Relation expected = new Relation(datasetInstance, program, AccessType.READ, runId);
+ Set relations = lineageTable.getRelations(datasetInstance, 0, currentTime.toEpochMilli(), x -> true);
+ // Asserts that the records are not deleted.
+ Assert.assertEquals(1, relations.size());
+ Assert.assertEquals(expected, relations.iterator().next());
+ Assert.assertEquals(toSet(program, datasetInstance), lineageTable.getEntitiesForRun(run));
+ Assert.assertEquals(ImmutableList.of(accessTimeMillis), lineageTable.getAccessTimesForRun(run));
+ });
+ }
+
+
@AfterClass
public static void teardown() throws IOException {
pg.close();
diff --git a/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/field/FieldLineageTableTest.java b/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/field/FieldLineageTableTest.java
index 5ed965d8b578..130cd737bd58 100644
--- a/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/field/FieldLineageTableTest.java
+++ b/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/field/FieldLineageTableTest.java
@@ -224,7 +224,7 @@ public void testMergeSummaries() {
});
}
- private List generateOperations(boolean addAditionalField) {
+ protected List generateOperations(boolean addAditionalField) {
// read: file -> (offset, body)
// parse: (body) -> (first_name, last_name)
// concat: (first_name, last_name) -> (name)
diff --git a/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/field/NoSqlFieldLineageTableTest.java b/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/field/NoSqlFieldLineageTableTest.java
index 0d759c5ff8dc..47419721b113 100644
--- a/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/field/NoSqlFieldLineageTableTest.java
+++ b/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/field/NoSqlFieldLineageTableTest.java
@@ -16,18 +16,27 @@
package io.cdap.cdap.data2.metadata.lineage.field;
+import io.cdap.cdap.common.app.RunIds;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.data2.dataset2.DatasetFrameworkTestUtil;
+import io.cdap.cdap.proto.ProgramType;
+import io.cdap.cdap.proto.id.ProgramId;
+import io.cdap.cdap.proto.id.ProgramRunId;
import io.cdap.cdap.spi.data.StructuredTableAdmin;
import io.cdap.cdap.spi.data.TableAlreadyExistsException;
import io.cdap.cdap.spi.data.transaction.TransactionRunner;
+import io.cdap.cdap.spi.data.transaction.TransactionRunners;
import io.cdap.cdap.store.StoreDefinition;
import java.io.IOException;
+import java.time.Instant;
+import org.apache.twill.api.RunId;
import org.junit.BeforeClass;
import org.junit.ClassRule;
+import org.junit.Test;
public class NoSqlFieldLineageTableTest extends FieldLineageTableTest {
+
@ClassRule
public static DatasetFrameworkTestUtil dsFrameworkUtil = new DatasetFrameworkTestUtil();
@@ -41,4 +50,22 @@ public static void beforeClass() throws IOException, TableAlreadyExistsException
StoreDefinition.createAllTables(structuredTableAdmin);
}
+ @Test(expected = UnsupportedOperationException.class)
+ public void testDeleteFieldRecordsBefore() {
+ final Instant currentTime = Instant.now();
+
+ RunId runId = RunIds.generate(10000);
+ ProgramId program = new ProgramId("default", "app1", ProgramType.WORKFLOW, "workflow1");
+ final ProgramRunId programRun1 = program.run(runId.getId());
+ final FieldLineageInfo info1 = new FieldLineageInfo(generateOperations(false));
+
+ TransactionRunners.run(transactionRunner, context -> {
+ FieldLineageTable fieldLineageTable = FieldLineageTable.create(context);
+ fieldLineageTable.addFieldLineageInfo(programRun1, info1);
+ // Should throw UnsupportedOperationException exception since non-primary key based deletes
+ // are not supported in no-sql.
+ fieldLineageTable.deleteFieldRecordsBefore(currentTime);
+ });
+ }
+
}
diff --git a/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/field/SqlFieldLineageTableTest.java b/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/field/SqlFieldLineageTableTest.java
index db98a9580269..190ebe497fbd 100644
--- a/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/field/SqlFieldLineageTableTest.java
+++ b/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/field/SqlFieldLineageTableTest.java
@@ -16,27 +16,43 @@
package io.cdap.cdap.data2.metadata.lineage.field;
+import com.google.common.collect.ImmutableList;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Scopes;
+import io.cdap.cdap.api.lineage.field.EndPoint;
import io.cdap.cdap.api.metrics.MetricsCollectionService;
+import io.cdap.cdap.common.app.RunIds;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.guice.ConfigModule;
import io.cdap.cdap.common.guice.LocalLocationModule;
import io.cdap.cdap.common.metrics.NoOpMetricsCollectionService;
import io.cdap.cdap.data.runtime.StorageModule;
import io.cdap.cdap.data.runtime.SystemDatasetRuntimeModule;
+import io.cdap.cdap.proto.ProgramType;
+import io.cdap.cdap.proto.id.ProgramId;
+import io.cdap.cdap.proto.id.ProgramRunId;
import io.cdap.cdap.spi.data.StructuredTableAdmin;
import io.cdap.cdap.spi.data.TableAlreadyExistsException;
import io.cdap.cdap.spi.data.sql.PostgresInstantiator;
import io.cdap.cdap.spi.data.transaction.TransactionRunner;
+import io.cdap.cdap.spi.data.transaction.TransactionRunners;
import io.cdap.cdap.store.StoreDefinition;
import io.zonky.test.db.postgres.embedded.EmbeddedPostgres;
import java.io.IOException;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.twill.api.RunId;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
+import org.junit.Test;
import org.junit.rules.TemporaryFolder;
public class SqlFieldLineageTableTest extends FieldLineageTableTest {
@@ -67,6 +83,69 @@ protected void configure() {
StoreDefinition.createAllTables(injector.getInstance(StructuredTableAdmin.class));
}
+ @Test
+ public void testDeleteFieldRecordsBefore() {
+ final Instant currentTime = Instant.now();
+
+ RunId runId = RunIds.generate(10000);
+ ProgramId program = new ProgramId("default", "app1", ProgramType.WORKFLOW, "workflow1");
+ final ProgramRunId programRun1 = program.run(runId.getId());
+ final FieldLineageInfo info1 = new FieldLineageInfo(generateOperations(false));
+
+ TransactionRunners.run(transactionRunner, context -> {
+ FieldLineageTable fieldLineageTable = FieldLineageTable.create(context);
+ fieldLineageTable.addFieldLineageInfo(programRun1, info1);
+ EndPoint source = EndPoint.of("ns1", "endpoint1");
+ EndPoint destination = EndPoint.of("myns", "another_file");
+
+ fieldLineageTable.deleteFieldRecordsBefore(currentTime);
+ List actual = fieldLineageTable.getEndpoints("ns1",
+ program.getProgramReference(), runId);
+
+ // Asserts that only the endpoint records are deleted.
+ Assert.assertTrue(actual.isEmpty());
+ // Since the base table is deleted, get fields also won't return anything.
+ Assert.assertTrue(fieldLineageTable.getFields(destination, 0, 10001).isEmpty());
+ Assert.assertTrue(fieldLineageTable.getFields(source, 0, 10001).isEmpty());
+
+ });
+ }
+
+ @Test
+ public void testDeleteFieldRecordsBeforeDoesNotDelete() {
+ final Instant currentTime = Instant.now();
+
+ RunId runId = RunIds.generate(
+ currentTime.minus(1, ChronoUnit.MINUTES)
+ .toEpochMilli());
+ ProgramId program = new ProgramId("default", "app1", ProgramType.WORKFLOW, "workflow1");
+ final ProgramRunId programRun1 = program.run(runId.getId());
+ final FieldLineageInfo info1 = new FieldLineageInfo(generateOperations(false));
+
+ TransactionRunners.run(transactionRunner, context -> {
+ FieldLineageTable fieldLineageTable = FieldLineageTable.create(context);
+ fieldLineageTable.addFieldLineageInfo(programRun1, info1);
+ EndPoint source = EndPoint.of("ns1", "endpoint1");
+ EndPoint destination = EndPoint.of("myns", "another_file");
+ Instant deleteTime = currentTime.minus(2, ChronoUnit.HOURS);
+ fieldLineageTable.deleteFieldRecordsBefore(deleteTime);
+ List actual = fieldLineageTable.getEndpoints("ns1",
+ program.getProgramReference(), runId);
+ List expected = ImmutableList.of(EndPoint.of("ns1", "endpoint1"));
+ // Asserts that only the endpoint records are deleted.
+ Assert.assertEquals(expected, actual);
+
+ Set expectedDestinationFields = new HashSet<>(Arrays.asList("offset", "name"));
+ Set expectedSourceFields = new HashSet<>(Arrays.asList("offset", "body"));
+ // End time of currentTime should return the data for the run which was added at time
+ // currentTime - 1 minute.
+ Assert.assertEquals(expectedDestinationFields,
+ fieldLineageTable.getFields(destination, 0, currentTime.toEpochMilli()));
+ Assert.assertEquals(expectedSourceFields, fieldLineageTable.getFields(source, 0, currentTime.toEpochMilli()));
+
+ });
+ }
+
@AfterClass
public static void teardown() throws IOException {
pg.close();
diff --git a/cdap-data-fabric/src/test/java/io/cdap/cdap/spi/data/nosql/NoSqlStructuredTableTest.java b/cdap-data-fabric/src/test/java/io/cdap/cdap/spi/data/nosql/NoSqlStructuredTableTest.java
index 861f7948cdfd..a11bd0d7f9ff 100644
--- a/cdap-data-fabric/src/test/java/io/cdap/cdap/spi/data/nosql/NoSqlStructuredTableTest.java
+++ b/cdap-data-fabric/src/test/java/io/cdap/cdap/spi/data/nosql/NoSqlStructuredTableTest.java
@@ -29,6 +29,7 @@
import io.cdap.cdap.spi.data.table.StructuredTableSchema;
import io.cdap.cdap.spi.data.table.field.Field;
import io.cdap.cdap.spi.data.table.field.Fields;
+import io.cdap.cdap.spi.data.transaction.TransactionException;
import io.cdap.cdap.spi.data.transaction.TransactionRunner;
import java.io.IOException;
import java.util.ArrayList;
@@ -202,6 +203,11 @@ public void testFilterByIndexIteratorMultiMatch() {
Assert.assertEquals(actual.get(2), filterIndex.getValue());
}
+ @Test(expected = TransactionException.class)
+ @Override
+ public void testScanDeleteAll() throws Exception{
+ super.testScanDeleteAll();
+ }
private static class MockScanner implements Scanner {
private final Iterator iterator;
private boolean closed;
diff --git a/cdap-storage-ext-spanner/src/main/java/io/cdap/cdap/storage/spanner/SpannerStructuredTable.java b/cdap-storage-ext-spanner/src/main/java/io/cdap/cdap/storage/spanner/SpannerStructuredTable.java
index 96c42968c5eb..47c150816ce3 100644
--- a/cdap-storage-ext-spanner/src/main/java/io/cdap/cdap/storage/spanner/SpannerStructuredTable.java
+++ b/cdap-storage-ext-spanner/src/main/java/io/cdap/cdap/storage/spanner/SpannerStructuredTable.java
@@ -37,6 +37,7 @@
import io.cdap.cdap.spi.data.table.field.FieldValidator;
import io.cdap.cdap.spi.data.table.field.Fields;
import io.cdap.cdap.spi.data.table.field.Range;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -388,6 +389,16 @@ public void deleteAll(Range range) throws InvalidFieldException {
transactionContext.executeUpdate(builder.build());
}
+ @Override
+ public void scanDeleteAll(Range keyRange)
+ throws InvalidFieldException, UnsupportedOperationException, IOException {
+ // At this point we do not plan to support such operations in spanner, we may look to add this
+ // at a later time post performance testing.
+ throw new UnsupportedOperationException(
+ String.format("scanDeleteAll operation not supported for Spanner Table: %s",
+ schema.getTableId().getName()));
+ }
+
@Override
public void updateAll(Range keyRange, Collection> fields) throws InvalidFieldException {
// validate that the range is strictly a primary key prefix
diff --git a/cdap-storage-ext-spanner/src/test/java/io/cdap/cdap/storage/spanner/SpannerStructuredTableTest.java b/cdap-storage-ext-spanner/src/test/java/io/cdap/cdap/storage/spanner/SpannerStructuredTableTest.java
index e0ed8b37fa2f..79cc1fb38af8 100644
--- a/cdap-storage-ext-spanner/src/test/java/io/cdap/cdap/storage/spanner/SpannerStructuredTableTest.java
+++ b/cdap-storage-ext-spanner/src/test/java/io/cdap/cdap/storage/spanner/SpannerStructuredTableTest.java
@@ -21,6 +21,7 @@
import io.cdap.cdap.spi.data.StructuredTable;
import io.cdap.cdap.spi.data.StructuredTableAdmin;
import io.cdap.cdap.spi.data.StructuredTableTest;
+import io.cdap.cdap.spi.data.transaction.TransactionException;
import io.cdap.cdap.spi.data.transaction.TransactionRunner;
import java.util.Collections;
import java.util.HashMap;
@@ -30,6 +31,7 @@
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Ignore;
+import org.junit.Test;
/**
* Unit tests for GCP spanner implementation of the {@link StructuredTable}. This test needs the following
@@ -93,6 +95,11 @@ protected TransactionRunner getTransactionRunner() {
return storageProvider.getTransactionRunner();
}
+ @Test(expected = TransactionException.class)
+ @Override
+ public void testScanDeleteAll() throws Exception{
+ super.testScanDeleteAll();
+ }
private static final class MockStorageProviderContext implements StorageProviderContext {
private final Map config;
diff --git a/cdap-storage-spi/src/main/java/io/cdap/cdap/spi/data/StructuredTable.java b/cdap-storage-spi/src/main/java/io/cdap/cdap/spi/data/StructuredTable.java
index df05e525b7d6..e4272b3c9292 100644
--- a/cdap-storage-spi/src/main/java/io/cdap/cdap/spi/data/StructuredTable.java
+++ b/cdap-storage-spi/src/main/java/io/cdap/cdap/spi/data/StructuredTable.java
@@ -274,11 +274,29 @@ void increment(Collection> keys, String column, long amount)
*
* @param keyRange key range of the rows to delete
* @throws InvalidFieldException if any of the keys are not part of table schema, or their
- * types do not match the schema
+ * types do not match the schema or the first key in range does not match the first field of
+ * the primary key.
* @throws IOException if there is an error reading or deleting from the table
*/
void deleteAll(Range keyRange) throws InvalidFieldException, IOException;
+ /**
+ * Delete a range of rows from the table based on a potentially non indexed column.
+ *
+ *
+ * This is a potentially slow operation because it may query a non primary key or a non indexed
+ * column.
+ *
+ *
+ * @param keyRange key range of the rows to delete
+ * @throws InvalidFieldException if any of the keys are not part of table schema, or their
+ * types do not match the schema
+ * @throws UnsupportedOperationException if this method is not supported in the database
+ * implementation.
+ * @throws IOException if there is an error reading or deleting from the table
+ */
+ void scanDeleteAll(Range keyRange) throws InvalidFieldException, UnsupportedOperationException, IOException;
+
/**
* Updates the specific fields in a range of rows from the table.
*
diff --git a/cdap-storage-spi/src/test/java/io/cdap/cdap/spi/data/StructuredTableTest.java b/cdap-storage-spi/src/test/java/io/cdap/cdap/spi/data/StructuredTableTest.java
index 81c876486809..0b7446f7f441 100644
--- a/cdap-storage-spi/src/test/java/io/cdap/cdap/spi/data/StructuredTableTest.java
+++ b/cdap-storage-spi/src/test/java/io/cdap/cdap/spi/data/StructuredTableTest.java
@@ -1037,6 +1037,70 @@ public void testDeleteAll() throws Exception {
Assert.assertEquals(expected, scanSimpleStructuredRows(Range.all(), max));
}
+ @Test
+ public void testScanDeleteAll() throws Exception {
+ int max = 10;
+ List>> expected = writeSimpleStructuredRows(max, "");
+ Assert.assertEquals(max, expected.size());
+ // Delete 7-9 (both inclusive) using a column which is part of PK but not the first PK.
+ expected.subList(7, 10).clear();
+ getTransactionRunner().run(context -> {
+ StructuredTable table = context.getTable(SIMPLE_TABLE);
+ Range range = Range.create(Arrays.asList(Fields.longField(KEY2, 7L)),
+ Range.Bound.INCLUSIVE,
+ Arrays.asList(Fields.longField(KEY2, 9L)),
+ Range.Bound.INCLUSIVE);
+ table.scanDeleteAll(range);
+ });
+ // Verify the deletion
+ Assert.assertEquals(expected, scanSimpleStructuredRows(Range.all(), max));
+
+ // Delete 6 using a random column
+ expected.subList(6, 7).clear();
+ getTransactionRunner().run(context -> {
+ StructuredTable table = context.getTable(SIMPLE_TABLE);
+ Range range = Range.create(Arrays.asList(Fields.doubleField(DOUBLE_COL, 6.0)),
+ Range.Bound.INCLUSIVE,
+ Arrays.asList(Fields.doubleField(DOUBLE_COL, 6.0)),
+ Range.Bound.INCLUSIVE);
+ table.scanDeleteAll(range);
+ });
+
+ // Verify the deletion
+ Assert.assertEquals(expected, scanSimpleStructuredRows(Range.all(), max));
+
+ // Delete 2-5 (end exclusive) using the first key only
+ expected.subList(2, 5).clear();
+ getTransactionRunner().run(context -> {
+ StructuredTable table = context.getTable(SIMPLE_TABLE);
+ Range range = Range.create(Collections.singletonList(Fields.intField(KEY, 2)), Range.Bound.INCLUSIVE,
+ Collections.singletonList(Fields.intField(KEY, 5)), Range.Bound.EXCLUSIVE);
+ table.scanDeleteAll(range);
+ });
+ // Verify the deletion
+ Assert.assertEquals(expected, scanSimpleStructuredRows(Range.all(), max));
+
+ // Use a range outside the element list, nothing should get deleted
+ getTransactionRunner().run(context -> {
+ StructuredTable table = context.getTable(SIMPLE_TABLE);
+ Range range = Range.create(Collections.singletonList(Fields.intField(KEY, max + 1)), Range.Bound.INCLUSIVE,
+ Collections.singletonList(Fields.intField(KEY, max + 5)), Range.Bound.EXCLUSIVE);
+ table.scanDeleteAll(range);
+ });
+ // Verify the deletion
+ Assert.assertEquals(expected, scanSimpleStructuredRows(Range.all(), max));
+
+ // Delete all the remaining
+ expected.clear();
+ getTransactionRunner().run(context -> {
+ StructuredTable table = context.getTable(SIMPLE_TABLE);
+ table.scanDeleteAll(Range.all());
+ });
+ // Verify the deletion
+ Assert.assertEquals(expected, scanSimpleStructuredRows(Range.all(), max));
+ }
+
+
@Test
public void testIndexScanIndexStringFieldType() throws Exception {
int num = 5;