Skip to content

Commit

Permalink
Add National Grid Upstate NY Implementation (#91)
Browse files Browse the repository at this point in the history
* break out OIDC login behavior into helper

* Add National Grid implementation

* update debug text

* changes from PR
  • Loading branch information
X-sam authored Aug 18, 2024
1 parent 16ae106 commit 8b2fa45
Show file tree
Hide file tree
Showing 5 changed files with 400 additions and 254 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Supported utilities (in alphabetical order):
- PECO Energy Company (PECO)
- Potomac Electric Power Company (Pepco)
- Mercury NZ Limited
- National Grid NY Upstate
- Pacific Gas & Electric (PG&E)
- Portland General Electric (PGE)
- Puget Sound Energy (PSE)
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "opower"
version = "0.6.0"
version = "0.7.0"
license = {text = "Apache-2.0"}
authors = [
{ name="tronikos", email="[email protected]" },
Expand Down
267 changes: 14 additions & 253 deletions src/opower/utilities/mercury.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,37 +4,17 @@
It uses OAuth 2.0 with PKCE for secure authentication.
"""

import base64
import hashlib
import json
import logging
import secrets
import ssl
from typing import Any, Optional, TypedDict
from urllib.parse import parse_qs, urlparse
from typing import Optional

import aiohttp

from ..exceptions import CannotConnect, InvalidAuth
from .base import UtilityBase
from .oidchelper import async_auth_oidc

_LOGGER = logging.getLogger(__name__)


class ConfigDict(TypedDict):
"""Dictionary to store configuration details for OAuth."""

authorization_endpoint: str
issuer: str
token_endpoint: str


class TokenDict(TypedDict):
"""Dictionary to store OAuth tokens."""

access_token: str


class Mercury(UtilityBase):
"""Mercury NZ Limited utility implementation.
Expand Down Expand Up @@ -78,19 +58,6 @@ def is_dss() -> bool:
"""Check if the utility uses DSS version of the portal."""
return False

@staticmethod
def generate_code_verifier() -> str:
"""Generate a code verifier for PKCE."""
return secrets.token_urlsafe(32)

@staticmethod
def generate_code_challenge(code_verifier: str) -> str:
"""Generate a code challenge for PKCE."""
code_challenge_digest = hashlib.sha256(code_verifier.encode("utf-8")).digest()
return (
base64.urlsafe_b64encode(code_challenge_digest).decode("utf-8").rstrip("=")
)

@staticmethod
async def async_login(
session: aiohttp.ClientSession,
Expand All @@ -100,222 +67,16 @@ async def async_login(
) -> Optional[str]:
"""Perform the login process and return an access token."""
_LOGGER.debug("Starting login process for Mercury NZ Limited")
ssl_context = ssl.create_default_context()
connector = aiohttp.TCPConnector(ssl=ssl_context)
secure_session = aiohttp.ClientSession(connector=connector)

try:
code_verifier = Mercury.generate_code_verifier()
code_challenge = Mercury.generate_code_challenge(code_verifier)
_LOGGER.debug("Generated PKCE code verifier and challenge")

config = await Mercury._get_config(secure_session)
_LOGGER.debug("Retrieved OAuth configuration")

auth_code = await Mercury._get_auth(
secure_session, config, code_challenge, username, password
)
if auth_code is None:
_LOGGER.error("Failed to obtain authorization code")
raise CannotConnect("Failed to obtain authorization code")
_LOGGER.debug("Obtained authorization code")

tokens = await Mercury._get_access(
secure_session, config, auth_code, code_verifier
)

if tokens and "access_token" in tokens:
_LOGGER.debug("Successfully obtained access token")
return tokens["access_token"]
else:
_LOGGER.error("Failed to obtain access token")
raise CannotConnect("Failed to obtain access token")

except aiohttp.ClientError as err:
_LOGGER.error("Connection error during login: %s", str(err))
raise CannotConnect(f"Connection error: {err}")
finally:
await secure_session.close()

@staticmethod
async def _get_config(session: aiohttp.ClientSession) -> ConfigDict:
"""Get the configuration from the server."""
config_url = f"{Mercury.BASE_URL}/{Mercury.TENANT_ID}/{Mercury.POLICY}/v2.0/.well-known/openid-configuration"
_LOGGER.debug("Fetching OAuth configuration from: %s", config_url)
config_text, _, status = await Mercury._fetch(session, config_url)
if status != 200 or not config_text:
_LOGGER.error("Failed to get configuration. Status: %s", status)
raise CannotConnect("Failed to get configuration")
config: ConfigDict = json.loads(config_text)
return config

@staticmethod
async def _get_auth(
session: aiohttp.ClientSession,
config: ConfigDict,
code_challenge: str,
username: str,
password: str,
) -> Optional[str]:
"""Get the authorization code."""
auth_params = {
"client_id": Mercury.CLIENT_ID,
"response_type": "code",
"redirect_uri": Mercury.REDIRECT_URI,
"scope": Mercury.SCOPE_AUTH,
"code_challenge": code_challenge,
"code_challenge_method": "S256",
}
_LOGGER.debug("Requesting authorization code")
auth_content, final_url, status = await Mercury._fetch(
session, config["authorization_endpoint"], params=auth_params
)
if status != 200 or not auth_content:
_LOGGER.error("Failed to get authorization. Status: %s", status)
raise CannotConnect("Failed to get authorization")

settings = Mercury._extract_settings(auth_content)
if not settings:
_LOGGER.debug(
"No settings extracted, checking for direct authorization code"
)
if final_url and final_url.startswith(Mercury.REDIRECT_URI):
query = urlparse(final_url).query
parsed_query = parse_qs(query)
return parsed_query.get("code", [None])[0]
return None

_LOGGER.debug("Posting credentials")
await Mercury._post_credentials(
session, config["issuer"], settings, username, password
)
_LOGGER.debug("Confirming sign-in")
return await Mercury._confirm_signin(session, config["issuer"], settings)

@staticmethod
async def _get_access(
session: aiohttp.ClientSession,
config: ConfigDict,
auth_code: str,
code_verifier: str,
) -> Optional[TokenDict]:
"""Get the access token."""
token_data = {
"client_id": Mercury.CLIENT_ID,
"grant_type": "authorization_code",
"code": auth_code,
"redirect_uri": Mercury.REDIRECT_URI,
"code_verifier": code_verifier,
"scope": Mercury.SCOPE_ACCESS,
}
_LOGGER.debug("Requesting access token")
token_content, _, status = await Mercury._fetch(
session, config["token_endpoint"], method="POST", data=token_data
)
if status != 200 or not token_content:
_LOGGER.error("Failed to get access token. Status: %s", status)
raise CannotConnect("Failed to get access token")
tokens: TokenDict = json.loads(token_content)
return tokens

@staticmethod
async def _fetch(
session: aiohttp.ClientSession, url: str, **kwargs: Any
) -> tuple[Optional[str], Optional[str], int]:
"""Fetch data from a URL."""
method = kwargs.pop("method", "GET")
timeout = aiohttp.ClientTimeout(total=30)
try:
_LOGGER.debug("Fetching URL: %s, Method: %s", url, method)
async with session.request(
method, url, timeout=timeout, **kwargs
) as response:
content = await response.text()
_LOGGER.debug("Fetch completed. Status: %s", response.status)
return content, str(response.url), response.status
except aiohttp.ClientError as e:
_LOGGER.error("Network error occurred: %s", str(e))
return None, None, 0

@staticmethod
def _extract_settings(auth_content: str) -> Optional[dict[str, Any]]:
"""Extract settings from the authorization content."""
_LOGGER.debug("Extracting settings from authorization content")
settings_start = auth_content.find("var SETTINGS = ")
if settings_start == -1:
_LOGGER.debug("Settings not found in authorization content")
return None
settings_end = auth_content.find(";", settings_start)
if settings_end == -1:
_LOGGER.debug("End of settings not found in authorization content")
return None
settings_json = auth_content[settings_start + 15 : settings_end].strip()
try:
settings: dict[str, Any] = json.loads(settings_json)
_LOGGER.debug("Settings successfully extracted")
return settings
except json.JSONDecodeError:
_LOGGER.error("Failed to parse settings JSON")
return None

@staticmethod
async def _post_credentials(
session: aiohttp.ClientSession,
issuer: str,
settings: dict[str, Any],
username: str,
password: str,
) -> None:
"""Post credentials to the server."""
base_url = issuer.rsplit("/", 2)[0]
_LOGGER.debug("Posting credentials to %s", base_url)
_, _, status = await Mercury._fetch(
session,
f"{base_url}/{Mercury.POLICY}/{Mercury.SELF_ASSERTED_ENDPOINT}",
method="POST",
data={
"tx": settings["transId"],
"p": Mercury.POLICY,
"request_type": "RESPONSE",
"signInName": username,
"password": password,
},
headers={"X-CSRF-TOKEN": settings["csrf"]},
)
if status != 200:
_LOGGER.error("Failed to post credentials. Status: %s", status)
raise InvalidAuth("Invalid username or password")
_LOGGER.debug("Credentials posted successfully")

@staticmethod
async def _confirm_signin(
session: aiohttp.ClientSession, issuer: str, settings: dict[str, Any]
) -> Optional[str]:
"""Confirm the sign-in process."""
base_url = issuer.rsplit("/", 2)[0]
_LOGGER.debug("Confirming sign-in at %s", base_url)
_, final_url, status = await Mercury._fetch(
session,
f"{base_url}/{Mercury.POLICY}/{Mercury.POLICY_CONFIRM_ENDPOINT}",
params={
"rememberMe": "false",
"csrf_token": settings["csrf"],
"tx": settings["transId"],
"p": Mercury.POLICY,
},
allow_redirects=True,
return await async_auth_oidc(
username,
password,
Mercury.BASE_URL,
Mercury.TENANT_ID,
Mercury.POLICY,
Mercury.CLIENT_ID,
Mercury.REDIRECT_URI,
Mercury.SCOPE_AUTH,
Mercury.SCOPE_ACCESS,
Mercury.SELF_ASSERTED_ENDPOINT,
Mercury.POLICY_CONFIRM_ENDPOINT,
)
if status != 200:
_LOGGER.error("Failed to confirm signin. Status: %s", status)
raise CannotConnect("Failed to confirm signin")
if final_url:
query = urlparse(final_url).query
parsed_query = parse_qs(query)
auth_code = parsed_query.get("code", [None])[0]
if auth_code:
_LOGGER.debug("Sign-in confirmed, authorization code obtained")
else:
_LOGGER.warning("Sign-in confirmed, but no authorization code found")
return auth_code
_LOGGER.warning("Sign-in confirmation did not result in a final URL")
return None
72 changes: 72 additions & 0 deletions src/opower/utilities/nationalgridnyupstate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
"""National Grid Upstate NY utility implementation.
This module handles the authentication and API interactions for National Grid.
It uses OAuth 2.0 with PKCE for secure authentication.
"""

import logging
from typing import Optional

import aiohttp

from .base import UtilityBase
from .oidchelper import async_auth_oidc

_LOGGER = logging.getLogger(__name__)


class NationalGridNYUpstate(UtilityBase):
"""National Grid Upstate NY utility implementation.
This class handles the authentication and API interactions for National Grid UNY.
It uses OAuth 2.0 with PKCE for secure authentication.
"""

@staticmethod
def name() -> str:
"""Return the name of the utility."""
return "National Grid (NY Upstate)"

@staticmethod
def subdomain() -> str:
"""Return the opower.com subdomain for this utility."""
return "ngny"

@staticmethod
def timezone() -> str:
"""Return the timezone."""
return "America/New_York"

BASE_URL = "https://login.nationalgrid.com"
TENANT_ID = "0e1366c5-731c-42b3-90d3-508039d9e70f"
POLICY = "B2C_1A_UWP_NationalGrid_convert_merge_signin"
CLIENT_ID = "36488660-e86a-4a0d-8316-3df49af8d06d"
REDIRECT_URI = "https://myaccount.nationalgrid.com/auth-landing"
APPLICATION_URI = f"{BASE_URL}/"
SCOPE_AUTH = "openid profile offline_access"
SCOPE_ACCESS = f"{CLIENT_ID} openid profile offline_access"
SELF_ASSERTED_ENDPOINT = "SelfAsserted"
POLICY_CONFIRM_ENDPOINT = "api/CombinedSigninAndSignup/confirmed"

@staticmethod
async def async_login(
session: aiohttp.ClientSession,
username: str,
password: str,
optional_mfa_secret: Optional[str],
) -> Optional[str]:
"""Perform the login process and return an access token."""
_LOGGER.debug("Starting login process for National Grid")
return await async_auth_oidc(
username,
password,
NationalGridNYUpstate.BASE_URL,
NationalGridNYUpstate.TENANT_ID,
NationalGridNYUpstate.POLICY,
NationalGridNYUpstate.CLIENT_ID,
NationalGridNYUpstate.REDIRECT_URI,
NationalGridNYUpstate.SCOPE_AUTH,
NationalGridNYUpstate.SCOPE_ACCESS,
NationalGridNYUpstate.SELF_ASSERTED_ENDPOINT,
NationalGridNYUpstate.POLICY_CONFIRM_ENDPOINT,
)
Loading

0 comments on commit 8b2fa45

Please sign in to comment.