diff --git a/ydb/_topic_common/common.py b/ydb/_topic_common/common.py index 7a97336e..c296116e 100644 --- a/ydb/_topic_common/common.py +++ b/ydb/_topic_common/common.py @@ -1,5 +1,6 @@ import asyncio import concurrent.futures +import sys import threading import typing from typing import Optional @@ -29,6 +30,12 @@ def wrapper(rpc_state, response_pb, driver=None): return wrapper +def wrap_create_asyncio_task(func: typing.Callable, *args, **kwargs, task_name: str): + if sys.hexversion < 0x03080000: + return asyncio.create_task(func(*args, **kwargs)) + return asyncio.create_task(func(*args, **kwargs), task_name=loop_name) + + _shared_event_loop_lock = threading.Lock() _shared_event_loop: Optional[asyncio.AbstractEventLoop] = None diff --git a/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/_topic_reader/topic_reader_asyncio.py index e871e549..4226badb 100644 --- a/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/_topic_reader/topic_reader_asyncio.py @@ -3,6 +3,7 @@ import asyncio import concurrent.futures import gzip +import sys import typing from asyncio import Task from collections import deque @@ -10,6 +11,7 @@ import ydb from .. import _apis, issues +from .._topic_common import common as topic_common from .._utilities import AtomicCounter from ..aio import Driver from ..issues import Error as YdbError, _process_response @@ -87,7 +89,10 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): def __del__(self): if not self._closed: - self._loop.create_task(self.close(flush=False)) + if sys.hexversion < 0x03080000: + self._loop.create_task(self.close(flush=False)) + else: + self._loop.create_task(self.close(flush=False), name="close reader") async def wait_message(self): """ @@ -337,11 +342,21 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamReadMess self._update_token_event.set() - self._background_tasks.add(asyncio.create_task(self._read_messages_loop())) - self._background_tasks.add(asyncio.create_task(self._decode_batches_loop())) + self._background_tasks.add( + topic_common.wrap_create_asyncio_task(self._read_messages_loop, task_name="read_messages_loop"), + ) + self._background_tasks.add( + topic_common.wrap_create_asyncio_task(self._decode_batches_loop, task_name="decode_batches"), + ) if self._get_token_function: - self._background_tasks.add(asyncio.create_task(self._update_token_loop())) - self._background_tasks.add(asyncio.create_task(self._handle_background_errors())) + self._background_tasks.add( + topic_common.wrap_create_asyncio_task(self._update_token_loop, task_name="update_token_loop"), + ) + self._background_tasks.add( + topic_common.wrap_create_asyncio_task( + self._handle_background_errors, task_name="handle_background_errors", + ), + ) async def wait_error(self): raise await self._first_error diff --git a/ydb/_topic_writer/topic_writer_asyncio.py b/ydb/_topic_writer/topic_writer_asyncio.py index 22b6dfaf..08687f97 100644 --- a/ydb/_topic_writer/topic_writer_asyncio.py +++ b/ydb/_topic_writer/topic_writer_asyncio.py @@ -28,6 +28,7 @@ issues, ) from .._errors import check_retriable_error +from .._topic_common import common as topic_common from ..retries import RetrySettings from .._grpc.grpcwrapper.ydb_topic_public_types import PublicCodec from .._grpc.grpcwrapper.ydb_topic import ( @@ -231,8 +232,8 @@ def __init__(self, driver: SupportedDriverType, settings: WriterSettings): self._new_messages = asyncio.Queue() self._stop_reason = self._loop.create_future() self._background_tasks = [ - asyncio.create_task(self._connection_loop()), - asyncio.create_task(self._encode_loop()), + topic_common.wrap_create_asyncio_task(self._connection_loop, task_name="connection_loop"), + topic_common.wrap_create_asyncio_task(self._encode_loop, task_name="encode_loop"), ] self._state_changed = asyncio.Event() @@ -366,8 +367,12 @@ async def _connection_loop(self): self._stream_connected.set() - send_loop = asyncio.create_task(self._send_loop(stream_writer)) - receive_loop = asyncio.create_task(self._read_loop(stream_writer)) + send_loop = topic_common.wrap_create_asyncio_task( + self._send_loop, stream_writer, task_name="writer send loop", + ) + receive_loop = topic_common.wrap_create_asyncio_task( + self._read_loop, stream_writer, task_name="writer receive loop", + ) tasks = [send_loop, receive_loop] done, _ = await asyncio.wait([send_loop, receive_loop], return_when=asyncio.FIRST_COMPLETED) @@ -653,7 +658,9 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamWriteMes if self._update_token_interval is not None: self._update_token_event.set() - self._update_token_task = asyncio.create_task(self._update_token_loop()) + self._update_token_task = topic_common.wrap_create_asyncio_task( + self._update_token_loop, task_name="update_token_loop", + ) @staticmethod def _ensure_ok(message: WriterMessagesFromServerToClient):