Skip to content

Commit

Permalink
fix linters
Browse files Browse the repository at this point in the history
  • Loading branch information
alex2211-put committed Sep 25, 2024
1 parent f4d3bcc commit a779513
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 15 deletions.
16 changes: 7 additions & 9 deletions ydb/_topic_reader/topic_reader_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,19 +343,17 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamReadMess
self._update_token_event.set()

self._background_tasks.add(
topic_common.wrap_create_asyncio_task(self._read_messages_loop, task_name="read_messages_loop"),
)
topic_common.wrap_create_asyncio_task(self._read_messages_loop, "read_messages_loop"),
)
self._background_tasks.add(
topic_common.wrap_create_asyncio_task(self._decode_batches_loop, task_name="decode_batches"),
)
topic_common.wrap_create_asyncio_task(self._decode_batches_loop, "decode_batches"),
)
if self._get_token_function:
self._background_tasks.add(
topic_common.wrap_create_asyncio_task(self._update_token_loop, task_name="update_token_loop"),
)
topic_common.wrap_create_asyncio_task(self._update_token_loop, "update_token_loop"),
)
self._background_tasks.add(
topic_common.wrap_create_asyncio_task(
self._handle_background_errors, task_name="handle_background_errors",
),
topic_common.wrap_create_asyncio_task(self._handle_background_errors, "handle_background_errors"),
)

async def wait_error(self):
Expand Down
12 changes: 6 additions & 6 deletions ydb/_topic_writer/topic_writer_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,8 @@ def __init__(self, driver: SupportedDriverType, settings: WriterSettings):
self._new_messages = asyncio.Queue()
self._stop_reason = self._loop.create_future()
self._background_tasks = [
topic_common.wrap_create_asyncio_task(self._connection_loop, task_name="connection_loop"),
topic_common.wrap_create_asyncio_task(self._encode_loop, task_name="encode_loop"),
topic_common.wrap_create_asyncio_task(self._connection_loop, "connection_loop"),
topic_common.wrap_create_asyncio_task(self._encode_loop, "encode_loop"),
]

self._state_changed = asyncio.Event()
Expand Down Expand Up @@ -368,10 +368,10 @@ async def _connection_loop(self):
self._stream_connected.set()

send_loop = topic_common.wrap_create_asyncio_task(
self._send_loop, task_name="writer send loop", stream_writer,
self._send_loop, "writer send loop", stream_writer,
)
receive_loop = topic_common.wrap_create_asyncio_task(
self._read_loop, task_name="writer receive loop", stream_writer,
self._read_loop, "writer receive loop", stream_writer,
)

tasks = [send_loop, receive_loop]
Expand Down Expand Up @@ -659,8 +659,8 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamWriteMes
if self._update_token_interval is not None:
self._update_token_event.set()
self._update_token_task = topic_common.wrap_create_asyncio_task(
self._update_token_loop, task_name="update_token_loop",
)
self._update_token_loop, "update_token_loop",
)

@staticmethod
def _ensure_ok(message: WriterMessagesFromServerToClient):
Expand Down

0 comments on commit a779513

Please sign in to comment.