From ff86b8d04d1023cb49376af6b213a33531355386 Mon Sep 17 00:00:00 2001 From: Jylpah Date: Wed, 20 Dec 2023 20:04:43 +0200 Subject: [PATCH 01/13] Test cross-OS testing --- .github/workflows/python-package.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index e3838fe..c0f1da9 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -11,13 +11,13 @@ on: jobs: build: - runs-on: [ ubuntu-latest ] + # runs-on: [ ubuntu-latest ] strategy: fail-fast: false matrix: python-version: [ "3.11", "3.12"] - # os: [ ubuntu-latest, windows-latest, macos-latest ] - # runs-on: ${{ matrix.os }} + os: [ ubuntu-latest, windows-latest, macos-latest ] + runs-on: ${{ matrix.os }} steps: - uses: actions/checkout@v3 From 40ab224cafcf5609690ad0cf6db6a85b4f841ef3 Mon Sep 17 00:00:00 2001 From: jylpah Date: Wed, 20 Dec 2023 20:19:09 +0200 Subject: [PATCH 02/13] add ThrottledClientSession(trust_env=True") --- tests/test_throttledclientsession.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_throttledclientsession.py b/tests/test_throttledclientsession.py index c785c25..42ddc4c 100644 --- a/tests/test_throttledclientsession.py +++ b/tests/test_throttledclientsession.py @@ -309,7 +309,7 @@ def avg_rate(timings: list[float]) -> float: async def _get(url: str, rate: float, N: int) -> list[float]: """Test timings of N/sec get""" timings: list[float] = list() - async with ThrottledClientSession(rate_limit=rate) as session: + async with ThrottledClientSession(rate_limit=rate, trust_env=True) as session: for _ in range(N): async with session.get(url, ssl=False) as resp: assert resp.status == 200, f"request failed, HTTP STATUS={resp.status}" @@ -394,7 +394,7 @@ async def test_3_get_json(server_url: str, json_path: str) -> None: N: int = N_SLOW url: str = server_url + json_path res: Any | None - async with ThrottledClientSession(rate_limit=rate_limit) as session: + async with ThrottledClientSession(rate_limit=rate_limit, trust_env=True) as session: for _ in range(N): if (_ := await get_url_JSON(session=session, url=url, retries=2)) is None: assert False, "get_url_JSON() returned None" @@ -408,7 +408,7 @@ async def test_3_get_json(server_url: str, json_path: str) -> None: # url: str = server_url + json_path # res: Any | None # parents: list[JSONParent] = json_data() -# async with ThrottledClientSession(rate_limit=rate_limit) as session: +# async with ThrottledClientSession(rate_limit=rate_limit, trust_env=True) as session: # for parent in parents: # if ( # res := await post_url( From 036d9a0298008405481d837fac2cf86babd4e01c Mon Sep 17 00:00:00 2001 From: jylpah Date: Wed, 20 Dec 2023 20:23:30 +0200 Subject: [PATCH 03/13] add sleep(0.3) to wait the HTTP server to start --- tests/test_throttledclientsession.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/test_throttledclientsession.py b/tests/test_throttledclientsession.py index 42ddc4c..88ed392 100644 --- a/tests/test_throttledclientsession.py +++ b/tests/test_throttledclientsession.py @@ -350,6 +350,7 @@ async def test_1_fast_get(server_url: str) -> None: """Test timings of N/sec get""" rate_limit: float = RATE_FAST N: int = N_FAST + await sleep(0.3) # wait the server to start timings: list[float] = await _get(server_url, rate=rate_limit, N=N) rate_max: float = max_rate(timings, rate_limit) rate_avg: float = avg_rate(timings) @@ -371,6 +372,7 @@ async def test_2_slow_get(server_url: str) -> None: """Test timings of N/sec get""" rate_limit: float = RATE_SLOW N: int = N_SLOW + await sleep(0.3) # wait the server to start timings: list[float] = await _get(server_url, rate=rate_limit, N=N) rate_max: float = max_rate(timings, rate_limit) rate_avg: float = avg_rate(timings) @@ -393,7 +395,7 @@ async def test_3_get_json(server_url: str, json_path: str) -> None: rate_limit: float = RATE_SLOW N: int = N_SLOW url: str = server_url + json_path - res: Any | None + await sleep(0.3) # wait the server to start async with ThrottledClientSession(rate_limit=rate_limit, trust_env=True) as session: for _ in range(N): if (_ := await get_url_JSON(session=session, url=url, retries=2)) is None: From 7d27e8c8d30290b022cab88cd5ef5f4b30c49916 Mon Sep 17 00:00:00 2001 From: jylpah Date: Wed, 20 Dec 2023 20:31:21 +0200 Subject: [PATCH 04/13] increase HTTP server wait to 1 sec --- tests/test_throttledclientsession.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_throttledclientsession.py b/tests/test_throttledclientsession.py index 88ed392..14f1443 100644 --- a/tests/test_throttledclientsession.py +++ b/tests/test_throttledclientsession.py @@ -350,7 +350,7 @@ async def test_1_fast_get(server_url: str) -> None: """Test timings of N/sec get""" rate_limit: float = RATE_FAST N: int = N_FAST - await sleep(0.3) # wait the server to start + await sleep(1) # wait the server to start timings: list[float] = await _get(server_url, rate=rate_limit, N=N) rate_max: float = max_rate(timings, rate_limit) rate_avg: float = avg_rate(timings) @@ -372,7 +372,7 @@ async def test_2_slow_get(server_url: str) -> None: """Test timings of N/sec get""" rate_limit: float = RATE_SLOW N: int = N_SLOW - await sleep(0.3) # wait the server to start + await sleep(1) # wait the server to start timings: list[float] = await _get(server_url, rate=rate_limit, N=N) rate_max: float = max_rate(timings, rate_limit) rate_avg: float = avg_rate(timings) @@ -395,7 +395,7 @@ async def test_3_get_json(server_url: str, json_path: str) -> None: rate_limit: float = RATE_SLOW N: int = N_SLOW url: str = server_url + json_path - await sleep(0.3) # wait the server to start + await sleep(1) # wait the server to start async with ThrottledClientSession(rate_limit=rate_limit, trust_env=True) as session: for _ in range(N): if (_ := await get_url_JSON(session=session, url=url, retries=2)) is None: From 1ee29d5a748aab2eb792d598393dfa63c1bb6d68 Mon Sep 17 00:00:00 2001 From: jylpah Date: Wed, 20 Dec 2023 20:31:46 +0200 Subject: [PATCH 05/13] remove the sleep from all but the first test requiring the HTTP mock server --- tests/test_throttledclientsession.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_throttledclientsession.py b/tests/test_throttledclientsession.py index 14f1443..61bd7fe 100644 --- a/tests/test_throttledclientsession.py +++ b/tests/test_throttledclientsession.py @@ -372,7 +372,7 @@ async def test_2_slow_get(server_url: str) -> None: """Test timings of N/sec get""" rate_limit: float = RATE_SLOW N: int = N_SLOW - await sleep(1) # wait the server to start + # await sleep(1) # wait the server to start timings: list[float] = await _get(server_url, rate=rate_limit, N=N) rate_max: float = max_rate(timings, rate_limit) rate_avg: float = avg_rate(timings) @@ -395,7 +395,7 @@ async def test_3_get_json(server_url: str, json_path: str) -> None: rate_limit: float = RATE_SLOW N: int = N_SLOW url: str = server_url + json_path - await sleep(1) # wait the server to start + # await sleep(1) # wait the server to start async with ThrottledClientSession(rate_limit=rate_limit, trust_env=True) as session: for _ in range(N): if (_ := await get_url_JSON(session=session, url=url, retries=2)) is None: From fccb385369fd466e513ee5f85d95588c53021c58 Mon Sep 17 00:00:00 2001 From: jylpah Date: Wed, 20 Dec 2023 20:36:01 +0200 Subject: [PATCH 06/13] increase timeout so that windows-latest could pass on GH... --- tests/test_throttledclientsession.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_throttledclientsession.py b/tests/test_throttledclientsession.py index 61bd7fe..0c5a85f 100644 --- a/tests/test_throttledclientsession.py +++ b/tests/test_throttledclientsession.py @@ -344,7 +344,7 @@ def json_path() -> str: return JSON_PATH -@pytest.mark.timeout(20) +@pytest.mark.timeout(60) @pytest.mark.asyncio async def test_1_fast_get(server_url: str) -> None: """Test timings of N/sec get""" @@ -366,7 +366,7 @@ async def test_1_fast_get(server_url: str) -> None: {', '.join([str(t) for t in timings])}""" -@pytest.mark.timeout(40) +@pytest.mark.timeout(60) @pytest.mark.asyncio async def test_2_slow_get(server_url: str) -> None: """Test timings of N/sec get""" @@ -388,7 +388,7 @@ async def test_2_slow_get(server_url: str) -> None: {', '.join([str(t) for t in timings])}""" -@pytest.mark.timeout(20) +@pytest.mark.timeout(60) @pytest.mark.asyncio async def test_3_get_json(server_url: str, json_path: str) -> None: """Test get_url_JSON()""" From bcdfd5e950cdd3bfd3a723d490550f4ff998d1af Mon Sep 17 00:00:00 2001 From: jylpah Date: Wed, 20 Dec 2023 22:06:10 +0200 Subject: [PATCH 07/13] add TODO comment --- tests/test_throttledclientsession.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/test_throttledclientsession.py b/tests/test_throttledclientsession.py index 0c5a85f..71793e0 100644 --- a/tests/test_throttledclientsession.py +++ b/tests/test_throttledclientsession.py @@ -45,6 +45,9 @@ message = logger.warning +# TODO: should I use pytest-httpserver instead? + + def json_data() -> List[Dict[str, str | int | float | None]]: """Generate JSON test data""" From 74efb4a75959fb74c2d0ae16ef9bd5359653d08b Mon Sep 17 00:00:00 2001 From: jylpah Date: Wed, 20 Dec 2023 22:50:09 +0200 Subject: [PATCH 08/13] remove unused deps --- tests/test_filequeue.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/test_filequeue.py b/tests/test_filequeue.py index 8addc55..fb961b6 100644 --- a/tests/test_filequeue.py +++ b/tests/test_filequeue.py @@ -1,8 +1,6 @@ import sys import pytest # type: ignore -from os.path import dirname, realpath, join as pjoin, basename from pathlib import Path -from random import choice from os import makedirs from fnmatch import fnmatch, fnmatchcase import logging @@ -34,7 +32,7 @@ # ######################################################## -FIXTURE_DIR = Path(dirname(realpath(__file__))) +FIXTURE_DIR = Path(__file__).parent TEST_BASE = Path("06_FileQueue") From d7393865c9c44833e498a8ee05ae727e6dc20433 Mon Sep 17 00:00:00 2001 From: jylpah Date: Wed, 20 Dec 2023 22:50:44 +0200 Subject: [PATCH 09/13] put(): FIX: Win path matching --- src/pyutils/filequeue.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/pyutils/filequeue.py b/src/pyutils/filequeue.py index 35f63a6..50e378f 100644 --- a/src/pyutils/filequeue.py +++ b/src/pyutils/filequeue.py @@ -101,14 +101,12 @@ async def mk_queue(self, files: Sequence[str | Path]) -> bool: return await self.finish() async def put(self, path: Path) -> None: - """Recursive function to build process queueu. Sanitize filename""" + """Recursive function to build process queue. Sanitize filename""" assert isinstance(path, Path), "path has to be type Path()" try: if path.is_dir(): - for child in path.rglob(self._filter): - if child.is_file(): - debug("Adding file to queue: %s", str(child)) - await super().put(child) + for child in path.iterdir(): + await self.put(child) elif path.is_file() and self.match(path): debug("Adding file to queue: %s", str(path)) await super().put(path) From b839bf11bcba5dbdeab064adab20bb82224ad060 Mon Sep 17 00:00:00 2001 From: jylpah Date: Thu, 21 Dec 2023 17:28:02 +0200 Subject: [PATCH 10/13] FIX: skip tests on windows since missing server socket support --- tests/test_throttledclientsession.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tests/test_throttledclientsession.py b/tests/test_throttledclientsession.py index 71793e0..090ed5f 100644 --- a/tests/test_throttledclientsession.py +++ b/tests/test_throttledclientsession.py @@ -347,6 +347,10 @@ def json_path() -> str: return JSON_PATH +@pytest.mark.skipif( + sys.platform == "win32", + reason="not supported on windows: asyncio.loop.create_unix_connection", +) @pytest.mark.timeout(60) @pytest.mark.asyncio async def test_1_fast_get(server_url: str) -> None: @@ -369,6 +373,10 @@ async def test_1_fast_get(server_url: str) -> None: {', '.join([str(t) for t in timings])}""" +@pytest.mark.skipif( + sys.platform == "win32", + reason="not supported on windows: asyncio.loop.create_unix_connection", +) @pytest.mark.timeout(60) @pytest.mark.asyncio async def test_2_slow_get(server_url: str) -> None: @@ -391,6 +399,10 @@ async def test_2_slow_get(server_url: str) -> None: {', '.join([str(t) for t in timings])}""" +@pytest.mark.skipif( + sys.platform == "win32", + reason="not supported on windows: asyncio.loop.create_unix_connection", +) @pytest.mark.timeout(60) @pytest.mark.asyncio async def test_3_get_json(server_url: str, json_path: str) -> None: From a2ea57eb790006fbec3250692412e7f446f35d8e Mon Sep 17 00:00:00 2001 From: jylpah Date: Thu, 21 Dec 2023 17:56:58 +0200 Subject: [PATCH 11/13] clean up imports --- src/pyutils/eventcounter.py | 368 ++++++++++---------- src/pyutils/filequeue.py | 2 - src/pyutils/multilevelformatter.py | 4 + src/pyutils/throttledclientsession.py | 480 +++++++++++++------------- src/pyutils/utils.py | 2 +- tests/test_throttledclientsession.py | 8 +- 6 files changed, 429 insertions(+), 435 deletions(-) diff --git a/src/pyutils/eventcounter.py b/src/pyutils/eventcounter.py index 40104ae..50251fe 100644 --- a/src/pyutils/eventcounter.py +++ b/src/pyutils/eventcounter.py @@ -1,200 +1,192 @@ ## ----------------------------------------------------------- #### Class EventCounter() -# Class to log/count events, pass them to parent function +# Class to log/count events, pass them to parent function # and merge the results ## ----------------------------------------------------------- -from collections import defaultdict -from typing import Callable, Optional, Union -from asyncio import gather, Task -import time -import logging +from collections import defaultdict +from typing import Callable, Optional +from asyncio import gather, Task +import logging logger = logging.getLogger(__name__) -debug = logger.debug +debug = logger.debug message = logger.warning verbose = logger.info -error = logger.error +error = logger.error -FuncTypeFormatter = Callable[[str], str] +FuncTypeFormatter = Callable[[str], str] FuncTypeFormatterParam = Optional[FuncTypeFormatter] -class EventCounter(): - """Count events for categories""" - def __init__(self, name: str = '', - totals: Optional[str] = None, - categories: list[str] = list(), - errors: list[str] = list(), - int_format: FuncTypeFormatterParam = None, - float_format: FuncTypeFormatterParam = None): - assert name is not None, "param 'name' cannot be None" - assert categories is not None, "param 'categories' cannot be None" - assert errors is not None, "param 'errors' cannot be None" - - self.name : str = name - self._log : defaultdict[str, int] = defaultdict(self._def_value_zero) - self._error_cats: list[str] = errors - self._error_status: bool = False - self._totals = totals - - # formatters - self._format_int : FuncTypeFormatter = self._default_int_formatter - self._format_float : FuncTypeFormatter = self._default_float_formatter - if int_format is not None: - self._format_int = int_format - if float_format is not None: - self._format_float = float_format - - # init categories - for cat in categories: - self.log(cat, 0) - - - @classmethod - def _def_value_zero(cls) -> int: - return 0 - - - def add_error_categories(self, errors: list[str] = list()) -> bool: - """Add error categories and return if an error has been logged""" - self._error_cats += errors - if self.sum(self._error_cats) > 0: - self._error_status = True - return self._error_status - - - def _default_int_formatter(self, category: str) -> str: - assert category is not None, "param 'category' cannot be None" - return f"{category:40}: {self.get_value(category)}" - - - def _default_float_formatter(self, category: str) -> str: - assert category is not None, "param 'category' cannot be None" - return f"{category:40}: {self.get_value(category):.2f}" - - - def log(self, category: str, count: int = 1) -> None: - assert category is not None, 'category cannot be None' - assert count is not None, 'count cannot be None' - self._log[category] += count - if category in self._error_cats: - self._error_status = True - return None - - - def get_long_cat(self, category: str) -> str: - assert category is not None, "param 'category' cannot be None" - return f"{self.name}: {category}" - - - def _get_str(self, category: str) -> str: - assert category is not None, 'category cannot be None' - return self._format_int(category) - - - def get_value(self, category: str) -> int: - assert category is not None, "param 'category' cannot be None" - try: - return self._log[category] - except: - logger.error(f"invalid categgory: {category}") - return 0 - - - def get_values(self) -> dict[str, int]: - return self._log - - - def sum(self, categories: list[str]) -> int: - ret = 0 - for cat in categories: - ret += self.get_value(cat) - return ret - - - def get_categories(self) -> list[str]: - return list(self._log.keys()) - - - def get_error_status(self) -> bool: - return self._error_status - - - def merge(self, B: 'EventCounter') -> bool: - """Merge two EventCounter instances together""" - assert isinstance(B, EventCounter), f"input is not type of 'EventCounter' but: {type(B)}" - - try: - if not isinstance(B, EventCounter): - logger.error(f"input is not type of 'EventCounter' but: {type(B)}") - return False - for cat in B.get_categories(): - value: int = B.get_value(cat) - self.log(cat, value) - if self._totals is not None: - self.log(f"{self._totals}: {cat}", value) - self._error_status = self._error_status or B.get_error_status() - return True - except Exception as err: - logger.error(f'{err}') - return False - - - def merge_child(self, B: 'EventCounter') -> bool: - """Merge two EventCounter instances together""" - assert isinstance(B, EventCounter), f"input is not type of 'EventCounter' but: {type(B)}" - - try: - for cat in B.get_categories(): - value: int = B.get_value(cat) - self.log(B.get_long_cat(cat), value) - if self._totals is not None: - self.log(f"{self._totals}: {cat}", value) - self._error_status = self._error_status or B.get_error_status() - return True - except Exception as err: - logger.error(f'{err}') - return False - - - def get_header(self) -> str: - return f"{self.name}" + (': ERROR occured' if self.get_error_status() else '') - - - def print(self, do_print : bool = True, clean: bool = False) -> Optional[str]: - try: - if do_print: - message(self.get_header()) - for cat in sorted(self._log): - if clean and self.get_value(cat) == 0: - continue - message(self._get_str(cat)) - return None - else: - ret = self.get_header() - for cat in sorted(self._log): - if clean and self.get_value(cat) == 0: - continue - ret += f"\n{self._get_str(cat)}" - return ret - except Exception as err: - logger.error(f'{err}') - return None - - - async def gather_stats(self, tasks: list[Task], - merge_child: bool = True, - cancel : bool = True) -> None: - """Wrapper to gather results from tasks and return the stats and the LAST exception """ - if cancel: - for task in tasks: - task.cancel() - for res in await gather(*tasks, return_exceptions=True): - if isinstance(res, EventCounter): - if merge_child: - self.merge_child(res) - else: - self.merge(res) - elif type(res) is BaseException: - error(f'Task raised an exception: {res}') - return None \ No newline at end of file + + +class EventCounter: + """Count events for categories""" + + def __init__( + self, + name: str = "", + totals: Optional[str] = None, + categories: list[str] = list(), + errors: list[str] = list(), + int_format: FuncTypeFormatterParam = None, + float_format: FuncTypeFormatterParam = None, + ): + assert name is not None, "param 'name' cannot be None" + assert categories is not None, "param 'categories' cannot be None" + assert errors is not None, "param 'errors' cannot be None" + + self.name: str = name + self._log: defaultdict[str, int] = defaultdict(self._def_value_zero) + self._error_cats: list[str] = errors + self._error_status: bool = False + self._totals = totals + + # formatters + self._format_int: FuncTypeFormatter = self._default_int_formatter + self._format_float: FuncTypeFormatter = self._default_float_formatter + if int_format is not None: + self._format_int = int_format + if float_format is not None: + self._format_float = float_format + + # init categories + for cat in categories: + self.log(cat, 0) + + @classmethod + def _def_value_zero(cls) -> int: + return 0 + + def add_error_categories(self, errors: list[str] = list()) -> bool: + """Add error categories and return if an error has been logged""" + self._error_cats += errors + if self.sum(self._error_cats) > 0: + self._error_status = True + return self._error_status + + def _default_int_formatter(self, category: str) -> str: + assert category is not None, "param 'category' cannot be None" + return f"{category:40}: {self.get_value(category)}" + + def _default_float_formatter(self, category: str) -> str: + assert category is not None, "param 'category' cannot be None" + return f"{category:40}: {self.get_value(category):.2f}" + + def log(self, category: str, count: int = 1) -> None: + assert category is not None, "category cannot be None" + assert count is not None, "count cannot be None" + self._log[category] += count + if category in self._error_cats: + self._error_status = True + return None + + def get_long_cat(self, category: str) -> str: + assert category is not None, "param 'category' cannot be None" + return f"{self.name}: {category}" + + def _get_str(self, category: str) -> str: + assert category is not None, "category cannot be None" + return self._format_int(category) + + def get_value(self, category: str) -> int: + assert category is not None, "param 'category' cannot be None" + try: + return self._log[category] + except: + logger.error(f"invalid categgory: {category}") + return 0 + + def get_values(self) -> dict[str, int]: + return self._log + + def sum(self, categories: list[str]) -> int: + ret = 0 + for cat in categories: + ret += self.get_value(cat) + return ret + + def get_categories(self) -> list[str]: + return list(self._log.keys()) + + def get_error_status(self) -> bool: + return self._error_status + + def merge(self, B: "EventCounter") -> bool: + """Merge two EventCounter instances together""" + assert isinstance( + B, EventCounter + ), f"input is not type of 'EventCounter' but: {type(B)}" + + try: + if not isinstance(B, EventCounter): + logger.error(f"input is not type of 'EventCounter' but: {type(B)}") + return False + for cat in B.get_categories(): + value: int = B.get_value(cat) + self.log(cat, value) + if self._totals is not None: + self.log(f"{self._totals}: {cat}", value) + self._error_status = self._error_status or B.get_error_status() + return True + except Exception as err: + logger.error(f"{err}") + return False + + def merge_child(self, B: "EventCounter") -> bool: + """Merge two EventCounter instances together""" + assert isinstance( + B, EventCounter + ), f"input is not type of 'EventCounter' but: {type(B)}" + + try: + for cat in B.get_categories(): + value: int = B.get_value(cat) + self.log(B.get_long_cat(cat), value) + if self._totals is not None: + self.log(f"{self._totals}: {cat}", value) + self._error_status = self._error_status or B.get_error_status() + return True + except Exception as err: + logger.error(f"{err}") + return False + + def get_header(self) -> str: + return f"{self.name}" + (": ERROR occured" if self.get_error_status() else "") + + def print(self, do_print: bool = True, clean: bool = False) -> Optional[str]: + try: + if do_print: + message(self.get_header()) + for cat in sorted(self._log): + if clean and self.get_value(cat) == 0: + continue + message(self._get_str(cat)) + return None + else: + ret = self.get_header() + for cat in sorted(self._log): + if clean and self.get_value(cat) == 0: + continue + ret += f"\n{self._get_str(cat)}" + return ret + except Exception as err: + logger.error(f"{err}") + return None + + async def gather_stats( + self, tasks: list[Task], merge_child: bool = True, cancel: bool = True + ) -> None: + """Wrapper to gather results from tasks and return the stats and the LAST exception""" + if cancel: + for task in tasks: + task.cancel() + for res in await gather(*tasks, return_exceptions=True): + if isinstance(res, EventCounter): + if merge_child: + self.merge_child(res) + else: + self.merge(res) + elif type(res) is BaseException: + error(f"Task raised an exception: {res}") + return None diff --git a/src/pyutils/filequeue.py b/src/pyutils/filequeue.py index 50e378f..109d9d8 100644 --- a/src/pyutils/filequeue.py +++ b/src/pyutils/filequeue.py @@ -7,11 +7,9 @@ ## ----------------------------------------------------------- import logging -import asyncio # from asyncio import Queue import aioconsole # type: ignore -from os import scandir, path from fnmatch import fnmatch, fnmatchcase from pathlib import Path from typing import Optional, Sequence diff --git a/src/pyutils/multilevelformatter.py b/src/pyutils/multilevelformatter.py index 67f1b42..34f7ef8 100644 --- a/src/pyutils/multilevelformatter.py +++ b/src/pyutils/multilevelformatter.py @@ -29,6 +29,10 @@ def set_mlevel_logging( class MultilevelFormatter(logging.Formatter): + """ + logging.Formatter that simplifies setting different log formats for different log levels + """ + _levels: list[int] = [ logging.NOTSET, logging.DEBUG, diff --git a/src/pyutils/throttledclientsession.py b/src/pyutils/throttledclientsession.py index 074ea42..2e2d122 100644 --- a/src/pyutils/throttledclientsession.py +++ b/src/pyutils/throttledclientsession.py @@ -3,254 +3,258 @@ # # Rate-limited async http client session # -# Inherits aiohttp.ClientSession +# Inherits aiohttp.ClientSession ## ----------------------------------------------------------- from typing import Optional, Union from aiohttp import ClientSession, ClientResponse -from asyncio import Queue, Task, CancelledError, TimeoutError, sleep, create_task, wait_for +from asyncio import ( + Queue, + Task, + CancelledError, + TimeoutError, + sleep, + create_task, + wait_for, +) import time import logging from warnings import warn import re from math import ceil, log +from deprecated import deprecated -logger = logging.getLogger() -error = logger.error -message = logger.warning -verbose = logger.info -debug = logger.debug +logger = logging.getLogger() +error = logger.error +message = logger.warning +verbose = logger.info +debug = logger.debug -class ThrottledClientSession(ClientSession): - """Rate-throttled client session class inherited from aiohttp.ClientSession)""" - - _LOG_FILLER : int = 20 - def __init__(self, rate_limit: float = 0, filters: list[str] = list() , - limit_filtered: bool = False, re_filter: bool = False, *args,**kwargs) -> None: - assert isinstance(rate_limit, (int, float)), "rate_limit has to be float" - assert isinstance(filters, list), "filters has to be list" - assert isinstance(limit_filtered, bool),"limit_filtered has to be bool" - assert isinstance(re_filter, bool), "re_filter has to be bool" - - super().__init__(*args,**kwargs) - - self._rate_limit : float = rate_limit - self._fillerTask : Optional[Task] = None - self._qlen : int = 1 - if rate_limit > self._LOG_FILLER: - self._qlen = ceil(log(rate_limit)) - self._queue : Queue = Queue(maxsize=self._qlen) - self._start_time : float = time.time() - self._count : int = 0 - self._errors : int = 0 - self._limit_filtered: bool = limit_filtered - self._re_filter : bool = re_filter - self._filters : list[Union[str, re.Pattern]] = list() - - if re_filter: - for filter in filters: - self._filters.append(re.compile(filter)) - else: - for filter in filters: - self._filters.append(filter) - self._set_limit() - - - def get_rate(self) -> float: - """Return rate of requests""" - warn("Depreaciated: Use 'rate' property") - return self._count / (time.time() - self._start_time) - - @classmethod - def _rate_str(cls, rate: float) -> str: - """Get rate as a formatted string""" - if rate >= 1: - return f'{rate:.1f} requests/sec' - elif rate > 0: - return f'{1/rate:.1f} secs/request' - else: - return "-" - - @property - def rate_limit(self) -> float: - return self._rate_limit - - - @property - def rate_limit_str(self) -> str: - """Give rate-limit as formatted string""" - return self._rate_str(self._rate_limit) - - - @property - def rate(self) -> float: - return self._count / (time.time() - self._start_time) - - - @property - def rate_str(self) -> str: - return self._rate_str(self.rate) - - - @property - def count(self) -> int: - return self._count - - - @property - def errors(self) -> int: - return self._errors - - - # def get_stats(self) -> dict[str, float]: - # """Get session statistics""" - # res = {'rate' : self.rate, 'rate_limit': self.rate_limit, 'count' : self.count, 'errors': self.errors } - # return res - - - @property - def stats(self) -> str: - """Get session statistics as string""" - return f"rate limit: {self.rate_limit_str}, rate: {self.rate_str}, requests: {self.count}, errors: {self.errors}" - - - @property - def stats_dict(self) -> dict[str, float | int ]: - """Get session statistics as dict""" - res = { - 'rate' : self.rate, - 'rate_limit': self.rate_limit, - 'count' : self.count, - 'errors' : self.errors - } - return res - - - def get_stats_str(self) -> str: - """Print session statistics""" - error('DEPRECIATED: use self.stats') - return self.print_stats(self.stats_dict) - - - @classmethod - def print_stats(cls, stats: dict[str, float | int]) -> str: - try: - rate_limit : float = stats['rate_limit'] - rate : float = stats['rate'] - count : float = stats['count'] - errors : float = stats['errors'] - - rate_limit_str : str - if rate_limit >= 1 or rate_limit == 0: - rate_limit_str = f'{rate_limit:.1f} requests/sec' - else: - rate_limit_str = f'{1/rate_limit:.1f} secs/request' - - return f"rate limit: {rate_limit_str}, rate: {rate:.1f} request/sec, requests: {count:.0f}, errors: {errors:.0f}" - except KeyError as err: - return f'Incorrect stats format: {err}' - except Exception as err: - return f'Unexpected error: {err}' - - - def reset_counters(self) -> dict[str, float | int]: - """Reset rate counters and return current results""" - res = self.stats_dict - self._start_time = time.time() - self._count = 0 - return res - - - def _set_limit(self) -> float: - if self._fillerTask is not None: - self._fillerTask.cancel() - self._fillerTask = None - - if self._rate_limit > self._LOG_FILLER: - self._fillerTask = create_task(self._filler()) - elif self._rate_limit > 0: - self._fillerTask = create_task(self._filler_simple()) - - return self._rate_limit - - - async def close(self) -> None: - """Close rate-limiter's "bucket filler" task""" - debug(self.stats) - try: - if self._fillerTask is not None: - self._fillerTask.cancel() - await wait_for(self._fillerTask, timeout=0.5) - except TimeoutError as err: - debug(f'Timeout while cancelling bucket filler: {err}') - except CancelledError as err: - debug('Cancelled') - await super().close() - - - async def _filler_simple(self) -> None: - """Filler task to fill the leaky bucket algo""" - assert self.rate_limit > 0, "_filler cannot be started without rate limit" - try: - wait : float = self._qlen / self.rate_limit - # debug(f'SLEEP: {1/self.rate_limit}') - while True: - await self._queue.put(None) - await sleep(wait) - except CancelledError: - debug('Cancelled') - except Exception as err: - error(f'{err}') - # finally: - # self._queue = None - return None - - - async def _filler(self) -> None: - """Filler task to fill the leaky bucket algo. - Uses longer queue for performance (maybe) :-)""" - assert self.rate_limit > 0, "_filler cannot be started without rate limit" - try: - wait : float = self._qlen / self.rate_limit - # debug(f'SLEEP: {wait}') - while True: - for _ in range(self._qlen): - await self._queue.put(None) - await sleep(wait) - except CancelledError: - debug('Cancelled') - except Exception as err: - error(f'{err}') - return None - - - async def _request(self, *args,**kwargs) -> ClientResponse: - """Throttled _request()""" - if self.is_limited(*args): - await self._queue.get() - self._queue.task_done() - resp : ClientResponse = await super()._request(*args,**kwargs) - self._count += 1 - if not resp.ok: - self._errors += 1 - return resp - - - def is_limited(self, *args: str) -> bool: - """Check wether the rate limit should be applied""" - try: - if self._rate_limit == 0: - return False - url: str = args[1] - for filter in self._filters: - if isinstance(filter, re.Pattern) and filter.match(url) is not None: - return self._limit_filtered - elif isinstance(filter, str) and url.startswith(filter): - return self._limit_filtered - - return not self._limit_filtered - except Exception as err: - error(f'{err}') - return True +class ThrottledClientSession(ClientSession): + """ + Rate-throttled client session class inherited from aiohttp.ClientSession) + + Inherits from aiohttp.ClientSession that may cause a warning. + """ + + _LOG_FILLER: int = 20 + + def __init__( + self, + rate_limit: float = 0, + filters: list[str] = list(), + limit_filtered: bool = False, + re_filter: bool = False, + *args, + **kwargs, + ) -> None: + assert isinstance(rate_limit, (int, float)), "rate_limit has to be float" + assert isinstance(filters, list), "filters has to be list" + assert isinstance(limit_filtered, bool), "limit_filtered has to be bool" + assert isinstance(re_filter, bool), "re_filter has to be bool" + + super().__init__(*args, **kwargs) + + self._rate_limit: float = rate_limit + self._fillerTask: Optional[Task] = None + self._qlen: int = 1 + if rate_limit > self._LOG_FILLER: + self._qlen = ceil(log(rate_limit)) + self._queue: Queue = Queue(maxsize=self._qlen) + self._start_time: float = time.time() + self._count: int = 0 + self._errors: int = 0 + self._limit_filtered: bool = limit_filtered + self._re_filter: bool = re_filter + self._filters: list[Union[str, re.Pattern]] = list() + + if re_filter: + for filter in filters: + self._filters.append(re.compile(filter)) + else: + for filter in filters: + self._filters.append(filter) + self._set_limit() + + @deprecated(version="1.1.0", reason="Use 'rate' property instead") + def get_rate(self) -> float: + """Return rate of requests""" + warn("Depreaciated: Use 'rate' property") + return self._count / (time.time() - self._start_time) + + @classmethod + def _rate_str(cls, rate: float) -> str: + """Get rate as a formatted string""" + if rate >= 1: + return f"{rate:.1f} requests/sec" + elif rate > 0: + return f"{1/rate:.1f} secs/request" + else: + return "-" + + @property + def rate_limit(self) -> float: + return self._rate_limit + + @property + def rate_limit_str(self) -> str: + """Give rate-limit as formatted string""" + return self._rate_str(self._rate_limit) + + @property + def rate(self) -> float: + return self._count / (time.time() - self._start_time) + + @property + def rate_str(self) -> str: + return self._rate_str(self.rate) + + @property + def count(self) -> int: + return self._count + + @property + def errors(self) -> int: + return self._errors + + # def get_stats(self) -> dict[str, float]: + # """Get session statistics""" + # res = {'rate' : self.rate, 'rate_limit': self.rate_limit, 'count' : self.count, 'errors': self.errors } + # return res + + @property + def stats(self) -> str: + """Get session statistics as string""" + return f"rate limit: {self.rate_limit_str}, rate: {self.rate_str}, requests: {self.count}, errors: {self.errors}" + + @property + def stats_dict(self) -> dict[str, float | int]: + """Get session statistics as dict""" + res = { + "rate": self.rate, + "rate_limit": self.rate_limit, + "count": self.count, + "errors": self.errors, + } + return res + + # def get_stats_str(self) -> str: + # """Print session statistics""" + # error("DEPRECIATED: use self.stats") + # return self.print_stats(self.stats_dict) + + @classmethod + def print_stats(cls, stats: dict[str, float | int]) -> str: + try: + rate_limit: float = stats["rate_limit"] + rate: float = stats["rate"] + count: float = stats["count"] + errors: float = stats["errors"] + + rate_limit_str: str + if rate_limit >= 1 or rate_limit == 0: + rate_limit_str = f"{rate_limit:.1f} requests/sec" + else: + rate_limit_str = f"{1/rate_limit:.1f} secs/request" + + return f"rate limit: {rate_limit_str}, rate: {rate:.1f} request/sec, requests: {count:.0f}, errors: {errors:.0f}" + except KeyError as err: + return f"Incorrect stats format: {err}" + except Exception as err: + return f"Unexpected error: {err}" + + def reset_counters(self) -> dict[str, float | int]: + """Reset rate counters and return current results""" + res = self.stats_dict + self._start_time = time.time() + self._count = 0 + return res + + def _set_limit(self) -> float: + if self._fillerTask is not None: + self._fillerTask.cancel() + self._fillerTask = None + + if self._rate_limit > self._LOG_FILLER: + self._fillerTask = create_task(self._filler()) + elif self._rate_limit > 0: + self._fillerTask = create_task(self._filler_simple()) + + return self._rate_limit + + async def close(self) -> None: + """Close rate-limiter's "bucket filler" task""" + debug(self.stats) + try: + if self._fillerTask is not None: + self._fillerTask.cancel() + await wait_for(self._fillerTask, timeout=0.5) + except TimeoutError as err: + debug(f"Timeout while cancelling bucket filler: {err}") + except CancelledError as err: + debug("Cancelled") + await super().close() + + async def _filler_simple(self) -> None: + """Filler task to fill the leaky bucket algo""" + assert self.rate_limit > 0, "_filler cannot be started without rate limit" + try: + wait: float = self._qlen / self.rate_limit + # debug(f'SLEEP: {1/self.rate_limit}') + while True: + await self._queue.put(None) + await sleep(wait) + except CancelledError: + debug("Cancelled") + except Exception as err: + error(f"{err}") + # finally: + # self._queue = None + return None + + async def _filler(self) -> None: + """Filler task to fill the leaky bucket algo. + Uses longer queue for performance (maybe) :-)""" + assert self.rate_limit > 0, "_filler cannot be started without rate limit" + try: + wait: float = self._qlen / self.rate_limit + # debug(f'SLEEP: {wait}') + while True: + for _ in range(self._qlen): + await self._queue.put(None) + await sleep(wait) + except CancelledError: + debug("Cancelled") + except Exception as err: + error(f"{err}") + return None + + async def _request(self, *args, **kwargs) -> ClientResponse: + """Throttled _request()""" + if self.is_limited(*args): + await self._queue.get() + self._queue.task_done() + resp: ClientResponse = await super()._request(*args, **kwargs) + self._count += 1 + if not resp.ok: + self._errors += 1 + return resp + + def is_limited(self, *args: str) -> bool: + """Check wether the rate limit should be applied""" + try: + if self._rate_limit == 0: + return False + url: str = args[1] + for filter in self._filters: + if isinstance(filter, re.Pattern) and filter.match(url) is not None: + return self._limit_filtered + elif isinstance(filter, str) and url.startswith(filter): + return self._limit_filtered + + return not self._limit_filtered + except Exception as err: + error(f"{err}") + return True diff --git a/src/pyutils/utils.py b/src/pyutils/utils.py index d8d3459..2d8f0dc 100644 --- a/src/pyutils/utils.py +++ b/src/pyutils/utils.py @@ -17,7 +17,7 @@ from random import choices from configparser import ConfigParser from functools import wraps -from deprecated import deprecated # type: ignore +from deprecated import deprecated from typer import Typer from typer.testing import CliRunner as TyperRunner diff --git a/tests/test_throttledclientsession.py b/tests/test_throttledclientsession.py index 090ed5f..deb9c74 100644 --- a/tests/test_throttledclientsession.py +++ b/tests/test_throttledclientsession.py @@ -5,16 +5,12 @@ from itertools import pairwise, accumulate from functools import cached_property from math import ceil -from typing import Generator, Any, List, Dict +from typing import Generator, List, Dict from multiprocessing import Process from http.server import HTTPServer, BaseHTTPRequestHandler from urllib.parse import urlparse from socketserver import ThreadingMixIn - -# from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer -from asyncio.queues import QueueEmpty, QueueFull -from asyncio import Task, create_task, sleep, gather, timeout, TimeoutError -from random import random +from asyncio import sleep import logging import json From 8e041441bb610487d367a895d986f5faa3f69e95 Mon Sep 17 00:00:00 2001 From: jylpah Date: Thu, 21 Dec 2023 17:57:09 +0200 Subject: [PATCH 12/13] add dep: types-Deprecated --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index dbe4039..15dfb08 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,6 +38,7 @@ dev = [ "pytest-cov~=4.1", "pytest-timeout", "types-aiofiles>=23.1.0.1", + "types-Deprecated>=1.2.9.3", ] From 74acf8c7add2e3a94be3a6182c288493354aa449 Mon Sep 17 00:00:00 2001 From: jylpah Date: Thu, 21 Dec 2023 17:57:33 +0200 Subject: [PATCH 13/13] increase sleep to 2s to allow test to pass on slow instance --- tests/test_throttledclientsession.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_throttledclientsession.py b/tests/test_throttledclientsession.py index deb9c74..9cdbd35 100644 --- a/tests/test_throttledclientsession.py +++ b/tests/test_throttledclientsession.py @@ -353,7 +353,7 @@ async def test_1_fast_get(server_url: str) -> None: """Test timings of N/sec get""" rate_limit: float = RATE_FAST N: int = N_FAST - await sleep(1) # wait the server to start + await sleep(2) # wait the server to start timings: list[float] = await _get(server_url, rate=rate_limit, N=N) rate_max: float = max_rate(timings, rate_limit) rate_avg: float = avg_rate(timings)