Merge pull request #880 from lsst/tickets/DM-40395
DM-40395: Allow datastore record inserts to use ensure or replace
timj authored Aug 18, 2023
2 parents 3fdd9d0 + 93ab5d6 commit 60b7293
Showing 13 changed files with 239 additions and 18 deletions.
14 changes: 11 additions & 3 deletions python/lsst/daf/butler/_butler.py
@@ -1914,7 +1914,7 @@ def ingest(
*datasets: FileDataset,
transfer: str | None = "auto",
run: str | None = None,
idGenerationMode: DatasetIdGenEnum = DatasetIdGenEnum.UNIQUE,
idGenerationMode: DatasetIdGenEnum | None = None,
record_validation_info: bool = True,
) -> None:
"""Store and register one or more datasets that already exist on disk.
@@ -1941,8 +1941,8 @@ def ingest(
overriding ``self.run``. This parameter is now deprecated since
the run is encoded in the ``FileDataset``.
idGenerationMode : `DatasetIdGenEnum`, optional
Specifies option for generating dataset IDs. By default unique IDs
are generated for each inserted dataset.
Specifies option for generating dataset IDs. Parameter is
deprecated.
record_validation_info : `bool`, optional
If `True`, the default, the datastore can record validation
information associated with the file. If `False` the datastore
@@ -1984,6 +1984,14 @@ def ingest(
if not datasets:
return

if idGenerationMode is not None:
warnings.warn(
"The idGenerationMode parameter is no longer used and is ignored. "
" Will be removed after v26.0",
FutureWarning,
stacklevel=2,
)
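
With this change, passing ``idGenerationMode`` still works but emits a ``FutureWarning`` attributed to the caller (that is what ``stacklevel=2`` does). A minimal sketch of the same warning pattern outside the butler (the function and argument here are illustrative, not the real API):

import warnings

def ingest(*datasets, idGenerationMode=None):
    if idGenerationMode is not None:
        # stacklevel=2 attributes the warning to the caller's line,
        # not to this function body.
        warnings.warn(
            "The idGenerationMode parameter is no longer used and is ignored. "
            "Will be removed after v26.0.",
            FutureWarning,
            stacklevel=2,
        )

ingest("file.fits", idGenerationMode="UNIQUE")  # FutureWarning points here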

progress = Progress("lsst.daf.butler.Butler.ingest", level=logging.DEBUG)

# We need to reorganize all the inputs so that they are grouped
49 changes: 43 additions & 6 deletions python/lsst/daf/butler/datastores/fileDatastore.py
@@ -73,7 +73,7 @@
from lsst.utils.timer import time_this
from sqlalchemy import BigInteger, String

from ..registry.interfaces import FakeDatasetRef
from ..registry.interfaces import DatabaseInsertMode, FakeDatasetRef
from .genericDatastore import GenericBaseDatastore

if TYPE_CHECKING:
@@ -365,10 +365,23 @@ def _delete_artifact(self, location: Location) -> None:
raise
log.debug("Successfully deleted file: %s", location.uri)

def addStoredItemInfo(self, refs: Iterable[DatasetRef], infos: Iterable[StoredFileInfo]) -> None:
def addStoredItemInfo(
self,
refs: Iterable[DatasetRef],
infos: Iterable[StoredFileInfo],
insert_mode: DatabaseInsertMode = DatabaseInsertMode.INSERT,
) -> None:
# Docstring inherited from GenericBaseDatastore
records = [info.rebase(ref).to_record() for ref, info in zip(refs, infos, strict=True)]
self._table.insert(*records, transaction=self._transaction)
match insert_mode:
case DatabaseInsertMode.INSERT:
self._table.insert(*records, transaction=self._transaction)
case DatabaseInsertMode.ENSURE:
self._table.ensure(*records, transaction=self._transaction)
case DatabaseInsertMode.REPLACE:
self._table.replace(*records, transaction=self._transaction)
case _:
raise ValueError(f"Unknown insert mode of '{insert_mode}'")
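
The new method dispatches on the enum with Python 3.10 structural pattern matching. A self-contained sketch of the same dispatch, with the opaque table stubbed out (``Mode`` and ``store`` are stand-ins, not butler API):

import enum

class Mode(enum.Enum):
    INSERT = enum.auto()
    ENSURE = enum.auto()
    REPLACE = enum.auto()

def store(mode: Mode, records: list[dict]) -> str:
    # Mirrors the match block above; the wildcard arm catches any
    # future enum member that is not explicitly handled.
    match mode:
        case Mode.INSERT:
            return f"inserted {len(records)} record(s)"
        case Mode.ENSURE:
            return f"ensured {len(records)} record(s)"
        case Mode.REPLACE:
            return f"replaced {len(records)} record(s)"
        case _:
            raise ValueError(f"Unknown insert mode of '{mode}'")

print(store(Mode.ENSURE, [{"dataset_id": "d1"}]))  # ensured 1 record(s)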

def getStoredItemsInfo(self, ref: DatasetIdRef) -> list[StoredFileInfo]:
# Docstring inherited from GenericBaseDatastore
@@ -1036,7 +1049,26 @@ def _finishIngest(
record_validation_info=record_validation_info,
)
refsAndInfos.extend([(ref, info) for ref in dataset.refs])
self._register_datasets(refsAndInfos)

# In direct mode we can allow repeated ingests of the same thing
# if we are sure that the external dataset is immutable. We use
# UUIDv5 to indicate this. If there is a mix of v4 and v5 they are
# separated.
refs_and_infos_replace = []
refs_and_infos_insert = []
if transfer == "direct":
for entry in refsAndInfos:
if entry[0].id.version == 5:
refs_and_infos_replace.append(entry)
else:
refs_and_infos_insert.append(entry)
else:
refs_and_infos_insert = refsAndInfos

if refs_and_infos_insert:
self._register_datasets(refs_and_infos_insert, insert_mode=DatabaseInsertMode.INSERT)
if refs_and_infos_replace:
self._register_datasets(refs_and_infos_replace, insert_mode=DatabaseInsertMode.REPLACE)
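
The v4/v5 split works because ``uuid5`` IDs are deterministic functions of namespace and name, so re-ingesting the same immutable external file reproduces the same dataset ID, while ``uuid4`` IDs are random. A small standard-library illustration (the namespace and name are arbitrary; the butler derives its own):

import uuid

# Version 5: deterministic, same namespace + name always gives the same ID.
a = uuid.uuid5(uuid.NAMESPACE_URL, "file:///external/data.fits")
b = uuid.uuid5(uuid.NAMESPACE_URL, "file:///external/data.fits")
assert a == b and a.version == 5  # repeated ingest is safe to REPLACE

# Version 4: random, so a colliding row signals a genuine conflict.
c = uuid.uuid4()
assert c.version == 4  # must go through plain INSERT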

def _calculate_ingested_datastore_name(
self,
@@ -2305,7 +2337,7 @@ def put(self, inMemoryDataset: Any, ref: DatasetRef) -> None:
storedInfo = self._write_in_memory_to_artifact(inMemoryDataset, ref)
artifacts.append((ref, storedInfo))

self._register_datasets(artifacts)
self._register_datasets(artifacts, insert_mode=DatabaseInsertMode.INSERT)

@transactional
def trash(self, ref: DatasetRef | Iterable[DatasetRef], ignore_errors: bool = True) -> None:
@@ -2729,7 +2761,12 @@ def transfer_from(
"" if len(direct_transfers) == 1 else "s",
)

self._register_datasets(artifacts)
# We are overwriting previous datasets that may have already
# existed. We therefore should ensure that we force the
# datastore records to agree. Note that this can potentially lead
# to difficulties if the dataset has previously been ingested
# disassembled and is somehow now assembled, or vice versa.
self._register_datasets(artifacts, insert_mode=DatabaseInsertMode.REPLACE)

if already_present:
n_skipped = len(already_present)
33 changes: 29 additions & 4 deletions python/lsst/daf/butler/datastores/genericDatastore.py
@@ -33,6 +33,8 @@
from lsst.daf.butler import DatasetTypeNotSupportedError, Datastore
from lsst.daf.butler.registry.interfaces import DatastoreRegistryBridge

from ..registry.interfaces import DatabaseInsertMode

if TYPE_CHECKING:
from lsst.daf.butler import DatasetRef, StorageClass, StoredDatastoreItemInfo

@@ -54,7 +56,12 @@ def bridge(self) -> DatastoreRegistryBridge:
raise NotImplementedError()

@abstractmethod
def addStoredItemInfo(self, refs: Iterable[DatasetRef], infos: Iterable[Any]) -> None:
def addStoredItemInfo(
self,
refs: Iterable[DatasetRef],
infos: Iterable[Any],
insert_mode: DatabaseInsertMode = DatabaseInsertMode.INSERT,
) -> None:
"""Record internal storage information associated with one or more
datasets.
@@ -64,6 +71,11 @@ def addStoredItemInfo(self, refs: Iterable[DatasetRef], infos: Iterable[Any]) ->
The datasets that have been stored.
infos : sequence of `StoredDatastoreItemInfo`
Metadata associated with the stored datasets.
insert_mode : `~lsst.daf.butler.registry.interfaces.DatabaseInsertMode`
Mode to use to insert the new records into the table. The
options are ``INSERT`` (error if pre-existing), ``REPLACE``
(replace content with new values), and ``ENSURE`` (skip if the row
already exists).
"""
raise NotImplementedError()

@@ -98,7 +110,11 @@ def removeStoredItemInfo(self, ref: DatasetRef) -> None:
"""
raise NotImplementedError()

def _register_datasets(self, refsAndInfos: Iterable[tuple[DatasetRef, StoredDatastoreItemInfo]]) -> None:
def _register_datasets(
self,
refsAndInfos: Iterable[tuple[DatasetRef, StoredDatastoreItemInfo]],
insert_mode: DatabaseInsertMode = DatabaseInsertMode.INSERT,
) -> None:
"""Update registry to indicate that one or more datasets have been
stored.
@@ -108,6 +124,10 @@ def _register_datasets(self, refsAndInfos: Iterable[tuple[DatasetRef, StoredData
`StoredDatastoreItemInfo`]
Datasets to register and the internal datastore metadata associated
with them.
insert_mode : `~lsst.daf.butler.registry.interfaces.DatabaseInsertMode`, optional
Indicate whether the new records should be new ("insert", the
default), allowed to exist already ("ensure"), or replaced if
already present ("replace").
"""
expandedRefs: list[DatasetRef] = []
expandedItemInfos = []
@@ -120,8 +140,13 @@ def _register_datasets(self, refsAndInfos: Iterable[tuple[DatasetRef, StoredData
# disassembled in datastore we have to deduplicate. Since they
# will have different datasetTypes we can't use a set
registryRefs = {r.id: r for r in expandedRefs}
self.bridge.insert(registryRefs.values())
self.addStoredItemInfo(expandedRefs, expandedItemInfos)
if insert_mode == DatabaseInsertMode.INSERT:
self.bridge.insert(registryRefs.values())
else:
# There are only two columns and all that matters is the
# dataset ID.
self.bridge.ensure(registryRefs.values())
self.addStoredItemInfo(expandedRefs, expandedItemInfos, insert_mode=insert_mode)
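
The ``registryRefs`` dict dedupes by dataset ID because disassembled components of one dataset share an ID but differ in dataset type, so a set of refs would not collapse them. A toy illustration of the idiom (``Ref`` is a stand-in for ``DatasetRef``):

from dataclasses import dataclass

@dataclass(frozen=True)
class Ref:  # stand-in for DatasetRef
    id: int
    datasetType: str

refs = [Ref(1, "raw.wcs"), Ref(1, "raw.image"), Ref(2, "raw.image")]
# A set would keep all three (different datasetTypes hash differently);
# keying a dict by id collapses components of the same dataset.
deduped = {r.id: r for r in refs}
print(sorted(deduped))  # [1, 2]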

def _post_process_get(
self,
8 changes: 7 additions & 1 deletion python/lsst/daf/butler/datastores/inMemoryDatastore.py
@@ -44,6 +44,7 @@
from lsst.daf.butler.registry.interfaces import DatastoreRegistryBridge
from lsst.resources import ResourcePath

from ..registry.interfaces import DatabaseInsertMode
from .genericDatastore import GenericBaseDatastore

if TYPE_CHECKING:
@@ -178,7 +179,12 @@ def bridge(self) -> DatastoreRegistryBridge:
# Docstring inherited from GenericBaseDatastore.
return self._bridge

def addStoredItemInfo(self, refs: Iterable[DatasetRef], infos: Iterable[StoredMemoryItemInfo]) -> None:
def addStoredItemInfo(
self,
refs: Iterable[DatasetRef],
infos: Iterable[StoredMemoryItemInfo],
insert_mode: DatabaseInsertMode = DatabaseInsertMode.INSERT,
) -> None:
# Docstring inherited from GenericBaseDatastore.
for ref, info in zip(refs, infos, strict=True):
self.records[ref.id] = info
4 changes: 4 additions & 0 deletions python/lsst/daf/butler/registry/bridge/ephemeral.py
@@ -61,6 +61,10 @@ def insert(self, refs: Iterable[DatasetIdRef]) -> None:
# Docstring inherited from DatastoreRegistryBridge
self._datasetIds.update(ref.id for ref in refs)

def ensure(self, refs: Iterable[DatasetIdRef]) -> None:
# Docstring inherited from DatastoreRegistryBridge
self._datasetIds.update(ref.id for ref in refs)
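
For the ephemeral bridge, ``ensure`` can safely share ``insert``'s body: ``set.update`` is idempotent, so re-adding an already-known ID is a no-op rather than an error. For example:

dataset_ids = {"a", "b"}
dataset_ids.update(["b", "c"])  # the duplicate "b" is silently absorbed
print(sorted(dataset_ids))      # ['a', 'b', 'c']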

def forget(self, refs: Iterable[DatasetIdRef]) -> None:
self._datasetIds.difference_update(ref.id for ref in refs)

4 changes: 4 additions & 0 deletions python/lsst/daf/butler/registry/bridge/monolithic.py
@@ -150,6 +150,10 @@ def _refsToRows(self, refs: Iterable[DatasetIdRef]) -> list[dict]:
"""
return [{"datastore_name": self.datastoreName, "dataset_id": ref.id} for ref in refs]

def ensure(self, refs: Iterable[DatasetIdRef]) -> None:
# Docstring inherited from DatastoreRegistryBridge
self._db.ensure(self._tables.dataset_location, *self._refsToRows(refs))

def insert(self, refs: Iterable[DatasetIdRef]) -> None:
# Docstring inherited from DatastoreRegistryBridge
self._db.insert(self._tables.dataset_location, *self._refsToRows(refs))
12 changes: 12 additions & 0 deletions python/lsst/daf/butler/registry/interfaces/_bridge.py
@@ -114,6 +114,18 @@ def insert(self, refs: Iterable[DatasetIdRef]) -> None:
"""
raise NotImplementedError()

@abstractmethod
def ensure(self, refs: Iterable[DatasetIdRef]) -> None:
"""Record that a datastore holds the given datasets, skipping if
the ref is already registered.

Parameters
----------
refs : `~collections.abc.Iterable` of `DatasetIdRef`
References to the datasets.
"""
raise NotImplementedError()

@abstractmethod
def forget(self, refs: Iterable[DatasetIdRef]) -> None:
"""Remove dataset location information without any attempt to put it
15 changes: 15 additions & 0 deletions python/lsst/daf/butler/registry/interfaces/_database.py
@@ -24,10 +24,12 @@
"Database",
"ReadOnlyDatabaseError",
"DatabaseConflictError",
"DatabaseInsertMode",
"SchemaAlreadyDefinedError",
"StaticTablesContext",
]

import enum
import uuid
import warnings
from abc import ABC, abstractmethod
@@ -44,6 +46,19 @@
from .._exceptions import ConflictingDefinitionError


class DatabaseInsertMode(enum.Enum):
"""Mode options available for inserting database records."""

INSERT = enum.auto()
"""Insert records, failing if they already exist."""

REPLACE = enum.auto()
"""Replace records, overwriting existing."""

ENSURE = enum.auto()
"""Insert records, skipping any that already exist."""


# TODO: method is called with list[ReflectedColumn] in SA 2, and
# ReflectedColumn does not exist in 1.4.
def _checkExistingTableDefinition(name: str, spec: ddl.TableSpec, inspection: list) -> None:
35 changes: 35 additions & 0 deletions python/lsst/daf/butler/registry/interfaces/_opaque.py
@@ -69,6 +69,41 @@ def insert(self, *data: dict, transaction: DatastoreTransaction | None = None) -
"""
raise NotImplementedError()

@abstractmethod
def ensure(self, *data: dict, transaction: DatastoreTransaction | None = None) -> None:
"""Insert records into the table, skipping rows that already exist.
Parameters
----------
*data
Each additional positional argument is a dictionary that represents
a single row to be added.
transaction : `DatastoreTransaction`, optional
Transaction object that can be used to enable an explicit rollback
of the insert to be registered. Can be ignored if rollback is
handled via a different mechanism, such as by a database. Can be
`None` if no external transaction is available.
"""
raise NotImplementedError()

@abstractmethod
def replace(self, *data: dict, transaction: DatastoreTransaction | None = None) -> None:
"""Insert records into the table, replacing if previously existing
but different.
Parameters
----------
*data
Each additional positional argument is a dictionary that represents
a single row to be added.
transaction : `DatastoreTransaction`, optional
Transaction object that can be used to enable an explicit rollback
of the insert to be registered. Can be ignored if rollback is
handled via a different mechanism, such as by a database. Can be
`None` if no external transaction is available.
"""
raise NotImplementedError()

@abstractmethod
def fetch(self, **where: Any) -> Iterator[Mapping[Any, Any]]:
"""Retrieve records from an opaque table.
12 changes: 12 additions & 0 deletions python/lsst/daf/butler/registry/opaque.py
@@ -79,6 +79,18 @@ def insert(self, *data: dict, transaction: DatastoreTransaction | None = None) -
# the database itself providing any rollback functionality.
self._db.insert(self._table, *data)

def ensure(self, *data: dict, transaction: DatastoreTransaction | None = None) -> None:
# Docstring inherited from OpaqueTableStorage.
# The provided transaction object can be ignored since we rely on
# the database itself providing any rollback functionality.
self._db.ensure(self._table, *data)

def replace(self, *data: dict, transaction: DatastoreTransaction | None = None) -> None:
# Docstring inherited from OpaqueTableStorage.
# The provided transaction object can be ignored since we rely on
# the database itself providing any rollback functionality.
self._db.replace(self._table, *data)
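
Taken together, ``insert``/``ensure``/``replace`` give callers one row-level conflict policy per operation. A hypothetical, self-contained stand-in showing the intended contract (this toy class is not the butler implementation):

class InMemoryOpaqueStorage:
    """Toy stand-in keyed on dataset_id; not the real OpaqueTableStorage."""

    def __init__(self) -> None:
        self._rows: dict[str, dict] = {}

    def insert(self, *data: dict) -> None:
        for row in data:
            if row["dataset_id"] in self._rows:
                raise ValueError(f"row exists: {row['dataset_id']}")
            self._rows[row["dataset_id"]] = row

    def ensure(self, *data: dict) -> None:
        for row in data:
            self._rows.setdefault(row["dataset_id"], row)

    def replace(self, *data: dict) -> None:
        for row in data:
            self._rows[row["dataset_id"]] = row

storage = InMemoryOpaqueStorage()
storage.insert({"dataset_id": "d1", "path": "a.fits"})
storage.ensure({"dataset_id": "d1", "path": "ignored.fits"})  # skipped
storage.replace({"dataset_id": "d1", "path": "b.fits"})       # overwritten
print(storage._rows["d1"]["path"])  # b.fits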

def fetch(self, **where: Any) -> Iterator[sqlalchemy.RowMapping]:
# Docstring inherited from OpaqueTableStorage.
