Skip to content

Commit

Permalink
Updated report strategy processing and version
Browse files Browse the repository at this point in the history
  • Loading branch information
imbeacon committed Nov 1, 2024
1 parent 38986ef commit 393c8f4
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 25 deletions.
2 changes: 0 additions & 2 deletions thingsboard_gateway/connectors/modbus/modbus_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,6 @@ def __init__(self, gateway, config, connector_type):
self.__slaves = []
self.__slave_thread = None

self.__main_report_strategy = self.__config.get(REPORT_STRATEGY_PARAMETER, {})

if self.__config.get('slave') and self.__config.get('slave', {}).get('sendDataToThingsBoard', False):
self.__slave_thread = Thread(target=self.__configure_and_run_slave, args=(self.__config['slave'],),
daemon=True, name='Gateway modbus slave')
Expand Down
6 changes: 4 additions & 2 deletions thingsboard_gateway/connectors/modbus/slave.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,12 @@ def __init__(self, **kwargs):
'attributes': kwargs.get('attributes', []),
'timeseries': kwargs.get('timeseries', []),
'attributeUpdates': kwargs.get('attributeUpdates', []),
'rpc': kwargs.get('rpc', []),
REPORT_STRATEGY_PARAMETER: kwargs.get(REPORT_STRATEGY_PARAMETER),
'rpc': kwargs.get('rpc', [])
}

if REPORT_STRATEGY_PARAMETER in kwargs:
self.config[REPORT_STRATEGY_PARAMETER] = kwargs[REPORT_STRATEGY_PARAMETER]

self.__load_converters(kwargs['connector'])

self.callback = kwargs['callback']
Expand Down
2 changes: 1 addition & 1 deletion thingsboard_gateway/gateway/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def from_string(cls, value: str):
raise ValueError("Invalid report strategy value: %r" % value)

DEFAULT_REPORT_STRATEGY_CONFIG = {
TYPE_PARAMETER: ReportStrategy.ON_REPORT_PERIOD.value,
TYPE_PARAMETER: ReportStrategy.ON_RECEIVED.value,
REPORT_PERIOD_PARAMETER: 10000
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ def get_ts(self):
def is_telemetry(self):
return self._is_telemetry

def update_last_report_time(self):
self._last_report_time = int(monotonic() * 1000)
def update_last_report_time(self, update_time):
self._last_report_time = update_time

def update_value(self, value):
self._value = value
Expand All @@ -60,12 +60,9 @@ def update_ts(self, ts):
self._ts = ts

def should_be_reported_by_period(self, current_time):
if self._report_strategy.report_strategy == ReportStrategy.ON_REPORT_PERIOD:
if self._report_strategy.report_strategy in (ReportStrategy.ON_REPORT_PERIOD, ReportStrategy.ON_CHANGE_OR_REPORT_PERIOD):
return (self._last_report_time is None
or current_time - self._last_report_time >= self._report_strategy.report_period)
elif self._report_strategy.report_strategy == ReportStrategy.ON_CHANGE_OR_REPORT_PERIOD:
return (self._last_report_time is not None
and current_time - self._last_report_time >= self._report_strategy.report_period)
or current_time - self._last_report_time + 50 >= self._report_strategy.report_period)
else:
return False

