Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

233 pending tasks are not cleared #386

Merged
merged 7 commits into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/blueapi/cli/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def create_task(self, task: Task) -> TaskResponse:
data=task.dict(),
)

def clear_pending_task(self, task_id: str) -> TaskResponse:
def clear_task(self, task_id: str) -> TaskResponse:
return self._request_and_deserialize(
f"/tasks/{task_id}", TaskResponse, method="DELETE"
)
Expand Down
10 changes: 5 additions & 5 deletions src/blueapi/service/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def get_device(self, name: str) -> DeviceModel:
def submit_task(self, task: Task) -> str:
return self._worker.submit_task(task)

def clear_pending_task(self, task_id: str) -> str:
def clear_task(self, task_id: str) -> str:
return self._worker.clear_task(task_id)

def begin_task(self, task: WorkerTask) -> WorkerTask:
Expand All @@ -131,11 +131,11 @@ def cancel_active_task(self, failure: bool, reason: Optional[str]):
self._worker.cancel_active_task(failure, reason)

@property
def pending_tasks(self) -> List[TrackableTask]:
return self._worker.get_pending_tasks()
def tasks(self) -> List[TrackableTask]:
return self._worker.get_tasks()

def get_pending_task(self, task_id: str) -> Optional[TrackableTask]:
return self._worker.get_pending_task(task_id)
def get_task_by_id(self, task_id: str) -> Optional[TrackableTask]:
return self._worker.get_task_by_id(task_id)

@property
def initialized(self) -> bool:
Expand Down
12 changes: 6 additions & 6 deletions src/blueapi/service/handler_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ def submit_task(self, task: Task) -> str:
"""

@abstractmethod
def clear_pending_task(self, task_id: str) -> str:
"""Remove a pending task from the worker"""
def clear_task(self, task_id: str) -> str:
"""Remove a task from the worker"""

@abstractmethod
def begin_task(self, task: WorkerTask) -> WorkerTask:
"""Trigger a pending task. Will fail if the worker is busy"""
"""Trigger a task. Will fail if the worker is busy"""

@property
@abstractmethod
Expand All @@ -75,12 +75,12 @@ def cancel_active_task(self, failure: bool, reason: Optional[str]) -> None:

@property
@abstractmethod
def pending_tasks(self) -> List[TrackableTask]:
"""Return a list of all tasks pending on the worker,
def tasks(self) -> List[TrackableTask]:
"""Return a list of all tasks on the worker,
any one of which can be triggered with begin_task"""

@abstractmethod
def get_pending_task(self, task_id: str) -> Optional[TrackableTask]:
def get_task_by_id(self, task_id: str) -> Optional[TrackableTask]:
"""Returns a task matching the task ID supplied,
if the worker knows of it"""

Expand Down
8 changes: 4 additions & 4 deletions src/blueapi/service/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ def delete_submitted_task(
task_id: str,
handler: BlueskyHandler = Depends(get_handler),
) -> TaskResponse:
return TaskResponse(task_id=handler.clear_pending_task(task_id))
return TaskResponse(task_id=handler.clear_task(task_id))


