diff --git a/.github/pages/make_switcher.py b/.github/pages/make_switcher.py index e2c8e6f62..2b81e7696 100755 --- a/.github/pages/make_switcher.py +++ b/.github/pages/make_switcher.py @@ -3,28 +3,28 @@ from argparse import ArgumentParser from pathlib import Path from subprocess import CalledProcessError, check_output -from typing import List, Optional +from typing import Optional -def report_output(stdout: bytes, label: str) -> List[str]: +def report_output(stdout: bytes, label: str) -> list[str]: ret = stdout.decode().strip().split("\n") print(f"{label}: {ret}") return ret -def get_branch_contents(ref: str) -> List[str]: +def get_branch_contents(ref: str) -> list[str]: """Get the list of directories in a branch.""" stdout = check_output(["git", "ls-tree", "-d", "--name-only", ref]) return report_output(stdout, "Branch contents") -def get_sorted_tags_list() -> List[str]: +def get_sorted_tags_list() -> list[str]: """Get a list of sorted tags in descending order from the repository.""" stdout = check_output(["git", "tag", "-l", "--sort=-v:refname"]) return report_output(stdout, "Tags list") -def get_versions(ref: str, add: Optional[str]) -> List[str]: +def get_versions(ref: str, add: Optional[str]) -> list[str]: """Generate the file containing the list of all GitHub Pages builds.""" # Get the directories (i.e. builds) from the GitHub Pages branch try: @@ -41,7 +41,7 @@ def get_versions(ref: str, add: Optional[str]) -> List[str]: tags = get_sorted_tags_list() # Make the sorted versions list from main branches and tags - versions: List[str] = [] + versions: list[str] = [] for version in ["master", "main"] + tags: if version in builds: versions.append(version) diff --git a/pyproject.toml b/pyproject.toml index e4b9e6fd6..c6087a656 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,9 +26,8 @@ dependencies = [ "fastapi[all]<0.99", "uvicorn", "requests", - "dls-bluesky-core", #requires ophyd-async + "dls-bluesky-core", #requires ophyd-async "dls-dodal", - "typing_extensions<4.6", ] dynamic = ["version"] license.file = "LICENSE" @@ -135,9 +134,13 @@ lint.select = [ "F", # pyflakes rules - https://docs.astral.sh/ruff/rules/#pyflakes-f "W", # pycodestyle warnings - https://docs.astral.sh/ruff/rules/#warning-w "I", # isort - https://docs.astral.sh/ruff/rules/#isort-i - #"UP", # pyupgrade - https://docs.astral.sh/ruff/rules/#pyupgrade-up + "UP", # pyupgrade - https://docs.astral.sh/ruff/rules/#pyupgrade-up ] [tool.ruff.lint.flake8-bugbear] -extend-immutable-calls = ["fastapi.Depends", "fastapi.Body", "fastapi.Task", "dls_bluesky_core.core.inject"] - +extend-immutable-calls = [ + "fastapi.Depends", + "fastapi.Body", + "fastapi.Task", + "dls_bluesky_core.core.inject", +] diff --git a/src/blueapi/cli/cli.py b/src/blueapi/cli/cli.py index c8410e9c1..0bfd6ad77 100644 --- a/src/blueapi/cli/cli.py +++ b/src/blueapi/cli/cli.py @@ -4,7 +4,7 @@ from functools import wraps from pathlib import Path from pprint import pprint -from typing import Optional, Tuple, Union +from typing import Optional, Union import click from requests.exceptions import ConnectionError @@ -34,7 +34,7 @@ "-c", "--config", type=Path, help="Path to configuration YAML file", multiple=True ) @click.pass_context -def main(ctx: click.Context, config: Union[Optional[Path], Tuple[Path, ...]]) -> None: +def main(ctx: click.Context, config: Union[Optional[Path], tuple[Path, ...]]) -> None: # if no command is supplied, run with the options passed config_loader = ConfigLoader(ApplicationConfig) diff --git a/src/blueapi/cli/rest.py 
b/src/blueapi/cli/rest.py index c85b879bb..111986d2e 100644 --- a/src/blueapi/cli/rest.py +++ b/src/blueapi/cli/rest.py @@ -1,4 +1,5 @@ -from typing import Any, Callable, Literal, Mapping, Optional, Type, TypeVar +from collections.abc import Mapping +from typing import Any, Callable, Literal, Optional, TypeVar import requests from pydantic import parse_obj_as @@ -98,7 +99,7 @@ def cancel_current_task( def _request_and_deserialize( self, suffix: str, - target_type: Type[T], + target_type: type[T], data: Optional[Mapping[str, Any]] = None, method="GET", raise_if: Callable[[requests.Response], bool] = _is_exception, diff --git a/src/blueapi/cli/updates.py b/src/blueapi/cli/updates.py index d9279b5b6..530abfa2e 100644 --- a/src/blueapi/cli/updates.py +++ b/src/blueapi/cli/updates.py @@ -1,5 +1,6 @@ import itertools -from typing import Dict, Mapping, Optional, Union +from collections.abc import Mapping +from typing import Optional, Union from tqdm import tqdm @@ -9,7 +10,7 @@ class ProgressBarRenderer: - _bars: Dict[str, tqdm] + _bars: dict[str, tqdm] _count: itertools.count def __init__(self) -> None: diff --git a/src/blueapi/config.py b/src/blueapi/config.py index 70cb97268..d929dde48 100644 --- a/src/blueapi/config.py +++ b/src/blueapi/config.py @@ -1,7 +1,8 @@ import os +from collections.abc import Mapping from enum import Enum from pathlib import Path -from typing import Any, Dict, Generic, Literal, Mapping, Optional, Type, TypeVar, Union +from typing import Any, Generic, Literal, Optional, TypeVar, Union import yaml from pydantic import BaseModel, Field, ValidationError, parse_obj_as, validator @@ -124,10 +125,10 @@ class ConfigLoader(Generic[C]): of default values, dictionaries, YAML/JSON files etc. """ - _schema: Type[C] - _values: Dict[str, Any] + _schema: type[C] + _values: dict[str, Any] - def __init__(self, schema: Type[C]) -> None: + def __init__(self, schema: type[C]) -> None: self._schema = schema self._values = {} @@ -142,7 +143,7 @@ def use_values(self, values: Mapping[str, Any]) -> None: if defaults provided. """ - def recursively_update_map(old: Dict[str, Any], new: Mapping[str, Any]) -> None: + def recursively_update_map(old: dict[str, Any], new: Mapping[str, Any]) -> None: for key in new: if ( key in old diff --git a/src/blueapi/core/bluesky_types.py b/src/blueapi/core/bluesky_types.py index b3dc4ba6a..6514da056 100644 --- a/src/blueapi/core/bluesky_types.py +++ b/src/blueapi/core/bluesky_types.py @@ -1,5 +1,14 @@ import inspect -from typing import Any, Callable, Mapping, Optional, Type, Union, get_type_hints +from collections.abc import Mapping +from typing import ( + Any, + Callable, + Optional, + Protocol, + Union, + get_type_hints, + runtime_checkable, +) from bluesky.protocols import ( Checkable, @@ -24,11 +33,6 @@ from blueapi.utils import BlueapiBaseModel -try: - from typing import Protocol, runtime_checkable -except ImportError: - from typing_extensions import Protocol, runtime_checkable # type: ignore - PlanWrapper = Callable[[MsgGenerator], MsgGenerator] #: An object that encapsulates the device to do useful things to produce @@ -62,14 +66,14 @@ def is_bluesky_compatible_device(obj: Any) -> bool: return is_object and _follows_bluesky_protocols(obj) -def is_bluesky_compatible_device_type(cls: Type[Any]) -> bool: +def is_bluesky_compatible_device_type(cls: type[Any]) -> bool: # We must separately check if Obj refers to an class rather than an # instance, as both follow the protocols but only one is a type. 
return inspect.isclass(cls) and _follows_bluesky_protocols(cls) def _follows_bluesky_protocols(obj: Any) -> bool: - return any((isinstance(obj, protocol) for protocol in BLUESKY_PROTOCOLS)) + return any(isinstance(obj, protocol) for protocol in BLUESKY_PROTOCOLS) def is_bluesky_plan_generator(func: PlanGenerator) -> bool: @@ -89,7 +93,7 @@ class Plan(BlueapiBaseModel): description: Optional[str] = Field( description="Description/docstring of the plan", default=None ) - model: Type[BaseModel] = Field( + model: type[BaseModel] = Field( description="Validation model of the parameters for the plan" ) diff --git a/src/blueapi/core/context.py b/src/blueapi/core/context.py index aae4c26c3..a4ac20d7e 100644 --- a/src/blueapi/core/context.py +++ b/src/blueapi/core/context.py @@ -1,5 +1,6 @@ import functools import logging +from collections.abc import Sequence from dataclasses import dataclass, field from importlib import import_module from inspect import Parameter, signature @@ -7,13 +8,8 @@ from typing import ( Any, Callable, - Dict, Generic, - List, Optional, - Sequence, - Tuple, - Type, TypeVar, Union, get_args, @@ -60,12 +56,12 @@ class BlueskyContext: default_factory=lambda: RunEngine(context_managers=[]) ) plan_wrappers: Sequence[PlanWrapper] = field(default_factory=list) - plans: Dict[str, Plan] = field(default_factory=dict) - devices: Dict[str, Device] = field(default_factory=dict) - plan_functions: Dict[str, PlanGenerator] = field(default_factory=dict) + plans: dict[str, Plan] = field(default_factory=dict) + devices: dict[str, Device] = field(default_factory=dict) + plan_functions: dict[str, PlanGenerator] = field(default_factory=dict) sim: bool = field(default=False) - _reference_cache: Dict[Type, Type] = field(default_factory=dict) + _reference_cache: dict[type, type] = field(default_factory=dict) def wrap(self, plan: MsgGenerator) -> MsgGenerator: wrapped_plan = functools.reduce( @@ -75,7 +71,7 @@ def wrap(self, plan: MsgGenerator) -> MsgGenerator: ) yield from wrapped_plan - def find_device(self, addr: Union[str, List[str]]) -> Optional[Device]: + def find_device(self, addr: Union[str, list[str]]) -> Optional[Device]: """ Find a device in this context, allows for recursive search. @@ -199,7 +195,7 @@ def device(self, device: Device, name: Optional[str] = None) -> None: self.devices[name] = device - def _reference(self, target: Type) -> Type: + def _reference(self, target: type) -> type: """ Create an intermediate reference type for the required ``target`` type that will return an existing device during pydantic deserialisation/validation @@ -238,7 +234,7 @@ def __modify_schema__( def _type_spec_for_function( self, func: Callable[..., Any] - ) -> dict[str, Tuple[Type, Any]]: + ) -> dict[str, tuple[type, Any]]: """ Parse a function signature and build map of field types and default values that can be used to deserialise arguments from external sources. @@ -271,7 +267,7 @@ def _type_spec_for_function( ) return new_args - def _convert_type(self, typ: Type) -> Type: + def _convert_type(self, typ: type) -> type: """ Recursively convert a type to something that can be deserialised by pydantic. 
Bluesky protocols (and types that extend them) are replaced diff --git a/src/blueapi/core/device_lookup.py b/src/blueapi/core/device_lookup.py index 957a057f7..28e616ede 100644 --- a/src/blueapi/core/device_lookup.py +++ b/src/blueapi/core/device_lookup.py @@ -1,4 +1,4 @@ -from typing import Any, List, Optional, TypeVar +from typing import Any, Optional, TypeVar from .bluesky_types import Device, is_bluesky_compatible_device @@ -6,7 +6,7 @@ D = TypeVar("D", bound=Device) -def find_component(obj: Any, addr: List[str]) -> Optional[D]: +def find_component(obj: Any, addr: list[str]) -> Optional[D]: """ Best effort function to locate a child device, either in a dictionary of devices or a device with child attributes. diff --git a/src/blueapi/core/event.py b/src/blueapi/core/event.py index fde29b3de..dbda0694d 100644 --- a/src/blueapi/core/event.py +++ b/src/blueapi/core/event.py @@ -1,6 +1,6 @@ import itertools from abc import ABC, abstractmethod -from typing import Callable, Dict, Generic, Optional, TypeVar +from typing import Callable, Generic, Optional, TypeVar #: Event type E = TypeVar("E") @@ -47,7 +47,7 @@ class EventPublisher(EventStream[E, int]): Simple Observable that can be fed values to publish """ - _subscriptions: Dict[int, Callable[[E, Optional[str]], None]] + _subscriptions: dict[int, Callable[[E, Optional[str]], None]] _count: itertools.count def __init__(self) -> None: diff --git a/src/blueapi/messaging/base.py b/src/blueapi/messaging/base.py index e9404395d..bd517a6a2 100644 --- a/src/blueapi/messaging/base.py +++ b/src/blueapi/messaging/base.py @@ -1,6 +1,6 @@ from abc import ABC, abstractmethod from concurrent.futures import Future -from typing import Any, Callable, Optional, Type +from typing import Any, Callable, Optional from .context import MessageContext @@ -86,7 +86,7 @@ def send_and_receive( self, destination: str, obj: Any, - reply_type: Type = str, + reply_type: type = str, correlation_id: Optional[str] = None, ) -> Future: """ diff --git a/src/blueapi/messaging/stomptemplate.py b/src/blueapi/messaging/stomptemplate.py index 2a32d84c1..e56c72435 100644 --- a/src/blueapi/messaging/stomptemplate.py +++ b/src/blueapi/messaging/stomptemplate.py @@ -5,7 +5,7 @@ import uuid from dataclasses import dataclass from threading import Event -from typing import Any, Callable, Dict, List, Optional, Set +from typing import Any, Callable, Optional import stomp from pydantic import parse_obj_as @@ -74,8 +74,8 @@ class StompMessagingTemplate(MessagingTemplate): _authentication: BasicAuthentication _sub_num: itertools.count _listener: stomp.ConnectionListener - _subscriptions: Dict[str, Subscription] - _pending_subscriptions: Set[str] + _subscriptions: dict[str, Subscription] + _pending_subscriptions: set[str] _disconnected: Event # Stateless implementation means attribute can be static @@ -133,7 +133,7 @@ def _send_str( ) -> None: LOGGER.info(f"SENDING {message} to {destination}") - headers: Dict[str, Any] = {"JMSType": "TextMessage"} + headers: dict[str, Any] = {"JMSType": "TextMessage"} if on_reply is not None: reply_queue_name = self.destinations.temporary_queue(str(uuid.uuid1())) headers = {**headers, "reply-to": reply_queue_name} @@ -148,7 +148,7 @@ def subscribe(self, destination: str, callback: MessageListener) -> None: def wrapper(frame: Frame) -> None: as_dict = json.loads(frame.body) - value = parse_obj_as(obj_type, as_dict) + value: Any = parse_obj_as(obj_type, as_dict) context = MessageContext( frame.headers["destination"], @@ -193,7 +193,7 @@ def finished_connecting(_: 
Frame): self._ensure_subscribed() - def _ensure_subscribed(self, sub_ids: Optional[List[str]] = None) -> None: + def _ensure_subscribed(self, sub_ids: Optional[list[str]] = None) -> None: # We must defer subscription until after connection, because stomp literally # sends a SUB to the broker. But it still nice to be able to call subscribe # on template before it connects, then just run the subscribes after connection. diff --git a/src/blueapi/messaging/utils.py b/src/blueapi/messaging/utils.py index 7b7df81c9..175005c65 100644 --- a/src/blueapi/messaging/utils.py +++ b/src/blueapi/messaging/utils.py @@ -1,12 +1,11 @@ import inspect -from typing import Type from .base import MessageListener def determine_deserialization_type( - listener: MessageListener, default: Type = str -) -> Type: + listener: MessageListener, default: type = str +) -> type: """ Inspect a message listener function to determine the type to deserialize a message to diff --git a/src/blueapi/service/handler.py b/src/blueapi/service/handler.py index 8e78119a0..be2016262 100644 --- a/src/blueapi/service/handler.py +++ b/src/blueapi/service/handler.py @@ -1,5 +1,6 @@ import logging -from typing import List, Mapping, Optional +from collections.abc import Mapping +from typing import Optional from blueapi.config import ApplicationConfig from blueapi.core import BlueskyContext @@ -87,14 +88,14 @@ def stop(self) -> None: self._messaging_template.disconnect() @property - def plans(self) -> List[PlanModel]: + def plans(self) -> list[PlanModel]: return [PlanModel.from_plan(plan) for plan in self._context.plans.values()] def get_plan(self, name: str) -> PlanModel: return PlanModel.from_plan(self._context.plans[name]) @property - def devices(self) -> List[DeviceModel]: + def devices(self) -> list[DeviceModel]: return [ DeviceModel.from_device(device) for device in self._context.devices.values() ] @@ -131,7 +132,7 @@ def cancel_active_task(self, failure: bool, reason: Optional[str]): self._worker.cancel_active_task(failure, reason) @property - def tasks(self) -> List[TrackableTask]: + def tasks(self) -> list[TrackableTask]: return self._worker.get_tasks() def get_task_by_id(self, task_id: str) -> Optional[TrackableTask]: diff --git a/src/blueapi/service/handler_base.py b/src/blueapi/service/handler_base.py index 75ec8767f..36e8090e9 100644 --- a/src/blueapi/service/handler_base.py +++ b/src/blueapi/service/handler_base.py @@ -1,5 +1,5 @@ from abc import ABC, abstractmethod -from typing import List, Optional +from typing import Optional from blueapi.service.model import DeviceModel, PlanModel, WorkerTask from blueapi.worker.event import WorkerState @@ -12,7 +12,7 @@ class BlueskyHandler(ABC): @property @abstractmethod - def plans(self) -> List[PlanModel]: + def plans(self) -> list[PlanModel]: """ All available plans in the BlueskyContext """ @@ -25,7 +25,7 @@ def get_plan(self, name: str) -> PlanModel: @property @abstractmethod - def devices(self) -> List[DeviceModel]: + def devices(self) -> list[DeviceModel]: """ All available devices in the BlueskyContext """ @@ -75,7 +75,7 @@ def cancel_active_task(self, failure: bool, reason: Optional[str]) -> None: @property @abstractmethod - def tasks(self) -> List[TrackableTask]: + def tasks(self) -> list[TrackableTask]: """Return a list of all tasks on the worker, any one of which can be triggered with begin_task""" diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index 13c042ce5..e41c992d8 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -1,5 
+1,5 @@ from contextlib import asynccontextmanager -from typing import Dict, Optional, Set +from typing import Optional from fastapi import ( BackgroundTasks, @@ -212,7 +212,7 @@ def get_state(handler: BlueskyHandler = Depends(get_handler)) -> WorkerState: # Map of current_state: allowed new_states -_ALLOWED_TRANSITIONS: Dict[WorkerState, Set[WorkerState]] = { +_ALLOWED_TRANSITIONS: dict[WorkerState, set[WorkerState]] = { WorkerState.RUNNING: { WorkerState.PAUSED, WorkerState.ABORTING, diff --git a/src/blueapi/service/model.py b/src/blueapi/service/model.py index 5626498d0..625ecc1e4 100644 --- a/src/blueapi/service/model.py +++ b/src/blueapi/service/model.py @@ -1,4 +1,5 @@ -from typing import Any, Iterable, List, Optional +from collections.abc import Iterable +from typing import Any, Optional from bluesky.protocols import HasName from pydantic import Field @@ -16,7 +17,7 @@ class DeviceModel(BlueapiBaseModel): """ name: str = Field(description="Name of the device") - protocols: List[str] = Field( + protocols: list[str] = Field( description="Protocols that a device conforms to, indicating its capabilities" ) @@ -45,7 +46,7 @@ class DeviceResponse(BlueapiBaseModel): Response to a query for devices """ - devices: List[DeviceModel] = Field(description="Devices available to use in plans") + devices: list[DeviceModel] = Field(description="Devices available to use in plans") class PlanModel(BlueapiBaseModel): @@ -85,7 +86,7 @@ class PlanResponse(BlueapiBaseModel): Response to a query for plans """ - plans: List[PlanModel] = Field(description="Plans available to use by a worker") + plans: list[PlanModel] = Field(description="Plans available to use by a worker") class TaskResponse(BlueapiBaseModel): diff --git a/src/blueapi/service/openapi.py b/src/blueapi/service/openapi.py index 08b954626..859fae8fd 100644 --- a/src/blueapi/service/openapi.py +++ b/src/blueapi/service/openapi.py @@ -1,7 +1,7 @@ """Generate openapi.json.""" +from collections.abc import Mapping from pathlib import Path -from typing import Mapping import yaml from fastapi.openapi.utils import get_openapi diff --git a/src/blueapi/service/subprocess_handler.py b/src/blueapi/service/subprocess_handler.py index 8c3e2fc06..13fc127b0 100644 --- a/src/blueapi/service/subprocess_handler.py +++ b/src/blueapi/service/subprocess_handler.py @@ -1,8 +1,9 @@ import logging import signal +from collections.abc import Iterable from multiprocessing import Pool, set_start_method from multiprocessing.pool import Pool as PoolClass -from typing import Callable, Iterable, List, Optional +from typing import Callable, Optional from blueapi.config import ApplicationConfig from blueapi.service.handler import get_handler, setup_handler, teardown_handler @@ -65,14 +66,14 @@ def _run_in_subprocess( return self._subprocess.apply(function, arguments) @property - def plans(self) -> List[PlanModel]: + def plans(self) -> list[PlanModel]: return self._run_in_subprocess(plans) def get_plan(self, name: str) -> PlanModel: return self._run_in_subprocess(get_plan, [name]) @property - def devices(self) -> List[DeviceModel]: + def devices(self) -> list[DeviceModel]: return self._run_in_subprocess(devices) def get_device(self, name: str) -> DeviceModel: @@ -105,7 +106,7 @@ def cancel_active_task(self, failure: bool, reason: Optional[str]) -> None: return self._run_in_subprocess(cancel_active_task, [failure, reason]) @property - def tasks(self) -> List[TrackableTask]: + def tasks(self) -> list[TrackableTask]: return self._run_in_subprocess(tasks) def get_task_by_id(self, 
task_id: str) -> Optional[TrackableTask]: @@ -119,7 +120,7 @@ def initialized(self) -> bool: # Free functions (passed to subprocess) for each of the methods required by Handler -def plans() -> List[PlanModel]: +def plans() -> list[PlanModel]: return get_handler().plans @@ -127,7 +128,7 @@ def get_plan(name: str): return get_handler().get_plan(name) -def devices() -> List[DeviceModel]: +def devices() -> list[DeviceModel]: return get_handler().devices @@ -167,7 +168,7 @@ def cancel_active_task(failure: bool, reason: Optional[str]) -> None: return get_handler().cancel_active_task(failure, reason) -def tasks() -> List[TrackableTask]: +def tasks() -> list[TrackableTask]: return get_handler().tasks diff --git a/src/blueapi/startup/example_plans.py b/src/blueapi/startup/example_plans.py index 6ec7aa99a..53460e34a 100644 --- a/src/blueapi/startup/example_plans.py +++ b/src/blueapi/startup/example_plans.py @@ -1,5 +1,3 @@ -from typing import List - from bluesky.protocols import Movable, Readable from dls_bluesky_core.core import inject from dls_bluesky_core.plans import count @@ -9,7 +7,7 @@ def stp_snapshot( - detectors: List[Readable], + detectors: list[Readable], temperature: Movable = inject("sample_temperature"), pressure: Movable = inject("sample_pressure"), ) -> MsgGenerator: diff --git a/src/blueapi/utils/modules.py b/src/blueapi/utils/modules.py index e6d8e947e..e7c28e6f9 100644 --- a/src/blueapi/utils/modules.py +++ b/src/blueapi/utils/modules.py @@ -1,5 +1,6 @@ +from collections.abc import Iterable from types import ModuleType -from typing import Any, Iterable, List +from typing import Any def load_module_all(mod: ModuleType) -> Iterable[Any]: @@ -21,7 +22,7 @@ def load_module_all(mod: ModuleType) -> Iterable[Any]: Iterator[Iterable[Any]]: Each successive variable in globals """ - def get_named_subset(names: List[str]): + def get_named_subset(names: list[str]): for name in names: yield getattr(mod, name) diff --git a/src/blueapi/utils/ophyd_async_connect.py b/src/blueapi/utils/ophyd_async_connect.py index dfc37448b..382b412bf 100644 --- a/src/blueapi/utils/ophyd_async_connect.py +++ b/src/blueapi/utils/ophyd_async_connect.py @@ -1,7 +1,8 @@ import asyncio import logging +from collections.abc import Iterable from contextlib import suppress -from typing import Any, Dict, Iterable +from typing import Any from ophyd_async.core import DEFAULT_TIMEOUT, NotConnected from ophyd_async.core import Device as OphydAsyncDevice @@ -12,7 +13,7 @@ async def connect_ophyd_async_devices( sim: bool = False, timeout: float = DEFAULT_TIMEOUT, ) -> None: - tasks: Dict[asyncio.Task, str] = {} + tasks: dict[asyncio.Task, str] = {} for device in devices: if isinstance(device, OphydAsyncDevice): task = asyncio.create_task(device.connect(sim=sim)) @@ -21,7 +22,7 @@ async def connect_ophyd_async_devices( await _wait_for_tasks(tasks, timeout=timeout) -async def _wait_for_tasks(tasks: Dict[asyncio.Task, str], timeout: float): +async def _wait_for_tasks(tasks: dict[asyncio.Task, str], timeout: float): done, pending = await asyncio.wait(tasks, timeout=timeout) if pending: msg = f"{len(pending)} Devices did not connect:" @@ -41,7 +42,7 @@ async def _wait_for_tasks(tasks: Dict[asyncio.Task, str], timeout: float): def _format_awaited_task_error_message( - tasks: Dict[asyncio.Task, str], t: asyncio.Task + tasks: dict[asyncio.Task, str], t: asyncio.Task ) -> str: e = t.exception() part_one = f"\n {tasks[t]}: {type(e).__name__}" diff --git a/src/blueapi/worker/event.py b/src/blueapi/worker/event.py index 9e9b7e8e3..2c14de30f 
100644 --- a/src/blueapi/worker/event.py +++ b/src/blueapi/worker/event.py @@ -1,5 +1,6 @@ +from collections.abc import Mapping from enum import Enum -from typing import List, Mapping, Optional, Union +from typing import Optional, Union from bluesky.run_engine import RunEngineStateMachine from pydantic import Field @@ -110,8 +111,8 @@ class WorkerEvent(BlueapiBaseModel): state: WorkerState task_status: Optional[TaskStatus] = None - errors: List[str] = Field(default_factory=list) - warnings: List[str] = Field(default_factory=list) + errors: list[str] = Field(default_factory=list) + warnings: list[str] = Field(default_factory=list) def is_error(self) -> bool: return (self.task_status is not None and self.task_status.task_failed) or bool( diff --git a/src/blueapi/worker/reworker.py b/src/blueapi/worker/reworker.py index 58690be11..f284943db 100644 --- a/src/blueapi/worker/reworker.py +++ b/src/blueapi/worker/reworker.py @@ -1,10 +1,11 @@ import logging import uuid +from collections.abc import Iterable, Mapping from dataclasses import dataclass from functools import partial from queue import Full, Queue from threading import Event, RLock -from typing import Any, Dict, Iterable, List, Mapping, Optional, Set, Union +from typing import Any, Optional, Union from bluesky.protocols import Status from super_state_machine.errors import TransitionError @@ -48,16 +49,16 @@ class TaskWorker(Worker[Task]): _ctx: BlueskyContext _start_stop_timeout: float - _tasks: Dict[str, TrackableTask] + _tasks: dict[str, TrackableTask] _state: WorkerState - _errors: List[str] - _warnings: List[str] + _errors: list[str] + _warnings: list[str] _task_channel: Queue # type: ignore _current: Optional[TrackableTask] _status_lock: RLock - _status_snapshot: Dict[str, StatusView] - _completed_statuses: Set[str] + _status_snapshot: dict[str, StatusView] + _completed_statuses: set[str] _worker_events: EventPublisher[WorkerEvent] _progress_events: EventPublisher[ProgressEvent] _data_events: EventPublisher[DataEvent] @@ -112,7 +113,7 @@ def cancel_active_task( self._ctx.run_engine.stop() return self._current.task_id - def get_tasks(self) -> List[TrackableTask[Task]]: + def get_tasks(self) -> list[TrackableTask[Task]]: return list(self._tasks.values()) def get_task_by_id(self, task_id: str) -> Optional[TrackableTask[Task]]: diff --git a/src/blueapi/worker/task.py b/src/blueapi/worker/task.py index 6adbbe1db..f080be875 100644 --- a/src/blueapi/worker/task.py +++ b/src/blueapi/worker/task.py @@ -1,5 +1,6 @@ import logging -from typing import Any, Mapping +from collections.abc import Mapping +from typing import Any from pydantic import BaseModel, Field diff --git a/src/blueapi/worker/worker.py b/src/blueapi/worker/worker.py index a8756d23b..84d73055c 100644 --- a/src/blueapi/worker/worker.py +++ b/src/blueapi/worker/worker.py @@ -1,5 +1,5 @@ from abc import ABC, abstractmethod -from typing import Generic, List, Optional, TypeVar +from typing import Generic, Optional, TypeVar from pydantic import Field @@ -20,7 +20,7 @@ class TrackableTask(BlueapiBaseModel, Generic[T]): task: T is_complete: bool = False is_pending: bool = True - errors: List[str] = Field(default_factory=list) + errors: list[str] = Field(default_factory=list) class Worker(ABC, Generic[T]): @@ -30,7 +30,7 @@ class Worker(ABC, Generic[T]): """ @abstractmethod - def get_tasks(self) -> List[TrackableTask[T]]: + def get_tasks(self) -> list[TrackableTask[T]]: """ Return a list of all tasks on the worker, any one of which can be triggered with begin_task. 
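
All of the src/ changes above apply one pyupgrade pattern, now enforced by the "UP" ruff rules enabled in pyproject.toml: deprecated typing aliases (List, Dict, Set, Tuple, Type) become builtin generics (PEP 585), abstract collection types (Mapping, Iterable, Sequence) move to collections.abc, and the typing_extensions fallback for Protocol/runtime_checkable is dropped because both have lived in typing since Python 3.8, which also allows the typing_extensions<4.6 pin to go. A minimal sketch of the pattern, assuming Python >= 3.9; the function and names below are illustrative, not from the codebase:

    # Before: deprecated aliases from typing
    # from typing import Dict, List, Mapping, Optional
    #
    # After: builtin generics plus collections.abc
    from collections.abc import Mapping
    from typing import Optional

    def tally(items: list[str], seed: Optional[Mapping[str, int]] = None) -> dict[str, int]:
        """Count occurrences of each item, using the modernised annotation style."""
        counts: dict[str, int] = dict(seed or {})
        for item in items:
            counts[item] = counts.get(item, 0) + 1
        return counts

Note that Optional[...] and Union[...] are kept in the library code rather than rewritten to the X | None spelling: PEP 585 generics only need Python 3.9, whereas PEP 604 union syntax in annotations needs 3.10 (or a __future__ import), which matches the choice made throughout the diff.
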
diff --git a/tests/conftest.py b/tests/conftest.py index fe4086042..77832fd42 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,7 +1,7 @@ import asyncio # Based on https://docs.pytest.org/en/latest/example/simple.html#control-skipping-of-tests-according-to-command-line-option # noqa: E501 -from typing import Iterator +from collections.abc import Iterator from unittest.mock import MagicMock import pytest diff --git a/tests/core/test_context.py b/tests/core/test_context.py index 960d7aa4f..5440fbfd5 100644 --- a/tests/core/test_context.py +++ b/tests/core/test_context.py @@ -1,6 +1,5 @@ from __future__ import annotations -from typing import Dict, List, Type, Union from unittest.mock import patch import pytest @@ -101,12 +100,12 @@ def devicey_context(sim_motor: SynAxis, sim_detector: SynGauss) -> BlueskyContex class SomeConfigurable: - def read_configuration(self) -> SyncOrAsync[Dict[str, Reading]]: # type: ignore + def read_configuration(self) -> SyncOrAsync[dict[str, Reading]]: # type: ignore ... def describe_configuration( # type: ignore self, - ) -> SyncOrAsync[Dict[str, Descriptor]]: ... + ) -> SyncOrAsync[dict[str, Descriptor]]: ... @pytest.fixture @@ -195,9 +194,7 @@ def test_extra_kwargs_in_with_dodal_module_passed_to_make_all_devices( @pytest.mark.parametrize( "addr", ["sim", "sim_det", "sim.setpoint", ["sim"], ["sim", "setpoint"]] ) -def test_lookup_device( - devicey_context: BlueskyContext, addr: Union[str, List[str]] -) -> None: +def test_lookup_device(devicey_context: BlueskyContext, addr: str | list[str]) -> None: device = devicey_context.find_device(addr) assert is_bluesky_compatible_device(device) @@ -269,7 +266,7 @@ def test_device_reference_cache(empty_context: BlueskyContext) -> None: def test_reference_type_conversion(empty_context: BlueskyContext) -> None: - movable_ref: Type = empty_context._reference(Movable) + movable_ref: type = empty_context._reference(Movable) assert empty_context._convert_type(Movable) == movable_ref assert ( empty_context._convert_type(dict[Movable, list[tuple[int, Movable]]]) diff --git a/tests/core/test_event.py b/tests/core/test_event.py index 9f832de01..4f39ebc11 100644 --- a/tests/core/test_event.py +++ b/tests/core/test_event.py @@ -1,7 +1,7 @@ +from collections.abc import Iterable from concurrent.futures import Future from dataclasses import dataclass from queue import Queue -from typing import Iterable import pytest diff --git a/tests/messaging/test_stomptemplate.py b/tests/messaging/test_stomptemplate.py index cb8d89a14..0e7f8afdf 100644 --- a/tests/messaging/test_stomptemplate.py +++ b/tests/messaging/test_stomptemplate.py @@ -1,10 +1,11 @@ import itertools +from collections.abc import Iterable from concurrent.futures import Future from queue import Queue -from typing import Any, Iterable, List, Type +from typing import Any +from unittest.mock import ANY, MagicMock, patch import pytest -from mock import ANY, MagicMock, patch from pydantic import BaseModel, BaseSettings, Field from stomp import Connection from stomp.exception import ConnectFailedException @@ -17,7 +18,7 @@ class StompTestingSettings(BaseSettings): - blueapi_test_stomp_ports: List[int] = Field(default=[61613]) + blueapi_test_stomp_ports: list[int] = Field(default=[61613]) def test_stomp_configs(self) -> Iterable[StompConfig]: for port in self.blueapi_test_stomp_ports: @@ -122,7 +123,7 @@ class Foo(BaseModel): [("test", str), (1, int), (Foo(a=1, b="test"), Foo)], ) def test_deserialization( - template: MessagingTemplate, test_queue: str, message: Any, 
message_type: Type + template: MessagingTemplate, test_queue: str, message: Any, message_type: type ) -> None: def server(ctx: MessageContext, message: message_type) -> None: # type: ignore reply_queue = ctx.reply_destination diff --git a/tests/messaging/test_utils.py b/tests/messaging/test_utils.py index 28c82637e..feafd2afa 100644 --- a/tests/messaging/test_utils.py +++ b/tests/messaging/test_utils.py @@ -1,5 +1,6 @@ +from collections.abc import Mapping from dataclasses import dataclass -from typing import Any, Mapping +from typing import Any import pytest diff --git a/tests/preprocessors/test_attach_metadata.py b/tests/preprocessors/test_attach_metadata.py index 87d1f0a27..878015922 100644 --- a/tests/preprocessors/test_attach_metadata.py +++ b/tests/preprocessors/test_attach_metadata.py @@ -1,5 +1,6 @@ +from collections.abc import Mapping from pathlib import Path -from typing import Any, Callable, Dict, List, Mapping +from typing import Any, Callable import bluesky.plan_stubs as bps import bluesky.plans as bp @@ -92,7 +93,7 @@ def __init__( self._name = name self._provider = provider - async def read(self) -> Dict[str, Reading]: + async def read(self) -> dict[str, Reading]: return { f"{self.name}_data": { "value": "test", @@ -100,7 +101,7 @@ async def read(self) -> Dict[str, Reading]: }, } - async def describe(self) -> Dict[str, DataKey]: + async def describe(self) -> dict[str, DataKey]: directory_info = self._provider() path = f"{directory_info.directory_path}/{directory_info.filename_prefix}" return { @@ -126,7 +127,7 @@ def parent(self) -> None: @pytest.fixture(params=[1, 2]) -def detectors(request, provider: VisitDirectoryProvider) -> List[Readable]: +def detectors(request, provider: VisitDirectoryProvider) -> list[Readable]: number_of_detectors = request.param return [ FakeDetector( @@ -137,21 +138,21 @@ def detectors(request, provider: VisitDirectoryProvider) -> List[Readable]: ] -def simple_run(detectors: List[Readable]) -> MsgGenerator: +def simple_run(detectors: list[Readable]) -> MsgGenerator: yield from bp.count(detectors) -def multi_run(detectors: List[Readable]) -> MsgGenerator: +def multi_run(detectors: list[Readable]) -> MsgGenerator: yield from bp.count(detectors) yield from bp.count(detectors) -def multi_nested_plan(detectors: List[Readable]) -> MsgGenerator: +def multi_nested_plan(detectors: list[Readable]) -> MsgGenerator: yield from simple_run(detectors) yield from simple_run(detectors) -def multi_run_single_stage(detectors: List[Readable]) -> MsgGenerator: +def multi_run_single_stage(detectors: list[Readable]) -> MsgGenerator: def stageless_count() -> MsgGenerator: return (yield from bps.one_shot(detectors)) @@ -163,7 +164,7 @@ def inner_plan() -> MsgGenerator: def multi_run_single_stage_multi_group( - detectors: List[Readable], + detectors: list[Readable], ) -> MsgGenerator: def stageless_count() -> MsgGenerator: return (yield from bps.one_shot(detectors)) @@ -179,7 +180,7 @@ def inner_plan() -> MsgGenerator: @run_decorator(md={DATA_SESSION: 12345}) @set_run_key_decorator("outer") -def nested_run_with_metadata(detectors: List[Readable]) -> MsgGenerator: +def nested_run_with_metadata(detectors: list[Readable]) -> MsgGenerator: yield from set_run_key_wrapper(bp.count(detectors), "inner") yield from set_run_key_wrapper(bp.count(detectors), "inner") @@ -187,7 +188,7 @@ def nested_run_with_metadata(detectors: List[Readable]) -> MsgGenerator: @run_decorator() @set_run_key_decorator("outer") def nested_run_without_metadata( - detectors: List[Readable], + detectors: 
list[Readable], ) -> MsgGenerator: yield from set_run_key_wrapper(bp.count(detectors), "inner") yield from set_run_key_wrapper(bp.count(detectors), "inner") @@ -195,7 +196,7 @@ def nested_run_without_metadata( def test_simple_run_gets_scan_number( run_engine: RunEngine, - detectors: List[Readable], + detectors: list[Readable], provider: DirectoryProvider, ) -> None: docs = collect_docs( @@ -211,8 +212,8 @@ def test_simple_run_gets_scan_number( @pytest.mark.parametrize("plan", [multi_run, multi_nested_plan]) def test_multi_run_gets_scan_numbers( run_engine: RunEngine, - detectors: List[Readable], - plan: Callable[[List[Readable]], MsgGenerator], + detectors: list[Readable], + plan: Callable[[list[Readable]], MsgGenerator], provider: DirectoryProvider, ) -> None: """Test is here to demonstrate that multi run plans will overwrite files.""" @@ -230,7 +231,7 @@ def test_multi_run_gets_scan_numbers( def test_multi_run_single_stage( run_engine: RunEngine, - detectors: List[Readable], + detectors: list[Readable], provider: DirectoryProvider, ) -> None: docs = collect_docs( @@ -254,7 +255,7 @@ def test_multi_run_single_stage( def test_multi_run_single_stage_multi_group( run_engine: RunEngine, - detectors: List[Readable], + detectors: list[Readable], provider: DirectoryProvider, ) -> None: docs = collect_docs( @@ -282,7 +283,7 @@ def test_multi_run_single_stage_multi_group( def test_nested_run_with_metadata( run_engine: RunEngine, - detectors: List[Readable], + detectors: list[Readable], provider: DirectoryProvider, ) -> None: """Test is here to demonstrate that nested runs will be treated as a single run. @@ -304,7 +305,7 @@ def test_nested_run_with_metadata( def test_nested_run_without_metadata( run_engine: RunEngine, - detectors: List[Readable], + detectors: list[Readable], provider: DirectoryProvider, ) -> None: """Test is here to demonstrate that nested runs will be treated as a single run. 
@@ -326,7 +327,7 @@ def test_nested_run_without_metadata( def test_visit_directory_provider_fails( run_engine: RunEngine, - detectors: List[Readable], + detectors: list[Readable], provider: DirectoryProvider, client: MockVisitServiceClient, ) -> None: @@ -341,7 +342,7 @@ def test_visit_directory_provider_fails( def test_visit_directory_provider_fails_after_one_sucess( run_engine: RunEngine, - detectors: List[Readable], + detectors: list[Readable], provider: DirectoryProvider, client: MockVisitServiceClient, ) -> None: @@ -363,7 +364,7 @@ def collect_docs( run_engine: RunEngine, plan: MsgGenerator, provider: DirectoryProvider, -) -> List[DataEvent]: +) -> list[DataEvent]: events = [] def on_event(name: str, doc: Mapping[str, Any]) -> None: @@ -375,9 +376,9 @@ def on_event(name: str, doc: Mapping[str, Any]) -> None: def assert_all_detectors_used_collection_numbers( - docs: List[DataEvent], - detectors: List[Readable], - source_history: List[Path], + docs: list[DataEvent], + detectors: list[Readable], + source_history: list[Path], ) -> None: descriptors = find_descriptor_docs(docs) assert len(descriptors) == len(source_history) @@ -390,9 +391,9 @@ def assert_all_detectors_used_collection_numbers( assert Path(source) == expected_source -def find_start_docs(docs: List[DataEvent]) -> List[DataEvent]: +def find_start_docs(docs: list[DataEvent]) -> list[DataEvent]: return list(filter(lambda event: event.name == "start", docs)) -def find_descriptor_docs(docs: List[DataEvent]) -> List[DataEvent]: +def find_descriptor_docs(docs: list[DataEvent]) -> list[DataEvent]: return list(filter(lambda event: event.name == "descriptor", docs)) diff --git a/tests/service/test_handler.py b/tests/service/test_handler.py index 28def3d1c..a15851bb0 100644 --- a/tests/service/test_handler.py +++ b/tests/service/test_handler.py @@ -1,5 +1,6 @@ +from unittest.mock import Mock, patch + import pytest -from mock import Mock, patch from blueapi.service.handler import ( Handler, diff --git a/tests/service/test_openapi.py b/tests/service/test_openapi.py index ca9ea22fd..edc5c726e 100644 --- a/tests/service/test_openapi.py +++ b/tests/service/test_openapi.py @@ -1,7 +1,8 @@ -import mock +from unittest import mock +from unittest.mock import Mock, PropertyMock + import pytest import yaml -from mock import Mock, PropertyMock from blueapi.service.openapi import DOCS_SCHEMA_LOCATION, generate_schema diff --git a/tests/service/test_subprocess_handler.py b/tests/service/test_subprocess_handler.py index d0ccca84e..147e845b4 100644 --- a/tests/service/test_subprocess_handler.py +++ b/tests/service/test_subprocess_handler.py @@ -1,7 +1,7 @@ -from typing import List, Optional +from typing import Optional +from unittest.mock import MagicMock, patch import pytest -from mock import MagicMock, patch from blueapi.service.handler_base import BlueskyHandler, HandlerNotStartedError from blueapi.service.model import DeviceModel, PlanModel, WorkerTask @@ -47,14 +47,14 @@ def test_raises_if_not_started(): class DummyHandler(BlueskyHandler): @property - def plans(self) -> List[PlanModel]: + def plans(self) -> list[PlanModel]: return [PlanModel(name="plan1"), PlanModel(name="plan2")] def get_plan(self, name: str) -> PlanModel: return PlanModel(name="plan1") @property - def devices(self) -> List[DeviceModel]: + def devices(self) -> list[DeviceModel]: return [ DeviceModel(name="device1", protocols=[]), DeviceModel(name="device2", protocols=[]), @@ -87,7 +87,7 @@ def resume_worker(self) -> None: ... 
def cancel_active_task(self, failure: bool, reason: Optional[str]) -> None: ... @property - def tasks(self) -> List[TrackableTask]: + def tasks(self) -> list[TrackableTask]: return [ TrackableTask(task_id="abc", task=Task(name="sleep", params={"time": 0.0})) ] diff --git a/tests/test_cli.py b/tests/test_cli.py index 730571d1e..eb42d0673 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1,9 +1,9 @@ from dataclasses import dataclass +from unittest.mock import Mock, patch import pytest from click.testing import CliRunner from fastapi.testclient import TestClient -from mock import Mock, patch from pydantic import BaseModel from requests.exceptions import ConnectionError diff --git a/tests/test_config.py b/tests/test_config.py index 5809336b1..fd692746c 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -1,8 +1,8 @@ import os from pathlib import Path -from typing import Any, Type +from typing import Any +from unittest import mock -import mock import pytest from pydantic import BaseModel, Field @@ -56,7 +56,7 @@ def default_yaml(package_root: Path) -> Path: @pytest.mark.parametrize("schema", [ConfigWithDefaults, NestedConfigWithDefaults]) -def test_load_defaults(schema: Type[Any]) -> None: +def test_load_defaults(schema: type[Any]) -> None: loader = ConfigLoader(schema) assert loader.load() == schema() diff --git a/tests/worker/test_reworker.py b/tests/worker/test_reworker.py index 5a64111b0..2c65ef656 100644 --- a/tests/worker/test_reworker.py +++ b/tests/worker/test_reworker.py @@ -1,11 +1,12 @@ import itertools import threading +from collections.abc import Iterable from concurrent.futures import Future from queue import Full -from typing import Any, Callable, Iterable, List, Optional, TypeVar, Union +from typing import Any, Callable, Optional, TypeVar, Union +from unittest.mock import MagicMock, patch import pytest -from mock import MagicMock, patch from blueapi.config import EnvironmentConfig, Source, SourceKind from blueapi.core import BlueskyContext, EventStream, MsgGenerator @@ -190,7 +191,7 @@ def test_begin_task_blocks_until_current_task_set(worker: Worker) -> None: def test_plan_failure_recorded_in_active_task(worker: Worker) -> None: task_id = worker.submit_task(_FAILING_TASK) - events_future: Future[List[WorkerEvent]] = take_events( + events_future: Future[list[WorkerEvent]] = take_events( worker.worker_events, lambda event: event.task_status is not None and event.task_status.task_failed, ) @@ -214,7 +215,7 @@ def test_produces_worker_events(worker: Worker, num_runs: int) -> None: assert_run_produces_worker_events(events, worker, task_id) -def _sleep_events(task_id: str) -> List[WorkerEvent]: +def _sleep_events(task_id: str) -> list[WorkerEvent]: return [ WorkerEvent( state=WorkerState.RUNNING, @@ -248,7 +249,7 @@ def test_no_additional_progress_events_after_complete(worker: Worker): See https://github.com/bluesky/ophyd/issues/1115 """ - progress_events: List[ProgressEvent] = [] + progress_events: list[ProgressEvent] = [] worker.progress_events.subscribe(lambda event, id: progress_events.append(event)) task: Task = Task(name="move", params={"moves": {"additional_status_device": 5.0}}) @@ -280,7 +281,7 @@ def raise_full(item): def assert_run_produces_worker_events( - expected_events: List[WorkerEvent], + expected_events: list[WorkerEvent], worker: Worker, task_id: str, ) -> None: @@ -291,8 +292,8 @@ def begin_task_and_wait_until_complete( worker: Worker, task_id: str, timeout: float = 5.0, -) -> List[WorkerEvent]: - events: "Future[List[WorkerEvent]]" = 
take_events( +) -> list[WorkerEvent]: + events: "Future[list[WorkerEvent]]" = take_events( worker.worker_events, lambda event: event.is_complete(), ) @@ -343,18 +344,18 @@ def test_worker_and_data_events_produce_in_order(worker: Worker) -> None: def assert_running_count_plan_produces_ordered_worker_and_data_events( - expected_events: List[Union[WorkerEvent, DataEvent]], + expected_events: list[Union[WorkerEvent, DataEvent]], worker: Worker, task: Task = Task(name="count", params={"detectors": ["image_det"], "num": 1}), # noqa: B008 timeout: float = 5.0, ) -> None: - event_streams: List[EventStream[Any, int]] = [ + event_streams: list[EventStream[Any, int]] = [ worker.data_events, worker.worker_events, ] count = itertools.count() - events: "Future[List[Any]]" = take_events_from_streams( + events: "Future[list[Any]]" = take_events_from_streams( event_streams, lambda _: next(count) >= len(expected_events) - 1, ) @@ -379,7 +380,7 @@ def assert_running_count_plan_produces_ordered_worker_and_data_events( def take_n_events( stream: EventStream[E, Any], num: int, -) -> "Future[List[E]]": +) -> "Future[list[E]]": count = itertools.count() return take_events(stream, lambda _: next(count) >= num) @@ -387,9 +388,9 @@ def take_n_events( def take_events( stream: EventStream[E, Any], cutoff_predicate: Callable[[E], bool], -) -> "Future[List[E]]": - events: List[E] = [] - future: "Future[List[E]]" = Future() +) -> "Future[list[E]]": + events: list[E] = [] + future: "Future[list[E]]" = Future() def on_event(event: E, event_id: Optional[str]) -> None: events.append(event) @@ -402,9 +403,9 @@ def on_event(event: E, event_id: Optional[str]) -> None: def take_events_from_streams( - streams: List[EventStream[Any, int]], + streams: list[EventStream[Any, int]], cutoff_predicate: Callable[[Any], bool], -) -> "Future[List[Any]]": +) -> "Future[list[Any]]": """Returns a collated list of futures for events in numerous event streams. The support for generic and algebraic types doesn't appear to extend to @@ -424,8 +425,8 @@ def take_events_from_streams( ] """ - events: List[Any] = [] - future: "Future[List[Any]]" = Future() + events: list[Any] = [] + future: "Future[list[Any]]" = Future() def on_event(event: Any, event_id: Optional[str]) -> None: print(event) @@ -436,7 +437,7 @@ def on_event(event: Any, event_id: Optional[str]) -> None: for stream in streams: sub = stream.subscribe(on_event) - def callback(unused: Future[List[Any]], stream=stream, sub=sub): + def callback(unused: Future[list[Any]], stream=stream, sub=sub): stream.unsubscribe(sub) future.add_done_callback(callback)
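
The test changes follow the same PEP 585 pattern as src/, with one addition: the suite swaps the third-party mock backport for the standard library's unittest.mock (Mock, MagicMock, patch, ANY, PropertyMock), removing a dependency without touching call sites, since the two expose the same API. tests/core/test_context.py can additionally spell the union parameter as str | list[str] because that file starts with from __future__ import annotations. A minimal self-contained sketch of the unittest.mock usage, using an illustrative stand-in class rather than anything from the repository:

    from unittest.mock import PropertyMock, patch

    class Handler:
        """Stand-in class for illustration; not the blueapi handler."""

        @property
        def plans(self) -> list[str]:
            return ["scan", "count"]

    # Patching a property works exactly as it did with the old mock package.
    with patch.object(Handler, "plans", new_callable=PropertyMock, return_value=["sleep"]):
        assert Handler().plans == ["sleep"]
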