Skip to content

Commit

Permalink
Fixing kernel verification
Browse files Browse the repository at this point in the history
Signed-off-by: julpark <[email protected]>
  • Loading branch information
julpark-rh committed Oct 12, 2024
1 parent 25f0c83 commit f44c48b
Show file tree
Hide file tree
Showing 6 changed files with 260 additions and 168 deletions.
245 changes: 79 additions & 166 deletions ceph/ceph.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
"""This module implements the required foundation data structures for testing."""

import datetime
import json
import pickle
Expand All @@ -12,6 +11,7 @@
import paramiko
import requests
import yaml
from paramiko.ssh_exception import SSHException

from ceph.parallel import parallel
from cli.ceph.ceph import Ceph as CephCli
Expand Down Expand Up @@ -299,11 +299,9 @@ def generate_ansible_inventory(
if "pool" in node.hostname:
logger.info(node.hostname)
devices = node.create_lvm(
(
devices[0:1]
if not device_to_add
else device_to_add.split()
),
devices[0:1]
if not device_to_add
else device_to_add.split(),
num=random.randint(1, 10) if device_to_add else None,
check_lvm=False if device_to_add else True,
)
Expand Down Expand Up @@ -1144,51 +1142,6 @@ class CommandFailed(Exception):
pass


class TimeoutException(Exception):
    """Raised when a command overruns its allocated execution window."""

    pass


def check_timeout(end_time, timeout):
    """Raise TimeoutException once the deadline has passed.

    Args:
        end_time: datetime marking the execution deadline.
        timeout: falsy value disables the check entirely.

    Raises:
        TimeoutException: when timeout is enabled and now >= end_time.
    """
    if not timeout:
        return
    if datetime.datetime.now() >= end_time:
        raise TimeoutException("Command exceed the allocated execution time.")


def read_stream(channel, end_time, timeout, stderr=False, log=True):
    """Drain one output stream of a paramiko channel into a string.

    Args:
        channel: the paramiko.Channel object to be used for reading.
        end_time: maximum allocated time for reading from the channel.
        timeout: Flag to check if timeout must be enforced.
        stderr: read from the stderr stream. Default is False.
        log: log the output. Default is True.

    Returns:
        a string with the data read from the channel.

    Raises:
        TimeoutException: if reading from the channel exceeds the allocated time.
    """
    reader = channel.recv_stderr if stderr else channel.recv
    # stderr chunks are logged at error level, stdout at debug.
    log_fn = logger.error if stderr else logger.debug
    chunks = []

    chunk = reader(2048)
    while chunk:
        chunks.append(chunk.decode("utf-8"))
        if log:
            for raw_line in chunk.splitlines():
                log_fn(raw_line.decode("utf-8"))

        # Enforce the deadline between reads so a chatty stream cannot
        # keep us past the allocated window.
        check_timeout(end_time, timeout)
        chunk = reader(2048)

    return "".join(chunks)


class RolesContainer(object):
"""
Container for single or multiple node roles.
Expand Down Expand Up @@ -1314,14 +1267,14 @@ def __connect(self):
self.__outage_start_time = None
return
except Exception as e:
logger.warning(f"Error in connecting to {self.ip_address}: \n{e}")
logger.warning(f"Connection outage to {self.ip_address}: \n{e}")
if not self.__outage_start_time:
self.__outage_start_time = datetime.datetime.now()

logger.debug("Retrying connection in 10 seconds")
sleep(10)

raise AssertionError(f"Unable to establish a connection with {self.ip_address}")
raise AssertionError(f"Unable to establish connection with {self.ip_address}")

@property
def transport(self):
Expand Down Expand Up @@ -1552,149 +1505,109 @@ def long_running(self, **kw):
Returns:
ec: exit status
"""
cmd = kw["cmd"]
_end_time = None
_verbose = kw.get("verbose", False)
ssh = self.rssh if kw.get("sudo") else self.ssh
long_running = kw.get("long_running", False)
if "timeout" in kw:
timeout = None if kw["timeout"] == "notimeout" else kw["timeout"]
else:
# Set defaults if long_running then 1h else 5m
timeout = 3600 if kw.get("long_running", False) in kw else 300
cmd = kw["cmd"]
timeout = None if kw.get("timeout") == "notimeout" else kw.get("timeout", 3600)

logger.info(
f"long running command on {self.ip_address} -- {cmd} with {timeout} seconds"
)

try:
channel = ssh().get_transport().open_session(timeout=timeout)
channel = ssh().get_transport().open_session()
channel.settimeout(timeout)

logger.info(f"Execute {cmd} on {self.ip_address}")
_exec_start_time = datetime.datetime.now()
# A mismatch between stdout and stderr streams have been observed hence
# combining the streams and logging is set to debug level only.
channel.set_combine_stderr(True)
channel.exec_command(cmd)

if timeout:
_end_time = datetime.datetime.now() + datetime.timedelta(
seconds=timeout
)

_out = ""
_err = ""
while not channel.exit_status_ready():
# Prevent high resource consumption
sleep(1)
sleep(2)

# Check the streams for data and log in debug mode only if it
# is a long running command else don't log.
# Fixme: logging must happen in debug irrespective of type.
_verbose = True if long_running else _verbose
if channel.recv_ready():
_out += read_stream(channel, _end_time, timeout, log=_verbose)

if channel.recv_stderr_ready():
_err += read_stream(
channel, _end_time, timeout, stderr=True, log=_verbose
)

check_timeout(_end_time, timeout)
data = channel.recv(1024)
while data:
for line in data.splitlines():
logger.debug(line)

_time = (datetime.datetime.now() - _exec_start_time).total_seconds()
logger.info(
f"Execution of {cmd} on {self.ip_address} took {_time} seconds."
)
data = channel.recv(1024)

# Check for data residues in the channel streams. This is required for the following reasons
# - exit_ready and first line is blank causing data to be None
# - race condition between data read and exit ready
try:
_new_timeout = datetime.datetime.now() + datetime.timedelta(seconds=10)
_out += read_stream(channel, _new_timeout, timeout=True)
_err += read_stream(channel, _new_timeout, timeout=True, stderr=True)
except CommandFailed:
logger.debug("Encountered a timeout during read post execution.")
except BaseException as be:
logger.debug(
f"Encountered an unknown exception during last read.\n {be}"
)

_exit = channel.recv_exit_status()
return _out, _err, _exit, _time
logger.info(f"Command completed on {datetime.datetime.now()}")
return channel.recv_exit_status()
except socket.timeout as terr:
logger.error(f"{cmd} failed to execute within {timeout} seconds.")
logger.error(f"Command failed to execute within {timeout} seconds.")
raise SocketTimeoutException(terr)
except TimeoutException as tex:
channel.close()
logger.error(f"{cmd} failed to execute within {timeout}s.")
raise CommandFailed(tex)
except BaseException as be: # noqa
logger.exception(be)
raise CommandFailed(be)

def exec_command(self, **kw):
"""Execute the given command on the remote host.
Args:
cmd: The command that needs to be executed on the remote host.
long_running: Bool flag to indicate if the command is long running.
check_ec: Bool flag to indicate if the command should check for error code.
timeout: Max time to wait for command to complete. Default is 600 seconds.
pretty_print: Bool flag to indicate if the output should be pretty printed.
verbose: Bool flag to indicate if the command output should be printed.
"""execute a command.
Returns:
Exit code when long_running is used
Tuple having stdout, stderr data output when long_running is not used
Tuple having stdout, stderr, exit code, duration when verbose is enabled
Attributes:
kw (Dict): execute command configuration
check_ec: False will run the command and not wait for exit code
Raises:
CommandFailed: when the exit code is non-zero and check_ec is enabled.
TimeoutError: when the command times out.
Example::
Examples:
self.exec_cmd(cmd='uptime')
or
eg: self.exec_cmd(cmd='uptime')
or
self.exec_cmd(cmd='background_cmd', check_ec=False)
"""
if self.run_once:
self.ssh_transport().set_keepalive(15)
self.rssh_transport().set_keepalive(15)
cmd = kw["cmd"]
_out, _err, _exit, _time = self.long_running(**kw)
self.exit_status = _exit

if kw.get("pretty_print"):
msg = f"\nCommand: {cmd}"
msg += f"\nDuration: {_time} seconds"
msg += f"\nExit Code: {_exit}"

if _out:
msg += f"\nStdout: {_out}"
kw:
check_ec: False will run the command and not wait for exit code
"""
if kw.get("long_running"):
return self.long_running(**kw)

if _err:
msg += f"\nStderr: {_err}"
timeout = kw["timeout"] if kw.get("timeout") else 600
ssh = self.rssh() if kw.get("sudo") else self.ssh()

logger.info(msg)
logger.info(
f"Running command {kw['cmd']} on {self.ip_address} timeout {timeout}"
)

if "verbose" in kw:
return _out, _err, _exit, _time
if self.run_once:
self.ssh_transport().set_keepalive(15)
self.rssh_transport().set_keepalive(15)

# Historically, we are only providing command exit code for long
# running commands.
# Fixme: Ensure the method returns a tuple of
# (stdout, stderr, exit_code, time_taken)
if kw.get("long_running", False):
if kw.get("check_ec", False) and _exit != 0:
stdout = str()
stderr = str()
_stdout = None
_stderr = None
try:
_, _stdout, _stderr = ssh.exec_command(kw["cmd"], timeout=timeout)
for line in _stdout:
if line:
stdout += line
for line in _stderr:
if line:
stderr += line
except socket.timeout as sock_err:
logger.error("socket.timeout doesn't give an error message")
ssh.close()
raise SocketTimeoutException(sock_err)
except SSHException as e:
logger.error("SSHException during cmd: %s", str(e))

exit_status = None
if _stdout is not None:
exit_status = _stdout.channel.recv_exit_status()

self.exit_status = exit_status
if kw.get("check_ec", True):
if exit_status == 0:
logger.info("Command completed successfully")
else:
logger.error(f"Error {exit_status} during cmd, timeout {timeout}")
logger.error(stderr)
raise CommandFailed(
f"{cmd} returned {_err} and code {_exit} on {self.ip_address}"
f"{kw['cmd']} Error: {str(stderr)} {str(self.ip_address)}"
)

return _exit

if kw.get("check_ec", True) and _exit != 0:
raise CommandFailed(
f"{cmd} returned {_err} and code {_exit} on {self.ip_address}"
)

return _out, _err
return stdout, stderr

def remote_file(self, **kw):
"""Return contents of the remote file."""
Expand Down Expand Up @@ -1985,7 +1898,7 @@ def setup_deb_repos(self, deb_repo):
"https://www.redhat.com/security/897da07a.txt",
"https://www.redhat.com/security/f21541eb.txt",
"https://prodsec.redhat.com/keys/00da75f2.txt",
"http://file.corp.redhat.com/~kdreyer/keys/00da75f2.txt",
"http://file.rdu.redhat.com/~kdreyer/keys/00da75f2.txt",
"https://www.redhat.com/security/data/fd431d51.txt",
]