@app.put(
Expand Down Expand Up @@ -190,10 +190,10 @@ def get_task(
handler: BlueskyHandler = Depends(get_handler),
) -> TrackableTask:
"""Retrieve a task"""
pending = handler.get_pending_task(task_id)
if pending is None:
task = handler.get_task_by_id(task_id)
if task is None:
raise KeyError
return pending
return task


@app.get("/worker/task")
Expand Down
24 changes: 12 additions & 12 deletions src/blueapi/service/subprocess_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ def get_device(self, name: str) -> DeviceModel:
def submit_task(self, task: Task) -> str:
return self._run_in_subprocess(submit_task, [task])

def clear_pending_task(self, task_id: str) -> str:
return self._run_in_subprocess(clear_pending_task, [task_id])
def clear_task(self, task_id: str) -> str:
return self._run_in_subprocess(clear_task_by_id, [task_id])

def begin_task(self, task: WorkerTask) -> WorkerTask:
return self._run_in_subprocess(begin_task, [task])
Expand All @@ -105,11 +105,11 @@ def cancel_active_task(self, failure: bool, reason: Optional[str]) -> None:
return self._run_in_subprocess(cancel_active_task, [failure, reason])

@property
def pending_tasks(self) -> List[TrackableTask]:
return self._run_in_subprocess(pending_tasks)
def tasks(self) -> List[TrackableTask]:
return self._run_in_subprocess(tasks)

def get_pending_task(self, task_id: str) -> Optional[TrackableTask]:
return self._run_in_subprocess(get_pending_task, [task_id])
def get_task_by_id(self, task_id: str) -> Optional[TrackableTask]:
return self._run_in_subprocess(get_task_by_id, [task_id])

@property
def initialized(self) -> bool:
Expand Down Expand Up @@ -139,8 +139,8 @@ def submit_task(task: Task) -> str:
return get_handler().submit_task(task)


def clear_pending_task(task_id: str) -> str:
return get_handler().clear_pending_task(task_id)
def clear_task_by_id(task_id: str) -> str:
return get_handler().clear_task(task_id)


def begin_task(task: WorkerTask) -> WorkerTask:
Expand All @@ -167,9 +167,9 @@ def cancel_active_task(failure: bool, reason: Optional[str]) -> None:
return get_handler().cancel_active_task(failure, reason)


def pending_tasks() -> List[TrackableTask]:
return get_handler().pending_tasks
def tasks() -> List[TrackableTask]:
return get_handler().tasks


def get_pending_task(task_id: str) -> Optional[TrackableTask]:
return get_handler().get_pending_task(task_id)
def get_task_by_id(task_id: str) -> Optional[TrackableTask]:
return get_handler().get_task_by_id(task_id)
19 changes: 12 additions & 7 deletions src/blueapi/utils/ophyd_async_connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,7 @@
t.cancel()
with suppress(Exception):
await t
e = t.exception()
msg += f"\n {tasks[t]}: {type(e).__name__}"
lines = str(e).splitlines()
if len(lines) <= 1:
msg += f": {e}"
else:
msg += "".join(f"\n {line}" for line in lines)
msg += format_error_message(tasks, t)

Check warning on line 33 in src/blueapi/utils/ophyd_async_connect.py

View check run for this annotation

Codecov / codecov/patch

src/blueapi/utils/ophyd_async_connect.py#L33

Added line #L33 was not covered by tests
logging.error(msg)
raised = [t for t in done if t.exception()]
if raised:
Expand All @@ -45,3 +39,14 @@
logging.exception(f" {tasks[t]}:", exc_info=t.exception())
if pending or raised:
raise NotConnected("Not all Devices connected")


def format_error_message(tasks: Dict[asyncio.Task, str], t: asyncio.Task) -> str:
stan-dot marked this conversation as resolved.
Show resolved Hide resolved
e = t.exception()
part_one = f"\n {tasks[t]}: {type(e).__name__}"
lines = str(e).splitlines()

part_two = (
f": {e}" if len(lines) <= 1 else "".join(f"\n {line}" for line in lines)
)
return part_one + part_two
18 changes: 9 additions & 9 deletions src/blueapi/worker/reworker.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class TaskWorker(Worker[Task]):
_ctx: BlueskyContext
_start_stop_timeout: float

_pending_tasks: Dict[str, TrackableTask]
_tasks: Dict[str, TrackableTask]

_state: WorkerState
_errors: List[str]
Expand All @@ -74,7 +74,7 @@ def __init__(
self._ctx = ctx
self._start_stop_timeout = start_stop_timeout

self._pending_tasks = {}
self._tasks = {}

self._state = WorkerState.from_bluesky_state(ctx.run_engine.state)
self._errors = []
Expand All @@ -94,7 +94,7 @@ def __init__(
self._broadcast_statuses = broadcast_statuses

def clear_task(self, task_id: str) -> str:
task = self._pending_tasks.pop(task_id)
task = self._tasks.pop(task_id)
return task.task_id

def cancel_active_task(
Expand All @@ -112,17 +112,17 @@ def cancel_active_task(
self._ctx.run_engine.stop()
return self._current.task_id

def get_pending_tasks(self) -> List[TrackableTask[Task]]:
return list(self._pending_tasks.values())
def get_tasks(self) -> List[TrackableTask[Task]]:
return list(self._tasks.values())

def get_pending_task(self, task_id: str) -> Optional[TrackableTask[Task]]:
return self._pending_tasks.get(task_id)
def get_task_by_id(self, task_id: str) -> Optional[TrackableTask[Task]]:
return self._tasks.get(task_id)

def get_active_task(self) -> Optional[TrackableTask[Task]]:
return self._current

def begin_task(self, task_id: str) -> None:
task = self._pending_tasks.get(task_id)
task = self._tasks.get(task_id)
if task is not None:
self._submit_trackable_task(task)
else:
Expand All @@ -132,7 +132,7 @@ def submit_task(self, task: Task) -> str:
task.prepare_params(self._ctx) # Will raise if parameters are invalid
task_id: str = str(uuid.uuid4())
trackable_task = TrackableTask(task_id=task_id, task=task)
self._pending_tasks[task_id] = trackable_task
self._tasks[task_id] = trackable_task
return task_id

def _submit_trackable_task(self, trackable_task: TrackableTask) -> None:
Expand Down
8 changes: 4 additions & 4 deletions src/blueapi/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,17 @@ class Worker(ABC, Generic[T]):
"""

@abstractmethod
def get_pending_tasks(self) -> List[TrackableTask[T]]:
def get_tasks(self) -> List[TrackableTask[T]]:
"""
Return a list of all tasks pending on the worker,
Return a list of all tasks on the worker,
any one of which can be triggered with begin_task.

Returns:
List[TrackableTask[T]]: List of task objects
"""

@abstractmethod
def get_pending_task(self, task_id: str) -> Optional[TrackableTask[T]]:
def get_task_by_id(self, task_id: str) -> Optional[TrackableTask[T]]:
"""
Returns a task matching the task ID supplied,
if the worker knows of it.
Expand All @@ -65,7 +65,7 @@ def get_active_task(self) -> Optional[TrackableTask[T]]:
@abstractmethod
def clear_task(self, task_id: str) -> str:
"""
Remove a pending task from the worker
Remove a task from the worker

Args:
task_id: The ID of the task to be removed
Expand Down
32 changes: 16 additions & 16 deletions tests/service/test_rest_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,9 @@ def test_create_task(handler: Handler, client: TestClient) -> None:
response = client.post("/tasks", json=_TASK.dict())
task_id = response.json()["task_id"]

pending = handler.get_pending_task(task_id)
assert pending is not None
assert pending.task == _TASK
t = handler.get_task_by_id(task_id)
assert t is not None
assert t.task == _TASK


def test_put_plan_begins_task(handler: Handler, client: TestClient) -> None:
Expand Down Expand Up @@ -256,7 +256,7 @@ def test_put_plan_with_unknown_plan_name_fails(

response = client.post("/tasks", json=task_json)

assert not handler.pending_tasks
assert not handler.tasks
assert response.status_code == status.HTTP_404_NOT_FOUND


Expand Down Expand Up @@ -285,7 +285,7 @@ def test_put_plan_with_bad_params_fails(handler: Handler, client: TestClient) ->

response = client.post("/tasks", json=task_json)

assert not handler.pending_tasks
assert not handler.tasks
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY


Expand Down Expand Up @@ -391,38 +391,38 @@ def test_clear_pending_task_no_longer_pending(handler: Handler, client: TestClie
response = client.post("/tasks", json=_TASK.dict())
task_id = response.json()["task_id"]

pending = handler.get_pending_task(task_id)
assert pending is not None
assert pending.task == _TASK
t = handler.get_task_by_id(task_id)
assert t is not None
assert t.task == _TASK

delete_response = client.delete(f"/tasks/{task_id}")
assert delete_response.status_code is status.HTTP_200_OK
assert not handler.pending_tasks
assert handler.get_pending_task(task_id) is None
assert not handler.tasks
assert handler.get_task_by_id(task_id) is None


def test_clear_not_pending_task_not_found(handler: Handler, client: TestClient):
response = client.post("/tasks", json=_TASK.dict())
task_id = response.json()["task_id"]

pending = handler.get_pending_task(task_id)
pending = handler.get_task_by_id(task_id)
assert pending is not None
assert pending.task == _TASK

delete_response = client.delete("/tasks/wrong-task-id")
assert delete_response.status_code is status.HTTP_404_NOT_FOUND
pending = handler.get_pending_task(task_id)
pending = handler.get_task_by_id(task_id)
assert pending is not None
assert pending.task == _TASK


def test_clear_when_empty(handler: Handler, client: TestClient):
pending = handler.pending_tasks
pending = handler.tasks
assert not pending

delete_response = client.delete("/tasks/wrong-task-id")
assert delete_response.status_code is status.HTTP_404_NOT_FOUND
assert not handler.pending_tasks
assert not handler.tasks


@pytest.mark.parametrize(
Expand All @@ -443,7 +443,7 @@ def test_delete_running_task(

def start_task(_: str):
mockable_state_machine._worker._current = ( # type: ignore
mockable_state_machine._worker.get_pending_task(task_id)
mockable_state_machine._worker.get_task_by_id(task_id)
)
mockable_state_machine._worker._on_state_change( # type: ignore
RunEngineStateMachine.States.RUNNING
Expand Down Expand Up @@ -473,7 +473,7 @@ def test_reason_passed_to_abort(mockable_state_machine: Handler, client: TestCli

def start_task(_: str):
mockable_state_machine._worker._current = ( # type: ignore
mockable_state_machine._worker.get_pending_task(task_id)
mockable_state_machine._worker.get_task_by_id(task_id)
)
mockable_state_machine._worker._on_state_change( # type: ignore
RunEngineStateMachine.States.RUNNING
Expand Down
Loading
Loading