Skip to content

Commit

Permalink
Address some pylint violations
Browse files Browse the repository at this point in the history
  • Loading branch information
mletenay committed Jul 5, 2024
1 parent 5e856fd commit f3d85b4
Show file tree
Hide file tree
Showing 11 changed files with 62 additions and 68 deletions.
4 changes: 2 additions & 2 deletions goodwe/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""Goodwe solar inverter communication library."""
from __future__ import annotations

import asyncio
Expand Down Expand Up @@ -127,7 +128,6 @@ async def search_inverters() -> bytes:
result = await command.execute(UdpInverterProtocol("255.255.255.255", 48899, 1, 0))
if result is not None:
return result.response_data()
else:
raise InverterError("No response received to broadcast request.")
raise InverterError("No response received to broadcast request.")
except asyncio.CancelledError:
raise InverterError("No valid response received to broadcast request.") from None
1 change: 1 addition & 0 deletions goodwe/const.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""Constants."""
GOODWE_TCP_PORT = 502
GOODWE_UDP_PORT = 8899

Expand Down
12 changes: 5 additions & 7 deletions goodwe/dt.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""Grid-only inverter support - models DT/MS/D-NS/XS or GE's GEP(PSB/PSC)"""
from __future__ import annotations

import logging
Expand Down Expand Up @@ -189,7 +190,6 @@ async def read_device_info(self):
else:
# this is only 2 PV strings inverter
self._sensors = tuple(filter(self._pv1_pv2_only, self._sensors))
pass

async def read_runtime_data(self) -> dict[str, Any]:
response = await self._read_from_socket(self._READ_RUNNING_DATA)
Expand All @@ -209,12 +209,10 @@ async def read_setting(self, setting_id: str) -> Any:
setting = self._settings.get(setting_id)
if setting:
return await self._read_setting(setting)
else:
if setting_id.startswith("modbus"):
response = await self._read_from_socket(self._read_command(int(setting_id[7:]), 1))
return int.from_bytes(response.read(2), byteorder="big", signed=True)
else:
raise ValueError(f'Unknown setting "{setting_id}"')
if setting_id.startswith("modbus"):
response = await self._read_from_socket(self._read_command(int(setting_id[7:]), 1))
return int.from_bytes(response.read(2), byteorder="big", signed=True)
raise ValueError(f'Unknown setting "{setting_id}"')

async def _read_setting(self, setting: Sensor) -> Any:
try:
Expand Down
22 changes: 10 additions & 12 deletions goodwe/es.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""Single phase hybrid inverter support aka platform 105."""
from __future__ import annotations

import logging
Expand Down Expand Up @@ -210,29 +211,27 @@ async def read_setting(self, setting_id: str) -> Any:
# Fake setting, just to enable write_setting to work (if checked as pair in read as in HA)
# There does not seem to be time setting/sensor available (or is not known)
return datetime.now()
elif setting_id in ('eco_mode_1', 'eco_mode_2', 'eco_mode_3', 'eco_mode_4'):
if setting_id in ('eco_mode_1', 'eco_mode_2', 'eco_mode_3', 'eco_mode_4'):
setting: Sensor | None = self._settings.get(setting_id)
if not setting:
raise ValueError(f'Unknown setting "{setting_id}"')
return await self._read_setting(setting)
elif setting_id.startswith("modbus"):
if setting_id.startswith("modbus"):
response = await self._read_from_socket(self._read_command(int(setting_id[7:]), 1))
return int.from_bytes(response.read(2), byteorder="big", signed=True)
elif setting_id in self._settings:
if setting_id in self._settings:
logger.debug("Reading setting %s", setting_id)
all_settings = await self.read_settings_data()
return all_settings.get(setting_id)
else:
raise ValueError(f'Unknown setting "{setting_id}"')
raise ValueError(f'Unknown setting "{setting_id}"')

async def _read_setting(self, setting: Sensor) -> Any:
count = (setting.size_ + (setting.size_ % 2)) // 2
if self._is_modbus_setting(setting):
response = await self._read_from_socket(self._read_command(setting.offset, count))
return setting.read_value(response)
else:
response = await self._read_from_socket(Aa55ReadCommand(setting.offset, count))
return setting.read_value(response)
response = await self._read_from_socket(Aa55ReadCommand(setting.offset, count))
return setting.read_value(response)

async def write_setting(self, setting_id: str, value: Any):
if setting_id == 'time':
Expand Down Expand Up @@ -284,7 +283,7 @@ async def set_grid_export_limit(self, export_limit: int) -> None:
)

async def get_operation_modes(self, include_emulated: bool) -> tuple[OperationMode, ...]:
result = [e for e in OperationMode]
result = list(OperationMode)
result.remove(OperationMode.PEAK_SHAVING)
result.remove(OperationMode.SELF_USE)
if not include_emulated:
Expand All @@ -304,10 +303,9 @@ async def get_operation_mode(self) -> OperationMode | None:
eco_mode = await self.read_setting('eco_mode_1')
if eco_mode.is_eco_charge_mode():
return OperationMode.ECO_CHARGE
elif eco_mode.is_eco_discharge_mode():
if eco_mode.is_eco_discharge_mode():
return OperationMode.ECO_DISCHARGE
else:
return OperationMode.ECO
return OperationMode.ECO

async def set_operation_mode(self, operation_mode: OperationMode, eco_mode_power: int = 100,
eco_mode_soc: int = 100) -> None:
Expand Down
18 changes: 8 additions & 10 deletions goodwe/et.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""Hybrid inverter support aka platform 205, 745, 753"""
from __future__ import annotations

