Skip to content

Commit

Permalink
Fixes issues with computing "information" based on path (a.o. fullmat…
Browse files Browse the repository at this point in the history
…ch having a re.error bad escape)
  • Loading branch information
wilterdinkrobert committed Dec 27, 2023
1 parent afcc1da commit 433501d
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 34 deletions.
18 changes: 10 additions & 8 deletions thingsboard_gateway/connectors/opcua/opcua_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@
TBUtility.install_package("opcua")
from opcua import Client, Node, ua

try:
try:
from opcua.crypto import uacrypto
except ImportError:
except ImportError:
TBUtility.install_package("cryptography")
from opcua.crypto import uacrypto

Expand Down Expand Up @@ -414,6 +414,8 @@ def __search_nodes_and_subscribe(self, device_info):
information["path"] = '${%s}' % information_path
information_nodes = []
self.__search_node(device_info["deviceNode"], information_path, result=information_nodes)
if len(information_nodes) == 0:
self._log.error("No nodes found for: %s - %s - %s", information_type, information["key"], information_path)

for information_node in information_nodes:
changed_key = False
Expand All @@ -440,8 +442,8 @@ def __search_nodes_and_subscribe(self, device_info):
converter = device_info["uplink_converter"]

self.subscribed[information_node] = {"converter": converter,
"path": information_path,
"config_path": config_path}
"information": information,
"information_type": information_type}

# Use Node name if param "key" not found in config
if not information.get('key'):
Expand All @@ -460,7 +462,7 @@ def __search_nodes_and_subscribe(self, device_info):
if not device_info.get(information_types[information_type]):
device_info[information_types[information_type]] = []

converted_data = converter.convert((config_path, information_path), information_value)
converted_data = converter.convert((information, information_type), information_value)
self.statistics['MessagesReceived'] = self.statistics['MessagesReceived'] + 1
self.data_to_send.append(converted_data)
self.statistics['MessagesSent'] = self.statistics['MessagesSent'] + 1
Expand All @@ -470,7 +472,7 @@ def __search_nodes_and_subscribe(self, device_info):
sub_nodes.append(information_node)
else:
self._log.error("Node for %s \"%s\" with path %s - NOT FOUND!", information_type,
information_key, information_path)
information['key'], information_path)

if changed_key:
information['key'] = None
Expand Down Expand Up @@ -729,8 +731,8 @@ def datachange_notification(self, node, val, data):
try:
self.connector._log.debug("Python: New data change event on node %s, with val: %s and data %s", node, val, str(data))
subscription = self.connector.subscribed[node]
converted_data = subscription["converter"].convert((subscription["config_path"], subscription["path"]), val,
data, key=subscription.get('key'))
converted_data = subscription["converter"].convert(
(subscription["information"], subscription["information_type"]), val, data, key=subscription.get('key'))
self.connector.statistics['MessagesReceived'] = self.connector.statistics['MessagesReceived'] + 1
self.connector.data_to_send.append(converted_data)
self.connector.statistics['MessagesSent'] = self.connector.statistics['MessagesSent'] + 1
Expand Down
43 changes: 17 additions & 26 deletions thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from re import fullmatch
from time import time
from datetime import timezone

Expand All @@ -37,38 +36,30 @@ def config(self, value):
@StatisticsService.CollectStatistics(start_stat_type='receivedBytesFromDevices',
end_stat_type='convertedBytesFromDevice')
def convert(self, config, val, data=None, key=None):
information = config[0]
information_type = config[1]
device_name = self.__config["deviceName"]
result = {"deviceName": device_name,
"deviceType": self.__config.get("deviceType", "OPC-UA Device"),
"attributes": [],
"telemetry": [], }
try:
information_types = {"attributes": "attributes", "timeseries": "telemetry"}
for information_type in information_types:
for information in self.__config[information_type]:
path = TBUtility.get_value(information["path"], get_tag=True)
if isinstance(config, tuple):
config_information = config[0].replace('\\\\', '\\') if path == config[0].replace('\\\\', '\\') or fullmatch(path,
config[0].replace('\\\\',
'\\')) else \
config[1].replace('\\\\', '\\')
else:
config_information = config.replace('\\\\', '\\')
if path == config_information or fullmatch(path, config_information) or path.replace('\\\\', '\\') == config_information:
full_key = key if key else information["key"]
full_value = information["path"].replace("${"+path+"}", str(val))
if information_type == 'timeseries' and data is not None and not self.__config.get(
'subOverrideServerTime', False):
# Note: SourceTimestamp and ServerTimestamp may be naive datetime objects, hence for the timestamp() the tz must first be overwritten to UTC (which it is according to the spec)
if data.monitored_item.Value.SourceTimestamp is not None:
timestamp = int(data.monitored_item.Value.SourceTimestamp.replace(tzinfo=timezone.utc).timestamp()*1000)
elif data.monitored_item.Value.ServerTimestamp is not None:
timestamp = int(data.monitored_item.Value.ServerTimestamp.replace(tzinfo=timezone.utc).timestamp()*1000)
else:
timestamp = int(time()*1000)
result[information_types[information_type]].append({"ts": timestamp, 'values': {full_key: full_value}})
else:
result[information_types[information_type]].append({full_key: full_value})
path = TBUtility.get_value(information["path"], get_tag=True)
full_key = key if key else information["key"]
full_value = information["path"].replace("${"+path+"}", str(val))
if information_type == 'timeseries' and data is not None and not self.__config.get(
'subOverrideServerTime', False):
# Note: SourceTimestamp and ServerTimestamp may be naive datetime objects, hence for the timestamp() the tz must first be overwritten to UTC (which it is according to the spec)
if data.monitored_item.Value.SourceTimestamp is not None:
timestamp = int(data.monitored_item.Value.SourceTimestamp.replace(tzinfo=timezone.utc).timestamp()*1000)
elif data.monitored_item.Value.ServerTimestamp is not None:
timestamp = int(data.monitored_item.Value.ServerTimestamp.replace(tzinfo=timezone.utc).timestamp()*1000)
else:
timestamp = int(time()*1000)
result[information_types[information_type]].append({"ts": timestamp, 'values': {full_key: full_value}})
else:
result[information_types[information_type]].append({full_key: full_value})
return result
except Exception as e:
self._log.exception(e)

0 comments on commit 433501d

Please sign in to comment.