Skip to content

Commit

Permalink
Merge pull request #15 from kpetremann/main
Browse files Browse the repository at this point in the history
feat: scripts for sonic202211
  • Loading branch information
kpetremann authored Oct 28, 2024
2 parents 28a7447 + 785d6b8 commit ce8dfa5
Show file tree
Hide file tree
Showing 2 changed files with 440 additions and 0 deletions.
251 changes: 251 additions & 0 deletions states/utilities/202211/criteo_fdbshow
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
#!/usr/bin/python3

"""
This file is a modified version fdbshow in https://github.com/sonic-net/sonic-utilities
It adds JSON output support.
Script to show MAC/FDB entries learnt in Hardware
usage: criteo_fdbshow [-p PORT] [-v VLAN]
optional arguments:
-p, --port FDB learned on specific port: Ethernet0
-v, --vlan FDB learned on specific Vlan: 1000
Example of the output:
admin@str~$ criteo_fdbshow
No. Vlan MacAddress Port Type
----- ------ ----------------- ---------- -------
1 1000 7C:FE:90:80:9F:05 Ethernet20 Dynamic
2 1000 7C:FE:90:80:9F:10 Ethernet40 Dynamic
3 1000 7C:FE:90:80:9F:01 Ethernet4 Dynamic
4 1000 7C:FE:90:80:9F:02 Ethernet8 Dynamic
Total number of entries 4
admin@str:~$ criteo_fdbshow -p Ethernet4
No. Vlan MacAddress Port Type
----- ------ ----------------- --------- -------
1 1000 7C:FE:90:80:9F:01 Ethernet4 Dynamic
Total number of entries 1
admin@str:~$ criteo_fdbshow -v 1001
1001 is not in list
Examples of the output:
$ ./criteo_fdbshow -j -v 1234
[{"MacAddress": "00:53:00:01:02:03", "No.": 1, "Vlan": 1234, "Type": "Dynamic", "Port": "Ethernet0"}, 000]
$ ./criteo_fdbshow -j -p Ethernet0
[{"MacAddress": "00:53:00:01:02:03", "No.": 1, "Vlan": 1234, "Type": "Dynamic", "Port": "Ethernet0"}, ...]
"""
import argparse
import json
import sys
import os
import re

# mock the redis for unit test purposes #
try: # pragma: no cover
if os.environ["UTILITIES_UNIT_TESTING"] == "1":
modules_path = os.path.join(os.path.dirname(__file__), "..")
test_path = os.path.join(modules_path, "tests")
sys.path.insert(0, modules_path)
sys.path.insert(0, test_path)
import mock_tables.dbconnector
mock_variants = { "1": 'asic_db',
"2": 'asic_db_def_vlan',
"3": 'asic_db_no_fdb',
"4": 'asic_db_no_bridge',
"5": 'asic_db_fetch_except',
"6": 'asic_db_no_static',
"7": 'asic_db_mac_case'}
mock_db_path = os.path.join(test_path, "fdbshow_input")
file_name = mock_variants[os.environ["FDBSHOW_MOCK"]]
jsonfile_asic = os.path.join(mock_db_path, file_name)
mock_tables.dbconnector.dedicated_dbs['ASIC_DB'] = jsonfile_asic
jsonfile_counters = os.path.join(mock_db_path, 'counters_db')
mock_tables.dbconnector.dedicated_dbs['COUNTERS_DB'] = jsonfile_counters
except KeyError: # pragma: no cover
pass

from swsssdk import port_util
from swsscommon.swsscommon import SonicV2Connector
from tabulate import tabulate

class FdbShow(object):

HEADER = ['No.', 'Vlan', 'MacAddress', 'Port', 'Type']

def __init__(self, json):
super(FdbShow,self).__init__()
self.db = SonicV2Connector(host="127.0.0.1")
self.if_name_map, \
self.if_oid_map = port_util.get_interface_oid_map(self.db)
self.if_br_oid_map = port_util.get_bridge_port_map(self.db)
self.json = json
self.fetch_fdb_data()
return

def print_error(self, string):
"""
Helper to display error in JSON or in string.
"""
if self.json:
print(json.dumps({"error": string}))
else:
print("Error: {}".format(string))

def fetch_fdb_data(self):
"""
Fetch FDB entries from ASIC DB.
FDB entries are sorted on "VlanID" and stored as a list of tuples
"""
self.db.connect(self.db.ASIC_DB)
self.bridge_mac_list = []

if not self.if_br_oid_map:
return

fdb_str = self.db.keys(self.db.ASIC_DB, "ASIC_STATE:SAI_OBJECT_TYPE_FDB_ENTRY:*")
if not fdb_str:
return

