Skip to content

Commit

Permalink
Drop bridge use by InMemoryDatastore.
Browse files Browse the repository at this point in the history
InMemoryDatastore used the ephemeral bridge, which uses in-memory data to track
references. Copied its trivial logic into InMemoryDatastore; in practice only
trash-tracking data was added, as InMemoryDatastore already tracks datasets
itself.

Dropped a few abstract methods from GenericDatastore; they are not used in the
same form by subclasses. The `removeStoredItemInfo` method is defined in two
subclasses; it is used by `test_butler` but not needed otherwise.
  • Loading branch information
andy-slac committed Sep 29, 2023
1 parent e2494f6 commit 87a138c
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 198 deletions.
77 changes: 74 additions & 3 deletions python/lsst/daf/butler/datastores/fileDatastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,21 @@ def addStoredItemInfo(
infos: Iterable[StoredFileInfo],
insert_mode: DatabaseInsertMode = DatabaseInsertMode.INSERT,
) -> None:
# Docstring inherited from GenericBaseDatastore
"""Record internal storage information associated with one or more
datasets.
Parameters
----------
refs : sequence of `DatasetRef`
The datasets that have been stored.
infos : sequence of `StoredDatastoreItemInfo`
Metadata associated with the stored datasets.
insert_mode : `~lsst.daf.butler.registry.interfaces.DatabaseInsertMode`
Mode to use to insert the new records into the table. The
options are ``INSERT`` (error if pre-existing), ``REPLACE``
(replace content with new values), and ``ENSURE`` (skip if the row
already exists).
"""
records = [
info.rebase(ref).to_record(dataset_id=ref.id) for ref, info in zip(refs, infos, strict=True)
]
Expand All @@ -391,8 +405,22 @@ def addStoredItemInfo(
raise ValueError(f"Unknown insert mode of '{insert_mode}'")

def getStoredItemsInfo(self, ref: DatasetIdRef) -> list[StoredFileInfo]:
# Docstring inherited from GenericBaseDatastore
"""Retrieve information associated with files stored in this
`Datastore` associated with this dataset ref.
Parameters
----------
ref : `DatasetRef`
The dataset that is to be queried.
Returns
-------
items : `~collections.abc.Iterable` [`StoredDatastoreItemInfo`]
Stored information about the files and associated formatters
associated with this dataset. Only one file will be returned
if the dataset has not been disassembled. Can return an empty
list if no matching datasets can be found.
"""
# Try to get them from the ref first.
if ref.datastore_records is not None:
if (ref_records := ref.datastore_records.get(self._table.name)) is not None:
Expand All @@ -407,6 +435,44 @@ def getStoredItemsInfo(self, ref: DatasetIdRef) -> list[StoredFileInfo]:
records = self._table.fetch(dataset_id=ref.id)
return [StoredFileInfo.from_record(record) for record in records]

def _register_datasets(
    self,
    refsAndInfos: Iterable[tuple[DatasetRef, StoredFileInfo]],
    insert_mode: DatabaseInsertMode = DatabaseInsertMode.INSERT,
) -> None:
    """Update registry to indicate that one or more datasets have been
    stored.

    Parameters
    ----------
    refsAndInfos : sequence of `tuple` [`DatasetRef`,
            `StoredFileInfo`]
        Datasets to register and the internal datastore metadata associated
        with them.
    insert_mode : `~lsst.daf.butler.registry.interfaces.DatabaseInsertMode`, \
            optional
        Mode to use for the new records: ``INSERT`` (default; error if
        pre-existing), ``ENSURE`` (skip rows that already exist), or
        ``REPLACE`` (replace content with new values).
    """
    expandedRefs: list[DatasetRef] = []
    expandedItemInfos: list[StoredFileInfo] = []

    for ref, itemInfo in refsAndInfos:
        expandedRefs.append(ref)
        expandedItemInfos.append(itemInfo)

    # Dataset location only cares about registry ID so if we have
    # disassembled in datastore we have to deduplicate. Since they
    # will have different datasetTypes we can't use a set
    registryRefs = {r.id: r for r in expandedRefs}
    if insert_mode == DatabaseInsertMode.INSERT:
        self.bridge.insert(registryRefs.values())
    else:
        # There are only two columns and all that matters is the
        # dataset ID.
        # NOTE(review): both ENSURE and REPLACE fall through to
        # bridge.ensure() — acceptable only because the bridge table has
        # nothing to replace beyond the dataset ID.
        self.bridge.ensure(registryRefs.values())
    self.addStoredItemInfo(expandedRefs, expandedItemInfos, insert_mode=insert_mode)

def _get_stored_records_associated_with_refs(
self, refs: Iterable[DatasetIdRef]
) -> dict[DatasetId, list[StoredFileInfo]]:
Expand Down Expand Up @@ -485,8 +551,13 @@ def _registered_refs_per_artifact(self, pathInStore: ResourcePath) -> set[Datase
return ids

def removeStoredItemInfo(self, ref: DatasetIdRef) -> None:
# Docstring inherited from GenericBaseDatastore
"""Remove information about the file associated with this dataset.
Parameters
----------
ref : `DatasetRef`
The dataset that has been removed.
"""
# Note that this method is actually not used by this implementation,
# we depend on bridge to delete opaque records. But there are some
# tests that check that this method works, so we keep it for now.
Expand Down
108 changes: 1 addition & 107 deletions python/lsst/daf/butler/datastores/genericDatastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,10 @@
__all__ = ("GenericBaseDatastore",)

import logging
from abc import abstractmethod
from collections.abc import Iterable, Mapping
from collections.abc import Mapping
from typing import TYPE_CHECKING, Any, Generic, TypeVar

from lsst.daf.butler import DatasetTypeNotSupportedError, Datastore, StoredDatastoreItemInfo
from lsst.daf.butler.registry.interfaces import DatastoreRegistryBridge

from ..registry.interfaces import DatabaseInsertMode

if TYPE_CHECKING:
from lsst.daf.butler import DatasetRef, StorageClass
Expand All @@ -55,107 +51,6 @@ class GenericBaseDatastore(Datastore, Generic[_InfoType]):
Should always be sub-classed since key abstract methods are missing.
"""

@property
@abstractmethod
def bridge(self) -> DatastoreRegistryBridge:
    """Object that manages the interface between this `Datastore` and the
    `Registry` (`DatastoreRegistryBridge`).

    Must be provided by concrete subclasses.
    """
    raise NotImplementedError()

@abstractmethod
def addStoredItemInfo(
    self,
    refs: Iterable[DatasetRef],
    infos: Iterable[Any],
    insert_mode: DatabaseInsertMode = DatabaseInsertMode.INSERT,
) -> None:
    """Record internal storage information associated with one or more
    datasets.

    Parameters
    ----------
    refs : sequence of `DatasetRef`
        The datasets that have been stored.
    infos : sequence of `StoredDatastoreItemInfo`
        Metadata associated with the stored datasets.
    insert_mode : `~lsst.daf.butler.registry.interfaces.DatabaseInsertMode`
        Mode to use to insert the new records into the table. The
        options are ``INSERT`` (error if pre-existing), ``REPLACE``
        (replace content with new values), and ``ENSURE`` (skip if the row
        already exists).
    """
    raise NotImplementedError()

@abstractmethod
def getStoredItemsInfo(self, ref: DatasetRef) -> Iterable[_InfoType]:
    """Retrieve information associated with files stored in this
    `Datastore` associated with this dataset ref.

    Parameters
    ----------
    ref : `DatasetRef`
        The dataset that is to be queried.

    Returns
    -------
    items : `~collections.abc.Iterable` [`StoredDatastoreItemInfo`]
        Stored information about the files and associated formatters
        associated with this dataset. Only one file will be returned
        if the dataset has not been disassembled. Can return an empty
        list if no matching datasets can be found.
    """
    raise NotImplementedError()

@abstractmethod
def removeStoredItemInfo(self, ref: DatasetRef) -> None:
    """Remove information about the file associated with this dataset.

    Parameters
    ----------
    ref : `DatasetRef`
        The dataset that has been removed.
    """
    raise NotImplementedError()

def _register_datasets(
    self,
    refsAndInfos: Iterable[tuple[DatasetRef, StoredDatastoreItemInfo]],
    insert_mode: DatabaseInsertMode = DatabaseInsertMode.INSERT,
) -> None:
    """Update registry to indicate that one or more datasets have been
    stored.

    Parameters
    ----------
    refsAndInfos : sequence of `tuple` [`DatasetRef`,
            `StoredDatastoreItemInfo`]
        Datasets to register and the internal datastore metadata associated
        with them.
    insert_mode : `~lsst.daf.butler.registry.interfaces.DatabaseInsertMode`, \
            optional
        Mode to use for the new records: ``INSERT`` (default; error if
        pre-existing), ``ENSURE`` (skip rows that already exist), or
        ``REPLACE`` (replace content with new values).
    """
    expandedRefs: list[DatasetRef] = []
    expandedItemInfos: list[StoredDatastoreItemInfo] = []

    for ref, itemInfo in refsAndInfos:
        expandedRefs.append(ref)
        expandedItemInfos.append(itemInfo)

    # Dataset location only cares about registry ID so if we have
    # disassembled in datastore we have to deduplicate. Since they
    # will have different datasetTypes we can't use a set
    registryRefs = {r.id: r for r in expandedRefs}
    if insert_mode == DatabaseInsertMode.INSERT:
        self.bridge.insert(registryRefs.values())
    else:
        # There are only two columns and all that matters is the
        # dataset ID.
        # NOTE(review): both ENSURE and REPLACE fall through to
        # bridge.ensure() — acceptable only because the bridge table has
        # nothing to replace beyond the dataset ID.
        self.bridge.ensure(registryRefs.values())
    self.addStoredItemInfo(expandedRefs, expandedItemInfos, insert_mode=insert_mode)

def _post_process_get(
self,
inMemoryDataset: object,
Expand Down Expand Up @@ -271,7 +166,6 @@ def transfer(self, inputDatastore: Datastore, ref: DatasetRef) -> None:
The external `Datastore` from which to retreive the Dataset.
ref : `DatasetRef`
Reference to the required dataset in the input data store.
"""
assert inputDatastore is not self # unless we want it for renames?
inMemoryDataset = inputDatastore.get(ref)
Expand Down
Loading

0 comments on commit 87a138c

Please sign in to comment.