From 58bc300f76645b860dd8f9289f5220b1ea7d9980 Mon Sep 17 00:00:00 2001 From: Dan Allan Date: Fri, 5 Apr 2024 10:04:52 -0400 Subject: [PATCH] Remove "object cache" misfeature; add simpler "resource cache" (#708) * Remove object_cache. * Add TLRU resource cache. * Remove object cache from adapters. * Apply resource cache to single TIFF adapter. * Make resource cache tunable via env vars * Apply resource cache to HDF5 adapter * Handle zero-sized cache. * Update CHANGELOG * Finish typing resource_cache * Add unit tests for resource cache * Fix outdated comment Co-authored-by: Eugene * Include detailed documentation on resource cache. * Add reference docs for resource cache. * Fix documented env var * Use more specific cache keys --------- Co-authored-by: Eugene --- CHANGELOG.md | 11 + docs/source/explanations/caching.md | 69 +++--- docs/source/how-to/tune-caches.md | 83 ------- docs/source/index.md | 1 - docs/source/reference/service.md | 28 +-- tiled/_tests/test_object_cache.py | 196 ---------------- tiled/_tests/test_resource_cache.py | 45 ++++ tiled/adapters/array.py | 13 +- tiled/adapters/csv.py | 7 - tiled/adapters/excel.py | 11 +- tiled/adapters/hdf5.py | 6 +- tiled/adapters/resource_cache.py | 71 ++++++ tiled/adapters/table.py | 10 +- tiled/adapters/tiff.py | 54 ++--- tiled/commandline/_serve.py | 45 ---- tiled/config.py | 15 +- .../config_schemas/service_configuration.yml | 14 +- tiled/server/app.py | 38 --- tiled/server/object_cache.py | 222 ------------------ tiled/server/settings.py | 4 - 20 files changed, 204 insertions(+), 739 deletions(-) delete mode 100644 docs/source/how-to/tune-caches.md delete mode 100644 tiled/_tests/test_object_cache.py create mode 100644 tiled/_tests/test_resource_cache.py create mode 100644 tiled/adapters/resource_cache.py delete mode 100644 tiled/server/object_cache.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 738876df9..a95dae043 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,17 @@ Write the date in place of the 
"Unreleased" in the case a new version is release ## Next +### Added + +- Added `tiled.adapters.resource_cache` for caching file handles between + requests. + +### Removed + +- Removed object cache from the codebase. If `object_cache` is included in + the server configuration file, a warning is raised that this configuration + has no effect. + +### Fixed + +- The configuration setting `tiled_admins` did not work in practice. If a user diff --git a/docs/source/explanations/caching.md b/docs/source/explanations/caching.md index 4c0a0ba1a..c962bd586 100644 --- a/docs/source/explanations/caching.md +++ b/docs/source/explanations/caching.md @@ -11,22 +11,14 @@ Tiled has two kinds of caching: 1. **Client-side response cache.** The Tiled Python client implements a standard web cache, similar in both concept and implementation to a web browser's cache. -3. **Service-side object cache.** The _response_ caches operate near the outer - edges of the application, stashing and retrieve HTTP response bytes. The - _object_ cache is more deeply integrated into the application: it is - available for authors of Adapters to use for stashing any objects that may be - useful in expediting future work. These objects may serializable, such as chunks - of array data, or unserializable, such as file handles. Requests that ask for - overlapping but distinct slices of data or requests that ask for the same - data but in varied formats will not benefit from the _response_ cache; they - will "miss". The _object_ cache, however, can slice and encode its cached - resources differently for different requests. The object cache will not provide - quite the same speed boost as a response cache, but it has a broader impact. +2. **Server-side resource cache.** The resource cache is used to cache file + handles and related system resources, to avoid rapidly opening, closing, + and reopening the same files while handling a burst of requests. 
(client-http-response-cache)= ## Client-side HTTP Response Cache -The client response cache is an LRU response cache backed by a SQLite file. +The client response cache is an LRU (Least Recently Used) response cache backed by a SQLite file. ```py @@ -48,40 +40,43 @@ cache = Cache( ) ``` -## Server-side Object Cache +## Server-side Resource Cache -TO DO +The "resource cache" is a TLRU (Time-aware Least Recently Used) cache. When +items are evicted from the cache, a hard reference is dropped, freeing the +resource to be closed by the garbage collector if there are no other extant +hard references. Items are evicted if: -### Connection to Dask +- They have been in the cache for a _total_ of more than a given time. + (Accessing an item does not reset this time.) +- The cache is at capacity and this item is the least recently used item. -Dask provides an opt-in, experimental -[opportunistic caching](https://docs.dask.org/en/latest/caching.html) mechanism. -It caches at the granularity of "tasks", such as chunks of array or partitions -of dataframes. +It is not expected that users should need to tune this cache, except in +debugging scenarios. These environment variables may be set to tune +the cache parameters: -Tiled's object cache is generic---not exclusive to dask code paths---but it plugs -into dask in a similar way to make it easy for any Adapters that happen to use -dask to leverage Tiled's object cache very simply, like this: +```sh +TILED_RESOURCE_CACHE_MAX_SIZE # default 1024 items +TILED_RESOURCE_CACHE_TTU # default 60. seconds +``` -```py -from tiled.server.object_cache import get_object_cache +The "size" is measured in cached items; that is, each item in the cache has +size 1. +To disable the resource cache, set: -with get_object_cache().dask_context: - # Any tasks that happen to already be cached will be looked up - # instead of computed here. 
Anything that _is_ computed here may - # be cached, depending on its bytesize and its cost (how long it took to - # compute). - dask_object.compute() +```sh +TILED_RESOURCE_CACHE_MAX_SIZE=0 ``` -Items can be proactively cleared from the cache like so: +It is also possible to register a custom cache: -```py -from tiled.server.object_cache import get_object_cache, NO_CACHE +```python +from cachetools import Cache +from tiled.adapters.resource_cache import set_resource_cache - -cache = get_object_cache() -if cache is not NO_CACHE: - cache.discard_dask(dask_object.__dask_keys__()) +cache = Cache(maxsize=1) +set_resource_cache(cache) ``` + +Any object satisfying the `cachetools.Cache` interface is acceptable. diff --git a/docs/source/how-to/tune-caches.md b/docs/source/how-to/tune-caches.md deleted file mode 100644 index f79a036e2..000000000 --- a/docs/source/how-to/tune-caches.md +++ /dev/null @@ -1,83 +0,0 @@ -# Tune Caches to Balance Speed and Memory Usage - -## Object Cache - -The Tiled server stores objects such as file handles for frequently-opened -files and chunks of frequently-used data in worker memory. (The ability to -externalize the data in a shared cache, like Redis, is planned.) It can use this -to expedite future requests. By default, it will use up to 15% of RAM (total -physical memory) for its object cache. This is meant to leave plenty of room for -data analysis and other memory-hungry software that may be running on the same -machine. - -If Tiled is running on a dedicated data server, you may wish to turn this -up as high as 70%. If Tiled is running on a resource-constrained laptop, you may -wish to turn this down or turn it off. - -This can be done via configuration: - -```yaml -# Given in relative terms... -object_cache: - available_bytes: 0.40 # 40% of total RAM -``` - -```yaml -# Given in absolute terms... -object_cache: - available_bytes: 2_000_000_000 # 2 GB of RAM -``` - -```yaml -# Disable object cache. 
-object_cache: - available_bytes: 0 -``` - -For `tiled serve {pyobject, directory}` it can be configured with a flag: - -``` -# Given in relative terms... -tiled serve {pyobject, directory} --object-cache=0.40 ... # 40% of total RAM -``` - -``` -# Given in absolute terms... -tiled serve {pyobject, directory} --object-cache=2_000_000_000 ... # 2 GB -``` - -``` -tiled serve {pyobject, directory} --object-cache=0 ... # disabled -``` - -The server logs the object cache configuration at startup, as in: - -``` -OBJECT CACHE: Will use up to 12583450214 bytes (30% of total physical RAM) -``` - -To log cache hits, misses, and stores, use this configuration setting - -```yaml -object_cache: - available_bytes: ... - log_level: DEBUG # case-insensitive -``` - -or the environment variable - -``` -TILED_OBJECT_CACHE_LOG_LEVEL=DEBUG # case-insensitive -``` - -The debug logs interleave with the access logs from uvicorn like this. - -``` -OBJECT CACHE: Miss ('dask', 'read-csv-c15bf1fe8e072d8bf571d9809d3f6bcc', 0) -OBJECT CACHE: Store ('dask', 'read-csv-c15bf1fe8e072d8bf571d9809d3f6bcc', 0) (cost=0.003, nbytes=200) -INFO: 127.0.0.1:47744 - "GET /table/full/file0001 HTTP/1.1" 200 OK -OBJECT CACHE: Hit ('dask', 'read-csv-c15bf1fe8e072d8bf571d9809d3f6bcc', 0) -INFO: 127.0.0.1:47750 - "GET /table/full/file0001 HTTP/1.1" 200 OK -OBJECT CACHE: Hit ('dask', 'read-csv-c15bf1fe8e072d8bf571d9809d3f6bcc', 0) -INFO: 127.0.0.1:47758 - "GET /table/full/file0001 HTTP/1.1" 200 OK -``` diff --git a/docs/source/index.md b/docs/source/index.md index 7279e41a0..1fe5b8433 100644 --- a/docs/source/index.md +++ b/docs/source/index.md @@ -28,7 +28,6 @@ how-to/api-keys how-to/custom-clients how-to/metrics how-to/direct-client -how-to/tune-caches how-to/tiled-authn-database how-to/register ``` diff --git a/docs/source/reference/service.md b/docs/source/reference/service.md index 5a1379312..213a81d57 100644 --- a/docs/source/reference/service.md +++ b/docs/source/reference/service.md @@ -129,32 +129,14 @@ See 
{doc}`../explanations/structures` for more context. tiled.server.app.build_app_from_config ``` -## Object Cache - -The "object" cache is available to all Adapters to cache any objects, including -serializable objects like array chunks and unserializable objects like file -handles. It is a process-global singleton. - -Implementation detail: It is backed by [Cachey](https://github.com/dask/cachey). - -Adapters that use the cache _must_ use a tuple of strings and/or numbers as a -cache key and _should_ use a cache key of the form `(class.__module__, -class.__qualname__, ...)` to avoid collisions with other Adapters. See -`tiled.adapters.tiff` for a generic example and see `tiled.adapters.table` for -an example that uses integration with dask. +## Resource Cache ```{eval-rst} .. autosummary:: :toctree: generated - tiled.server.object_cache.get_object_cache - tiled.server.object_cache.set_object_cache - tiled.server.object_cache.ObjectCache - tiled.server.object_cache.ObjectCache.available_bytes - tiled.server.object_cache.ObjectCache.get - tiled.server.object_cache.ObjectCache.put - tiled.server.object_cache.ObjectCache.discard - tiled.server.object_cache.ObjectCache.clear - tiled.server.object_cache.ObjectCache.dask_context - tiled.server.object_cache.ObjectCache.discard_dask + tiled.adapters.resource_cache.get_resource_cache + tiled.adapters.resource_cache.set_resource_cache + tiled.adapters.resource_cache.default_resource_cache + tiled.adapters.resource_cache.with_resource_cache ``` diff --git a/tiled/_tests/test_object_cache.py b/tiled/_tests/test_object_cache.py deleted file mode 100644 index 8ca335dd2..000000000 --- a/tiled/_tests/test_object_cache.py +++ /dev/null @@ -1,196 +0,0 @@ -from pathlib import Path - -import numpy -import psutil -import pytest - -from ..catalog import in_memory -from ..client import Context, from_context -from ..client.register import register -from ..server.app import build_app, build_app_from_config -from ..server.object_cache import 
NO_CACHE, ObjectCache, get_object_cache - - -def test_tallying_hits_and_misses(): - cache = ObjectCache(1e6) - assert cache.get("a") is None - assert cache.misses == 1 - assert cache.hits == 0 - assert cache.get("a") is None - assert cache.misses == 2 - assert cache.hits == 0 - arr = numpy.ones((5, 5)) - cache.put("a", arr, cost=1) - assert cache.get("a") is arr - assert cache.misses == 2 - assert cache.hits == 1 - assert cache.get("a") is arr - assert cache.misses == 2 - assert cache.hits == 2 - cache.discard("a") - assert cache.get("a") is None - assert cache.misses == 3 - assert cache.hits == 2 - - -def test_too_large_item(): - AVAILABLE_BYTES = 10 # very small limit - cache = ObjectCache(AVAILABLE_BYTES) - arr = numpy.ones((5, 5)) - assert arr.nbytes > AVAILABLE_BYTES - cache.put("b", arr, cost=1) - assert cache.get("b") is None - # Manually specify the size. - cache.put("b", arr, cost=1, nbytes=arr.nbytes) - assert cache.get("b") is None - - -def test_eviction(): - AVAILABLE_BYTES = 300 - cache = ObjectCache(AVAILABLE_BYTES) - arr1 = numpy.ones((5, 5)) # 200 bytes - arr2 = 2 * numpy.ones((5, 5)) - cache.put("arr1", arr1, cost=1) - assert "arr1" in cache - # Costly one evicts the previous one. - cache.put("arr2", arr2, cost=5) - assert "arr1" not in cache - assert "arr2" in cache - # Cheap one does not evict the previous one. 
- cache.put("arr1", arr1, cost=1) - assert "arr1" not in cache - assert "arr2" in cache - - -@pytest.mark.asyncio -@pytest.mark.xfail(reason="Object Cache pending removal") -async def test_object_cache_hit_and_miss(tmpdir): - with open(Path(tmpdir, "data.csv"), "w") as file: - file.write( - """ -a,b,c -1,2,3 -""" - ) - adapter = in_memory(readable_storage=[tmpdir]) - with Context.from_app(build_app(adapter)) as context: - client = from_context(context) - await register(client, tmpdir) - cache = get_object_cache() - assert cache.hits == cache.misses == 0 - client["data"].read() - assert cache.misses == 2 # two dask objects in the cache - assert cache.hits == 0 - client["data"].read() - assert cache.misses == 2 - assert cache.hits == 2 - # Simulate eviction. - cache.clear() - client["data"].read() - assert cache.misses == 4 - assert cache.hits == 2 - client["data"].read() - assert cache.misses == 4 - assert cache.hits == 4 - - -@pytest.mark.asyncio -async def test_object_cache_disabled(tmpdir): - with open(Path(tmpdir, "data.csv"), "w") as file: - file.write( - """ -a,b,c -1,2,3 -""" - ) - adapter = in_memory(readable_storage=[tmpdir]) - server_settings = {"object_cache": {"available_bytes": 0}} - with Context.from_app( - build_app(adapter, server_settings=server_settings) - ) as context: - client = from_context(context) - await register(client, tmpdir) - cache = get_object_cache() - assert cache is NO_CACHE - client["data"] - - -@pytest.mark.asyncio -async def test_detect_content_changed_or_removed(tmpdir): - path = Path(tmpdir, "data.csv") - with open(path, "w") as file: - file.write( - """ -a,b,c -1,2,3 -""" - ) - adapter = in_memory(readable_storage=[tmpdir]) - with Context.from_app(build_app(adapter)) as context: - client = from_context(context) - await register(client, tmpdir) - cache = get_object_cache() - assert cache.hits == cache.misses == 0 - assert len(client["data"].read()) == 1 - with open(path, "w") as file: - file.write( - """ - a,b,c - 1,2,3 - 4,5,6 
- """ - ) - await register(client, tmpdir) - assert len(client["data"].read()) == 2 - with open(path, "w") as file: - file.write( - """ - a,b,c - 1,2,3 - 4,5,6 - 7,8,9 - """ - ) - await register(client, tmpdir) - assert len(client["data"].read()) == 3 - # Remove file. - path.unlink() - await register(client, tmpdir) - assert "data" not in client - with pytest.raises(KeyError): - client["data"] - - -def test_cache_size_absolute(tmpdir): - config = { - "trees": [ - { - "tree": "catalog", - "path": "/", - "args": {"uri": tmpdir / "catalog.db", "init_if_not_exists": True}, - } - ], - "object_cache": {"available_bytes": 1000}, - } - with Context.from_app(build_app_from_config(config)): - cache = get_object_cache() - assert cache.available_bytes == 1000 - - -def test_cache_size_relative(tmpdir): - # As a fraction of system memory - config = { - "trees": [ - { - "tree": "catalog", - "path": "/", - "args": {"uri": tmpdir / "catalog.db", "init_if_not_exists": True}, - } - ], - "object_cache": {"available_bytes": 0.1}, - } - with Context.from_app(build_app_from_config(config)): - cache = get_object_cache() - actual = cache.available_bytes - expected = psutil.virtual_memory().total * 0.1 - assert abs(actual - expected) / expected < 0.01 # inexact is OK diff --git a/tiled/_tests/test_resource_cache.py b/tiled/_tests/test_resource_cache.py new file mode 100644 index 000000000..2c52b3adc --- /dev/null +++ b/tiled/_tests/test_resource_cache.py @@ -0,0 +1,45 @@ +import cachetools + +from ..adapters.resource_cache import default_resource_cache, with_resource_cache + + +def test_simple_cache(): + counter = 0 + + def f(): + nonlocal counter + counter += 1 + return "some value" + + cache = cachetools.Cache(maxsize=1) + with_resource_cache("test_key", f, _resource_cache=cache) + with_resource_cache("test_key", f, _resource_cache=cache) + assert counter == 1 + + +def test_default_cache(): + counter = 0 + + def f(): + nonlocal counter + counter += 1 + return "some value" + + cache = 
default_resource_cache() + with_resource_cache("test_key", f, _resource_cache=cache) + with_resource_cache("test_key", f, _resource_cache=cache) + assert counter == 1 + + +def test_cache_zero_size(): + counter = 0 + + def f(): + nonlocal counter + counter += 1 + return "some value" + + cache = cachetools.Cache(maxsize=0) + with_resource_cache("test_key", f, _resource_cache=cache) + with_resource_cache("test_key", f, _resource_cache=cache) + assert counter == 2 diff --git a/tiled/adapters/array.py b/tiled/adapters/array.py index eaec2975c..3996f63b9 100644 --- a/tiled/adapters/array.py +++ b/tiled/adapters/array.py @@ -1,6 +1,5 @@ import dask.array -from ..server.object_cache import get_object_cache from ..structures.array import ArrayStructure from ..structures.core import StructureFamily @@ -76,11 +75,8 @@ def read(self, slice=None): array = self._array if slice is not None: array = array[slice] - # Special case for dask to cache computed result in object cache. if isinstance(self._array, dask.array.Array): - # Note: If the cache is set to NO_CACHE, this is a null context. - with get_object_cache().dask_context: - return array.compute() + return array.compute() return array def read_block(self, block, slice=None): @@ -90,11 +86,8 @@ def read_block(self, block, slice=None): # Slice within the block. if slice is not None: array = array[slice] - # Special case for dask to cache computed result in object cache. - if isinstance(array, dask.array.Array): - # Note: If the cache is set to NO_CACHE, this is a null context. 
- with get_object_cache().dask_context: - return array.compute() + if isinstance(self._array, dask.array.Array): + return array.compute() return array diff --git a/tiled/adapters/csv.py b/tiled/adapters/csv.py index f58fb3860..03418c1b7 100644 --- a/tiled/adapters/csv.py +++ b/tiled/adapters/csv.py @@ -2,7 +2,6 @@ import dask.dataframe -from ..server.object_cache import NO_CACHE, get_object_cache from ..structures.core import StructureFamily from ..structures.data_source import Asset, DataSource, Management from ..structures.table import TableStructure @@ -34,12 +33,6 @@ def read_csv( """ filepath = path_from_uri(data_uri) ddf = dask.dataframe.read_csv(filepath, **kwargs) - # If an instance has previously been created using the same parameters, - # then we are here because the caller wants a *fresh* view on this data. - # Therefore, we should clear any cached data. - cache = get_object_cache() - if cache is not NO_CACHE: - cache.discard_dask(ddf.__dask_keys__()) # TODO Pass structure through rather than just re-creating it # in from_dask_dataframe. return DataFrameAdapter.from_dask_dataframe( diff --git a/tiled/adapters/excel.py b/tiled/adapters/excel.py index c60fee471..ed9418f52 100644 --- a/tiled/adapters/excel.py +++ b/tiled/adapters/excel.py @@ -2,7 +2,6 @@ import pandas from ..adapters.mapping import MapAdapter -from ..server.object_cache import NO_CACHE, get_object_cache, with_object_cache from .dataframe import DataFrameAdapter @@ -34,20 +33,12 @@ def from_file(cls, file, **kwargs): excel_file = file else: excel_file = pandas.ExcelFile(file) - # If an instance has previously been created using the same parameters, - # then we are here because the caller wants a *fresh* view on this data. - # Therefore, we should clear any cached data. 
- cache = get_object_cache() mapping = {} for sheet_name in excel_file.sheet_names: - cache_key = (cls.__module__, cls.__qualname__, file, sheet_name) ddf = dask.dataframe.from_pandas( - with_object_cache(cache_key, excel_file.parse, sheet_name), + excel_file.parse(sheet_name), npartitions=1, # TODO Be smarter about this. ) - if cache is not NO_CACHE: - cache.discard(cache_key) # parsed sheet content - cache.discard_dask(ddf.__dask_keys__()) # dask tasks mapping[sheet_name] = DataFrameAdapter.from_dask_dataframe(ddf) return cls(mapping, **kwargs) diff --git a/tiled/adapters/hdf5.py b/tiled/adapters/hdf5.py index 3307086db..cd5489594 100644 --- a/tiled/adapters/hdf5.py +++ b/tiled/adapters/hdf5.py @@ -10,6 +10,7 @@ from ..structures.core import StructureFamily from ..utils import node_repr, path_from_uri from .array import ArrayAdapter +from .resource_cache import with_resource_cache SWMR_DEFAULT = bool(int(os.getenv("TILED_HDF5_SWMR_DEFAULT", "0"))) INLINED_DEPTH = int(os.getenv("TILED_HDF5_INLINED_CONTENTS_MAX_DEPTH", "7")) @@ -114,7 +115,10 @@ def from_uri( access_policy=None, ): filepath = path_from_uri(data_uri) - file = h5py.File(filepath, "r", swmr=swmr, libver=libver) + cache_key = (h5py.File, filepath, "r", swmr, libver) + file = with_resource_cache( + cache_key, h5py.File, filepath, "r", swmr=swmr, libver=libver + ) return cls.from_file(file) def __repr__(self): diff --git a/tiled/adapters/resource_cache.py b/tiled/adapters/resource_cache.py new file mode 100644 index 000000000..918ba9ede --- /dev/null +++ b/tiled/adapters/resource_cache.py @@ -0,0 +1,71 @@ +import os +from typing import Any, Callable, Optional + +import cachetools + +# When items are evicted from the cache, a hard reference is dropped, freeing +# the resource to be closed by the garbage collector if there are no other +# extant hard references. Items are evicted if: +# +# - They have been in the cache for a _total_ of more than a given time. 
+# (Accessing an item does not reset this time.) +# - The cache is at capacity and this item is the least recently used item. +# +# The "size" is measured in cached items; that is, each item in the cache has +# size 1. +DEFAULT_MAX_SIZE = int(os.getenv("TILED_RESOURCE_CACHE_MAX_SIZE", "1024")) +DEFAULT_TIME_TO_USE_SECONDS = float(os.getenv("TILED_RESOURCE_CACHE_TTU", "60.")) + + +def get_resource_cache() -> cachetools.Cache: + "Return resource cache, a process-global Cache." + return _cache + + +def set_resource_cache(cache: cachetools.Cache) -> None: + "Set the resource cache, a process-global Cache." + global _cache + _cache = cache + + +def default_ttu(_key: str, value: Any, now: float): + """ + Retain cached items for at most `DEFAULT_TIME_TO_USE_SECONDS` seconds (60s, by default). + """ + return DEFAULT_TIME_TO_USE_SECONDS + now + + +def default_resource_cache(): + "Create a new instance of the default resource cache." + return cachetools.TLRUCache(DEFAULT_MAX_SIZE, default_ttu) + + +def with_resource_cache( + cache_key: str, + factory: Callable, + *args, + _resource_cache: Optional[cachetools.Cache] = None, + **kwargs, +): + """ + Use value from cache or, if not present, call `factory(*args, **kwargs)` and cache result. + + This uses a globally configured resource cache by default. For testing and + debugging, a cache may be passed to the parameter _resource_cache. + """ + if _resource_cache is None: + cache = get_resource_cache() + else: + cache = _resource_cache + # Return cached value if found. + value = cache.get(cache_key) + if value is not None: + return value + # Generate value and offer it to the cache. 
+ value = factory(*args, **kwargs) + if cache.maxsize: # handle size 0 cache + cache[cache_key] = value + return value + + +_cache: cachetools.Cache = default_resource_cache() diff --git a/tiled/adapters/table.py b/tiled/adapters/table.py index 41b3ce742..da63b8b39 100644 --- a/tiled/adapters/table.py +++ b/tiled/adapters/table.py @@ -2,7 +2,6 @@ import dask.dataframe import pandas -from ..server.object_cache import get_object_cache from ..structures.core import Spec, StructureFamily from ..structures.table import TableStructure from .array import ArrayAdapter @@ -102,9 +101,7 @@ def read(self, fields=None): ) else: ddf = dask.dataframe.concat(self._partitions, axis=0) - # Note: If the cache is set to NO_CACHE, this is a null context. - with get_object_cache().dask_context: - return ddf.compute() + return ddf.compute() df = pandas.concat(self._partitions, axis=0) if fields is not None: df = df[fields] @@ -116,11 +113,8 @@ def read_partition(self, partition, fields=None): raise RuntimeError(f"partition {partition} has not be stored yet") if fields is not None: partition = partition[fields] - # Special case for dask to cache computed result in object cache. if isinstance(partition, dask.dataframe.DataFrame): - # Note: If the cache is set to NO_CACHE, this is a null context. 
- with get_object_cache().dask_context: - return partition.compute() + return partition.compute() return partition diff --git a/tiled/adapters/tiff.py b/tiled/adapters/tiff.py index 794371701..338548c5b 100644 --- a/tiled/adapters/tiff.py +++ b/tiled/adapters/tiff.py @@ -1,12 +1,11 @@ import builtins -import hashlib import tifffile -from ..server.object_cache import with_object_cache from ..structures.array import ArrayStructure, BuiltinDtype from ..structures.core import StructureFamily from ..utils import path_from_uri +from .resource_cache import with_resource_cache class TiffAdapter: @@ -33,8 +32,8 @@ def __init__( if not isinstance(data_uri, str): raise Exception filepath = path_from_uri(data_uri) - self._file = tifffile.TiffFile(filepath) - self._cache_key = (type(self).__module__, type(self).__qualname__, filepath) + cache_key = (tifffile.TiffFile, filepath) + self._file = with_resource_cache(cache_key, tifffile.TiffFile, filepath) self.specs = specs or [] self._provided_metadata = metadata or {} self.access_policy = access_policy @@ -42,7 +41,7 @@ def __init__( if self._file.is_shaped: shape = tuple(self._file.shaped_metadata[0]["shape"]) else: - arr = with_object_cache(self._cache_key, self._file.asarray) + arr = self._file.asarray() shape = arr.shape structure = ArrayStructure( shape=shape, @@ -63,7 +62,7 @@ def read(self, slice=None): # if we only want a slice? I do not think that is possible with a # single-page TIFF but I'm not sure. Certainly it *is* possible for # multi-page TIFFs. 
- arr = with_object_cache(self._cache_key, self._file.asarray) + arr = self._file.asarray() if slice is not None: arr = arr[slice] return arr @@ -74,7 +73,7 @@ def read_block(self, block, slice=None): if sum(block) != 0: raise IndexError(block) - arr = with_object_cache(self._cache_key, self._file.asarray) + arr = self._file.asarray() if slice is not None: arr = arr[slice] return arr @@ -115,11 +114,6 @@ def __init__( access_policy=None, ): self._seq = seq - self._cache_key = ( - type(self).__module__, - type(self).__qualname__, - hashlib.md5(str(self._seq.files).encode()).hexdigest(), - ) # TODO Check shape, chunks against reality. self.specs = specs or [] self._provided_metadata = metadata or {} @@ -150,26 +144,20 @@ def read(self, slice=None): """ if slice is None: - return with_object_cache(self._cache_key, self._seq.asarray) + return self._seq.asarray() if isinstance(slice, int): # e.g. read(slice=0) - return with_object_cache( - self._cache_key + (slice,), - tifffile.TiffFile(self._seq.files[slice]).asarray, - ) + return tifffile.TiffFile(self._seq.files[slice]).asarray() # e.g. read(slice=(...)) if isinstance(slice, tuple): if len(slice) == 0: - return with_object_cache(self._cache_key, self._seq.asarray) + return self._seq.asarray() image_axis, *the_rest = slice # Could be int or slice # (0, slice(...)) or (0,....) are converted to a list if isinstance(image_axis, int): # e.g. 
read(slice=(0, ....)) - arr = with_object_cache( - self._cache_key + (image_axis,), - tifffile.TiffFile(self._seq.files[image_axis]).asarray, - ) + return tifffile.TiffFile(self._seq.files[image_axis]).asarray() if isinstance(image_axis, builtins.slice): if image_axis.start is None: slice_start = 0 @@ -180,14 +168,11 @@ def read(self, slice=None): else: slice_step = image_axis.step - arr = with_object_cache( - self._cache_key + (slice_start, image_axis.stop, slice_step), - tifffile.TiffSequence( - self._seq.files[ - slice_start : image_axis.stop : slice_step # noqa: E203 - ] - ).asarray, - ) + arr = tifffile.TiffSequence( + self._seq.files[ + slice_start : image_axis.stop : slice_step # noqa: E203 + ] + ).asarray() arr = arr[tuple(the_rest)] return arr if isinstance(slice, builtins.slice): @@ -201,12 +186,9 @@ def read(self, slice=None): else: slice_step = slice.step - arr = with_object_cache( - self._cache_key + (slice_start, slice.stop, slice_step), - tifffile.TiffSequence( - self._seq.files[slice_start : slice.stop : slice_step] # noqa: E203 - ).asarray, - ) + arr = tifffile.TiffSequence( + self._seq.files[slice_start : slice.stop : slice_step] # noqa: E203 + ).asarray() return arr def read_block(self, block, slice=None): diff --git a/tiled/commandline/_serve.py b/tiled/commandline/_serve.py index 1d52454e2..9aeff874d 100644 --- a/tiled/commandline/_serve.py +++ b/tiled/commandline/_serve.py @@ -102,16 +102,6 @@ def serve_directory( log_config: Optional[str] = typer.Option( None, help="Custom uvicorn logging configuration file" ), - object_cache_available_bytes: Optional[float] = typer.Option( - None, - "--data-cache", - help=( - "Maximum size for the object cache, given as a number of bytes as in " - "1_000_000 or as a fraction of system RAM (total physical memory) as in " - "0.3. Set to 0 to disable this cache. By default, it will use up to " - "0.15 (15%) of RAM." - ), - ), ): "Serve a Tree instance from a directory of files." 
import tempfile @@ -151,11 +141,6 @@ def serve_directory( key_from_filename = identity else: key_from_filename = None - if object_cache_available_bytes is not None: - server_settings["object_cache"] = {} - server_settings["object_cache"][ - "available_bytes" - ] = object_cache_available_bytes from logging import StreamHandler @@ -342,16 +327,6 @@ def serve_catalog( ), ), port: int = typer.Option(8000, help="Bind to a socket with this port."), - object_cache_available_bytes: Optional[float] = typer.Option( - None, - "--data-cache", - help=( - "Maximum size for the object cache, given as a number of bytes as in " - "1_000_000 or as a fraction of system RAM (total physical memory) as in " - "0.3. Set to 0 to disable this cache. By default, it will use up to " - "0.15 (15%) of RAM." - ), - ), scalable: bool = typer.Option( False, "--scalable", @@ -439,11 +414,6 @@ def serve_catalog( err=True, ) server_settings = {} - if object_cache_available_bytes is not None: - server_settings["object_cache"] = {} - server_settings["object_cache"][ - "available_bytes" - ] = object_cache_available_bytes tree = from_uri( database, writable_storage=write, @@ -500,16 +470,6 @@ def serve_pyobject( ), ), port: int = typer.Option(8000, help="Bind to a socket with this port."), - object_cache_available_bytes: Optional[float] = typer.Option( - None, - "--data-cache", - help=( - "Maximum size for the object cache, given as a number of bytes as in " - "1_000_000 or as a fraction of system RAM (total physical memory) as in " - "0.3. Set to 0 to disable this cache. By default, it will use up to " - "0.15 (15%) of RAM." 
- ), - ), scalable: bool = typer.Option( False, "--scalable", @@ -524,11 +484,6 @@ def serve_pyobject( tree = import_object(object_path) server_settings = {} - if object_cache_available_bytes is not None: - server_settings["object_cache"] = {} - server_settings["object_cache"][ - "available_bytes" - ] = object_cache_available_bytes web_app = build_app( tree, { diff --git a/tiled/config.py b/tiled/config.py index 32a95de3e..98248f975 100644 --- a/tiled/config.py +++ b/tiled/config.py @@ -5,6 +5,7 @@ """ import copy import os +import warnings from collections import defaultdict from datetime import timedelta from functools import lru_cache @@ -186,7 +187,6 @@ def construct_build_app_kwargs( if root_path := config.get("root_path", ""): server_settings["root_path"] = root_path server_settings["allow_origins"] = config.get("allow_origins") - server_settings["object_cache"] = config.get("object_cache", {}) server_settings["response_bytesize_limit"] = config.get( "response_bytesize_limit" ) @@ -238,7 +238,6 @@ def merge(configs): authentication_config_source = None access_control_config_source = None uvicorn_config_source = None - object_cache_config_source = None metrics_config_source = None database_config_source = None response_bytesize_limit_config_source = None @@ -283,14 +282,10 @@ def merge(configs): uvicorn_config_source = filepath merged["uvicorn"] = config["uvicorn"] if "object_cache" in config: - if "object_cache" in merged: - raise ConfigError( - "object_cache can only be specified in one file. " - f"It was found in both {object_cache_config_source} and " - f"{filepath}" - ) - object_cache_config_source = filepath - merged["object_cache"] = config["object_cache"] + warnings.warn( + "The object cache has been removed. " + "The config of the object cache no longer has any effect." 
+ ) if "response_bytesize_limit" in config: if "response_bytesize_limit" in merged: raise ConfigError( diff --git a/tiled/config_schemas/service_configuration.yml b/tiled/config_schemas/service_configuration.yml index 98162430c..6a7f17a24 100644 --- a/tiled/config_schemas/service_configuration.yml +++ b/tiled/config_schemas/service_configuration.yml @@ -354,19 +354,17 @@ properties: available_bytes: type: number description: | - Maximum size for the object cache, given as a number of bytes as in - 1_000_000 or as a fraction of system RAM (total physical memory) as in - 0.3. Set to 0 to disable this cache. By default, it will use up to - 0.15 (15%) of RAM. + DEPRECATED. The object cache has been removed. This configuration no longer has + any effect. log_level: type: "string" enum: ["critical", "error", "warning", "info", "debug"] description: | - Set to "debug" level to log cache hits, misses, and stores. - Default is "info" level, which logs cache configuration at application startup. + DEPRECATED. The object cache has been removed. This configuration no longer has + any effect. description: | - The 'object' cache is available to all Adapters to cache arbitrary objects, including - file handles or chunks of data, in process memory. + DEPRECATED. The object cache has been removed. This configuration no longer has + any effect. 
response_bytesize_limit: type: integer description: | diff --git a/tiled/server/app.py b/tiled/server/app.py index d3c8c6329..62606a4db 100644 --- a/tiled/server/app.py +++ b/tiled/server/app.py @@ -49,9 +49,6 @@ get_serialization_registry, get_validation_registry, ) -from .object_cache import NO_CACHE, ObjectCache -from .object_cache import logger as object_cache_logger -from .object_cache import set_object_cache from .router import distinct, patch_route_signature, router, search from .settings import get_settings from .utils import ( @@ -468,13 +465,6 @@ def override_get_settings(): settings.database_max_overflow = database["max_overflow"] if database.get("init_if_not_exists"): settings.database_init_if_not_exists = database["init_if_not_exists"] - object_cache_available_bytes = server_settings.get("object_cache", {}).get( - "available_bytes" - ) - if object_cache_available_bytes is not None: - setattr( - settings, "object_cache_available_bytes", object_cache_available_bytes - ) if authentication.get("providers"): # If we support authentication providers, we need a database, so if one is # not set, use a SQLite database in memory. Horizontally scaled deployments @@ -534,34 +524,6 @@ async def startup_event(): app.state.tasks.append(asyncio_task) app.state.allow_origins.extend(settings.allow_origins) - object_cache_logger.setLevel(settings.object_cache_log_level.upper()) - object_cache_available_bytes = settings.object_cache_available_bytes - import psutil - - TOTAL_PHYSICAL_MEMORY = psutil.virtual_memory().total - if object_cache_available_bytes < 0: - raise ValueError("Negative object cache size is not interpretable.") - if object_cache_available_bytes == 0: - cache = NO_CACHE - object_cache_logger.info("disabled") - else: - if 0 < object_cache_available_bytes < 1: - # Interpret this as a fraction of system memory. 
- - object_cache_available_bytes = int( - TOTAL_PHYSICAL_MEMORY * object_cache_available_bytes - ) - else: - object_cache_available_bytes = int(object_cache_available_bytes) - cache = ObjectCache(object_cache_available_bytes) - percentage = round( - object_cache_available_bytes / TOTAL_PHYSICAL_MEMORY * 100 - ) - object_cache_logger.info( - f"Will use up to {object_cache_available_bytes:_} bytes ({percentage:d}% of total physical RAM)" - ) - set_object_cache(cache) - # Expose the root_tree here to make it easier to access it from tests, # in usages like: # client.context.app.state.root_tree diff --git a/tiled/server/object_cache.py b/tiled/server/object_cache.py deleted file mode 100644 index 8ad55a972..000000000 --- a/tiled/server/object_cache.py +++ /dev/null @@ -1,222 +0,0 @@ -""" -The 'data' cache is available to all Adapters to cache chunks of data. - -This is integrated with dask's callback mechanism for simple caching of dask -chunks. It is also available for usage outside of dask. - -The cache is a process-global singleton. -""" -import contextlib -import time -from timeit import default_timer - -import cachey -from dask.callbacks import Callback - -from ..utils import catch_warning_msg - -if __debug__: - import logging - - logger = logging.getLogger(__name__) - handler = logging.StreamHandler() - handler.setLevel("DEBUG") - handler.setFormatter(logging.Formatter("OBJECT CACHE: %(message)s")) - logger.addHandler(handler) - - -# See https://github.com/bluesky/tiled/issues/675#issuecomment-1983581882 -WARNING_PANDAS_BLOCKS = ( - "DataFrame._data is deprecated and will be removed in a future version. " - "Use public APIs instead." 
-) - - -def catch_pandas_blocks_warning(): - return catch_warning_msg( - action="ignore", message=WARNING_PANDAS_BLOCKS, category=DeprecationWarning - ) - - -class _NO_CACHE_SENTINEL: - def __init__(self): - self.dask_context = contextlib.nullcontext() - - def __repr__(self): - return "NO_CACHE" - - -NO_CACHE = _NO_CACHE_SENTINEL() -_object_cache = NO_CACHE - - -def set_object_cache(cache): - """ - Set the process-global icache. - """ - global _object_cache - _object_cache = cache - - -def get_object_cache(): - """ - Set the process-global icache. - """ - return _object_cache - - -# TODO Use positional-only args for for with_object_cache -# once we drop Python 3.7 support. - - -def with_object_cache(cache_key, factory, *args, **kwargs): - """ - Use value from cache or, if not present, call factory(*args, **kwargs) and cache result. - """ - cache = get_object_cache() - # If no cache configured, generate and return value. - if cache is NO_CACHE: - return factory(*args, **kwargs) - # Return cached value if found. - value = cache.get(cache_key) - if value is not None: - return value - # Generate value and offer it to the cache, with an associated cost. - start_time = time.perf_counter() - value = factory(*args, **kwargs) - cost = time.perf_counter() - start_time - cache.put(cache_key, value, cost) - return value - - -class ObjectCache: - def __init__(self, available_bytes_in_process): - self.misses = 0 - self.hits = 0 - - def miss(key): - self.misses += 1 - if __debug__: - logger.debug("Miss %r", key) - - def hit(key, value): - self.hits += 1 - if __debug__: - logger.debug("Hit %r", key) - - self._cache = cachey.Cache(available_bytes_in_process, 0, miss=miss, hit=hit) - self._dask_context = DaskCache(self) - - @property - def dask_context(self): - """ - Within this context, get and store dask tasks with the object cache. 
- """ - return self._dask_context - - @property - def available_bytes(self): - """ - Maximum size in bytes - """ - return self._cache.available_bytes - - def get(self, key): - """ - Get cache item. - """ - value = self._cache.get(key) - return value - - def put(self, key, value, cost, nbytes=None): - """ - Put cache item. - - Parameters - ---------- - key : uniquely identifies content - value : object - May be any Python object. For future-proofing, the object should be - pickle-able, as an _external_ object cache will be added in the future. - cost : float - Time in seconds that this value cost to obtain. - nbytes : bytesize, optional - Computed (with best effort) if not provided. - """ - if nbytes is None: - with catch_pandas_blocks_warning(): - nbytes = self._cache.get_nbytes(value) - logger.debug("Store %r (cost=%.3f, nbytes=%d)", key, cost, nbytes) - self._cache.put(key, value, cost, nbytes=nbytes) - - def discard(self, *keys): - """ - Discard one or more items from the cache if present. - """ - for key in keys: - # Cachey has no API specifically for this, but we can do it ourselves - # but modifying only public state. - value = self._cache.data.pop(key, None) - if value is not None: - self._cache.total_bytes -= self._cache.nbytes.pop(key) - - def discard_dask(self, *keys): - """ - Discard one or more dask tasks from the cache, if present. - - Internally, cached dask keys are prefixed to avoid collisions. - That is why it has a method separate from discard(). - """ - self.discard(("dask", key) for key in keys) - - def clear(self): - """ - Empty the cache. 
- """ - return self._cache.clear() - - def __contains__(self, key): - return key in self._cache - - -class DaskCache(Callback): - """ - Adapted from dask.cache - - Changes: - - Scope keys under prefix 'dask-' to avoid collisions with non-dask cache usage - - Use a simpler cost computation: duration (in seconds) - """ - - def __init__(self, cache): - self._nbytes = cachey.nbytes - self.cache = cache - self.starttimes = dict() - - def _start(self, dsk): - "Patched as noted in comment below" - self.durations = dict() - for key in dsk: - # Use 'get', not cache.data[key] as upstream does, - # in order to trip 'hit' and 'miss' callbacks. - cached_result = self.cache.get(("dask", *key)) - if cached_result is not None: - dsk[key] = cached_result - - def _pretask(self, key, dsk, state): - self.starttimes[key] = default_timer() - - def _posttask(self, key, value, dsk, state, id): - duration = default_timer() - self.starttimes[key] - deps = state["dependencies"][key] - if deps: - duration += max(self.durations.get(k, 0) for k in deps) - self.durations[key] = duration - with catch_pandas_blocks_warning(): - nb = self._nbytes(value) - self.cache.put(("dask", *key), value, cost=duration, nbytes=nb) - - def _finish(self, dsk, state, errored): - for key in dsk: - self.starttimes.pop(key, None) - self.durations.pop(key, None) diff --git a/tiled/server/settings.py b/tiled/server/settings.py index 071aba070..57015f166 100644 --- a/tiled/server/settings.py +++ b/tiled/server/settings.py @@ -20,10 +20,6 @@ class Settings(BaseSettings): allow_origins: List[str] = [ item for item in os.getenv("TILED_ALLOW_ORIGINS", "").split() if item ] - object_cache_available_bytes: float = float( - os.getenv("TILED_OBJECT_CACHE_AVAILABLE_BYTES", "0.15") - ) - object_cache_log_level: str = os.getenv("TILED_OBJECT_CACHE_LOG_LEVEL", "INFO") authenticator: Any = None # These 'single user' settings are only applicable if authenticator is None. single_user_api_key: str = os.getenv(