Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added fallback to type detection to allow automatic ENUM usage #397

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions pyads/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
_dict_slice_generator,
bytes_from_dict,
size_of_structure,
ADSError,
)
from .symbol import AdsSymbol
from .utils import decode_ads
Expand Down Expand Up @@ -174,16 +175,16 @@ def _query_plc_datatype_from_name(self, data_name: str,

If cache_symbol_info is True then the SymbolInfo will be cached and adsGetSymbolInfo
will only used once.

"""
info = None
if cache_symbol_info:
info = self._symbol_info_cache.get(data_name)
if info is None:
info = adsGetSymbolInfo(self._port, self._adr, data_name)
self._symbol_info_cache[data_name] = info
else:
if info is None:
info = adsGetSymbolInfo(self._port, self._adr, data_name)
return AdsSymbol.get_type_from_str(info.symbol_type)
if cache_symbol_info:
self._symbol_info_cache[data_name] = info

return AdsSymbol.get_type_from_info(info)

def open(self) -> None:
"""Connect to the TwinCAT message router."""
Expand Down Expand Up @@ -543,6 +544,10 @@ def read_by_name(
plc_datatype = self._query_plc_datatype_from_name(data_name,
cache_symbol_info)

if plc_datatype is None:
# `adsSyncReadReqEx2()` will fail for a None type
raise ADSError(None, f"Failed to detect datatype for `{data_name}`")

return adsSyncReadByNameEx(
self._port,
self._adr,
Expand Down Expand Up @@ -688,6 +693,10 @@ def write_by_name(
plc_datatype = self._query_plc_datatype_from_name(data_name,
cache_symbol_info)

if plc_datatype is None:
# `adsSyncWriteReqEx()` does not support `None` for a type
raise ADSError(None, f"Failed to detect datatype for `{data_name}`")

return adsSyncWriteByNameEx(
self._port, self._adr, data_name, value, plc_datatype, handle=handle
)
Expand Down
61 changes: 46 additions & 15 deletions pyads/symbol.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
from typing import TYPE_CHECKING, Any, Optional, List, Tuple, Callable, Union, Type

from . import constants # To access all constants, use package notation
from .constants import PLCDataType
from .constants import PLCDataType, ads_type_to_ctype
from .pyads_ex import adsGetSymbolInfo, ADSError
from .structs import NotificationAttrib
from .structs import NotificationAttrib, SAdsSymbolEntry

# ads.Connection relies on structs.AdsSymbol (but in type hints only), so use
# this 'if' to only include it when type hinting (False during execution)
Expand Down Expand Up @@ -125,7 +125,7 @@ def __init__(
self.name = name
self.index_offset = index_offset
self.index_group = index_group
self.symbol_type = symbol_type
self.symbol_type = None # Apply `symbol_type` later
self.comment = comment
self._value: Any = None

Expand All @@ -137,15 +137,14 @@ def __init__(
from .ads import size_of_structure
self._structure_size = size_of_structure(self.structure_def * self.array_size)

self.plc_type: Optional[Type[PLCDataType]] = None

if missing_info:
self._create_symbol_from_info() # Perform remote lookup

# Now `self.symbol_type` should have a value, find the actual PLCTYPE
# from it.
# This is relevant for both lookup and full user definition.

self.plc_type: Optional[Type[PLCDataType]] = None
if self.symbol_type is not None:
# Apply user-provided type (overriding auto detect if any):
if symbol_type is not None:
self.symbol_type = symbol_type
if isinstance(self.symbol_type, str): # Perform lookup if string
self.plc_type = AdsSymbol.get_type_from_str(self.symbol_type)
else: # Otherwise `symbol_type` is probably a pyads.PLCTYPE_* constant
Expand All @@ -166,12 +165,8 @@ def _create_symbol_from_info(self) -> None:
if info.comment:
self.comment = info.comment

# info.dataType is an integer mapping to a type in
# constants.ads_type_to_ctype.
# However, this type ignores whether the variable is really an array!
# So are not going to be using this and instead rely on the textual
# type
self.symbol_type = info.symbol_type # Save the type as string
self.plc_type = self.get_type_from_info(info)
self.symbol_type = info.symbol_type # Also save the type as string

def _check_for_open_connection(self) -> None:
"""Assert the current object is ready to read from/write to.
Expand All @@ -195,6 +190,12 @@ def read(self) -> Any:
structure_size=self._structure_size,
array_size=self.array_size)
else:
if self.plc_type is None:
raise ADSError(
None,
f"Cannot read data with unknown datatype for symbol "
f"{self.name} ({self.symbol_type})"
)
self._value = self._plc.read(self.index_group, self.index_offset, self.plc_type)

