Skip to content

Commit

Permalink
Reset to capture timeouts
Browse files Browse the repository at this point in the history
catch all in case it gets into the situation where we get stuck in a loop.
  • Loading branch information
ottlukas authored May 8, 2024
1 parent e7a0510 commit d870810
Showing 1 changed file with 11 additions and 193 deletions.
204 changes: 11 additions & 193 deletions plc4py/plc4py/drivers/modbus/ModbusConnection.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,204 +16,22 @@
# specific language governing permissions and limitations
# under the License.
#
import asyncio
import logging
from typing import Type, Awaitable
from plc4py.spi.configuration.PlcConfiguration import PlcConfiguration

import plc4py
from plc4py.api.PlcConnection import PlcConnection, PlcConnectionMetaData
from plc4py.api.PlcDriver import PlcDriver
from plc4py.api.authentication.PlcAuthentication import PlcAuthentication
from plc4py.api.messages.PlcResponse import (
PlcResponse,
PlcReadResponse,
PlcWriteResponse,
PlcTagResponse,
)
from plc4py.api.messages.PlcRequest import (
ReadRequestBuilder,
PlcRequest,
PlcReadRequest,
PlcWriteRequest,
)
from plc4py.api.value.PlcValue import PlcResponseCode
from plc4py.drivers.PlcDriverLoader import PlcDriverLoader
from plc4py.drivers.modbus.ModbusConfiguration import ModbusConfiguration
from plc4py.drivers.modbus.ModbusDevice import ModbusDevice
from plc4py.drivers.modbus.ModbusProtocol import ModbusProtocol
from plc4py.drivers.modbus.ModbusTag import ModbusTagBuilder
from plc4py.spi.messages.PlcReader import PlcReader
from plc4py.spi.messages.PlcRequest import DefaultReadRequestBuilder
from plc4py.spi.messages.PlcWriter import PlcWriter
from plc4py.spi.transport.Plc4xBaseTransport import Plc4xBaseTransport
from plc4py.spi.transport.TCPTransport import TCPTransport


class ModbusConnection(PlcConnection, PlcReader, PlcWriter, PlcConnectionMetaData):
class ModbusConfiguration(PlcConfiguration):
"""
Modbus TCP PLC connection implementation
Specific Modbus Configuration
"""

def __init__(self, config: ModbusConfiguration, transport: Plc4xBaseTransport):
super().__init__(config)
self._configuration: ModbusConfiguration
self._device: ModbusDevice = ModbusDevice(self._configuration)
self._transport: Plc4xBaseTransport = transport

@staticmethod
async def create(connection_string: str) -> "ModbusConnection":
config = ModbusConfiguration(connection_string)
transport = await TCPTransport.create(
protocol_factory=lambda: ModbusProtocol(connection_future),
host=config.host,
port=config.port,
)
return ModbusConnection(config, transport)

def is_connected(self) -> bool:
"""
Indicates if the connection is established to a remote PLC.
:return: True if connection, False otherwise
"""
if self._transport is not None:
return not self._transport.is_closing()
else:
return False

def close(self) -> None:
"""
Closes the connection to the remote PLC.
:return:
"""
if self._transport is not None:
self._transport.close()

def read_request_builder(self) -> ReadRequestBuilder:
"""
:return: read request builder.
"""
return DefaultReadRequestBuilder(ModbusTagBuilder)

async def execute(self, request: PlcRequest) -> PlcResponse:
"""
Executes a PlcRequest as long as it's already connected.
:param request: Plc Request to execute
:return: The response from the Plc/Device
"""
# Check if the connection is established
if not self.is_connected():
# Return a default failed response if not connected
return self._default_failed_request(PlcResponseCode.NOT_CONNECTED)

# Check the type of the request and execute the corresponding method
if isinstance(request, PlcReadRequest):
# Execute a read request
return await self._read(request)
elif isinstance(request, PlcWriteRequest):
# Execute a write request
return await self._write(request)

# Return a default failed response if the request type is not supported
return self._default_failed_request(PlcResponseCode.NOT_CONNECTED)

def _connection_established(self) -> bool:
"""Check if the connection to the PLC is established."""
return self._device is not None

async def _read(self, request: PlcReadRequest) -> PlcReadResponse:
"""
Executes a PlcReadRequest
"""
if self._check_connection():
logging.error("No device is set in the modbus connection!")
return self._default_failed_request(PlcResponseCode.NOT_CONNECTED)
try:
logging.debug("Sending read request to Modbus Device")
response = await asyncio.wait_for(
self._device.read(request, self._transport), 5
)
return response
except Exception:
# TODO:- This exception is very general and probably should be replaced
return PlcReadResponse(PlcResponseCode.INTERNAL_ERROR, {})

async def _write(self, request: PlcWriteRequest) -> PlcTagResponse:
"""
Executes a PlcWriteRequest
"""
if self._check_connection():
logging.error("No device is set in the modbus connection!")
return self._default_failed_request(PlcResponseCode.NOT_CONNECTED)
def __init__(self, url):
super().__init__(url)

try:
logging.debug("Sending write request to Modbus Device")
response = await asyncio.wait_for(
self._device.write(request, self._transport), 5
)
return response
except Exception:
# TODO:- This exception is very general and probably should be replaced
return PlcWriteResponse(PlcResponseCode.INTERNAL_ERROR, {})

def is_read_supported(self) -> bool:
"""
Indicates if the connection supports read requests.
:return: True if connection supports reading, False otherwise
"""
return True

def is_write_supported(self) -> bool:
"""
Indicates if the connection supports write requests.
:return: True if connection supports writing, False otherwise
"""
return False

def is_subscribe_supported(self) -> bool:
"""
Indicates if the connection supports subscription requests.
:return: True if connection supports subscriptions, False otherwise
"""
return False

def is_browse_supported(self) -> bool:
"""
Indicates if the connection supports browsing requests.
:return: True if connection supports browsing, False otherwise
"""
return False


class ModbusDriver(PlcDriver):
def __init__(self):
super().__init__()
self.protocol_code = "modbus-tcp"
self.protocol_name = "Modbus TCP"

async def get_connection(
self, url: str, authentication: PlcAuthentication = PlcAuthentication()
) -> PlcConnection:
"""
Connects to a PLC using the given plc connection string.
:param url: plc connection string
:param authentication: authentication credentials.
:return PlcConnection: PLC Connection object
"""
return await ModbusConnection.create(url)


class ModbusDriverLoader(PlcDriverLoader):
"""
Modbus Driver Pluggy Hook Implmentation, lets pluggy find the driver by name
"""
if self.transport is None:
self.transport = "tcp"

@staticmethod
@plc4py.hookimpl
def get_driver() -> Type[ModbusDriver]:
return ModbusDriver
if self.port is None:
self.port = 502

@staticmethod
@plc4py.hookimpl
def key() -> str:
return "modbus"
if "unit_identifier" not in self.parameters:
self.unit_identifier = 1

0 comments on commit d870810

Please sign in to comment.