Skip to content

Commit

Permalink
Fix remote target cache invalidation.
Browse files: browse the repository at this point in the history
  • Loading branch information
riga committed Dec 16, 2022
1 parent dbcc1b5 commit 239454d
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 56 deletions.
121 changes: 66 additions & 55 deletions law/target/remote/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,64 +348,68 @@ def _cached_copy(self, src, dst, perm=None, cache=None, prefer_cache=False, vali
if dst is None and not cache:
raise Exception("copy destination must not be empty when caching is disabled")

if cache:
kwargs_no_retries = kwargs.copy()
kwargs_no_retries["retries"] = 0

# handle 3 cases: lr, rl, rc
if mode == "lr":
# strategy: copy to remote, copy to cache, sync stats

# copy to remote, no need to validate as we compute the stat anyway
dst_uri = self._atomic_copy(src, dst, perm=perm, validate=False, **kwargs)
rstat = self.stat(dst, **kwargs_no_retries)

# remove the cache entry
if dst in self.cache:
self.cache.remove(dst)

# allocate cache space and copy to cache
lstat = self.local_fs.stat(src)
self.cache.allocate(lstat.st_size)
cdst_uri = add_scheme(self.cache.cache_path(dst), "file")
with self.cache.lock(dst):
self._atomic_copy(src, cdst_uri, validate=False)
self.cache.touch(dst, (int(time.time()), rstat.st_mtime))

return dst_uri

else: # rl, rc
# strategy: copy to cache when not up to date, sync stats, opt. copy to local

# build the uri to the cache path of the src file
csrc_uri = add_scheme(self.cache.cache_path(src), "file")

# if the file is cached and prefer_cache is true,
# return the cache path, no questions asked
# otherwise, check if the file is there and up to date
if not prefer_cache or src not in self.cache:
with self.cache.lock(src):
# in cache and outdated?
rstat = self.stat(src, **kwargs_no_retries)
if src in self.cache and not self.cache.check_mtime(src, rstat.st_mtime):
self.cache.remove(src, lock=False)
# in cache at all?
if src not in self.cache:
self.cache.allocate(rstat.st_size)
self._atomic_copy(src, csrc_uri, validate=validate, **kwargs)
self.cache.touch(src, (int(time.time()), rstat.st_mtime))

if mode == "rl":
# simply use the local_fs for copying
self.local_fs.copy(csrc_uri, dst, perm=perm)
return dst
else: # rc
return csrc_uri

else:
if not cache:
# simply copy and return the dst path
return self._atomic_copy(src, dst, perm=perm, validate=validate, **kwargs)

kwargs_no_retries = kwargs.copy()
kwargs_no_retries["retries"] = 0

# handle 3 cases: lr, rl, rc
if mode == "lr":
# strategy: copy to remote, copy to cache, sync stats

# copy to remote, no need to validate as we compute the stat anyway
dst_uri = self._atomic_copy(src, dst, perm=perm, validate=False, **kwargs)
rstat = self.stat(dst, **kwargs_no_retries)

# remove the cache entry
if dst in self.cache:
logger.debug("removing destination file {} from cache".format(dst))
self.cache.remove(dst)

# allocate cache space and copy to cache
lstat = self.local_fs.stat(src)
self.cache.allocate(lstat.st_size)
cdst_uri = add_scheme(self.cache.cache_path(dst), "file")
with self.cache.lock(dst):
logger.debug("loading source file {} to cache".format(src))
self._atomic_copy(src, cdst_uri, validate=False)
self.cache.touch(dst, (int(time.time()), rstat.st_mtime))

return dst_uri

else: # rl, rc
# strategy: copy to cache when not up to date, sync stats, opt. copy to local

# build the uri to the cache path of the src file
csrc_uri = add_scheme(self.cache.cache_path(src), "file")

# if the file is cached and prefer_cache is true,
# return the cache path, no questions asked
# otherwise, check if the file is there and up to date
if not prefer_cache or src not in self.cache:
with self.cache.lock(src):
# in cache and outdated?
rstat = self.stat(src, **kwargs_no_retries)
if src in self.cache and not self.cache.check_mtime(src, rstat.st_mtime):
logger.debug("source file {} is outdated in cache, removing".format(src))
self.cache.remove(src, lock=False)
# in cache at all?
if src not in self.cache:
self.cache.allocate(rstat.st_size)
self._atomic_copy(src, csrc_uri, validate=validate, **kwargs)
logger.debug("loading source file {} to cache".format(src))
self.cache.touch(src, (int(time.time()), rstat.st_mtime))

if mode == "rl":
# simply use the local_fs for copying
self.local_fs.copy(csrc_uri, dst, perm=perm)
return dst

# mode is rc
return csrc_uri

def _prepare_dst_dir(self, dst, src=None, perm=None, **kwargs):
"""
Prepares the directory of a target located at *dst* for copying and returns its full
Expand Down Expand Up @@ -583,6 +587,13 @@ def move_from_local(self, src=None, **kwargs):

class RemoteFileTarget(FileSystemFileTarget, RemoteTarget):

@property
def cache_path(self):
if not self.fs.cache:
return None

return self.fs.cache.cache_path(self.path)

@contextmanager
def localize(self, mode="r", perm=None, dir_perm=None, tmp_dir=None, **kwargs):
if mode not in ["r", "w", "a"]:
Expand Down
2 changes: 1 addition & 1 deletion law/target/remote/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ def check_mtime(self, rpath, rmtime):
if self.mtime_patience < 0:
return True

return abs(self.mtime(rpath) - rmtime) > self.mtime_patience
return abs(self.mtime(rpath) - rmtime) <= self.mtime_patience

def _remove(self, cpath, lock=True):
def remove():
Expand Down

0 comments on commit 239454d

Please sign in to comment.