Skip to content

Commit

Permalink
Switch to python 3.9 collections typing hints
Browse files Browse the repository at this point in the history
  • Loading branch information
mletenay committed Jul 5, 2024
1 parent abdfeda commit 5e856fd
Show file tree
Hide file tree
Showing 13 changed files with 101 additions and 109 deletions.
32 changes: 15 additions & 17 deletions goodwe/const.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
from typing import Dict

GOODWE_TCP_PORT = 502
GOODWE_UDP_PORT = 8899

BATTERY_MODES: Dict[int, str] = {
BATTERY_MODES: dict[int, str] = {
0: "No battery",
1: "Standby",
2: "Discharge",
Expand All @@ -12,7 +10,7 @@
5: "To be discharged",
}

ENERGY_MODES: Dict[int, str] = {
ENERGY_MODES: dict[int, str] = {
0: "Check Mode",
1: "Wait Mode",
2: "Normal (On-Grid)",
Expand All @@ -24,37 +22,37 @@
128: "Battery Discharging",
}

GRID_MODES: Dict[int, str] = {
GRID_MODES: dict[int, str] = {
0: "Not connected to grid",
1: "Connected to grid",
2: "Fault",
}

GRID_IN_OUT_MODES: Dict[int, str] = {
GRID_IN_OUT_MODES: dict[int, str] = {
0: "Idle",
1: "Exporting",
2: "Importing",
}

LOAD_MODES: Dict[int, str] = {
LOAD_MODES: dict[int, str] = {
0: "Inverter and the load is disconnected",
1: "The inverter is connected to a load",
}

PV_MODES: Dict[int, str] = {
PV_MODES: dict[int, str] = {
0: "PV panels not connected",
1: "PV panels connected, no power",
2: "PV panels connected, producing power",
}

WORK_MODES: Dict[int, str] = {
WORK_MODES: dict[int, str] = {
0: "Wait Mode",
1: "Normal",
2: "Error",
4: "Check Mode",
}

WORK_MODES_ET: Dict[int, str] = {
WORK_MODES_ET: dict[int, str] = {
0: "Wait Mode",
1: "Normal (On-Grid)",
2: "Normal (Off-Grid)",
Expand All @@ -63,14 +61,14 @@
5: "Check Mode",
}

WORK_MODES_ES: Dict[int, str] = {
WORK_MODES_ES: dict[int, str] = {
0: "Inverter Off - Standby",
1: "Inverter On",
2: "Inverter Abnormal, stopping power",
3: "Inverter Severly Abnormal, 20 seconds to restart",
}

SAFETY_COUNTRIES: Dict[int, str] = {
SAFETY_COUNTRIES: dict[int, str] = {
0: "IT CEI 0-21",
1: "CZ-A1",
2: "DE LV with PV",
Expand Down Expand Up @@ -198,7 +196,7 @@
149: "Brazil 254Vac",
}

ERROR_CODES: Dict[int, str] = {
ERROR_CODES: dict[int, str] = {
31: 'Internal Communication Failure',
30: 'EEPROM R/W Failure',
29: 'Fac Failure',
Expand Down Expand Up @@ -233,7 +231,7 @@
0: 'GFCI Device Check Failure',
}

DIAG_STATUS_CODES: Dict[int, str] = {
DIAG_STATUS_CODES: dict[int, str] = {
0: "Battery voltage low",
1: "Battery SOC low",
2: "Battery SOC in back",
Expand Down Expand Up @@ -265,7 +263,7 @@
28: "SOC protect off",
}

BMS_ALARM_CODES: Dict[int, str] = {
BMS_ALARM_CODES: dict[int, str] = {
15: 'Charging over-voltage 3',
14: 'Discharging under-voltage 3',
13: 'Cell temperature high 3',
Expand All @@ -284,7 +282,7 @@
0: 'Charging over-voltage 2',
}

BMS_WARNING_CODES: Dict[int, str] = {
BMS_WARNING_CODES: dict[int, str] = {
11: 'System temperature high',
10: 'System temperature low 2',
9: 'System temperature low 1',
Expand All @@ -299,7 +297,7 @@
0: 'Charging over-voltage 1',
}

DERATING_MODE_CODES: Dict[int, str] = {
DERATING_MODE_CODES: dict[int, str] = {
31: '',
30: '',
29: '',
Expand Down
32 changes: 15 additions & 17 deletions goodwe/dt.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
from __future__ import annotations

import logging
from typing import Tuple

from .const import *
from .exceptions import InverterError, RequestFailedException, RequestRejectedException
from .inverter import Inverter
from .inverter import OperationMode
from .inverter import SensorKind as Kind
from .inverter import Inverter, OperationMode, SensorKind as Kind
from .modbus import ILLEGAL_DATA_ADDRESS
from .model import is_3_mppt, is_single_phase
from .protocol import ProtocolCommand
Expand All @@ -18,7 +16,7 @@
class DT(Inverter):
"""Class representing inverter of DT/MS/D-NS/XS or GE's GEP(PSB/PSC) families"""

__all_sensors: Tuple[Sensor, ...] = (
__all_sensors: tuple[Sensor, ...] = (
Timestamp("timestamp", 30100, "Timestamp"),
Voltage("vpv1", 30103, "PV1 Voltage", Kind.PV),
Current("ipv1", 30104, "PV1 Current", Kind.PV),
Expand All @@ -37,9 +35,9 @@ class DT(Inverter):
"PV3 Power", "W", Kind.PV),
# ppv1 + ppv2 + ppv3
Calculated("ppv",
lambda data: (round(read_voltage(data, 30103) * read_current(data, 30104))) + (round(
read_voltage(data, 30105) * read_current(data, 30106))) + (round(
read_voltage(data, 30107) * read_current(data, 30108))),
lambda data: (round(read_voltage(data, 30103) * read_current(data, 30104))) + (
round(read_voltage(data, 30105) * read_current(data, 30106))) + (
round(read_voltage(data, 30107) * read_current(data, 30108))),
"PV Power", "W", Kind.PV),
# Voltage("vpv4", 14, "PV4 Voltage", Kind.PV),
# Current("ipv4", 16, "PV4 Current", Kind.PV),
Expand Down Expand Up @@ -115,12 +113,12 @@ class DT(Inverter):

# Inverter's meter data
# Modbus registers from offset 0x75f4 (30196)
__all_sensors_meter: Tuple[Sensor, ...] = (
__all_sensors_meter: tuple[Sensor, ...] = (
PowerS("active_power", 30196, "Active Power", Kind.GRID),
)

# Modbus registers of inverter settings, offsets are modbus register addresses
__all_settings: Tuple[Sensor, ...] = (
__all_settings: tuple[Sensor, ...] = (
Timestamp("time", 40313, "Inverter time"),

Integer("shadow_scan", 40326, "Shadow Scan", "", Kind.PV),
Expand All @@ -133,12 +131,12 @@ class DT(Inverter):
)

# Settings for single phase inverters
__settings_single_phase: Tuple[Sensor, ...] = (
__settings_single_phase: tuple[Sensor, ...] = (
Long("grid_export_limit", 40328, "Grid Export Limit", "W", Kind.GRID),
)

# Settings for three phase inverters
__settings_three_phase: Tuple[Sensor, ...] = (
__settings_three_phase: tuple[Sensor, ...] = (
Integer("grid_export_limit", 40336, "Grid Export Limit", "%", Kind.GRID),
)

Expand Down Expand Up @@ -193,7 +191,7 @@ async def read_device_info(self):
self._sensors = tuple(filter(self._pv1_pv2_only, self._sensors))
pass

async def read_runtime_data(self) -> Dict[str, Any]:
async def read_runtime_data(self) -> dict[str, Any]:
response = await self._read_from_socket(self._READ_RUNNING_DATA)
data = self._map_response(response, self._sensors)

Expand Down Expand Up @@ -253,7 +251,7 @@ async def _write_setting(self, setting: Sensor, value: Any):
else:
await self._read_from_socket(self._write_multi_command(setting.offset, raw_value))

async def read_settings_data(self) -> Dict[str, Any]:
async def read_settings_data(self) -> dict[str, Any]:
data = {}
for setting in self.settings():
value = await self.read_setting(setting.id_)
Expand All @@ -267,7 +265,7 @@ async def set_grid_export_limit(self, export_limit: int) -> None:
if export_limit >= 0:
return await self.write_setting('grid_export_limit', export_limit)

async def get_operation_modes(self, include_emulated: bool) -> Tuple[OperationMode, ...]:
async def get_operation_modes(self, include_emulated: bool) -> tuple[OperationMode, ...]:
return ()

async def get_operation_mode(self) -> OperationMode:
Expand All @@ -283,11 +281,11 @@ async def get_ongrid_battery_dod(self) -> int:
async def set_ongrid_battery_dod(self, dod: int) -> None:
raise InverterError("Operation not supported, inverter has no batteries.")

def sensors(self) -> Tuple[Sensor, ...]:
def sensors(self) -> tuple[Sensor, ...]:
result = self._sensors
if self._has_meter:
result = result + self._sensors_meter
return result

def settings(self) -> Tuple[Sensor, ...]:
def settings(self) -> tuple[Sensor, ...]:
return tuple(self._settings.values())
22 changes: 10 additions & 12 deletions goodwe/es.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
from __future__ import annotations

import logging
from typing import Tuple

from .const import *
from .exceptions import InverterError
from .inverter import Inverter
from .inverter import OperationMode
from .inverter import SensorKind as Kind
from .inverter import Inverter, OperationMode, SensorKind as Kind
from .protocol import ProtocolCommand, Aa55ProtocolCommand, Aa55ReadCommand, Aa55WriteCommand, Aa55WriteMultiCommand
from .sensor import *

Expand All @@ -20,7 +18,7 @@ class ES(Inverter):
_READ_DEVICE_RUNNING_DATA: ProtocolCommand = Aa55ProtocolCommand("010600", "0186")
_READ_DEVICE_SETTINGS_DATA: ProtocolCommand = Aa55ProtocolCommand("010900", "0189")

__sensors: Tuple[Sensor, ...] = (
__sensors: tuple[Sensor, ...] = (
Voltage("vpv1", 0, "PV1 Voltage", Kind.PV), # modbus 0x500
Current("ipv1", 2, "PV1 Current", Kind.PV),
Calculated("ppv1",
Expand Down Expand Up @@ -124,7 +122,7 @@ class ES(Inverter):
"House Consumption", "W", Kind.AC),
)

__all_settings: Tuple[Sensor, ...] = (
__all_settings: tuple[Sensor, ...] = (
Integer("backup_supply", 12, "Backup Supply"),
Integer("off-grid_charge", 14, "Off-grid Charge"),
Integer("shadow_scan", 16, "Shadow Scan", "", Kind.PV),
Expand Down Expand Up @@ -156,7 +154,7 @@ class ES(Inverter):
)

# Settings added in ARM firmware 14
__settings_arm_fw_14: Tuple[Sensor, ...] = (
__settings_arm_fw_14: tuple[Sensor, ...] = (
EcoModeV2("eco_mode_1", 47547, "Eco Mode Group 1"),
ByteH("eco_mode_1_switch", 47549, "Eco Mode Group 1 Switch"),
EcoModeV2("eco_mode_2", 47553, "Eco Mode Group 2"),
Expand Down Expand Up @@ -202,7 +200,7 @@ async def read_device_info(self):
if self._supports_eco_mode_v2():
self._settings.update({s.id_: s for s in self.__settings_arm_fw_14})

async def read_runtime_data(self) -> Dict[str, Any]:
async def read_runtime_data(self) -> dict[str, Any]:
response = await self._read_from_socket(self._READ_DEVICE_RUNNING_DATA)
data = self._map_response(response, self.__sensors)
return data
Expand Down Expand Up @@ -271,7 +269,7 @@ async def _write_setting(self, setting: Sensor, value: Any):
else:
await self._read_from_socket(Aa55WriteMultiCommand(setting.offset, raw_value))

async def read_settings_data(self) -> Dict[str, Any]:
async def read_settings_data(self) -> dict[str, Any]:
response = await self._read_from_socket(self._READ_DEVICE_SETTINGS_DATA)
data = self._map_response(response, self.settings())
return data
Expand All @@ -285,7 +283,7 @@ async def set_grid_export_limit(self, export_limit: int) -> None:
Aa55ProtocolCommand("033502" + "{:04x}".format(export_limit), "03b5")
)

async def get_operation_modes(self, include_emulated: bool) -> Tuple[OperationMode, ...]:
async def get_operation_modes(self, include_emulated: bool) -> tuple[OperationMode, ...]:
result = [e for e in OperationMode]
result.remove(OperationMode.PEAK_SHAVING)
result.remove(OperationMode.SELF_USE)
Expand Down Expand Up @@ -349,10 +347,10 @@ async def set_ongrid_battery_dod(self, dod: int) -> None:
async def _reset_inverter(self) -> None:
await self._read_from_socket(Aa55ProtocolCommand("031d00", "039d"))

def sensors(self) -> Tuple[Sensor, ...]:
def sensors(self) -> tuple[Sensor, ...]:
return self.__sensors

def settings(self) -> Tuple[Sensor, ...]:
def settings(self) -> tuple[Sensor, ...]:
return tuple(self._settings.values())

async def _set_general_mode(self) -> None:
Expand Down
Loading

0 comments on commit 5e856fd

Please sign in to comment.