import logging
Expand Down Expand Up @@ -644,12 +645,10 @@ async def read_setting(self, setting_id: str) -> Any:
setting = self._settings.get(setting_id)
if setting:
return await self._read_setting(setting)
else:
if setting_id.startswith("modbus"):
response = await self._read_from_socket(self._read_command(int(setting_id[7:]), 1))
return int.from_bytes(response.read(2), byteorder="big", signed=True)
else:
raise ValueError(f'Unknown setting "{setting_id}"')
if setting_id.startswith("modbus"):
response = await self._read_from_socket(self._read_command(int(setting_id[7:]), 1))
return int.from_bytes(response.read(2), byteorder="big", signed=True)
raise ValueError(f'Unknown setting "{setting_id}"')

async def _read_setting(self, setting: Sensor) -> Any:
try:
Expand Down Expand Up @@ -705,7 +704,7 @@ async def set_grid_export_limit(self, export_limit: int) -> None:
await self.write_setting('grid_export_limit', export_limit)

async def get_operation_modes(self, include_emulated: bool) -> tuple[OperationMode, ...]:
result = [e for e in OperationMode]
result = list(OperationMode)
if not self._has_peak_shaving:
result.remove(OperationMode.PEAK_SHAVING)
if not is_745_platform(self):
Expand All @@ -727,10 +726,9 @@ async def get_operation_mode(self) -> OperationMode | None:
eco_mode = await self.read_setting('eco_mode_1')
if eco_mode.is_eco_charge_mode():
return OperationMode.ECO_CHARGE
elif eco_mode.is_eco_discharge_mode():
if eco_mode.is_eco_discharge_mode():
return OperationMode.ECO_DISCHARGE
else:
return OperationMode.ECO
return OperationMode.ECO

async def set_operation_mode(self, operation_mode: OperationMode, eco_mode_power: int = 100,
eco_mode_soc: int = 100) -> None:
Expand Down
3 changes: 3 additions & 0 deletions goodwe/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
"""Exceptions declarations."""


class InverterError(Exception):
"""Indicates error communicating with inverter"""

Expand Down
4 changes: 2 additions & 2 deletions goodwe/inverter.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""Generic inverter API module."""
from __future__ import annotations

import logging
Expand Down Expand Up @@ -274,8 +275,7 @@ def settings(self) -> tuple[Sensor, ...]:
def _create_protocol(host: str, port: int, comm_addr: int, timeout: int, retries: int) -> InverterProtocol:
if port == 502:
return TcpInverterProtocol(host, port, comm_addr, timeout, retries)
else:
return UdpInverterProtocol(host, port, comm_addr, timeout, retries)
return UdpInverterProtocol(host, port, comm_addr, timeout, retries)

@staticmethod
def _map_response(response: ProtocolResponse, sensors: tuple[Sensor, ...]) -> dict[str, Any]:
Expand Down
1 change: 1 addition & 0 deletions goodwe/modbus.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""Modbus protocol implementation."""
import logging
from typing import Union

Expand Down
2 changes: 1 addition & 1 deletion goodwe/model.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Serial number tags to identify inverter type
"""Constants identifying inverter type/model."""
from .inverter import Inverter

PLATFORM_105_MODELS = ("ESU", "EMU", "ESA", "BPS", "BPU", "EMJ", "IJL")
Expand Down
1 change: 1 addition & 0 deletions goodwe/protocol.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""Low level IP communication protocol implementation."""
from __future__ import annotations

import asyncio
Expand Down
62 changes: 28 additions & 34 deletions goodwe/sensor.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""Inverter sensor types."""
from __future__ import annotations

