Skip to content

Commit

Permalink
remove RunPlan class that wraps Task (#382)
Browse files Browse the repository at this point in the history
quick renaming 36 refs and delete the class
  • Loading branch information
stan-dot authored Mar 7, 2024
1 parent 6e85e3d commit 7bbc94e
Show file tree
Hide file tree
Showing 13 changed files with 53 additions and 73 deletions.
34 changes: 17 additions & 17 deletions docs/user/reference/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -88,22 +88,6 @@ components:
- plans
title: PlanResponse
type: object
RunPlan:
additionalProperties: false
description: Task that will run a plan
properties:
name:
description: Name of plan to run
title: Name
type: string
params:
description: Values for parameters to plan, if any
title: Params
type: object
required:
- name
title: RunPlan
type: object
StateChangeRequest:
additionalProperties: false
description: Request to change the state of the worker.
Expand All @@ -123,6 +107,22 @@ components:
- new_state
title: StateChangeRequest
type: object
Task:
additionalProperties: false
description: Task that will run a plan
properties:
name:
description: Name of plan to run
title: Name
type: string
params:
description: Values for parameters to plan, if any
title: Params
type: object
required:
- name
title: Task
type: object
TaskResponse:
additionalProperties: false
description: Acknowledgement that a task has started, includes its ID
Expand Down Expand Up @@ -319,7 +319,7 @@ paths:
detectors:
- x
schema:
$ref: '#/components/schemas/RunPlan'
$ref: '#/components/schemas/Task'
required: true
responses:
'201':
Expand Down
4 changes: 2 additions & 2 deletions src/blueapi/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
print_schema_as_yaml,
write_schema_as_yaml,
)
from blueapi.worker import ProgressEvent, RunPlan, WorkerEvent, WorkerState
from blueapi.worker import ProgressEvent, Task, WorkerEvent, WorkerState

from .rest import BlueapiRestClient

Expand Down Expand Up @@ -182,7 +182,7 @@ def store_finished_event(event: WorkerEvent) -> None:
finished_event.append(event)

parameters = parameters or "{}"
task = RunPlan(name=name, params=json.loads(parameters))
task = Task(name=name, params=json.loads(parameters))

resp = client.create_task(task)
task_id = resp.task_id
Expand Down
10 changes: 4 additions & 6 deletions src/blueapi/cli/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
TaskResponse,
WorkerTask,
)
from blueapi.worker import RunPlan, TrackableTask, WorkerState
from blueapi.worker import Task, TrackableTask, WorkerState

from .amq import BlueskyRemoteError

Expand Down Expand Up @@ -56,15 +56,13 @@ def set_state(
data={"new_state": state, "defer": defer},
)

def get_task(self, task_id: str) -> TrackableTask[RunPlan]:
return self._request_and_deserialize(
f"/tasks/{task_id}", TrackableTask[RunPlan]
)
def get_task(self, task_id: str) -> TrackableTask[Task]:
return self._request_and_deserialize(f"/tasks/{task_id}", TrackableTask[Task])

def get_active_task(self) -> WorkerTask:
return self._request_and_deserialize("/worker/task", WorkerTask)

