Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add retention job summary events #3683

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,21 @@

package org.apache.gobblin.data.management.dataset;

import lombok.RequiredArgsConstructor;

import java.io.IOException;
import java.util.Collection;

import org.apache.gobblin.dataset.FileSystemDataset;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import com.google.common.collect.ImmutableList;

import lombok.RequiredArgsConstructor;

import org.apache.gobblin.data.management.copy.CopyConfiguration;
import org.apache.gobblin.data.management.copy.CopyableDataset;
import org.apache.gobblin.data.management.copy.CopyEntity;
import org.apache.gobblin.data.management.copy.CopyableDataset;
import org.apache.gobblin.data.management.retention.dataset.CleanableDataset;
import org.apache.gobblin.dataset.FileSystemDataset;


/**
Expand All @@ -42,8 +42,9 @@ public class DummyDataset implements CopyableDataset, CleanableDataset, FileSyst

private final Path datasetRoot;

@Override public void clean() throws IOException {
@Override public int clean() throws IOException {
// Do nothing
return 0;
}

@Override public Collection<? extends CopyEntity> getCopyableFiles(FileSystem targetFs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand All @@ -47,6 +48,7 @@
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.data.management.retention.dataset.CleanableDataset;
import org.apache.gobblin.data.management.retention.profile.MultiCleanableDatasetFinder;
import org.apache.gobblin.data.management.retention.version.DatasetRetentionSummary;
import org.apache.gobblin.dataset.Dataset;
import org.apache.gobblin.dataset.DatasetsFinder;
import org.apache.gobblin.instrumented.Instrumentable;
Expand All @@ -56,6 +58,7 @@
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.runtime.DynamicConfigGeneratorFactory;
import org.apache.gobblin.runtime.util.GsonUtils;
import org.apache.gobblin.util.AzkabanTags;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ExecutorsUtils;
Expand Down Expand Up @@ -88,6 +91,7 @@ public class DatasetCleaner implements Instrumentable, Closeable {
private Optional<Meter> datasetsCleanSuccessMeter = Optional.absent();
private Optional<Meter> datasetsCleanFailureMeter = Optional.absent();
private Optional<CountDownLatch> finishCleanSignal;
private final ConcurrentHashMap<String, DatasetRetentionSummary> datasetRetentionSummaries = new ConcurrentHashMap<>();
private final List<Throwable> throwables;

public DatasetCleaner(FileSystem fs, Properties props) throws IOException {
Expand Down Expand Up @@ -141,21 +145,24 @@ public void clean() throws IOException {
List<Dataset> dataSets = this.datasetFinder.findDatasets();
this.finishCleanSignal = Optional.of(new CountDownLatch(dataSets.size()));
for (final Dataset dataset : dataSets) {
ListenableFuture<Void> future = this.service.submit(new Callable<Void>() {
ListenableFuture<Integer> future = this.service.submit(new Callable<Integer>(){
@Override
public Void call() throws Exception {
public Integer call() throws Exception {
if (dataset instanceof CleanableDataset) {
((CleanableDataset) dataset).clean();
int versionsDeleted = ((CleanableDataset) dataset).clean();
return versionsDeleted;
}
return null;
return 0;
}
});
Futures.addCallback(future, new FutureCallback<Void>() {
Futures.addCallback(future, new FutureCallback<Integer>() {
@Override
public void onFailure(Throwable throwable) {
DatasetCleaner.this.finishCleanSignal.get().countDown();
LOG.warn("Exception caught when cleaning " + dataset.datasetURN() + ".", throwable);
DatasetCleaner.this.throwables.add(throwable);
DatasetRetentionSummary summary = new DatasetRetentionSummary(dataset.datasetURN(), 0, false);
DatasetCleaner.this.datasetRetentionSummaries.put(dataset.datasetURN(), summary);
Instrumented.markMeter(DatasetCleaner.this.datasetsCleanFailureMeter);
DatasetCleaner.this.eventSubmitter.submit(RetentionEvents.CleanFailed.EVENT_NAME,
ImmutableMap.of(RetentionEvents.CleanFailed.FAILURE_CONTEXT_METADATA_KEY,
Expand All @@ -164,9 +171,11 @@ public void onFailure(Throwable throwable) {
}

@Override
public void onSuccess(Void arg0) {
public void onSuccess(Integer datasetsDeleted) {
DatasetCleaner.this.finishCleanSignal.get().countDown();
LOG.info("Successfully cleaned: " + dataset.datasetURN());
DatasetRetentionSummary summary = new DatasetRetentionSummary(dataset.datasetURN(), datasetsDeleted, true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it certain we'd only ever overwrite the summary, rather than potentially needing to add this new count to whatever accumulating sum was already tracked for the same dataset URN?

DatasetCleaner.this.datasetRetentionSummaries.put(dataset.datasetURN(), summary);
LOG.info("Successfully cleaned: {} with {} versions marked for deletion", dataset.datasetURN(), datasetsDeleted);
Instrumented.markMeter(DatasetCleaner.this.datasetsCleanSuccessMeter);
}
});
Expand All @@ -187,8 +196,12 @@ public void close() throws IOException {
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Not all datasets finish cleanning", e);
throw new IOException("Not all datasets finished cleaning", e);
} finally {
// Submit a job summary event
String serializedSummary = GsonUtils.gsonBuilderWithSerializationSupport().disableHtmlEscaping().create().toJson(this.datasetRetentionSummaries.values());
this.eventSubmitter.submit(RetentionEvents.RetentionJobSummary.EVENT_NAME,
ImmutableMap.of(RetentionEvents.RetentionJobSummary.DATASET_VERSIONS_CLEANED_METADATA_KEY, serializedSummary));
ExecutorsUtils.shutdownExecutorService(this.service, Optional.of(LOG));
this.closer.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.gobblin.data.management.retention.dataset.CleanableDataset;


/**
* Holds event names and constants used in events submitted by a retention job.
*/
Expand All @@ -34,6 +35,12 @@ static class CleanFailed {
*/
static final String FAILURE_CONTEXT_METADATA_KEY = "failureContext";
}

