diff --git a/goodwe/dt.py b/goodwe/dt.py index d6ca6bf..1b248b0 100644 --- a/goodwe/dt.py +++ b/goodwe/dt.py @@ -149,6 +149,7 @@ def __init__(self, host: str, port: int, comm_addr: int = 0, timeout: int = 1, r self._sensors = self.__all_sensors self._sensors_meter = self.__all_sensors_meter self._settings: dict[str, Sensor] = {s.id_: s for s in self.__all_settings} + self._sensors_map: dict[str, Sensor] | None = None self._has_meter: bool = True @staticmethod @@ -205,25 +206,34 @@ async def read_runtime_data(self) -> dict[str, Any]: return data + async def read_sensor(self, sensor_id: str) -> Any: + sensor: Sensor = self._get_sensor(sensor_id) + if sensor: + return await self._read_sensor(sensor) + if sensor_id.startswith("modbus"): + response = await self._read_from_socket(self._read_command(int(sensor_id[7:]), 1)) + return int.from_bytes(response.read(2), byteorder="big", signed=True) + raise ValueError(f'Unknown sensor "{sensor_id}"') + async def read_setting(self, setting_id: str) -> Any: setting = self._settings.get(setting_id) if setting: - return await self._read_setting(setting) + return await self._read_sensor(setting) if setting_id.startswith("modbus"): response = await self._read_from_socket(self._read_command(int(setting_id[7:]), 1)) return int.from_bytes(response.read(2), byteorder="big", signed=True) raise ValueError(f'Unknown setting "{setting_id}"') - async def _read_setting(self, setting: Sensor) -> Any: + async def _read_sensor(self, setting: Sensor) -> Any: try: count = (setting.size_ + (setting.size_ % 2)) // 2 response = await self._read_from_socket(self._read_command(setting.offset, count)) return setting.read_value(response) except RequestRejectedException as ex: if ex.message == ILLEGAL_DATA_ADDRESS: - logger.debug("Unsupported setting %s", setting.id_) + logger.debug("Unsupported sensor/setting %s", setting.id_) self._settings.pop(setting.id_, None) - raise ValueError(f'Unknown setting "{setting.id_}"') + raise ValueError(f'Unknown sensor/setting "{setting.id_}"') return None async def write_setting(self, setting_id: str, value: Any): @@ -279,6 +289,11 @@ async def get_ongrid_battery_dod(self) -> int: async def set_ongrid_battery_dod(self, dod: int) -> None: raise InverterError("Operation not supported, inverter has no batteries.") + def _get_sensor(self, sensor_id: str) -> Sensor | None: + if self._sensors_map is None: + self._sensors_map = {s.id_: s for s in self.sensors()} + return self._sensors_map.get(sensor_id) + def sensors(self) -> tuple[Sensor, ...]: result = self._sensors if self._has_meter: diff --git a/goodwe/es.py b/goodwe/es.py index 1043f73..d9ef7fc 100644 --- a/goodwe/es.py +++ b/goodwe/es.py @@ -206,6 +206,10 @@ async def read_runtime_data(self) -> dict[str, Any]: data = self._map_response(response, self.__sensors) return data + async def read_sensor(self, sensor_id: str) -> Any: + data = await self.read_runtime_data() + return data[sensor_id] + async def read_setting(self, setting_id: str) -> Any: if setting_id == 'time': # Fake setting, just to enable write_setting to work (if checked as pair in read as in HA) diff --git a/goodwe/et.py b/goodwe/et.py index 84d066e..275bca0 100644 --- a/goodwe/et.py +++ b/goodwe/et.py @@ -492,6 +492,7 @@ def __init__(self, host: str, port: int, comm_addr: int = 0, timeout: int = 1, r self._sensors_meter = self.__all_sensors_meter self._sensors_mppt = self.__all_sensors_mppt self._settings: dict[str, Sensor] = {s.id_: s for s in self.__all_settings} + self._sensors_map: dict[str, Sensor] | None = None @staticmethod def _single_phase_only(s: Sensor) -> bool: @@ -641,25 +642,34 @@ async def read_runtime_data(self) -> dict[str, Any]: return data + async def read_sensor(self, sensor_id: str) -> Any: + sensor: Sensor = self._get_sensor(sensor_id) + if sensor: + return await self._read_sensor(sensor) + if sensor_id.startswith("modbus"): + response = await self._read_from_socket(self._read_command(int(sensor_id[7:]), 1)) + return int.from_bytes(response.read(2), byteorder="big", signed=True) + raise ValueError(f'Unknown sensor "{sensor_id}"') + async def read_setting(self, setting_id: str) -> Any: - setting = self._settings.get(setting_id) + setting: Sensor = self._settings.get(setting_id) if setting: - return await self._read_setting(setting) + return await self._read_sensor(setting) if setting_id.startswith("modbus"): response = await self._read_from_socket(self._read_command(int(setting_id[7:]), 1)) return int.from_bytes(response.read(2), byteorder="big", signed=True) raise ValueError(f'Unknown setting "{setting_id}"') - async def _read_setting(self, setting: Sensor) -> Any: + async def _read_sensor(self, sensor: Sensor) -> Any: try: - count = (setting.size_ + (setting.size_ % 2)) // 2 - response = await self._read_from_socket(self._read_command(setting.offset, count)) - return setting.read_value(response) + count = (sensor.size_ + (sensor.size_ % 2)) // 2 + response = await self._read_from_socket(self._read_command(sensor.offset, count)) + return sensor.read_value(response) except RequestRejectedException as ex: if ex.message == ILLEGAL_DATA_ADDRESS: - logger.debug("Unsupported setting %s", setting.id_) - self._settings.pop(setting.id_, None) - raise ValueError(f'Unknown setting "{setting.id_}"') + logger.debug("Unsupported sensor/setting %s", sensor.id_) + self._settings.pop(sensor.id_, None) + raise ValueError(f'Unknown sensor/setting "{sensor.id_}"') return None async def write_setting(self, setting_id: str, value: Any): @@ -766,7 +776,7 @@ async def set_operation_mode(self, operation_mode: OperationMode, eco_mode_power eco_mode: EcoMode | Sensor = self._settings.get('eco_mode_1') # Load the current values to try to detect schedule type try: - await self._read_setting(eco_mode) + await self._read_sensor(eco_mode) except ValueError: pass eco_mode.set_schedule_type(ScheduleType.ECO_MODE, is_745_platform(self)) @@ -787,6 +797,11 @@ async def set_ongrid_battery_dod(self, dod: int) -> None: if 0 <= dod <= 100: await self.write_setting('battery_discharge_depth', 100 - dod) + def _get_sensor(self, sensor_id: str) -> Sensor | None: + if self._sensors_map is None: + self._sensors_map = {s.id_: s for s in self.sensors()} + return self._sensors_map.get(sensor_id) + def sensors(self) -> tuple[Sensor, ...]: result = self._sensors + self._sensors_meter if self._has_battery: diff --git a/goodwe/inverter.py b/goodwe/inverter.py index eba6a8c..1bfdfc3 100644 --- a/goodwe/inverter.py +++ b/goodwe/inverter.py @@ -151,6 +151,14 @@ async def read_runtime_data(self) -> dict[str, Any]: """ raise NotImplementedError() + @abstractmethod + async def read_sensor(self, sensor_id: str) -> Any: + """ + Read the value of specific inverter sensor. + Sensor must be in list provided by sensors() method, otherwise ValueError is raised. + """ + raise NotImplementedError() + @abstractmethod async def read_setting(self, setting_id: str) -> Any: """ diff --git a/pylintrc b/pylintrc new file mode 100644 index 0000000..d84c303 Binary files /dev/null and b/pylintrc differ diff --git a/tests/inverter_check.py b/tests/inverter_check.py index a1191f3..39dbce1 100644 --- a/tests/inverter_check.py +++ b/tests/inverter_check.py @@ -64,6 +64,11 @@ if sensor.id_ in response: print(f"{sensor.id_}: \t\t {sensor.name} = {response[sensor.id_]} {sensor.unit}") +# ------------- +# Read sensorr +# ------------- +# print(asyncio.run(inverter.read_sensor('vpv1'))) + # ------------- # Read settings # -------------