bvid_tlb = {}
oid_pfx = len("oid:0x")
for s in fdb_str:
fdb_entry = s
fdb = json.loads(fdb_entry .split(":", 2)[-1])
if not fdb:
continue

ent = self.db.get_all(self.db.ASIC_DB, s)
if not ent:
continue

br_port_id = ent["SAI_FDB_ENTRY_ATTR_BRIDGE_PORT_ID"][oid_pfx:]
ent_type = ent["SAI_FDB_ENTRY_ATTR_TYPE"]
fdb_type = ['Dynamic','Static'][ent_type == "SAI_FDB_ENTRY_TYPE_STATIC"]
if br_port_id not in self.if_br_oid_map:
continue
port_id = self.if_br_oid_map[br_port_id]
if port_id in self.if_oid_map:
if_name = self.if_oid_map[port_id]
else:
if_name = port_id
if 'vlan' in fdb:
vlan_id = fdb["vlan"]
else:
if 'bvid' not in fdb:
# no possibility to find the Vlan id. skip the FDB entry
continue
bvid = fdb["bvid"]
if bvid in bvid_tlb:
vlan_id = bvid_tlb[bvid]
else:
try:
vlan_id = port_util.get_vlan_id_from_bvid(self.db, bvid)
bvid_tlb[bvid] = vlan_id
if vlan_id is None:
# the situation could be faced if the system has an FDB entries,
# which are linked to default Vlan(caused by untagged traffic)
continue
except Exception:
vlan_id = bvid
self.print_error("Failed to get Vlan id for bvid {}\n".format(bvid))

if vlan_id is not None:
self.bridge_mac_list.append((int(vlan_id),) + (fdb["mac"],) + (if_name,) + (fdb_type,))

self.bridge_mac_list.sort(key = lambda x: x[0])
return


def display(self, vlan, port, address, entry_type, count):
"""
Display the FDB entries for specified vlan/port.
@todo: - PortChannel support
"""
output = []

if vlan is not None:
vlan_val = int(vlan)

if address is not None:
address = address.upper()

if entry_type is not None:
entry_type = entry_type.capitalize()

self.bridge_mac_list = [fdb for fdb in self.bridge_mac_list
if (vlan is None or fdb[0] == vlan_val) and
(port is None or fdb[2] == port) and
(address is None or fdb[1] == address) and
(entry_type is None or fdb[3] == entry_type)]

if not count:
fdb_index = 1
for fdb in self.bridge_mac_list:
entry = [fdb_index, fdb[0], fdb[1], fdb[2], fdb[3]]
if self.json:
entry = dict(zip(self.HEADER, entry))
output.append(entry)
fdb_index += 1

if not self.json:
print(tabulate(output, self.HEADER))
else:
print(json.dumps(output))

if not self.json:
print("Total number of entries {0}".format(len(self.bridge_mac_list)))

def validate_params(self, vlan, port, address, entry_type):
if vlan is not None:
if not vlan.isnumeric():
self.print_error("Invalid vlan id {0}".format(vlan))
return False

vlan_val = int(vlan)
if (vlan_val not in range(1,4096)):
self.print_error("Invalid vlan id {0}".format(vlan))
return False

if port is not None and port not in self.if_name_map:
self.print_error("Invalid port {0}".format(port))
return False

if address is not None:
mac_addr_pattern ="^([0-9A-Fa-f]{2}[:]){5}([0-9A-Fa-f]{2})$"
if not re.match(mac_addr_pattern, address):
self.print_error("Invalid mac address {0}".format(address))
return False

if entry_type is not None and entry_type.capitalize() not in ["Static", "Dynamic"]:
self.print_error("Invalid type {0}". format(entry_type))
return False

return True

def main():

parser = argparse.ArgumentParser(description='Display ASIC FDB entries',
formatter_class=argparse.RawTextHelpFormatter)
parser.add_argument('-p', '--port', type=str, help='FDB learned on specific port: Ethernet0', default=None)
parser.add_argument('-v', '--vlan', type=str, help='FDB learned on specific Vlan: 1001', default=None)
parser.add_argument('-a', '--address', type=str, help='FDB display based on specific mac address', default=None)
parser.add_argument('-t', '--type', type=str, help='FDB display of specific type of mac address', default=None)
parser.add_argument('-c', '--count', action='store_true', help='FDB display count of mac address')
parser.add_argument('-j', '--json', action='store_true', help='JSON output')
args = parser.parse_args()

try:
fdb = FdbShow(args.json)
if not fdb.validate_params(args.vlan, args.port, args.address, args.type):
sys.exit(1)

fdb.display(args.vlan, args.port, args.address, args.type, args.count)
except Exception as e:
print(str(e))
sys.exit(1)

if __name__ == "__main__": # pragma: no cover
main()
Loading

0 comments on commit ce8dfa5

Please sign in to comment.