Skip to content

Commit

Permalink
Refactor discovery and stagger HTTP requests.
Browse files Browse the repository at this point in the history
  • Loading branch information
Darsstar committed Mar 29, 2024
1 parent f65b73e commit da3526e
Show file tree
Hide file tree
Showing 12 changed files with 372 additions and 198 deletions.
18 changes: 18 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
"aiohttp>=3.5.4, <4",
"async_timeout>=4.0.2",
"voluptuous>=0.11.5",
"importlib_metadata>=3.6; python_version<'3.10'",
"typing_extensions>=4.1.0; python_version<'3.11'",
],
setup_requires=[
"setuptools_scm",
Expand All @@ -28,4 +30,20 @@
"Operating System :: OS Independent",
],
python_requires=">=3.8",
entry_points={
"solax.inverter": [
"qvolt_hyb_g3_3p = solax.inverters.qvolt_hyb_g3_3p:QVOLTHYBG33P",
"x1 = solax.inverters.x1:X1",
"x1_boost = solax.inverters.x1_boost:X1Boost",
"x1_hybrid_gen4 = solax.inverters.x1_hybrid_gen4:X1HybridGen4",
"x1_mini = solax.inverters.x1_mini:X1Mini",
"x1_mini_v34 = solax.inverters.x1_mini_v34:X1MiniV34",
"x1_smart = solax.inverters.x1_smart:X1Smart",
"x3 = solax.inverters.x3:X3",
"x3_hybrid_g4 = solax.inverters.x3_hybrid_g4:X3HybridG4",
"x3_mic_pro_g2 = solax.inverters.x3_mic_pro_g2:X3MicProG2",
"x3_v34 = solax.inverters.x3_v34:X3V34",
"x_hybrid = solax.inverters.x_hybrid:XHybrid",
],
},
)
2 changes: 1 addition & 1 deletion solax/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ async def rt_request(inv: Inverter, retry, t_wait=0) -> InverterResponse:


async def real_time_api(ip_address, port=80, pwd=""):
i = await discover(ip_address, port, pwd)
i = await discover(ip_address, port, pwd, return_when=asyncio.FIRST_COMPLETED)
return RealTimeAPI(i)


Expand Down
227 changes: 140 additions & 87 deletions solax/discovery.py
Original file line number Diff line number Diff line change
@@ -1,103 +1,156 @@
import asyncio
import logging
import typing

from solax.inverter import Inverter, InverterError
from solax.inverters import (
QVOLTHYBG33P,
X1,
X3,
X3V34,
X1Boost,
X1HybridGen4,
X1Mini,
X1MiniV34,
X1Smart,
X3HybridG4,
X3MicProG2,
XHybrid,
)
import sys
from asyncio import Future, Task
from collections import defaultdict
from typing import Dict, Literal, Optional, Sequence, Set, TypedDict, Union, cast

# registry of inverters
REGISTRY = [
XHybrid,
X3,
X3V34,
X3HybridG4,
X1,
X1Mini,
X1MiniV34,
X1Smart,
QVOLTHYBG33P,
X1Boost,
X1HybridGen4,
X3MicProG2,
]
from async_timeout import timeout

from solax.inverter import Inverter
from solax.inverter_http_client import InverterHttpClient

if sys.version_info >= (3, 10):
from importlib.metadata import entry_points
else:
from importlib_metadata import entry_points

if sys.version_info >= (3, 11):
from typing import Unpack
else:
from typing_extensions import Unpack

# registry of inverters
REGISTRY = {ep.load() for ep in entry_points(group="solax.inverter")}

logging.basicConfig(level=logging.INFO)


class DiscoveryState:
_discovered_inverter: typing.Optional[Inverter]
_tasks: typing.Set[asyncio.Task]
_failures: list

def __init__(self):
self._discovered_inverter = None
self._tasks = set()
self._failures = []

def get_discovered_inverter(self):
return self._discovered_inverter

def _task_handler(self, task):
try:
self._tasks.remove(task)
result = task.result()
self._discovered_inverter = result
for a_task in self._tasks:
a_task.cancel()
except asyncio.CancelledError:
logging.debug("task %s canceled", task.get_name())
except InverterError as ex:
self._failures.append(ex)

@classmethod
async def _discovery_task(cls, i) -> Inverter:
logging.info("Trying inverter %s", i)
await i.get_data()
return i

async def discover(self, host, port, pwd="") -> Inverter:
for inverter in REGISTRY:
for i in inverter.build_all_variants(host, port, pwd):
task = asyncio.create_task(self._discovery_task(i), name=f"{i}")
task.add_done_callback(self._task_handler)
self._tasks.add(task)

while len(self._tasks) > 0:
logging.debug("%d discovery tasks are still running...", len(self._tasks))
await asyncio.sleep(0.5)

if self._discovered_inverter is not None:
logging.info("Discovered inverter: %s", self._discovered_inverter)
return self._discovered_inverter

