diff --git a/goodwe/const.py b/goodwe/const.py index 1484049..74ae280 100644 --- a/goodwe/const.py +++ b/goodwe/const.py @@ -1,9 +1,7 @@ -from typing import Dict - GOODWE_TCP_PORT = 502 GOODWE_UDP_PORT = 8899 -BATTERY_MODES: Dict[int, str] = { +BATTERY_MODES: dict[int, str] = { 0: "No battery", 1: "Standby", 2: "Discharge", @@ -12,7 +10,7 @@ 5: "To be discharged", } -ENERGY_MODES: Dict[int, str] = { +ENERGY_MODES: dict[int, str] = { 0: "Check Mode", 1: "Wait Mode", 2: "Normal (On-Grid)", @@ -24,37 +22,37 @@ 128: "Battery Discharging", } -GRID_MODES: Dict[int, str] = { +GRID_MODES: dict[int, str] = { 0: "Not connected to grid", 1: "Connected to grid", 2: "Fault", } -GRID_IN_OUT_MODES: Dict[int, str] = { +GRID_IN_OUT_MODES: dict[int, str] = { 0: "Idle", 1: "Exporting", 2: "Importing", } -LOAD_MODES: Dict[int, str] = { +LOAD_MODES: dict[int, str] = { 0: "Inverter and the load is disconnected", 1: "The inverter is connected to a load", } -PV_MODES: Dict[int, str] = { +PV_MODES: dict[int, str] = { 0: "PV panels not connected", 1: "PV panels connected, no power", 2: "PV panels connected, producing power", } -WORK_MODES: Dict[int, str] = { +WORK_MODES: dict[int, str] = { 0: "Wait Mode", 1: "Normal", 2: "Error", 4: "Check Mode", } -WORK_MODES_ET: Dict[int, str] = { +WORK_MODES_ET: dict[int, str] = { 0: "Wait Mode", 1: "Normal (On-Grid)", 2: "Normal (Off-Grid)", @@ -63,14 +61,14 @@ 5: "Check Mode", } -WORK_MODES_ES: Dict[int, str] = { +WORK_MODES_ES: dict[int, str] = { 0: "Inverter Off - Standby", 1: "Inverter On", 2: "Inverter Abnormal, stopping power", 3: "Inverter Severly Abnormal, 20 seconds to restart", } -SAFETY_COUNTRIES: Dict[int, str] = { +SAFETY_COUNTRIES: dict[int, str] = { 0: "IT CEI 0-21", 1: "CZ-A1", 2: "DE LV with PV", @@ -198,7 +196,7 @@ 149: "Brazil 254Vac", } -ERROR_CODES: Dict[int, str] = { +ERROR_CODES: dict[int, str] = { 31: 'Internal Communication Failure', 30: 'EEPROM R/W Failure', 29: 'Fac Failure', @@ -233,7 +231,7 @@ 0: 'GFCI Device Check Failure', } -DIAG_STATUS_CODES: Dict[int, str] = { +DIAG_STATUS_CODES: dict[int, str] = { 0: "Battery voltage low", 1: "Battery SOC low", 2: "Battery SOC in back", @@ -265,7 +263,7 @@ 28: "SOC protect off", } -BMS_ALARM_CODES: Dict[int, str] = { +BMS_ALARM_CODES: dict[int, str] = { 15: 'Charging over-voltage 3', 14: 'Discharging under-voltage 3', 13: 'Cell temperature high 3', @@ -284,7 +282,7 @@ 0: 'Charging over-voltage 2', } -BMS_WARNING_CODES: Dict[int, str] = { +BMS_WARNING_CODES: dict[int, str] = { 11: 'System temperature high', 10: 'System temperature low 2', 9: 'System temperature low 1', @@ -299,7 +297,7 @@ 0: 'Charging over-voltage 1', } -DERATING_MODE_CODES: Dict[int, str] = { +DERATING_MODE_CODES: dict[int, str] = { 31: '', 30: '', 29: '', diff --git a/goodwe/dt.py b/goodwe/dt.py index 4cd4bfb..b833a38 100644 --- a/goodwe/dt.py +++ b/goodwe/dt.py @@ -1,12 +1,10 @@ from __future__ import annotations import logging -from typing import Tuple +from .const import * from .exceptions import InverterError, RequestFailedException, RequestRejectedException -from .inverter import Inverter -from .inverter import OperationMode -from .inverter import SensorKind as Kind +from .inverter import Inverter, OperationMode, SensorKind as Kind from .modbus import ILLEGAL_DATA_ADDRESS from .model import is_3_mppt, is_single_phase from .protocol import ProtocolCommand @@ -18,7 +16,7 @@ class DT(Inverter): """Class representing inverter of DT/MS/D-NS/XS or GE's GEP(PSB/PSC) families""" - __all_sensors: Tuple[Sensor, ...] = ( + __all_sensors: tuple[Sensor, ...] = ( Timestamp("timestamp", 30100, "Timestamp"), Voltage("vpv1", 30103, "PV1 Voltage", Kind.PV), Current("ipv1", 30104, "PV1 Current", Kind.PV), @@ -37,9 +35,9 @@ class DT(Inverter): "PV3 Power", "W", Kind.PV), # ppv1 + ppv2 + ppv3 Calculated("ppv", - lambda data: (round(read_voltage(data, 30103) * read_current(data, 30104))) + (round( - read_voltage(data, 30105) * read_current(data, 30106))) + (round( - read_voltage(data, 30107) * read_current(data, 30108))), + lambda data: (round(read_voltage(data, 30103) * read_current(data, 30104))) + ( + round(read_voltage(data, 30105) * read_current(data, 30106))) + ( + round(read_voltage(data, 30107) * read_current(data, 30108))), "PV Power", "W", Kind.PV), # Voltage("vpv4", 14, "PV4 Voltage", Kind.PV), # Current("ipv4", 16, "PV4 Current", Kind.PV), @@ -115,12 +113,12 @@ class DT(Inverter): # Inverter's meter data # Modbus registers from offset 0x75f4 (30196) - __all_sensors_meter: Tuple[Sensor, ...] = ( + __all_sensors_meter: tuple[Sensor, ...] = ( PowerS("active_power", 30196, "Active Power", Kind.GRID), ) # Modbus registers of inverter settings, offsets are modbus register addresses - __all_settings: Tuple[Sensor, ...] = ( + __all_settings: tuple[Sensor, ...] = ( Timestamp("time", 40313, "Inverter time"), Integer("shadow_scan", 40326, "Shadow Scan", "", Kind.PV), @@ -133,12 +131,12 @@ class DT(Inverter): ) # Settings for single phase inverters - __settings_single_phase: Tuple[Sensor, ...] = ( + __settings_single_phase: tuple[Sensor, ...] = ( Long("grid_export_limit", 40328, "Grid Export Limit", "W", Kind.GRID), ) # Settings for three phase inverters - __settings_three_phase: Tuple[Sensor, ...] = ( + __settings_three_phase: tuple[Sensor, ...] = ( Integer("grid_export_limit", 40336, "Grid Export Limit", "%", Kind.GRID), ) @@ -193,7 +191,7 @@ async def read_device_info(self): self._sensors = tuple(filter(self._pv1_pv2_only, self._sensors)) pass - async def read_runtime_data(self) -> Dict[str, Any]: + async def read_runtime_data(self) -> dict[str, Any]: response = await self._read_from_socket(self._READ_RUNNING_DATA) data = self._map_response(response, self._sensors) @@ -253,7 +251,7 @@ async def _write_setting(self, setting: Sensor, value: Any): else: await self._read_from_socket(self._write_multi_command(setting.offset, raw_value)) - async def read_settings_data(self) -> Dict[str, Any]: + async def read_settings_data(self) -> dict[str, Any]: data = {} for setting in self.settings(): value = await self.read_setting(setting.id_) @@ -267,7 +265,7 @@ async def set_grid_export_limit(self, export_limit: int) -> None: if export_limit >= 0: return await self.write_setting('grid_export_limit', export_limit) - async def get_operation_modes(self, include_emulated: bool) -> Tuple[OperationMode, ...]: + async def get_operation_modes(self, include_emulated: bool) -> tuple[OperationMode, ...]: return () async def get_operation_mode(self) -> OperationMode: @@ -283,11 +281,11 @@ async def get_ongrid_battery_dod(self) -> int: async def set_ongrid_battery_dod(self, dod: int) -> None: raise InverterError("Operation not supported, inverter has no batteries.") - def sensors(self) -> Tuple[Sensor, ...]: + def sensors(self) -> tuple[Sensor, ...]: result = self._sensors if self._has_meter: result = result + self._sensors_meter return result - def settings(self) -> Tuple[Sensor, ...]: + def settings(self) -> tuple[Sensor, ...]: return tuple(self._settings.values()) diff --git a/goodwe/es.py b/goodwe/es.py index 159c176..6c4b0f8 100644 --- a/goodwe/es.py +++ b/goodwe/es.py @@ -1,12 +1,10 @@ from __future__ import annotations import logging -from typing import Tuple +from .const import * from .exceptions import InverterError -from .inverter import Inverter -from .inverter import OperationMode -from .inverter import SensorKind as Kind +from .inverter import Inverter, OperationMode, SensorKind as Kind from .protocol import ProtocolCommand, Aa55ProtocolCommand, Aa55ReadCommand, Aa55WriteCommand, Aa55WriteMultiCommand from .sensor import * @@ -20,7 +18,7 @@ class ES(Inverter): _READ_DEVICE_RUNNING_DATA: ProtocolCommand = Aa55ProtocolCommand("010600", "0186") _READ_DEVICE_SETTINGS_DATA: ProtocolCommand = Aa55ProtocolCommand("010900", "0189") - __sensors: Tuple[Sensor, ...] = ( + __sensors: tuple[Sensor, ...] = ( Voltage("vpv1", 0, "PV1 Voltage", Kind.PV), # modbus 0x500 Current("ipv1", 2, "PV1 Current", Kind.PV), Calculated("ppv1", @@ -124,7 +122,7 @@ class ES(Inverter): "House Consumption", "W", Kind.AC), ) - __all_settings: Tuple[Sensor, ...] = ( + __all_settings: tuple[Sensor, ...] = ( Integer("backup_supply", 12, "Backup Supply"), Integer("off-grid_charge", 14, "Off-grid Charge"), Integer("shadow_scan", 16, "Shadow Scan", "", Kind.PV), @@ -156,7 +154,7 @@ class ES(Inverter): ) # Settings added in ARM firmware 14 - __settings_arm_fw_14: Tuple[Sensor, ...] = ( + __settings_arm_fw_14: tuple[Sensor, ...] = ( EcoModeV2("eco_mode_1", 47547, "Eco Mode Group 1"), ByteH("eco_mode_1_switch", 47549, "Eco Mode Group 1 Switch"), EcoModeV2("eco_mode_2", 47553, "Eco Mode Group 2"), @@ -202,7 +200,7 @@ async def read_device_info(self): if self._supports_eco_mode_v2(): self._settings.update({s.id_: s for s in self.__settings_arm_fw_14}) - async def read_runtime_data(self) -> Dict[str, Any]: + async def read_runtime_data(self) -> dict[str, Any]: response = await self._read_from_socket(self._READ_DEVICE_RUNNING_DATA) data = self._map_response(response, self.__sensors) return data @@ -271,7 +269,7 @@ async def _write_setting(self, setting: Sensor, value: Any): else: await self._read_from_socket(Aa55WriteMultiCommand(setting.offset, raw_value)) - async def read_settings_data(self) -> Dict[str, Any]: + async def read_settings_data(self) -> dict[str, Any]: response = await self._read_from_socket(self._READ_DEVICE_SETTINGS_DATA) data = self._map_response(response, self.settings()) return data @@ -285,7 +283,7 @@ async def set_grid_export_limit(self, export_limit: int) -> None: Aa55ProtocolCommand("033502" + "{:04x}".format(export_limit), "03b5") ) - async def get_operation_modes(self, include_emulated: bool) -> Tuple[OperationMode, ...]: + async def get_operation_modes(self, include_emulated: bool) -> tuple[OperationMode, ...]: result = [e for e in OperationMode] result.remove(OperationMode.PEAK_SHAVING) result.remove(OperationMode.SELF_USE) @@ -349,10 +347,10 @@ async def set_ongrid_battery_dod(self, dod: int) -> None: async def _reset_inverter(self) -> None: await self._read_from_socket(Aa55ProtocolCommand("031d00", "039d")) - def sensors(self) -> Tuple[Sensor, ...]: + def sensors(self) -> tuple[Sensor, ...]: return self.__sensors - def settings(self) -> Tuple[Sensor, ...]: + def settings(self) -> tuple[Sensor, ...]: return tuple(self._settings.values()) async def _set_general_mode(self) -> None: diff --git a/goodwe/et.py b/goodwe/et.py index 25b66d5..ae359cc 100644 --- a/goodwe/et.py +++ b/goodwe/et.py @@ -1,12 +1,10 @@ from __future__ import annotations import logging -from typing import Tuple +from .const import * from .exceptions import RequestFailedException, RequestRejectedException -from .inverter import Inverter -from .inverter import OperationMode -from .inverter import SensorKind as Kind +from .inverter import Inverter, OperationMode, SensorKind as Kind from .modbus import ILLEGAL_DATA_ADDRESS from .model import is_2_battery, is_4_mppt, is_745_platform, is_single_phase from .protocol import ProtocolCommand @@ -19,7 +17,7 @@ class ET(Inverter): """Class representing inverter of ET/EH/BT/BH or GE's GEH families AKA platform 205 or 745""" # Modbus registers from offset 0x891c (35100), count 0x7d (125) - __all_sensors: Tuple[Sensor, ...] = ( + __all_sensors: tuple[Sensor, ...] = ( Timestamp("timestamp", 35100, "Timestamp"), Voltage("vpv1", 35103, "PV1 Voltage", Kind.PV), Current("ipv1", 35104, "PV1 Current", Kind.PV), @@ -160,7 +158,7 @@ class ET(Inverter): ) # Modbus registers from offset 0x9088 (37000) - __all_sensors_battery: Tuple[Sensor, ...] = ( + __all_sensors_battery: tuple[Sensor, ...] = ( Integer("battery_bms", 37000, "Battery BMS", "", Kind.BAT), Integer("battery_index", 37001, "Battery Index", "", Kind.BAT), Integer("battery_status", 37002, "Battery Status", "", Kind.BAT), @@ -193,7 +191,7 @@ class ET(Inverter): ) # Modbus registers from offset 0x9858 (39000) - __all_sensors_battery2: Tuple[Sensor, ...] = ( + __all_sensors_battery2: tuple[Sensor, ...] = ( Integer("battery2_status", 39000, "Battery 2 Status", "", Kind.BAT), Temp("battery2_temperature", 39001, "Battery 2 Temperature", Kind.BAT), Integer("battery2_charge_limit", 39002, "Battery 2 Charge Limit", "A", Kind.BAT), @@ -225,7 +223,7 @@ class ET(Inverter): # Inverter's meter data # Modbus registers from offset 0x8ca0 (36000) - __all_sensors_meter: Tuple[Sensor, ...] = ( + __all_sensors_meter: tuple[Sensor, ...] = ( Integer("commode", 36000, "Commode"), Integer("rssi", 36001, "RSSI"), Integer("manufacture_code", 36002, "Manufacture Code"), @@ -282,7 +280,7 @@ class ET(Inverter): # Inverter's MPPT data # Modbus registers from offset 0x89e5 (35301) - __all_sensors_mppt: Tuple[Sensor, ...] = ( + __all_sensors_mppt: tuple[Sensor, ...] = ( Power4("ppv_total", 35301, "PV Power Total", Kind.PV), Integer("pv_channel", 35303, "PV Channel", "", Kind.PV), Voltage("vpv5", 35304, "PV5 Voltage", Kind.PV), @@ -340,7 +338,7 @@ class ET(Inverter): ) # Modbus registers of inverter settings, offsets are modbus register addresses - __all_settings: Tuple[Sensor, ...] = ( + __all_settings: tuple[Sensor, ...] = ( Integer("comm_address", 45127, "Communication Address", ""), Long("modbus_baud_rate", 45132, "Modbus Baud rate", ""), Timestamp("time", 45200, "Inverter time"), @@ -434,7 +432,7 @@ class ET(Inverter): ) # Settings added in ARM firmware 19 - __settings_arm_fw_19: Tuple[Sensor, ...] = ( + __settings_arm_fw_19: tuple[Sensor, ...] = ( Integer("fast_charging", 47545, "Fast Charging Enabled", "", Kind.BAT), Integer("fast_charging_soc", 47546, "Fast Charging SoC", "%", Kind.BAT), EcoModeV2("eco_mode_1", 47547, "Eco Mode Group 1"), @@ -455,7 +453,7 @@ class ET(Inverter): ) # Settings added in ARM firmware 22 - __settings_arm_fw_22: Tuple[Sensor, ...] = ( + __settings_arm_fw_22: tuple[Sensor, ...] = ( Long("peak_shaving_power_limit", 47542, "Peak Shaving Power Limit"), Integer("peak_shaving_soc", 47544, "Peak Shaving SoC"), # EcoModeV2("eco_modeV2_5", 47571, "Eco Mode Version 2 Power Group 5"), @@ -570,7 +568,7 @@ async def read_device_info(self): logger.debug("Cannot read _has_peak_shaving settings, disabling it.") self._has_peak_shaving = False - async def read_runtime_data(self) -> Dict[str, Any]: + async def read_runtime_data(self) -> dict[str, Any]: response = await self._read_from_socket(self._READ_RUNNING_DATA) data = self._map_response(response, self._sensors) @@ -688,7 +686,7 @@ async def _write_setting(self, setting: Sensor, value: Any): else: await self._read_from_socket(self._write_multi_command(setting.offset, raw_value)) - async def read_settings_data(self) -> Dict[str, Any]: + async def read_settings_data(self) -> dict[str, Any]: data = {} for setting in self.settings(): try: @@ -706,7 +704,7 @@ async def set_grid_export_limit(self, export_limit: int) -> None: if export_limit >= 0: await self.write_setting('grid_export_limit', export_limit) - async def get_operation_modes(self, include_emulated: bool) -> Tuple[OperationMode, ...]: + async def get_operation_modes(self, include_emulated: bool) -> tuple[OperationMode, ...]: result = [e for e in OperationMode] if not self._has_peak_shaving: result.remove(OperationMode.PEAK_SHAVING) @@ -791,7 +789,7 @@ async def set_ongrid_battery_dod(self, dod: int) -> None: if 0 <= dod <= 100: await self.write_setting('battery_discharge_depth', 100 - dod) - def sensors(self) -> Tuple[Sensor, ...]: + def sensors(self) -> tuple[Sensor, ...]: result = self._sensors + self._sensors_meter if self._has_battery: result = result + self._sensors_battery @@ -801,7 +799,7 @@ def sensors(self) -> Tuple[Sensor, ...]: result = result + self._sensors_mppt return result - def settings(self) -> Tuple[Sensor, ...]: + def settings(self) -> tuple[Sensor, ...]: return tuple(self._settings.values()) async def _clear_battery_mode_param(self) -> None: diff --git a/goodwe/exceptions.py b/goodwe/exceptions.py index 47001c6..1dc1931 100644 --- a/goodwe/exceptions.py +++ b/goodwe/exceptions.py @@ -35,11 +35,11 @@ class PartialResponseException(InverterError): Attributes: length -- received data length - expected -- expected data lenght + expected -- expected data length """ - def __init__(self, lenght: int, expected: int): - self.length: int = lenght + def __init__(self, length: int, expected: int): + self.length: int = length self.expected: int = expected diff --git a/goodwe/inverter.py b/goodwe/inverter.py index d9e2bd4..9415dda 100644 --- a/goodwe/inverter.py +++ b/goodwe/inverter.py @@ -4,7 +4,7 @@ from abc import ABC, abstractmethod from dataclasses import dataclass from enum import Enum, IntEnum -from typing import Any, Callable, Dict, Tuple, Optional +from typing import Any, Callable, Optional from .exceptions import MaxRetriesException, RequestFailedException from .protocol import InverterProtocol, ProtocolCommand, ProtocolResponse, TcpInverterProtocol, UdpInverterProtocol @@ -142,7 +142,7 @@ async def read_device_info(self): raise NotImplementedError() @abstractmethod - async def read_runtime_data(self) -> Dict[str, Any]: + async def read_runtime_data(self) -> dict[str, Any]: """ Request the runtime data from the inverter. Answer dictionary of individual sensors and their values. @@ -171,7 +171,7 @@ async def write_setting(self, setting_id: str, value: Any): raise NotImplementedError() @abstractmethod - async def read_settings_data(self) -> Dict[str, Any]: + async def read_settings_data(self) -> dict[str, Any]: """ Request the settings data from the inverter. Answer dictionary of individual settings and their values. @@ -207,7 +207,7 @@ async def set_grid_export_limit(self, export_limit: int) -> None: raise NotImplementedError() @abstractmethod - async def get_operation_modes(self, include_emulated: bool) -> Tuple[OperationMode, ...]: + async def get_operation_modes(self, include_emulated: bool) -> tuple[OperationMode, ...]: """ Answer list of supported inverter operation modes """ @@ -257,14 +257,14 @@ async def set_ongrid_battery_dod(self, dod: int) -> None: raise NotImplementedError() @abstractmethod - def sensors(self) -> Tuple[Sensor, ...]: + def sensors(self) -> tuple[Sensor, ...]: """ Return tuple of sensor definitions """ raise NotImplementedError() @abstractmethod - def settings(self) -> Tuple[Sensor, ...]: + def settings(self) -> tuple[Sensor, ...]: """ Return tuple of settings definitions """ @@ -278,7 +278,7 @@ def _create_protocol(host: str, port: int, comm_addr: int, timeout: int, retries return UdpInverterProtocol(host, port, comm_addr, timeout, retries) @staticmethod - def _map_response(response: ProtocolResponse, sensors: Tuple[Sensor, ...]) -> Dict[str, Any]: + def _map_response(response: ProtocolResponse, sensors: tuple[Sensor, ...]) -> dict[str, Any]: """Process the response data and return dictionary with runtime values""" result = {} for sensor in sensors: diff --git a/goodwe/protocol.py b/goodwe/protocol.py index d89a1cc..5e3f7f8 100644 --- a/goodwe/protocol.py +++ b/goodwe/protocol.py @@ -6,7 +6,7 @@ import platform import socket from asyncio.futures import Future -from typing import Tuple, Optional, Callable +from typing import Optional, Callable from .exceptions import MaxRetriesException, PartialResponseException, RequestFailedException, RequestRejectedException from .modbus import create_modbus_rtu_request, create_modbus_rtu_multi_request, create_modbus_tcp_request, \ @@ -138,7 +138,7 @@ def connection_lost(self, exc: Optional[Exception]) -> None: logger.debug("Socket closed.") self._close_transport() - def datagram_received(self, data: bytes, addr: Tuple[str, int]) -> None: + def datagram_received(self, data: bytes, addr: tuple[str, int]) -> None: """On datagram received""" if self._timer: self._timer.cancel() diff --git a/goodwe/sensor.py b/goodwe/sensor.py index 3005834..5a8b24a 100644 --- a/goodwe/sensor.py +++ b/goodwe/sensor.py @@ -6,7 +6,6 @@ from struct import unpack from typing import Any, Callable, Optional -from .const import * from .inverter import Sensor, SensorKind from .protocol import ProtocolResponse @@ -405,9 +404,9 @@ def encode_value(self, value: Any, register_value: bytes = None) -> bytes: class Enum(Sensor): """Sensor representing label from enumeration encoded in 1 bytes""" - def __init__(self, id_: str, offset: int, labels: Dict, name: str, kind: Optional[SensorKind] = None): + def __init__(self, id_: str, offset: int, labels: dict[int, str], name: str, kind: Optional[SensorKind] = None): super().__init__(id_, offset, name, 1, "", kind) - self._labels: Dict = labels + self._labels: dict[int, str] = labels def read_value(self, data: ProtocolResponse): return self._labels.get(read_byte(data)) @@ -416,9 +415,9 @@ def read_value(self, data: ProtocolResponse): class EnumH(Sensor): """Sensor representing label from enumeration encoded in 1 (high 8 bits of 16bit register)""" - def __init__(self, id_: str, offset: int, labels: Dict, name: str, kind: Optional[SensorKind] = None): + def __init__(self, id_: str, offset: int, labels: dict[int, str], name: str, kind: Optional[SensorKind] = None): super().__init__(id_, offset, name, 1, "", kind) - self._labels: Dict = labels + self._labels: dict[int, str] = labels def read_value(self, data: ProtocolResponse): return self._labels.get(read_byte(data)) @@ -427,9 +426,9 @@ def read_value(self, data: ProtocolResponse): class EnumL(Sensor): """Sensor representing label from enumeration encoded in 1 byte (low 8 bits of 16bit register)""" - def __init__(self, id_: str, offset: int, labels: Dict, name: str, kind: Optional[SensorKind] = None): + def __init__(self, id_: str, offset: int, labels: dict[int, str], name: str, kind: Optional[SensorKind] = None): super().__init__(id_, offset, name, 1, "", kind) - self._labels: Dict = labels + self._labels: dict[int, str] = labels def read_value(self, data: ProtocolResponse): read_byte(data) @@ -439,9 +438,9 @@ def read_value(self, data: ProtocolResponse): class Enum2(Sensor): """Sensor representing label from enumeration encoded in 2 bytes""" - def __init__(self, id_: str, offset: int, labels: Dict, name: str, kind: Optional[SensorKind] = None): + def __init__(self, id_: str, offset: int, labels: dict[int, str], name: str, kind: Optional[SensorKind] = None): super().__init__(id_, offset, name, 2, "", kind) - self._labels: Dict = labels + self._labels: dict[int, str] = labels def read_value(self, data: ProtocolResponse): return self._labels.get(read_bytes2(data, None, 0)) @@ -450,9 +449,9 @@ def read_value(self, data: ProtocolResponse): class EnumBitmap4(Sensor): """Sensor representing label from bitmap encoded in 4 bytes""" - def __init__(self, id_: str, offset: int, labels: Dict, name: str, kind: Optional[SensorKind] = None): + def __init__(self, id_: str, offset: int, labels: dict[int, str], name: str, kind: Optional[SensorKind] = None): super().__init__(id_, offset, name, 4, "", kind) - self._labels: Dict = labels + self._labels: dict[int, str] = labels def read_value(self, data: ProtocolResponse) -> Any: raise NotImplementedError() @@ -465,10 +464,10 @@ def read(self, data: ProtocolResponse): class EnumBitmap22(Sensor): """Sensor representing label from bitmap encoded in 2+2 bytes""" - def __init__(self, id_: str, offsetH: int, offsetL: int, labels: Dict, name: str, + def __init__(self, id_: str, offsetH: int, offsetL: int, labels: dict[int, str], name: str, kind: Optional[SensorKind] = None): super().__init__(id_, offsetH, name, 2, "", kind) - self._labels: Dict = labels + self._labels: dict[int, str] = labels self._offsetL: int = offsetL def read_value(self, data: ProtocolResponse) -> Any: @@ -482,11 +481,11 @@ def read(self, data: ProtocolResponse): class EnumCalculated(Sensor): """Sensor representing label from enumeration of calculated value""" - def __init__(self, id_: str, getter: Callable[[ProtocolResponse], Any], labels: Dict, name: str, + def __init__(self, id_: str, getter: Callable[[ProtocolResponse], Any], labels: dict[int, str], name: str, kind: Optional[SensorKind] = None): super().__init__(id_, 0, name, 0, "", kind) self._getter: Callable[[ProtocolResponse], Any] = getter - self._labels: Dict = labels + self._labels: dict[int, str] = labels def read_value(self, data: ProtocolResponse) -> Any: raise NotImplementedError() @@ -500,23 +499,23 @@ class EcoMode(ABC): @abstractmethod def encode_charge(self, eco_mode_power: int, eco_mode_soc: int = 100) -> bytes: - """Answer bytes representing all the time enabled charging eco mode group""" + """Answer bytes representing all the time enabled charging eco-mode group""" @abstractmethod def encode_discharge(self, eco_mode_power: int) -> bytes: - """Answer bytes representing all the time enabled discharging eco mode group""" + """Answer bytes representing all the time enabled discharging eco-mode group""" @abstractmethod def encode_off(self) -> bytes: - """Answer bytes representing empty and disabled eco mode group""" + """Answer bytes representing empty and disabled eco-mode group""" @abstractmethod def is_eco_charge_mode(self) -> bool: - """Answer if it represents the emulated 24/7 fulltime discharge mode""" + """Answer if it represents the emulated 24/7 full-time discharge mode""" @abstractmethod def is_eco_discharge_mode(self) -> bool: - """Answer if it represents the emulated 24/7 fulltime discharge mode""" + """Answer if it represents the emulated 24/7 full-time discharge mode""" @abstractmethod def get_schedule_type(self) -> ScheduleType: @@ -586,19 +585,19 @@ def encode_value(self, value: Any, register_value: bytes = None) -> bytes: raise ValueError def encode_charge(self, eco_mode_power: int, eco_mode_soc: int = 100) -> bytes: - """Answer bytes representing all the time enabled charging eco mode group""" + """Answer bytes representing all the time enabled charging eco-mode group""" return bytes.fromhex("0000173b{:04x}ff7f".format((-1 * abs(eco_mode_power)) & (2 ** 16 - 1))) def encode_discharge(self, eco_mode_power: int) -> bytes: - """Answer bytes representing all the time enabled discharging eco mode group""" + """Answer bytes representing all the time enabled discharging eco-mode group""" return bytes.fromhex("0000173b{:04x}ff7f".format(abs(eco_mode_power))) def encode_off(self) -> bytes: - """Answer bytes representing empty and disabled eco mode group""" + """Answer bytes representing empty and disabled eco-mode group""" return bytes.fromhex("3000300000640000") def is_eco_charge_mode(self) -> bool: - """Answer if it represents the emulated 24/7 fulltime discharge mode""" + """Answer if it represents the emulated 24/7 full-time discharge mode""" return self.start_h == 0 \ and self.start_m == 0 \ and self.end_h == 23 \ @@ -608,7 +607,7 @@ def is_eco_charge_mode(self) -> bool: and self.power < 0 def is_eco_discharge_mode(self) -> bool: - """Answer if it represents the emulated 24/7 fulltime discharge mode""" + """Answer if it represents the emulated 24/7 full-time discharge mode""" return self.start_h == 0 \ and self.start_m == 0 \ and self.end_h == 23 \ @@ -707,7 +706,7 @@ def encode_value(self, value: Any, register_value: bytes = None) -> bytes: raise ValueError def encode_charge(self, eco_mode_power: int, eco_mode_soc: int = 100) -> bytes: - """Answer bytes representing all the time enabled charging eco mode group""" + """Answer bytes representing all the time enabled charging eco-mode group""" return bytes.fromhex( "0000173b{:02x}7f{:04x}{:04x}{:04x}".format( 255 - self.schedule_type, @@ -716,7 +715,7 @@ def encode_charge(self, eco_mode_power: int, eco_mode_soc: int = 100) -> bytes: 0 if self.schedule_type != ScheduleType.ECO_MODE_745 else 0x0fff)) def encode_discharge(self, eco_mode_power: int) -> bytes: - """Answer bytes representing all the time enabled discharging eco mode group""" + """Answer bytes representing all the time enabled discharging eco-mode group""" return bytes.fromhex("0000173b{:02x}7f{:04x}0064{:04x}".format( 255 - self.schedule_type, abs(self.schedule_type.encode_power(eco_mode_power)), @@ -729,7 +728,7 @@ def encode_off(self) -> bytes: self.schedule_type.encode_power(100))) def is_eco_charge_mode(self) -> bool: - """Answer if it represents the emulated 24/7 fulltime discharge mode""" + """Answer if it represents the emulated 24/7 full-time discharge mode""" return self.start_h == 0 \ and self.start_m == 0 \ and self.end_h == 23 \ @@ -740,7 +739,7 @@ def is_eco_charge_mode(self) -> bool: and (self.month_bits == 0 or self.month_bits == 0x0fff) def is_eco_discharge_mode(self) -> bool: - """Answer if it represents the emulated 24/7 fulltime discharge mode""" + """Answer if it represents the emulated 24/7 full-time discharge mode""" return self.start_h == 0 \ and self.start_m == 0 \ and self.end_h == 23 \ @@ -981,7 +980,7 @@ def read_unsigned_int(data: bytes, offset: int) -> int: return int.from_bytes(data[offset:offset + 2], byteorder="big", signed=False) -def decode_bitmap(value: int, bitmap: Dict[int, str]) -> str: +def decode_bitmap(value: int, bitmap: dict[int, str]) -> str: bits = value result = [] for i in range(32): diff --git a/tests/inverter_scan.py b/tests/inverter_scan.py index b7607d5..b0004c9 100644 --- a/tests/inverter_scan.py +++ b/tests/inverter_scan.py @@ -21,7 +21,7 @@ def try_command(command, ip): print(f"Trying command: {command}") try: response = asyncio.run( - ProtocolCommand(bytes.fromhex(command), lambda x: True).execute(UdpInverterProtocol(ip, 8899))) + ProtocolCommand(bytes.fromhex(command), lambda x: True).execute(UdpInverterProtocol(ip, 8899, 0x7f))) print(f"Response to {command} command: {response.raw_data.hex()}") except InverterError: print(f"No response to {command} command") diff --git a/tests/mock_aa55_server.py b/tests/mock_aa55_server.py index 44883c2..b30409d 100644 --- a/tests/mock_aa55_server.py +++ b/tests/mock_aa55_server.py @@ -62,7 +62,7 @@ async def main(): # low-level APIs. loop = asyncio.get_running_loop() - transport, protocol = await loop.create_datagram_endpoint( + transport, _ = await loop.create_datagram_endpoint( lambda: EchoServerProtocol(), local_addr=('127.0.0.1', 8899)) diff --git a/tests/test_dt.py b/tests/test_dt.py index 5831198..5cdcedb 100644 --- a/tests/test_dt.py +++ b/tests/test_dt.py @@ -40,7 +40,7 @@ async def _read_from_socket(self, command: ProtocolCommand) -> ProtocolResponse: def assertSensor(self, sensor_name, expected_value, expected_unit, data): self.assertEqual(expected_value, data.get(sensor_name)) - sensor = self.sensor_map.get(sensor_name); + sensor = self.sensor_map.get(sensor_name) self.assertEqual(expected_unit, sensor.unit) self.sensor_map.pop(sensor_name) diff --git a/tests/test_et.py b/tests/test_et.py index b4b9a82..4cf0edd 100644 --- a/tests/test_et.py +++ b/tests/test_et.py @@ -43,7 +43,7 @@ async def _read_from_socket(self, command: ProtocolCommand) -> ProtocolResponse: def assertSensor(self, sensor_name, expected_value, expected_unit, data): self.assertEqual(expected_value, data.get(sensor_name)) - sensor = self.sensor_map.get(sensor_name); + sensor = self.sensor_map.get(sensor_name) self.assertEqual(expected_unit, sensor.unit) self.sensor_map.pop(sensor_name) diff --git a/tests/test_sensor.py b/tests/test_sensor.py index 3409c64..4dd2fcd 100644 --- a/tests/test_sensor.py +++ b/tests/test_sensor.py @@ -1,5 +1,6 @@ from unittest import TestCase +from goodwe.const import * from goodwe.sensor import *