Skip to content

Commit

Permalink
Refactor Raspi launcher (youtube#119)
Browse files Browse the repository at this point in the history
This makes the following changes

* Add test coverage to Raspi launcher
* Adds a standalone Python retry module

Adds a `retry` module to use from AbstractLauncher code. This
is intended to clean up retry handling in Raspberry Pi launcher
specifically.

* Refactor and fix up Raspi launcher retry code

Refactors launcher code to use shared `retry` module code, and
re-applies fixes from reverted youtube#106 after clean-up and refactor
of retry code.

b/279249837
b/278447902
b/240984469
  • Loading branch information
kaidokert committed Apr 25, 2023
1 parent a1c73c9 commit 9cf903e
Show file tree
Hide file tree
Showing 4 changed files with 705 additions and 77 deletions.
196 changes: 119 additions & 77 deletions starboard/raspi/shared/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,16 @@
import sys
import threading
import time
import contextlib

import _env # pylint: disable=unused-import
import pexpect
from starboard.tools import abstract_launcher
from starboard.raspi.shared import retry


# pylint: disable=unused-argument
def _SigIntOrSigTermHandler(signum, frame):
def _sigint_or_sigterm_handler(signum, frame):
"""Clean up and exit with status |signum|.
Args:
Expand All @@ -42,7 +44,7 @@ def _SigIntOrSigTermHandler(signum, frame):


# First call returns True, otherwise return false.
def FirstRun():
def first_run():
v = globals()
if not v.has_key('first_run'):
v['first_run'] = False
Expand All @@ -64,16 +66,35 @@ class Launcher(abstract_launcher.AbstractLauncher):
# pexpect times out each second to allow Kill to quickly stop a test run
_PEXPECT_TIMEOUT = 1

# Wait up to 30 seconds for the password prompt from the raspi
_PEXPECT_PASSWORD_TIMEOUT_MAX_RETRIES = 30
# SSH shell command retries
_PEXPECT_SPAWN_RETRIES = 20

# pexpect.sendline retries
_PEXPECT_SENDLINE_RETRIES = 3

# Old process kill retries
_KILL_RETRIES = 3

_PEXPECT_SHUTDOWN_SLEEP_TIME = 3
# Time to wait after processes were killed
_PROCESS_KILL_SLEEP_TIME = 10

# Retrys for getting a clean prompt
_PROMPT_WAIT_MAX_RETRIES = 5
# Wait up to 10 seconds for the password prompt from the raspi
_PEXPECT_PASSWORD_TIMEOUT_MAX_RETRIES = 10
# Wait up to 900 seconds for new output from the raspi
_PEXPECT_READLINE_TIMEOUT_MAX_RETRIES = 900
# Delay between subsequent SSH commands
_INTER_COMMAND_DELAY_SECONDS = 0.5
_INTER_COMMAND_DELAY_SECONDS = 1.5

# This is used to strip ansi color codes from pexpect output.
_PEXPECT_SANITIZE_LINE_RE = re.compile(r'\x1b[^m]*m')

# Exceptions to retry
_RETRY_EXCEPTIONS = (pexpect.TIMEOUT, pexpect.ExceptionPexpect,
pexpect.exceptions.EOF, OSError)

def __init__(self, platform, target_name, config, device_id, **kwargs):
# pylint: disable=super-with-arguments
super(Launcher, self).__init__(platform, target_name, config, device_id,
Expand Down Expand Up @@ -101,14 +122,17 @@ def __init__(self, platform, target_name, config, device_id, **kwargs):

self.log_targets = kwargs.get('log_targets', True)

signal.signal(signal.SIGINT, functools.partial(_SigIntOrSigTermHandler))
signal.signal(signal.SIGTERM, functools.partial(_SigIntOrSigTermHandler))
signal.signal(signal.SIGINT, functools.partial(_sigint_or_sigterm_handler))
signal.signal(signal.SIGTERM, functools.partial(_sigint_or_sigterm_handler))

self.last_run_pexpect_cmd = ''

def _InitPexpectCommands(self):
"""Initializes all of the pexpect commands needed for running the test."""

# Ensure no trailing slashes
self.out_directory = self.out_directory.rstrip('/')

test_dir = os.path.join(self.out_directory, 'deploy', self.target_name)
test_file = self.target_name

Expand Down Expand Up @@ -154,6 +178,18 @@ def _InitPexpectCommands(self):
test_success_output,
test_failure_output)

# pylint: disable=no-method-argument
def _CommandBackoff():
time.sleep(Launcher._INTER_COMMAND_DELAY_SECONDS)

def _ShutdownBackoff(self):
Launcher._CommandBackoff()
return self.shutdown_initiated.is_set()

@retry.retry(
exceptions=_RETRY_EXCEPTIONS,
retries=_PEXPECT_SPAWN_RETRIES,
backoff=_CommandBackoff)
def _PexpectSpawnAndConnect(self, command):
"""Spawns a process with pexpect and connect to the raspi.
Expand All @@ -166,70 +202,67 @@ def _PexpectSpawnAndConnect(self, command):
command, timeout=Launcher._PEXPECT_TIMEOUT)
# Let pexpect output directly to our output stream
self.pexpect_process.logfile_read = self.output_file
retry_count = 0
expected_prompts = [
r'.*Are\syou\ssure.*', # Fingerprint verification
r'.* password:', # Password prompt
'.*[a-zA-Z]+.*', # Any other text input
]
while True:
try:
i = self.pexpect_process.expect(expected_prompts)
if i == 0:
self._PexpectSendLine('yes')
elif i == 1:
self._PexpectSendLine(Launcher._RASPI_PASSWORD)
break
else:
# If any other input comes in, maybe we've logged in with rsa key or
# raspi does not have password. Check if we've logged in by echoing
# a special sentence and expect it back.
self._PexpectSendLine('echo ' + Launcher._SSH_LOGIN_SIGNAL)
i = self.pexpect_process.expect([Launcher._SSH_LOGIN_SIGNAL])
break
except pexpect.TIMEOUT:
if self.shutdown_initiated.is_set():
return
retry_count += 1
# Check if the max retry count has been exceeded. If it has, then
# re-raise the timeout exception.
if retry_count > Launcher._PEXPECT_PASSWORD_TIMEOUT_MAX_RETRIES:
raise

