Skip to content

Commit

Permalink
Add ButlerFactory
Browse files Browse the repository at this point in the history
Add a factory class for creating multiple Butler instances for multiple repositories in the repository index.  This is intended for use in long-lived multi-threaded services that will instantiate a new Butler for each end-user HTTP request.
  • Loading branch information
dhirving committed Dec 26, 2023
1 parent 873dba5 commit dfdc339
Show file tree
Hide file tree
Showing 5 changed files with 283 additions and 0 deletions.
1 change: 1 addition & 0 deletions python/lsst/daf/butler/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from . import ddl, time_utils
from ._butler import *
from ._butler_config import *
from ._butler_factory import *
from ._butler_repo_index import *
from ._column_categorization import *
from ._column_tags import *
Expand Down
160 changes: 160 additions & 0 deletions python/lsst/daf/butler/_butler_factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
# This file is part of daf_butler.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

__all__ = ("ButlerFactory",)

from typing import Callable, Mapping

from lsst.resources import ResourcePathExpression

from ._butler import Butler
from ._butler_config import ButlerConfig, ButlerType
from ._butler_repo_index import ButlerRepoIndex
from ._utilities.named_locks import NamedLocks
from ._utilities.thread_safe_cache import ThreadSafeCache

_FactoryFunction = Callable[[str | None], Butler]
"""Function that takes an access token string or `None`, and returns a Butler
instance."""


class ButlerFactory:
"""Factory for efficiently instantiating Butler instances from the
repository index file. This is intended for use from long-lived services
that want to instantiate a separate Butler instance for each end user
request.
Parameters
----------
repositories : `Mapping`[`str`, `str`], optional
Keys are arbitrary labels, and values are URIs to Butler configuration
files. If not provided, defaults to the global repository index
configured by the ``DAF_BUTLER_REPOSITORY_INDEX`` environment variable
-- see `ButlerRepoIndex`.
Notes
-----
For each label in the repository index, caches shared state to allow fast
instantiation of new instances.
Instance methods on this class are threadsafe. A single instance of
ButlerFactory can be shared between multiple threads.
"""

def __init__(self, repositories: Mapping[str, str] | None = None) -> None:
if repositories is None:
self._repositories = None
else:
self._repositories = dict(repositories)

self._factories = ThreadSafeCache[str, _FactoryFunction]()
self._initialization_locks = NamedLocks()

def create_butler(self, *, label: str, access_token: str | None) -> Butler:
"""Create a Butler instance.
Parameters
----------
label : `str`
Label of the repository to instantiate, from the ``repositories``
parameter to the `ButlerFactory` constructor or the global
repository index file.
access_token : `str` | `None`
Gafaelfawr access token used to authenticate to a Butler server.
This is required for any repositories configured to use
`RemoteButler`. If you only use `DirectButler`, this may be
`None`.
Notes
-----
For a service making requests on behalf of end users, the access token
should normally be a "delegated" token so that access permissions are
based on the end user instead of the service. See
https://gafaelfawr.lsst.io/user-guide/gafaelfawringress.html#requesting-delegated-tokens
"""
factory = self._get_or_create_butler_factory_function(label)
return factory(access_token)

def _get_or_create_butler_factory_function(self, label: str) -> _FactoryFunction:
# We maintain a separate lock per label. We only want to instantiate
# one factory function per label, because creating the factory sets up
# shared state that should only exist once per repository. However, we
# don't want other repositories' instance creation to block on one
# repository that is slow to initialize.
with self._initialization_locks.lock(label):
if (factory := self._factories.get(label)) is not None:
return factory

factory = self._create_butler_factory_function(label)
return self._factories.set_or_get(label, factory)

def _create_butler_factory_function(self, label: str) -> _FactoryFunction:
config_uri = self._get_config_uri(label)
config = ButlerConfig(config_uri)
butler_type = config.get_butler_type()

if butler_type == ButlerType.DIRECT:
return _create_direct_butler_factory(config)
elif butler_type == ButlerType.REMOTE:
return _create_remote_butler_factory(config)

