Skip to content

Commit

Permalink
Merge pull request #1040 from lsst/tickets/DM-44843
Browse files Browse the repository at this point in the history
DM-44843: Improve performance of collection resolution in the new query system
  • Loading branch information
andy-slac authored Jul 23, 2024
2 parents 8dd8362 + 5ee4d64 commit fd5547a
Show file tree
Hide file tree
Showing 8 changed files with 359 additions and 118 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/docker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ on:
jobs:
docker:
runs-on: ubuntu-latest
timeout-minutes: 20
timeout-minutes: 30
permissions:
contents: read
packages: write
Expand Down
114 changes: 71 additions & 43 deletions python/lsst/daf/butler/direct_query_driver/_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
__all__ = ("DirectQueryDriver",)

import dataclasses
import itertools
import logging
import sys
from collections.abc import Iterable, Iterator, Mapping, Set
Expand Down Expand Up @@ -64,6 +65,7 @@
from ..registry import CollectionSummary, CollectionType, NoDefaultCollectionError
from ..registry.interfaces import ChainedCollectionRecord, CollectionRecord
from ..registry.managers import RegistryManagerInstances
from ..registry.wildcards import CollectionWildcard
from ._postprocessing import Postprocessing
from ._query_builder import QueryBuilder, QueryJoiner
from ._query_plan import (
Expand Down Expand Up @@ -920,23 +922,87 @@ def _analyze_query_tree(self, tree: qt.QueryTree) -> tuple[QueryJoinsPlan, Query
)
# Add data coordinate uploads.
result.data_coordinate_uploads.update(tree.data_coordinate_uploads)
# Retrieve collection information for all collections in a tree.
collection_names = set(
itertools.chain.from_iterable(
dataset_search.collections for dataset_search in tree.datasets.values()
)
)
collection_records = {
record.name: record
for record in self.managers.collections.resolve_wildcard(
CollectionWildcard.from_names(collection_names), flatten_chains=True, include_chains=True
)
}
non_chain_records = [
record for record in collection_records.values() if record.type is not CollectionType.CHAINED
]
# Fetch summaries for a subset of dataset types.
dataset_types = [self.get_dataset_type(dataset_type_name) for dataset_type_name in tree.datasets]
summaries = self.managers.datasets.fetch_summaries(non_chain_records, dataset_types)
# Add dataset_searches and filter out collections that don't have the
# right dataset type or governor dimensions.
for dataset_type_name, dataset_search in tree.datasets.items():
collection_summaries = self._filter_collections(
dataset_search.collections, collection_records, summaries
)
resolved_dataset_search = self._resolve_dataset_search(
dataset_type_name, dataset_search, result.constraint_data_id
dataset_type_name, dataset_search, result.constraint_data_id, collection_summaries
)
result.datasets[dataset_type_name] = resolved_dataset_search
if not resolved_dataset_search.collection_records:
result.messages.append(f"Search for dataset type {dataset_type_name!r} is doomed to fail.")
result.messages.extend(resolved_dataset_search.messages)
return result, builder

def _filter_collections(
self,
collection_names: Iterable[str],
records: Mapping[str, CollectionRecord],
summaries: Mapping[Any, CollectionSummary],
) -> list[tuple[CollectionRecord, CollectionSummary]]:
"""Return a subset of collection records and summaries ordered
according to the input collection list.
Parameters
----------
collection_names : `~collections.abc.Iterable` [`str`]
List of collection names.
records : `~collections.abc.Mapping` [`str`, `CollectionRecord`]
Mapping of collection names to collection records, must contain
records for all collections in ``collection_names`` and all their
children collections.
summaries : `~collections.abc.Mapping` [`Any`, `CollectionSummary`]
Mapping of collection IDs to collection summaries, must contain
summaries for all non-chained collections in the collection tree.
Returns
-------
result `list` [`tuple` [`CollectionRecord`, `CollectionSummary`]]
Sequence of collection records and their corresponding summaries
ordered according to the order of input collections and their
child collections. Does not include chained collections.
"""
done: set[str] = set()

def recurse(names: Iterable[str]) -> Iterator[tuple[CollectionRecord, CollectionSummary]]:
for name in names:
if name not in done:
done.add(name)
record = records[name]
if record.type is CollectionType.CHAINED:
yield from recurse(cast(ChainedCollectionRecord, record).children)
else:
yield record, summaries[record.key]

return list(recurse(collection_names))

def _resolve_dataset_search(
self,
dataset_type_name: str,
dataset_search: qt.DatasetSearch,
constraint_data_id: Mapping[str, DataIdValue],
collections: list[tuple[CollectionRecord, CollectionSummary]],
) -> ResolvedDatasetSearch:
"""Resolve the collections that should actually be searched for
datasets of a particular type.
Expand All @@ -950,6 +1016,10 @@ def _resolve_dataset_search(
constraint_data_id : `~collections.abc.Mapping`
Data ID mapping derived from the query predicate that may be used
to filter out some collections based on their governor dimensions.
collections : `list` [ `tuple` [ \
`.registry.interfaces.CollectionRecord`, \
`.registry.CollectionSummary` ] ]
Tuples of collection record and summary.
Returns
-------
Expand All @@ -958,7 +1028,6 @@ def _resolve_dataset_search(
and resolved collection records.
"""
result = ResolvedDatasetSearch(dataset_type_name, dataset_search.dimensions)
collections = self._resolve_collection_path(dataset_search.collections)
if not collections:
result.messages.append("No datasets can be found because collection list is empty.")
for collection_record, collection_summary in collections:
Expand Down Expand Up @@ -989,47 +1058,6 @@ def _resolve_dataset_search(
)
return result

def _resolve_collection_path(
self, collections: Iterable[str]
) -> list[tuple[CollectionRecord, CollectionSummary]]:
"""Expand an ordered iterable of collection names into a list of
collection records and summaries.
Parameters
----------
collections : `~collections.abc.Iterable` [ `str` ]
Ordered iterable of collections.
Returns
-------
resolved : `list` [ `tuple` [ `.registry.interfaces.CollectionRecord`,\
`.registry.CollectionSummary` ] ]
Tuples of collection record and summary. `~CollectionType.CHAINED`
collections are flattened out and not included.
"""
result: list[tuple[CollectionRecord, CollectionSummary]] = []
done: set[str] = set()

# Eventually we really want this recursive Python code to be replaced
# by a recursive SQL query, especially if we extend this method to
# support collection glob patterns to support public APIs we don't yet
# have in the new query system (but will need to add).

def recurse(collection_names: Iterable[str]) -> None:
for collection_name in collection_names:
if collection_name not in done:
done.add(collection_name)
record = self.managers.collections.find(collection_name)

if record.type is CollectionType.CHAINED:
recurse(cast(ChainedCollectionRecord, record).children)
else:
result.append((record, self.managers.datasets.getCollectionSummary(record)))

recurse(collections)

return result

def _join_materialization(
self,
joiner: QueryJoiner,
Expand Down
Loading

0 comments on commit fd5547a

Please sign in to comment.