Skip to content

Commit

Permalink
[GOBBLIN-1824] Improving the Efficiency of Work Planning in Manifest-B…
Browse files Browse the repository at this point in the history
…ased DistCp Jobs (apache#3686)

* address comments

* use connection manager when HttpClient is not closeable

* [GOBBLIN-1810] Support general iceberg catalog in icebergMetadataWriter

* [GOBBLIN-1824] Improving the Efficiency of Work Planning in Manifest-Based DistCp Jobs

* remove un-intended change

* address comments and remove the multi threading since it does not improve too much

* address comments

* make cache TTL configurable

* add comments to describe the difference of resolveReplicatedOwnerAndPermissionsRecursivelyWithCache and resolveReplicatedOwnerAndPermissionsRecursively

---------

Co-authored-by: Zihan Li <[email protected]>
  • Loading branch information
ZihanLi58 and Zihan Li authored May 3, 2023
1 parent b5d2558 commit 7bbf676
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@

package org.apache.gobblin.data.management.copy;

import com.google.common.cache.Cache;
import java.io.IOException;
import java.util.List;
import java.util.Map;

import java.util.concurrent.ExecutionException;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -58,6 +61,7 @@
@Setter
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@EqualsAndHashCode(callSuper = true)
@Slf4j
public class CopyableFile extends CopyEntity implements File {
private static final byte[] EMPTY_CHECKSUM = new byte[0];

Expand Down Expand Up @@ -375,6 +379,36 @@ public static List<OwnerAndPermission> resolveReplicatedOwnerAndPermissionsRecur
return ownerAndPermissions;
}

/**
 * Compute the {@link OwnerAndPermission} that replication should apply to {@code fromPath} and to each of its
 * ancestors up to, but not including, {@code toPath}, honoring the {@link PreserveAttributes} rules carried by
 * {@code copyConfiguration}.
 *
 * <p>In contrast to {@code resolveReplicatedOwnerAndPermissionsRecursively(...)}, every lookup goes through
 * {@code permissionMap}, a cache keyed by path string, so files sharing ancestors avoid redundant file-system
 * round trips. Prefer this variant when resolving permissions for many files under a common ancestor.
 *
 * @param sourceFs file system the paths live on
 * @param fromPath path whose owner/permission chain is resolved first
 * @param toPath ancestor at which resolution stops (exclusive)
 * @param copyConfiguration configuration carrying the preserve-attributes rules
 * @param permissionMap cache of already-resolved {@link OwnerAndPermission}s, keyed by path string
 * @return resolved {@link OwnerAndPermission}s ordered from {@code fromPath} upward, excluding {@code toPath}
 * @throws IOException if {@code toPath} is not an ancestor of {@code fromPath}
 * @throws ExecutionException if resolving a cache miss fails
 */
public static List<OwnerAndPermission> resolveReplicatedOwnerAndPermissionsRecursivelyWithCache(FileSystem sourceFs, Path fromPath,
    Path toPath, CopyConfiguration copyConfiguration, Cache<String, OwnerAndPermission> permissionMap)
    throws IOException, ExecutionException {
  if (!PathUtils.isAncestor(toPath, fromPath)) {
    throw new IOException(String.format("toPath %s must be an ancestor of fromPath %s.", toPath, fromPath));
  }

  List<OwnerAndPermission> resolved = Lists.newArrayList();
  // Walk upward from fromPath; stop once the next parent would leave toPath's subtree (or the root is reached).
  for (Path cursor = fromPath; cursor.getParent() != null && PathUtils.isAncestor(toPath, cursor.getParent());
      cursor = cursor.getParent()) {
    final Path current = cursor; // effectively-final copy so the cache-loader lambda may capture it
    resolved.add(permissionMap.get(current.toString(),
        () -> resolveReplicatedOwnerAndPermission(sourceFs, current, copyConfiguration)));
  }
  return resolved;
}

public static Map<String, OwnerAndPermission> resolveReplicatedAncestorOwnerAndPermissionsRecursively(FileSystem sourceFs, Path fromPath,
Path toPath, CopyConfiguration copyConfiguration) throws IOException {
Preconditions.checkArgument(sourceFs.getFileStatus(fromPath).isDirectory(), "Source path must be a directory.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.gobblin.data.management.copy;

import com.google.common.base.Optional;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.JsonIOException;
Expand All @@ -27,6 +29,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.commit.CommitStep;
import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
Expand All @@ -47,19 +50,23 @@ public class ManifestBasedDataset implements IterableCopyableDataset {

// Config key: when true, files present on target but absent from source are scheduled for deletion.
private static final String DELETE_FILE_NOT_EXIST_ON_SOURCE = ManifestBasedDatasetFinder.CONFIG_PREFIX + ".deleteFileNotExistOnSource";
// Config key: common ancestor path up to which ancestor owner/permissions are resolved.
private static final String COMMON_FILES_PARENT = ManifestBasedDatasetFinder.CONFIG_PREFIX + ".commonFilesParent";
// Config key: TTL (seconds) for the ancestor owner/permission cache used during work planning.
private static final String PERMISSION_CACHE_TTL_SECONDS = ManifestBasedDatasetFinder.CONFIG_PREFIX + ".permission.cache.ttl.seconds";
private static final String DEFAULT_PERMISSION_CACHE_TTL_SECONDS = "30";
private static final String DEFAULT_COMMON_FILES_PARENT = "/";
// Source file system the manifest entries refer to.
private final FileSystem fs;
// Location of the CopyManifest json file to iterate.
private final Path manifestPath;
private final Properties properties;
// Parsed value of DELETE_FILE_NOT_EXIST_ON_SOURCE (defaults to false).
private final boolean deleteFileThatNotExistOnSource;
// Parsed value of COMMON_FILES_PARENT (defaults to "/").
private final String commonFilesParent;
// Parsed value of PERMISSION_CACHE_TTL_SECONDS (defaults to 30).
private final int permissionCacheTTLSeconds;

public ManifestBasedDataset(final FileSystem fs, Path manifestPath, Properties properties) {
this.fs = fs;
this.manifestPath = manifestPath;
this.properties = properties;
this.deleteFileThatNotExistOnSource = Boolean.parseBoolean(properties.getProperty(DELETE_FILE_NOT_EXIST_ON_SOURCE, "false"));
this.commonFilesParent = properties.getProperty(COMMON_FILES_PARENT, DEFAULT_COMMON_FILES_PARENT);
this.permissionCacheTTLSeconds = Integer.parseInt(properties.getProperty(PERMISSION_CACHE_TTL_SECONDS, DEFAULT_PERMISSION_CACHE_TTL_SECONDS));
}

@Override
Expand All @@ -82,33 +89,39 @@ public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, Cop
List<FileStatus> toDelete = Lists.newArrayList();
//todo: put permission preserve logic here?
try {
long startTime = System.currentTimeMillis();
manifests = CopyManifest.getReadIterator(this.fs, this.manifestPath);
Cache<String, OwnerAndPermission> permissionMap = CacheBuilder.newBuilder().expireAfterAccess(permissionCacheTTLSeconds, TimeUnit.SECONDS).build();
int numFiles = 0;
while (manifests.hasNext()) {
numFiles++;
CopyManifest.CopyableUnit file = manifests.next();
//todo: We can use fileSet to partition the data in case of some softbound issue
//todo: After partition, change this to directly return iterator so that we can save time if we meet resources limitation
CopyManifest.CopyableUnit file = manifests.next();
Path fileToCopy = new Path(file.fileName);
if (this.fs.exists(fileToCopy)) {
if (fs.exists(fileToCopy)) {
boolean existOnTarget = targetFs.exists(fileToCopy);
FileStatus srcFile = this.fs.getFileStatus(fileToCopy);
if (!existOnTarget || shouldCopy(this.fs, targetFs, srcFile, targetFs.getFileStatus(fileToCopy), configuration)) {
CopyableFile copyableFile =
CopyableFile.fromOriginAndDestination(this.fs, srcFile, fileToCopy, configuration)
FileStatus srcFile = fs.getFileStatus(fileToCopy);
OwnerAndPermission replicatedPermission = CopyableFile.resolveReplicatedOwnerAndPermission(fs, srcFile, configuration);
if (!existOnTarget || shouldCopy(targetFs, srcFile, targetFs.getFileStatus(fileToCopy), replicatedPermission)) {
CopyableFile.Builder copyableFileBuilder =
CopyableFile.fromOriginAndDestination(fs, srcFile, fileToCopy, configuration)
.fileSet(datasetURN())
.datasetOutputPath(fileToCopy.toString())
.ancestorsOwnerAndPermission(CopyableFile
.resolveReplicatedOwnerAndPermissionsRecursively(this.fs, fileToCopy.getParent(),
new Path(this.commonFilesParent), configuration))
.build();
copyableFile.setFsDatasets(this.fs, targetFs);
.ancestorsOwnerAndPermission(
CopyableFile.resolveReplicatedOwnerAndPermissionsRecursivelyWithCache(fs, fileToCopy.getParent(),
new Path(commonFilesParent), configuration, permissionMap))
.destinationOwnerAndPermission(replicatedPermission);
CopyableFile copyableFile = copyableFileBuilder.build();
copyableFile.setFsDatasets(fs, targetFs);
copyEntities.add(copyableFile);
if (existOnTarget && srcFile.isFile()) {
// this is to match the existing publishing behavior where we won't rewrite the target when it's already existed
// todo: Change the publish behavior to support overwrite destination file during rename, instead of relying on this delete step which is needed if we want to support task level publish
toDelete.add(targetFs.getFileStatus(fileToCopy));
}
}
} else if (this.deleteFileThatNotExistOnSource && targetFs.exists(fileToCopy)){
} else if (deleteFileThatNotExistOnSource && targetFs.exists(fileToCopy)) {
toDelete.add(targetFs.getFileStatus(fileToCopy));
}
}
Expand All @@ -117,6 +130,7 @@ public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, Cop
CommitStep step = new DeleteFileCommitStep(targetFs, toDelete, this.properties, Optional.<Path>absent());
copyEntities.add(new PrePublishStep(datasetURN(), Maps.newHashMap(), step, 1));
}
log.info(String.format("Workunits calculation took %s milliseconds to process %s files", System.currentTimeMillis() - startTime, numFiles));
} catch (JsonIOException| JsonSyntaxException e) {
//todo: update error message to point to a sample json file instead of schema which is hard to understand
log.warn(String.format("Failed to read Manifest path %s on filesystem %s, please make sure it's in correct json format with schema"
Expand All @@ -134,11 +148,10 @@ public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, Cop
return Collections.singleton(new FileSet.Builder<>(datasetURN(), this).add(copyEntities).build()).iterator();
}

private static boolean shouldCopy(FileSystem srcFs, FileSystem targetFs, FileStatus fileInSource, FileStatus fileInTarget, CopyConfiguration copyConfiguration)
private static boolean shouldCopy(FileSystem targetFs, FileStatus fileInSource, FileStatus fileInTarget, OwnerAndPermission replicatedPermission)
throws IOException {
if (fileInSource.isDirectory() || fileInSource.getModificationTime() == fileInTarget.getModificationTime()) {
// if source is dir or source and dst has same version, we compare the permission to determine whether it needs another sync
OwnerAndPermission replicatedPermission = CopyableFile.resolveReplicatedOwnerAndPermission(srcFs, fileInSource, copyConfiguration);
return !replicatedPermission.hasSameOwnerAndPermission(targetFs, fileInTarget);
}
return fileInSource.getModificationTime() > fileInTarget.getModificationTime();
Expand Down

0 comments on commit 7bbf676

Please sign in to comment.