Skip to content

Commit

Permalink
Improvement for connection
Browse files Browse the repository at this point in the history
  • Loading branch information
imbeacon committed Feb 29, 2024
1 parent f2af60a commit 7d15213
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 21 deletions.
2 changes: 2 additions & 0 deletions thingsboard_gateway/gateway/tb_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
TBUtility.install_package('tb-mqtt-client')
from tb_gateway_mqtt import TBGatewayMqttClient, TBDeviceMqttClient

import tb_device_mqtt
tb_device_mqtt.DEFAULT_TIMEOUT = 1

class TBClient(threading.Thread):
def __init__(self, config, config_folder_path, logger):
Expand Down
45 changes: 24 additions & 21 deletions thingsboard_gateway/gateway/tb_gateway_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,10 @@ class TBGatewayService:
]

def __init__(self, config_file=None):
self.__subscribed_to_rpc_topics = False
self.__init_variables()
if current_thread() is main_thread():
signal(SIGINT, lambda _, __: self.__stop_gateway())

self.stopped = False
self.__lock = RLock()
self.async_device_actions = {
DeviceActions.CONNECT: self.add_device,
Expand Down Expand Up @@ -242,7 +241,6 @@ def __init__(self, config_file=None):
self.tb_client.client.send_telemetry({"ts": time() * 1000, "values": {
"LOGS": "Logging loading exception, logs.json is wrong: %s" % (str(logging_error),)}})
TBLoggerHandler.set_default_handler()
self.__rpc_reply_sent = False
self.remote_handler = TBLoggerHandler(self)
log.addHandler(self.remote_handler)
# self.main_handler.setTarget(self.remote_handler)
Expand All @@ -267,10 +265,8 @@ def __init__(self, config_file=None):
"device_deleted": self.__process_deleted_gateway_devices,
}

self.__remote_shell = None
self.init_remote_shell(self.__config["thingsboard"].get("remoteShell"))

self.__rpc_remote_shell_command_in_progress = None
self.__scheduled_rpc_calls = []
self.__rpc_processing_queue = SimpleQueue()
self.__rpc_scheduled_methods_functions = {
Expand All @@ -282,27 +278,16 @@ def __init__(self, config_file=None):
self.__rpc_processing_thread.start()
self._event_storage = self._event_storage_types[self.__config["storage"]["type"]](self.__config["storage"])
self.connectors_configs = {}
self.__remote_configurator = None
self.__request_config_after_connect = False

self.__grpc_config = None
self.__grpc_connectors = None
self.__grpc_manager = None
self.init_grpc_service(self.__config.get('grpc'))

self._load_connectors()
self.__connect_with_connectors()
self.__load_persistent_devices()

self.__devices_idle_checker = self.__config['thingsboard'].get('checkingDeviceActivity', {})
self.__check_devices_idle = self.__devices_idle_checker.get('checkDeviceInactivity', False)
if self.__check_devices_idle:
thread = Thread(name='Checking devices idle time', target=self.__check_devices_idle_time, daemon=True)
thread.start()
log.info('Start checking devices idle time')

self.__statistics = None
self.__statistics_service = None
self.init_statistics_service(self.__config['thingsboard'].get('statistics', DEFAULT_STATISTIC))

self._published_events = SimpleQueue()
Expand All @@ -315,20 +300,21 @@ def __init__(self, config_file=None):
name="Send data to Thingsboard Thread")
self._send_thread.start()

self.__device_filter_config = None
self.__device_filter = None
self.__grpc_manager = None
self.init_device_filtering(self.__config['thingsboard'].get('deviceFiltering', DEFAULT_DEVICE_FILTER))

self.__duplicate_detector = DuplicateDetector(self.available_connectors_by_name)

self.__init_remote_configuration()

log.info("Gateway started.")

self._watchers_thread = Thread(target=self._watchers, name='Watchers', daemon=True)
self._watchers_thread.start()

self._load_connectors()
self.__connect_with_connectors()
self.__load_persistent_devices()

self.__init_remote_configuration()

if path.exists('/tmp/gateway'):
try:
# deleting old manager if it was closed incorrectly
Expand All @@ -350,6 +336,23 @@ def __init__(self, config_file=None):
self.server = self.manager.get_server()
self.server.serve_forever()

def __init_variables(self):
self.stopped = False
self.__device_filter_config = None
self.__device_filter = None
self.__grpc_manager = None
self.__remote_shell = None
self.__statistics = None
self.__statistics_service = None
self.__grpc_config = None
self.__grpc_connectors = None
self.__grpc_manager = None
self.__remote_configurator = None
self.__request_config_after_connect = False
self.__rpc_reply_sent = False
self.__subscribed_to_rpc_topics = False
self.__rpc_remote_shell_command_in_progress = None

@staticmethod
def __load_general_config(config_file):
file_extension = config_file.split('.')[-1]
Expand Down

0 comments on commit 7d15213

Please sign in to comment.