From f44c48b39b2d7d40e2ea341fba4c7fbf8ae322d9 Mon Sep 17 00:00:00 2001 From: julpark Date: Thu, 10 Oct 2024 12:30:21 -0700 Subject: [PATCH] Fixing kernel verification Signed-off-by: julpark --- ceph/ceph.py | 245 ++++++------------ suites/pacific/cephfs/tier-0_fs_kernel.yaml | 8 +- suites/quincy/cephfs/tier-0_fs_kernel.yaml | 6 + suites/reef/cephfs/tier-0_fs_kernel.yaml | 6 + .../fsstress_kernel_verification.py | 160 ++++++++++++ tests/cephfs/xfs_test.py | 3 +- 6 files changed, 260 insertions(+), 168 deletions(-) create mode 100644 tests/cephfs/cephfs_bugs/fsstress_kernel_verification.py diff --git a/ceph/ceph.py b/ceph/ceph.py index 2dd6e2f69d..c84394898a 100644 --- a/ceph/ceph.py +++ b/ceph/ceph.py @@ -1,5 +1,4 @@ """This module implements the required foundation data structures for testing.""" - import datetime import json import pickle @@ -12,6 +11,7 @@ import paramiko import requests import yaml +from paramiko.ssh_exception import SSHException from ceph.parallel import parallel from cli.ceph.ceph import Ceph as CephCli @@ -299,11 +299,9 @@ def generate_ansible_inventory( if "pool" in node.hostname: logger.info(node.hostname) devices = node.create_lvm( - ( - devices[0:1] - if not device_to_add - else device_to_add.split() - ), + devices[0:1] + if not device_to_add + else device_to_add.split(), num=random.randint(1, 10) if device_to_add else None, check_lvm=False if device_to_add else True, ) @@ -1144,51 +1142,6 @@ class CommandFailed(Exception): pass -class TimeoutException(Exception): - """Operation timeout exception.""" - - pass - - -def check_timeout(end_time, timeout): - """Raises an exception when current time is greater""" - if timeout and datetime.datetime.now() >= end_time: - raise TimeoutException("Command exceed the allocated execution time.") - - -def read_stream(channel, end_time, timeout, stderr=False, log=True): - """Reads the data from the given channel. - - Args: - channel: the paramiko.Channel object to be used for reading. - end_time: maximum allocated time for reading from the channel. - timeout: Flag to check if timeout must be enforced. - stderr: read from the stderr stream. Default is False. - log: log the output. Default is True. - - Returns: - a string with the data read from the channel. - - Raises: - TimeoutException: if reading from the channel exceeds the allocated time. - """ - _output = "" - _stream = channel.recv_stderr if stderr else channel.recv - _data = _stream(2048) - - while _data: - _output += _data.decode("utf-8") - if log: - for _ln in _data.splitlines(): - _log = logger.error if stderr else logger.debug - _log(_ln.decode("utf-8")) - - check_timeout(end_time, timeout) - _data = _stream(2048) - - return _output - - class RolesContainer(object): """ Container for single or multiple node roles. 
@@ -1314,14 +1267,14 @@ def __connect(self): self.__outage_start_time = None return except Exception as e: - logger.warning(f"Error in connecting to {self.ip_address}: \n{e}") + logger.warning(f"Connection outage to {self.ip_address}: \n{e}") if not self.__outage_start_time: self.__outage_start_time = datetime.datetime.now() logger.debug("Retrying connection in 10 seconds") sleep(10) - raise AssertionError(f"Unable to establish a connection with {self.ip_address}") + raise AssertionError(f"Unable to establish connection with {self.ip_address}") @property def transport(self): @@ -1552,149 +1505,109 @@ def long_running(self, **kw): Returns: ec: exit status """ - cmd = kw["cmd"] - _end_time = None - _verbose = kw.get("verbose", False) ssh = self.rssh if kw.get("sudo") else self.ssh - long_running = kw.get("long_running", False) - if "timeout" in kw: - timeout = None if kw["timeout"] == "notimeout" else kw["timeout"] - else: - # Set defaults if long_running then 1h else 5m - timeout = 3600 if kw.get("long_running", False) in kw else 300 + cmd = kw["cmd"] + timeout = None if kw.get("timeout") == "notimeout" else kw.get("timeout", 3600) + + logger.info( + f"long running command on {self.ip_address} -- {cmd} with {timeout} seconds" + ) try: - channel = ssh().get_transport().open_session(timeout=timeout) + channel = ssh().get_transport().open_session() channel.settimeout(timeout) - logger.info(f"Execute {cmd} on {self.ip_address}") - _exec_start_time = datetime.datetime.now() + # A mismatch between stdout and stderr streams have been observed hence + # combining the streams and logging is set to debug level only. + channel.set_combine_stderr(True) channel.exec_command(cmd) - if timeout: - _end_time = datetime.datetime.now() + datetime.timedelta( - seconds=timeout - ) - - _out = "" - _err = "" while not channel.exit_status_ready(): # Prevent high resource consumption - sleep(1) + sleep(2) - # Check the streams for data and log in debug mode only if it - # is a long running command else don't log. - # Fixme: logging must happen in debug irrespective of type. - _verbose = True if long_running else _verbose if channel.recv_ready(): - _out += read_stream(channel, _end_time, timeout, log=_verbose) - - if channel.recv_stderr_ready(): - _err += read_stream( - channel, _end_time, timeout, stderr=True, log=_verbose - ) - - check_timeout(_end_time, timeout) + data = channel.recv(1024) + while data: + for line in data.splitlines(): + logger.debug(line) - _time = (datetime.datetime.now() - _exec_start_time).total_seconds() - logger.info( - f"Execution of {cmd} on {self.ip_address} took {_time} seconds." - ) + data = channel.recv(1024) - # Check for data residues in the channel streams. 
This is required for the following reasons - # - exit_ready and first line is blank causing data to be None - # - race condition between data read and exit ready - try: - _new_timeout = datetime.datetime.now() + datetime.timedelta(seconds=10) - _out += read_stream(channel, _new_timeout, timeout=True) - _err += read_stream(channel, _new_timeout, timeout=True, stderr=True) - except CommandFailed: - logger.debug("Encountered a timeout during read post execution.") - except BaseException as be: - logger.debug( - f"Encountered an unknown exception during last read.\n {be}" - ) - - _exit = channel.recv_exit_status() - return _out, _err, _exit, _time + logger.info(f"Command completed on {datetime.datetime.now()}") + return channel.recv_exit_status() except socket.timeout as terr: - logger.error(f"{cmd} failed to execute within {timeout} seconds.") + logger.error(f"Command failed to execute within {timeout} seconds.") raise SocketTimeoutException(terr) - except TimeoutException as tex: - channel.close() - logger.error(f"{cmd} failed to execute within {timeout}s.") - raise CommandFailed(tex) except BaseException as be: # noqa logger.exception(be) raise CommandFailed(be) def exec_command(self, **kw): - """Execute the given command on the remote host. - - Args: - cmd: The command that needs to be executed on the remote host. - long_running: Bool flag to indicate if the command is long running. - check_ec: Bool flag to indicate if the command should check for error code. - timeout: Max time to wait for command to complete. Default is 600 seconds. - pretty_print: Bool flag to indicate if the output should be pretty printed. - verbose: Bool flag to indicate if the command output should be printed. + """execute a command. - Returns: - Exit code when long_running is used - Tuple having stdout, stderr data output when long_running is not used - Tupe having stdout, stderr, exit code, duration when verbose is enabled + Attributes: + kw (Dict): execute command configuration + check_ec: False will run the command and not wait for exit code - Raises: - CommandFailed: when the exit code is non-zero and check_ec is enabled. - TimeoutError: when the command times out. + Example:: - Examples: - self.exec_cmd(cmd='uptime') - or + eg: self.exec_cmd(cmd='uptime') + or self.exec_cmd(cmd='background_cmd', check_ec=False) - """ - if self.run_once: - self.ssh_transport().set_keepalive(15) - self.rssh_transport().set_keepalive(15) - cmd = kw["cmd"] - _out, _err, _exit, _time = self.long_running(**kw) - self.exit_status = _exit - - if kw.get("pretty_print"): - msg = f"\nCommand: {cmd}" - msg += f"\nDuration: {_time} seconds" - msg += f"\nExit Code: {_exit}" - - if _out: - msg += f"\nStdout: {_out}" + kw: + check_ec: False will run the command and not wait for exit code + """ + if kw.get("long_running"): + return self.long_running(**kw) - if _err: - msg += f"\nStderr: {_err}" + timeout = kw["timeout"] if kw.get("timeout") else 600 + ssh = self.rssh() if kw.get("sudo") else self.ssh() - logger.info(msg) + logger.info( + f"Running command {kw['cmd']} on {self.ip_address} timeout {timeout}" + ) - if "verbose" in kw: - return _out, _err, _exit, _time + if self.run_once: + self.ssh_transport().set_keepalive(15) + self.rssh_transport().set_keepalive(15) - # Historically, we are only providing command exit code for long - # running commands. 
- # Fixme: Ensure the method returns a tuple of - # (stdout, stderr, exit_code, time_taken) - if kw.get("long_running", False): - if kw.get("check_ec", False) and _exit != 0: + stdout = str() + stderr = str() + _stdout = None + _stderr = None + try: + _, _stdout, _stderr = ssh.exec_command(kw["cmd"], timeout=timeout) + for line in _stdout: + if line: + stdout += line + for line in _stderr: + if line: + stderr += line + except socket.timeout as sock_err: + logger.error("socket.timeout doesn't give an error message") + ssh.close() + raise SocketTimeoutException(sock_err) + except SSHException as e: + logger.error("SSHException during cmd: %s", str(e)) + + exit_status = None + if _stdout is not None: + exit_status = _stdout.channel.recv_exit_status() + + self.exit_status = exit_status + if kw.get("check_ec", True): + if exit_status == 0: + logger.info("Command completed successfully") + else: + logger.error(f"Error {exit_status} during cmd, timeout {timeout}") + logger.error(stderr) raise CommandFailed( - f"{cmd} returned {_err} and code {_exit} on {self.ip_address}" + f"{kw['cmd']} Error: {str(stderr)} {str(self.ip_address)}" ) - return _exit - - if kw.get("check_ec", True) and _exit != 0: - raise CommandFailed( - f"{cmd} returned {_err} and code {_exit} on {self.ip_address}" - ) - - return _out, _err + return stdout, stderr def remote_file(self, **kw): """Return contents of the remote file.""" @@ -1985,7 +1898,7 @@ def setup_deb_repos(self, deb_repo): "https://www.redhat.com/security/897da07a.txt", "https://www.redhat.com/security/f21541eb.txt", "https://prodsec.redhat.com/keys/00da75f2.txt", - "http://file.corp.redhat.com/~kdreyer/keys/00da75f2.txt", + "http://file.rdu.redhat.com/~kdreyer/keys/00da75f2.txt", "https://www.redhat.com/security/data/fd431d51.txt", ] @@ -2766,4 +2679,4 @@ def create_ceph_object(self, role): if role in self.DEMON_ROLES: return CephDemon(role, self.node) if role != "pool": - return CephObject(role, self.node) + return CephObject(role, self.node) \ No newline at end of file diff --git a/suites/pacific/cephfs/tier-0_fs_kernel.yaml b/suites/pacific/cephfs/tier-0_fs_kernel.yaml index 38e74d77e9..8cea009ef5 100644 --- a/suites/pacific/cephfs/tier-0_fs_kernel.yaml +++ b/suites/pacific/cephfs/tier-0_fs_kernel.yaml @@ -133,10 +133,16 @@ tests: abort-on-fail: false - test: name: Run fsstress on kernel and fuse mounts - module: cephfs_bugs.test_fsstress_on_kernel_and_fuse.py + module: cephfs_bugs.fsstress_kernel_verification.py polarion-id: CEPH-83575623 desc: Run fsstress on kernel and fuse mounts abort-on-fail: false + - test: + name: Run xfs test on kernel + module: xfs_test.py + polarion-id: CEPH-83575623 + desc: Run xfs test on kernel + abort-on-fail: false - test: desc: "generate sosreport" diff --git a/suites/quincy/cephfs/tier-0_fs_kernel.yaml b/suites/quincy/cephfs/tier-0_fs_kernel.yaml index 38e74d77e9..47919aa2a4 100644 --- a/suites/quincy/cephfs/tier-0_fs_kernel.yaml +++ b/suites/quincy/cephfs/tier-0_fs_kernel.yaml @@ -137,6 +137,12 @@ tests: polarion-id: CEPH-83575623 desc: Run fsstress on kernel and fuse mounts abort-on-fail: false + - test: + name: Run xfs test on kernel + module: xfs_test.py + polarion-id: CEPH-83575623 + desc: Run xfs test on kernel + abort-on-fail: false - test: desc: "generate sosreport" diff --git a/suites/reef/cephfs/tier-0_fs_kernel.yaml b/suites/reef/cephfs/tier-0_fs_kernel.yaml index 96d5e7332e..ed45be4245 100644 --- a/suites/reef/cephfs/tier-0_fs_kernel.yaml +++ b/suites/reef/cephfs/tier-0_fs_kernel.yaml @@ -138,6 +138,12 @@ tests: 
       polarion-id: CEPH-83575623
       desc: Run fsstress on kernel and fuse mounts
       abort-on-fail: false
+  - test:
+      name: Run xfs test on kernel
+      module: xfs_test.py
+      polarion-id: CEPH-83575623
+      desc: Run xfs test on kernel
+      abort-on-fail: false
 
   - test:
       desc: "generate sosreport"
diff --git a/tests/cephfs/cephfs_bugs/fsstress_kernel_verification.py b/tests/cephfs/cephfs_bugs/fsstress_kernel_verification.py
new file mode 100644
index 0000000000..bbbd53a842
--- /dev/null
+++ b/tests/cephfs/cephfs_bugs/fsstress_kernel_verification.py
@@ -0,0 +1,160 @@
+import random
+import string
+import time
+import traceback
+
+from tests.cephfs.cephfs_utilsV1 import FsUtils as FsUtilsV1
+from utility.log import Log
+
+log = Log(__name__)
+
+
+def run(ceph_cluster, **kw):
+    """
+    CEPH-83575623 - Run fsstress.sh repeatedly on fuse and kernel clients for a while and validate no crash is seen.
+
+    Procedure:
+    1. Create 2 subvolumes - one mounted on kernel and one on fuse.
+    2. Run fsstress for a few iterations (50) on both mounts.
+    3. Check the Ceph health status to validate there is no HEALTH_WARN or HEALTH_ERR state.
+
+    Clean-up:
+    1. Remove files from the mount points and unmount the subvolumes.
+    2. ceph fs subvolume rm <vol_name> <subvol_name> [--group_name <group_name>]
+    3. ceph fs subvolumegroup rm <vol_name> <group_name>
+    """
+
+    try:
+        tc = "CEPH-83575623"
+        log.info(f"Running cephfs {tc} test case")
+        test_data = kw.get("test_data")
+        fs_util_v1 = FsUtilsV1(ceph_cluster, test_data=test_data)
+        erasure = (
+            FsUtilsV1.get_custom_config_value(test_data, "erasure")
+            if test_data
+            else False
+        )
+        config = kw.get("config")
+        build = config.get("build", config.get("rhbuild"))
+        clients = ceph_cluster.get_ceph_objects("client")
+        log.info("Checking pre-requisites")
+        if len(clients) < 1:
+            log.error(
+                f"This test requires at least 1 client node. This setup has only {len(clients)} clients"
+            )
+            return 1
+        log.info("Installing required packages for make")
+        clients[0].exec_command(
+            sudo=True, cmd='dnf groupinstall "Development Tools" -y'
+        )
+        fs_util_v1.prepare_clients(clients, build)
+        fs_util_v1.auth_list(clients)
+        default_fs = "cephfs" if not erasure else "cephfs-ec"
+        # if "cephfs" does not exist, create it
+        fs_details = fs_util_v1.get_fs_info(clients[0], default_fs)
+        if not fs_details:
+            fs_util_v1.create_fs(clients[0], default_fs)
+        subvolume_group_name = "subvol_group1"
+        subvolume_name = "subvol"
+        subvolumegroup = {
+            "vol_name": default_fs,
+            "group_name": subvolume_group_name,
+        }
+        fs_util_v1.create_subvolumegroup(clients[0], **subvolumegroup)
+        subvolume_list = [
+            {
+                "vol_name": default_fs,
+                "subvol_name": f"{subvolume_name}_1",
+                "group_name": subvolume_group_name,
+            },
+            {
+                "vol_name": default_fs,
+                "subvol_name": f"{subvolume_name}_2",
+                "group_name": subvolume_group_name,
+            },
+        ]
+        for subvolume in subvolume_list:
+            fs_util_v1.create_subvolume(clients[0], **subvolume)
+
+        mounting_dir = "".join(
+            random.choice(string.ascii_lowercase + string.digits)
+            for _ in list(range(10))
+        )
+        log.info("Mount 1 subvolume on kernel and 1 subvolume on Fuse → Client1")
+
+        kernel_mounting_dir_1 = f"/mnt/cephfs_kernel{mounting_dir}_1/"
+        mon_node_ips = fs_util_v1.get_mon_node_ips()
+        subvol_path_kernel, rc = clients[0].exec_command(
+            sudo=True,
+            cmd=f"ceph fs subvolume getpath {default_fs} {subvolume_name}_1 {subvolume_group_name}",
+        )
+        fs_util_v1.kernel_mount(
+            [clients[0]],
+            kernel_mounting_dir_1,
+            ",".join(mon_node_ips),
+            sub_dir=f"{subvol_path_kernel.strip()}",
+            extra_params=f",fs={default_fs}",
+        )
+
+        fuse_mounting_dir_1 = f"/mnt/cephfs_fuse{mounting_dir}_1/"
+        subvol_path_fuse, rc = clients[0].exec_command(
+            sudo=True,
+            cmd=f"ceph fs subvolume getpath {default_fs} {subvolume_name}_2 {subvolume_group_name}",
+        )
+        fs_util_v1.fuse_mount(
+            [clients[0]],
+            fuse_mounting_dir_1,
+            extra_params=f" -r {subvol_path_fuse.strip()} --client_fs {default_fs}",
+        )
+
+        log.info("Run fsstress for a few iterations on the fuse and kernel mounts.")
+        fsstress_url = "https://raw.githubusercontent.com/ceph/ceph/main/qa/workunits/suites/fsstress.sh"
+
+        def run_commands(client, commands):
+            for command in commands:
+                client.exec_command(sudo=True, cmd=command)
+
+        directories = [kernel_mounting_dir_1, fuse_mounting_dir_1]
+        for directory in directories:
+            commands = [
+                f"mkdir -p {directory}fsstress/",
+                f"cd {directory}fsstress/ && wget {fsstress_url}",
+                f"chmod 777 {directory}fsstress/fsstress.sh",
+            ]
+            run_commands(clients[0], commands)
+        iterations = 50
+        log.info(
+            f"Run fsstress on the kernel and fuse mounts for {iterations} iterations"
+        )
+        for _ in range(iterations):
+            for directory in [kernel_mounting_dir_1, fuse_mounting_dir_1]:
+                clients[0].exec_command(
+                    sudo=True, cmd=f"sh {directory}fsstress/fsstress.sh"
+                )
+                time.sleep(10)
+                clients[0].exec_command(
+                    sudo=True, cmd=f"rm -rf {directory}fsstress/fsstress/"
+                )
+                time.sleep(10)
+
+        return 0
+    except Exception as e:
+        log.error(e)
+        log.error(traceback.format_exc())
+        return 1
+    finally:
+        log.info("Clean up the system")
+        fs_util_v1.client_clean_up(
+            "umount", kernel_clients=[clients[0]], mounting_dir=kernel_mounting_dir_1
+        )
+
+        fs_util_v1.client_clean_up(
+            "umount", fuse_clients=[clients[0]], mounting_dir=fuse_mounting_dir_1
+        )
+
+        for subvolume in subvolume_list:
+            fs_util_v1.remove_subvolume(clients[0], **subvolume)
+
+        fs_util_v1.remove_subvolumegroup(
+            clients[0], default_fs, subvolume_group_name, force=True
+        )
diff --git a/tests/cephfs/xfs_test.py b/tests/cephfs/xfs_test.py
index 597cc4b2e9..b68de59aee 100644
--- a/tests/cephfs/xfs_test.py
+++ b/tests/cephfs/xfs_test.py
@@ -114,7 +114,7 @@ def run(ceph_cluster, **kw):
             cmd=f"echo '{xfs_config_context}' > /root/xfstests-dev/local.config",
         )
         # create exclude file
-        exclude_file = "generic/003"
+        exclude_file = "generic/003 generic/538"
         client1.exec_command(
             sudo=True, cmd=f"echo '{exclude_file}' > /root/xfstests-dev/ceph.exclude"
         )
@@ -124,6 +124,7 @@ def run(ceph_cluster, **kw):
             cmd="cd /root/xfstests-dev && ./check -d -T -g quick -e ceph.exclude",
             check_ec=False,
             long_running=True,
+            timeout=1800
         )
         log.info("XFS tests completed successfully")
         return 0
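
The reworked exec_command/long_running in ceph/ceph.py keeps the historical calling contract that test modules such as xfs_test.py rely on: with long_running=True the call delegates to long_running and returns only the command's exit status, while the regular path returns a (stdout, stderr) tuple, raises CommandFailed on a non-zero exit status when check_ec is left at its default of True, and honours a 600-second default timeout (the long-running path defaults to 3600 seconds, with "notimeout" disabling the limit). The sketch below is a minimal illustration of how a test module might consume both paths after this patch; the client object and the run_xfstests helper name are placeholders for this example, not part of the patch.

from ceph.ceph import CommandFailed


def run_xfstests(client):
    # Illustrative only: `client` stands for a node object exposing
    # exec_command() with the semantics introduced by this patch.

    # Long-running path: exec_command delegates to long_running() and hands
    # back only the exit status, so the caller inspects it directly.
    # timeout defaults to 3600 seconds unless given; "notimeout" disables it.
    rc = client.exec_command(
        cmd="cd /root/xfstests-dev && ./check -d -T -g quick -e ceph.exclude",
        check_ec=False,
        long_running=True,
        timeout=1800,
    )
    if rc != 0:
        raise CommandFailed(f"xfstests quick group exited with {rc}")

    # Regular path: returns (stdout, stderr); a non-zero exit status raises
    # CommandFailed because check_ec defaults to True, and timeout defaults
    # to 600 seconds.
    out, err = client.exec_command(cmd="uptime")
    return out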