diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java index cb899ce92625..40659ca251d7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java @@ -50,6 +50,7 @@ import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.common.ReflectHelpers; +import org.apache.beam.sdk.values.KV; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Function; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner; @@ -74,6 +75,9 @@ public class FileSystems { Pattern.compile("(?[a-zA-Z][-a-zA-Z0-9+.]*):/.*"); private static final Pattern GLOB_PATTERN = Pattern.compile("[*?{}]"); + private static final AtomicReference> FILESYSTEM_REVISION = + new AtomicReference<>(); + private static final AtomicReference> SCHEME_TO_FILESYSTEM = new AtomicReference<>(ImmutableMap.of(DEFAULT_SCHEME, new LocalFileSystem())); @@ -529,13 +533,27 @@ static FileSystem getFileSystemInternal(String scheme) { @Internal public static void setDefaultPipelineOptions(PipelineOptions options) { checkNotNull(options, "options"); - Set registrars = - Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE); - registrars.addAll( - Lists.newArrayList( - ServiceLoader.load(FileSystemRegistrar.class, ReflectHelpers.findClassLoader()))); + long id = options.getOptionsId(); + int nextRevision = options.revision(); + + while (true) { + KV revision = FILESYSTEM_REVISION.get(); + // only update file systems if the pipeline changed or the options revision increased + if (revision != null && revision.getKey().equals(id) && revision.getValue() >= nextRevision) { + return; + } + + if (FILESYSTEM_REVISION.compareAndSet(revision, KV.of(id, nextRevision))) { + Set registrars = + Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE); + registrars.addAll( + Lists.newArrayList( + ServiceLoader.load(FileSystemRegistrar.class, ReflectHelpers.findClassLoader()))); - SCHEME_TO_FILESYSTEM.set(verifySchemesAreUnique(options, registrars)); + SCHEME_TO_FILESYSTEM.set(verifySchemesAreUnique(options, registrars)); + return; + } + } } @VisibleForTesting