Skip to content

Commit

Permalink
Safe initialization of file systems using pipeline options revision.
Browse files Browse the repository at this point in the history
  • Loading branch information
Moritz Mack authored and mosche committed Aug 1, 2023
1 parent 2f55b25 commit f3bdc8d
Showing 1 changed file with 24 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Function;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
Expand All @@ -74,6 +75,9 @@ public class FileSystems {
Pattern.compile("(?<scheme>[a-zA-Z][-a-zA-Z0-9+.]*):/.*");
private static final Pattern GLOB_PATTERN = Pattern.compile("[*?{}]");

private static final AtomicReference<KV<Long, Integer>> FILESYSTEM_REVISION =
new AtomicReference<>();

private static final AtomicReference<Map<String, FileSystem>> SCHEME_TO_FILESYSTEM =
new AtomicReference<>(ImmutableMap.of(DEFAULT_SCHEME, new LocalFileSystem()));

Expand Down Expand Up @@ -529,13 +533,27 @@ static FileSystem getFileSystemInternal(String scheme) {
@Internal
public static void setDefaultPipelineOptions(PipelineOptions options) {
checkNotNull(options, "options");
Set<FileSystemRegistrar> registrars =
Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE);
registrars.addAll(
Lists.newArrayList(
ServiceLoader.load(FileSystemRegistrar.class, ReflectHelpers.findClassLoader())));
long id = options.getOptionsId();
int nextRevision = options.revision();

while (true) {
KV<Long, Integer> revision = FILESYSTEM_REVISION.get();
// only update file systems if the pipeline changed or the options revision increased
if (revision != null && revision.getKey().equals(id) && revision.getValue() >= nextRevision) {
return;
}

if (FILESYSTEM_REVISION.compareAndSet(revision, KV.of(id, nextRevision))) {
Set<FileSystemRegistrar> registrars =
Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE);
registrars.addAll(
Lists.newArrayList(
ServiceLoader.load(FileSystemRegistrar.class, ReflectHelpers.findClassLoader())));

SCHEME_TO_FILESYSTEM.set(verifySchemesAreUnique(options, registrars));
SCHEME_TO_FILESYSTEM.set(verifySchemesAreUnique(options, registrars));
return;
}
}
}

@VisibleForTesting
Expand Down

0 comments on commit f3bdc8d

Please sign in to comment.