Merge pull request #1105 from lsst/tickets/DM-46776
DM-46776: Add zip creation and ingest
timj authored Oct 30, 2024
2 parents ef8b14b + c510c7f commit 2b0d39d
Showing 33 changed files with 1,912 additions and 170 deletions.
7 changes: 7 additions & 0 deletions doc/changes/DM-46776.feature.rst
@@ -0,0 +1,7 @@
* Added ``Butler.retrieve_artifacts_zip`` and ``QuantumBackedButler.retrieve_artifacts_zip`` methods to retrieve the dataset artifacts and store them into a zip file.
* Added ``Butler.ingest_zip`` to ingest the contents of a Zip file.
* Added ``SerializedDatasetRefContainerV1`` class to allow a collection of ``DatasetRef`` to be serialized efficiently.
JSON serializations made using this class will be supported.
* Added ``--zip`` parameter to ``butler retrieve-artifacts``.
* Changed ``Butler.retrieveArtifacts`` to always write a JSON index file describing where the artifacts came from.
* Added a ``butler ingest-zip`` command-line tool for ingesting zip files created by ``butler retrieve-artifacts``.
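
Taken together, the entries above describe a retrieve-then-ingest workflow. A hedged sketch of how it might look through the Python API; the repository paths, collection name, and dataset type are illustrative assumptions, not part of this change:

# Illustrative sketch only: repo paths, collection, and dataset type are
# assumptions; the method names come from the changelog entries above.
from lsst.daf.butler import Butler

src = Butler("/repo/main", collections=["HSC/defaults"])
refs = src.query_datasets("calexp", limit=10)

# Bundle the artifacts (plus the JSON index describing their origin)
# into a single Zip file.
zip_path = src.retrieve_artifacts_zip(refs, destination="/tmp/transfer")

# Ingest that Zip into another repository; run collections are created
# as needed.
dst = Butler("/repo/scratch", writeable=True)
dst.ingest_zip(zip_path, transfer="auto")

The equivalent command-line flow is ``butler retrieve-artifacts --zip`` followed by ``butler ingest-zip``, per the entries above.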
52 changes: 52 additions & 0 deletions python/lsst/daf/butler/_butler.py
@@ -987,6 +987,39 @@ def find_dataset(
"""
raise NotImplementedError()

@abstractmethod
def retrieve_artifacts_zip(
self,
refs: Iterable[DatasetRef],
destination: ResourcePathExpression,
overwrite: bool = True,
) -> ResourcePath:
"""Retrieve artifacts from a Butler and place in ZIP file.
Parameters
----------
refs : `~collections.abc.Iterable` [ `DatasetRef` ]
The datasets to be included in the zip file.
destination : `lsst.resources.ResourcePathExpression`
Directory to write the new ZIP file. This directory will
also be used as a staging area for the datasets being downloaded
from the datastore.
overwrite : `bool`, optional
If `False` the output Zip will not be written if a file of the
same name is already present in ``destination``.
Returns
-------
zip_file : `lsst.resources.ResourcePath`
The path to the new ZIP file.
Raises
------
ValueError
Raised if there are no refs to retrieve.
"""
raise NotImplementedError()

@abstractmethod
def retrieveArtifacts(
self,
@@ -1202,6 +1235,25 @@ def ingest(
"""
raise NotImplementedError()

@abstractmethod
def ingest_zip(self, zip_file: ResourcePathExpression, transfer: str = "auto") -> None:
"""Ingest a Zip file into this butler.
The Zip file must have been created by `retrieve_artifacts_zip`.
Parameters
----------
zip_file : `lsst.resources.ResourcePathExpression`
Path to the Zip file.
transfer : `str`, optional
Method to use to transfer the Zip into the datastore.
Notes
-----
Run collections are created as needed.
"""
raise NotImplementedError()

@abstractmethod
def export(
self,
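
The two abstract methods added above also document their edge cases: an empty set of refs raises ``ValueError``, and ``overwrite=False`` protects an existing Zip of the same name. A minimal sketch under those assumptions; the repository path and query are placeholders:

# Placeholder repo path and query; behaviour follows the docstrings above.
from lsst.daf.butler import Butler

butler = Butler("/repo/example")

# An empty iterable of refs is rejected.
try:
    butler.retrieve_artifacts_zip([], destination="/tmp/zips")
except ValueError as err:
    print(f"nothing to retrieve: {err}")

# With overwrite=False the output Zip is not written if a file of the
# same name already exists in the destination directory.
refs = butler.query_datasets("raw", collections="DUMMY/raw", limit=1)
butler.retrieve_artifacts_zip(refs, destination="/tmp/zips", overwrite=False)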
176 changes: 174 additions & 2 deletions python/lsst/daf/butler/_dataset_ref.py
@@ -34,13 +34,26 @@
"DatasetIdGenEnum",
"DatasetRef",
"SerializedDatasetRef",
"SerializedDatasetRefContainerV1",
"SerializedDatasetRefContainers",
]

import enum
import logging
import sys
import uuid
from collections.abc import Iterable, Mapping
from typing import TYPE_CHECKING, Any, ClassVar, Literal, Protocol, TypeAlias, runtime_checkable
from typing import (
TYPE_CHECKING,
Annotated,
Any,
ClassVar,
Literal,
Protocol,
Self,
TypeAlias,
runtime_checkable,
)

import pydantic
from lsst.utils.classes import immutable
@@ -50,7 +63,13 @@
from ._dataset_type import DatasetType, SerializedDatasetType
from ._named import NamedKeyDict
from .datastore.stored_file_info import StoredDatastoreItemInfo
from .dimensions import DataCoordinate, DimensionGroup, DimensionUniverse, SerializedDataCoordinate
from .dimensions import (
DataCoordinate,
DimensionGroup,
DimensionUniverse,
SerializedDataCoordinate,
SerializedDataId,
)
from .json import from_json_pydantic, to_json_pydantic
from .persistence_context import PersistenceContextVars

@@ -63,6 +82,9 @@
DatasetDatastoreRecords: TypeAlias = Mapping[str, list[StoredDatastoreItemInfo]]


_LOG = logging.getLogger(__name__)


class AmbiguousDatasetError(Exception):
"""Raised when a `DatasetRef` is not resolved but should be.
@@ -864,3 +886,153 @@ class associated with the dataset type of the other ref can be
Cannot be changed after a `DatasetRef` is constructed.
"""


class MinimalistSerializableDatasetRef(pydantic.BaseModel):
"""Minimal information needed to define a DatasetRef.
The ID is not included and is presumed to be the key to a mapping
to this information.
"""

model_config = pydantic.ConfigDict(frozen=True)

dataset_type_name: str
"""Name of the dataset type."""

run: str
"""Name of the RUN collection."""

data_id: SerializedDataId
"""Data coordinate of this dataset."""


class SerializedDatasetRefContainer(pydantic.BaseModel):
"""Serializable model for a collection of DatasetRef.
Dimension records are not included.
"""

model_config = pydantic.ConfigDict(extra="allow", frozen=True)

container_version: str


class SerializedDatasetRefContainerV1(SerializedDatasetRefContainer):
"""Serializable model for a collection of DatasetRef.
Dimension records are not included.
"""

container_version: Literal["V1"] = "V1"

universe_version: int
"""Dimension universe version."""

universe_namespace: str
"""Dimension universe namespace."""

dataset_types: dict[str, SerializedDatasetType]
"""Dataset types indexed by their name."""

compact_refs: dict[uuid.UUID, MinimalistSerializableDatasetRef]
"""Minimal dataset ref information indexed by UUID."""

def __len__(self) -> int:
"""Return the number of datasets in the container."""
return len(self.compact_refs)

@classmethod
def from_refs(cls, refs: Iterable[DatasetRef]) -> Self:
"""Construct a serializable form from a list of `DatasetRef`.
Parameters
----------
refs : `~collections.abc.Iterable` [ `DatasetRef` ]
The datasets to include in the container.
"""
# The serialized DatasetRef contains a lot of duplicated information.
# We also want to drop dimension records and assume that the records
# are already in the registry.
universe: DimensionUniverse | None = None
dataset_types: dict[str, SerializedDatasetType] = {}
compact_refs: dict[uuid.UUID, MinimalistSerializableDatasetRef] = {}
for ref in refs:
simple_ref = ref.to_simple()
dataset_type = simple_ref.datasetType
assert dataset_type is not None # For mypy
if universe is None:
universe = ref.datasetType.dimensions.universe
if (name := dataset_type.name) not in dataset_types:
dataset_types[name] = dataset_type
data_id = simple_ref.dataId
assert data_id is not None # For mypy
compact_refs[simple_ref.id] = MinimalistSerializableDatasetRef(
dataset_type_name=name, run=simple_ref.run, data_id=data_id.dataId
)
if universe:
universe_version = universe.version
universe_namespace = universe.namespace
else:
# No refs so no universe.
universe_version = 0
universe_namespace = "unknown"
return cls(
universe_version=universe_version,
universe_namespace=universe_namespace,
dataset_types=dataset_types,
compact_refs=compact_refs,
)

def to_refs(self, universe: DimensionUniverse) -> list[DatasetRef]:
"""Construct the original `DatasetRef`.
Parameters
----------
universe : `DimensionUniverse`
The universe to use when constructing the `DatasetRef`.
Returns
-------
refs : `list` [ `DatasetRef` ]
The `DatasetRef` that were serialized.
"""
if not self.compact_refs:
return []

if universe.namespace != self.universe_namespace:
raise RuntimeError(
f"Can not convert to refs in universe {universe.namespace} that were created from "
f"universe {self.universe_namespace}"
)

if universe.version != self.universe_version:
_LOG.warning(
"Universe mismatch when attempting to reconstruct DatasetRef from serialized form. "
"Serialized with version %d but asked to use version %d.",
self.universe_version,
universe.version,
)

# Reconstruct the DatasetType objects.
dataset_types = {
name: DatasetType.from_simple(dtype, universe=universe)
for name, dtype in self.dataset_types.items()
}
refs: list[DatasetRef] = []
for id_, minimal in self.compact_refs.items():
simple_data_id = SerializedDataCoordinate(dataId=minimal.data_id)
data_id = DataCoordinate.from_simple(simple=simple_data_id, universe=universe)
ref = DatasetRef(
id=id_,
run=minimal.run,
datasetType=dataset_types[minimal.dataset_type_name],
dataId=data_id,
)
refs.append(ref)
return refs


SerializedDatasetRefContainers: TypeAlias = Annotated[
SerializedDatasetRefContainerV1,
pydantic.Field(discriminator="container_version"),
]
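
Because dataset types are stored once and each compact ref is keyed by its UUID, the container avoids repeating dimension metadata for every ref, which is the efficiency the changelog mentions. A hedged round-trip sketch; the repository and query are assumptions, and the import path simply mirrors the module shown in this diff:

# Sketch: serialize a set of refs compactly, then reconstruct them.
from lsst.daf.butler import Butler
from lsst.daf.butler._dataset_ref import SerializedDatasetRefContainerV1

butler = Butler("/repo/example")
refs = butler.query_datasets("raw", collections="DUMMY/raw", limit=5)

# Dimension records are dropped and dataset types are deduplicated.
container = SerializedDatasetRefContainerV1.from_refs(refs)
blob = container.model_dump_json()

# Reconstruction needs a dimension universe from the caller: a namespace
# mismatch raises, a version mismatch only logs a warning.
restored = SerializedDatasetRefContainerV1.model_validate_json(blob)
round_tripped = restored.to_refs(universe=butler.dimensions)
assert len(round_tripped) == len(restored)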
2 changes: 1 addition & 1 deletion python/lsst/daf/butler/_dataset_type.py
@@ -676,7 +676,7 @@ def to_simple(self, minimal: bool = False) -> SerializedDatasetType:
"name": self.name,
"storageClass": self._storageClassName,
"isCalibration": self._isCalibration,
"dimensions": list(self._dimensions.names),
"dimensions": list(self._dimensions.required),
}

if self._parentStorageClassName is not None:
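
For context on the one-line change above: a dataset type's dimension group exposes both ``names`` (which includes implied dimensions) and ``required`` (only the keys needed to identify a data ID), and the serialized form now records the required subset. A hedged sketch; the repository path and dimension names are illustrative:

# Hedged sketch: the exact dimension sets depend on the configured universe.
from lsst.daf.butler import Butler

butler = Butler("/repo/example")
group = butler.dimensions.conform(["instrument", "visit"])

print(sorted(group.required))  # identifying keys only, e.g. instrument, visit
print(sorted(group.names))     # additionally includes any implied dimensions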
4 changes: 2 additions & 2 deletions python/lsst/daf/butler/_formatter.py
@@ -439,8 +439,8 @@ def read(
# direct read from URI option and the contents of the Zip file must
# be extracted.
uri = self.file_descriptor.location.uri
if uri.fragment and uri.fragment.startswith("zip-path="):
_, path_in_zip = uri.fragment.split("=")
if uri.fragment and uri.unquoted_fragment.startswith("zip-path="):
_, _, path_in_zip = uri.unquoted_fragment.partition("=")

# Open the Zip file using ResourcePath.
with uri.open("rb") as fd:
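
The two-line change above switches from ``str.split`` to ``str.partition`` and reads the unquoted fragment, so a path inside the Zip that itself contains ``=`` is recovered intact. A quick sketch of the difference; the fragment value is made up for illustration:

# Illustrative fragment value; the "zip-path=" convention comes from the
# code above, the specific path is an assumption.
fragment = "zip-path=calibs/flat_r=2024.fits"

# partition() consumes only the first "=", so the remainder survives intact.
_, _, path_in_zip = fragment.partition("=")
assert path_in_zip == "calibs/flat_r=2024.fits"

# The old split("=") produced three pieces here, so the original
# two-target unpacking would have raised ValueError.
assert fragment.split("=") == ["zip-path", "calibs/flat_r", "2024.fits"]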
35 changes: 34 additions & 1 deletion python/lsst/daf/butler/_quantum_backed.py
@@ -39,7 +39,7 @@
from typing import TYPE_CHECKING, Any

import pydantic
from lsst.resources import ResourcePathExpression
from lsst.resources import ResourcePath, ResourcePathExpression

from ._butler_config import ButlerConfig
from ._config import Config
@@ -51,6 +51,7 @@
from ._storage_class import StorageClass, StorageClassFactory
from .datastore import Datastore
from .datastore.record_data import DatastoreRecordData, SerializedDatastoreRecordData
from .datastores.file_datastore.retrieve_artifacts import retrieve_and_zip
from .dimensions import DimensionUniverse
from .registry.bridge.monolithic import MonolithicDatastoreRegistryBridgeManager
from .registry.databases.sqlite import SqliteDatabase
@@ -498,6 +499,38 @@ def pruneDatasets(
# Point of no return for removing artifacts
self._datastore.emptyTrash()

def retrieve_artifacts_zip(
self,
refs: Iterable[DatasetRef],
destination: ResourcePathExpression,
overwrite: bool = True,
) -> ResourcePath:
"""Retrieve artifacts from the graph and place in ZIP file.
Parameters
----------
refs : `~collections.abc.Iterable` [ `DatasetRef` ]
The datasets to be included in the zip file.
destination : `lsst.resources.ResourcePathExpression`
Directory to write the new ZIP file. This directory will
also be used as a staging area for the datasets being downloaded
from the datastore.
overwrite : `bool`, optional
If `False` the output Zip will not be written if a file of the
same name is already present in ``destination``.
Returns
-------
zip_file : `lsst.resources.ResourcePath`
The path to the new ZIP file.
Raises
------
ValueError
Raised if there are no refs to retrieve.
"""
return retrieve_and_zip(refs, destination, self._datastore.retrieveArtifacts, overwrite)

def extract_provenance_data(self) -> QuantumProvenanceData:
"""Extract provenance information and datastore records from this
butler.
2 changes: 2 additions & 0 deletions python/lsst/daf/butler/cli/cmd/__init__.py
@@ -35,6 +35,7 @@
"config_validate",
"export_calibs",
"ingest_files",
"ingest_zip",
"prune_datasets",
"query_collections",
"query_data_ids",
@@ -61,6 +62,7 @@
create,
export_calibs,
ingest_files,
ingest_zip,
prune_datasets,
query_collections,
query_data_ids,