Skip to content

Commit

Permalink
Merge pull request #62 from tcezard/docker_resiience
Browse files Browse the repository at this point in the history
EVA-3666 - Docker resilience
  • Loading branch information
tcezard authored Sep 24, 2024
2 parents 59babe5 + 12f3aa5 commit 5c60afb
Show file tree
Hide file tree
Showing 5 changed files with 234 additions and 91 deletions.
16 changes: 12 additions & 4 deletions eva_sub_cli/executables/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from eva_sub_cli import orchestrator
from eva_sub_cli.orchestrator import VALIDATE, SUBMIT, DOCKER, NATIVE
from eva_sub_cli.file_utils import is_submission_dir_writable
from eva_sub_cli.file_utils import is_submission_dir_writable, DirLockError, DirLock


def validate_command_line_arguments(args, argparser):
Expand Down Expand Up @@ -87,11 +87,19 @@ def main():
logging_config.add_stdout_handler(logging.DEBUG)
else:
logging_config.add_stdout_handler(logging.INFO)
logging_config.add_file_handler(os.path.join(args.submission_dir, 'eva_submission.log'), logging.DEBUG)

try:
# Pass on all the arguments
orchestrator.orchestrate_process(**args.__dict__)
# lock the submission directory

with DirLock(os.path.join(args.submission_dir, '.lock')) as lock:
# Create the log file
logging_config.add_file_handler(os.path.join(args.submission_dir, 'eva_submission.log'), logging.DEBUG)
# Pass on all the arguments to the orchestrator
orchestrator.orchestrate_process(**args.__dict__)
except DirLockError:
print(f'Could not acquire the lock file for {args.submission_dir} because another process is using this '
f'directory or a previous process did not terminate correctly. '
f'If the problem persists, remove the lock file manually.')
except FileNotFoundError as fne:
print(fne)
except SubmissionNotFoundException as snfe:
Expand Down
45 changes: 45 additions & 0 deletions eva_sub_cli/file_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import gzip
import os
import shutil
import time
from itertools import groupby


Expand Down Expand Up @@ -70,3 +71,47 @@ def fasta_iter(input_fasta):
# join all sequence lines to one.
seq = "".join(s.strip() for s in faiter.__next__())
yield (headerStr, seq)


class DirLockError(Exception):
    """Raised when a directory lock file cannot be created."""


class DirLock(object):
    """File-based lock protecting access to a directory.

    The lock is a ``.lock`` file created atomically inside the protected
    directory; the lock is held for as long as that file exists. Instances
    can be used as context managers::

        with DirLock(some_directory):
            ...
    """

    # Pause between two attempts to create the lock file.
    _SPIN_PERIOD_SECONDS = 0.05

    def __init__(self, dirname, timeout=3):
        """Prepare a file lock to protect access to dirname.

        timeout is the period (in seconds) after which an acquisition attempt
        is aborted. The directory must exist, otherwise acquire() will time
        out.
        """
        self._lockfilename = os.path.join(dirname, ".lock")
        self._timeout = timeout
        # Set only once this instance has created the lock file, so that
        # release() never deletes a lock file owned by someone else.
        self._acquired = False

    def acquire(self):
        """Create the lock file, retrying until the timeout expires.

        Raises:
            DirLockError: if the lock file could not be created within the
                timeout. The last OSError is attached as the cause.
        """
        # Monotonic clock: the timeout must not be affected by wall-clock
        # adjustments happening while we wait.
        start_time = time.monotonic()
        while True:
            try:
                # O_EXCL: fail if file exists or create it (atomically)
                os.close(os.open(self._lockfilename,
                                 os.O_CREAT | os.O_EXCL | os.O_RDWR))
            except OSError as err:
                if (time.monotonic() - start_time) > self._timeout:
                    raise DirLockError(
                        f"could not create {self._lockfilename} after {self._timeout} seconds"
                    ) from err
                time.sleep(self._SPIN_PERIOD_SECONDS)
            else:
                self._acquired = True
                return

    def release(self):
        """Delete the lock file if this instance created it.

        Calling release() on an instance that does not hold the lock is a
        no-op. Errors while deleting the file are ignored (best effort).
        """
        if not self._acquired:
            return
        self._acquired = False
        try:
            os.remove(self._lockfilename)
        except OSError:
            # Best effort: the file may already have been removed manually.
            pass

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, type_, value, traceback):
        # Returns None, so an exception raised in the with-body propagates.
        self.release()
82 changes: 40 additions & 42 deletions eva_sub_cli/validators/docker_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import time

from ebi_eva_common_pyutils.logger import logging_config
from retry import retry

from eva_sub_cli.validators.validator import Validator

Expand All @@ -19,14 +20,13 @@
class DockerValidator(Validator):

