diff --git a/crmsh/bootstrap.py b/crmsh/bootstrap.py index 01f8f3986..6b90e3386 100644 --- a/crmsh/bootstrap.py +++ b/crmsh/bootstrap.py @@ -2011,7 +2011,7 @@ def adjust_priority_fencing_delay(is_2node_wo_qdevice): out = sh.cluster_shell().get_stdout_or_raise_error("crm configure show related:stonith") if not out: return - pcmk_delay_max_v_list = re.findall("pcmk_delay_max=(\w+)", out) + pcmk_delay_max_v_list = re.findall(r"pcmk_delay_max=(\w+)", out) if pcmk_delay_max_v_list: max_value = max([int(utils.crm_msec(v)/1000) for v in pcmk_delay_max_v_list]) if pcmk_delay_max_v_list and is_2node_wo_qdevice: diff --git a/crmsh/log_patterns.py b/crmsh/log_patterns.py index ac62f1436..4e95427db 100644 --- a/crmsh/log_patterns.py +++ b/crmsh/log_patterns.py @@ -34,7 +34,7 @@ "lrmd.*WARN: .* %% .*timed out$", "crmd.*LRM operation %%_(?:start|stop|promote|demote|migrate)_.*confirmed=true", "crmd.*LRM operation %%_.*Timed Out", - "[(]%%[)]\[", + "[(]%%[)]\\[", ), ( # detail 1 "lrmd.*%% (?:probe|notify)", @@ -92,7 +92,7 @@ "crmd.*LRM operation %%_(?:start|stop|promote|demote|migrate)_.*confirmed=true", "crmd.*LRM operation %%_.*Timed Out", - "[(]%%[)]\[", + "[(]%%[)]\\[", ), ( # detail 1 "crmd.*Initiating.*%%_(?:monitor_0|notify)", @@ -151,7 +151,7 @@ "pacemaker-controld.*Result of .* operation for .* on .*: .*confirmed=true", "pacemaker-controld.*Result of .* operation for .* on .*: Timed Out", - "[(]%%[)]\[", + "[(]%%[)]\\[", ), ( # detail 1 "pacemaker-controld.*Initiating.*%%_(?:monitor_0|notify)", diff --git a/crmsh/report/collect.py b/crmsh/report/collect.py index c3c51d655..dfbbca3c6 100644 --- a/crmsh/report/collect.py +++ b/crmsh/report/collect.py @@ -152,7 +152,7 @@ def collect_dlm_info(context: core.Context) -> None: name_list = [] out_string = "##### NOTICE - Lockspace overview:\n" out_string += utils.get_cmd_output("dlm_tool ls") - name_list = re.findall("^name\s*(.*)$", out_string, re.MULTILINE) + name_list = re.findall(r"^name\s*(.*)$", out_string, re.MULTILINE) for name in name_list: out_string += f"\n\n## NOTICE - Lockspace {name}\n" diff --git a/crmsh/report/utils.py b/crmsh/report/utils.py index 6c439126d..cc27118c6 100644 --- a/crmsh/report/utils.py +++ b/crmsh/report/utils.py @@ -204,7 +204,7 @@ def get_distro_info() -> str: elif shutil.which("lsb_release"): logger.debug2("Using lsb_release to get distribution info") out = sh.LocalShell().get_stdout_or_raise_error("lsb_release -d") - res = re.search("Description:\s+(.*)", out) + res = re.search(r"Description:\s+(.*)", out) return res.group(1) if res else "Unknown" @@ -548,7 +548,7 @@ def _parse_sensitive_set(self) -> None: patt_set = set(self.context.sensitive_regex_list) # from /etc/crm/crm.conf if config.report.sanitize_rule: - patt_set |= set(re.split('\s*\|\s*|\s+', config.report.sanitize_rule.strip('|'))) + patt_set |= set(re.split(r'\s*\|\s*|\s+', config.report.sanitize_rule.strip('|'))) if patt_set: self.context.sanitize = True # Not set from -p option and crm.conf, use default diff --git a/crmsh/sbd.py b/crmsh/sbd.py index cc0df1d19..c005126c4 100644 --- a/crmsh/sbd.py +++ b/crmsh/sbd.py @@ -92,7 +92,7 @@ def get_sbd_msgwait(dev): """ out = sh.cluster_shell().get_stdout_or_raise_error("sbd -d {} dump".format(dev)) # Format like "Timeout (msgwait) : 30" - res = re.search("\(msgwait\)\s+:\s+(\d+)", out) + res = re.search(r"\(msgwait\)\s+:\s+(\d+)", out) if not res: raise ValueError("Cannot get sbd msgwait for {}".format(dev)) return int(res.group(1)) @@ -293,7 +293,7 @@ def _get_device_uuid(dev, node=None): Get UUID for specific device and node """ out = sh.cluster_shell().get_stdout_or_raise_error("sbd -d {} dump".format(dev), node) - res = re.search("UUID\s*:\s*(.*)\n", out) + res = re.search(r"UUID\s*:\s*(.*)\n", out) if not res: raise ValueError("Cannot find sbd device UUID for {}".format(dev)) return res.group(1) diff --git a/crmsh/ui_configure.py b/crmsh/ui_configure.py index 3f1a1f1ab..a0b02fd63 100644 --- a/crmsh/ui_configure.py +++ b/crmsh/ui_configure.py @@ -579,7 +579,7 @@ def do_show(self, context, *args): # 1. "pattern1 pattern2 pattern3" # 2. "pattern1|pattern2|pattern3" # regrex here also filter out possible spaces - osargs = re.split('\s*\|\s*|\s+', config.core.obscure_pattern.strip('|')) + osargs = re.split(r'\s*\|\s*|\s+', config.core.obscure_pattern.strip('|')) args = [arg for arg in args if not arg.startswith('obscure:')] cib_factory.ensure_cib_updated() with obscure(osargs): diff --git a/crmsh/ui_utils.py b/crmsh/ui_utils.py index 33f7c865c..9cdcfaffd 100644 --- a/crmsh/ui_utils.py +++ b/crmsh/ui_utils.py @@ -49,7 +49,7 @@ def manage_attr(cmd, attr_ext_commands, rsc, subcmd, attr, value): ''' try: attr_cmd = _get_attr_cmd(attr_ext_commands, subcmd) - if re.search("\w+=\w+", attr): + if re.search(r"\w+=\w+", attr): attr, value = attr.split('=') return _dispatch_attr_cmd(cmd, attr_cmd, rsc, subcmd, attr, value) except ValueError as msg: diff --git a/crmsh/utils.py b/crmsh/utils.py index 1c8ffb4b0..fc9affe9f 100644 --- a/crmsh/utils.py +++ b/crmsh/utils.py @@ -2035,7 +2035,7 @@ def detect_azure(): system_manufacturer = shell.get_stdout_or_raise_error("dmidecode -s system-manufacturer") chassis_asset_tag = shell.get_stdout_or_raise_error("dmidecode -s chassis-asset-tag") if "microsoft corporation" in system_manufacturer.lower() or \ - ''.join([chr(int(n)) for n in re.findall("\d\d", chassis_asset_tag)]) == "MSFT AZURE VM": + ''.join([chr(int(n)) for n in re.findall(r"\d\d", chassis_asset_tag)]) == "MSFT AZURE VM": # To detect azure we also need to make an API request result = _cloud_metadata_request( "http://169.254.169.254/metadata/instance/network/interface/0/ipv4/ipAddress/0/privateIpAddress?api-version=2017-08-01&format=text", @@ -2493,7 +2493,7 @@ def get_quorum_votes_dict(remote=None): Return a dictionary which contain expect votes and total votes """ out = sh.cluster_shell().get_stdout_or_raise_error("corosync-quorumtool -s", remote, success_exit_status={0, 2}) - return dict(re.findall("(Expected|Total) votes:\s+(\d+)", out)) + return dict(re.findall(r"(Expected|Total) votes:\s+(\d+)", out)) def check_all_nodes_reachable(): @@ -2501,7 +2501,7 @@ def check_all_nodes_reachable(): Check if all cluster nodes are reachable """ out = sh.cluster_shell().get_stdout_or_raise_error("crm_node -l") - for node in re.findall("\d+ (.*) \w+", out): + for node in re.findall(r"\d+ (.*) \w+", out): node_reachable_check(node) @@ -2582,7 +2582,7 @@ def get_all_vg_name(): Get all available VGs """ out = sh.cluster_shell().get_stdout_or_raise_error("vgdisplay") - return re.findall("VG Name\s+(.*)", out) + return re.findall(r"VG Name\s+(.*)", out) def get_pe_number(vg_id): @@ -2590,7 +2590,7 @@ def get_pe_number(vg_id): Get pe number """ output = sh.cluster_shell().get_stdout_or_raise_error("vgdisplay {}".format(vg_id)) - res = re.search("Total PE\s+(\d+)", output) + res = re.search(r"Total PE\s+(\d+)", output) if not res: raise ValueError("Cannot find PE on VG({})".format(vg_id)) return int(res.group(1)) @@ -2678,7 +2678,7 @@ def get_qdevice_sync_timeout(): Get qdevice sync_timeout """ out = sh.cluster_shell().get_stdout_or_raise_error("crm corosync status qdevice") - res = re.search("Sync HB interval:\s+(\d+)ms", out) + res = re.search(r"Sync HB interval:\s+(\d+)ms", out) if not res: raise ValueError("Cannot find qdevice sync timeout") return int(int(res.group(1))/1000) @@ -2713,7 +2713,7 @@ def get_dlm_option_dict(peer=None): Get dlm config option dictionary """ out = sh.cluster_shell().get_stdout_or_raise_error("dlm_tool dump_config", peer) - return dict(re.findall("(\w+)=(\w+)", out)) + return dict(re.findall(r"(\w+)=(\w+)", out)) def set_dlm_option(peer=None, **kargs): @@ -2749,7 +2749,7 @@ def is_resource_running(ra_type, peer=None): Check if the RA running """ out = sh.cluster_shell().get_stdout_or_raise_error("crm_mon -1rR", peer) - patt = f"\({ra_type}\):\s*Started" + patt = f"\\({ra_type}\\):\\s*Started" return re.search(patt, out) is not None @@ -2853,9 +2853,9 @@ def get_systemd_timeout_start_in_sec(time_res): Get the TimeoutStartUSec value in second unit The origin format was like: 1min 30s """ - res_seconds = re.search("(\d+)s", time_res) + res_seconds = re.search(r"(\d+)s", time_res) start_timeout = int(res_seconds.group(1)) if res_seconds else 0 - res_min = re.search("(\d+)min", time_res) + res_min = re.search(r"(\d+)min", time_res) start_timeout += 60 * int(res_min.group(1)) if res_min else 0 return start_timeout diff --git a/crmsh/watchdog.py b/crmsh/watchdog.py index 6d0d2cff4..5af77a2f1 100644 --- a/crmsh/watchdog.py +++ b/crmsh/watchdog.py @@ -10,7 +10,7 @@ class Watchdog(object): Class to find valid watchdog device name """ QUERY_CMD = "sudo sbd query-watchdog" - DEVICE_FIND_REGREX = "\[[0-9]+\] (/dev/.*)\n.*\nDriver: (.*)" + DEVICE_FIND_REGREX = "\\[[0-9]+\\] (/dev/.*)\n.*\nDriver: (.*)" def __init__(self, _input=None, remote_user=None, peer_host=None): """ @@ -61,7 +61,7 @@ def _driver_is_loaded(driver): Check if driver was already loaded """ _, out, _ = ShellUtils().get_stdout_stderr("lsmod") - return re.search("\n{}\s+".format(driver), out) + return re.search("\n{}\\s+".format(driver), out) def _set_watchdog_info(self): """ diff --git a/test/unittests/test_utils.py b/test/unittests/test_utils.py index 360c18b07..ac24e08ce 100644 --- a/test/unittests/test_utils.py +++ b/test/unittests/test_utils.py @@ -596,7 +596,7 @@ class TestInterfacesInfo(unittest.TestCase): Unitary tests for class utils.InterfacesInfo """ - network_output_error = """1: lo inet 127.0.0.1/8 scope host lo\ valid_lft forever preferred_lft forever + network_output_error = """1: lo inet 127.0.0.1/8 scope host lo\\ valid_lft forever preferred_lft forever 2: enp1s0 inet 192.168.122.241/24 brd 192.168.122.255 scope global enp1s0 61: tun0 inet 10.163.45.46 peer 10.163.45.45/32 scope global tun0""" @@ -633,7 +633,7 @@ def tearDownClass(cls): @mock.patch('crmsh.sh.ShellUtils.get_stdout_stderr') def test_get_interfaces_info_no_address(self, mock_run): - only_lo = "1: lo inet 127.0.0.1/8 scope host lo\ valid_lft forever preferred_lft forever" + only_lo = "1: lo inet 127.0.0.1/8 scope host lo\\ valid_lft forever preferred_lft forever" mock_run.return_value = (0, only_lo, None) with self.assertRaises(ValueError) as err: self.interfaces_info.get_interfaces_info()