diff --git a/src/validation_workflow/rpm/validation_rpm.py b/src/validation_workflow/rpm/validation_rpm.py index 22ccc1e156..b29ede4690 100644 --- a/src/validation_workflow/rpm/validation_rpm.py +++ b/src/validation_workflow/rpm/validation_rpm.py @@ -105,7 +105,7 @@ def validate_metadata(self, product_type: str) -> None: assert meta_map.get(key) == 'aarch64' else: assert meta_map.get(key) == value - logging.info(f"Meta data for {key} is validated") + logging.info(f"Meta data for {key} -> {value} is validated") logging.info(f"Validation for {product_type} meta data of RPM distribution completed.") def validate_signature(self) -> None: diff --git a/tests/tests_validation_workflow/test_validation_rpm.py b/tests/tests_validation_workflow/test_validation_rpm.py index f22e089178..10a9151b1b 100644 --- a/tests/tests_validation_workflow/test_validation_rpm.py +++ b/tests/tests_validation_workflow/test_validation_rpm.py @@ -210,27 +210,60 @@ def test_cleanup(self, mock_temporary_directory: Mock, mock_validation_args: Moc result = validate_rpm.cleanup() self.assertTrue(result) + @patch('validation_workflow.rpm.validation_rpm.execute') + @patch('validation_workflow.rpm.validation_rpm.logging.info') + def test_validate_metadata_opensearch(self, mock_logging_info: Mock, mock_execute: Mock) -> None: + mock_execute.return_value = (None, + 'Name : opensearch\n' + 'Version : 1.2.0\n' + 'Architecture : x86_64\n' + 'Group : Application/Internet\n' + 'License : Apache-2.0\n' + 'Relocations : (not relocatable)\n' + 'URL : https://opensearch.org/\n' + 'Summary : An open source distributed and RESTful search engine\n' + 'Description:\nOpenSearch makes it easy to ingest, search, visualize, and analyze your data\n' + 'For more information, see: https://opensearch.org/', + None) + + mock_validation_args = Mock() + mock_validation_args.version = '1.2.0' + mock_validation_args.arch = 'x64' + + mock_temporary_directory = Mock() + mock_temporary_directory.path = "/tmp/fakepath" + + validate_rpm = ValidateRpm(mock_validation_args, mock_temporary_directory) + validate_rpm.filename = 'opensearch.rpm' + + validate_rpm.validate_metadata('opensearch') + + mock_logging_info.assert_any_call("Meta data for Name -> opensearch is validated") + mock_logging_info.assert_any_call("Meta data for Version -> 1.2.0 is validated") + mock_logging_info.assert_any_call( + "Meta data for Description -> OpenSearch makes it easy to ingest, search, visualize, and analyze your data\nFor more information, see: https://opensearch.org/ is validated") + mock_logging_info.assert_any_call("Validation for opensearch meta data of RPM distribution completed.") + mock_execute.assert_called_once_with('rpm -qip /tmp/fakepath/opensearch.rpm', '.') + + @patch('validation_workflow.rpm.validation_rpm.execute') @patch('validation_workflow.rpm.validation_rpm.logging.info') @patch('validation_workflow.rpm.validation_rpm.ValidationArgs') @patch('system.temporary_directory.TemporaryDirectory') - def test_validate_metadata(self, mock_temporary_directory: Mock, mock_validation_args: Mock, mock_logging_info: Mock, mock_execute: Mock) -> None: - mock_execute.return_value = (None, 'Name: opensearch\nVersion: 1.2.3\nArchitecture: x86_64\nDescription: This is a test application\n' - ' "OpenSearch makes it easy to ingest, search, visualize, and analyze your data\nFor more information, see: https://opensearch.org/', None) + def test_validate_metadata_exception(self, mock_temporary_directory: Mock, mock_validation_args: Mock, + mock_logging_info: Mock, mock_execute: Mock) -> None: + mock_execute.return_value = ( + None, 'Name: opensearch\nVersion: 1.2.3\nArchitecture: x86_64\nURL: https://opensearch.org/Summary: ' + 'An open source distributed and RESTful search engine\nDescription: This is a test application\n' + ' "OpenSearch makes it easy to ingest, search, visualize, and analyze your data\nFor more information, see: https://opensearch.org/',None) validate_rpm = ValidateRpm(mock_validation_args.return_value, mock_temporary_directory.return_value) mock_temporary_directory.return_value.path = "/tmp/trytytyuit/" validate_rpm.filename = 'example.rpm' validate_rpm.args.version = '1.3.0' validate_rpm.args.arch = "x64" - - validate_rpm.validate_metadata('opensearch') - - mock_logging_info.assert_any_call('Meta data for Name is validated') - mock_logging_info.assert_any_call('Meta data for Version is validated') - mock_logging_info.assert_any_call('Meta data for Architecture is validated') - mock_logging_info.assert_any_call('Meta data for Description is validated') - mock_logging_info.assert_any_call('Validation for opensearch meta data of RPM distribution completed.') + with self.assertRaises(Exception) as context: + validate_rpm.validate_metadata('opensearch') mock_execute.assert_called_once_with( 'rpm -qip /tmp/trytytyuit/example.rpm', '.' @@ -241,7 +274,8 @@ def test_validate_metadata(self, mock_temporary_directory: Mock, mock_validation @patch('validation_workflow.rpm.validation_rpm.ValidationArgs') @patch('system.temporary_directory.TemporaryDirectory') def test_validate_signature(self, mock_temporary_directory: Mock, mock_validation_args: Mock, mock_logging_info: Mock, mock_execute: Mock) -> None: - mock_execute.return_value = (None, 'Header SHA256 digest: OK\nPayload SHA256 digest: OK\n', None) + mock_execute.return_value = (None, 'Header V4 RSA/SHA512 Signature, key ID 9310d3fc: OK\nHeader SHA1 digest: OK\nV4 RSA/SHA512 Signature, key ID 9310d3fc: OK\n' + 'MD5 digest: OK\nHeader SHA256 digest: OK\nPayload SHA256 digest: OK\n', None) validate_rpm = ValidateRpm(mock_validation_args.return_value, mock_temporary_directory.return_value) mock_temporary_directory.return_value.path = "/tmp/trytytyuit/" @@ -256,3 +290,52 @@ def test_validate_signature(self, mock_temporary_directory: Mock, mock_validatio mock_execute.assert_called_once_with( 'rpm -K -v /tmp/trytytyuit/example.rpm', '.' ) + + @patch('validation_workflow.rpm.validation_rpm.execute') + @patch('validation_workflow.rpm.validation_rpm.logging.info') + @patch('validation_workflow.rpm.validation_rpm.ValidationArgs') + @patch('system.temporary_directory.TemporaryDirectory') + def test_validate_signature_exception(self, mock_temporary_directory: Mock, mock_validation_args: Mock, mock_logging_info: Mock, mock_execute: Mock) -> None: + mock_execute.return_value = (None, 'Header V4 RSA/SHA512 Signature, key ID 9310d3fc: OK\nHeader SHA1 digest: OK\nV4 RSA/SHA512 Signature, key ID 9310d3fc: OK\n' + 'MD5 digest: not OK\nHeader SHA256 digest: OK\nPayload SHA256 digest: OK\n', None) + + validate_rpm = ValidateRpm(mock_validation_args.return_value, mock_temporary_directory.return_value) + mock_temporary_directory.return_value.path = "/tmp/trytytyuit/" + validate_rpm.filename = 'example.rpm' + + with self.assertRaises(Exception) as context: + validate_rpm.validate_signature() + + mock_execute.assert_called_once_with( + 'rpm -K -v /tmp/trytytyuit/example.rpm', '.' + ) + + @patch('validation_workflow.rpm.validation_rpm.execute') + @patch('validation_workflow.rpm.validation_rpm.logging.info') + @patch('validation_workflow.rpm.validation_rpm.ValidationArgs') + @patch('system.temporary_directory.TemporaryDirectory') + def test_validate_signature_except(self, mock_temporary_directory: Mock, mock_validation_args: Mock, mock_logging_info: Mock, mock_execute: Mock) -> None: + mock_execute.return_value = (None, + 'Header V4 RSA/SHA512 Signature, key ID 9310d3fc: OK\nHeader SHA256 digest: OK\n' + 'Header SHA1 digest: OK\nPayload SHA256 digest: OK\nV4 RSA/SHA512 Signature, key ID 9310d3fc: OK\n', + None) + + validate_rpm = ValidateRpm(mock_validation_args.return_value, mock_temporary_directory.return_value) + mock_temporary_directory.return_value.path = "/tmp/trytytyuit/" + validate_rpm.filename = 'example.rpm' + + with self.assertRaises(Exception) as context: + validate_rpm.validate_signature() + mock_logging_info.assert_any_call('Key digest "Header SHA256 digest" is validated to be present.') + mock_logging_info.assert_any_call('Key digest "Payload SHA256 digest" is validated to be present.') + mock_logging_info.assert_any_call('Validation of all key digests starts: ') + self.assertEqual(str(context.exception), 'Key digest "MD5 digest" is not found') + + mock_execute.assert_called_once_with( + 'rpm -K -v /tmp/trytytyuit/example.rpm', '.' + ) + + + + +