def __init__(self, mapping_file, submission_dir, project_title, metadata_json=None,
metadata_xlsx=None, shallow_validation=False, container_name=None, docker_path='docker', submission_config=None):
metadata_xlsx=None, shallow_validation=False, docker_path='docker', submission_config=None):
super().__init__(mapping_file, submission_dir, project_title,
metadata_json=metadata_json, metadata_xlsx=metadata_xlsx,
shallow_validation=shallow_validation, submission_config=submission_config)
self.docker_path = docker_path
self.container_name = container_name
if self.container_name is None:
self.container_name = container_image.split('/')[1] + '.' + container_tag
submission_basename = re.sub('[^a-zA-Z0-9]', '', os.path.basename(submission_dir))
self.container_name = f'{container_image.split("/")[1]}.{container_tag}.{submission_basename}'

def _validate(self):
self.run_docker_validator()
Expand All @@ -49,8 +49,9 @@ def get_docker_validation_cmd(self):

def run_docker_validator(self):
# check if docker container is ready for running validation
self.verify_docker_env()

self.verify_docker_is_installed()
self.download_container_image_if_needed()
self.run_container_if_required()
try:
# remove all existing files from container
self._run_quiet_command(
Expand All @@ -77,6 +78,7 @@ def run_docker_validator(self):
"Copy validation output from container to host",
f"{self.docker_path} cp {self.container_name}:{container_validation_dir}/{container_validation_output_dir}/. {self.output_dir}"
)
self.stop_running_container()

def verify_docker_is_installed(self):
try:
Expand Down Expand Up @@ -135,50 +137,46 @@ def verify_image_available_locally(self):
logger.debug(f"Container ({container_image}) image is not available locally")
return False

def run_container(self):
logger.debug(f"Trying to run container {self.container_name}")
try:
self._run_quiet_command(
"Try running container",
f"{self.docker_path} run -it --rm -d --name {self.container_name} {container_image}:{container_tag}"
)
# stopping execution to give some time to container to get up and running
time.sleep(5)
if not self.verify_container_is_running():
def run_container_if_required(self):
    """Make sure the validation container is up before validation starts.

    - If the container is already running, refuse to continue.
    - If the container exists but is stopped, try restarting it.
    - Otherwise start a new detached container from the configured image.

    Raises:
        RuntimeError: if the container is already running, or if it could
            not be started.
    """
    if self.verify_container_is_running():
        raise RuntimeError(f"Container ({self.container_name}) is already running. "
                           f"Did you start multiple validation for the same directory ?")
    if self.verify_container_is_stopped():
        # Logger.warn is a deprecated alias; use Logger.warning.
        logger.warning(f"Container {self.container_name} was stopped but not cleaned up before.")
        self.try_restarting_container()
    else:
        logger.debug(f"Trying to start container {self.container_name}")
        try:
            self._run_quiet_command(
                "Running container",
                f"{self.docker_path} run -it --rm -d --name {self.container_name} {container_image}:{container_tag}"
            )
            # Wait to give some time to container to get up and running
            time.sleep(5)
            if not self.verify_container_is_running():
                raise RuntimeError(f"Container ({self.container_name}) could not be started")
        except subprocess.CalledProcessError as ex:
            logger.error(ex)
            # Chain the failed command so the root cause stays in the traceback.
            raise RuntimeError(f"Container ({self.container_name}) could not be started") from ex
except subprocess.CalledProcessError as ex:
logger.error(ex)
raise RuntimeError(f"Container ({self.container_name}) could not be started")

def stop_running_container(self):
if not self.verify_container_is_stopped():
if self.verify_container_is_running():
self._run_quiet_command(
"Stop the running container",
f"{self.docker_path} stop {self.container_name}"
)

def download_container_image(self):
logger.debug(f"Pulling container ({container_image}) image")
try:
self._run_quiet_command("pull container image", f"{self.docker_path} pull {container_image}:{container_tag}")
except subprocess.CalledProcessError as ex:
logger.error(ex)
raise RuntimeError(f"Cannot pull container ({container_image}) image")
# Give the pull command some time to complete
time.sleep(5)
self.run_container()

def verify_docker_env(self):
self.verify_docker_is_installed()

if not self.verify_container_is_running():
if self.verify_container_is_stopped():
self.try_restarting_container()
else:
if self.verify_image_available_locally():
self.run_container()
else:
self.download_container_image()
@retry(RuntimeError, tries=3, delay=5, backoff=1, jitter=2, logger=logger)
def download_container_image_if_needed(self):
    """Pull the container image unless it is already available locally.

    A failed pull is reported as RuntimeError, which is the exception type
    the @retry decorator above is configured to retry on (tries=3).

    Raises:
        RuntimeError: if the pull command fails.
    """
    if self.verify_image_available_locally():
        # Nothing to download.
        return

    logger.debug(f"Pulling container ({container_image}) image")
    pull_command = f"{self.docker_path} pull {container_image}:{container_tag}"
    try:
        self._run_quiet_command("pull container image", pull_command)
    except subprocess.CalledProcessError as ex:
        logger.error(ex)
        raise RuntimeError(f"Cannot pull container ({container_image}) image")
    # Give the pull command some time to complete
    time.sleep(5)

def copy_files_to_container(self):
def _copy(file_description, file_path):
Expand Down
Loading

0 comments on commit 5c60afb

Please sign in to comment.