DM-39582: Add caching for some butler primitives during deserialization #858

Merged 11 commits (Jul 4, 2023)
2 changes: 2 additions & 0 deletions doc/changes/DM-39582.misc.md
@@ -0,0 +1,2 @@
Added the ability for some butler primitives to be cached and re-used during deserialization through a special interface.
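
The caching added by this ticket follows a single read-through pattern that repeats in every `from_simple`/`direct` method touched below. Here is a minimal sketch of that pattern, assuming only what is visible in this diff: each `PersistenceContextVars` attribute (for example `datasetRefs`) exposes a `get()` that returns either `None` when caching is inactive or a mutable mapping. The helper name `_cached_from_simple` and the `build` callback are illustrative assumptions, not part of the new interface.

```python
# Illustrative sketch only: mirrors the read-through cache pattern used by the
# from_simple/direct methods in this PR.  The activation API of
# PersistenceContextVars (how a caller turns the caches on) is not shown in
# this excerpt, so only the lookup/store pattern is demonstrated.
from collections.abc import Callable, Hashable
from typing import Any

from lsst.daf.butler.core.persistenceContext import PersistenceContextVars


def _cached_from_simple(key: Hashable, build: Callable[[], Any]) -> Any:
    """Return a cached object for ``key``, building and caching it if needed."""
    cache = PersistenceContextVars.datasetRefs.get()  # dict-like, or None if inactive
    if cache is not None and (hit := cache.get(key)) is not None:
        return hit
    obj = build()
    if cache is not None:
        cache[key] = obj
    return obj
```

The concrete implementations below use keys derived from the serialized payload, for example `(simple.id.int, datasetType.name)` for `DatasetRef` and a `frozenset` of dataset IDs for `DatastoreRecordData`.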
1 change: 1 addition & 0 deletions doc/changes/DM-39582.removal.md
@@ -0,0 +1 @@
Deprecated the `reconstituteDimensions` argument of `Quantum.from_simple`.
4 changes: 2 additions & 2 deletions python/lsst/daf/butler/_butler.py
@@ -1651,9 +1651,9 @@ def exists(
if full_check:
if self.datastore.exists(ref):
existence |= DatasetExistence._ARTIFACT
elif existence != DatasetExistence.UNRECOGNIZED:
elif existence.value != DatasetExistence.UNRECOGNIZED.value:
# Do not add this flag if we have no other idea about a dataset.
existence |= DatasetExistence._ASSUMED
existence |= DatasetExistence(DatasetExistence._ASSUMED)

return existence

1 change: 1 addition & 0 deletions python/lsst/daf/butler/core/__init__.py
@@ -29,6 +29,7 @@
from .logging import ButlerLogRecords
from .mappingFactory import *
from .named import *
from .persistenceContext import *
from .progress import Progress
from .quantum import *
from .storageClass import *
15 changes: 7 additions & 8 deletions python/lsst/daf/butler/core/config.py
@@ -39,7 +39,7 @@

import yaml
from lsst.resources import ResourcePath, ResourcePathExpression
from lsst.utils import doImport
from lsst.utils import doImportType
from yaml.representer import Representer

yaml.add_representer(defaultdict, Representer.represent_dict)
@@ -1203,18 +1203,17 @@ def __init__(

if pytype is not None:
try:
cls = doImport(pytype)
cls = doImportType(pytype)
except ImportError as e:
raise RuntimeError(f"Failed to import cls '{pytype}' for config {type(self)}") from e
defaultsFile = cls.defaultConfigFile
# The class referenced from the config file is not required
# to specify a default config file.
defaultsFile = getattr(cls, "defaultConfigFile", None)
if defaultsFile is not None:
self._updateWithConfigsFromPath(fullSearchPath, defaultsFile)

# Get the container key in case we need it
try:
containerKey = cls.containerKey
except AttributeError:
pass
# Get the container key in case we need it and it is specified.
containerKey = getattr(cls, "containerKey", None)

# Now update this object with the external values so that the external
# values always override the defaults
52 changes: 38 additions & 14 deletions python/lsst/daf/butler/core/datasets/ref.py
@@ -30,6 +30,7 @@
]

import enum
import sys
import uuid
from collections.abc import Iterable
from typing import TYPE_CHECKING, Any, ClassVar
@@ -41,6 +42,7 @@
from ..dimensions import DataCoordinate, DimensionGraph, DimensionUniverse, SerializedDataCoordinate
from ..json import from_json_pydantic, to_json_pydantic
from ..named import NamedKeyDict
from ..persistenceContext import PersistenceContextVars
from .type import DatasetType, SerializedDatasetType

if TYPE_CHECKING:
@@ -142,6 +144,10 @@ def makeDatasetId(
return uuid.uuid5(self.NS_UUID, data)


# This is constant, so don't recreate a set for each instance
_serializedDatasetRefFieldsSet = {"id", "datasetType", "dataId", "run", "component"}


class SerializedDatasetRef(BaseModel):
"""Simplified model of a `DatasetRef` suitable for serialization."""

@@ -202,9 +208,9 @@ def direct(
datasetType if datasetType is None else SerializedDatasetType.direct(**datasetType),
)
setter(node, "dataId", dataId if dataId is None else SerializedDataCoordinate.direct(**dataId))
setter(node, "run", run)
setter(node, "run", sys.intern(run))
setter(node, "component", component)
setter(node, "__fields_set__", {"id", "datasetType", "dataId", "run", "component"})
setter(node, "__fields_set__", _serializedDatasetRefFieldsSet)
return node


@@ -254,7 +260,7 @@ class DatasetRef:

_serializedType = SerializedDatasetRef
__slots__ = (
"id",
"_id",
"datasetType",
"dataId",
"run",
@@ -277,12 +283,22 @@ def __init__(
self.dataId = dataId
self.run = run
if id is not None:
self.id = id
self._id = id.int
else:
self.id = DatasetIdFactory().makeDatasetId(
self.run, self.datasetType, self.dataId, id_generation_mode
self._id = (
DatasetIdFactory()
.makeDatasetId(self.run, self.datasetType, self.dataId, id_generation_mode)
.int
)

@property
def id(self) -> DatasetId:
"""Primary key of the dataset (`DatasetId`).

Cannot be changed after a `DatasetRef` is constructed.
"""
return uuid.UUID(int=self._id)

def __eq__(self, other: Any) -> bool:
try:
return (self.datasetType, self.dataId, self.id) == (other.datasetType, other.dataId, other.id)
@@ -396,9 +412,18 @@ def from_simple(
ref : `DatasetRef`
Newly-constructed object.
"""
cache = PersistenceContextVars.datasetRefs.get()
localName = sys.intern(
datasetType.name
if datasetType is not None
else (x.name if (x := simple.datasetType) is not None else "")
)
key = (simple.id.int, localName)
if cache is not None and (cachedRef := cache.get(key, None)) is not None:
return cachedRef
# Minimalist component will just specify component and id and
# require registry to reconstruct
if set(simple.dict(exclude_unset=True, exclude_defaults=True)).issubset({"id", "component"}):
if not (simple.datasetType is not None or simple.dataId is not None or simple.run is not None):
Review comment (Contributor): Could you rewrite this as `simple.datasetType is None and simple.dataId is None and simple.run is None`? I think it makes it easier to read.

Reply (Contributor Author): That is not logically the same thing. We only want to run this when they are all False, but `and` is greedy, so a `False` will always gobble up anything.

if registry is None:
raise ValueError("Registry is required to construct component DatasetRef from integer id")
if simple.id is None:
@@ -408,6 +433,8 @@
raise RuntimeError(f"No matching dataset found in registry for id {simple.id}")
if simple.component:
ref = ref.makeComponentRef(simple.component)
if cache is not None:
cache[key] = ref
return ref

if universe is None and registry is None:
@@ -443,7 +470,10 @@ def from_simple(
f"Encountered with {simple!r}{dstr}."
)

return cls(datasetType, dataId, id=simple.id, run=simple.run)
newRef = cls(datasetType, dataId, id=simple.id, run=simple.run)
if cache is not None:
cache[key] = newRef
return newRef

to_json = to_json_pydantic
from_json: ClassVar = classmethod(from_json_pydantic)
@@ -682,9 +712,3 @@ class associated with the dataset type of the other ref can be

Cannot be changed after a `DatasetRef` is constructed.
"""

id: DatasetId
"""Primary key of the dataset (`DatasetId`).

Cannot be changed after a `DatasetRef` is constructed.
"""
19 changes: 18 additions & 1 deletion python/lsst/daf/butler/core/datasets/type.py
@@ -34,6 +34,7 @@
from ..configSupport import LookupKey
from ..dimensions import DimensionGraph, SerializedDimensionGraph
from ..json import from_json_pydantic, to_json_pydantic
from ..persistenceContext import PersistenceContextVars
from ..storageClass import StorageClass, StorageClassFactory

if TYPE_CHECKING:
Expand Down Expand Up @@ -74,6 +75,10 @@ def direct(

This method should only be called when the inputs are trusted.
"""
cache = PersistenceContextVars.serializedDatasetTypeMapping.get()
key = (name, storageClass or "")
if cache is not None and (type_ := cache.get(key, None)) is not None:
return type_
node = SerializedDatasetType.__new__(cls)
setter = object.__setattr__
setter(node, "name", name)
@@ -90,6 +95,8 @@
"__fields_set__",
{"name", "storageClass", "dimensions", "parentStorageClass", "isCalibration"},
)
if cache is not None:
cache[key] = node
return node


@@ -685,6 +692,13 @@ def from_simple(
datasetType : `DatasetType`
Newly-constructed object.
"""
# Check whether a cache is active and, if so, whether it already holds a
# cached dataset type.
cache = PersistenceContextVars.loadedTypes.get()
key = (simple.name, simple.storageClass or "")
if cache is not None and (type_ := cache.get(key, None)) is not None:
return type_

if simple.storageClass is None:
# Treat this as minimalist representation
if registry is None:
@@ -708,14 +722,17 @@
# mypy hint
raise ValueError(f"Dimensions must be specified in {simple}")

return cls(
newType = cls(
name=simple.name,
dimensions=DimensionGraph.from_simple(simple.dimensions, universe=universe),
storageClass=simple.storageClass,
isCalibration=simple.isCalibration,
parentStorageClass=simple.parentStorageClass,
universe=universe,
)
if cache is not None:
cache[key] = newType
return newType

to_json = to_json_pydantic
from_json: ClassVar = classmethod(from_json_pydantic)
15 changes: 14 additions & 1 deletion python/lsst/daf/butler/core/datastoreRecordData.py
@@ -36,6 +36,7 @@

from .datasets import DatasetId
from .dimensions import DimensionUniverse
from .persistenceContext import PersistenceContextVars
from .storedFileInfo import StoredDatastoreItemInfo

if TYPE_CHECKING:
@@ -204,16 +205,28 @@ def from_simple(
item_info : `StoredDatastoreItemInfo`
De-serialized instance of `StoredDatastoreItemInfo`.
"""
cache = PersistenceContextVars.dataStoreRecords.get()
key = frozenset(simple.dataset_ids)
if cache is not None and (cachedRecord := cache.get(key)) is not None:
return cachedRecord
records: dict[DatasetId, dict[str, list[StoredDatastoreItemInfo]]] = {}
# make sure that all dataset IDs appear in the dict even if they don't
# have records.
for dataset_id in simple.dataset_ids:
records[dataset_id] = {}
for class_name, table_data in simple.records.items():
klass = doImportType(class_name)
if not issubclass(klass, StoredDatastoreItemInfo):
raise RuntimeError(
"The class specified in the SerializedDatastoreRecordData "
f"({get_full_type_name(klass)}) is not a StoredDatastoreItemInfo."
)
for table_name, table_records in table_data.items():
for record in table_records:
info = klass.from_record(record)
dataset_type_records = records.setdefault(info.dataset_id, {})
dataset_type_records.setdefault(table_name, []).append(info)
return cls(records=records)
newRecord = cls(records=records)
if cache is not None:
cache[key] = newRecord
return newRecord
13 changes: 13 additions & 0 deletions python/lsst/daf/butler/core/dimensions/_coordinate.py
@@ -39,6 +39,7 @@

from ..json import from_json_pydantic, to_json_pydantic
from ..named import NamedKeyDict, NamedKeyMapping, NamedValueAbstractSet, NameLookupMapping
from ..persistenceContext import PersistenceContextVars
from ..timespan import Timespan
from ._elements import Dimension, DimensionElement
from ._graph import DimensionGraph
@@ -76,6 +77,10 @@ def direct(cls, *, dataId: dict[str, DataIdValue], records: dict[str, dict]) ->

This method should only be called when the inputs are trusted.
"""
key = (frozenset(dataId.items()), records is not None)
cache = PersistenceContextVars.serializedDataCoordinateMapping.get()
if cache is not None and (result := cache.get(key)) is not None:
return result
node = SerializedDataCoordinate.__new__(cls)
setter = object.__setattr__
setter(node, "dataId", dataId)
@@ -87,6 +92,8 @@
else {k: SerializedDimensionRecord.direct(**v) for k, v in records.items()},
)
setter(node, "__fields_set__", {"dataId", "records"})
if cache is not None:
cache[key] = node
return node


@@ -730,6 +737,10 @@ def from_simple(
dataId : `DataCoordinate`
Newly-constructed object.
"""
key = (frozenset(simple.dataId.items()), simple.records is not None)
cache = PersistenceContextVars.dataCoordinates.get()
if cache is not None and (result := cache.get(key)) is not None:
return result
if universe is None and registry is None:
raise ValueError("One of universe or registry is required to convert a dict to a DataCoordinate")
if universe is None and registry is not None:
@@ -743,6 +754,8 @@
dataId = dataId.expanded(
{k: DimensionRecord.from_simple(v, universe=universe) for k, v in simple.records.items()}
)
if cache is not None:
cache[key] = dataId
return dataId

to_json = to_json_pydantic
29 changes: 27 additions & 2 deletions python/lsst/daf/butler/core/dimensions/_records.py
@@ -30,6 +30,7 @@
from pydantic import BaseModel, Field, StrictBool, StrictFloat, StrictInt, StrictStr, create_model

from ..json import from_json_pydantic, to_json_pydantic
from ..persistenceContext import PersistenceContextVars
from ..timespan import Timespan, TimespanDatabaseRepresentation
from ._elements import Dimension, DimensionElement

@@ -166,7 +167,16 @@

This method should only be called when the inputs are trusted.
"""
node = cls.construct(definition=definition, record=record)
_recItems = record.items()
# Type ignore because the ternary statement seems to confuse mypy
# based on conflicting inferred types of v.
key = (
definition,
frozenset((k, v if not isinstance(v, list) else tuple(v)) for k, v in _recItems), # type: ignore
)
cache = PersistenceContextVars.serializedDimensionRecordMapping.get()
if cache is not None and (result := cache.get(key)) is not None:
return result
node = SerializedDimensionRecord.__new__(cls)
setter = object.__setattr__
setter(node, "definition", definition)
Expand All @@ -177,6 +187,8 @@ def direct(
node, "record", {k: v if type(v) != list else tuple(v) for k, v in record.items()} # type: ignore
)
setter(node, "__fields_set__", {"definition", "record"})
if cache is not None:
cache[key] = node
return node


@@ -367,6 +379,16 @@ def from_simple(
if universe is None:
# this is for mypy
raise ValueError("Unable to determine a usable universe")
_recItems = simple.record.items()
# Type ignore because the ternary statement seems to confuse mypy
# based on conflicting inferred types of v.
key = (
simple.definition,
frozenset((k, v if not isinstance(v, list) else tuple(v)) for k, v in _recItems), # type: ignore
)
cache = PersistenceContextVars.dimensionRecords.get()
if cache is not None and (result := cache.get(key)) is not None:
return result

definition = DimensionElement.from_simple(simple.definition, universe=universe)

Expand All @@ -389,7 +411,10 @@ def from_simple(
if (hsh := "hash") in rec:
rec[hsh] = bytes.fromhex(rec[hsh].decode())

return _reconstructDimensionRecord(definition, rec)
dimRec = _reconstructDimensionRecord(definition, rec)
if cache is not None:
cache[key] = dimRec
return dimRec

to_json = to_json_pydantic
from_json: ClassVar = classmethod(from_json_pydantic)
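
A note on the cache keys built in `direct` and `from_simple` above: record values that are lists are converted to tuples before the `frozenset` is built, presumably because the key must be hashable to serve as a dictionary key, and a tuple containing a list is not. A small stand-alone illustration with hypothetical record contents:

```python
record = {"name": "detector", "ids": [1, 2, 3]}

# Building the key directly from the items would fail: the element
# ("ids", [1, 2, 3]) is unhashable, so frozenset(record.items()) raises TypeError.
key = frozenset(
    (k, tuple(v) if isinstance(v, list) else v) for k, v in record.items()
)
# key == frozenset({("name", "detector"), ("ids", (1, 2, 3))}) and can be used
# as a dictionary key in the cache.
```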