-
Notifications
You must be signed in to change notification settings - Fork 62
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* Coverage combine * Refactor discovery and stagger HTTP requests. * Add DailyTotal NamedTyple * Add inverter_serial_number to InverterResponse
- Loading branch information
Showing
24 changed files
with
653 additions
and
292 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,103 +1,161 @@ | ||
import asyncio | ||
import logging | ||
import typing | ||
|
||
from solax.inverter import Inverter, InverterError | ||
from solax.inverters import ( | ||
QVOLTHYBG33P, | ||
X1, | ||
X3, | ||
X3V34, | ||
X1Boost, | ||
X1HybridGen4, | ||
X1Mini, | ||
X1MiniV34, | ||
X1Smart, | ||
X3HybridG4, | ||
X3MicProG2, | ||
XHybrid, | ||
) | ||
import sys | ||
from asyncio import Future, Task | ||
from collections import defaultdict | ||
from typing import Dict, Literal, Optional, Sequence, Set, TypedDict, Union, cast | ||
|
||
# registry of inverters | ||
REGISTRY = [ | ||
XHybrid, | ||
X3, | ||
X3V34, | ||
X3HybridG4, | ||
X1, | ||
X1Mini, | ||
X1MiniV34, | ||
X1Smart, | ||
QVOLTHYBG33P, | ||
X1Boost, | ||
X1HybridGen4, | ||
X3MicProG2, | ||
] | ||
from async_timeout import timeout | ||
|
||
from solax.inverter import Inverter | ||
from solax.inverter_http_client import InverterHttpClient | ||
|
||
__all__ = ("discover", "DiscoveryKeywords", "DiscoveryError") | ||
|
||
if sys.version_info >= (3, 10): | ||
from importlib.metadata import entry_points | ||
else: | ||
from importlib_metadata import entry_points | ||
|
||
if sys.version_info >= (3, 11): | ||
from typing import Unpack | ||
else: | ||
from typing_extensions import Unpack | ||
|
||
# registry of inverters | ||
REGISTRY = {ep.load() for ep in entry_points(group="solax.inverter")} | ||
|
||
logging.basicConfig(level=logging.INFO) | ||
|
||
|
||
class DiscoveryState: | ||
_discovered_inverter: typing.Optional[Inverter] | ||
_tasks: typing.Set[asyncio.Task] | ||
_failures: list | ||
|
||
def __init__(self): | ||
self._discovered_inverter = None | ||
self._tasks = set() | ||
self._failures = [] | ||
|
||
def get_discovered_inverter(self): | ||
return self._discovered_inverter | ||
|
||
def _task_handler(self, task): | ||
try: | ||
self._tasks.remove(task) | ||
result = task.result() | ||
self._discovered_inverter = result | ||
for a_task in self._tasks: | ||
a_task.cancel() | ||
except asyncio.CancelledError: | ||
logging.debug("task %s canceled", task.get_name()) | ||
except InverterError as ex: | ||
self._failures.append(ex) | ||
|
||
@classmethod | ||
async def _discovery_task(cls, i) -> Inverter: | ||
logging.info("Trying inverter %s", i) | ||
await i.get_data() | ||
return i | ||
|
||
async def discover(self, host, port, pwd="") -> Inverter: | ||
for inverter in REGISTRY: | ||
for i in inverter.build_all_variants(host, port, pwd): | ||
task = asyncio.create_task(self._discovery_task(i), name=f"{i}") | ||
task.add_done_callback(self._task_handler) | ||
self._tasks.add(task) | ||
|
||
while len(self._tasks) > 0: | ||
logging.debug("%d discovery tasks are still running...", len(self._tasks)) | ||
await asyncio.sleep(0.5) | ||
|
||
if self._discovered_inverter is not None: | ||
logging.info("Discovered inverter: %s", self._discovered_inverter) | ||
return self._discovered_inverter | ||
|
||
msg = ( | ||
class DiscoveryKeywords(TypedDict, total=False): | ||
timeout: Optional[float] | ||
inverters: Sequence[Inverter] | ||
return_when: Union[Literal["ALL_COMPLETED"], Literal["FIRST_COMPLETED"]] | ||
|
||
|
||
if sys.version_info >= (3, 9): | ||
_InverterTask = Task[Inverter] | ||
else: | ||
_InverterTask = Task | ||
|
||
|
||
class _DiscoveryHttpClient: | ||
def __init__( | ||
self, | ||
inverter: Inverter, | ||
http_client: InverterHttpClient, | ||
request: Future, | ||
): | ||
self._inverter = inverter | ||
self._http_client = http_client | ||
self._request: Future = request | ||
|
||
def __str__(self): | ||
return str(self._http_client) | ||
|
||
async def request(self): | ||
request = await self._request | ||
request.add_done_callback(self._restore_http_client) | ||
return await request | ||
|
||
def _restore_http_client(self, _: _InverterTask): | ||
self._inverter.http_client = self._http_client | ||
|
||
|
||
async def _discovery_task(i) -> Inverter: | ||
logging.info("Trying inverter %s", i) | ||
await i.get_data() | ||
return i | ||
|
||
|
||
async def discover( | ||
host, port, pwd="", **kwargs: Unpack[DiscoveryKeywords] | ||
) -> Union[Inverter, Set[Inverter]]: | ||
async with timeout(kwargs.get("timeout", 15)): | ||
done: Set[_InverterTask] = set() | ||
pending: Set[_InverterTask] = set() | ||
failures = set() | ||
requests: Dict[InverterHttpClient, Future] = defaultdict( | ||
asyncio.get_running_loop().create_future | ||
) | ||
|
||
return_when = kwargs.get("return_when", asyncio.FIRST_COMPLETED) | ||
for cls in kwargs.get("inverters", REGISTRY): | ||
for inverter in cls.build_all_variants(host, port, pwd): | ||
inverter.http_client = cast( | ||
InverterHttpClient, | ||
_DiscoveryHttpClient( | ||
inverter, inverter.http_client, requests[inverter.http_client] | ||
), | ||
) | ||
|
||
pending.add( | ||
asyncio.create_task(_discovery_task(inverter), name=f"{inverter}") | ||
) | ||
|
||
if not pending: | ||
raise DiscoveryError("No inverters to try to discover") | ||
|
||
def cancel(pending: Set[_InverterTask]) -> Set[_InverterTask]: | ||
for task in pending: | ||
task.cancel() | ||
return pending | ||
|
||
def remove_failures_from(done: Set[_InverterTask]) -> None: | ||
for task in set(done): | ||
exc = task.exception() | ||
if exc: | ||
failures.add(exc) | ||
done.remove(task) | ||
|
||
# stagger HTTP request to prevent accidental Denial Of Service | ||
async def stagger() -> None: | ||
for http_client, future in requests.items(): | ||
future.set_result(asyncio.create_task(http_client.request())) | ||
await asyncio.sleep(1) | ||
|
||
staggered = asyncio.create_task(stagger()) | ||
|
||
while pending and (not done or return_when != asyncio.FIRST_COMPLETED): | ||
try: | ||
done, pending = await asyncio.wait(pending, return_when=return_when) | ||
except asyncio.CancelledError: | ||
staggered.cancel() | ||
await asyncio.gather( | ||
staggered, *cancel(pending), return_exceptions=True | ||
) | ||
raise | ||
|
||
remove_failures_from(done) | ||
|
||
if done and return_when == asyncio.FIRST_COMPLETED: | ||
break | ||
|
||
logging.debug("%d discovery tasks are still running...", len(pending)) | ||
|
||
if pending and return_when != asyncio.FIRST_COMPLETED: | ||
pending.update(done) | ||
done.clear() | ||
|
||
remove_failures_from(done) | ||
staggered.cancel() | ||
await asyncio.gather(staggered, *cancel(pending), return_exceptions=True) | ||
|
||
if done: | ||
logging.info("Discovered inverters: %s", {task.result() for task in done}) | ||
if return_when == asyncio.FIRST_COMPLETED: | ||
return await next(iter(done)) | ||
|
||
return {task.result() for task in done} | ||
|
||
raise DiscoveryError( | ||
"Unable to connect to the inverter at " | ||
f"host={host} port={port}, or your inverter is not supported yet.\n" | ||
"Please see https://github.com/squishykid/solax/wiki/DiscoveryError\n" | ||
f"Failures={str(self._failures)}" | ||
f"Failures={str(failures)}" | ||
) | ||
raise DiscoveryError(msg) | ||
|
||
|
||
class DiscoveryError(Exception): | ||
"""Raised when unable to discover inverter""" | ||
|
||
|
||
async def discover(host, port, pwd="") -> Inverter: | ||
discover_state = DiscoveryState() | ||
await discover_state.discover(host, port, pwd) | ||
return discover_state.get_discovered_inverter() |
Oops, something went wrong.