From 7347cbc63523300a2573b1fbd54b81382ec1e2c4 Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Sat, 7 Sep 2024 23:45:24 +0800 Subject: [PATCH] [Fix](hdfs-fs)The cache expiration should explicitly release the held fs #38610 (#40504) --- .../fs/remote/RemoteFSPhantomManager.java | 117 ++++++++++++++++++ .../doris/fs/remote/RemoteFileSystem.java | 21 +++- .../RemoteFileSystemPhantomReference.java | 44 +++++++ .../apache/doris/fs/remote/S3FileSystem.java | 26 ++-- .../doris/fs/remote/dfs/DFSFileSystem.java | 50 +++++--- 5 files changed, 231 insertions(+), 27 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystemPhantomReference.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java new file mode 100644 index 00000000000000..282361c4cb63b0 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java @@ -0,0 +1,117 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.fs.remote; + +import org.apache.doris.common.CustomThreadFactory; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.lang.ref.PhantomReference; +import java.lang.ref.Reference; +import java.lang.ref.ReferenceQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * The RemoteFSPhantomManager class is responsible for managing the phantom references + * of RemoteFileSystem objects. It ensures that the associated FileSystem resources are + * automatically cleaned up when the RemoteFileSystem objects are garbage collected. + *

<p> + * By utilizing a ReferenceQueue and PhantomReference, this class can monitor the lifecycle + * of RemoteFileSystem objects. When a RemoteFileSystem object is no longer in use and is + * garbage collected, its corresponding FileSystem resource is properly closed to prevent + * resource leaks. + *
<p>
+ * The class provides a thread-safe mechanism to ensure that the cleanup thread is started only once. + * <p>

