From 7db78a9f2ab3f0d0826a4772502ef8939513c7b2 Mon Sep 17 00:00:00 2001 From: Tim Jenness Date: Thu, 15 Jun 2023 16:23:35 -0700 Subject: [PATCH] Use a dataclass to specify cores and memory This gives more flexibility for the future and makes it easier for this information to be added to ctrl_mpexec APIs. --- python/lsst/pipe/base/_quantumContext.py | 52 ++++++++++++++++-------- tests/test_pipelineTask.py | 20 +++++---- 2 files changed, 46 insertions(+), 26 deletions(-) diff --git a/python/lsst/pipe/base/_quantumContext.py b/python/lsst/pipe/base/_quantumContext.py index d925a522..5f0740f5 100644 --- a/python/lsst/pipe/base/_quantumContext.py +++ b/python/lsst/pipe/base/_quantumContext.py @@ -24,9 +24,11 @@ """Module defining a butler like object specialized to a specific quantum. """ -__all__ = ("ButlerQuantumContext", "QuantumContext") +__all__ = ("ButlerQuantumContext", "ExecutionResources", "QuantumContext") +import numbers from collections.abc import Sequence +from dataclasses import dataclass from typing import Any import astropy.units as u @@ -41,6 +43,30 @@ _LOG = getLogger(__name__) +@dataclass(frozen=True, kw_only=True) +class ExecutionResources: + """A description of the resources available to a running quantum.""" + + num_cores: int = 1 + """The maximum number of cores that the task can use.""" + + max_mem: u.Quantity | numbers.Real | None = None + """If defined, the amount of memory allocated to the task. If a plain + integer is given it is assumed to be the number of bytes and converted + to a `~astropy.units.Quantity`. + """ + + def __post_init__(self) -> None: + # Normalize max_mem to bytes. + max_mem = self.max_mem + if max_mem is not None: + if isinstance(max_mem, numbers.Real): + max_mem *= u.B + else: + max_mem = max_mem.to(u.B) + object.__setattr__(self, "max_mem", max_mem) + + class QuantumContext: """A Butler-like class specialized for a single quantum along with context information that can influence how the task is executed. @@ -52,11 +78,8 @@ class QuantumContext: quantum : `lsst.daf.butler.core.Quantum` Quantum object that describes the datasets which will be get/put by a single execution of this node in the pipeline graph. - num_cores : `int`, optional - The maximum number of cores that the task can use. - max_mem : `astropy.units.Quantity`, `int`, or `None`, optional - If defined, the amount of memory allocated to the task. If a plain - integer is given it is assumed to be the number of bytes. + resources : `ExecutionResources` + The resources allocated for executing quanta. Notes ----- @@ -71,20 +94,15 @@ class QuantumContext: execution. """ + resources: ExecutionResources + def __init__( - self, butler: LimitedButler, quantum: Quantum, num_cores: int = 1, max_mem: u.Quantity | None = None + self, butler: LimitedButler, quantum: Quantum, *, resources: ExecutionResources | None = None ): self.quantum = quantum - self.num_cores = num_cores - - # Internally store as bytes. This will also ensure that the quantity - # given is convertible to bytes. - if max_mem is not None: - if isinstance(max_mem, int): - max_mem *= u.B - else: - max_mem = max_mem.to(u.B) - self.max_mem = max_mem + if resources is None: + resources = ExecutionResources() + self.resources = resources self.allInputs = set() self.allOutputs = set() diff --git a/tests/test_pipelineTask.py b/tests/test_pipelineTask.py index 14298f63..dee2d7b1 100644 --- a/tests/test_pipelineTask.py +++ b/tests/test_pipelineTask.py @@ -279,8 +279,8 @@ def testButlerQC(self): butler.put(100, ref) butlerQC = pipeBase.QuantumContext(butler, quantum) - self.assertEqual(butlerQC.num_cores, 1) - self.assertIsNone(butlerQC.max_mem) + self.assertEqual(butlerQC.resources.num_cores, 1) + self.assertIsNone(butlerQC.resources.max_mem) # Pass ref as single argument or a list. obj = butlerQC.get(ref) @@ -310,16 +310,18 @@ def testButlerQC(self): self.assertEqual(obj, {"input": [None, 100], "input2": None}) # Set additional context. - butlerQC = pipeBase.QuantumContext(butler, quantum, num_cores=4, max_mem=5 * u.MB) - self.assertEqual(butlerQC.num_cores, 4) - self.assertEqual(butlerQC.max_mem, 5_000_000 * u.B) + resources = pipeBase.ExecutionResources(num_cores=4, max_mem=5 * u.MB) + butlerQC = pipeBase.QuantumContext(butler, quantum, resources=resources) + self.assertEqual(butlerQC.resources.num_cores, 4) + self.assertEqual(butlerQC.resources.max_mem, 5_000_000 * u.B) - butlerQC = pipeBase.QuantumContext(butler, quantum, max_mem=5) - self.assertEqual(butlerQC.num_cores, 1) - self.assertEqual(butlerQC.max_mem, 5 * u.B) + resources = pipeBase.ExecutionResources(max_mem=5) + butlerQC = pipeBase.QuantumContext(butler, quantum, resources=resources) + self.assertEqual(butlerQC.resources.num_cores, 1) + self.assertEqual(butlerQC.resources.max_mem, 5 * u.B) with self.assertRaises(u.UnitConversionError): - pipeBase.QuantumContext(butler, quantum, max_mem=1 * u.m) + pipeBase.ExecutionResources(max_mem=1 * u.m) class MyMemoryTestCase(lsst.utils.tests.MemoryTestCase):