msg = (
class DiscoveryKeywords(TypedDict, total=False):
timeout: Optional[float]
inverters: Sequence[Inverter]
return_when: Union[Literal["ALL_COMPLETED"], Literal["FIRST_COMPLETED"]]


if sys.version_info >= (3, 9):
_InverterTask = Task[Inverter]
else:
_InverterTask = Task


class _DiscoveryHttpClient:
def __init__(
self,
inverter: Inverter,
http_client: InverterHttpClient,
request: Future,
):
self._inverter = inverter
self._http_client = http_client
self._request: Future = request

def __str__(self):
return str(self._http_client)

async def request(self):
request = await self._request
request.add_done_callback(self._restore_http_client)
return await request

def _restore_http_client(self, _: _InverterTask):
self._inverter.http_client = self._http_client


async def _discovery_task(i) -> Inverter:
logging.info("Trying inverter %s", i)
await i.get_data()
return i


async def discover(
host, port, pwd="", **kwargs: Unpack[DiscoveryKeywords]
) -> Union[Inverter, Set[Inverter]]:
async with timeout(kwargs.get("timeout", 15)):
done: Set[_InverterTask] = set()
pending: Set[_InverterTask] = set()
failures = set()
requests: Dict[InverterHttpClient, Future] = defaultdict(
asyncio.get_running_loop().create_future
)

return_when = kwargs.get("return_when", asyncio.FIRST_COMPLETED)
for cls in kwargs.get("inverters", REGISTRY):
for inverter in cls.build_all_variants(host, port, pwd):
inverter.http_client = cast(
InverterHttpClient,
_DiscoveryHttpClient(
inverter, inverter.http_client, requests[inverter.http_client]
),
)

pending.add(
asyncio.create_task(_discovery_task(inverter), name=f"{inverter}")
)

if not pending:
raise DiscoveryError("No inverters to try to discover")

def cancel(pending: Set[_InverterTask]) -> Set[_InverterTask]:
for task in pending:
task.cancel()
return pending

def remove_failures_from(done: Set[_InverterTask]) -> None:
for task in set(done):
exc = task.exception()
if exc:
failures.add(exc)
done.remove(task)

# stagger HTTP request to prevent accidental Denial Of Service
async def stagger() -> None:
for http_client, future in requests.items():
future.set_result(asyncio.create_task(http_client.request()))
await asyncio.sleep(1)

staggered = asyncio.create_task(stagger())

while pending and (not done or return_when != asyncio.FIRST_COMPLETED):
try:
done, pending = await asyncio.wait(pending, return_when=return_when)
except asyncio.CancelledError:
await asyncio.gather(*cancel(pending), return_exceptions=True)
raise

remove_failures_from(done)

if done and return_when == asyncio.FIRST_COMPLETED:
break

logging.debug("%d discovery tasks are still running...", len(pending))

if pending and return_when != asyncio.FIRST_COMPLETED:
pending.update(done)
done.clear()

remove_failures_from(done)
staggered.cancel()
await asyncio.gather(staggered, *cancel(pending), return_exceptions=True)

if done:
logging.info("Discovered inverters: %s", {task.result() for task in done})
if return_when == asyncio.FIRST_COMPLETED:
return await next(iter(done))

return {task.result() for task in done}

raise DiscoveryError(
"Unable to connect to the inverter at "
f"host={host} port={port}, or your inverter is not supported yet.\n"
"Please see https://github.com/squishykid/solax/wiki/DiscoveryError\n"
f"Failures={str(self._failures)}"
f"Failures={str(failures)}"
)
raise DiscoveryError(msg)


class DiscoveryError(Exception):
"""Raised when unable to discover inverter"""


async def discover(host, port, pwd="") -> Inverter:
discover_state = DiscoveryState()
await discover_state.discover(host, port, pwd)
return discover_state.get_discovered_inverter()
24 changes: 11 additions & 13 deletions solax/inverter.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,33 +27,31 @@ def response_decoder(cls) -> ResponseDecoder:
# pylint: enable=C0301
_schema = vol.Schema({}) # type: vol.Schema

def __init__(
self, http_client: InverterHttpClient, response_parser: ResponseParser
):
def __init__(self, http_client: InverterHttpClient):
self.manufacturer = "Solax"
self.response_parser = response_parser
self.http_client = http_client

schema = type(self).schema()
response_decoder = type(self).response_decoder()
self.response_parser = ResponseParser(schema, response_decoder)

@classmethod
def _build(cls, host, port, pwd="", params_in_query=True):
url = utils.to_url(host, port)
http_client = InverterHttpClient(url, Method.POST, pwd)
http_client = InverterHttpClient(url=url, method=Method.POST, pwd=pwd)
if params_in_query:
http_client.with_default_query()
http_client = http_client.with_default_query()
else:
http_client.with_default_data()
http_client = http_client.with_default_data()

schema = cls.schema()
response_decoder = cls.response_decoder()
response_parser = ResponseParser(schema, response_decoder)
return cls(http_client, response_parser)
return cls(http_client)

@classmethod
def build_all_variants(cls, host, port, pwd=""):
versions = [
versions = {
cls._build(host, port, pwd, True),
cls._build(host, port, pwd, False),
]
}
return versions

async def get_data(self) -> InverterResponse:
Expand Down
Loading

0 comments on commit da3526e

Please sign in to comment.