diff --git a/src/opower/utilities/base.py b/src/opower/utilities/base.py index 2437d7a..b2d85ee 100644 --- a/src/opower/utilities/base.py +++ b/src/opower/utilities/base.py @@ -38,6 +38,6 @@ async def async_login( ) -> None: """Login to the utility website and authorize opower. - Any failure to login should raise ClientResponseError with status 401 or 403. + :raises InvalidAuth: if login information is incorrect """ raise NotImplementedError diff --git a/src/opower/utilities/evergy.py b/src/opower/utilities/evergy.py index cf4d4fd..f36cf26 100644 --- a/src/opower/utilities/evergy.py +++ b/src/opower/utilities/evergy.py @@ -3,17 +3,17 @@ from html.parser import HTMLParser import aiohttp -from ..exceptions import CannotConnect, InvalidAuth +from ..exceptions import InvalidAuth from .base import UtilityBase class EvergyLoginParser(HTMLParser): """HTML parser to extract login verification token from Evergy Login page.""" - def __init__(self, *args, **kwargs) -> None: + def __init__(self) -> None: """Initialize.""" - super().__init__(*args, **kwargs) + super().__init__() self.verification_token = None def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None: @@ -56,12 +56,9 @@ async def async_login( async with session.get("https://www.evergy.com/log-in") as resp: login_parser.feed(await resp.text()) - if login_parser.verification_token is None: - raise CannotConnect( - resp.request_info, - resp.history, - message="Failed to parse login verification token", - ) + assert ( + login_parser.verification_token + ), "Failed to parse login verification token" login_payload = { "username": username, @@ -76,11 +73,7 @@ async def async_login( ) as resp: # The response status will be 302 regardless of success, the redirect will tell us if we're logged in if resp.headers["location"] != "/ma/my-account/account-summary": - raise InvalidAuth( - resp.request_info, - resp.history, - message="Login failed", - ) + raise InvalidAuth("Login failed") opower_access_token = None @@ -89,11 +82,6 @@ async def async_login( ) as resp: opower_access_token = resp.headers["jwt"] - if opower_access_token is None: - raise InvalidAuth( - resp.request_info, - resp.history, - message="Failed to parse OPower bearer token", - ) + assert opower_access_token, "Failed to parse OPower bearer token" session.headers.add("authorization", f"{opower_access_token}") diff --git a/src/opower/utilities/exelon.py b/src/opower/utilities/exelon.py index 7e9a03c..edcf35d 100644 --- a/src/opower/utilities/exelon.py +++ b/src/opower/utilities/exelon.py @@ -4,7 +4,8 @@ import re import aiohttp -from aiohttp.client_exceptions import ClientResponseError + +from ..exceptions import InvalidAuth class Exelon: @@ -61,12 +62,7 @@ async def async_login( result = json.loads(await resp.text()) if result["status"] != "200": - raise ClientResponseError( - resp.request_info, - resp.history, - status=403, - message=result["message"], - ) + raise InvalidAuth(result["message"]) async with session.get( "https://" diff --git a/src/opower/utilities/pge.py b/src/opower/utilities/pge.py index 4b049e7..e8381eb 100644 --- a/src/opower/utilities/pge.py +++ b/src/opower/utilities/pge.py @@ -3,8 +3,8 @@ import re import aiohttp -from aiohttp.client_exceptions import ClientResponseError +from ..exceptions import InvalidAuth from .base import UtilityBase @@ -56,12 +56,7 @@ async def async_login( ) as resp: result = await resp.json() if "errorMsg" in result: - raise ClientResponseError( - resp.request_info, - resp.history, - status=403, - message=result["errorMsg"], - ) + raise InvalidAuth(result["errorMsg"]) # 2nd way of login # async with session.get( diff --git a/src/opower/utilities/pse.py b/src/opower/utilities/pse.py index 77ec279..74fb034 100644 --- a/src/opower/utilities/pse.py +++ b/src/opower/utilities/pse.py @@ -4,17 +4,17 @@ import re import aiohttp -from aiohttp.client_exceptions import ClientResponseError +from ..exceptions import InvalidAuth from .base import UtilityBase class PSELoginParser(HTMLParser): """HTML parser to extract login verification token from PSE Login page.""" - def __init__(self, *, convert_charrefs: bool = True) -> None: + def __init__(self) -> None: """Initialize.""" - super().__init__(convert_charrefs=convert_charrefs) + super().__init__() self.verification_token = None def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None: @@ -29,9 +29,9 @@ class PSEUsageParser(HTMLParser): _regexp = re.compile(r'var accessToken\s+=\s+["\'](?P.+)["\']') - def __init__(self, *, convert_charrefs: bool = True) -> None: + def __init__(self) -> None: """Initialize.""" - super().__init__(convert_charrefs=convert_charrefs) + super().__init__() self.opower_access_token = None self._in_inline_script = False @@ -84,13 +84,9 @@ async def async_login( async with session.get("https://www.pse.com/en/login") as resp: login_parser.feed(await resp.text()) - if login_parser.verification_token is None: - raise ClientResponseError( - resp.request_info, - resp.history, - status=403, - message="Failed to parse __RequestVerificationToken", - ) + assert ( + login_parser.verification_token + ), "Failed to parse __RequestVerificationToken" await session.post( "https://www.pse.com/api/pseauthentication/AsyncSignIn", @@ -110,12 +106,7 @@ async def async_login( "https://www.pse.com/api/AccountSelector/GetContractAccountJson" ) as resp: if len(await resp.text()) == 0: - raise ClientResponseError( - resp.request_info, - resp.history, - status=403, - message="Login failed", - ) + raise InvalidAuth("Login failed") usage_parser = PSEUsageParser() @@ -124,13 +115,9 @@ async def async_login( ) as resp: usage_parser.feed(await resp.text()) - if usage_parser.opower_access_token is None: - raise ClientResponseError( - resp.request_info, - resp.history, - status=403, - message="Failed to parse OPower bearer token", - ) + assert ( + usage_parser.opower_access_token + ), "Failed to parse OPower bearer token" session.headers.add( "authorization", f"Bearer {usage_parser.opower_access_token}"