# pylint: disable=unnecessary-lambda
@retry.retry(
exceptions=Launcher._RETRY_EXCEPTIONS,
retries=Launcher._PEXPECT_PASSWORD_TIMEOUT_MAX_RETRIES,
backoff=lambda: self._ShutdownBackoff(),
wrap_exceptions=False)
def _inner():
i = self.pexpect_process.expect(expected_prompts)
if i == 0:
self._PexpectSendLine('yes')
elif i == 1:
self._PexpectSendLine(Launcher._RASPI_PASSWORD)
else:
# If any other input comes in, maybe we've logged in with rsa key or
# raspi does not have password. Check if we've logged in by echoing
# a special sentence and expect it back.
self._PexpectSendLine('echo ' + Launcher._SSH_LOGIN_SIGNAL)
i = self.pexpect_process.expect([Launcher._SSH_LOGIN_SIGNAL])

_inner()

@retry.retry(
exceptions=_RETRY_EXCEPTIONS,
retries=_PEXPECT_SENDLINE_RETRIES,
wrap_exceptions=False)
def _PexpectSendLine(self, cmd):
"""Send lines to Pexpect and record the last command for logging purposes"""
logging.info('sending >> : %s ', cmd)
self.last_run_pexpect_cmd = cmd
self.pexpect_process.sendline(cmd)

def _PexpectReadLines(self):
"""Reads all lines from the pexpect process."""

retry_count = 0
while True:
try:
# pylint: disable=unnecessary-lambda
@retry.retry(
exceptions=Launcher._RETRY_EXCEPTIONS,
retries=Launcher._PEXPECT_READLINE_TIMEOUT_MAX_RETRIES,
backoff=lambda: self.shutdown_initiated.is_set(),
wrap_exceptions=False)
def _readloop():
while True:
# Sanitize the line to remove ansi color codes.
line = Launcher._PEXPECT_SANITIZE_LINE_RE.sub(
'', self.pexpect_process.readline())
self.output_file.flush()
if not line:
break
return
# Check for the test complete tag. It will be followed by either a
# success or failure tag.
if line.startswith(self.test_complete_tag):
if line.find(self.test_success_tag) != -1:
self.return_value = 0
break
# A line was successfully read without timing out; reset the retry
# count before attempting to read the next line.
retry_count = 0
except pexpect.TIMEOUT:
if self.shutdown_initiated.is_set():
return
retry_count += 1
# Check if the max retry count has been exceeded. If it has, then
# re-raise the timeout exception.
if retry_count > Launcher._PEXPECT_READLINE_TIMEOUT_MAX_RETRIES:
raise

_readloop()