Expand Down Expand Up @@ -2766,4 +2679,4 @@ def create_ceph_object(self, role):
if role in self.DEMON_ROLES:
return CephDemon(role, self.node)
if role != "pool":
return CephObject(role, self.node)
return CephObject(role, self.node)
8 changes: 7 additions & 1 deletion suites/pacific/cephfs/tier-0_fs_kernel.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,16 @@ tests:
abort-on-fail: false
- test:
name: Run fsstress on kernel and fuse mounts
module: cephfs_bugs.test_fsstress_on_kernel_and_fuse.py
module: cephfs_bugs.fsstress_kernel_verification.py
polarion-id: CEPH-83575623
desc: Run fsstress on kernel and fuse mounts
abort-on-fail: false
- test:
name: Run xfs test on kernel
module: xfs_test.py
polarion-id: CEPH-83575623
desc: Run xfs test on kernel
abort-on-fail: false
-
test:
desc: "generate sosreport"
Expand Down
6 changes: 6 additions & 0 deletions suites/quincy/cephfs/tier-0_fs_kernel.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,12 @@ tests:
polarion-id: CEPH-83575623
desc: Run fsstress on kernel and fuse mounts
abort-on-fail: false
- test:
name: Run xfs test on kernel
module: xfs_test.py
polarion-id: CEPH-83575623
desc: Run xfs test on kernel
abort-on-fail: false
-
test:
desc: "generate sosreport"
Expand Down
6 changes: 6 additions & 0 deletions suites/reef/cephfs/tier-0_fs_kernel.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,12 @@ tests:
polarion-id: CEPH-83575623
desc: Run fsstress on kernel and fuse mounts
abort-on-fail: false
- test:
name: Run xfs test on kernel
module: xfs_test.py
polarion-id: CEPH-83575623
desc: Run xfs test on kernel
abort-on-fail: false
-
test:
desc: "generate sosreport"
Expand Down
Loading

0 comments on commit f44c48b

Please sign in to comment.