Skip to content

Commit

Permalink
Merge pull request #1556 from samson0v/pr/prod-4685
Browse files Browse the repository at this point in the history
Implemented new socket connector config
  • Loading branch information
imbeacon authored Oct 14, 2024
2 parents 8ce296b + 0edf7a4 commit 1f34762
Show file tree
Hide file tree
Showing 6 changed files with 227 additions and 7 deletions.
Empty file.
62 changes: 62 additions & 0 deletions tests/unit/connectors/socket/data/new_config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
{
"socket": {
"type": "TCP",
"address": "127.0.0.1",
"port": 50000,
"bufferSize": 1024
},
"devices": [
{
"address": "*:*",
"deviceName": "Device Example",
"deviceType": "default",
"encoding": "utf-8",
"telemetry": [
{
"key": "temp",
"byteFrom": 0,
"byteTo": -1
},
{
"key": "hum",
"byteFrom": 0,
"byteTo": 2
}
],
"attributes": [
{
"key": "name",
"byteFrom": 0,
"byteTo": -1
},
{
"key": "num",
"byteFrom": 2,
"byteTo": 4
}
],
"attributeRequests": [
{
"type": "shared",
"requestExpressionSource": "expression",
"attributeNameExpressionSource": "expression",
"requestExpression": "${[0:3]==atr}",
"attributeNameExpression": "[3:]"
}
],
"attributeUpdates": [
{
"encoding": "utf-16",
"attributeOnThingsBoard": "sharedName"
}
],
"serverSideRpc": [
{
"methodRPC": "rpcMethod1",
"withResponse": true,
"encoding": "utf-8"
}
]
}
]
}
58 changes: 58 additions & 0 deletions tests/unit/connectors/socket/data/old_config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
{
"type": "TCP",
"address": "127.0.0.1",
"port": 50000,
"bufferSize": 1024,
"devices": [
{
"addressFilter": "*:*",
"deviceName": "Device Example",
"deviceType": "default",
"encoding": "utf-8",
"telemetry": [
{
"key": "temp",
"byteFrom": 0,
"byteTo": -1
},
{
"key": "hum",
"byteFrom": 0,
"byteTo": 2
}
],
"attributes": [
{
"key": "name",
"byteFrom": 0,
"byteTo": -1
},
{
"key": "num",
"byteFrom": 2,
"byteTo": 4
}
],
"attributeRequests": [
{
"type": "shared",
"requestExpression": "${[0:3]==atr}",
"attributeNameExpression": "[3:]"
}
],
"attributeUpdates": [
{
"encoding": "utf-16",
"attributeOnThingsBoard": "sharedName"
}
],
"serverSideRpc": [
{
"methodRPC": "rpcMethod1",
"withResponse": true,
"encoding": "utf-8"
}
]
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import unittest
from os import path
from tests.unit.BaseUnitTest import BaseUnitTest
from simplejson import load
from thingsboard_gateway.connectors.socket.backward_compatibility_adapter import BackwardCompatibilityAdapter


class SocketBackwardCompatibilityAdapterTests(BaseUnitTest):
CONFIG_PATH = path.join(path.dirname(path.dirname(path.dirname(path.abspath(__file__)))),
"connectors" + path.sep + "socket" + path.sep + "data" + path.sep)

@staticmethod
def convert_json(config_path):
with open(config_path, 'r') as file:
config = load(file)

return config

def setUp(self):
self.maxDiff = 8000
self.adapter = BackwardCompatibilityAdapter(config={})

def test_is_old_config(self):
is_old_config = self.convert_json(self.CONFIG_PATH + 'old_config.json')
self.assertTrue(is_old_config, True)

def test_convert_with_empty_config(self):
result = self.adapter.convert()
expected_result = {'socket':
{'address': '127.0.0.1',
'bufferSize': 1024,
'port': 50000,
'type': 'TCP'}
}
self.assertEqual(expected_result, result)

def test_convert_from_old_to_new_config(self):
self.adapter._config = self.convert_json(self.CONFIG_PATH + 'old_config.json')
result = self.adapter.convert()
expected_result = self.convert_json(self.CONFIG_PATH + 'new_config.json')
self.assertEqual(expected_result, result)


if __name__ == '__main__':
unittest.main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from copy import deepcopy


class BackwardCompatibilityAdapter:
def __init__(self, config):
self._config = deepcopy(config)

@staticmethod
def is_old_config(config):
return config.get('socket') is None

def convert(self):
socket_type = self._config.pop('type', 'TCP')
address = self._config.pop('address', '127.0.0.1')
port = self._config.pop('port', 50000)
buffer_size = self._config.pop('bufferSize', 1024)

self._config['socket'] = {
'type': socket_type,
'address': address,
'port': port,
'bufferSize': buffer_size
}

for device in self._config.get('devices', []):
if device.get('addressFilter'):
address_filter = device.pop('addressFilter')
device['address'] = address_filter

for attribute_requests in device.get('attributeRequests', []):
attribute_requests['requestExpressionSource'] = 'expression'
attribute_requests['attributeNameExpressionSource'] = 'expression'

return self._config
35 changes: 28 additions & 7 deletions thingsboard_gateway/connectors/socket/socket_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from simplejson import dumps

from thingsboard_gateway.connectors.connector import Connector
from thingsboard_gateway.connectors.socket.backward_compatibility_adapter import BackwardCompatibilityAdapter
from thingsboard_gateway.gateway.statistics.decorators import CollectStatistics, CollectAllReceivedBytesStatistics
from thingsboard_gateway.tb_utility.tb_loader import TBModuleLoader
from thingsboard_gateway.gateway.statistics.statistics_service import StatisticsService
Expand All @@ -40,27 +41,38 @@ class SocketConnector(Connector, Thread):
def __init__(self, gateway, config, connector_type):
super().__init__()
self.__config = config

is_using_old_config = BackwardCompatibilityAdapter.is_old_config(config)
if is_using_old_config:
self.__config = BackwardCompatibilityAdapter(config).convert()

self.__id = self.__config.get('id')
self._connector_type = connector_type
self.statistics = {'MessagesReceived': 0,
'MessagesSent': 0}
self.statistics = {'MessagesReceived': 0, 'MessagesSent': 0}
self.__gateway = gateway
self.name = config.get("name", 'TCP Connector ' + ''.join(choice(ascii_lowercase) for _ in range(5)))
self.__log = init_logger(self.__gateway, self.name, self.__config.get('logLevel', 'INFO'),
enable_remote_logging=self.__config.get('enableRemoteLogging', False))

if is_using_old_config:
self.__log.warning("Old Socket connector configuration format detected. Automatic conversion is applied.")

self.daemon = True
self.__stopped = False
self._connected = False
self.__bind = False
self.__socket_type = config['type'].upper()
self.__socket_address = config['address']
self.__socket_port = config['port']
self.__socket_buff_size = config['bufferSize']

self.__socket_type = config.get('socket', {}).get('type', 'TCP').upper()
self.__socket_address = config.get('socket', {}).get('address', '127.0.0.1')
self.__socket_port = config.get('socket', {}).get('port', 50000)
self.__socket_buff_size = config.get('socket', {}).get('buffer_size', 1024)

self.__socket = socket.socket(socket.AF_INET, SOCKET_TYPE[self.__socket_type])
self.__socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.__converting_requests = Queue(-1)

self.__devices, self.__device_converters = self.__convert_devices_list()
self.__devices = {}
self.__device_converters = {}
self.__connections = {}

def __convert_devices_list(self):
Expand Down Expand Up @@ -91,6 +103,13 @@ def __convert_devices_list(self):
attr_requests = device.get('attributeRequests', [])
device['attributeRequests'] = self.__validate_attr_requests(attr_requests)
if len(device['attributeRequests']):
is_tb_client = False

while not is_tb_client and not self.__stopped:
self.__log.info('Waiting for ThingsBoard client to be connected...')
is_tb_client = self.__gateway.tb_client is not None and hasattr(self.__gateway.tb_client, 'client')
sleep(1)

self.__attribute_type = {
'client': self.__gateway.tb_client.client.gw_request_client_attributes,
'shared': self.__gateway.tb_client.client.gw_request_shared_attributes
Expand Down Expand Up @@ -151,6 +170,8 @@ def open(self):
self.start()

def run(self):
self.__devices, self.__device_converters = self.__convert_devices_list()

self._connected = True

converting_thread = Thread(target=self.__process_data, daemon=True, name='Converter Thread')
Expand Down

0 comments on commit 1f34762

Please sign in to comment.