Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added list read/write for symbols #268

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 166 additions & 44 deletions pyads/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,17 @@
adsGetNetIdForPLC,
adsGetSymbolInfo,
adsSumRead,
adsSumReadBytes,
adsSumWrite,
adsSumWriteBytes,
adsReleaseHandle,
adsSyncReadByNameEx,
adsSyncWriteByNameEx,
adsSyncAddDeviceNotificationReqEx,
adsSyncDelDeviceNotificationReqEx,
adsSyncSetTimeoutEx,
get_value_from_ctype_data,
type_is_string,
)
from .structs import (
AmsAddr,
Expand All @@ -92,6 +96,7 @@
bytes_from_dict,
size_of_structure,
)
from .errorcodes import ERROR_CODES
from .symbol import AdsSymbol
from .utils import decode_ads

Expand Down Expand Up @@ -543,6 +548,40 @@ def read_by_name(
check_length=check_length,
)

def _read_list(
self,
data_symbols: Dict[str, Union[SAdsSymbolEntry, Tuple]],
ads_sub_commands: int,
structure_defs: Dict[str, StructureDef],
) -> Dict[str, Any]:
"""Perform read for a list of variables.

See :meth:`Connection.read_list_by_name`.
"""

data_names = list(data_symbols.keys())

# Limit request side, split into multiple if needed
if len(data_names) <= ads_sub_commands:
data_names_list = [data_names] # Turn into list of a single element
else:
data_names_list = _list_slice_generator(data_names, ads_sub_commands)

return_data: Dict[str, Any] = {}

for data_names_slice in data_names_list:

result = adsSumRead(self._port, self._adr, data_names_slice, data_symbols,
list(structure_defs.keys())) # type: ignore

for data_name, structure_def in structure_defs.items(): # type: ignore
result[data_name] = dict_from_bytes(result[data_name],
structure_def) # type: ignore

return_data.update(result)

return return_data

