diff --git a/src/validation_workflow/rpm/validation_rpm.py b/src/validation_workflow/rpm/validation_rpm.py index 247d4041b6..b29ede4690 100644 --- a/src/validation_workflow/rpm/validation_rpm.py +++ b/src/validation_workflow/rpm/validation_rpm.py @@ -39,12 +39,7 @@ def start_cluster(self) -> bool: try: for project in self.args.projects: execute(f'sudo systemctl start {project}', ".") - (stdout, stderr, status) = execute(f'sudo systemctl status {project}', ".") - if(status == 0): - logging.info(stdout) - else: - logging.info(stderr) - + (status, _, _) = execute(f'sudo systemctl status {project}', ".") except: raise Exception('Failed to Start Cluster') return True @@ -85,16 +80,18 @@ def validate_metadata(self, product_type: str) -> None: # The context the meta data should be based on type OpenSearch or OpenSearchDashBoards if product_type == "opensearch": ref_map['Summary'] = "An open source distributed and RESTful search engine" - ref_map['Description'] = "OpenSearch makes it easy to ingest, search, visualize, and analyze your data\nFor more information, see: https://opensearch.org/" + ref_map[ + 'Description'] = "OpenSearch makes it easy to ingest, search, visualize, and analyze your data\nFor more information, see: https://opensearch.org/" else: ref_map['Summary'] = "Open source visualization dashboards for OpenSearch" - ref_map['Description'] = "OpenSearch Dashboards is the visualization tool for data in OpenSearch\nFor more information, see: https://opensearch.org/" + ref_map[ + 'Description'] = "OpenSearch Dashboards is the visualization tool for data in OpenSearch\nFor more information, see: https://opensearch.org/" meta_map = {} for line in stdout.split('\n'): key = line.split(':')[0].strip() if key != 'Description': - meta_map[key] = line.split(':', 2)[1].strip() + meta_map[key] = line.split(':', 1)[1].strip() else: description_index = stdout.find(line) meta_map[key] = stdout[description_index + len(line):].strip() @@ -107,25 +104,26 @@ def validate_metadata(self, product_type: str) -> None: elif value == 'arm64': assert meta_map.get(key) == 'aarch64' else: - if meta_map.get(key) == value: - logging.info(value) - logging.info(f"Meta data for {key} is validated") - + assert meta_map.get(key) == value + logging.info(f"Meta data for {key} -> {value} is validated") logging.info(f"Validation for {product_type} meta data of RPM distribution completed.") def validate_signature(self) -> None: (_, stdout, _) = execute(f'rpm -K -v {os.path.join(self.tmp_dir.path, self.filename)}', ".") logging.info(stdout) - key_list = ["Header V4 RSA/SHA512 Signature, key ID 9310d3fc", "Header SHA256 digest", "Header SHA1 digest", "Payload SHA256 digest", "V4 RSA/SHA512 Signature, key ID 9310d3fc", "MD5 digest"] + key_list = ["Header V4 RSA/SHA512 Signature, key ID 9310d3fc", "Header SHA256 digest", "Header SHA1 digest", + "Payload SHA256 digest", "V4 RSA/SHA512 Signature, key ID 9310d3fc", "MD5 digest"] present_key = [] for line in stdout.rstrip('\n').split('\n'): key = line.split(':')[0].strip() if key != os.path.join(self.tmp_dir.path, self.filename): - if "OK" == line.split(':')[1].strip(): - logging.info(f"{key} is validated as: {line}") - present_key.append(key) + assert "OK" == line.split(':')[1].strip() + logging.info(f"{key} is validated as: {line}") + present_key.append(key) logging.info("Validation of all key digests starts: ") for digest in key_list: if digest in present_key: logging.info(f'Key digest "{digest}" is validated to be present.') + else: + raise ValueError(f'Key digest "{digest}" is not found') logging.info("Validation for signature of RPM distribution completed.") diff --git a/tests/tests_validation_workflow/test_validation_rpm.py b/tests/tests_validation_workflow/test_validation_rpm.py index 5a742bbc99..defbeca59d 100644 --- a/tests/tests_validation_workflow/test_validation_rpm.py +++ b/tests/tests_validation_workflow/test_validation_rpm.py @@ -210,28 +210,59 @@ def test_cleanup(self, mock_temporary_directory: Mock, mock_validation_args: Moc result = validate_rpm.cleanup() self.assertTrue(result) + @patch('validation_workflow.rpm.validation_rpm.execute') + @patch('validation_workflow.rpm.validation_rpm.logging.info') + def test_validate_metadata_opensearch(self, mock_logging_info: Mock, mock_execute: Mock) -> None: + mock_execute.return_value = (None, + 'Name : opensearch\n' + 'Version : 1.2.0\n' + 'Architecture : x86_64\n' + 'Group : Application/Internet\n' + 'License : Apache-2.0\n' + 'Relocations : (not relocatable)\n' + 'URL : https://opensearch.org/\n' + 'Summary : An open source distributed and RESTful search engine\n' + 'Description:\nOpenSearch makes it easy to ingest, search, visualize, and analyze your data\n' + 'For more information, see: https://opensearch.org/', + None) + + mock_validation_args = Mock() + mock_validation_args.version = '1.2.0' + mock_validation_args.arch = 'x64' + + mock_temporary_directory = Mock() + mock_temporary_directory.path = "/tmp/fakepath" + + validate_rpm = ValidateRpm(mock_validation_args, mock_temporary_directory) + validate_rpm.filename = 'opensearch.rpm' + + validate_rpm.validate_metadata('opensearch') + + mock_logging_info.assert_any_call("Meta data for Name -> opensearch is validated") + mock_logging_info.assert_any_call("Meta data for Version -> 1.2.0 is validated") + mock_logging_info.assert_any_call( + "Meta data for Description -> OpenSearch makes it easy to ingest, search, visualize, and analyze your data\nFor more information, see: https://opensearch.org/ is validated") + mock_logging_info.assert_any_call("Validation for opensearch meta data of RPM distribution completed.") + mock_execute.assert_called_once_with('rpm -qip /tmp/fakepath/opensearch.rpm', '.') + @patch('validation_workflow.rpm.validation_rpm.execute') @patch('validation_workflow.rpm.validation_rpm.logging.info') @patch('validation_workflow.rpm.validation_rpm.ValidationArgs') @patch('system.temporary_directory.TemporaryDirectory') - def test_validate_metadata(self, mock_temporary_directory: Mock, mock_validation_args: Mock, mock_logging_info: Mock, mock_execute: Mock) -> None: - mock_execute.return_value = (None, 'Name: opensearch\nVersion: 1.2.3\nArchitecture: x86_64\nDescription: This is a test application\n' - ' "OpenSearch makes it easy to ingest, search, visualize, and analyze your data\nFor more information,' - ' see: https://opensearch.org/', None) + def test_validate_metadata_exception(self, mock_temporary_directory: Mock, mock_validation_args: Mock, + mock_logging_info: Mock, mock_execute: Mock) -> None: + mock_execute.return_value = (None, 'Name: opensearch\nVersion: 1.2.3\nArchitecture: x86_64\nURL: https://opensearch.org/Summary: ' + 'An open source distributed and RESTful search engine\nDescription: This is a test application\n' + ' "OpenSearch makes it easy to ingest, search, visualize, and analyze your data\nFor more information, see: https://opensearch.org/', None) validate_rpm = ValidateRpm(mock_validation_args.return_value, mock_temporary_directory.return_value) mock_temporary_directory.return_value.path = "/tmp/trytytyuit/" validate_rpm.filename = 'example.rpm' validate_rpm.args.version = '1.3.0' validate_rpm.args.arch = "x64" - - validate_rpm.validate_metadata('opensearch') - - mock_logging_info.assert_any_call('Meta data for Name is validated') - mock_logging_info.assert_any_call('Meta data for Version is validated') - mock_logging_info.assert_any_call('Meta data for Architecture is validated') - mock_logging_info.assert_any_call('Meta data for Description is validated') - mock_logging_info.assert_any_call('Validation for opensearch meta data of RPM distribution completed.') + with self.assertRaises(AssertionError) as context: + validate_rpm.validate_metadata('opensearch') + self.assertIsInstance(context.exception, AssertionError) mock_execute.assert_called_once_with( 'rpm -qip /tmp/trytytyuit/example.rpm', '.' @@ -242,7 +273,8 @@ def test_validate_metadata(self, mock_temporary_directory: Mock, mock_validation @patch('validation_workflow.rpm.validation_rpm.ValidationArgs') @patch('system.temporary_directory.TemporaryDirectory') def test_validate_signature(self, mock_temporary_directory: Mock, mock_validation_args: Mock, mock_logging_info: Mock, mock_execute: Mock) -> None: - mock_execute.return_value = (None, 'Header SHA256 digest: OK\nPayload SHA256 digest: OK\n', None) + mock_execute.return_value = (None, 'Header V4 RSA/SHA512 Signature, key ID 9310d3fc: OK\nHeader SHA1 digest: OK\nV4 RSA/SHA512 Signature, key ID 9310d3fc: OK\n' + 'MD5 digest: OK\nHeader SHA256 digest: OK\nPayload SHA256 digest: OK\n', None) validate_rpm = ValidateRpm(mock_validation_args.return_value, mock_temporary_directory.return_value) mock_temporary_directory.return_value.path = "/tmp/trytytyuit/" @@ -257,3 +289,48 @@ def test_validate_signature(self, mock_temporary_directory: Mock, mock_validatio mock_execute.assert_called_once_with( 'rpm -K -v /tmp/trytytyuit/example.rpm', '.' ) + + @patch('validation_workflow.rpm.validation_rpm.execute') + @patch('validation_workflow.rpm.validation_rpm.logging.info') + @patch('validation_workflow.rpm.validation_rpm.ValidationArgs') + @patch('system.temporary_directory.TemporaryDirectory') + def test_validate_signature_exception(self, mock_temporary_directory: Mock, mock_validation_args: Mock, mock_logging_info: Mock, mock_execute: Mock) -> None: + mock_execute.return_value = (None, 'Header V4 RSA/SHA512 Signature, key ID 9310d3fc: OK\nHeader SHA1 digest: OK\nV4 RSA/SHA512 Signature, key ID 9310d3fc: OK\n' + 'MD5 digest: not OK\nHeader SHA256 digest: OK\nPayload SHA256 digest: OK\n', None) + + validate_rpm = ValidateRpm(mock_validation_args.return_value, mock_temporary_directory.return_value) + mock_temporary_directory.return_value.path = "/tmp/trytytyuit/" + validate_rpm.filename = 'example.rpm' + + with self.assertRaises(AssertionError) as context: + validate_rpm.validate_signature() + self.assertIsInstance(context.exception, AssertionError) + + mock_execute.assert_called_once_with( + 'rpm -K -v /tmp/trytytyuit/example.rpm', '.' + ) + + @patch('validation_workflow.rpm.validation_rpm.execute') + @patch('validation_workflow.rpm.validation_rpm.logging.info') + @patch('validation_workflow.rpm.validation_rpm.ValidationArgs') + @patch('system.temporary_directory.TemporaryDirectory') + def test_validate_signature_except(self, mock_temporary_directory: Mock, mock_validation_args: Mock, mock_logging_info: Mock, mock_execute: Mock) -> None: + mock_execute.return_value = (None, + 'Header V4 RSA/SHA512 Signature, key ID 9310d3fc: OK\nHeader SHA256 digest: OK\n' + 'Header SHA1 digest: OK\nPayload SHA256 digest: OK\nV4 RSA/SHA512 Signature, key ID 9310d3fc: OK\n', + None) + + validate_rpm = ValidateRpm(mock_validation_args.return_value, mock_temporary_directory.return_value) + mock_temporary_directory.return_value.path = "/tmp/trytytyuit/" + validate_rpm.filename = 'example.rpm' + + with self.assertRaises(Exception) as context: + validate_rpm.validate_signature() + mock_logging_info.assert_any_call('Key digest "Header SHA256 digest" is validated to be present.') + mock_logging_info.assert_any_call('Key digest "Payload SHA256 digest" is validated to be present.') + mock_logging_info.assert_any_call('Validation of all key digests starts: ') + self.assertEqual(str(context.exception), 'Key digest "MD5 digest" is not found') + + mock_execute.assert_called_once_with( + 'rpm -K -v /tmp/trytytyuit/example.rpm', '.' + )