return self._value
Expand All @@ -218,6 +219,12 @@ def write(self, new_value: Optional[Any] = None) -> None:
self._plc.write_structure_by_name(self.name, new_value, self.structure_def,
structure_size=self._structure_size, array_size=self.array_size)
else:
if self.plc_type is None:
raise ADSError(
None,
f"Cannot write data with unknown datatype for symbol "
f"{self.name} ({self.symbol_type})"
)
self._plc.write(self.index_group, self.index_offset, new_value, self.plc_type)

def __repr__(self) -> str:
Expand Down Expand Up @@ -286,6 +293,30 @@ def _value_callback(self, notification: Any, data_name: Any) -> None:
)
self._value = value

@classmethod
def get_type_from_info(cls, info: SAdsSymbolEntry) -> Optional[Type[PLCDataType]]:
"""Get PLCTYPE_* from symbol info struct

Also see :meth:`~get_type_from_str`.
"""
plc_type = cls.get_type_from_str(info.symbol_type)
if plc_type is not None:
return plc_type

# Failed to detect by name (e.g. type is enum)

# Use `ADST_*` integer to detect type instead
plc_type = ads_type_to_ctype.get(info.dataType, None)
if plc_type is not None:
array_size = int(info.size / sizeof(plc_type))
# However, `dataType` is always a scalar, even if the object is an array:
if array_size > 1:
plc_type = plc_type * array_size

return plc_type

return None

@staticmethod
def get_type_from_str(type_str: str) -> Optional[Type[PLCDataType]]:
"""Get PLCTYPE_* from PLC name string
Expand Down
35 changes: 29 additions & 6 deletions tests/test_symbol.py
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chrisbeardy you mentioned more tests might be a good idea. What cases would you like to be covered? I'm not sure what is needed exactly.

Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import pyads
from pyads.testserver import AdsTestServer, AdvancedHandler, PLCVariable
from pyads import constants, AdsSymbol, bytes_from_dict
from pyads import constants, AdsSymbol, ADSError, bytes_from_dict

from tests.test_connection_class import create_notification_struct

Expand Down Expand Up @@ -143,8 +143,8 @@ def test_init_by_name_matrix_style(self):
struct.pack("<21b", *range(21)),
ads_type=constants.ADST_VOID,
symbol_type="matrix_21_int8_T", # Simulink array looks like this
index_group = 123,
index_offset = 100,
index_group=123,
index_offset=100,
)
self.handler.add_variable(var)
self.plc.open()
Expand All @@ -158,7 +158,7 @@ def test_init_by_name_matrix_style(self):

# Verify looked up info
self.assertEqual(constants.PLCTYPE_ARR_SINT(21), symbol.plc_type)
self.assertEqual(var.symbol_type, symbol.symbol_type)
self.assertEqual("matrix_21_int8_T", symbol.symbol_type)

self.assertAdsRequestsCount(0) # No requests

Expand Down Expand Up @@ -265,10 +265,10 @@ def test_init_invalid_type(self):
# Create symbol while providing everything:
symbol = AdsSymbol(self.plc, name=var.name)
self.assertEqual(var.symbol_type, symbol.symbol_type)
with self.assertRaises(TypeError) as cm:
with self.assertRaises(ADSError) as cm:
# Error is thrown inside pyads_ex
symbol.read()
self.assertIn("NoneType", str(cm.exception))
self.assertIn("unknown datatype", str(cm.exception))
self.assertAdsRequestsCount(1) # Only a WRITE followed by a READ

def test_read_write_errors(self):
Expand Down Expand Up @@ -370,6 +370,29 @@ def test_read_structure_array(self):

self.assertEqual(values, read_values)

def test_read_enum(self):
"""Test reading from a symbol when it's an ENUM type"""
self.handler.add_variable(
PLCVariable("TestEnum", 7, constants.ADST_UINT16, symbol_type="E_MyEnum"))

with self.plc:
symbol = self.plc.get_symbol("TestEnum")
value = symbol.read()

self.assertEqual(7, value)

def test_read_enum_array(self):
"""Test reading from a symbol when it's an array of an ENUM type"""
value_bytes = struct.pack("<3h", 1, 2, 3)
self.handler.add_variable(
PLCVariable("TestEnumList", value_bytes, constants.ADST_UINT16, symbol_type="ARRAY [1..3] OF E_MyEnum"))

with self.plc:
symbol = self.plc.get_symbol("TestEnumList")
value = symbol.read()

self.assertEqual([1, 2, 3], value)

def test_write(self):
"""Test symbol value writing"""
with self.plc:
Expand Down
Loading