def read_list_by_name(
self,
data_names: List[str],
Expand Down Expand Up @@ -571,36 +610,95 @@ def read_list_by_name(
structure_defs = {}

if cache_symbol_info:
new_items = [i for i in data_names if i not in self._symbol_info_cache]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i always makes me think of a numeric key, so I wanted to replace with something more verbose.

new_items = [name for name in data_names if name not in self._symbol_info_cache]
new_cache = {
i: adsGetSymbolInfo(self._port, self._adr, i) for i in new_items
name: adsGetSymbolInfo(self._port, self._adr, name) for name in new_items
}
self._symbol_info_cache.update(new_cache)
data_symbols = {i: self._symbol_info_cache[i] for i in data_names}
data_symbols = {
name: self._symbol_info_cache[name] for name in data_names
}
else:
data_symbols = {
i: adsGetSymbolInfo(self._port, self._adr, i) for i in data_names
name: adsGetSymbolInfo(self._port, self._adr, name) for name in data_names
}

def sum_read(port: int, adr: AmsAddr, data_names: List[str],
data_symbols: Dict) -> Dict[str, str]:
result = adsSumRead(port, adr, data_names, data_symbols,
list(structure_defs.keys())) # type: ignore
return self._read_list(data_symbols, ads_sub_commands, structure_defs)

for data_name, structure_def in structure_defs.items(): # type: ignore
result[data_name] = dict_from_bytes(result[data_name],
structure_def) # type: ignore
def read_list_of_symbols(
self,
symbols: List[AdsSymbol],
ads_sub_commands: int = MAX_ADS_SUB_COMMANDS,
):
"""Read new values for a list of AdsSymbols using a single ADS call.

return result
The outputs will be returned as a dictionary, but the cache of each symbol will
be updated too.

if len(data_names) <= ads_sub_commands:
return sum_read(self._port, self._adr, data_names, data_symbols)
Comparable to :meth:`Connection.read_list_by_name`.
See also :class:`pyads.AdsSymbol`.

:param symbols: List if symbol instances
:param ads_sub_commands: Max. number of symbols per call (see
`read_list_by_name`)
"""

for symbol in symbols:
if symbol.is_structure:
raise ValueError("Method not available for structured variables")

# Relying on `adsSumRead()` is tricky, because we do not have the `dataType`
# (integer) for each symbol, we only have the ctypes-type.

data_symbols = {
symbol.name: (symbol.index_group, symbol.index_offset,
symbol.plc_type) for symbol in symbols
}

result = self._read_list(data_symbols, ads_sub_commands, {})

# Add result to symbols cache
for symbol in symbols:
symbol._value = result[symbol.name]

return result

def _write_list(
self,
data_symbols: Dict[Union[SAdsSymbolEntry, Tuple]],
values: Dict[str, Any],
ads_sub_commands: int,
structure_defs: Dict[str, StructureDef],
) -> Dict[str, str]:
"""Write a list of variables.

See :meth:`write_list_by_name`.
"""

if structure_defs is None:
structure_defs = {}

for name, structure_def in structure_defs.items():
values[name] = bytes_from_dict(values[name], structure_def)

structured_data_names = list(structure_defs.keys())

if len(values) <= ads_sub_commands:
data_names_list = [values] # Turn into array of single element
# return adsSumWrite(
# self._port, self._adr, data_names_and_values, data_symbols,
# structured_data_names
# )
else:
data_names_list = _dict_slice_generator(values, ads_sub_commands)

return_data: Dict[str, str] = {}
for data_names_slice in data_names_list:

result = adsSumWrite(self._port, self._adr, data_names_slice, data_symbols,
structured_data_names)
return_data.update(result)

return_data: Dict[str, Any] = {}
for data_names_slice in _list_slice_generator(data_names, ads_sub_commands):
return_data.update(
sum_read(self._port, self._adr, data_names_slice, data_symbols)
)
return return_data

def write_list_by_name(
Expand Down Expand Up @@ -631,49 +729,73 @@ def write_list_by_name(

"""
if cache_symbol_info:
new_items = [
i
for i in data_names_and_values.keys()
if i not in self._symbol_info_cache
]
new_items = [name for name in data_names_and_values.keys()
if name not in self._symbol_info_cache]
new_cache = {
i: adsGetSymbolInfo(self._port, self._adr, i) for i in new_items
name: adsGetSymbolInfo(self._port, self._adr, name) for name in new_items
}
self._symbol_info_cache.update(new_cache)
data_symbols = {
i: self._symbol_info_cache[i] for i in data_names_and_values
}
else:
data_symbols = {
i: adsGetSymbolInfo(self._port, self._adr, i)
for i in data_names_and_values.keys()
name: adsGetSymbolInfo(self._port, self._adr, name)
for name in data_names_and_values.keys()
}

if structure_defs is None:
structure_defs = {}
else:
data_names_and_values = data_names_and_values.copy() # copy so the original does not get modified

for name, structure_def in structure_defs.items():
data_names_and_values[name] = bytes_from_dict(data_names_and_values[name],
structure_def)
return self._write_list(data_symbols, data_names_and_values, ads_sub_commands, structure_defs)

structured_data_names = list(structure_defs.keys())
def write_list_of_symbols(
self,
symbols_and_values: Dict[AdsSymbol, Any],
ads_sub_commands: int = MAX_ADS_SUB_COMMANDS,
):
"""Write new values to a list of symbols.

if len(data_names_and_values) <= ads_sub_commands:
return adsSumWrite(
self._port, self._adr, data_names_and_values, data_symbols,
structured_data_names
)
Either specify a dict of symbols, or first set the `_value` property of
each symbol and then pass them as a list.
Comment on lines +799 to +800
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

List of symbols


return_data: Dict[str, str] = {}
for data_names_slice in _dict_slice_generator(data_names_and_values,
ads_sub_commands):
return_data.update(
adsSumWrite(self._port, self._adr, data_names_slice, data_symbols,
structured_data_names)
)
return return_data
For example:

.. code:: python

# Using dict
new_data = {symbol1: 3.14, symbol2: False}
plc.write_list_of_symbols(new_data)

# Using cache
symbol1._value = 3.14
symbol2._value = False
plc.write_list_of_symbols([symbol1, symbol2])

Comparable to :meth:`Connection.write_list_by_name`.
See also :class:`pyads.AdsSymbol`.

:param symbols_and_values: Symbols to write to
:param ads_sub_commands: Max. number of symbols per call (see
`write_list_by_name`)
"""

for symbol in symbols_and_values.keys():
if symbol.is_structure:
raise ValueError("Method not available for structured variables")

data_symbols = {
symbol.name: (symbol.index_group, symbol.index_offset,
symbol.plc_type) for symbol in symbols_and_values.keys()
}

data_names_and_values = {symbol.name: value for symbol, value in
symbols_and_values.items()}

return self._write_list(data_symbols, data_names_and_values,
ads_sub_commands, {})

def read_structure_by_name(
self,
Expand Down
Loading