Skip to content

Commit

Permalink
Add keep alive options to Modbus/TCP
Browse files Browse the repository at this point in the history
  • Loading branch information
mletenay committed May 14, 2024
1 parent e81319f commit 366d356
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 18 deletions.
4 changes: 4 additions & 0 deletions goodwe/inverter.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ class Inverter(ABC):
def __init__(self, host: str, port: int, comm_addr: int = 0, timeout: int = 1, retries: int = 3):
self._protocol: InverterProtocol = self._create_protocol(host, port, comm_addr, timeout, retries)
self._consecutive_failures_count: int = 0
self.keep_alive: bool = True

self.model_name: str | None = None
self.serial_number: str | None = None
Expand Down Expand Up @@ -129,6 +130,9 @@ async def _read_from_socket(self, command: ProtocolCommand) -> ProtocolResponse:
except RequestFailedException as ex:
self._consecutive_failures_count += 1
raise RequestFailedException(ex.message, self._consecutive_failures_count) from None
finally:
if not self.keep_alive:
self._protocol.close_transport()

@abstractmethod
async def read_device_info(self):
Expand Down
44 changes: 28 additions & 16 deletions goodwe/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import asyncio
import io
import logging
import platform
import socket
from asyncio.futures import Future
from typing import Tuple, Optional, Callable

Expand Down Expand Up @@ -54,13 +56,15 @@ def _ensure_lock(self) -> asyncio.Lock:
logger.debug("Creating lock instance for current event loop.")
self._lock = asyncio.Lock()
self._running_loop = asyncio.get_event_loop()
self._close_transport()
self.close_transport()
return self._lock

def _close_transport(self) -> None:
def close_transport(self) -> None:
"""Close the underlying transport/connection."""
raise NotImplementedError()

async def send_request(self, command: ProtocolCommand) -> Future:
"""Convert command to request and send it to inverter."""
raise NotImplementedError()

def read_command(self, offset: int, count: int) -> ProtocolCommand:
Expand Down Expand Up @@ -111,7 +115,7 @@ def connection_lost(self, exc: Optional[Exception]) -> None:
logger.debug("Socket closed with error: %s.", exc)
else:
logger.debug("Socket closed.")
self._close_transport()
self.close_transport()

def datagram_received(self, data: bytes, addr: Tuple[str, int]) -> None:
"""On datagram received"""
Expand All @@ -130,13 +134,13 @@ def datagram_received(self, data: bytes, addr: Tuple[str, int]) -> None:
except RequestRejectedException as ex:
logger.debug("Received exception response: %s", data.hex())
self.response_future.set_exception(ex)
self._close_transport()
self.close_transport()

def error_received(self, exc: Exception) -> None:
"""On error received"""
logger.debug("Received error: %s", exc)
self.response_future.set_exception(exc)
self._close_transport()
self.close_transport()

async def send_request(self, command: ProtocolCommand) -> Future:
"""Send message via transport"""
Expand Down Expand Up @@ -172,9 +176,9 @@ def _retry_mechanism(self) -> None:
else:
logger.debug("Max number of retries (%d) reached, request %s failed.", self.retries, self.command)
self.response_future.set_exception(MaxRetriesException)
self._close_transport()
self.close_transport()

def _close_transport(self) -> None:
def close_transport(self) -> None:
if self._transport:
try:
self._transport.close()
Expand Down Expand Up @@ -211,6 +215,14 @@ async def _connect(self) -> None:
lambda: self,
host=self._host, port=self._port,
)
sock = self._transport.get_extra_info('socket')
if sock is not None:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 10)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 10)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 3)
if platform.system() == 'Windows':
sock.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 10000, 10000))

def connection_made(self, transport: asyncio.DatagramTransport) -> None:
"""On connection made"""
Expand All @@ -219,15 +231,15 @@ def connection_made(self, transport: asyncio.DatagramTransport) -> None:

def eof_received(self) -> None:
logger.debug("EOF received.")
self._close_transport()
self.close_transport()

def connection_lost(self, exc: Optional[Exception]) -> None:
"""On connection lost"""
if exc:
logger.debug("Connection closed with error: %s.", exc)
else:
logger.debug("Connection closed.")
self._close_transport()
self.close_transport()

def data_received(self, data: bytes) -> None:
"""On data received"""
Expand All @@ -241,19 +253,19 @@ def data_received(self, data: bytes) -> None:
else:
logger.debug("Received invalid response: %s", data.hex())
self.response_future.set_exception(RequestRejectedException())
self._close_transport()
self.close_transport()
except asyncio.InvalidStateError:
logger.debug("Response already handled: %s", data.hex())
except RequestRejectedException as ex:
logger.debug("Received exception response: %s", data.hex())
self.response_future.set_exception(ex)
# self._close_transport()
# self.close_transport()

def error_received(self, exc: Exception) -> None:
"""On error received"""
logger.debug("Received error: %s", exc)
self.response_future.set_exception(exc)
self._close_transport()
self.close_transport()

async def send_request(self, command: ProtocolCommand) -> Future:
"""Send message via transport"""
Expand All @@ -271,7 +283,7 @@ async def send_request(self, command: ProtocolCommand) -> Future:
self._retry += 1
if self._lock and self._lock.locked():
self._lock.release()
self._close_transport()
self.close_transport()
return await self.send_request(command)
else:
return self._max_retries_reached()
Expand Down Expand Up @@ -308,16 +320,16 @@ def _timeout_mechanism(self) -> None:
if self._timer:
logger.debug("Failed to receive response to %s in time (%ds).", self.command, self.timeout)
self._timer = None
self._close_transport()
self.close_transport()

def _max_retries_reached(self) -> Future:
logger.debug("Max number of retries (%d) reached, request %s failed.", self.retries, self.command)
self._close_transport()
self.close_transport()
self.response_future = asyncio.get_running_loop().create_future()
self.response_future.set_exception(MaxRetriesException)
return self.response_future

def _close_transport(self) -> None:
def close_transport(self) -> None:
if self._transport:
try:
self._transport.close()
Expand Down
5 changes: 3 additions & 2 deletions tests/stability_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@


async def get_runtime_data():
i = 1
inverter = await goodwe.connect('127.0.0.1', 502)
inverter = await goodwe.connect(host='127.0.0.1', port=502, timeout=1, retries=3)
# inverter.keep_alive = False

i = 1
while True:
logger.info("################################")
logger.info(" Request %d", i)
Expand Down

0 comments on commit 366d356

Please sign in to comment.