From 7bbc94e0d61da2a4ce4de6a1285c4cc0e4ba67f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stanis=C5=82aw=20Malinowski?= <56644812+stan-dot@users.noreply.github.com> Date: Thu, 7 Mar 2024 15:11:26 +0000 Subject: [PATCH] remove RunPlan class that wraps Task (#382) quick renaming 36 refs and delete the class --- docs/user/reference/openapi.yaml | 34 +++++++++++------------ src/blueapi/cli/cli.py | 4 +-- src/blueapi/cli/rest.py | 10 +++---- src/blueapi/service/handler.py | 8 +++--- src/blueapi/service/handler_base.py | 4 +-- src/blueapi/service/main.py | 6 ++-- src/blueapi/service/subprocess_handler.py | 6 ++-- src/blueapi/worker/__init__.py | 7 ++--- src/blueapi/worker/reworker.py | 2 +- src/blueapi/worker/task.py | 10 ------- tests/service/test_rest_api.py | 4 +-- tests/service/test_subprocess_handler.py | 12 ++++---- tests/worker/test_reworker.py | 19 ++++++------- 13 files changed, 53 insertions(+), 73 deletions(-) diff --git a/docs/user/reference/openapi.yaml b/docs/user/reference/openapi.yaml index fa51219c6..6922e0bc9 100644 --- a/docs/user/reference/openapi.yaml +++ b/docs/user/reference/openapi.yaml @@ -88,22 +88,6 @@ components: - plans title: PlanResponse type: object - RunPlan: - additionalProperties: false - description: Task that will run a plan - properties: - name: - description: Name of plan to run - title: Name - type: string - params: - description: Values for parameters to plan, if any - title: Params - type: object - required: - - name - title: RunPlan - type: object StateChangeRequest: additionalProperties: false description: Request to change the state of the worker. @@ -123,6 +107,22 @@ components: - new_state title: StateChangeRequest type: object + Task: + additionalProperties: false + description: Task that will run a plan + properties: + name: + description: Name of plan to run + title: Name + type: string + params: + description: Values for parameters to plan, if any + title: Params + type: object + required: + - name + title: Task + type: object TaskResponse: additionalProperties: false description: Acknowledgement that a task has started, includes its ID @@ -319,7 +319,7 @@ paths: detectors: - x schema: - $ref: '#/components/schemas/RunPlan' + $ref: '#/components/schemas/Task' required: true responses: '201': diff --git a/src/blueapi/cli/cli.py b/src/blueapi/cli/cli.py index 3e2e458c9..c8410e9c1 100644 --- a/src/blueapi/cli/cli.py +++ b/src/blueapi/cli/cli.py @@ -23,7 +23,7 @@ print_schema_as_yaml, write_schema_as_yaml, ) -from blueapi.worker import ProgressEvent, RunPlan, WorkerEvent, WorkerState +from blueapi.worker import ProgressEvent, Task, WorkerEvent, WorkerState from .rest import BlueapiRestClient @@ -182,7 +182,7 @@ def store_finished_event(event: WorkerEvent) -> None: finished_event.append(event) parameters = parameters or "{}" - task = RunPlan(name=name, params=json.loads(parameters)) + task = Task(name=name, params=json.loads(parameters)) resp = client.create_task(task) task_id = resp.task_id diff --git a/src/blueapi/cli/rest.py b/src/blueapi/cli/rest.py index 7c4eeef81..674688ea4 100644 --- a/src/blueapi/cli/rest.py +++ b/src/blueapi/cli/rest.py @@ -12,7 +12,7 @@ TaskResponse, WorkerTask, ) -from blueapi.worker import RunPlan, TrackableTask, WorkerState +from blueapi.worker import Task, TrackableTask, WorkerState from .amq import BlueskyRemoteError @@ -56,15 +56,13 @@ def set_state( data={"new_state": state, "defer": defer}, ) - def get_task(self, task_id: str) -> TrackableTask[RunPlan]: - return self._request_and_deserialize( - f"/tasks/{task_id}", TrackableTask[RunPlan] - ) + def get_task(self, task_id: str) -> TrackableTask[Task]: + return self._request_and_deserialize(f"/tasks/{task_id}", TrackableTask[Task]) def get_active_task(self) -> WorkerTask: return self._request_and_deserialize("/worker/task", WorkerTask) - def create_task(self, task: RunPlan) -> TaskResponse: + def create_task(self, task: Task) -> TaskResponse: return self._request_and_deserialize( "/tasks", TaskResponse, diff --git a/src/blueapi/service/handler.py b/src/blueapi/service/handler.py index bab1eba34..e080a522b 100644 --- a/src/blueapi/service/handler.py +++ b/src/blueapi/service/handler.py @@ -16,8 +16,8 @@ from blueapi.service.handler_base import BlueskyHandler from blueapi.service.model import DeviceModel, PlanModel, WorkerTask from blueapi.worker.event import WorkerState -from blueapi.worker.reworker import RunEngineWorker -from blueapi.worker.task import RunPlan +from blueapi.worker.reworker import TaskWorker +from blueapi.worker.task import Task from blueapi.worker.worker import TrackableTask, Worker LOGGER = logging.getLogger(__name__) @@ -42,7 +42,7 @@ def __init__( self._context.with_config(self._config.env) - self._worker = worker or RunEngineWorker( + self._worker = worker or TaskWorker( self._context, broadcast_statuses=self._config.env.events.broadcast_status_events, ) @@ -102,7 +102,7 @@ def devices(self) -> List[DeviceModel]: def get_device(self, name: str) -> DeviceModel: return DeviceModel.from_device(self._context.devices[name]) - def submit_task(self, task: RunPlan) -> str: + def submit_task(self, task: Task) -> str: return self._worker.submit_task(task) def clear_pending_task(self, task_id: str) -> str: diff --git a/src/blueapi/service/handler_base.py b/src/blueapi/service/handler_base.py index 8ca15f1a8..00e215c58 100644 --- a/src/blueapi/service/handler_base.py +++ b/src/blueapi/service/handler_base.py @@ -3,7 +3,7 @@ from blueapi.service.model import DeviceModel, PlanModel, WorkerTask from blueapi.worker.event import WorkerState -from blueapi.worker.task import RunPlan +from blueapi.worker.task import Task from blueapi.worker.worker import TrackableTask @@ -37,7 +37,7 @@ def get_device(self, name: str) -> DeviceModel: """ @abstractmethod - def submit_task(self, task: RunPlan) -> str: + def submit_task(self, task: Task) -> str: """ Submit a task to be run on begin_task """ diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index d02dfc2f1..94a83ff5d 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -16,7 +16,7 @@ from super_state_machine.errors import TransitionError from blueapi.config import ApplicationConfig -from blueapi.worker import RunPlan, TrackableTask, WorkerState +from blueapi.worker import Task, TrackableTask, WorkerState from .handler_base import BlueskyHandler from .model import ( @@ -141,9 +141,7 @@ def get_device_by_name(name: str, handler: BlueskyHandler = Depends(get_handler) def submit_task( request: Request, response: Response, - task: RunPlan = Body( - ..., example=RunPlan(name="count", params={"detectors": ["x"]}) - ), + task: Task = Body(..., example=Task(name="count", params={"detectors": ["x"]})), handler: BlueskyHandler = Depends(get_handler), ): """Submit a task to the worker.""" diff --git a/src/blueapi/service/subprocess_handler.py b/src/blueapi/service/subprocess_handler.py index 151d7cd9a..2fc47f027 100644 --- a/src/blueapi/service/subprocess_handler.py +++ b/src/blueapi/service/subprocess_handler.py @@ -9,7 +9,7 @@ from blueapi.service.handler_base import BlueskyHandler, HandlerNotStartedError from blueapi.service.model import DeviceModel, PlanModel, WorkerTask from blueapi.worker.event import WorkerState -from blueapi.worker.task import RunPlan +from blueapi.worker.task import Task from blueapi.worker.worker import TrackableTask set_start_method("spawn", force=True) @@ -78,7 +78,7 @@ def devices(self) -> List[DeviceModel]: def get_device(self, name: str) -> DeviceModel: return self._run_in_subprocess(get_device, [name]) - def submit_task(self, task: RunPlan) -> str: + def submit_task(self, task: Task) -> str: return self._run_in_subprocess(submit_task, [task]) def clear_pending_task(self, task_id: str) -> str: @@ -135,7 +135,7 @@ def get_device(name: str) -> DeviceModel: return get_handler().get_device(name) -def submit_task(task: RunPlan) -> str: +def submit_task(task: Task) -> str: return get_handler().submit_task(task) diff --git a/src/blueapi/worker/__init__.py b/src/blueapi/worker/__init__.py index 78309230e..a94a59ff7 100644 --- a/src/blueapi/worker/__init__.py +++ b/src/blueapi/worker/__init__.py @@ -1,16 +1,15 @@ from .event import ProgressEvent, StatusView, TaskStatus, WorkerEvent, WorkerState from .multithread import run_worker_in_own_thread -from .reworker import RunEngineWorker -from .task import RunPlan, Task +from .reworker import TaskWorker +from .task import Task from .worker import TrackableTask, Worker from .worker_busy_error import WorkerBusyError __all__ = [ "run_worker_in_own_thread", - "RunEngineWorker", + "TaskWorker", "Task", "Worker", - "RunPlan", "WorkerEvent", "WorkerState", "StatusView", diff --git a/src/blueapi/worker/reworker.py b/src/blueapi/worker/reworker.py index 659d6c5bb..5cbec04de 100644 --- a/src/blueapi/worker/reworker.py +++ b/src/blueapi/worker/reworker.py @@ -35,7 +35,7 @@ DEFAULT_START_STOP_TIMEOUT: float = 30.0 -class RunEngineWorker(Worker[Task]): +class TaskWorker(Worker[Task]): """ Worker wrapping BlueskyContext that can work in its own thread/process diff --git a/src/blueapi/worker/task.py b/src/blueapi/worker/task.py index 84f5a21cd..6adbbe1db 100644 --- a/src/blueapi/worker/task.py +++ b/src/blueapi/worker/task.py @@ -32,16 +32,6 @@ def do_task(self, ctx: BlueskyContext) -> None: ctx.run_engine(wrapped_plan_generator) -# Here for backward compatibility pending -# https://github.com/DiamondLightSource/blueapi/issues/253 -class RunPlan(Task): - """ - Task that will run a plan - """ - - ... - - def _lookup_params(ctx: BlueskyContext, task: Task) -> BaseModel: """ Checks plan parameters against context diff --git a/tests/service/test_rest_api.py b/tests/service/test_rest_api.py index ea3aa3deb..56c4b99b8 100644 --- a/tests/service/test_rest_api.py +++ b/tests/service/test_rest_api.py @@ -13,10 +13,10 @@ from blueapi.service.handler import Handler from blueapi.service.main import get_handler, setup_handler, teardown_handler from blueapi.service.model import WorkerTask -from blueapi.worker.task import RunPlan +from blueapi.worker.task import Task from src.blueapi.worker import WorkerState -_TASK = RunPlan(name="count", params={"detectors": ["x"]}) +_TASK = Task(name="count", params={"detectors": ["x"]}) def test_get_plans(handler: Handler, client: TestClient) -> None: diff --git a/tests/service/test_subprocess_handler.py b/tests/service/test_subprocess_handler.py index 15fdcdab4..816b5a8ab 100644 --- a/tests/service/test_subprocess_handler.py +++ b/tests/service/test_subprocess_handler.py @@ -7,7 +7,7 @@ from blueapi.service.model import DeviceModel, PlanModel, WorkerTask from blueapi.service.subprocess_handler import SubprocessHandler from blueapi.worker.event import WorkerState -from blueapi.worker.task import RunPlan +from blueapi.worker.task import Task from blueapi.worker.worker import TrackableTask @@ -63,7 +63,7 @@ def devices(self) -> List[DeviceModel]: def get_device(self, name: str) -> DeviceModel: return DeviceModel(name="device1", protocols=[]) - def submit_task(self, task: RunPlan) -> str: + def submit_task(self, task: Task) -> str: return "0" def clear_pending_task(self, task_id: str) -> str: @@ -89,9 +89,7 @@ def cancel_active_task(self, failure: bool, reason: Optional[str]) -> None: ... @property def pending_tasks(self) -> List[TrackableTask]: return [ - TrackableTask( - task_id="abc", task=RunPlan(name="sleep", params={"time": 0.0}) - ) + TrackableTask(task_id="abc", task=Task(name="sleep", params={"time": 0.0})) ] def get_pending_task(self, task_id: str) -> Optional[TrackableTask]: @@ -137,8 +135,8 @@ def run_in_same_process(func, args=None): assert sp_handler.get_device("name") == dummy_handler.get_device("name") assert sp_handler.submit_task( - RunPlan(name="sleep", params={"time": 0.0}) - ) == dummy_handler.submit_task(RunPlan(name="sleep", params={"time": 0.0})) + Task(name="sleep", params={"time": 0.0}) + ) == dummy_handler.submit_task(Task(name="sleep", params={"time": 0.0})) assert sp_handler.clear_pending_task("task_id") == dummy_handler.clear_pending_task( "task_id" diff --git a/tests/worker/test_reworker.py b/tests/worker/test_reworker.py index 6dad9b132..edfe3a8b2 100644 --- a/tests/worker/test_reworker.py +++ b/tests/worker/test_reworker.py @@ -10,10 +10,9 @@ from blueapi.core.bluesky_types import DataEvent from blueapi.worker import ( ProgressEvent, - RunEngineWorker, - RunPlan, Task, TaskStatus, + TaskWorker, TrackableTask, Worker, WorkerBusyError, @@ -21,13 +20,13 @@ WorkerState, ) -_SIMPLE_TASK = RunPlan(name="sleep", params={"time": 0.0}) -_LONG_TASK = RunPlan(name="sleep", params={"time": 1.0}) -_INDEFINITE_TASK = RunPlan( +_SIMPLE_TASK = Task(name="sleep", params={"time": 0.0}) +_LONG_TASK = Task(name="sleep", params={"time": 1.0}) +_INDEFINITE_TASK = Task( name="set_absolute", params={"movable": "fake_device", "value": 4.0}, ) -_FAILING_TASK = RunPlan(name="failing_plan", params={}) +_FAILING_TASK = Task(name="failing_plan", params={}) class FakeDevice: @@ -69,7 +68,7 @@ def context(fake_device: FakeDevice) -> BlueskyContext: @pytest.fixture def inert_worker(context: BlueskyContext) -> Worker[Task]: - return RunEngineWorker(context, start_stop_timeout=2.0) + return TaskWorker(context, start_stop_timeout=2.0) @pytest.fixture @@ -259,9 +258,7 @@ def test_no_additional_progress_events_after_complete(worker: Worker): progress_events: List[ProgressEvent] = [] worker.progress_events.subscribe(lambda event, id: progress_events.append(event)) - task: Task = RunPlan( - name="move", params={"moves": {"additional_status_device": 5.0}} - ) + task: Task = Task(name="move", params={"moves": {"additional_status_device": 5.0}}) task_id = worker.submit_task(task) begin_task_and_wait_until_complete(worker, task_id) @@ -344,7 +341,7 @@ def test_worker_and_data_events_produce_in_order(worker: Worker) -> None: def assert_running_count_plan_produces_ordered_worker_and_data_events( expected_events: List[Union[WorkerEvent, DataEvent]], worker: Worker, - task: Task = RunPlan(name="count", params={"detectors": ["image_det"], "num": 1}), + task: Task = Task(name="count", params={"detectors": ["image_det"], "num": 1}), timeout: float = 5.0, ) -> None: event_streams: List[EventStream[Any, int]] = [