From 239454dd575bb2fbd179456f0a548f464b43411a Mon Sep 17 00:00:00 2001 From: Marcel R Date: Fri, 16 Dec 2022 15:55:36 +0100 Subject: [PATCH] Fix remote target cache invalidation. --- law/target/remote/base.py | 121 ++++++++++++++++++++----------------- law/target/remote/cache.py | 2 +- 2 files changed, 67 insertions(+), 56 deletions(-) diff --git a/law/target/remote/base.py b/law/target/remote/base.py index 5ab802e9..61927968 100644 --- a/law/target/remote/base.py +++ b/law/target/remote/base.py @@ -348,64 +348,68 @@ def _cached_copy(self, src, dst, perm=None, cache=None, prefer_cache=False, vali if dst is None and not cache: raise Exception("copy destination must not be empty when caching is disabled") - if cache: - kwargs_no_retries = kwargs.copy() - kwargs_no_retries["retries"] = 0 - - # handle 3 cases: lr, rl, rc - if mode == "lr": - # strategy: copy to remote, copy to cache, sync stats - - # copy to remote, no need to validate as we compute the stat anyway - dst_uri = self._atomic_copy(src, dst, perm=perm, validate=False, **kwargs) - rstat = self.stat(dst, **kwargs_no_retries) - - # remove the cache entry - if dst in self.cache: - self.cache.remove(dst) - - # allocate cache space and copy to cache - lstat = self.local_fs.stat(src) - self.cache.allocate(lstat.st_size) - cdst_uri = add_scheme(self.cache.cache_path(dst), "file") - with self.cache.lock(dst): - self._atomic_copy(src, cdst_uri, validate=False) - self.cache.touch(dst, (int(time.time()), rstat.st_mtime)) - - return dst_uri - - else: # rl, rc - # strategy: copy to cache when not up to date, sync stats, opt. copy to local - - # build the uri to the cache path of the src file - csrc_uri = add_scheme(self.cache.cache_path(src), "file") - - # if the file is cached and prefer_cache is true, - # return the cache path, no questions asked - # otherwise, check if the file is there and up to date - if not prefer_cache or src not in self.cache: - with self.cache.lock(src): - # in cache and outdated? - rstat = self.stat(src, **kwargs_no_retries) - if src in self.cache and not self.cache.check_mtime(src, rstat.st_mtime): - self.cache.remove(src, lock=False) - # in cache at all? - if src not in self.cache: - self.cache.allocate(rstat.st_size) - self._atomic_copy(src, csrc_uri, validate=validate, **kwargs) - self.cache.touch(src, (int(time.time()), rstat.st_mtime)) - - if mode == "rl": - # simply use the local_fs for copying - self.local_fs.copy(csrc_uri, dst, perm=perm) - return dst - else: # rc - return csrc_uri - - else: + if not cache: # simply copy and return the dst path return self._atomic_copy(src, dst, perm=perm, validate=validate, **kwargs) + kwargs_no_retries = kwargs.copy() + kwargs_no_retries["retries"] = 0 + + # handle 3 cases: lr, rl, rc + if mode == "lr": + # strategy: copy to remote, copy to cache, sync stats + + # copy to remote, no need to validate as we compute the stat anyway + dst_uri = self._atomic_copy(src, dst, perm=perm, validate=False, **kwargs) + rstat = self.stat(dst, **kwargs_no_retries) + + # remove the cache entry + if dst in self.cache: + logger.debug("removing destination file {} from cache".format(dst)) + self.cache.remove(dst) + + # allocate cache space and copy to cache + lstat = self.local_fs.stat(src) + self.cache.allocate(lstat.st_size) + cdst_uri = add_scheme(self.cache.cache_path(dst), "file") + with self.cache.lock(dst): + logger.debug("loading source file {} to cache".format(src)) + self._atomic_copy(src, cdst_uri, validate=False) + self.cache.touch(dst, (int(time.time()), rstat.st_mtime)) + + return dst_uri + + else: # rl, rc + # strategy: copy to cache when not up to date, sync stats, opt. copy to local + + # build the uri to the cache path of the src file + csrc_uri = add_scheme(self.cache.cache_path(src), "file") + + # if the file is cached and prefer_cache is true, + # return the cache path, no questions asked + # otherwise, check if the file is there and up to date + if not prefer_cache or src not in self.cache: + with self.cache.lock(src): + # in cache and outdated? + rstat = self.stat(src, **kwargs_no_retries) + if src in self.cache and not self.cache.check_mtime(src, rstat.st_mtime): + logger.debug("source file {} is outdated in cache, removing".format(src)) + self.cache.remove(src, lock=False) + # in cache at all? + if src not in self.cache: + self.cache.allocate(rstat.st_size) + self._atomic_copy(src, csrc_uri, validate=validate, **kwargs) + logger.debug("loading source file {} to cache".format(src)) + self.cache.touch(src, (int(time.time()), rstat.st_mtime)) + + if mode == "rl": + # simply use the local_fs for copying + self.local_fs.copy(csrc_uri, dst, perm=perm) + return dst + + # mode is rc + return csrc_uri + def _prepare_dst_dir(self, dst, src=None, perm=None, **kwargs): """ Prepares the directory of a target located at *dst* for copying and returns its full @@ -583,6 +587,13 @@ def move_from_local(self, src=None, **kwargs): class RemoteFileTarget(FileSystemFileTarget, RemoteTarget): + @property + def cache_path(self): + if not self.fs.cache: + return None + + return self.fs.cache.cache_path(self.path) + @contextmanager def localize(self, mode="r", perm=None, dir_perm=None, tmp_dir=None, **kwargs): if mode not in ["r", "w", "a"]: diff --git a/law/target/remote/cache.py b/law/target/remote/cache.py index 767b4004..e7f35212 100644 --- a/law/target/remote/cache.py +++ b/law/target/remote/cache.py @@ -344,7 +344,7 @@ def check_mtime(self, rpath, rmtime): if self.mtime_patience < 0: return True - return abs(self.mtime(rpath) - rmtime) > self.mtime_patience + return abs(self.mtime(rpath) - rmtime) <= self.mtime_patience def _remove(self, cpath, lock=True): def remove():