def _get_config_uri(self, label: str) -> ResourcePathExpression:
if self._repositories is None:
return ButlerRepoIndex.get_repo_uri(label)
else:
return self._repositories[label]


def _create_direct_butler_factory(config: ButlerConfig) -> _FactoryFunction:
butler = Butler.from_config(config)

def create_butler(access_token: str | None) -> Butler:
# Access token is ignored because DirectButler does not use Gafaelfawr
# authentication.

# TODO DM-42317: This is not actually safe in its current form, because
# clone returns an object that has non-thread-safe mutable state shared
# between the original and cloned instance.
# However, current services are already sharing a single global
# non-cloned Butler instance, so this isn't making things worse than
# they already are.
return butler._clone()

return create_butler


def _create_remote_butler_factory(config: ButlerConfig) -> _FactoryFunction:
import lsst.daf.butler.remote_butler

factory = lsst.daf.butler.remote_butler.RemoteButlerFactory.create_factory_from_config(config)

def create_butler(access_token: str | None) -> Butler:
if access_token is None:
raise ValueError("Access token is required to connect to a Butler server")
return factory.create_butler_for_access_token(access_token)

return create_butler
61 changes: 61 additions & 0 deletions python/lsst/daf/butler/_utilities/named_locks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# This file is part of daf_butler.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from contextlib import contextmanager
from threading import Lock
from typing import Iterator


class NamedLocks:
"""Maintains a collection of separate mutex locks, indexed by name."""

def __init__(self) -> None:
self._lookup_lock = Lock()
self._named_locks = dict[str, Lock]()

@contextmanager
def lock(self, name: str) -> Iterator[None]:
"""Return a context manager that acquires a mutex lock when entered and
releases it when exited.
Parameters
----------
name : `str`
The name of the lock. A separate lock instance is created for each
distinct name.
"""
with self._get_lock(name):
yield

def _get_lock(self, name: str) -> Lock:
with self._lookup_lock:
lock = self._named_locks.get(name, None)
if lock is None:
lock = Lock()
self._named_locks[name] = lock

return lock
55 changes: 55 additions & 0 deletions tests/test_named_locks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# This file is part of daf_butler.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import unittest

from lsst.daf.butler._utilities.named_locks import NamedLocks


class NamedLocksTestCase(unittest.TestCase):
"""Test NamedLocks."""

def test_named_locks(self):
locks = NamedLocks()
lock1 = locks._get_lock("a")
lock2 = locks._get_lock("b")
lock3 = locks._get_lock("a")

self.assertIs(lock1, lock3)
self.assertIsNot(lock1, lock2)

self.assertFalse(lock1.locked())
self.assertFalse(lock2.locked())
with locks.lock("a"):
self.assertTrue(lock1.locked())
self.assertFalse(lock2.locked())
self.assertFalse(lock1.locked())
self.assertFalse(lock2.locked())


if __name__ == "__main__":
unittest.main()
6 changes: 6 additions & 0 deletions tests/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@

from lsst.daf.butler import (
Butler,
ButlerFactory,
DataCoordinate,
DatasetRef,
MissingDatasetTypeError,
Expand Down Expand Up @@ -252,8 +253,13 @@ def override_read(http_resource_path):
collections=["collection1", "collection2"],
run="collection2",
)
butler_factory = ButlerFactory({"server": "https://test.example/api/butler"})
factory_created_butler = butler_factory.create_butler(label="server", access_token="token")
self.assertIsInstance(butler, RemoteButler)
self.assertIsInstance(factory_created_butler, RemoteButler)
self.assertEqual(butler._server_url, "https://test.example/api/butler/")
self.assertEqual(factory_created_butler._server_url, "https://test.example/api/butler/")

self.assertEqual(butler.collections, ("collection1", "collection2"))
self.assertEqual(butler.run, "collection2")

Expand Down

0 comments on commit dfdc339

Please sign in to comment.