From 393c8f4ceb164d471ab8b53e26f230a5b9e88b47 Mon Sep 17 00:00:00 2001 From: imbeacon Date: Fri, 1 Nov 2024 10:03:33 +0200 Subject: [PATCH] Updated report strategy processing and version --- .../connectors/modbus/modbus_connector.py | 2 -- .../connectors/modbus/slave.py | 6 ++-- thingsboard_gateway/gateway/constants.py | 2 +- .../report_strategy_data_cache.py | 18 +++++------ .../report_strategy_service.py | 19 ++++++++---- .../gateway/tb_gateway_service.py | 3 +- .../tb_gateway_remote_configurator.py | 31 +++++++++++++++++-- thingsboard_gateway/version.py | 3 +- 8 files changed, 59 insertions(+), 25 deletions(-) diff --git a/thingsboard_gateway/connectors/modbus/modbus_connector.py b/thingsboard_gateway/connectors/modbus/modbus_connector.py index cc63ec85..1dd100fc 100644 --- a/thingsboard_gateway/connectors/modbus/modbus_connector.py +++ b/thingsboard_gateway/connectors/modbus/modbus_connector.py @@ -154,8 +154,6 @@ def __init__(self, gateway, config, connector_type): self.__slaves = [] self.__slave_thread = None - self.__main_report_strategy = self.__config.get(REPORT_STRATEGY_PARAMETER, {}) - if self.__config.get('slave') and self.__config.get('slave', {}).get('sendDataToThingsBoard', False): self.__slave_thread = Thread(target=self.__configure_and_run_slave, args=(self.__config['slave'],), daemon=True, name='Gateway modbus slave') diff --git a/thingsboard_gateway/connectors/modbus/slave.py b/thingsboard_gateway/connectors/modbus/slave.py index cba59063..281c36d3 100644 --- a/thingsboard_gateway/connectors/modbus/slave.py +++ b/thingsboard_gateway/connectors/modbus/slave.py @@ -61,10 +61,12 @@ def __init__(self, **kwargs): 'attributes': kwargs.get('attributes', []), 'timeseries': kwargs.get('timeseries', []), 'attributeUpdates': kwargs.get('attributeUpdates', []), - 'rpc': kwargs.get('rpc', []), - REPORT_STRATEGY_PARAMETER: kwargs.get(REPORT_STRATEGY_PARAMETER), + 'rpc': kwargs.get('rpc', []) } + if REPORT_STRATEGY_PARAMETER in kwargs: + self.config[REPORT_STRATEGY_PARAMETER] = kwargs[REPORT_STRATEGY_PARAMETER] + self.__load_converters(kwargs['connector']) self.callback = kwargs['callback'] diff --git a/thingsboard_gateway/gateway/constants.py b/thingsboard_gateway/gateway/constants.py index d2c460be..2050d559 100644 --- a/thingsboard_gateway/gateway/constants.py +++ b/thingsboard_gateway/gateway/constants.py @@ -99,7 +99,7 @@ def from_string(cls, value: str): raise ValueError("Invalid report strategy value: %r" % value) DEFAULT_REPORT_STRATEGY_CONFIG = { - TYPE_PARAMETER: ReportStrategy.ON_REPORT_PERIOD.value, + TYPE_PARAMETER: ReportStrategy.ON_RECEIVED.value, REPORT_PERIOD_PARAMETER: 10000 } diff --git a/thingsboard_gateway/gateway/report_strategy/report_strategy_data_cache.py b/thingsboard_gateway/gateway/report_strategy/report_strategy_data_cache.py index cdebae0c..6f07a731 100644 --- a/thingsboard_gateway/gateway/report_strategy/report_strategy_data_cache.py +++ b/thingsboard_gateway/gateway/report_strategy/report_strategy_data_cache.py @@ -50,8 +50,8 @@ def get_ts(self): def is_telemetry(self): return self._is_telemetry - def update_last_report_time(self): - self._last_report_time = int(monotonic() * 1000) + def update_last_report_time(self, update_time): + self._last_report_time = update_time def update_value(self, value): self._value = value @@ -60,12 +60,9 @@ def update_ts(self, ts): self._ts = ts def should_be_reported_by_period(self, current_time): - if self._report_strategy.report_strategy == ReportStrategy.ON_REPORT_PERIOD: + if self._report_strategy.report_strategy in (ReportStrategy.ON_REPORT_PERIOD, ReportStrategy.ON_CHANGE_OR_REPORT_PERIOD): return (self._last_report_time is None - or current_time - self._last_report_time >= self._report_strategy.report_period) - elif self._report_strategy.report_strategy == ReportStrategy.ON_CHANGE_OR_REPORT_PERIOD: - return (self._last_report_time is not None - and current_time - self._last_report_time >= self._report_strategy.report_period) + or current_time - self._last_report_time + 50 >= self._report_strategy.report_period) else: return False @@ -84,8 +81,8 @@ def put(self, datapoint_key: DatapointKey, data: str, device_name, device_type, def get(self, datapoint_key: DatapointKey, device_name, connector_id) -> ReportStrategyDataRecord: return self._data_cache.get((datapoint_key, device_name, connector_id)) - def update_last_report_time(self, datapoint_key: DatapointKey, device_name, connector_id): - self._data_cache[(datapoint_key, device_name, connector_id)].update_last_report_time() + def update_last_report_time(self, datapoint_key: DatapointKey, device_name, connector_id, update_time): + self._data_cache[(datapoint_key, device_name, connector_id)].update_last_report_time(update_time) def update_key_value(self, datapoint_key: DatapointKey, device_name, connector_id, value): self._data_cache[(datapoint_key, device_name, connector_id)].update_value(value) @@ -98,3 +95,6 @@ def delete_all_records_for_connector_by_connector_id(self, connector_id): for key in keys_to_delete: del self._data_cache[key] + def clear(self): + self._data_cache.clear() + diff --git a/thingsboard_gateway/gateway/report_strategy/report_strategy_service.py b/thingsboard_gateway/gateway/report_strategy/report_strategy_service.py index 8dcdd9f7..ec9f21ea 100644 --- a/thingsboard_gateway/gateway/report_strategy/report_strategy_service.py +++ b/thingsboard_gateway/gateway/report_strategy/report_strategy_service.py @@ -126,6 +126,7 @@ def filter_datapoint_and_cache(self, datapoint_key: DatapointKey, data, device_n report_strategy_config = datapoint_key.report_strategy ts = None + current_time = int(monotonic() * 1000) if isinstance(data, tuple): data, ts = data @@ -157,7 +158,7 @@ def filter_datapoint_and_cache(self, datapoint_key: DatapointKey, data, device_n return False elif report_strategy_config.report_strategy == ReportStrategy.ON_CHANGE_OR_REPORT_PERIOD: self._report_strategy_data_cache.update_key_value(datapoint_key, device_name, connector_id, data) - self._report_strategy_data_cache.update_last_report_time(datapoint_key, device_name, connector_id) + # self._report_strategy_data_cache.update_last_report_time(datapoint_key, device_name, connector_id, current_time) if is_telemetry: self._report_strategy_data_cache.update_ts(datapoint_key, device_name, connector_id, ts) return True @@ -169,7 +170,7 @@ def filter_datapoint_and_cache(self, datapoint_key: DatapointKey, data, device_n if isinstance(datapoint_key, tuple): datapoint_key, _ = datapoint_key self.__keys_to_report_periodically.add((datapoint_key, device_name, connector_id)) - self._report_strategy_data_cache.update_last_report_time(datapoint_key, device_name, connector_id) + self._report_strategy_data_cache.update_last_report_time(datapoint_key, device_name, connector_id, current_time) if is_telemetry: self._report_strategy_data_cache.update_ts(datapoint_key, device_name, connector_id, ts) return True @@ -195,6 +196,8 @@ def __periodical_reporting(self): for key, device_name, connector_id in keys_set: report_strategy_data_record = report_strategy_data_cache_get(key, device_name, connector_id) if report_strategy_data_record is None: + if not self.__keys_to_report_periodically: + continue raise ValueError(f"Data record for key '{key}' is absent in the cache") if not report_strategy_data_record.should_be_reported_by_period(current_time): @@ -208,8 +211,7 @@ def __periodical_reporting(self): data_entry = data_to_report[data_report_key] if report_strategy_data_record.is_telemetry(): - # data_entry.add_to_telemetry(TelemetryEntry({key: value}, - # report_strategy_data_record.get_ts())) + # data_entry.add_to_telemetry(TelemetryEntry({key: value}, report_strategy_data_record.get_ts())) # Can be used to keep first ts, instead of overwriting it with current ts current_ts = int(time() * 1000) data_entry.add_to_telemetry(TelemetryEntry({key: value}, current_ts)) report_strategy_data_record.update_ts(current_ts) @@ -219,7 +221,7 @@ def __periodical_reporting(self): data_entry.add_to_attributes(key, value) reported_data_length += 1 - report_strategy_data_record.update_last_report_time() + report_strategy_data_record.update_last_report_time(current_time) if data_to_report: for data_report_key, data in data_to_report.items(): @@ -253,4 +255,9 @@ def delete_all_records_for_connector_by_connector_id_and_connector_name(self, co if connector_id == connector_id: self.__keys_to_report_periodically.remove((key, device_name, connector_id)) self._connectors_report_strategies.pop(connector_id, None) - self._connectors_report_strategies.pop(connector_name, None) \ No newline at end of file + self._connectors_report_strategies.pop(connector_name, None) + + def clear_cache(self): + self._report_strategy_data_cache.clear() + self.__keys_to_report_periodically.clear() + self._connectors_report_strategies.clear() diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index 487737ad..99bd4e78 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -994,7 +994,8 @@ def __connect_with_connectors(self): self.available_connectors_by_id[connector_id] = connector self.available_connectors_by_name[connector_name] = connector try: - connector_report_strategy = ReportStrategyConfig(connector_config[CONFIG_SECTION_PARAMETER][config].get(REPORT_STRATEGY_PARAMETER)) + report_strategy_config_connector = connector_config[CONFIG_SECTION_PARAMETER][config].pop(REPORT_STRATEGY_PARAMETER, None) + connector_report_strategy = ReportStrategyConfig(report_strategy_config_connector) self._report_strategy_service.register_connector_report_strategy(connector_name, connector_id, connector_report_strategy) except ValueError: log.info("Cannot find separated report strategy for connector %r. The main report strategy will be used as a connector report strategy.", diff --git a/thingsboard_gateway/tb_utility/tb_gateway_remote_configurator.py b/thingsboard_gateway/tb_utility/tb_gateway_remote_configurator.py index 4d94b887..57bab5d3 100644 --- a/thingsboard_gateway/tb_utility/tb_gateway_remote_configurator.py +++ b/thingsboard_gateway/tb_utility/tb_gateway_remote_configurator.py @@ -25,7 +25,9 @@ from tb_gateway_mqtt import TBGatewayMqttClient -from thingsboard_gateway.gateway.constants import CONFIG_VERSION_PARAMETER, REPORT_STRATEGY_PARAMETER +from thingsboard_gateway.gateway.constants import CONFIG_VERSION_PARAMETER, REPORT_STRATEGY_PARAMETER, \ + DEFAULT_REPORT_STRATEGY_CONFIG +from thingsboard_gateway.gateway.entities.report_strategy_config import ReportStrategyConfig from thingsboard_gateway.gateway.tb_client import TBClient from thingsboard_gateway.tb_utility.tb_utility import TBUtility @@ -324,6 +326,17 @@ def _handle_general_configuration_update(self, config): else: LOG.debug('--- Remote Shell configuration not changed.') + LOG.debug('--- Checking other configuration parameters changes...') + + LOG.debug('--- Checking Report Strategy configuration changes...') + if config.get(REPORT_STRATEGY_PARAMETER) != self.general_configuration.get(REPORT_STRATEGY_PARAMETER): + LOG.debug('---- Report Strategy configuration changed. Processing...') + success = self._apply_report_strategy_config(config) + if not success: + config[REPORT_STRATEGY_PARAMETER].update(self.general_configuration[REPORT_STRATEGY_PARAMETER]) + else: + LOG.debug('--- Report Strategy configuration not changed.') + LOG.debug('--- Checking other configuration parameters changes...') self._apply_other_params_config(config) @@ -331,8 +344,7 @@ def _handle_general_configuration_update(self, config): self.general_configuration = config self._gateway.send_attributes({'general_configuration': self.general_configuration}) self._cleanup() - with open(self._gateway.get_config_path() + "tb_gateway.json", "w", - encoding="UTF-8") as file: + with open(self._gateway.get_config_path() + "tb_gateway.json", "w", encoding="UTF-8") as file: file.writelines(dumps(self._get_general_config_in_local_format(), indent=' ')) def _handle_storage_configuration_update(self, config): @@ -782,6 +794,19 @@ def _apply_remote_shell_config(self, config): self._gateway.init_remote_shell(self.general_configuration.get('remoteShell')) return False + def _apply_report_strategy_config(self, config): + old_main_report_strategy = self._gateway._report_strategy_service.main_report_strategy + try: + new_main_report_strategy = ReportStrategyConfig(config.get(REPORT_STRATEGY_PARAMETER, DEFAULT_REPORT_STRATEGY_CONFIG)) + self._gateway._report_strategy_service.main_report_strategy = new_main_report_strategy + self._gateway._report_strategy_service.clear_cache() + return True + except Exception as e: + self._gateway._report_strategy_service.main_report_strategy = old_main_report_strategy + LOG.error('Something went wrong with applying the new Report Strategy configuration. Reverting...') + LOG.exception(e) + return False + def _apply_other_params_config(self, config): self._gateway.config['thingsboard'].update(config) diff --git a/thingsboard_gateway/version.py b/thingsboard_gateway/version.py index 00165c70..dcc15c4d 100644 --- a/thingsboard_gateway/version.py +++ b/thingsboard_gateway/version.py @@ -11,4 +11,5 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -VERSION = "3.5.3" \ No newline at end of file + +VERSION = "3.6" \ No newline at end of file