static class RetentionJobSummary {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: RetentionEvents.JobSummary would avoid redundant Retention

static final String EVENT_NAME = "RetentionJobSummary";
static final String DATASET_VERSIONS_CLEANED_METADATA_KEY = "datasetsMarkedForCleaning";
}

static final String NAMESPACE = "gobblin.data.management.retention";
static final String DATASET_URN_METADATA_KEY = "datasetUrn";
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.gobblin.data.management.retention.dataset;

import java.io.IOException;

import org.apache.gobblin.dataset.Dataset;


Expand All @@ -28,10 +29,11 @@
public interface CleanableDataset extends Dataset {

/**
* Cleans the {@link CleanableDataset}. In general, this means to apply a
* {@link org.apache.gobblin.data.management.retention.policy.RetentionPolicy} and delete files and directories that need deleting.
* @return the number of dataset versions marked for deletion
Comment on lines +32 to 34
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like new line 33 cut in mid-sentence to the prior javadoc.

also, prefer a @return tag

* @throws IOException
*/
public int clean() throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;

import com.google.common.collect.Lists;

import org.apache.gobblin.data.management.policy.VersionSelectionPolicy;
import org.apache.gobblin.data.management.version.DatasetStateStoreVersion;
import org.apache.gobblin.data.management.version.DatasetVersion;
import org.apache.gobblin.data.management.version.finder.VersionFinder;
import org.apache.gobblin.metastore.DatasetStoreDataset;
import org.apache.gobblin.metastore.metadata.DatasetStateStoreEntryManager;
import com.google.common.collect.Lists;


/**
Expand All @@ -44,7 +46,7 @@ public CleanableDatasetStoreDataset(DatasetStoreDataset.Key key, List<DatasetSta
public abstract VersionSelectionPolicy<T> getVersionSelectionPolicy();

@Override
public void clean() throws IOException {
public int clean() throws IOException {

List<T> versions = Lists.newArrayList(this.getVersionFinder().findDatasetVersions(this));

Expand All @@ -55,5 +57,7 @@ public void clean() throws IOException {
for (Object version : deletableVersions) {
((DatasetStateStoreVersion) version).getEntry().delete();
}

return deletableVersions.size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,13 @@ public CleanableHiveDataset(FileSystem fs, HiveMetastoreClientPool clientPool, T
* </p>
*/
@Override
public void clean() throws IOException {
public int clean() throws IOException {

List versions = Lists.newArrayList(this.hiveDatasetVersionFinder.findDatasetVersions(this));

if (versions.isEmpty()) {
log.warn(String.format("No dataset version can be found. Ignoring %s", this.getTable().getCompleteName()));
return;
return 0;
}

Collections.sort(versions, Collections.reverseOrder());
Expand Down Expand Up @@ -159,6 +159,7 @@ public void clean() throws IOException {
if (!exceptions.isEmpty()) {
throw new RuntimeException(String.format("Deletion failed for %s partitions", exceptions.size()));
}
return deletableVersions.size();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@

package org.apache.gobblin.data.management.retention.dataset;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -30,6 +26,18 @@
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.catalog.TableIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.data.management.policy.VersionSelectionPolicy;
Expand All @@ -39,11 +47,6 @@
import org.apache.gobblin.iceberg.GobblinMCEProducer;
import org.apache.gobblin.metadata.OperationType;
import org.apache.gobblin.metadata.SchemaSource;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.catalog.TableIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
Expand Down Expand Up @@ -75,15 +78,15 @@ public CleanableIcebergDataset(FileSystem fs, Properties props, Path datasetRoot
}

@Override
public void clean() throws IOException {
public int clean() throws IOException {

if (this.isDatasetBlacklisted) {
this.log.info("Dataset blacklisted. Cleanup skipped for " + datasetRoot());
return;
return 0;
}

boolean atLeastOneFailureSeen = false;

int totalVersionsDeleted = 0;
for (VersionFinderAndPolicy<T> versionFinderAndPolicy : getVersionFindersAndPolicies()) {
Config retentionConfig = versionFinderAndPolicy.getConfig();
Preconditions.checkArgument(retentionConfig != null,
Expand All @@ -108,7 +111,7 @@ public void clean() throws IOException {
Collections.sort(versions, Collections.reverseOrder());

Collection<T> deletableVersions = selectionPolicy.listSelectedVersions(versions);

totalVersionsDeleted += deletableVersions.size();
cleanImpl(deletableVersions, retentionConfig);
}

Expand All @@ -125,6 +128,7 @@ public void clean() throws IOException {
log.error("interrupted while sleep");
throw new IOException(e);
}
return totalVersionsDeleted;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,6 @@
import java.util.List;
import java.util.Properties;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.Singular;

import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;

Expand All @@ -37,6 +32,11 @@
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.Singular;

import org.apache.gobblin.data.management.policy.EmbeddedRetentionSelectionPolicy;
import org.apache.gobblin.data.management.policy.SelectNothingPolicy;
import org.apache.gobblin.data.management.policy.VersionSelectionPolicy;
Expand Down Expand Up @@ -255,15 +255,16 @@ public MultiVersionCleanableDatasetBase(FileSystem fs, Properties properties, bo
*
*/
@Override
public void clean() throws IOException {
public int clean() throws IOException {

if (this.isDatasetBlacklisted) {
this.log.info("Dataset blacklisted. Cleanup skipped for " + datasetRoot());
return;
return 0;
}

boolean atLeastOneFailureSeen = false;

int totalVersionsDeleted = 0;
for (VersionFinderAndPolicy<T> versionFinderAndPolicy : getVersionFindersAndPolicies()) {

VersionSelectionPolicy<T> selectionPolicy = versionFinderAndPolicy.getVersionSelectionPolicy();
Expand All @@ -277,7 +278,6 @@ public void clean() throws IOException {
versionFinder.getClass().getName(), selectionPolicy));

List<T> versions = Lists.newArrayList(versionFinder.findDatasetVersions(this));

if (versions.isEmpty()) {
this.log.warn("No dataset version can be found. Ignoring.");
continue;
Expand All @@ -286,7 +286,7 @@ public void clean() throws IOException {
Collections.sort(versions, Collections.reverseOrder());

Collection<T> deletableVersions = selectionPolicy.listSelectedVersions(versions);

totalVersionsDeleted += deletableVersions.size();
cleanImpl(deletableVersions);

List<DatasetVersion> allVersions = Lists.newArrayList();
Expand All @@ -308,6 +308,7 @@ public void clean() throws IOException {
throw new RuntimeException(String.format(
"At least one failure happened while processing %s. Look for previous logs for failures", datasetRoot()));
}
return totalVersionsDeleted;
}

protected void cleanImpl(Collection<T> deletableVersions) throws IOException {
Expand Down
Loading