from abc import ABC, abstractmethod
Expand All @@ -14,66 +15,62 @@


class ScheduleType(IntEnum):
ECO_MODE = 0,
DRY_CONTACT_LOAD = 1,
DRY_CONTACT_SMART_LOAD = 2,
PEAK_SHAVING = 3,
BACKUP_MODE = 4,
SMART_CHARGE_MODE = 5,
ECO_MODE_745 = 6,
ECO_MODE = 0
DRY_CONTACT_LOAD = 1
DRY_CONTACT_SMART_LOAD = 2
PEAK_SHAVING = 3
BACKUP_MODE = 4
SMART_CHARGE_MODE = 5
ECO_MODE_745 = 6
NOT_SET = 85

@classmethod
def detect_schedule_type(cls, value: int) -> ScheduleType:
"""Detect schedule type from its on/off value"""
if value in (0, -1):
return ScheduleType.ECO_MODE
elif value in (1, -2):
if value in (1, -2):
return ScheduleType.DRY_CONTACT_LOAD
elif value in (2, -3):
if value in (2, -3):
return ScheduleType.DRY_CONTACT_SMART_LOAD
elif value in (3, -4):
if value in (3, -4):
return ScheduleType.PEAK_SHAVING
elif value in (4, -5):
if value in (4, -5):
return ScheduleType.BACKUP_MODE
elif value in (5, -6):
if value in (5, -6):
return ScheduleType.SMART_CHARGE_MODE
elif value in (6, -7):
if value in (6, -7):
return ScheduleType.ECO_MODE_745
elif value == 85:
if value == 85:
return ScheduleType.NOT_SET
else:
raise ValueError(f"{value}: on_off value {value} out of range.")
raise ValueError(f"{value}: on_off value {value} out of range.")

def power_unit(self):
"""Return unit of power parameter"""
if self == ScheduleType.PEAK_SHAVING:
return "W"
else:
return "%"
return "%"

def decode_power(self, value: int) -> int:
"""Decode human readable value of power parameter"""
if self == ScheduleType.PEAK_SHAVING:
return value * 10
elif self == ScheduleType.ECO_MODE_745:
if self == ScheduleType.ECO_MODE_745:
return int(value / 10)
elif self == ScheduleType.NOT_SET:
if self == ScheduleType.NOT_SET:
# Prevent out of range values when changing mode
return value if -100 <= value <= 100 else int(value / 10)
else:
return value
return value

def encode_power(self, value: int) -> int:
"""Encode human readable value of power parameter"""
if self == ScheduleType.ECO_MODE:
return value
elif self == ScheduleType.PEAK_SHAVING:
if self == ScheduleType.PEAK_SHAVING:
return int(value / 10)
elif self == ScheduleType.ECO_MODE_745:
if self == ScheduleType.ECO_MODE_745:
return value * 10
else:
return value
return value

def is_in_range(self, value: int) -> bool:
"""Check if the value fits in allowed values range"""
Expand Down Expand Up @@ -872,8 +869,7 @@ def read_float4(buffer: ProtocolResponse, offset: int = None) -> float:
data = buffer.read(4)
if len(data) == 4:
return unpack('>f', data)[0]
else:
return float(0)
return float(0)


def read_voltage(buffer: ProtocolResponse, offset: int = None) -> float:
Expand Down Expand Up @@ -930,8 +926,7 @@ def read_temp(buffer: ProtocolResponse, offset: int = None) -> float | None:
value = int.from_bytes(buffer.read(2), byteorder="big", signed=True)
if value == -1 or value == 32767:
return None
else:
return float(value) / 10
return float(value) / 10


def read_datetime(buffer: ProtocolResponse, offset: int = None) -> datetime:
Expand Down Expand Up @@ -969,10 +964,9 @@ def read_grid_mode(buffer: ProtocolResponse, offset: int = None) -> int:
value = read_bytes2_signed(buffer, offset)
if value < -90:
return 2
elif value >= 90:
if value >= 90:
return 1
else:
return 0
return 0


def read_unsigned_int(data: bytes, offset: int) -> int:
Expand All @@ -994,7 +988,7 @@ def decode_bitmap(value: int, bitmap: dict[int, str]) -> str:
def decode_day_of_week(data: int) -> str:
if data == -1:
return "Mon-Sun"
elif data == 0:
if data == 0:
return ""
bits = bin(data)[2:]
daynames = list(DAY_NAMES)
Expand Down

0 comments on commit f3d85b4

Please sign in to comment.