def create_task(self, task: RunPlan) -> TaskResponse:
def create_task(self, task: Task) -> TaskResponse:
return self._request_and_deserialize(
"/tasks",
TaskResponse,
Expand Down
8 changes: 4 additions & 4 deletions src/blueapi/service/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
from blueapi.service.handler_base import BlueskyHandler
from blueapi.service.model import DeviceModel, PlanModel, WorkerTask
from blueapi.worker.event import WorkerState
from blueapi.worker.reworker import RunEngineWorker
from blueapi.worker.task import RunPlan
from blueapi.worker.reworker import TaskWorker
from blueapi.worker.task import Task
from blueapi.worker.worker import TrackableTask, Worker

LOGGER = logging.getLogger(__name__)
Expand All @@ -42,7 +42,7 @@ def __init__(

self._context.with_config(self._config.env)

self._worker = worker or RunEngineWorker(
self._worker = worker or TaskWorker(
self._context,
broadcast_statuses=self._config.env.events.broadcast_status_events,
)
Expand Down Expand Up @@ -102,7 +102,7 @@ def devices(self) -> List[DeviceModel]:
def get_device(self, name: str) -> DeviceModel:
return DeviceModel.from_device(self._context.devices[name])

def submit_task(self, task: RunPlan) -> str:
def submit_task(self, task: Task) -> str:
return self._worker.submit_task(task)

def clear_pending_task(self, task_id: str) -> str:
Expand Down
4 changes: 2 additions & 2 deletions src/blueapi/service/handler_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from blueapi.service.model import DeviceModel, PlanModel, WorkerTask
from blueapi.worker.event import WorkerState
from blueapi.worker.task import RunPlan
from blueapi.worker.task import Task
from blueapi.worker.worker import TrackableTask


Expand Down Expand Up @@ -37,7 +37,7 @@ def get_device(self, name: str) -> DeviceModel:
"""

@abstractmethod
def submit_task(self, task: RunPlan) -> str:
def submit_task(self, task: Task) -> str:
"""
Submit a task to be run on begin_task
"""
Expand Down
6 changes: 2 additions & 4 deletions src/blueapi/service/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from super_state_machine.errors import TransitionError

from blueapi.config import ApplicationConfig
from blueapi.worker import RunPlan, TrackableTask, WorkerState
from blueapi.worker import Task, TrackableTask, WorkerState

from .handler_base import BlueskyHandler
from .model import (
Expand Down Expand Up @@ -141,9 +141,7 @@ def get_device_by_name(name: str, handler: BlueskyHandler = Depends(get_handler)
def submit_task(
request: Request,
response: Response,
task: RunPlan = Body(
..., example=RunPlan(name="count", params={"detectors": ["x"]})
),
task: Task = Body(..., example=Task(name="count", params={"detectors": ["x"]})),
handler: BlueskyHandler = Depends(get_handler),
):
"""Submit a task to the worker."""
Expand Down
6 changes: 3 additions & 3 deletions src/blueapi/service/subprocess_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from blueapi.service.handler_base import BlueskyHandler, HandlerNotStartedError
from blueapi.service.model import DeviceModel, PlanModel, WorkerTask
from blueapi.worker.event import WorkerState
from blueapi.worker.task import RunPlan
from blueapi.worker.task import Task
from blueapi.worker.worker import TrackableTask

set_start_method("spawn", force=True)
Expand Down Expand Up @@ -78,7 +78,7 @@ def devices(self) -> List[DeviceModel]:
def get_device(self, name: str) -> DeviceModel:
return self._run_in_subprocess(get_device, [name])

def submit_task(self, task: RunPlan) -> str:
def submit_task(self, task: Task) -> str:
return self._run_in_subprocess(submit_task, [task])

def clear_pending_task(self, task_id: str) -> str:
Expand Down Expand Up @@ -135,7 +135,7 @@ def get_device(name: str) -> DeviceModel:
return get_handler().get_device(name)


def submit_task(task: RunPlan) -> str:
def submit_task(task: Task) -> str:
return get_handler().submit_task(task)


Expand Down
7 changes: 3 additions & 4 deletions src/blueapi/worker/__init__.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
from .event import ProgressEvent, StatusView, TaskStatus, WorkerEvent, WorkerState
from .multithread import run_worker_in_own_thread
from .reworker import RunEngineWorker
from .task import RunPlan, Task
from .reworker import TaskWorker
from .task import Task
from .worker import TrackableTask, Worker
from .worker_busy_error import WorkerBusyError

__all__ = [
"run_worker_in_own_thread",
"RunEngineWorker",
"TaskWorker",
"Task",
"Worker",
"RunPlan",
"WorkerEvent",
"WorkerState",
"StatusView",
Expand Down
2 changes: 1 addition & 1 deletion src/blueapi/worker/reworker.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
DEFAULT_START_STOP_TIMEOUT: float = 30.0


class RunEngineWorker(Worker[Task]):
class TaskWorker(Worker[Task]):
"""
Worker wrapping BlueskyContext that can work in its own thread/process
Expand Down
10 changes: 0 additions & 10 deletions src/blueapi/worker/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,6 @@ def do_task(self, ctx: BlueskyContext) -> None:
ctx.run_engine(wrapped_plan_generator)


# Here for backward compatibility pending
# https://github.com/DiamondLightSource/blueapi/issues/253
class RunPlan(Task):
"""
Task that will run a plan
"""

...


def _lookup_params(ctx: BlueskyContext, task: Task) -> BaseModel:
"""
Checks plan parameters against context
Expand Down
4 changes: 2 additions & 2 deletions tests/service/test_rest_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
from blueapi.service.handler import Handler
from blueapi.service.main import get_handler, setup_handler, teardown_handler
from blueapi.service.model import WorkerTask
from blueapi.worker.task import RunPlan
from blueapi.worker.task import Task
from src.blueapi.worker import WorkerState

_TASK = RunPlan(name="count", params={"detectors": ["x"]})
_TASK = Task(name="count", params={"detectors": ["x"]})


def test_get_plans(handler: Handler, client: TestClient) -> None:
Expand Down
12 changes: 5 additions & 7 deletions tests/service/test_subprocess_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from blueapi.service.model import DeviceModel, PlanModel, WorkerTask
from blueapi.service.subprocess_handler import SubprocessHandler
from blueapi.worker.event import WorkerState
from blueapi.worker.task import RunPlan
from blueapi.worker.task import Task
from blueapi.worker.worker import TrackableTask


Expand Down Expand Up @@ -63,7 +63,7 @@ def devices(self) -> List[DeviceModel]:
def get_device(self, name: str) -> DeviceModel:
return DeviceModel(name="device1", protocols=[])

def submit_task(self, task: RunPlan) -> str:
def submit_task(self, task: Task) -> str:
return "0"

def clear_pending_task(self, task_id: str) -> str:
Expand All @@ -89,9 +89,7 @@ def cancel_active_task(self, failure: bool, reason: Optional[str]) -> None: ...
@property
def pending_tasks(self) -> List[TrackableTask]:
return [
TrackableTask(
task_id="abc", task=RunPlan(name="sleep", params={"time": 0.0})
)
TrackableTask(task_id="abc", task=Task(name="sleep", params={"time": 0.0}))
]

def get_pending_task(self, task_id: str) -> Optional[TrackableTask]:
Expand Down Expand Up @@ -137,8 +135,8 @@ def run_in_same_process(func, args=None):
assert sp_handler.get_device("name") == dummy_handler.get_device("name")

assert sp_handler.submit_task(
RunPlan(name="sleep", params={"time": 0.0})
) == dummy_handler.submit_task(RunPlan(name="sleep", params={"time": 0.0}))
Task(name="sleep", params={"time": 0.0})
) == dummy_handler.submit_task(Task(name="sleep", params={"time": 0.0}))

assert sp_handler.clear_pending_task("task_id") == dummy_handler.clear_pending_task(
"task_id"
Expand Down
19 changes: 8 additions & 11 deletions tests/worker/test_reworker.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,23 @@
from blueapi.core.bluesky_types import DataEvent
from blueapi.worker import (
ProgressEvent,
RunEngineWorker,
RunPlan,
Task,
TaskStatus,
TaskWorker,
TrackableTask,
Worker,
WorkerBusyError,
WorkerEvent,
WorkerState,
)

_SIMPLE_TASK = RunPlan(name="sleep", params={"time": 0.0})
_LONG_TASK = RunPlan(name="sleep", params={"time": 1.0})
_INDEFINITE_TASK = RunPlan(
_SIMPLE_TASK = Task(name="sleep", params={"time": 0.0})
_LONG_TASK = Task(name="sleep", params={"time": 1.0})
_INDEFINITE_TASK = Task(
name="set_absolute",
params={"movable": "fake_device", "value": 4.0},
)
_FAILING_TASK = RunPlan(name="failing_plan", params={})
_FAILING_TASK = Task(name="failing_plan", params={})


class FakeDevice:
Expand Down Expand Up @@ -69,7 +68,7 @@ def context(fake_device: FakeDevice) -> BlueskyContext:

@pytest.fixture
def inert_worker(context: BlueskyContext) -> Worker[Task]:
return RunEngineWorker(context, start_stop_timeout=2.0)
return TaskWorker(context, start_stop_timeout=2.0)


@pytest.fixture
Expand Down Expand Up @@ -259,9 +258,7 @@ def test_no_additional_progress_events_after_complete(worker: Worker):
progress_events: List[ProgressEvent] = []
worker.progress_events.subscribe(lambda event, id: progress_events.append(event))

task: Task = RunPlan(
name="move", params={"moves": {"additional_status_device": 5.0}}
)
task: Task = Task(name="move", params={"moves": {"additional_status_device": 5.0}})
task_id = worker.submit_task(task)
begin_task_and_wait_until_complete(worker, task_id)

Expand Down Expand Up @@ -344,7 +341,7 @@ def test_worker_and_data_events_produce_in_order(worker: Worker) -> None:
def assert_running_count_plan_produces_ordered_worker_and_data_events(
expected_events: List[Union[WorkerEvent, DataEvent]],
worker: Worker,
task: Task = RunPlan(name="count", params={"detectors": ["image_det"], "num": 1}),
task: Task = Task(name="count", params={"detectors": ["image_det"], "num": 1}),
timeout: float = 5.0,
) -> None:
event_streams: List[EventStream[Any, int]] = [
Expand Down

0 comments on commit 7bbc94e

Please sign in to comment.