diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
index 23c27b61da1..85fa80f0fe8 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
@@ -17,10 +17,13 @@
 package org.apache.gobblin.data.management.copy;
 
+import com.google.common.cache.Cache;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -58,6 +61,7 @@
 @Setter
 @NoArgsConstructor(access = AccessLevel.PROTECTED)
 @EqualsAndHashCode(callSuper = true)
+@Slf4j
 public class CopyableFile extends CopyEntity implements File {
 
   private static final byte[] EMPTY_CHECKSUM = new byte[0];
@@ -375,6 +379,36 @@ public static List<OwnerAndPermission> resolveReplicatedOwnerAndPermissionsRecur
     return ownerAndPermissions;
   }
 
+  /**
+   * Compute the correct {@link OwnerAndPermission} obtained from replicating source owner and permissions and applying
+   * the {@link PreserveAttributes} rules for fromPath and every ancestor up to but excluding toPath.
+   * Unlike the resolveReplicatedOwnerAndPermissionsRecursively() method, this method utilizes permissionMap as a cache to minimize the number of calls to HDFS.
+   * It is recommended to use this method when recursively calculating permissions for numerous files that share the same ancestor.
+   *
+   * @return A list of the computed {@link OwnerAndPermission}s starting from fromPath, up to but excluding toPath.
+   * @throws IOException if toPath is not an ancestor of fromPath.
+   */
+  public static List<OwnerAndPermission> resolveReplicatedOwnerAndPermissionsRecursivelyWithCache(FileSystem sourceFs, Path fromPath,
+      Path toPath, CopyConfiguration copyConfiguration, Cache<String, OwnerAndPermission> permissionMap)
+      throws IOException, ExecutionException {
+
+    if (!PathUtils.isAncestor(toPath, fromPath)) {
+      throw new IOException(String.format("toPath %s must be an ancestor of fromPath %s.", toPath, fromPath));
+    }
+
+    List<OwnerAndPermission> ownerAndPermissions = Lists.newArrayList();
+    Path currentPath = fromPath;
+
+    while (currentPath.getParent() != null && PathUtils.isAncestor(toPath, currentPath.getParent())) {
+      Path finalCurrentPath = currentPath;
+      ownerAndPermissions.add(permissionMap.get(finalCurrentPath.toString(), () -> resolveReplicatedOwnerAndPermission(sourceFs,
+          finalCurrentPath, copyConfiguration)));
+      currentPath = currentPath.getParent();
+    }
+
+    return ownerAndPermissions;
+  }
+
   public static Map<String, OwnerAndPermission> resolveReplicatedAncestorOwnerAndPermissionsRecursively(FileSystem sourceFs, Path fromPath,
       Path toPath, CopyConfiguration copyConfiguration) throws IOException {
     Preconditions.checkArgument(sourceFs.getFileStatus(fromPath).isDirectory(), "Source path must be a directory.");
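The following is a minimal usage sketch, not part of the patch: it shows how a caller might drive the cache-backed resolution above for a batch of files under a common ancestor. The class name, the file list, and the 30-second TTL are illustrative assumptions; only the Gobblin types and the Guava Cache/CacheBuilder API come from the change itself.

// Illustrative sketch only (not part of this patch): resolving ancestor permissions for many
// files that share a common ancestor while reusing one permission cache. Names are made up;
// the 30-second TTL mirrors the default introduced in ManifestBasedDataset below.
package org.apache.gobblin.data.management.copy;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class AncestorPermissionResolutionSketch {

  public static void resolveAll(FileSystem sourceFs, List<Path> files, Path commonParent,
      CopyConfiguration configuration) throws IOException, ExecutionException {
    // One cache shared across every file in the batch; an ancestor's OwnerAndPermission is
    // fetched from the source filesystem at most once per TTL window instead of once per file.
    Cache<String, OwnerAndPermission> permissionMap =
        CacheBuilder.newBuilder().expireAfterAccess(30, TimeUnit.SECONDS).build();

    for (Path file : files) {
      List<OwnerAndPermission> ancestorPermissions =
          CopyableFile.resolveReplicatedOwnerAndPermissionsRecursivelyWithCache(
              sourceFs, file.getParent(), commonParent, configuration, permissionMap);
      // ancestorPermissions covers file.getParent() up to, but excluding, commonParent.
    }
  }
}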
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
index 17de89458f2..fbb88640b23 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
@@ -18,6 +18,8 @@
 package org.apache.gobblin.data.management.copy;
 
 import com.google.common.base.Optional;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.gson.JsonIOException;
@@ -27,6 +29,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.commit.CommitStep;
 import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
@@ -47,12 +50,15 @@ public class ManifestBasedDataset implements IterableCopyableDataset {
 
   private static final String DELETE_FILE_NOT_EXIST_ON_SOURCE = ManifestBasedDatasetFinder.CONFIG_PREFIX + ".deleteFileNotExistOnSource";
   private static final String COMMON_FILES_PARENT = ManifestBasedDatasetFinder.CONFIG_PREFIX + ".commonFilesParent";
+  private static final String PERMISSION_CACHE_TTL_SECONDS = ManifestBasedDatasetFinder.CONFIG_PREFIX + ".permission.cache.ttl.seconds";
+  private static final String DEFAULT_PERMISSION_CACHE_TTL_SECONDS = "30";
   private static final String DEFAULT_COMMON_FILES_PARENT = "/";
   private final FileSystem fs;
   private final Path manifestPath;
   private final Properties properties;
   private final boolean deleteFileThatNotExistOnSource;
   private final String commonFilesParent;
+  private final int permissionCacheTTLSeconds;
 
   public ManifestBasedDataset(final FileSystem fs, Path manifestPath, Properties properties) {
     this.fs = fs;
@@ -60,6 +66,7 @@ public ManifestBasedDataset(final FileSystem fs, Path manifestPath, Properties p
     this.properties = properties;
     this.deleteFileThatNotExistOnSource = Boolean.parseBoolean(properties.getProperty(DELETE_FILE_NOT_EXIST_ON_SOURCE, "false"));
     this.commonFilesParent = properties.getProperty(COMMON_FILES_PARENT, DEFAULT_COMMON_FILES_PARENT);
+    this.permissionCacheTTLSeconds = Integer.parseInt(properties.getProperty(PERMISSION_CACHE_TTL_SECONDS, DEFAULT_PERMISSION_CACHE_TTL_SECONDS));
   }
 
   @Override
@@ -82,25 +89,31 @@ public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, Cop
     List<FileStatus> toDelete = Lists.newArrayList();
     //todo: put permission preserve logic here?
     try {
+      long startTime = System.currentTimeMillis();
       manifests = CopyManifest.getReadIterator(this.fs, this.manifestPath);
+      Cache<String, OwnerAndPermission> permissionMap = CacheBuilder.newBuilder().expireAfterAccess(permissionCacheTTLSeconds, TimeUnit.SECONDS).build();
+      int numFiles = 0;
       while (manifests.hasNext()) {
+        numFiles++;
+        CopyManifest.CopyableUnit file = manifests.next();
        //todo: We can use fileSet to partition the data in case of some softbound issue
        //todo: After partition, change this to directly return iterator so that we can save time if we meet resources limitation
-        CopyManifest.CopyableUnit file = manifests.next();
         Path fileToCopy = new Path(file.fileName);
-        if (this.fs.exists(fileToCopy)) {
+        if (fs.exists(fileToCopy)) {
           boolean existOnTarget = targetFs.exists(fileToCopy);
-          FileStatus srcFile = this.fs.getFileStatus(fileToCopy);
-          if (!existOnTarget || shouldCopy(this.fs, targetFs, srcFile, targetFs.getFileStatus(fileToCopy), configuration)) {
-            CopyableFile copyableFile =
-                CopyableFile.fromOriginAndDestination(this.fs, srcFile, fileToCopy, configuration)
+          FileStatus srcFile = fs.getFileStatus(fileToCopy);
+          OwnerAndPermission replicatedPermission = CopyableFile.resolveReplicatedOwnerAndPermission(fs, srcFile, configuration);
+          if (!existOnTarget || shouldCopy(targetFs, srcFile, targetFs.getFileStatus(fileToCopy), replicatedPermission)) {
+            CopyableFile.Builder copyableFileBuilder =
+                CopyableFile.fromOriginAndDestination(fs, srcFile, fileToCopy, configuration)
                     .fileSet(datasetURN())
                     .datasetOutputPath(fileToCopy.toString())
-                    .ancestorsOwnerAndPermission(CopyableFile
-                        .resolveReplicatedOwnerAndPermissionsRecursively(this.fs, fileToCopy.getParent(),
-                            new Path(this.commonFilesParent), configuration))
-                    .build();
-            copyableFile.setFsDatasets(this.fs, targetFs);
+                    .ancestorsOwnerAndPermission(
+                        CopyableFile.resolveReplicatedOwnerAndPermissionsRecursivelyWithCache(fs, fileToCopy.getParent(),
+                            new Path(commonFilesParent), configuration, permissionMap))
+                    .destinationOwnerAndPermission(replicatedPermission);
+            CopyableFile copyableFile = copyableFileBuilder.build();
+            copyableFile.setFsDatasets(fs, targetFs);
             copyEntities.add(copyableFile);
             if (existOnTarget && srcFile.isFile()) {
               // this is to match the existing publishing behavior where we won't rewrite the target when it's already existed
@@ -108,7 +121,7 @@ public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, Cop
               toDelete.add(targetFs.getFileStatus(fileToCopy));
             }
           }
-        } else if (this.deleteFileThatNotExistOnSource && targetFs.exists(fileToCopy)){
+        } else if (deleteFileThatNotExistOnSource && targetFs.exists(fileToCopy)) {
          toDelete.add(targetFs.getFileStatus(fileToCopy));
         }
       }
@@ -117,6 +130,7 @@ public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, Cop
         CommitStep step = new DeleteFileCommitStep(targetFs, toDelete, this.properties, Optional.absent());
         copyEntities.add(new PrePublishStep(datasetURN(), Maps.newHashMap(), step, 1));
       }
+      log.info(String.format("Workunits calculation took %s milliseconds to process %s files", System.currentTimeMillis() - startTime, numFiles));
     } catch (JsonIOException| JsonSyntaxException e) {
       //todo: update error message to point to a sample json file instead of schema which is hard to understand
       log.warn(String.format("Failed to read Manifest path %s on filesystem %s, please make sure it's in correct json format with schema"
@@ -134,11 +148,10 @@ public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, Cop
     return Collections.singleton(new FileSet.Builder<>(datasetURN(), this).add(copyEntities).build()).iterator();
   }
 
-  private static boolean shouldCopy(FileSystem srcFs, FileSystem targetFs, FileStatus fileInSource, FileStatus fileInTarget, CopyConfiguration copyConfiguration)
+  private static boolean shouldCopy(FileSystem targetFs, FileStatus fileInSource, FileStatus fileInTarget, OwnerAndPermission replicatedPermission)
       throws IOException {
     if (fileInSource.isDirectory() || fileInSource.getModificationTime() == fileInTarget.getModificationTime()) {
       // if source is dir or source and dst has same version, we compare the permission to determine whether it needs another sync
-      OwnerAndPermission replicatedPermission = CopyableFile.resolveReplicatedOwnerAndPermission(srcFs, fileInSource, copyConfiguration);
       return !replicatedPermission.hasSameOwnerAndPermission(targetFs, fileInTarget);
     }
     return fileInSource.getModificationTime() > fileInTarget.getModificationTime();
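A hedged configuration sketch, not part of the patch: overriding the new permission cache TTL through job Properties. The helper class and the 120-second value are made up, and the property key is written via ManifestBasedDatasetFinder.CONFIG_PREFIX because this diff does not show the prefix's literal value; the sketch is placed in the same package so the package-visible constant and the public constructor resolve.

// Sketch only: builds a ManifestBasedDataset whose Guava permission cache expires entries
// 120 seconds after last access instead of the 30-second default added above.
package org.apache.gobblin.data.management.copy;

import java.util.Properties;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ManifestCopyPermissionCacheTtlSketch {

  public static ManifestBasedDataset withLongerTtl(FileSystem fs, Path manifestPath) {
    Properties props = new Properties();
    // Key mirrors PERMISSION_CACHE_TTL_SECONDS in ManifestBasedDataset; the prefix constant
    // comes from ManifestBasedDatasetFinder and is assumed accessible from this package.
    props.setProperty(ManifestBasedDatasetFinder.CONFIG_PREFIX + ".permission.cache.ttl.seconds", "120");
    return new ManifestBasedDataset(fs, manifestPath, props);
  }
}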