-
Notifications
You must be signed in to change notification settings - Fork 96
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Moved non ads..() methods away from pyads_ex, cleaned up SumRead/Writ…
…e a little
- Loading branch information
1 parent
a83db0f
commit b73f92d
Showing
4 changed files
with
150 additions
and
124 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
"""Contains support functions for the module pyads_ex.py | ||
:author: David Browne <[email protected]> | ||
:license: MIT, see license file or https://opensource.org/licenses/MIT | ||
:created on: 2021-10-50 | ||
""" | ||
from typing import Any, Tuple, List, Type, Optional, Union | ||
import socket | ||
from contextlib import closing | ||
import struct | ||
|
||
from .constants import ( | ||
DATATYPE_MAP, | ||
ads_type_to_ctype, | ||
PLCTYPE_STRING, | ||
PORT_REMOTE_UDP, | ||
ADST_STRING, | ||
ADST_WSTRING, | ||
) | ||
from .structs import SAdsSymbolEntry | ||
from .errorcodes import ERROR_CODES | ||
|
||
|
||
class ADSError(Exception): | ||
"""Error class for errors related to ADS communication.""" | ||
|
||
def __init__( | ||
self, err_code: Optional[int] = None, text: Optional[str] = None | ||
) -> None: | ||
if err_code is not None: | ||
self.err_code = err_code | ||
try: | ||
self.msg = "{} ({}). ".format(ERROR_CODES[self.err_code], self.err_code) | ||
except KeyError: | ||
self.msg = "Unknown Error ({0}). ".format(self.err_code) | ||
else: | ||
self.msg = "" | ||
|
||
if text is not None: | ||
self.msg += text | ||
|
||
def __str__(self): | ||
# type: () -> str | ||
"""Return text representation of the object.""" | ||
return "ADSError: " + self.msg | ||
|
||
|
||
def send_raw_udp_message( | ||
ip_address: str, message: bytes, expected_return_length: int | ||
) -> Tuple[bytes, Tuple[str, int]]: | ||
"""Send a raw UDP message to the PLC and return the response. | ||
:param str ip_address: ip address of the PLC | ||
:param bytes message: the message to send to the PLC | ||
:param int expected_return_length: number of bytes to expect in response | ||
:rtype: Tuple[bytes, Tuple[str, int]] | ||
:return: A tuple containing the response and a tuple containing the IP address and port of the | ||
sending socket | ||
""" | ||
with closing(socket.socket(socket.AF_INET, socket.SOCK_DGRAM)) as sock: # UDP | ||
# Listen on any available port for the response from the PLC | ||
sock.bind(("", 0)) | ||
|
||
# Send our data to the PLC | ||
sock.sendto(message, (ip_address, PORT_REMOTE_UDP)) | ||
|
||
# Response should come in in less than .5 seconds, but wait longer to account for slow | ||
# communications | ||
sock.settimeout(5) | ||
|
||
# Allow TimeoutError to be raised so user can handle it how they please | ||
return sock.recvfrom(expected_return_length) | ||
|
||
|
||
def type_is_string(plc_type: Type) -> bool: | ||
"""Return true if the given class is a string type.""" | ||
|
||
# If single char | ||
if plc_type == PLCTYPE_STRING: | ||
return True | ||
|
||
# If char array | ||
if type(plc_type).__name__ == "PyCArrayType": | ||
if plc_type._type_ == PLCTYPE_STRING: | ||
return True | ||
|
||
return False | ||
|
||
|
||
def get_value_from_ctype_data(read_data: Optional[Any], plc_type: Type) -> Any: | ||
"""Convert ctypes data object to a regular value based on the PLCTYPE_* property. | ||
Typical usage is: | ||
.. code:: python | ||
obj = my_plc_type.from_buffer(my_buffer) | ||
value = get_value_from_ctype_data(obj, my_plc_type) | ||
:param read_data: ctypes._CData object | ||
:param plc_type: pyads.PLCTYPE_* constant (i.e. a ctypes-like type) | ||
""" | ||
|
||
if read_data is None: | ||
return None | ||
|
||
if type_is_string(plc_type): | ||
return read_data.value.decode("utf-8") | ||
|
||
if type(plc_type).__name__ == "PyCArrayType": | ||
return [i for i in read_data] | ||
|
||
if hasattr(read_data, "value"): | ||
return read_data.value | ||
|
||
return read_data # Just return the object itself, don't throw an error |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,7 @@ | ||
"""Contains cross platform ADS extension functions. | ||
Contains all the ads...() functions. | ||
:author: David Browne <[email protected]> | ||
:license: MIT, see license file or https://opensource.org/licenses/MIT | ||
:created on: 2018-06-11 18:15:53 | ||
|
@@ -9,10 +11,8 @@ | |
import ctypes | ||
import os | ||
import platform | ||
import socket | ||
import struct | ||
import sys | ||
from contextlib import closing | ||
from functools import wraps | ||
|
||
from .utils import platform_is_linux, platform_is_windows | ||
|
@@ -39,13 +39,18 @@ | |
ADSIGRP_SYM_VALBYHND, | ||
ADSIGRP_SYM_RELEASEHND, | ||
PORT_SYSTEMSERVICE, | ||
PORT_REMOTE_UDP, | ||
ADSIGRP_SUMUP_READ, | ||
ADSIGRP_SUMUP_WRITE, | ||
DATATYPE_MAP, | ||
ads_type_to_ctype, | ||
) | ||
from .errorcodes import ERROR_CODES | ||
from .pyads_common import ( | ||
ADSError, | ||
send_raw_udp_message, | ||
type_is_string, | ||
get_value_from_ctype_data, | ||
) | ||
|
||
|
||
NOTEFUNC: Optional[Callable] = None | ||
|
@@ -99,30 +104,6 @@ | |
callback_store: Dict[Tuple[AmsAddr, int], Callable[[SAmsAddr, SAdsNotificationHeader, int], None]] = dict() | ||
|
||
|
||
class ADSError(Exception): | ||
"""Error class for errors related to ADS communication.""" | ||
|
||
def __init__( | ||
self, err_code: Optional[int] = None, text: Optional[str] = None | ||
) -> None: | ||
if err_code is not None: | ||
self.err_code = err_code | ||
try: | ||
self.msg = "{} ({}). ".format(ERROR_CODES[self.err_code], self.err_code) | ||
except KeyError: | ||
self.msg = "Unknown Error ({0}). ".format(self.err_code) | ||
else: | ||
self.msg = "" | ||
|
||
if text is not None: | ||
self.msg += text | ||
|
||
def __str__(self): | ||
# type: () -> str | ||
"""Return text representation of the object.""" | ||
return "ADSError: " + self.msg | ||
|
||
|
||
def router_function(fn: Callable) -> Callable: | ||
"""Raise a runtime error if on Win32 systems. | ||
|
@@ -170,77 +151,6 @@ def adsAddRoute(net_id: SAmsNetId, ip_address: str) -> None: | |
raise ADSError(error_code) | ||
|
||
|
||
def send_raw_udp_message( | ||
ip_address: str, message: bytes, expected_return_length: int | ||
) -> Tuple[bytes, Tuple[str, int]]: | ||
"""Send a raw UDP message to the PLC and return the response. | ||
:param str ip_address: ip address of the PLC | ||
:param bytes message: the message to send to the PLC | ||
:param int expected_return_length: number of bytes to expect in response | ||
:rtype: Tuple[bytes, Tuple[str, int]] | ||
:return: A tuple containing the response and a tuple containing the IP address and port of the | ||
sending socket | ||
""" | ||
with closing(socket.socket(socket.AF_INET, socket.SOCK_DGRAM)) as sock: # UDP | ||
# Listen on any available port for the response from the PLC | ||
sock.bind(("", 0)) | ||
|
||
# Send our data to the PLC | ||
sock.sendto(message, (ip_address, PORT_REMOTE_UDP)) | ||
|
||
# Response should come in in less than .5 seconds, but wait longer to account for slow | ||
# communications | ||
sock.settimeout(5) | ||
|
||
# Allow TimeoutError to be raised so user can handle it how they please | ||
return sock.recvfrom(expected_return_length) | ||
|
||
|
||
def type_is_string(plc_type: Type) -> bool: | ||
"""Return true if the given class is a string type.""" | ||
|
||
# If single char | ||
if plc_type == PLCTYPE_STRING: | ||
return True | ||
|
||
# If char array | ||
if type(plc_type).__name__ == "PyCArrayType": | ||
if plc_type._type_ == PLCTYPE_STRING: | ||
return True | ||
|
||
return False | ||
|
||
|
||
def get_value_from_ctype_data(read_data: Optional[Any], plc_type: Type) -> Any: | ||
"""Convert ctypes data object to a regular value based on the PLCTYPE_* property. | ||
Typical usage is: | ||
.. code:: python | ||
obj = my_plc_type.from_buffer(my_buffer) | ||
value = get_value_from_ctype_data(obj, my_plc_type) | ||
:param read_data: ctypes._CData object | ||
:param plc_type: pyads.PLCTYPE_* constant (i.e. a ctypes-like type) | ||
""" | ||
|
||
if read_data is None: | ||
return None | ||
|
||
if type_is_string(plc_type): | ||
return read_data.value.decode("utf-8") | ||
|
||
if type(plc_type).__name__ == "PyCArrayType": | ||
return [i for i in read_data] | ||
|
||
if hasattr(read_data, "value"): | ||
return read_data.value | ||
|
||
return read_data # Just return the object itself, don't throw an error | ||
|
||
|
||
@router_function | ||
def adsAddRouteToPLC( | ||
sending_net_id: str, | ||
|
@@ -934,35 +844,29 @@ def adsSumRead( | |
|
||
sum_response = adsSumReadBytes(port, address, symbol_infos) | ||
|
||
data_start = 4 * num_requests | ||
offset = data_start | ||
offset = 4 * num_requests | ||
|
||
for i, data_name in enumerate(data_names): | ||
info = data_symbols[data_name] | ||
error = struct.unpack_from("<I", sum_response, offset=i * 4)[0] | ||
if error: | ||
result[data_name] = ERROR_CODES[error] | ||
else: | ||
if data_name in structured_data_names: | ||
value = sum_response[ | ||
offset: offset + data_symbols[data_name].size] | ||
elif ( | ||
data_symbols[data_name].dataType != ADST_STRING | ||
and data_symbols[data_name].dataType != ADST_WSTRING | ||
): | ||
value = sum_response[offset: offset + info.size] | ||
elif info.dataType != ADST_STRING and info.dataType != ADST_WSTRING: | ||
value = struct.unpack_from( | ||
DATATYPE_MAP[ads_type_to_ctype[data_symbols[data_name].dataType]], | ||
DATATYPE_MAP[ads_type_to_ctype[info.dataType]], | ||
sum_response, | ||
offset=offset, | ||
)[0] | ||
else: | ||
null_idx = sum_response[ | ||
offset: offset + data_symbols[data_name].size | ||
].index(0) | ||
null_idx = sum_response[offset: offset + info.size].index(0) | ||
value = bytearray( | ||
sum_response[offset: offset + null_idx] | ||
).decode("utf-8") | ||
result[data_name] = value | ||
offset += data_symbols[data_name].size | ||
offset += info.size | ||
|
||
return result | ||
|
||
|
@@ -1025,27 +929,26 @@ def adsSumWrite( | |
buf = bytearray(total_request_size) | ||
|
||
for data_name in data_names_and_values.keys(): | ||
struct.pack_into("<I", buf, offset, data_symbols[data_name].iGroup) | ||
struct.pack_into("<I", buf, offset + 4, data_symbols[data_name].iOffs) | ||
struct.pack_into("<I", buf, offset + 8, data_symbols[data_name].size) | ||
info = data_symbols[data_name] | ||
struct.pack_into("<I", buf, offset, info.iGroup) | ||
struct.pack_into("<I", buf, offset + 4, info.iOffs) | ||
struct.pack_into("<I", buf, offset + 8, info.size) | ||
offset += 12 | ||
|
||
for data_name, value in data_names_and_values.items(): | ||
info = data_symbols[data_name] | ||
if data_name in structured_data_names: | ||
buf[offset : offset + data_symbols[data_name].size] = value | ||
elif ( | ||
data_symbols[data_name].dataType != ADST_STRING | ||
and data_symbols[data_name].dataType != ADST_WSTRING | ||
): | ||
buf[offset: offset + info.size] = value | ||
elif info.dataType != ADST_STRING and info.dataType != ADST_WSTRING: | ||
struct.pack_into( | ||
DATATYPE_MAP[ads_type_to_ctype[data_symbols[data_name].dataType]], | ||
DATATYPE_MAP[ads_type_to_ctype[info.dataType]], | ||
buf, | ||
offset, | ||
value, | ||
) | ||
else: | ||
buf[offset : offset + len(value)] = value.encode("utf-8") | ||
offset += data_symbols[data_name].size | ||
buf[offset: offset + len(value)] = value.encode("utf-8") | ||
offset += info.size | ||
|
||
error_descriptions = adsSumWriteBytes( | ||
port, | ||
|