diff --git a/src/validation_workflow/rpm/validation_rpm.py b/src/validation_workflow/rpm/validation_rpm.py index bf80a5615d..247d4041b6 100644 --- a/src/validation_workflow/rpm/validation_rpm.py +++ b/src/validation_workflow/rpm/validation_rpm.py @@ -27,6 +27,8 @@ def installation(self) -> bool: execute('sudo rpm --import https://artifacts.opensearch.org/publickeys/opensearch.pgp', str(self.tmp_dir.path), True, False) for project in self.args.projects: self.filename = os.path.basename(self.args.file_path.get(project)) + self.validate_metadata(project) + self.validate_signature() execute(f'sudo yum remove {project} -y', ".") execute(f'sudo env OPENSEARCH_INITIAL_ADMIN_PASSWORD={get_password(str(self.args.version))} rpm -ivh {os.path.join(self.tmp_dir.path, self.filename)}', str(self.tmp_dir.path), True, False) # noqa: 501 except: @@ -68,3 +70,62 @@ def cleanup(self) -> bool: except Exception as e: raise Exception(f'Exception occurred either while attempting to stop cluster or removing OpenSearch/OpenSearch-Dashboards. {str(e)}') return True + + def validate_metadata(self, product_type: str) -> None: + (_, stdout, _) = execute(f'rpm -qip {os.path.join(self.tmp_dir.path, self.filename)}', ".") + logging.info("Meta data for the RPM distribution is: \n" + stdout) + ref_map = {} + ref_map['Name'] = product_type + ref_map['Version'] = self.args.version + ref_map['Architecture'] = self.args.arch + ref_map['Group'] = "Application/Internet" + ref_map['License'] = "Apache-2.0" + ref_map['Relocations'] = "(not relocatable)" + ref_map['URL'] = "https://opensearch.org/" + # The context the meta data should be based on type OpenSearch or OpenSearchDashBoards + if product_type == "opensearch": + ref_map['Summary'] = "An open source distributed and RESTful search engine" + ref_map['Description'] = "OpenSearch makes it easy to ingest, search, visualize, and analyze your data\nFor more information, see: https://opensearch.org/" + else: + ref_map['Summary'] = "Open source visualization dashboards for OpenSearch" + ref_map['Description'] = "OpenSearch Dashboards is the visualization tool for data in OpenSearch\nFor more information, see: https://opensearch.org/" + + meta_map = {} + for line in stdout.split('\n'): + key = line.split(':')[0].strip() + if key != 'Description': + meta_map[key] = line.split(':', 2)[1].strip() + else: + description_index = stdout.find(line) + meta_map[key] = stdout[description_index + len(line):].strip() + break + + for key, value in ref_map.items(): + if key == "Architecture": + if value == 'x64': + assert meta_map.get(key) == 'x86_64' + elif value == 'arm64': + assert meta_map.get(key) == 'aarch64' + else: + if meta_map.get(key) == value: + logging.info(value) + logging.info(f"Meta data for {key} is validated") + + logging.info(f"Validation for {product_type} meta data of RPM distribution completed.") + + def validate_signature(self) -> None: + (_, stdout, _) = execute(f'rpm -K -v {os.path.join(self.tmp_dir.path, self.filename)}', ".") + logging.info(stdout) + key_list = ["Header V4 RSA/SHA512 Signature, key ID 9310d3fc", "Header SHA256 digest", "Header SHA1 digest", "Payload SHA256 digest", "V4 RSA/SHA512 Signature, key ID 9310d3fc", "MD5 digest"] + present_key = [] + for line in stdout.rstrip('\n').split('\n'): + key = line.split(':')[0].strip() + if key != os.path.join(self.tmp_dir.path, self.filename): + if "OK" == line.split(':')[1].strip(): + logging.info(f"{key} is validated as: {line}") + present_key.append(key) + logging.info("Validation of all key digests starts: ") + for digest in key_list: + if digest in present_key: + logging.info(f'Key digest "{digest}" is validated to be present.') + logging.info("Validation for signature of RPM distribution completed.") diff --git a/tests/tests_validation_workflow/test_validation_rpm.py b/tests/tests_validation_workflow/test_validation_rpm.py index 8a65fb9546..5a742bbc99 100644 --- a/tests/tests_validation_workflow/test_validation_rpm.py +++ b/tests/tests_validation_workflow/test_validation_rpm.py @@ -93,7 +93,9 @@ def test_exceptions(self, mock_temporary_directory: Mock, mock_validation_args: @patch('validation_workflow.rpm.validation_rpm.ValidationArgs') @patch('system.temporary_directory.TemporaryDirectory') @patch("validation_workflow.rpm.validation_rpm.execute") - def test_installation(self, mock_temporary_directory: Mock, mock_system: Mock, mock_validation_args: Mock) -> None: + @patch("validation_workflow.rpm.validation_rpm.ValidateRpm.validate_metadata") + @patch("validation_workflow.rpm.validation_rpm.ValidateRpm.validate_signature") + def test_installation(self, mock_validate_signature: Mock, mock_validate_metadata: Mock, mock_temporary_directory: Mock, mock_system: Mock, mock_validation_args: Mock) -> None: mock_validation_args.return_value.version = '2.3.0' mock_validation_args.return_value.arch = 'x64' mock_validation_args.return_value.platform = 'linux' @@ -207,3 +209,51 @@ def test_cleanup(self, mock_temporary_directory: Mock, mock_validation_args: Moc result = validate_rpm.cleanup() self.assertTrue(result) + + @patch('validation_workflow.rpm.validation_rpm.execute') + @patch('validation_workflow.rpm.validation_rpm.logging.info') + @patch('validation_workflow.rpm.validation_rpm.ValidationArgs') + @patch('system.temporary_directory.TemporaryDirectory') + def test_validate_metadata(self, mock_temporary_directory: Mock, mock_validation_args: Mock, mock_logging_info: Mock, mock_execute: Mock) -> None: + mock_execute.return_value = (None, 'Name: opensearch\nVersion: 1.2.3\nArchitecture: x86_64\nDescription: This is a test application\n' + ' "OpenSearch makes it easy to ingest, search, visualize, and analyze your data\nFor more information,' + ' see: https://opensearch.org/', None) + + validate_rpm = ValidateRpm(mock_validation_args.return_value, mock_temporary_directory.return_value) + mock_temporary_directory.return_value.path = "/tmp/trytytyuit/" + validate_rpm.filename = 'example.rpm' + validate_rpm.args.version = '1.3.0' + validate_rpm.args.arch = "x64" + + validate_rpm.validate_metadata('opensearch') + + mock_logging_info.assert_any_call('Meta data for Name is validated') + mock_logging_info.assert_any_call('Meta data for Version is validated') + mock_logging_info.assert_any_call('Meta data for Architecture is validated') + mock_logging_info.assert_any_call('Meta data for Description is validated') + mock_logging_info.assert_any_call('Validation for opensearch meta data of RPM distribution completed.') + + mock_execute.assert_called_once_with( + 'rpm -qip /tmp/trytytyuit/example.rpm', '.' + ) + + @patch('validation_workflow.rpm.validation_rpm.execute') + @patch('validation_workflow.rpm.validation_rpm.logging.info') + @patch('validation_workflow.rpm.validation_rpm.ValidationArgs') + @patch('system.temporary_directory.TemporaryDirectory') + def test_validate_signature(self, mock_temporary_directory: Mock, mock_validation_args: Mock, mock_logging_info: Mock, mock_execute: Mock) -> None: + mock_execute.return_value = (None, 'Header SHA256 digest: OK\nPayload SHA256 digest: OK\n', None) + + validate_rpm = ValidateRpm(mock_validation_args.return_value, mock_temporary_directory.return_value) + mock_temporary_directory.return_value.path = "/tmp/trytytyuit/" + validate_rpm.filename = 'example.rpm' + + validate_rpm.validate_signature() + + mock_logging_info.assert_any_call('Key digest "Header SHA256 digest" is validated to be present.') + mock_logging_info.assert_any_call('Key digest "Payload SHA256 digest" is validated to be present.') + mock_logging_info.assert_any_call('Validation of all key digests starts: ') + mock_logging_info.assert_any_call('Validation for signature of RPM distribution completed.') + mock_execute.assert_called_once_with( + 'rpm -K -v /tmp/trytytyuit/example.rpm', '.' + )