+ * Main functionalities include: + * - Registering phantom references of RemoteFileSystem objects. + * - Starting a periodic cleanup thread that automatically closes unused FileSystem resources. + */ +public class RemoteFSPhantomManager { + + private static final Logger LOG = LogManager.getLogger(RemoteFSPhantomManager.class); + + // Scheduled executor for periodic resource cleanup + private static ScheduledExecutorService cleanupExecutor; + + // Reference queue for monitoring RemoteFileSystem objects' phantom references + private static final ReferenceQueue referenceQueue = new ReferenceQueue<>(); + + // Map storing the phantom references and their corresponding FileSystem objects + private static final ConcurrentHashMap, FileSystem> referenceMap + = new ConcurrentHashMap<>(); + + // Flag indicating whether the cleanup thread has been started + private static final AtomicBoolean isStarted = new AtomicBoolean(false); + + /** + * Registers a phantom reference for a RemoteFileSystem object in the manager. + * If the cleanup thread has not been started, it will be started. + * + * @param remoteFileSystem the RemoteFileSystem object to be registered + */ + public static void registerPhantomReference(RemoteFileSystem remoteFileSystem) { + if (!isStarted.get()) { + start(); + isStarted.set(true); + } + RemoteFileSystemPhantomReference phantomReference = new RemoteFileSystemPhantomReference(remoteFileSystem, + referenceQueue); + referenceMap.put(phantomReference, remoteFileSystem.dfsFileSystem); + } + + /** + * Starts the cleanup thread, which periodically checks and cleans up unused FileSystem resources. + * The method uses double-checked locking to ensure thread-safe startup of the cleanup thread. 
+ */ + public static void start() { + if (isStarted.compareAndSet(false, true)) { + synchronized (RemoteFSPhantomManager.class) { + LOG.info("Starting cleanup thread for RemoteFileSystem objects"); + if (cleanupExecutor == null) { + CustomThreadFactory threadFactory = new CustomThreadFactory("remote-fs-phantom-cleanup"); + cleanupExecutor = Executors.newScheduledThreadPool(1, threadFactory); + cleanupExecutor.scheduleAtFixedRate(() -> { + Reference ref; + while ((ref = referenceQueue.poll()) != null) { + RemoteFileSystemPhantomReference phantomRef = (RemoteFileSystemPhantomReference) ref; + + FileSystem fs = referenceMap.remove(phantomRef); + if (fs != null) { + try { + fs.close(); + LOG.info("Closed file system: {}", fs.getUri()); + } catch (IOException e) { + LOG.warn("Failed to close file system", e); + } + } + } + }, 0, 1, TimeUnit.MINUTES); + } + } + } + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java index ffe63f20ac7684..2149cdb4e1d9ca 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java @@ -26,13 +26,18 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; +import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; -public abstract class RemoteFileSystem extends PersistentFileSystem { +public abstract class RemoteFileSystem extends PersistentFileSystem implements Closeable { // this field will be visited by multi-threads, better use volatile qualifier protected volatile org.apache.hadoop.fs.FileSystem dfsFileSystem = null; + private final ReentrantLock fsLock = new ReentrantLock(); + protected static final AtomicBoolean closed = new AtomicBoolean(false); 
public RemoteFileSystem(String name, StorageBackend.StorageType type) { super(name, type); @@ -65,4 +70,18 @@ private RemoteFiles getFileLocations(RemoteIterator locatedFi } return new RemoteFiles(locations); } + + @Override + public void close() throws IOException { + fsLock.lock(); + try { + if (!closed.getAndSet(true)) { + if (dfsFileSystem != null) { + dfsFileSystem.close(); + } + } + } finally { + fsLock.unlock(); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystemPhantomReference.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystemPhantomReference.java new file mode 100644 index 00000000000000..89506c7b212242 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystemPhantomReference.java @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.fs.remote; + +import org.apache.hadoop.fs.FileSystem; + +import java.lang.ref.PhantomReference; +import java.lang.ref.ReferenceQueue; + +public class RemoteFileSystemPhantomReference extends PhantomReference { + + private FileSystem fs; + + /** + * Creates a new phantom reference that refers to the given object and + * is registered with the given queue. + * + *
<p>
It is possible to create a phantom reference with a {@code null} + * queue. Such a reference will never be enqueued. + * + * @param referent the object the new phantom reference will refer to + * @param q the queue with which the reference is to be registered, + * or {@code null} if registration is not required + */ + public RemoteFileSystemPhantomReference(RemoteFileSystem referent, ReferenceQueue q) { + super(referent, q); + this.fs = referent.dfsFileSystem; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java index f91c50d7099dfc..7d4b9d797ce859 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java @@ -57,14 +57,26 @@ private void initFsProperties() { @Override protected FileSystem nativeFileSystem(String remotePath) throws UserException { + //todo Extracting a common method to achieve logic reuse + if (closed.get()) { + throw new UserException("FileSystem is closed."); + } if (dfsFileSystem == null) { - Configuration conf = new Configuration(); - System.setProperty("com.amazonaws.services.s3.enableV4", "true"); - PropertyConverter.convertToHadoopFSProperties(properties).forEach(conf::set); - try { - dfsFileSystem = FileSystem.get(new Path(remotePath).toUri(), conf); - } catch (Exception e) { - throw new UserException("Failed to get S3 FileSystem for " + e.getMessage(), e); + synchronized (this) { + if (closed.get()) { + throw new UserException("FileSystem is closed."); + } + if (dfsFileSystem == null) { + Configuration conf = new Configuration(); + System.setProperty("com.amazonaws.services.s3.enableV4", "true"); + PropertyConverter.convertToHadoopFSProperties(properties).forEach(conf::set); + try { + dfsFileSystem = FileSystem.get(new Path(remotePath).toUri(), conf); + } catch (Exception e) { + throw new UserException("Failed to get S3 FileSystem for 
" + e.getMessage(), e); + } + RemoteFSPhantomManager.registerPhantomReference(this); + } } } return dfsFileSystem; diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java index 25ecafda46875e..ee7fddf7ac65f3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java @@ -26,6 +26,7 @@ import org.apache.doris.fs.operations.HDFSFileOperations; import org.apache.doris.fs.operations.HDFSOpParams; import org.apache.doris.fs.operations.OpParams; +import org.apache.doris.fs.remote.RemoteFSPhantomManager; import org.apache.doris.fs.remote.RemoteFile; import org.apache.doris.fs.remote.RemoteFileSystem; @@ -73,30 +74,41 @@ public DFSFileSystem(StorageBackend.StorageType type, Map proper @Override protected FileSystem nativeFileSystem(String remotePath) throws UserException { - if (dfsFileSystem != null) { - return dfsFileSystem; + if (closed.get()) { + throw new UserException("FileSystem is closed."); } - Configuration conf = new HdfsConfiguration(); - for (Map.Entry propEntry : properties.entrySet()) { - conf.set(propEntry.getKey(), propEntry.getValue()); - } + if (dfsFileSystem == null) { + synchronized (this) { + if (closed.get()) { + throw new UserException("FileSystem is closed."); + } + if (dfsFileSystem == null) { - UserGroupInformation ugi = login(conf); - try { - dfsFileSystem = ugi.doAs((PrivilegedAction) () -> { - try { - return FileSystem.get(new Path(remotePath).toUri(), conf); - } catch (IOException e) { - throw new RuntimeException(e); + Configuration conf = new HdfsConfiguration(); + for (Map.Entry propEntry : properties.entrySet()) { + conf.set(propEntry.getKey(), propEntry.getValue()); + } + + UserGroupInformation ugi = login(conf); + try { + dfsFileSystem = ugi.doAs((PrivilegedAction) () -> { + try { + return FileSystem.get(new 
Path(remotePath).toUri(), conf); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } catch (SecurityException e) { + throw new UserException(e); + } + + Preconditions.checkNotNull(dfsFileSystem); + operations = new HDFSFileOperations(dfsFileSystem); + RemoteFSPhantomManager.registerPhantomReference(this); } - }); - } catch (SecurityException e) { - throw new UserException(e); + } } - - Preconditions.checkNotNull(dfsFileSystem); - operations = new HDFSFileOperations(dfsFileSystem); return dfsFileSystem; }