diff --git a/sonic-xcvrd/tests/test_xcvrd.py b/sonic-xcvrd/tests/test_xcvrd.py index 6530f4226..af1d2568f 100644 --- a/sonic-xcvrd/tests/test_xcvrd.py +++ b/sonic-xcvrd/tests/test_xcvrd.py @@ -4,6 +4,8 @@ from xcvrd.xcvrd_utilities.media_settings_parser import * from xcvrd.xcvrd_utilities.optics_si_parser import * from xcvrd.xcvrd import * +from xcvrd.sff_mgr import * +from xcvrd.xcvrd_utilities.xcvr_table_helper import * import pytest import copy import os @@ -136,6 +138,25 @@ class TestXcvrdThreadException(object): + @patch('xcvrd.sff_mgr.PortChangeObserver', MagicMock(side_effect=NotImplementedError)) + def test_SffManagerTask_task_run_with_exception(self): + stop_event = threading.Event() + sff_mgr = SffManagerTask(DEFAULT_NAMESPACE, stop_event, MagicMock(), helper_logger) + exception_received = None + trace = None + try: + sff_mgr.start() + sff_mgr.join() + except Exception as e1: + exception_received = e1 + trace = traceback.format_exc() + + assert not sff_mgr.is_alive() + assert(type(exception_received) == NotImplementedError) + assert("NotImplementedError" in str(trace) and "effect" in str(trace)) + assert("sonic-xcvrd/xcvrd/sff_mgr.py" in str(trace)) + assert("PortChangeObserver" in str(trace)) + @patch('xcvrd.xcvrd.platform_chassis', MagicMock()) def test_CmisManagerTask_task_run_with_exception(self): port_mapping = PortMapping() @@ -202,8 +223,10 @@ def test_SfpStateUpdateTask_task_run_with_exception(self): @patch('xcvrd.xcvrd.SfpStateUpdateTask.is_alive', MagicMock(return_value = False)) @patch('xcvrd.xcvrd.DomInfoUpdateTask.is_alive', MagicMock(return_value = False)) @patch('xcvrd.xcvrd.CmisManagerTask.is_alive', MagicMock(return_value = False)) - @patch('xcvrd.xcvrd.CmisManagerTask.join', MagicMock(side_effect = NotImplementedError)) + @patch('xcvrd.xcvrd.SffManagerTask.is_alive', MagicMock(return_value=False)) + @patch('xcvrd.xcvrd.CmisManagerTask.join', MagicMock(side_effect=NotImplementedError)) @patch('xcvrd.xcvrd.CmisManagerTask.start', MagicMock()) + @patch('xcvrd.xcvrd.SffManagerTask.start', MagicMock()) @patch('xcvrd.xcvrd.DomInfoUpdateTask.start', MagicMock()) @patch('xcvrd.xcvrd.SfpStateUpdateTask.start', MagicMock()) @patch('xcvrd.xcvrd.DaemonXcvrd.deinit', MagicMock()) @@ -211,16 +234,21 @@ def test_SfpStateUpdateTask_task_run_with_exception(self): @patch('xcvrd.xcvrd.DaemonXcvrd.init') @patch('xcvrd.xcvrd.DomInfoUpdateTask.join') @patch('xcvrd.xcvrd.SfpStateUpdateTask.join') - def test_DaemonXcvrd_run_with_exception(self, mock_task_join1, mock_task_join2, mock_init, mock_os_kill): + @patch('xcvrd.xcvrd.SffManagerTask.join') + def test_DaemonXcvrd_run_with_exception(self, mock_task_join_sff, mock_task_join_sfp, + mock_task_join_dom, mock_init, mock_os_kill): mock_init.return_value = (PortMapping(), set()) xcvrd = DaemonXcvrd(SYSLOG_IDENTIFIER) + xcvrd.enable_sff_mgr = True + xcvrd.load_feature_flags = MagicMock() xcvrd.stop_event.wait = MagicMock() xcvrd.run() - assert len(xcvrd.threads) == 3 + assert len(xcvrd.threads) == 4 assert mock_init.call_count == 1 - assert mock_task_join1.call_count == 1 - assert mock_task_join2.call_count == 1 + assert mock_task_join_sff.call_count == 1 + assert mock_task_join_sfp.call_count == 1 + assert mock_task_join_dom.call_count == 1 assert mock_os_kill.call_count == 1 class TestXcvrdScript(object): @@ -1026,6 +1054,7 @@ def test_DaemonXcvrd_wait_for_port_config_done(self, mock_select, mock_sub_table def test_DaemonXcvrd_run(self, mock_task_stop1, mock_task_stop2, mock_task_run1, mock_task_run2, mock_deinit, mock_init): mock_init.return_value = (PortMapping(), set()) xcvrd = DaemonXcvrd(SYSLOG_IDENTIFIER) + xcvrd.load_feature_flags = MagicMock() xcvrd.stop_event.wait = MagicMock() xcvrd.run() assert mock_task_stop1.call_count == 1 @@ -1035,6 +1064,249 @@ def test_DaemonXcvrd_run(self, mock_task_stop1, mock_task_stop2, mock_task_run1, assert mock_deinit.call_count == 1 assert mock_init.call_count == 1 + def test_SffManagerTask_handle_port_change_event(self): + stop_event = threading.Event() + task = SffManagerTask(DEFAULT_NAMESPACE, stop_event, MagicMock(), helper_logger) + + port_change_event = PortChangeEvent('PortConfigDone', -1, 0, PortChangeEvent.PORT_SET) + task.on_port_update_event(port_change_event) + assert len(task.port_dict) == 0 + + port_change_event = PortChangeEvent('PortInitDone', -1, 0, PortChangeEvent.PORT_SET) + task.on_port_update_event(port_change_event) + assert len(task.port_dict) == 0 + + port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_ADD) + task.on_port_update_event(port_change_event) + assert len(task.port_dict) == 0 + + port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_REMOVE) + task.on_port_update_event(port_change_event) + assert len(task.port_dict) == 0 + + port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_DEL) + task.on_port_update_event(port_change_event) + assert len(task.port_dict) == 0 + + port_dict = {'type': 'QSFP28', 'subport': '0', 'host_tx_ready': 'false'} + port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_SET, port_dict) + task.on_port_update_event(port_change_event) + assert len(task.port_dict) == 1 + + port_change_event = PortChangeEvent('Ethernet0', -1, 0, PortChangeEvent.PORT_DEL, {}, + 'STATE_DB', 'TRANSCEIVER_INFO') + task.on_port_update_event(port_change_event) + assert len(task.port_dict) == 1 + + port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_DEL, {}, + 'CONFIG_DB', 'PORT_TABLE') + task.on_port_update_event(port_change_event) + assert len(task.port_dict) == 0 + + def test_SffManagerTask_get_active_lanes_for_lport(self): + sff_manager_task = SffManagerTask(DEFAULT_NAMESPACE, + threading.Event(), + MagicMock(), + helper_logger) + + lport = 'Ethernet0' + + subport_idx = 3 + num_lanes_per_lport = 1 + num_lanes_per_pport = 4 + expected_result = [False, False, True, False] + result = sff_manager_task.get_active_lanes_for_lport(lport, subport_idx, num_lanes_per_lport, num_lanes_per_pport) + assert result == expected_result + + subport_idx = 1 + num_lanes_per_lport = 2 + num_lanes_per_pport = 4 + expected_result = [True, True, False, False] + result = sff_manager_task.get_active_lanes_for_lport(lport, subport_idx, num_lanes_per_lport, num_lanes_per_pport) + assert result == expected_result + + subport_idx = 1 + num_lanes_per_lport = 2 + num_lanes_per_pport = 4 + expected_result = [True, True, False, False] + result = sff_manager_task.get_active_lanes_for_lport(lport, subport_idx, num_lanes_per_lport, num_lanes_per_pport) + assert result == expected_result + + subport_idx = 2 + num_lanes_per_lport = 2 + num_lanes_per_pport = 4 + expected_result = [False, False, True, True] + result = sff_manager_task.get_active_lanes_for_lport(lport, subport_idx, num_lanes_per_lport, num_lanes_per_pport) + assert result == expected_result + + subport_idx = 0 + num_lanes_per_lport = 4 + num_lanes_per_pport = 4 + expected_result = [True, True, True, True] + result = sff_manager_task.get_active_lanes_for_lport(lport, subport_idx, num_lanes_per_lport, num_lanes_per_pport) + assert result == expected_result + + # Test with larger number of lanes per port (not real use case) + subport_idx = 1 + num_lanes_per_lport = 4 + num_lanes_per_pport = 32 + expected_result = [True, True, True, True, False, False, False, False, + False, False, False, False, False, False, False, False, + False, False, False, False, False, False, False, False, + False, False, False, False, False, False, False, False] + result = sff_manager_task.get_active_lanes_for_lport(lport, subport_idx, num_lanes_per_lport, num_lanes_per_pport) + assert result == expected_result + + def test_SffManagerTask_get_active_lanes_for_lport_with_invalid_input(self): + sff_manager_task = SffManagerTask(DEFAULT_NAMESPACE, + threading.Event(), + MagicMock(), + helper_logger) + + lport = 'Ethernet0' + + subport_idx = -1 + num_lanes_per_lport = 4 + num_lanes_per_pport = 32 + result = sff_manager_task.get_active_lanes_for_lport(lport, subport_idx, num_lanes_per_lport, num_lanes_per_pport) + assert result is None + + subport_idx = 5 + num_lanes_per_lport = 1 + num_lanes_per_pport = 4 + result = sff_manager_task.get_active_lanes_for_lport(lport, subport_idx, num_lanes_per_lport, num_lanes_per_pport) + assert result is None + + @patch.object(XcvrTableHelper, 'get_state_port_tbl', return_value=MagicMock()) + def test_SffManagerTask_get_host_tx_status(self, mock_get_state_port_tbl): + mock_get_state_port_tbl.return_value.hget.return_value = (True, 'true') + + sff_manager_task = SffManagerTask(DEFAULT_NAMESPACE, + threading.Event(), + MagicMock(), + helper_logger) + + lport = 'Ethernet0' + assert sff_manager_task.get_host_tx_status(lport, 0) == 'true' + mock_get_state_port_tbl.assert_called_once_with(0) + mock_get_state_port_tbl.return_value.hget.assert_called_once_with(lport, 'host_tx_ready') + + @patch.object(XcvrTableHelper, 'get_cfg_port_tbl', return_value=MagicMock()) + def test_SffManagerTask_get_admin_status(self, mock_get_cfg_port_tbl): + mock_get_cfg_port_tbl.return_value.hget.return_value = (True, 'up') + + sff_manager_task = SffManagerTask(DEFAULT_NAMESPACE, + threading.Event(), + MagicMock(), + helper_logger) + + lport = 'Ethernet0' + assert sff_manager_task.get_admin_status(lport, 0) == 'up' + mock_get_cfg_port_tbl.assert_called_once_with(0) + mock_get_cfg_port_tbl.return_value.hget.assert_called_once_with(lport, 'admin_status') + + @patch('xcvrd.xcvrd.platform_chassis') + @patch('xcvrd.sff_mgr.PortChangeObserver', MagicMock(handle_port_update_event=MagicMock())) + def test_SffManagerTask_task_worker(self, mock_chassis): + mock_xcvr_api = MagicMock() + mock_xcvr_api.tx_disable_channel = MagicMock(return_value=True) + mock_xcvr_api.is_flat_memory = MagicMock(return_value=False) + mock_xcvr_api.is_copper = MagicMock(return_value=False) + mock_xcvr_api.get_tx_disable_support = MagicMock(return_value=True) + + mock_sfp = MagicMock() + mock_sfp.get_presence = MagicMock(return_value=True) + mock_sfp.get_xcvr_api = MagicMock(return_value=mock_xcvr_api) + + mock_chassis.get_all_sfps = MagicMock(return_value=[mock_sfp]) + mock_chassis.get_sfp = MagicMock(return_value=mock_sfp) + + task = SffManagerTask(DEFAULT_NAMESPACE, + threading.Event(), + mock_chassis, + helper_logger) + + # TX enable case: + port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_SET, { + 'type': 'QSFP28', + 'subport': '0', + 'lanes': '1,2,3,4', + }) + task.on_port_update_event(port_change_event) + assert len(task.port_dict) == 1 + task.get_host_tx_status = MagicMock(return_value='true') + task.get_admin_status = MagicMock(return_value='up') + mock_xcvr_api.get_tx_disable = MagicMock(return_value=[True, True, True, True]) + task.task_stopping_event.is_set = MagicMock(side_effect=[False, False, True]) + task.task_worker() + assert mock_xcvr_api.tx_disable_channel.call_count == 1 + assert task.get_host_tx_status.call_count == 1 + assert task.get_admin_status.call_count == 1 + + # TX disable case: + port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_SET, + {'host_tx_ready': 'false'}) + task.on_port_update_event(port_change_event) + assert len(task.port_dict) == 1 + mock_xcvr_api.get_tx_disable = MagicMock(return_value=[False, False, False, False]) + task.task_stopping_event.is_set = MagicMock(side_effect=[False, False, True]) + task.task_worker() + assert mock_xcvr_api.tx_disable_channel.call_count == 2 + + # No insertion and no change on host_tx_ready + task.task_stopping_event.is_set = MagicMock(side_effect=[False, False, True]) + task.task_worker() + assert task.port_dict == task.port_dict_prev + assert mock_xcvr_api.tx_disable_channel.call_count == 2 + + # flat memory case + port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_SET, + {'host_tx_ready': 'true'}) + task.on_port_update_event(port_change_event) + mock_xcvr_api.is_flat_memory = MagicMock(return_value=True) + mock_xcvr_api.is_flat_memory.call_count = 0 + task.task_stopping_event.is_set = MagicMock(side_effect=[False, False, True]) + task.task_worker() + assert mock_xcvr_api.is_flat_memory.call_count == 1 + assert mock_xcvr_api.tx_disable_channel.call_count == 2 + mock_xcvr_api.is_flat_memory = MagicMock(return_value=False) + + # copper case + port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_SET, + {'host_tx_ready': 'false'}) + task.on_port_update_event(port_change_event) + mock_xcvr_api.is_copper = MagicMock(return_value=True) + mock_xcvr_api.is_copper.call_count = 0 + task.task_stopping_event.is_set = MagicMock(side_effect=[False, False, True]) + task.task_worker() + assert mock_xcvr_api.is_copper.call_count == 1 + assert mock_xcvr_api.tx_disable_channel.call_count == 2 + mock_xcvr_api.is_copper = MagicMock(return_value=False) + + # tx_disable not supported case + port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_SET, + {'host_tx_ready': 'true'}) + task.on_port_update_event(port_change_event) + mock_xcvr_api.get_tx_disable_support = MagicMock(return_value=False) + mock_xcvr_api.get_tx_disable_support.call_count = 0 + task.task_stopping_event.is_set = MagicMock(side_effect=[False, False, True]) + task.task_worker() + assert mock_xcvr_api.get_tx_disable_support.call_count == 1 + assert mock_xcvr_api.tx_disable_channel.call_count == 2 + mock_xcvr_api.get_tx_disable_support = MagicMock(return_value=True) + + # sfp not present case + port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_SET, + {'host_tx_ready': 'false'}) + task.on_port_update_event(port_change_event) + mock_sfp.get_presence = MagicMock(return_value=False) + mock_sfp.get_presence.call_count = 0 + task.task_stopping_event.is_set = MagicMock(side_effect=[False, False, True]) + task.task_worker() + assert mock_sfp.get_presence.call_count == 1 + assert mock_xcvr_api.tx_disable_channel.call_count == 2 + mock_sfp.get_presence = MagicMock(return_value=True) + @patch('xcvrd.xcvrd._wrapper_get_sfp_type', MagicMock(return_value='QSFP_DD')) def test_CmisManagerTask_handle_port_change_event(self): port_mapping = PortMapping() @@ -2047,7 +2319,6 @@ def test_DaemonXcvrd_init_deinit_fastboot_enabled(self): xcvrd.init() xcvrd.deinit() - def wait_until(total_wait_time, interval, call_back, *args, **kwargs): wait_time = 0 while wait_time <= total_wait_time: diff --git a/sonic-xcvrd/xcvrd/sff_mgr.py b/sonic-xcvrd/xcvrd/sff_mgr.py new file mode 100644 index 000000000..0940b6b54 --- /dev/null +++ b/sonic-xcvrd/xcvrd/sff_mgr.py @@ -0,0 +1,479 @@ +""" + SFF task manager + Deterministic link bring-up task manager for SFF compliant modules, running + as a thread inside xcvrd +""" + +try: + import copy + import sys + import threading + import traceback + + from swsscommon import swsscommon + + from .xcvrd_utilities.port_event_helper import PortChangeObserver + from .xcvrd_utilities.xcvr_table_helper import XcvrTableHelper +except ImportError as e: + raise ImportError(str(e) + " - required module not found") + + +class SffLoggerForPortUpdateEvent: + SFF_LOGGER_PREFIX = "SFF-PORT-UPDATE: " + + def __init__(self, logger): + self.logger = logger + + def log_notice(self, message): + self.logger.log_notice("{}{}".format(self.SFF_LOGGER_PREFIX, message)) + + def log_warning(self, message): + self.logger.log_warning("{}{}".format(self.SFF_LOGGER_PREFIX, message)) + + def log_error(self, message): + self.logger.log_error("{}{}".format(self.SFF_LOGGER_PREFIX, message)) + +# Thread wrapper class for SFF compliant transceiver management +class SffManagerTask(threading.Thread): + + # CONFIG_DB port table fields: + + ADMIN_STATUS = 'admin_status' + # This is the subport index for this logical port, starting from 1, 0 means + # all lanes are taken. + SUBPORT = 'subport' + # This is a comma separated list of lane numbers + LANES_LIST = 'lanes' + + + # STATE_DB TRANSCEIVER_INFO table fields: + + # This filed can used to determine insertion/removal event. Since + # TRANSCEIVER_INFO has the same life cycle as a transceiver, if transceiver + # is inserted/removed, TRANSCEIVER_INFO is also created/deleted. + XCVR_TYPE = 'type' + + + # STATE_DB PORT_TABLE fields: + + HOST_TX_READY = 'host_tx_ready' + + # Default number of lanes per physical port for QSFP28/QSFP+ transceiver + DEFAULT_NUM_LANES_PER_PPORT = 4 + + # Subscribe to below tables in Redis DB + PORT_TBL_MAP = [ + { + 'CONFIG_DB': swsscommon.CFG_PORT_TABLE_NAME + }, + { + 'STATE_DB': 'TRANSCEIVER_INFO', + 'FILTER': [XCVR_TYPE] + }, + { + 'STATE_DB': 'PORT_TABLE', + 'FILTER': [HOST_TX_READY] # This also filters out unwanted 'admin_status' from STATE_DB. + }, + ] + + SFF_LOGGER_PREFIX = "SFF-MAIN: " + + def __init__(self, namespaces, main_thread_stop_event, platform_chassis, helper_logger): + threading.Thread.__init__(self) + self.name = "SffManagerTask" + self.exc = None + self.task_stopping_event = threading.Event() + self.main_thread_stop_event = main_thread_stop_event + self.helper_logger = helper_logger + self.logger_for_port_update_event = SffLoggerForPortUpdateEvent(helper_logger) + self.platform_chassis = platform_chassis + # port_dict holds data obtained from on_port_update_event per port entry + # with logical_port_name as key. + # Its port entry will get deleted upon CONFIG_DB PORT_TABLE DEL. + self.port_dict = {} + # port_dict snapshot captured in the previous event update loop + self.port_dict_prev = {} + self.xcvr_table_helper = XcvrTableHelper(namespaces) + self.namespaces = namespaces + + def log_notice(self, message): + self.helper_logger.log_notice("{}{}".format(self.SFF_LOGGER_PREFIX, message)) + + def log_warning(self, message): + self.helper_logger.log_warning("{}{}".format(self.SFF_LOGGER_PREFIX, message)) + + def log_error(self, message): + self.helper_logger.log_error("{}{}".format(self.SFF_LOGGER_PREFIX, message)) + + def get_active_lanes_for_lport(self, lport, subport_idx, num_lanes_per_lport, num_lanes_per_pport): + """ + Get the active lanes for a logical port based on the subport index. + + Args: + lport (str): Logical port name. + subport_idx (int): Subport index, starting from 1. 0 means all lanes are taken. + num_lanes_per_lport (int): Number of lanes per logical port. + num_lanes_per_pport (int): Number of lanes per physical port. + + Returns: + list: A list of boolean values, where True means the corresponding + lane belongs to this logical port. For example, [True, True, + False, False] means the first two lanes on this physical port + belong to this logical port. + """ + if subport_idx < 0 or subport_idx > num_lanes_per_pport // num_lanes_per_lport: + self.log_error( + "{}: Invalid subport_idx {} " + "for num_lanes_per_lport={}, " + "num_lanes_per_pport={}".format(lport, + subport_idx, + num_lanes_per_lport, + num_lanes_per_pport) + ) + return None + + if subport_idx == 0: + lanes = [True] * num_lanes_per_pport + else: + lanes = [False] * num_lanes_per_pport + start = (subport_idx - 1) * num_lanes_per_lport + end = subport_idx * num_lanes_per_lport + lanes[start:end] = [True] * (end - start) + + return lanes + + def on_port_update_event(self, port_change_event): + if (port_change_event.event_type + not in [port_change_event.PORT_SET, port_change_event.PORT_DEL]): + return + + lport = port_change_event.port_name + pport = port_change_event.port_index + asic_id = port_change_event.asic_id + + # Skip if it's not a physical port + if not lport.startswith('Ethernet'): + return + + # Skip if the physical index is not available + if pport is None: + return + + if port_change_event.port_dict is None: + return + + if port_change_event.event_type == port_change_event.PORT_SET: + if lport not in self.port_dict: + self.port_dict[lport] = {} + if pport >= 0: + self.port_dict[lport]['index'] = pport + + if self.SUBPORT in port_change_event.port_dict: + self.port_dict[lport][self.SUBPORT] = port_change_event.port_dict[self.SUBPORT] + + if self.LANES_LIST in port_change_event.port_dict: + self.port_dict[lport][self.LANES_LIST] = \ + port_change_event.port_dict[self.LANES_LIST].split(',') + + if self.HOST_TX_READY in port_change_event.port_dict: + self.port_dict[lport][self.HOST_TX_READY] = \ + port_change_event.port_dict[self.HOST_TX_READY] + + if self.ADMIN_STATUS in port_change_event.port_dict: + self.port_dict[lport][self.ADMIN_STATUS] = \ + port_change_event.port_dict[self.ADMIN_STATUS] + + if self.XCVR_TYPE in port_change_event.port_dict: + self.port_dict[lport][self.XCVR_TYPE] = port_change_event.port_dict[self.XCVR_TYPE] + self.port_dict[lport]['asic_id'] = asic_id + # CONFIG_DB PORT_TABLE DEL case: + elif port_change_event.db_name and \ + port_change_event.db_name == 'CONFIG_DB': + # Only when port is removed from CONFIG, we consider this entry as deleted. + if lport in self.port_dict: + del self.port_dict[lport] + # STATE_DB TRANSCEIVER_INFO DEL case: + elif port_change_event.table_name and \ + port_change_event.table_name == 'TRANSCEIVER_INFO': + # TRANSCEIVER_INFO DEL corresponds to transceiver removal (not + # port/interface removal), in this case, remove XCVR_TYPE field from + # self.port_dict + if lport in self.port_dict and self.XCVR_TYPE in self.port_dict[lport]: + del self.port_dict[lport][self.XCVR_TYPE] + + def get_host_tx_status(self, lport, asic_index): + host_tx_ready = 'false' + + state_port_tbl = self.xcvr_table_helper.get_state_port_tbl(asic_index) + + found, value = state_port_tbl.hget(lport, self.HOST_TX_READY) + if found: + host_tx_ready = value + return host_tx_ready + + def get_admin_status(self, lport, asic_index): + admin_status = 'down' + + cfg_port_tbl = self.xcvr_table_helper.get_cfg_port_tbl(asic_index) + + found, value = cfg_port_tbl.hget(lport, self.ADMIN_STATUS) + if found: + admin_status = value + return admin_status + + def run(self): + if self.platform_chassis is None: + self.log_error("Platform chassis is not available, stopping...") + return + + try: + self.task_worker() + except Exception as e: + self.helper_logger.log_error("Exception occured at {} thread due to {}".format( + threading.current_thread().getName(), repr(e))) + exc_type, exc_value, exc_traceback = sys.exc_info() + msg = traceback.format_exception(exc_type, exc_value, exc_traceback) + for tb_line in msg: + for tb_line_split in tb_line.splitlines(): + self.helper_logger.log_error(tb_line_split) + self.exc = e + self.main_thread_stop_event.set() + + def join(self): + self.task_stopping_event.set() + threading.Thread.join(self) + if self.exc: + raise self.exc + + def calculate_tx_disable_delta_array(self, cur_tx_disable_array, tx_disable_flag, active_lanes): + """ + Calculate the delta array between current tx_disable array and the target tx_disable flag. + + Args: + cur_tx_disable_array (list): An array of boolean values, where True means the + corresponding lane is disabled. + tx_disable_flag (bool): The target tx_disable flag. + active_lanes (list): An array of boolean values, where True means the + corresponding lane is active for this logical port. + + Returns: + list: A list of boolean values, where True means the corresponding + lane needs to be changed. + """ + delta_array = [] + for active, cur_flag in zip(active_lanes, cur_tx_disable_array): + is_different = (tx_disable_flag != cur_flag) if active else False + delta_array.append(is_different) + return delta_array + + def convert_bool_array_to_bit_mask(self, bool_array): + """ + Convert a boolean array into a bitmask. If a value in the boolean array + is True, the corresponding bit in the bitmask is set to 1, otherwise + it's set to 0. The function starts from the least significant bit for + the first item in the boolean array. + + Args: + bool_array (list): An array of boolean values. + + Returns: + int: A bitmask corresponding to the input boolean array. + """ + mask = 0 + for i, flag in enumerate(bool_array): + mask += (1 << i if flag else 0) + return mask + + def task_worker(self): + ''' + The main goal of sff_mgr is to make sure SFF compliant modules are + brought up in a deterministc way, meaning TX is enabled only after + host_tx_ready becomes True, and TX will be disabled when host_tx_ready + becomes False. This will help eliminate link stability issue and + potential interface flap, also turning off TX reduces the power + consumption and avoid any lab hazard for admin shut interface. + + Platform can decide whether to enable sff_mgr. By default, it's disabled. + + Pre-requisite for platform to enable sff_mgr: + platform needs to keep TX in disabled state after module coming + out-of-reset, in either module insertion or bootup cases. This is to + make sure the module is not transmitting with TX enabled before + host_tx_ready is True. + ''' + + # CONFIG updates, and STATE_DB for insertion/removal, and host_tx_ready change + port_change_observer = PortChangeObserver(self.namespaces, + self.logger_for_port_update_event, + self.task_stopping_event, + self.on_port_update_event, + self.PORT_TBL_MAP) + + # This thread doesn't need to expilictly wait on PortInitDone and + # PortConfigDone events, as xcvrd main thread waits on them before + # spawrning this thread. + while not self.task_stopping_event.is_set(): + # Internally, handle_port_update_event will block for up to + # SELECT_TIMEOUT_MSECS until a message is received(in select + # function). A message is received when there is a Redis SET/DEL + # operation in the DB tables. Upon process restart, messages will be + # replayed for all fields, no need to explictly query the DB tables + # here. + if not port_change_observer.handle_port_update_event(): + # In the case of no real update, go back to the beginning of the loop + continue + + for lport in self.port_dict: + if self.task_stopping_event.is_set(): + break + data = self.port_dict[lport] + pport = int(data.get('index', '-1')) + subport_idx = int(data.get(self.SUBPORT, '0')) + lanes_list = data.get(self.LANES_LIST, None) + # active_lanes is a list of boolean values, where True means the + # corresponding lane belongs to this logical port. + active_lanes = data.get('active_lanes', None) + xcvr_type = data.get(self.XCVR_TYPE, None) + xcvr_inserted = False + host_tx_ready_changed = False + admin_status_changed = False + if pport < 0 or lanes_list is None: + continue + + if xcvr_type is None: + # TRANSCEIVER_INFO table's XCVR_TYPE is not ready, meaning xcvr is not present + continue + + # Procced only for QSFP28/QSFP+ transceiver + if not (xcvr_type.startswith('QSFP28') or xcvr_type.startswith('QSFP+')): + continue + + # Handle the case that host_tx_ready value in the local cache hasn't + # been updated via PortChangeEvent: + if self.HOST_TX_READY not in data: + # Fetch host_tx_ready status from STATE_DB (if not present + # in DB, treat it as false), and update self.port_dict + data[self.HOST_TX_READY] = self.get_host_tx_status(lport, data['asic_id']) + self.log_notice("{}: fetched DB and updated host_tx_ready={} locally".format( + lport, data[self.HOST_TX_READY])) + # Handle the case that admin_status value in the local cache hasn't + # been updated via PortChangeEvent: + if self.ADMIN_STATUS not in data: + # Fetch admin_status from CONFIG_DB (if not present in DB, + # treat it as false), and update self.port_dict + data[self.ADMIN_STATUS] = self.get_admin_status(lport, data['asic_id']) + self.log_notice("{}: fetched DB and updated admin_status={} locally".format( + lport, data[self.ADMIN_STATUS])) + + # Check if there's a diff between current and previous XCVR_TYPE + # It's a xcvr insertion case if TRANSCEIVER_INFO XCVR_TYPE doesn't exist + # in previous port_dict snapshot + if lport not in self.port_dict_prev or self.XCVR_TYPE not in self.port_dict_prev[lport]: + xcvr_inserted = True + # Check if there's a diff between current and previous host_tx_ready + if (lport not in self.port_dict_prev or + self.HOST_TX_READY not in self.port_dict_prev[lport] or + self.port_dict_prev[lport][self.HOST_TX_READY] != data[self.HOST_TX_READY]): + host_tx_ready_changed = True + # Check if there's a diff between current and previous admin_status + if (lport not in self.port_dict_prev or + self.ADMIN_STATUS not in self.port_dict_prev[lport] or + self.port_dict_prev[lport][self.ADMIN_STATUS] != data[self.ADMIN_STATUS]): + admin_status_changed = True + # Skip if neither of below cases happens: + # 1) xcvr insertion + # 2) host_tx_ready getting changed + # 3) admin_status getting changed + # In addition to handle_port_update_event()'s internal filter, + # this check serves as additional filter to ignore irrelevant + # event, such as CONFIG_DB change other than admin_status field. + if ((not xcvr_inserted) and + (not host_tx_ready_changed) and + (not admin_status_changed)): + continue + self.log_notice(("{}: xcvr=present(inserted={}), " + "host_tx_ready={}(changed={}), " + "admin_status={}(changed={})").format( + lport, + xcvr_inserted, + data[self.HOST_TX_READY], host_tx_ready_changed, + data[self.ADMIN_STATUS], admin_status_changed)) + + # double-check the HW presence before moving forward + sfp = self.platform_chassis.get_sfp(pport) + if not sfp.get_presence(): + self.log_error("{}: module not present!".format(lport)) + del self.port_dict[lport][self.XCVR_TYPE] + continue + try: + # Skip if XcvrApi is not supported + api = sfp.get_xcvr_api() + if api is None: + self.log_error( + "{}: skipping sff_mgr since no xcvr api!".format(lport)) + continue + + # Skip if it's not a paged memory device + if api.is_flat_memory(): + self.log_notice( + "{}: skipping sff_mgr for flat memory xcvr".format(lport)) + continue + + # Skip if it's a copper cable + if api.is_copper(): + self.log_notice( + "{}: skipping sff_mgr for copper cable".format(lport)) + continue + + # Skip if tx_disable action is not supported for this xcvr + if not api.get_tx_disable_support(): + self.log_notice( + "{}: skipping sff_mgr due to tx_disable not supported".format( + lport)) + continue + except (AttributeError, NotImplementedError): + # Skip if these essential routines are not available + continue + + if active_lanes is None: + active_lanes = self.get_active_lanes_for_lport(lport, subport_idx, + len(lanes_list), + self.DEFAULT_NUM_LANES_PER_PPORT) + if active_lanes is None: + self.log_error("{}: skipping sff_mgr due to " + "failing to get active lanes".format(lport)) + continue + # Save active_lanes in self.port_dict + self.port_dict[lport]['active_lanes'] = active_lanes + + # Only turn on TX if both host_tx_ready is true and admin_status is up + target_tx_disable_flag = not (data[self.HOST_TX_READY] == 'true' + and data[self.ADMIN_STATUS] == 'up') + # get_tx_disable API returns an array of bool, with tx_disable flag on each lane. + # True means tx disabled; False means tx enabled. + cur_tx_disable_array = api.get_tx_disable() + if cur_tx_disable_array is None: + self.log_error("{}: Failed to get current tx_disable value".format(lport)) + # If reading current tx_disable/enable value failed (could be due to + # read error), then set this variable to the opposite value of + # target_tx_disable_flag, to let detla array to be True on + # all the interested lanes, to try best-effort TX disable/enable. + cur_tx_disable_array = [not target_tx_disable_flag] * self.DEFAULT_NUM_LANES_PER_PPORT + # Get an array of bool, where it's True only on the lanes that need change. + delta_array = self.calculate_tx_disable_delta_array(cur_tx_disable_array, + target_tx_disable_flag, active_lanes) + mask = self.convert_bool_array_to_bit_mask(delta_array) + if mask == 0: + self.log_notice("{}: No change is needed for tx_disable value".format(lport)) + continue + if api.tx_disable_channel(mask, target_tx_disable_flag): + self.log_notice("{}: TX was {} with lanes mask: {}".format( + lport, "disabled" if target_tx_disable_flag else "enabled", bin(mask))) + else: + self.log_error("{}: Failed to {} TX with lanes mask: {}".format( + lport, "disable" if target_tx_disable_flag else "enable", bin(mask))) + + # Take a snapshot for port_dict, this will be used to calculate diff + # later in the while loop to determine if there's really a value + # change on the fields related to the events we care about. + self.port_dict_prev = copy.deepcopy(self.port_dict) diff --git a/sonic-xcvrd/xcvrd/xcvrd.py b/sonic-xcvrd/xcvrd/xcvrd.py index 92516359b..20defb7d6 100644 --- a/sonic-xcvrd/xcvrd/xcvrd.py +++ b/sonic-xcvrd/xcvrd/xcvrd.py @@ -26,11 +26,13 @@ from swsscommon import swsscommon from .xcvrd_utilities import sfp_status_helper + from .sff_mgr import SffManagerTask + from .xcvrd_utilities.xcvr_table_helper import XcvrTableHelper from .xcvrd_utilities import port_event_helper from .xcvrd_utilities.port_event_helper import PortChangeObserver from .xcvrd_utilities import media_settings_parser from .xcvrd_utilities import optics_si_parser - + from sonic_platform_base.sonic_xcvr.api.public.c_cmis import CmisApi except ImportError as e: @@ -45,12 +47,6 @@ PLATFORM_SPECIFIC_MODULE_NAME = "sfputil" PLATFORM_SPECIFIC_CLASS_NAME = "SfpUtil" -TRANSCEIVER_INFO_TABLE = 'TRANSCEIVER_INFO' -TRANSCEIVER_DOM_SENSOR_TABLE = 'TRANSCEIVER_DOM_SENSOR' -TRANSCEIVER_DOM_THRESHOLD_TABLE = 'TRANSCEIVER_DOM_THRESHOLD' -TRANSCEIVER_STATUS_TABLE = 'TRANSCEIVER_STATUS' -TRANSCEIVER_PM_TABLE = 'TRANSCEIVER_PM' - TRANSCEIVER_STATUS_TABLE_SW_FIELDS = ["status", "error"] # Mgminit time required as per CMIS spec @@ -898,7 +894,7 @@ def get_cmis_media_lanes_mask(self, api, appl, lport, subport): self.log_error("Invalid input to get media lane mask - appl {} media_lane_count {} " "lport {} subport {}!".format(appl, media_lane_count, lport, subport)) return media_lanes_mask - + media_lane_start_bit = (media_lane_count * (0 if subport == 0 else subport - 1)) if media_lane_assignment_option & (1 << media_lane_start_bit): media_lanes_mask = ((1 << media_lane_count) - 1) << media_lane_start_bit @@ -1322,7 +1318,7 @@ def task_worker(self): continue host_lanes_mask = self.port_dict[lport]['host_lanes_mask'] self.log_notice("{}: Setting host_lanemask=0x{:x}".format(lport, host_lanes_mask)) - + self.port_dict[lport]['media_lane_count'] = int(api.get_media_lane_count(appl)) self.port_dict[lport]['media_lane_assignment_options'] = int(api.get_media_lane_assignment_option(appl)) media_lane_count = self.port_dict[lport]['media_lane_count'] @@ -1392,8 +1388,8 @@ def task_worker(self): self.port_dict[lport]['cmis_expired'] = now + datetime.timedelta(seconds = max(modulePwrUpDuration, dpDeinitDuration)) elif state == self.CMIS_STATE_AP_CONF: - # Explicit control bit to apply custom Host SI settings. - # It will be set to 1 and applied via set_application if + # Explicit control bit to apply custom Host SI settings. + # It will be set to 1 and applied via set_application if # custom SI settings is applicable ec = 0 @@ -1425,13 +1421,13 @@ def task_worker(self): # Apply module SI settings if applicable lane_speed = int(speed/1000)//host_lane_count optics_si_dict = optics_si_parser.fetch_optics_si_setting(pport, lane_speed, sfp) - + self.log_debug("Read SI parameters for port {} from optics_si_settings.json vendor file:".format(lport)) for key, sub_dict in optics_si_dict.items(): self.log_debug("{}".format(key)) for sub_key, value in sub_dict.items(): self.log_debug("{}: {}".format(sub_key, str(value))) - + if optics_si_dict: self.log_notice("{}: Apply Optics SI found for Vendor: {} PN: {} lane speed: {}G". format(lport, api.get_manufacturer(), api.get_model(), lane_speed)) @@ -2185,11 +2181,12 @@ def retry_eeprom_reading(self): class DaemonXcvrd(daemon_base.DaemonBase): - def __init__(self, log_identifier, skip_cmis_mgr=False): + def __init__(self, log_identifier, skip_cmis_mgr=False, enable_sff_mgr=False): super(DaemonXcvrd, self).__init__(log_identifier) self.stop_event = threading.Event() self.sfp_error_event = threading.Event() self.skip_cmis_mgr = skip_cmis_mgr + self.enable_sff_mgr = enable_sff_mgr self.namespaces = [''] self.threads = [] @@ -2316,6 +2313,15 @@ def run(self): # Start daemon initialization sequence port_mapping_data = self.init() + # Start the SFF manager + sff_manager = None + if self.enable_sff_mgr: + sff_manager = SffManagerTask(self.namespaces, self.stop_event, platform_chassis, helper_logger) + sff_manager.start() + self.threads.append(sff_manager) + else: + self.log_notice("Skipping SFF Task Manager") + # Start the CMIS manager cmis_manager = CmisManagerTask(self.namespaces, port_mapping_data, self.stop_event, self.skip_cmis_mgr) if not self.skip_cmis_mgr: @@ -2355,6 +2361,11 @@ def run(self): self.log_error("Exiting main loop as child thread raised exception!") os.kill(os.getpid(), signal.SIGKILL) + # Stop the SFF manager + if sff_manager is not None: + if sff_manager.is_alive(): + sff_manager.join() + # Stop the CMIS manager if cmis_manager is not None: if cmis_manager.is_alive(): @@ -2377,54 +2388,6 @@ def run(self): if self.sfp_error_event.is_set(): sys.exit(SFP_SYSTEM_ERROR) - -class XcvrTableHelper: - def __init__(self, namespaces): - self.int_tbl, self.dom_tbl, self.dom_threshold_tbl, self.status_tbl, self.app_port_tbl, \ - self.cfg_port_tbl, self.state_port_tbl, self.pm_tbl = {}, {}, {}, {}, {}, {}, {}, {} - self.state_db = {} - self.cfg_db = {} - for namespace in namespaces: - asic_id = multi_asic.get_asic_index_from_namespace(namespace) - self.state_db[asic_id] = daemon_base.db_connect("STATE_DB", namespace) - self.int_tbl[asic_id] = swsscommon.Table(self.state_db[asic_id], TRANSCEIVER_INFO_TABLE) - self.dom_tbl[asic_id] = swsscommon.Table(self.state_db[asic_id], TRANSCEIVER_DOM_SENSOR_TABLE) - self.dom_threshold_tbl[asic_id] = swsscommon.Table(self.state_db[asic_id], TRANSCEIVER_DOM_THRESHOLD_TABLE) - self.status_tbl[asic_id] = swsscommon.Table(self.state_db[asic_id], TRANSCEIVER_STATUS_TABLE) - self.pm_tbl[asic_id] = swsscommon.Table(self.state_db[asic_id], TRANSCEIVER_PM_TABLE) - self.state_port_tbl[asic_id] = swsscommon.Table(self.state_db[asic_id], swsscommon.STATE_PORT_TABLE_NAME) - appl_db = daemon_base.db_connect("APPL_DB", namespace) - self.app_port_tbl[asic_id] = swsscommon.ProducerStateTable(appl_db, swsscommon.APP_PORT_TABLE_NAME) - self.cfg_db[asic_id] = daemon_base.db_connect("CONFIG_DB", namespace) - self.cfg_port_tbl[asic_id] = swsscommon.Table(self.cfg_db[asic_id], swsscommon.CFG_PORT_TABLE_NAME) - - def get_intf_tbl(self, asic_id): - return self.int_tbl[asic_id] - - def get_dom_tbl(self, asic_id): - return self.dom_tbl[asic_id] - - def get_dom_threshold_tbl(self, asic_id): - return self.dom_threshold_tbl[asic_id] - - def get_status_tbl(self, asic_id): - return self.status_tbl[asic_id] - - def get_pm_tbl(self, asic_id): - return self.pm_tbl[asic_id] - - def get_app_port_tbl(self, asic_id): - return self.app_port_tbl[asic_id] - - def get_state_db(self, asic_id): - return self.state_db[asic_id] - - def get_cfg_port_tbl(self, asic_id): - return self.cfg_port_tbl[asic_id] - - def get_state_port_tbl(self, asic_id): - return self.state_port_tbl[asic_id] - # # Main ========================================================================= # @@ -2435,9 +2398,10 @@ def get_state_port_tbl(self, asic_id): def main(): parser = argparse.ArgumentParser() parser.add_argument('--skip_cmis_mgr', action='store_true') + parser.add_argument('--enable_sff_mgr', action='store_true') args = parser.parse_args() - xcvrd = DaemonXcvrd(SYSLOG_IDENTIFIER, args.skip_cmis_mgr) + xcvrd = DaemonXcvrd(SYSLOG_IDENTIFIER, args.skip_cmis_mgr, args.enable_sff_mgr) xcvrd.run() diff --git a/sonic-xcvrd/xcvrd/xcvrd_utilities/xcvr_table_helper.py b/sonic-xcvrd/xcvrd/xcvrd_utilities/xcvr_table_helper.py new file mode 100644 index 000000000..2f372a76f --- /dev/null +++ b/sonic-xcvrd/xcvrd/xcvrd_utilities/xcvr_table_helper.py @@ -0,0 +1,64 @@ +try: + from sonic_py_common import daemon_base + from sonic_py_common import multi_asic + from swsscommon import swsscommon +except ImportError as e: + raise ImportError(str(e) + " - required module not found") + +TRANSCEIVER_INFO_TABLE = 'TRANSCEIVER_INFO' +TRANSCEIVER_FIRMWARE_INFO_TABLE = 'TRANSCEIVER_FIRMWARE_INFO' +TRANSCEIVER_DOM_SENSOR_TABLE = 'TRANSCEIVER_DOM_SENSOR' +TRANSCEIVER_DOM_THRESHOLD_TABLE = 'TRANSCEIVER_DOM_THRESHOLD' +TRANSCEIVER_STATUS_TABLE = 'TRANSCEIVER_STATUS' +TRANSCEIVER_PM_TABLE = 'TRANSCEIVER_PM' + +class XcvrTableHelper: + def __init__(self, namespaces): + self.int_tbl, self.dom_tbl, self.dom_threshold_tbl, self.status_tbl, self.app_port_tbl, \ + self.cfg_port_tbl, self.state_port_tbl, self.pm_tbl, self.firmware_info_tbl = {}, {}, {}, {}, {}, {}, {}, {}, {} + self.state_db = {} + self.cfg_db = {} + for namespace in namespaces: + asic_id = multi_asic.get_asic_index_from_namespace(namespace) + self.state_db[asic_id] = daemon_base.db_connect("STATE_DB", namespace) + self.int_tbl[asic_id] = swsscommon.Table(self.state_db[asic_id], TRANSCEIVER_INFO_TABLE) + self.dom_tbl[asic_id] = swsscommon.Table(self.state_db[asic_id], TRANSCEIVER_DOM_SENSOR_TABLE) + self.dom_threshold_tbl[asic_id] = swsscommon.Table(self.state_db[asic_id], TRANSCEIVER_DOM_THRESHOLD_TABLE) + self.status_tbl[asic_id] = swsscommon.Table(self.state_db[asic_id], TRANSCEIVER_STATUS_TABLE) + self.pm_tbl[asic_id] = swsscommon.Table(self.state_db[asic_id], TRANSCEIVER_PM_TABLE) + self.firmware_info_tbl[asic_id] = swsscommon.Table(self.state_db[asic_id], TRANSCEIVER_FIRMWARE_INFO_TABLE) + self.state_port_tbl[asic_id] = swsscommon.Table(self.state_db[asic_id], swsscommon.STATE_PORT_TABLE_NAME) + appl_db = daemon_base.db_connect("APPL_DB", namespace) + self.app_port_tbl[asic_id] = swsscommon.ProducerStateTable(appl_db, swsscommon.APP_PORT_TABLE_NAME) + self.cfg_db[asic_id] = daemon_base.db_connect("CONFIG_DB", namespace) + self.cfg_port_tbl[asic_id] = swsscommon.Table(self.cfg_db[asic_id], swsscommon.CFG_PORT_TABLE_NAME) + + def get_intf_tbl(self, asic_id): + return self.int_tbl[asic_id] + + def get_dom_tbl(self, asic_id): + return self.dom_tbl[asic_id] + + def get_dom_threshold_tbl(self, asic_id): + return self.dom_threshold_tbl[asic_id] + + def get_status_tbl(self, asic_id): + return self.status_tbl[asic_id] + + def get_pm_tbl(self, asic_id): + return self.pm_tbl[asic_id] + + def get_firmware_info_tbl(self, asic_id): + return self.firmware_info_tbl[asic_id] + + def get_app_port_tbl(self, asic_id): + return self.app_port_tbl[asic_id] + + def get_state_db(self, asic_id): + return self.state_db[asic_id] + + def get_cfg_port_tbl(self, asic_id): + return self.cfg_port_tbl[asic_id] + + def get_state_port_tbl(self, asic_id): + return self.state_port_tbl[asic_id]