def _Sleep(self, val):
self._PexpectSendLine('sleep {};echo {}'.format(val,
Expand All @@ -243,39 +276,38 @@ def _CleanupPexpectProcess(self):
# Check if kernel logged OOM kill or any other system failure message
if self.return_value:
logging.info('Sending dmesg')
self._PexpectSendLine('dmesg -P --color=never | tail -n 100')
time.sleep(3)
try:
with contextlib.suppress(Launcher._RETRY_EXCEPTIONS):
self._PexpectSendLine('dmesg -P --color=never | tail -n 100')
time.sleep(self._PEXPECT_SHUTDOWN_SLEEP_TIME)
with contextlib.suppress(Launcher._RETRY_EXCEPTIONS):
self.pexpect_process.readlines()
except pexpect.TIMEOUT:
logging.info('Timeout exception during cleanup command: %s',
self.last_run_pexpect_cmd)
pass
logging.info('Done sending dmesg')

# Send ctrl-c to the raspi and close the process.
self._PexpectSendLine(chr(3))
time.sleep(1) # Allow a second for normal shutdown
self.pexpect_process.close()
with contextlib.suppress(Launcher._RETRY_EXCEPTIONS):
self._PexpectSendLine(chr(3))
time.sleep(self._PEXPECT_TIMEOUT) # Allow time for normal shutdown
with contextlib.suppress(Launcher._RETRY_EXCEPTIONS):
self.pexpect_process.close()

def _WaitForPrompt(self):
"""Sends empty commands, until a bash prompt is returned"""
retry_count = 5
while True:
try:
self.pexpect_process.expect(self._RASPI_PROMPT)
break
except pexpect.TIMEOUT:
logging.info('Timeout exception during WaitForPrompt command: %s',
self.last_run_pexpect_cmd)
if self.shutdown_initiated.is_set():
return
retry_count -= 1
if not retry_count:
raise
self._PexpectSendLine('echo ' + Launcher._SSH_SLEEP_SIGNAL)
time.sleep(self._INTER_COMMAND_DELAY_SECONDS)

def backoff():
self._PexpectSendLine('echo ' + Launcher._SSH_SLEEP_SIGNAL)
return self._ShutdownBackoff()

retry.with_retry(
lambda: self.pexpect_process.expect(self._RASPI_PROMPT),
exceptions=Launcher._RETRY_EXCEPTIONS,
retries=Launcher._PROMPT_WAIT_MAX_RETRIES,
backoff=backoff,
wrap_exceptions=False)

@retry.retry(
exceptions=_RETRY_EXCEPTIONS,
retries=_KILL_RETRIES,
backoff=_CommandBackoff)
def _KillExistingCobaltProcesses(self):
"""If there are leftover Cobalt processes, kill them.
Expand All @@ -294,7 +326,7 @@ def _KillExistingCobaltProcesses(self):
logging.warning('Forced to pkill existing instance(s) of cobalt. '
'Pausing to ensure no further operations are run '
'before processes shut down.')
time.sleep(10)
time.sleep(Launcher._PROCESS_KILL_SLEEP_TIME)
logging.info('Done killing existing processes')

def Run(self):
Expand Down Expand Up @@ -330,12 +362,19 @@ def Run(self):
if self.test_result_xml_path:
first_run_commands.append('touch {}'.format(self.test_result_xml_path))
first_run_commands.extend(['free -mh', 'ps -ux', 'df -h'])
if FirstRun():
if first_run():
for cmd in first_run_commands:
if not self.shutdown_initiated.is_set():
self._PexpectSendLine(cmd)
line = self.pexpect_process.readline()
self.output_file.write(line)

def _readline():
line = self.pexpect_process.readline()
self.output_file.write(line)

retry.with_retry(
_readline,
exceptions=Launcher._RETRY_EXCEPTIONS,
retries=Launcher._PROMPT_WAIT_MAX_RETRIES)
self._WaitForPrompt()
self.output_file.flush()
self._Sleep(self._INTER_COMMAND_DELAY_SECONDS)
Expand All @@ -346,6 +385,9 @@ def Run(self):
self._PexpectSendLine(self.test_command)
self._PexpectReadLines()

except retry.RetriesExceeded:
logging.exception('Command retry exceeded (cmd: %s)',
self.last_run_pexpect_cmd)
except pexpect.EOF:
logging.exception('pexpect encountered EOF while reading line. (cmd: %s)',
self.last_run_pexpect_cmd)
Expand Down Expand Up @@ -377,7 +419,7 @@ def Kill(self):
# Initiate the shutdown. This causes the run to abort within one second.
self.shutdown_initiated.set()
# Wait up to three seconds for the run to be set to inactive.
self.run_inactive.wait(3)
self.run_inactive.wait(Launcher._PEXPECT_SHUTDOWN_SLEEP_TIME)

def GetDeviceIp(self):
"""Gets the device IP."""
Expand Down
Loading

0 comments on commit 9cf903e

Please sign in to comment.