diff --git a/eva_sub_cli/executables/cli.py b/eva_sub_cli/executables/cli.py index 0b1433b..db8c0c4 100755 --- a/eva_sub_cli/executables/cli.py +++ b/eva_sub_cli/executables/cli.py @@ -15,7 +15,7 @@ from eva_sub_cli import orchestrator from eva_sub_cli.orchestrator import VALIDATE, SUBMIT, DOCKER, NATIVE -from eva_sub_cli.file_utils import is_submission_dir_writable +from eva_sub_cli.file_utils import is_submission_dir_writable, DirLockError, DirLock def validate_command_line_arguments(args, argparser): @@ -87,11 +87,19 @@ def main(): logging_config.add_stdout_handler(logging.DEBUG) else: logging_config.add_stdout_handler(logging.INFO) - logging_config.add_file_handler(os.path.join(args.submission_dir, 'eva_submission.log'), logging.DEBUG) try: - # Pass on all the arguments - orchestrator.orchestrate_process(**args.__dict__) + # lock the submission directory + + with DirLock(args.submission_dir) as lock: + # Create the log file + logging_config.add_file_handler(os.path.join(args.submission_dir, 'eva_submission.log'), logging.DEBUG) + # Pass on all the arguments to the orchestrator + orchestrator.orchestrate_process(**args.__dict__) + except DirLockError: + print(f'Could not acquire the lock file for {args.submission_dir} because another process is using this ' + f'directory or a previous process did not terminate correctly. ' + f'If the problem persists, remove the lock file manually.') except FileNotFoundError as fne: print(fne) except SubmissionNotFoundException as snfe: diff --git a/eva_sub_cli/file_utils.py b/eva_sub_cli/file_utils.py index b4ad203..b3ba5e3 100644 --- a/eva_sub_cli/file_utils.py +++ b/eva_sub_cli/file_utils.py @@ -2,6 +2,7 @@ import gzip import os import shutil +import time from itertools import groupby @@ -70,3 +71,47 @@ def fasta_iter(input_fasta): # join all sequence lines to one. 
seq = "".join(s.strip() for s in faiter.__next__()) yield (headerStr, seq) + + +class DirLockError(Exception): + pass + + +class DirLock(object): + + _SPIN_PERIOD_SECONDS = 0.05 + + def __init__(self, dirname, timeout=3): + """Prepare a file lock to protect access to dirname. timeout is the + period (in seconds) after which an acquisition attempt is aborted. + The directory must exist, otherwise acquire() will timeout. + """ + self._lockfilename = os.path.join(dirname, ".lock") + self._timeout = timeout + + def acquire(self): + start_time = time.time() + while True: + try: + # O_EXCL: fail if file exists or create it (atomically) + os.close(os.open(self._lockfilename, + os.O_CREAT | os.O_EXCL | os.O_RDWR)) + break + except OSError: + if (time.time() - start_time) > self._timeout: + raise DirLockError(f"could not create {self._lockfilename} after {self._timeout} seconds") + else: + time.sleep(self._SPIN_PERIOD_SECONDS) + + def release(self): + try: + os.remove(self._lockfilename) + except OSError: + pass + + def __enter__(self): + self.acquire() + return self + + def __exit__(self, type_, value, traceback): + self.release() \ No newline at end of file diff --git a/eva_sub_cli/validators/docker_validator.py b/eva_sub_cli/validators/docker_validator.py index 2294979..dffcb56 100644 --- a/eva_sub_cli/validators/docker_validator.py +++ b/eva_sub_cli/validators/docker_validator.py @@ -5,6 +5,7 @@ import time from ebi_eva_common_pyutils.logger import logging_config +from retry import retry from eva_sub_cli.validators.validator import Validator @@ -19,14 +20,13 @@ class DockerValidator(Validator): def __init__(self, mapping_file, submission_dir, project_title, metadata_json=None, - metadata_xlsx=None, shallow_validation=False, container_name=None, docker_path='docker', submission_config=None): + metadata_xlsx=None, shallow_validation=False, docker_path='docker', submission_config=None): super().__init__(mapping_file, submission_dir, project_title, metadata_json=metadata_json, 
metadata_xlsx=metadata_xlsx, shallow_validation=shallow_validation, submission_config=submission_config) self.docker_path = docker_path - self.container_name = container_name - if self.container_name is None: - self.container_name = container_image.split('/')[1] + '.' + container_tag + submission_basename = re.sub('[^a-zA-Z0-9]', '', os.path.basename(submission_dir)) + self.container_name = f'{container_image.split("/")[1]}.{container_tag}.{submission_basename}' def _validate(self): self.run_docker_validator() @@ -49,8 +49,9 @@ def get_docker_validation_cmd(self): def run_docker_validator(self): # check if docker container is ready for running validation - self.verify_docker_env() - + self.verify_docker_is_installed() + self.download_container_image_if_needed() + self.run_container_if_required() try: # remove all existing files from container self._run_quiet_command( @@ -77,6 +78,7 @@ def run_docker_validator(self): "Copy validation output from container to host", f"{self.docker_path} cp {self.container_name}:{container_validation_dir}/{container_validation_output_dir}/. {self.output_dir}" ) + self.stop_running_container() def verify_docker_is_installed(self): try: @@ -135,50 +137,46 @@ def verify_image_available_locally(self): logger.debug(f"Container ({container_image}) image is not available locally") return False - def run_container(self): - logger.debug(f"Trying to run container {self.container_name}") - try: - self._run_quiet_command( - "Try running container", - f"{self.docker_path} run -it --rm -d --name {self.container_name} {container_image}:{container_tag}" - ) - # stopping execution to give some time to container to get up and running - time.sleep(5) - if not self.verify_container_is_running(): + def run_container_if_required(self): + if self.verify_container_is_running(): + raise RuntimeError(f"Container ({self.container_name}) is already running. 
" + f"Did you start multiple validations for the same directory?") + if self.verify_container_is_stopped(): + logger.warning(f"Container {self.container_name} was stopped but not cleaned up before.") + self.try_restarting_container() + else: + logger.debug(f"Trying to start container {self.container_name}") + try: + self._run_quiet_command( + "Running container", + f"{self.docker_path} run -it --rm -d --name {self.container_name} {container_image}:{container_tag}" + ) + # Wait to give some time to container to get up and running + time.sleep(5) + if not self.verify_container_is_running(): + raise RuntimeError(f"Container ({self.container_name}) could not be started") + except subprocess.CalledProcessError as ex: + logger.error(ex) + raise RuntimeError(f"Container ({self.container_name}) could not be started") def stop_running_container(self): - if not self.verify_container_is_stopped(): + if self.verify_container_is_running(): self._run_quiet_command( "Stop the running container", f"{self.docker_path} stop {self.container_name}" ) - def download_container_image(self): - logger.debug(f"Pulling container ({container_image}) image") - try: - self._run_quiet_command("pull container image", f"{self.docker_path} pull {container_image}:{container_tag}") - except subprocess.CalledProcessError as ex: - logger.error(ex) - raise RuntimeError(f"Cannot pull container ({container_image}) image") - # Give the pull command some time to complete - time.sleep(5) - self.run_container() - - def verify_docker_env(self): - self.verify_docker_is_installed() - - if not self.verify_container_is_running(): - if self.verify_container_is_stopped(): - self.try_restarting_container() - else: - if self.verify_image_available_locally(): - self.run_container() - else: - self.download_container_image() + @retry(RuntimeError, tries=3, delay=5, backoff=1, 
jitter=2, logger=logger) + def download_container_image_if_needed(self): + if not self.verify_image_available_locally(): + logger.debug(f"Pulling container ({container_image}) image") + try: + self._run_quiet_command("pull container image", f"{self.docker_path} pull {container_image}:{container_tag}") + except subprocess.CalledProcessError as ex: + logger.error(ex) + raise RuntimeError(f"Cannot pull container ({container_image}) image") + # Give the pull command some time to complete + time.sleep(5) def copy_files_to_container(self): def _copy(file_description, file_path): diff --git a/tests/test_docker_validator.py b/tests/test_docker_validator.py index 7db488c..25a9274 100644 --- a/tests/test_docker_validator.py +++ b/tests/test_docker_validator.py @@ -1,7 +1,9 @@ import json import os import shutil +import subprocess from unittest import TestCase +from unittest.mock import patch import yaml @@ -16,15 +18,16 @@ class TestDockerValidator(TestCase): fasta_files = os.path.join(resources_folder, 'fasta_files') assembly_reports = os.path.join(resources_folder, 'assembly_reports') - test_run_dir = os.path.join(resources_folder, 'docker_test_run') - mapping_file = os.path.join(test_run_dir, 'vcf_files_metadata.csv') - metadata_json = os.path.join(test_run_dir, 'sub_metadata.json') - metadata_xlsx = os.path.join(test_run_dir, 'sub_metadata.xlsx') + test_run_dir1 = os.path.join(resources_folder, 'docker_test_run1') + test_run_dir2 = os.path.join(resources_folder, 'docker_test_run2') + mapping_file = os.path.join(test_run_dir1, 'vcf_files_metadata.csv') + metadata_json = os.path.join(test_run_dir1, 'sub_metadata.json') + metadata_xlsx = os.path.join(test_run_dir2, 'sub_metadata.xlsx') - submission_dir = test_run_dir def setUp(self): - os.makedirs(self.test_run_dir, exist_ok=True) + os.makedirs(self.test_run_dir1, exist_ok=True) + os.makedirs(self.test_run_dir2, exist_ok=True) # create vcf mapping file create_mapping_file(self.mapping_file, @@ -50,10 +53,9 @@ def 
setUp(self): json.dump(sub_metadata, open_metadata) self.validator = DockerValidator( mapping_file=self.mapping_file, - submission_dir=self.submission_dir, + submission_dir=self.test_run_dir1, project_title=self.project_title, - metadata_json=self.metadata_json, - container_name='eva-sub-cli-test' + metadata_json=self.metadata_json ) shutil.copyfile( os.path.join(self.resources_folder, 'EVA_Submission_test.xlsx'), @@ -62,28 +64,35 @@ def setUp(self): self.validator_from_excel = DockerValidator( mapping_file=self.mapping_file, - submission_dir=self.submission_dir, + submission_dir=self.test_run_dir2, project_title=self.project_title, - metadata_xlsx=self.metadata_xlsx, - container_name='eva-sub-cli-test' + metadata_xlsx=self.metadata_xlsx ) def tearDown(self): - if os.path.exists(self.test_run_dir): - shutil.rmtree(self.test_run_dir) + for d in [self.test_run_dir1, self.test_run_dir2]: + if os.path.exists(d): + shutil.rmtree(d) self.validator.stop_running_container() self.validator_from_excel.stop_running_container() + def assert_same_dict_and_unordered_list(self, o1, o2): + if isinstance(o1, dict) and isinstance(o2, dict): + self.assertEqual(set(o1), set(o2)) + [self.assert_same_dict_and_unordered_list(o1.get(k), o2.get(k)) for k in o1] + elif isinstance(o1, list) and isinstance(o2, list): + self.assertEqual(set(o1), set(o2)) + else: + self.assertEqual(o1, o2) + def assert_sample_checker(self, sample_checker_file, expected_checker): self.assertTrue(os.path.isfile(sample_checker_file)) with open(sample_checker_file) as open_yaml: - assert yaml.safe_load(open_yaml) == expected_checker - - def test_validate(self): - # run validation in docker - self.validator.validate() + self.assert_same_dict_and_unordered_list(yaml.safe_load(open_yaml), expected_checker) - vcf_format_dir = os.path.join(self.validator.output_dir, 'vcf_format') + def assert_validation_results(self, validator, expected_sample_checker, expected_metadata_files_json, + expected_metadata_val, 
expected_semantic_val): + vcf_format_dir = os.path.join(validator.output_dir, 'vcf_format') self.assertTrue(os.path.exists(vcf_format_dir)) vcf_format_log_file = os.path.join(vcf_format_dir, 'input_passed.vcf.vcf_format.log') @@ -95,13 +104,13 @@ def test_validate(self): vcf_format_logs[3]) text_report = vcf_format_logs[2].split(':')[1].strip() - with open(os.path.join(self.validator.output_dir, text_report)) as text_report: + with open(os.path.join(validator.output_dir, text_report)) as text_report: text_report_content = text_report.readlines() self.assertEqual('According to the VCF specification, the input file is valid\n', text_report_content[0]) # assert assembly report - assembly_check_dir = os.path.join(self.validator.output_dir, 'assembly_check') + assembly_check_dir = os.path.join(validator.output_dir, 'assembly_check') self.assertTrue(os.path.exists(assembly_check_dir)) assembly_check_log_file = os.path.join(assembly_check_dir, 'input_passed.vcf.assembly_check.log') @@ -113,39 +122,86 @@ def test_validate(self): self.assertEqual('[info] Percentage of matches: 100%\n', assembly_check_logs[5]) # Assert Samples concordance - expected_checker = { - 'overall_differences': False, - 'results_per_analysis': { - 'AA': { - 'difference': False, - 'more_metadata_submitted_files': [], - 'more_per_submitted_files_metadata': {}, - 'more_submitted_files_metadata': [] - } - } - } - self.assert_sample_checker(self.validator._sample_check_yaml, expected_checker) + self.assert_sample_checker(validator._sample_check_yaml, expected_sample_checker) - with open(self.validator.metadata_json_post_validation) as open_file: + with open(validator.metadata_json_post_validation) as open_file: json_data = json.load(open_file) - assert json_data.get('files') == [ - {'analysisAlias': 'AA', 'fileName': 'input_passed.vcf', 'fileType': 'vcf', - 'md5': '96a80c9368cc3c37095c86fbe6044fb2', 'fileSize': 45050} - ] + assert json_data.get('files') == expected_metadata_files_json # Check metadata 
errors - with open(os.path.join(self.validator.output_dir, 'other_validations', 'metadata_validation.txt')) as open_file: + with open(os.path.join(validator.output_dir, 'other_validations', 'metadata_validation.txt')) as open_file: metadata_val_lines = {l.strip() for l in open_file.readlines()} - assert 'must match pattern "^PRJ(EB|NA)\\d+$"' in metadata_val_lines + assert any((expected_metadata_val in line for line in metadata_val_lines)) # Check semantic metadata errors - semantic_yaml_file = os.path.join(self.validator.output_dir, 'other_validations', 'metadata_semantic_check.yml') + semantic_yaml_file = os.path.join(validator.output_dir, 'other_validations', 'metadata_semantic_check.yml') self.assertTrue(os.path.isfile(semantic_yaml_file)) with open(semantic_yaml_file) as open_yaml: semantic_output = yaml.safe_load(open_yaml) - assert semantic_output[1] == {'description': 'SAME123 does not exist or is private', + assert semantic_output[1] == expected_semantic_val + + def test_validate(self): + # run validation in docker + self.validator.validate() + expected_sample_checker = { + 'overall_differences': False, + 'results_per_analysis': { + 'AA': { + 'difference': False, + 'more_metadata_submitted_files': [], + 'more_per_submitted_files_metadata': {}, + 'more_submitted_files_metadata': [] + } + } + } + expected_metadata_files_json = [ + {'analysisAlias': 'AA', 'fileName': 'input_passed.vcf', 'fileType': 'vcf', + 'md5': '96a80c9368cc3c37095c86fbe6044fb2', 'fileSize': 45050} + ] + expected_metadata_val = 'must match pattern "^PRJ(EB|NA)\\d+$"' + expected_semantic_val = {'description': 'SAME123 does not exist or is private', 'property': '/sample/0/bioSampleAccession'} + self.assert_validation_results(self.validator, expected_sample_checker, expected_metadata_files_json, + expected_metadata_val, expected_semantic_val) + def test_validate_from_excel(self): self.validator_from_excel.validate() - 
self.assertTrue(os.path.isfile(self.validator_from_excel._sample_check_yaml)) + expected_sample_checker = { + 'overall_differences': True, + 'results_per_analysis': { + 'No analysis': {'difference': True, 'more_metadata_submitted_files': [], + 'more_per_submitted_files_metadata': {'input_passed.vcf': ['HG00096']}, + 'more_submitted_files_metadata': ['HG00096']}, + 'VD1': {'difference': True, 'more_metadata_submitted_files': ['sample2', 'sample1'], + 'more_per_submitted_files_metadata': {}, 'more_submitted_files_metadata': []}, + 'VD2': {'difference': True, 'more_metadata_submitted_files': ['sample2', 'sample1'], + 'more_per_submitted_files_metadata': {}, 'more_submitted_files_metadata': []}, + 'VD3': {'difference': True, 'more_metadata_submitted_files': ['sample3', 'sample2', 'sample1'], + 'more_per_submitted_files_metadata': {},'more_submitted_files_metadata': []}, + 'VD4': {'difference': True, 'more_metadata_submitted_files': ['sample4'], + 'more_per_submitted_files_metadata': {}, 'more_submitted_files_metadata': []}, + 'VD5': {'difference': True, 'more_metadata_submitted_files': ['sample4'], + 'more_per_submitted_files_metadata': {}, 'more_submitted_files_metadata': []} + } + } + expected_metadata_files_json = [ + {'analysisAlias': 'VD1', 'fileName': 'example1.vcf.gz', 'fileSize': '', 'md5': ''}, + {'analysisAlias': 'VD2', 'fileName': 'example2.vcf', 'fileSize': '', 'md5': ''}, + {'analysisAlias': 'VD3', 'fileName': 'example3.vcf', 'fileSize': '', 'md5': ''} + ] + expected_metadata_val = 'Validation passed successfully.' 
+ expected_semantic_val = {'description': 'Project PRJEB00002 does not exist in ENA or is private', + 'property': '/project/childProjects/0'} + self.assert_validation_results(self.validator_from_excel, expected_sample_checker, expected_metadata_files_json, + expected_metadata_val, expected_semantic_val) + + def test_download_container_image_if_needed(self): + with patch('eva_sub_cli.validators.validator.run_command_with_output') as m_run_command: + m_run_command.side_effect = [ + None, # <- verify_image_available_locally + subprocess.CalledProcessError(1, 'pull'), # <- First Pull + None, # <- verify_image_available_locally + None # <- Second Pull + ] + self.validator.download_container_image_if_needed() diff --git a/tests/test_file_utils.py b/tests/test_file_utils.py index e1c12a0..b9ce3c6 100644 --- a/tests/test_file_utils.py +++ b/tests/test_file_utils.py @@ -1,9 +1,11 @@ import glob import os import shutil +import time from pathlib import Path +from unittest import TestCase -from eva_sub_cli.file_utils import backup_file_or_directory +from eva_sub_cli.file_utils import backup_file_or_directory, DirLock, DirLockError def set_up_test_dir(): @@ -44,3 +46,37 @@ def test_backup_file_or_directory_max_backups(): assert os.path.exists(f'backup_test/file.txt.{i}') assert not os.path.exists(f'backup_test/file.txt.{max_backups + 1}') clean_up() + + +class TestDirLock(TestCase): + resources_folder = os.path.join(os.path.dirname(__file__), 'resources') + + def setUp(self) -> None: + self.lock_folder = os.path.join(self.resources_folder, 'locked_folder') + os.makedirs(self.lock_folder) + + def tearDown(self) -> None: + shutil.rmtree(self.lock_folder) + + def test_create_lock(self): + with DirLock(self.lock_folder) as lock: + assert os.path.isfile(lock._lockfilename) + assert not os.path.exists(lock._lockfilename) + + def test_prevent_create_2_lock(self): + with DirLock(self.lock_folder) as lock: + assert os.path.isfile(lock._lockfilename) + with 
self.assertRaises(DirLockError): + with DirLock(self.lock_folder) as lock2: + pass + assert os.path.isfile(lock._lockfilename) + assert not os.path.exists(lock._lockfilename) + + def test_lock_with_exception(self): + try: + with DirLock(self.lock_folder) as lock: + assert os.path.isfile(lock._lockfilename) + raise Exception() + except Exception: + pass + assert not os.path.exists(lock._lockfilename)