diff --git a/pyads/__init__.py b/pyads/__init__.py index 870f087e..382fe209 100644 --- a/pyads/__init__.py +++ b/pyads/__init__.py @@ -23,7 +23,7 @@ StructureDef, ) -from .pyads_ex import ADSError +from .pyads_common import ADSError from .constants import ( PLCTYPE_BOOL, diff --git a/pyads/ads.py b/pyads/ads.py index 1b92fb14..bbe4910b 100644 --- a/pyads/ads.py +++ b/pyads/ads.py @@ -1134,7 +1134,11 @@ def write_structure_by_name( data_name, byte_values, c_ubyte * structure_size, handle=handle ) - def read_list_of_symbols(self, symbols: List[AdsSymbol]): + def read_list_of_symbols( + self, + symbols: List[AdsSymbol], + ads_sub_commands: int = MAX_ADS_SUB_COMMANDS, + ): """Read new values for a list of AdsSymbols using a single ADS call. The outputs will be returned as a dictionary, but the cache of each symbol will @@ -1144,6 +1148,8 @@ def read_list_of_symbols(self, symbols: List[AdsSymbol]): See also :class:`pyads.AdsSymbol`. :param symbols: List if symbol instances + :param ads_sub_commands: Max. number of symbols per call (see + `read_list_by_name`) """ # Relying on `adsSumRead()` is tricky, because we do not have the `dataType` diff --git a/pyads/pyads_common.py b/pyads/pyads_common.py new file mode 100644 index 00000000..f2f8ef82 --- /dev/null +++ b/pyads/pyads_common.py @@ -0,0 +1,117 @@ +"""Contains support functions for the module pyads_ex.py + +:author: David Browne +:license: MIT, see license file or https://opensource.org/licenses/MIT +:created on: 2021-10-50 + +""" +from typing import Any, Tuple, List, Type, Optional, Union +import socket +from contextlib import closing +import struct + +from .constants import ( + DATATYPE_MAP, + ads_type_to_ctype, + PLCTYPE_STRING, + PORT_REMOTE_UDP, + ADST_STRING, + ADST_WSTRING, +) +from .structs import SAdsSymbolEntry +from .errorcodes import ERROR_CODES + + +class ADSError(Exception): + """Error class for errors related to ADS communication.""" + + def __init__( + self, err_code: Optional[int] = None, text: Optional[str] = None + ) -> None: + if err_code is not None: + self.err_code = err_code + try: + self.msg = "{} ({}). ".format(ERROR_CODES[self.err_code], self.err_code) + except KeyError: + self.msg = "Unknown Error ({0}). ".format(self.err_code) + else: + self.msg = "" + + if text is not None: + self.msg += text + + def __str__(self): + # type: () -> str + """Return text representation of the object.""" + return "ADSError: " + self.msg + + +def send_raw_udp_message( + ip_address: str, message: bytes, expected_return_length: int +) -> Tuple[bytes, Tuple[str, int]]: + """Send a raw UDP message to the PLC and return the response. + + :param str ip_address: ip address of the PLC + :param bytes message: the message to send to the PLC + :param int expected_return_length: number of bytes to expect in response + :rtype: Tuple[bytes, Tuple[str, int]] + :return: A tuple containing the response and a tuple containing the IP address and port of the + sending socket + """ + with closing(socket.socket(socket.AF_INET, socket.SOCK_DGRAM)) as sock: # UDP + # Listen on any available port for the response from the PLC + sock.bind(("", 0)) + + # Send our data to the PLC + sock.sendto(message, (ip_address, PORT_REMOTE_UDP)) + + # Response should come in in less than .5 seconds, but wait longer to account for slow + # communications + sock.settimeout(5) + + # Allow TimeoutError to be raised so user can handle it how they please + return sock.recvfrom(expected_return_length) + + +def type_is_string(plc_type: Type) -> bool: + """Return true if the given class is a string type.""" + + # If single char + if plc_type == PLCTYPE_STRING: + return True + + # If char array + if type(plc_type).__name__ == "PyCArrayType": + if plc_type._type_ == PLCTYPE_STRING: + return True + + return False + + +def get_value_from_ctype_data(read_data: Optional[Any], plc_type: Type) -> Any: + """Convert ctypes data object to a regular value based on the PLCTYPE_* property. + + Typical usage is: + + .. code:: python + + obj = my_plc_type.from_buffer(my_buffer) + value = get_value_from_ctype_data(obj, my_plc_type) + + :param read_data: ctypes._CData object + :param plc_type: pyads.PLCTYPE_* constant (i.e. a ctypes-like type) + """ + + if read_data is None: + return None + + if type_is_string(plc_type): + return read_data.value.decode("utf-8") + + if type(plc_type).__name__ == "PyCArrayType": + return [i for i in read_data] + + if hasattr(read_data, "value"): + return read_data.value + + return read_data # Just return the object itself, don't throw an error diff --git a/pyads/pyads_ex.py b/pyads/pyads_ex.py index 23947f61..a7aeb81e 100644 --- a/pyads/pyads_ex.py +++ b/pyads/pyads_ex.py @@ -1,5 +1,7 @@ """Contains cross platform ADS extension functions. +Contains all the ads...() functions. + :author: David Browne :license: MIT, see license file or https://opensource.org/licenses/MIT :created on: 2018-06-11 18:15:53 @@ -9,10 +11,8 @@ import ctypes import os import platform -import socket import struct import sys -from contextlib import closing from functools import wraps from .utils import platform_is_linux, platform_is_windows @@ -39,13 +39,18 @@ ADSIGRP_SYM_VALBYHND, ADSIGRP_SYM_RELEASEHND, PORT_SYSTEMSERVICE, - PORT_REMOTE_UDP, ADSIGRP_SUMUP_READ, ADSIGRP_SUMUP_WRITE, DATATYPE_MAP, ads_type_to_ctype, ) from .errorcodes import ERROR_CODES +from .pyads_common import ( + ADSError, + send_raw_udp_message, + type_is_string, + get_value_from_ctype_data, +) NOTEFUNC: Optional[Callable] = None @@ -99,30 +104,6 @@ callback_store: Dict[Tuple[AmsAddr, int], Callable[[SAmsAddr, SAdsNotificationHeader, int], None]] = dict() -class ADSError(Exception): - """Error class for errors related to ADS communication.""" - - def __init__( - self, err_code: Optional[int] = None, text: Optional[str] = None - ) -> None: - if err_code is not None: - self.err_code = err_code - try: - self.msg = "{} ({}). ".format(ERROR_CODES[self.err_code], self.err_code) - except KeyError: - self.msg = "Unknown Error ({0}). ".format(self.err_code) - else: - self.msg = "" - - if text is not None: - self.msg += text - - def __str__(self): - # type: () -> str - """Return text representation of the object.""" - return "ADSError: " + self.msg - - def router_function(fn: Callable) -> Callable: """Raise a runtime error if on Win32 systems. @@ -170,77 +151,6 @@ def adsAddRoute(net_id: SAmsNetId, ip_address: str) -> None: raise ADSError(error_code) -def send_raw_udp_message( - ip_address: str, message: bytes, expected_return_length: int -) -> Tuple[bytes, Tuple[str, int]]: - """Send a raw UDP message to the PLC and return the response. - - :param str ip_address: ip address of the PLC - :param bytes message: the message to send to the PLC - :param int expected_return_length: number of bytes to expect in response - :rtype: Tuple[bytes, Tuple[str, int]] - :return: A tuple containing the response and a tuple containing the IP address and port of the - sending socket - """ - with closing(socket.socket(socket.AF_INET, socket.SOCK_DGRAM)) as sock: # UDP - # Listen on any available port for the response from the PLC - sock.bind(("", 0)) - - # Send our data to the PLC - sock.sendto(message, (ip_address, PORT_REMOTE_UDP)) - - # Response should come in in less than .5 seconds, but wait longer to account for slow - # communications - sock.settimeout(5) - - # Allow TimeoutError to be raised so user can handle it how they please - return sock.recvfrom(expected_return_length) - - -def type_is_string(plc_type: Type) -> bool: - """Return true if the given class is a string type.""" - - # If single char - if plc_type == PLCTYPE_STRING: - return True - - # If char array - if type(plc_type).__name__ == "PyCArrayType": - if plc_type._type_ == PLCTYPE_STRING: - return True - - return False - - -def get_value_from_ctype_data(read_data: Optional[Any], plc_type: Type) -> Any: - """Convert ctypes data object to a regular value based on the PLCTYPE_* property. - - Typical usage is: - - .. code:: python - - obj = my_plc_type.from_buffer(my_buffer) - value = get_value_from_ctype_data(obj, my_plc_type) - - :param read_data: ctypes._CData object - :param plc_type: pyads.PLCTYPE_* constant (i.e. a ctypes-like type) - """ - - if read_data is None: - return None - - if type_is_string(plc_type): - return read_data.value.decode("utf-8") - - if type(plc_type).__name__ == "PyCArrayType": - return [i for i in read_data] - - if hasattr(read_data, "value"): - return read_data.value - - return read_data # Just return the object itself, don't throw an error - - @router_function def adsAddRouteToPLC( sending_net_id: str, @@ -934,35 +844,29 @@ def adsSumRead( sum_response = adsSumReadBytes(port, address, symbol_infos) - data_start = 4 * num_requests - offset = data_start + offset = 4 * num_requests for i, data_name in enumerate(data_names): + info = data_symbols[data_name] error = struct.unpack_from("