Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Python 3.12: SyntaxWarning: invalid escape sequence #1601

Merged
merged 1 commit into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crmsh/bootstrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -2011,7 +2011,7 @@ def adjust_priority_fencing_delay(is_2node_wo_qdevice):
out = sh.cluster_shell().get_stdout_or_raise_error("crm configure show related:stonith")
if not out:
return
pcmk_delay_max_v_list = re.findall("pcmk_delay_max=(\w+)", out)
pcmk_delay_max_v_list = re.findall(r"pcmk_delay_max=(\w+)", out)
if pcmk_delay_max_v_list:
max_value = max([int(utils.crm_msec(v)/1000) for v in pcmk_delay_max_v_list])
if pcmk_delay_max_v_list and is_2node_wo_qdevice:
Expand Down
6 changes: 3 additions & 3 deletions crmsh/log_patterns.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
"lrmd.*WARN: .* %% .*timed out$",
"crmd.*LRM operation %%_(?:start|stop|promote|demote|migrate)_.*confirmed=true",
"crmd.*LRM operation %%_.*Timed Out",
"[(]%%[)]\[",
"[(]%%[)]\\[",
),
( # detail 1
"lrmd.*%% (?:probe|notify)",
Expand Down Expand Up @@ -92,7 +92,7 @@

"crmd.*LRM operation %%_(?:start|stop|promote|demote|migrate)_.*confirmed=true",
"crmd.*LRM operation %%_.*Timed Out",
"[(]%%[)]\[",
"[(]%%[)]\\[",
),
( # detail 1
"crmd.*Initiating.*%%_(?:monitor_0|notify)",
Expand Down Expand Up @@ -151,7 +151,7 @@

"pacemaker-controld.*Result of .* operation for .* on .*: .*confirmed=true",
"pacemaker-controld.*Result of .* operation for .* on .*: Timed Out",
"[(]%%[)]\[",
"[(]%%[)]\\[",
),
( # detail 1
"pacemaker-controld.*Initiating.*%%_(?:monitor_0|notify)",
Expand Down
2 changes: 1 addition & 1 deletion crmsh/report/collect.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ def collect_dlm_info(context: core.Context) -> None:
name_list = []
out_string = "##### NOTICE - Lockspace overview:\n"
out_string += utils.get_cmd_output("dlm_tool ls")
name_list = re.findall("^name\s*(.*)$", out_string, re.MULTILINE)
name_list = re.findall(r"^name\s*(.*)$", out_string, re.MULTILINE)

for name in name_list:
out_string += f"\n\n## NOTICE - Lockspace {name}\n"
Expand Down
4 changes: 2 additions & 2 deletions crmsh/report/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ def get_distro_info() -> str:
elif shutil.which("lsb_release"):
logger.debug2("Using lsb_release to get distribution info")
out = sh.LocalShell().get_stdout_or_raise_error("lsb_release -d")
res = re.search("Description:\s+(.*)", out)
res = re.search(r"Description:\s+(.*)", out)
return res.group(1) if res else "Unknown"


Expand Down Expand Up @@ -548,7 +548,7 @@ def _parse_sensitive_set(self) -> None:
patt_set = set(self.context.sensitive_regex_list)
# from /etc/crm/crm.conf
if config.report.sanitize_rule:
patt_set |= set(re.split('\s*\|\s*|\s+', config.report.sanitize_rule.strip('|')))
patt_set |= set(re.split(r'\s*\|\s*|\s+', config.report.sanitize_rule.strip('|')))
if patt_set:
self.context.sanitize = True
# Not set from -p option and crm.conf, use default
Expand Down
4 changes: 2 additions & 2 deletions crmsh/sbd.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def get_sbd_msgwait(dev):
"""
out = sh.cluster_shell().get_stdout_or_raise_error("sbd -d {} dump".format(dev))
# Format like "Timeout (msgwait) : 30"
res = re.search("\(msgwait\)\s+:\s+(\d+)", out)
res = re.search(r"\(msgwait\)\s+:\s+(\d+)", out)
if not res:
raise ValueError("Cannot get sbd msgwait for {}".format(dev))
return int(res.group(1))
Expand Down Expand Up @@ -293,7 +293,7 @@ def _get_device_uuid(dev, node=None):
Get UUID for specific device and node
"""
out = sh.cluster_shell().get_stdout_or_raise_error("sbd -d {} dump".format(dev), node)
res = re.search("UUID\s*:\s*(.*)\n", out)
res = re.search(r"UUID\s*:\s*(.*)\n", out)
if not res:
raise ValueError("Cannot find sbd device UUID for {}".format(dev))
return res.group(1)
Expand Down
2 changes: 1 addition & 1 deletion crmsh/ui_configure.py
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ def do_show(self, context, *args):
# 1. "pattern1 pattern2 pattern3"
# 2. "pattern1|pattern2|pattern3"
# regrex here also filter out possible spaces
osargs = re.split('\s*\|\s*|\s+', config.core.obscure_pattern.strip('|'))
osargs = re.split(r'\s*\|\s*|\s+', config.core.obscure_pattern.strip('|'))
args = [arg for arg in args if not arg.startswith('obscure:')]
cib_factory.ensure_cib_updated()
with obscure(osargs):
Expand Down
2 changes: 1 addition & 1 deletion crmsh/ui_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def manage_attr(cmd, attr_ext_commands, rsc, subcmd, attr, value):
'''
try:
attr_cmd = _get_attr_cmd(attr_ext_commands, subcmd)
if re.search("\w+=\w+", attr):
if re.search(r"\w+=\w+", attr):
attr, value = attr.split('=')
return _dispatch_attr_cmd(cmd, attr_cmd, rsc, subcmd, attr, value)
except ValueError as msg:
Expand Down
20 changes: 10 additions & 10 deletions crmsh/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2035,7 +2035,7 @@
system_manufacturer = shell.get_stdout_or_raise_error("dmidecode -s system-manufacturer")
chassis_asset_tag = shell.get_stdout_or_raise_error("dmidecode -s chassis-asset-tag")
if "microsoft corporation" in system_manufacturer.lower() or \
''.join([chr(int(n)) for n in re.findall("\d\d", chassis_asset_tag)]) == "MSFT AZURE VM":
''.join([chr(int(n)) for n in re.findall(r"\d\d", chassis_asset_tag)]) == "MSFT AZURE VM":
# To detect azure we also need to make an API request
result = _cloud_metadata_request(
"http://169.254.169.254/metadata/instance/network/interface/0/ipv4/ipAddress/0/privateIpAddress?api-version=2017-08-01&format=text",
Expand Down Expand Up @@ -2493,15 +2493,15 @@
Return a dictionary which contain expect votes and total votes
"""
out = sh.cluster_shell().get_stdout_or_raise_error("corosync-quorumtool -s", remote, success_exit_status={0, 2})
return dict(re.findall("(Expected|Total) votes:\s+(\d+)", out))
return dict(re.findall(r"(Expected|Total) votes:\s+(\d+)", out))


def check_all_nodes_reachable():
"""
Check if all cluster nodes are reachable
"""
out = sh.cluster_shell().get_stdout_or_raise_error("crm_node -l")
for node in re.findall("\d+ (.*) \w+", out):
for node in re.findall(r"\d+ (.*) \w+", out):
node_reachable_check(node)


Expand Down Expand Up @@ -2582,15 +2582,15 @@
Get all available VGs
"""
out = sh.cluster_shell().get_stdout_or_raise_error("vgdisplay")
return re.findall("VG Name\s+(.*)", out)
return re.findall(r"VG Name\s+(.*)", out)


def get_pe_number(vg_id):
"""
Get pe number
"""
output = sh.cluster_shell().get_stdout_or_raise_error("vgdisplay {}".format(vg_id))
res = re.search("Total PE\s+(\d+)", output)
res = re.search(r"Total PE\s+(\d+)", output)
if not res:
raise ValueError("Cannot find PE on VG({})".format(vg_id))
return int(res.group(1))
Expand Down Expand Up @@ -2678,7 +2678,7 @@
Get qdevice sync_timeout
"""
out = sh.cluster_shell().get_stdout_or_raise_error("crm corosync status qdevice")
res = re.search("Sync HB interval:\s+(\d+)ms", out)
res = re.search(r"Sync HB interval:\s+(\d+)ms", out)

Check warning on line 2681 in crmsh/utils.py

View check run for this annotation

Codecov / codecov/patch

crmsh/utils.py#L2681

Added line #L2681 was not covered by tests
if not res:
raise ValueError("Cannot find qdevice sync timeout")
return int(int(res.group(1))/1000)
Expand Down Expand Up @@ -2713,7 +2713,7 @@
Get dlm config option dictionary
"""
out = sh.cluster_shell().get_stdout_or_raise_error("dlm_tool dump_config", peer)
return dict(re.findall("(\w+)=(\w+)", out))
return dict(re.findall(r"(\w+)=(\w+)", out))


def set_dlm_option(peer=None, **kargs):
Expand Down Expand Up @@ -2749,7 +2749,7 @@
Check if the RA running
"""
out = sh.cluster_shell().get_stdout_or_raise_error("crm_mon -1rR", peer)
patt = f"\({ra_type}\):\s*Started"
patt = f"\\({ra_type}\\):\\s*Started"
return re.search(patt, out) is not None


Expand Down Expand Up @@ -2853,9 +2853,9 @@
Get the TimeoutStartUSec value in second unit
The origin format was like: 1min 30s
"""
res_seconds = re.search("(\d+)s", time_res)
res_seconds = re.search(r"(\d+)s", time_res)
start_timeout = int(res_seconds.group(1)) if res_seconds else 0
res_min = re.search("(\d+)min", time_res)
res_min = re.search(r"(\d+)min", time_res)
start_timeout += 60 * int(res_min.group(1)) if res_min else 0
return start_timeout

Expand Down
4 changes: 2 additions & 2 deletions crmsh/watchdog.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class Watchdog(object):
Class to find valid watchdog device name
"""
QUERY_CMD = "sudo sbd query-watchdog"
DEVICE_FIND_REGREX = "\[[0-9]+\] (/dev/.*)\n.*\nDriver: (.*)"
DEVICE_FIND_REGREX = "\\[[0-9]+\\] (/dev/.*)\n.*\nDriver: (.*)"

def __init__(self, _input=None, remote_user=None, peer_host=None):
"""
Expand Down Expand Up @@ -61,7 +61,7 @@ def _driver_is_loaded(driver):
Check if driver was already loaded
"""
_, out, _ = ShellUtils().get_stdout_stderr("lsmod")
return re.search("\n{}\s+".format(driver), out)
return re.search("\n{}\\s+".format(driver), out)

def _set_watchdog_info(self):
"""
Expand Down
4 changes: 2 additions & 2 deletions test/unittests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,7 @@ class TestInterfacesInfo(unittest.TestCase):
Unitary tests for class utils.InterfacesInfo
"""

network_output_error = """1: lo inet 127.0.0.1/8 scope host lo\ valid_lft forever preferred_lft forever
network_output_error = """1: lo inet 127.0.0.1/8 scope host lo\\ valid_lft forever preferred_lft forever
2: enp1s0 inet 192.168.122.241/24 brd 192.168.122.255 scope global enp1s0
61: tun0 inet 10.163.45.46 peer 10.163.45.45/32 scope global tun0"""

Expand Down Expand Up @@ -633,7 +633,7 @@ def tearDownClass(cls):

@mock.patch('crmsh.sh.ShellUtils.get_stdout_stderr')
def test_get_interfaces_info_no_address(self, mock_run):
only_lo = "1: lo inet 127.0.0.1/8 scope host lo\ valid_lft forever preferred_lft forever"
only_lo = "1: lo inet 127.0.0.1/8 scope host lo\\ valid_lft forever preferred_lft forever"
mock_run.return_value = (0, only_lo, None)
with self.assertRaises(ValueError) as err:
self.interfaces_info.get_interfaces_info()
Expand Down