Expand All @@ -84,8 +81,8 @@ def put(self, datapoint_key: DatapointKey, data: str, device_name, device_type,
def get(self, datapoint_key: DatapointKey, device_name, connector_id) -> ReportStrategyDataRecord:
return self._data_cache.get((datapoint_key, device_name, connector_id))

def update_last_report_time(self, datapoint_key: DatapointKey, device_name, connector_id):
self._data_cache[(datapoint_key, device_name, connector_id)].update_last_report_time()
def update_last_report_time(self, datapoint_key: DatapointKey, device_name, connector_id, update_time):
self._data_cache[(datapoint_key, device_name, connector_id)].update_last_report_time(update_time)

def update_key_value(self, datapoint_key: DatapointKey, device_name, connector_id, value):
self._data_cache[(datapoint_key, device_name, connector_id)].update_value(value)
Expand All @@ -98,3 +95,6 @@ def delete_all_records_for_connector_by_connector_id(self, connector_id):
for key in keys_to_delete:
del self._data_cache[key]

def clear(self):
self._data_cache.clear()

Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ def filter_datapoint_and_cache(self, datapoint_key: DatapointKey, data, device_n
report_strategy_config = datapoint_key.report_strategy

ts = None
current_time = int(monotonic() * 1000)

if isinstance(data, tuple):
data, ts = data
Expand Down Expand Up @@ -157,7 +158,7 @@ def filter_datapoint_and_cache(self, datapoint_key: DatapointKey, data, device_n
return False
elif report_strategy_config.report_strategy == ReportStrategy.ON_CHANGE_OR_REPORT_PERIOD:
self._report_strategy_data_cache.update_key_value(datapoint_key, device_name, connector_id, data)
self._report_strategy_data_cache.update_last_report_time(datapoint_key, device_name, connector_id)
# self._report_strategy_data_cache.update_last_report_time(datapoint_key, device_name, connector_id, current_time)
if is_telemetry:
self._report_strategy_data_cache.update_ts(datapoint_key, device_name, connector_id, ts)
return True
Expand All @@ -169,7 +170,7 @@ def filter_datapoint_and_cache(self, datapoint_key: DatapointKey, data, device_n
if isinstance(datapoint_key, tuple):
datapoint_key, _ = datapoint_key
self.__keys_to_report_periodically.add((datapoint_key, device_name, connector_id))
self._report_strategy_data_cache.update_last_report_time(datapoint_key, device_name, connector_id)
self._report_strategy_data_cache.update_last_report_time(datapoint_key, device_name, connector_id, current_time)
if is_telemetry:
self._report_strategy_data_cache.update_ts(datapoint_key, device_name, connector_id, ts)
return True
Expand All @@ -195,6 +196,8 @@ def __periodical_reporting(self):
for key, device_name, connector_id in keys_set:
report_strategy_data_record = report_strategy_data_cache_get(key, device_name, connector_id)
if report_strategy_data_record is None:
if not self.__keys_to_report_periodically:
continue
raise ValueError(f"Data record for key '{key}' is absent in the cache")

if not report_strategy_data_record.should_be_reported_by_period(current_time):
Expand All @@ -208,8 +211,7 @@ def __periodical_reporting(self):

data_entry = data_to_report[data_report_key]
if report_strategy_data_record.is_telemetry():
# data_entry.add_to_telemetry(TelemetryEntry({key: value},
# report_strategy_data_record.get_ts()))
# data_entry.add_to_telemetry(TelemetryEntry({key: value}, report_strategy_data_record.get_ts())) # Can be used to keep first ts, instead of overwriting it with current ts
current_ts = int(time() * 1000)
data_entry.add_to_telemetry(TelemetryEntry({key: value}, current_ts))
report_strategy_data_record.update_ts(current_ts)
Expand All @@ -219,7 +221,7 @@ def __periodical_reporting(self):
data_entry.add_to_attributes(key, value)
reported_data_length += 1

report_strategy_data_record.update_last_report_time()
report_strategy_data_record.update_last_report_time(current_time)

if data_to_report:
for data_report_key, data in data_to_report.items():
Expand Down Expand Up @@ -253,4 +255,9 @@ def delete_all_records_for_connector_by_connector_id_and_connector_name(self, co
if connector_id == connector_id:
self.__keys_to_report_periodically.remove((key, device_name, connector_id))
self._connectors_report_strategies.pop(connector_id, None)
self._connectors_report_strategies.pop(connector_name, None)
self._connectors_report_strategies.pop(connector_name, None)

def clear_cache(self):
self._report_strategy_data_cache.clear()
self.__keys_to_report_periodically.clear()
self._connectors_report_strategies.clear()
3 changes: 2 additions & 1 deletion thingsboard_gateway/gateway/tb_gateway_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -994,7 +994,8 @@ def __connect_with_connectors(self):
self.available_connectors_by_id[connector_id] = connector
self.available_connectors_by_name[connector_name] = connector
try:
connector_report_strategy = ReportStrategyConfig(connector_config[CONFIG_SECTION_PARAMETER][config].get(REPORT_STRATEGY_PARAMETER))
report_strategy_config_connector = connector_config[CONFIG_SECTION_PARAMETER][config].pop(REPORT_STRATEGY_PARAMETER, None)
connector_report_strategy = ReportStrategyConfig(report_strategy_config_connector)
self._report_strategy_service.register_connector_report_strategy(connector_name, connector_id, connector_report_strategy)
except ValueError:
log.info("Cannot find separated report strategy for connector %r. The main report strategy will be used as a connector report strategy.",
Expand Down
31 changes: 28 additions & 3 deletions thingsboard_gateway/tb_utility/tb_gateway_remote_configurator.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@

from tb_gateway_mqtt import TBGatewayMqttClient

from thingsboard_gateway.gateway.constants import CONFIG_VERSION_PARAMETER, REPORT_STRATEGY_PARAMETER
from thingsboard_gateway.gateway.constants import CONFIG_VERSION_PARAMETER, REPORT_STRATEGY_PARAMETER, \
DEFAULT_REPORT_STRATEGY_CONFIG
from thingsboard_gateway.gateway.entities.report_strategy_config import ReportStrategyConfig
from thingsboard_gateway.gateway.tb_client import TBClient
from thingsboard_gateway.tb_utility.tb_utility import TBUtility

Expand Down Expand Up @@ -324,15 +326,25 @@ def _handle_general_configuration_update(self, config):
else:
LOG.debug('--- Remote Shell configuration not changed.')

LOG.debug('--- Checking other configuration parameters changes...')

LOG.debug('--- Checking Report Strategy configuration changes...')
if config.get(REPORT_STRATEGY_PARAMETER) != self.general_configuration.get(REPORT_STRATEGY_PARAMETER):
LOG.debug('---- Report Strategy configuration changed. Processing...')
success = self._apply_report_strategy_config(config)
if not success:
config[REPORT_STRATEGY_PARAMETER].update(self.general_configuration[REPORT_STRATEGY_PARAMETER])
else:
LOG.debug('--- Report Strategy configuration not changed.')

LOG.debug('--- Checking other configuration parameters changes...')
self._apply_other_params_config(config)

LOG.debug('--- Saving new general configuration...')
self.general_configuration = config
self._gateway.send_attributes({'general_configuration': self.general_configuration})
self._cleanup()
with open(self._gateway.get_config_path() + "tb_gateway.json", "w",
encoding="UTF-8") as file:
with open(self._gateway.get_config_path() + "tb_gateway.json", "w", encoding="UTF-8") as file:
file.writelines(dumps(self._get_general_config_in_local_format(), indent=' '))

def _handle_storage_configuration_update(self, config):
Expand Down Expand Up @@ -782,6 +794,19 @@ def _apply_remote_shell_config(self, config):
self._gateway.init_remote_shell(self.general_configuration.get('remoteShell'))
return False

def _apply_report_strategy_config(self, config):
old_main_report_strategy = self._gateway._report_strategy_service.main_report_strategy
try:
new_main_report_strategy = ReportStrategyConfig(config.get(REPORT_STRATEGY_PARAMETER, DEFAULT_REPORT_STRATEGY_CONFIG))
self._gateway._report_strategy_service.main_report_strategy = new_main_report_strategy
self._gateway._report_strategy_service.clear_cache()
return True
except Exception as e:
self._gateway._report_strategy_service.main_report_strategy = old_main_report_strategy
LOG.error('Something went wrong with applying the new Report Strategy configuration. Reverting...')
LOG.exception(e)
return False

def _apply_other_params_config(self, config):
self._gateway.config['thingsboard'].update(config)

Expand Down
3 changes: 2 additions & 1 deletion thingsboard_gateway/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
VERSION = "3.5.3"

VERSION = "3.6"

0 comments on commit 393c8f4

Please sign in to comment.