Skip to content

Commit

Permalink
Add meta and signature validation to rpm
Browse files Browse the repository at this point in the history
Signed-off-by: Divya Madala <[email protected]>
  • Loading branch information
Divyaasm committed Sep 9, 2024
1 parent da81d93 commit 1f8701e
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 1 deletion.
61 changes: 61 additions & 0 deletions src/validation_workflow/rpm/validation_rpm.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ def installation(self) -> bool:
execute('sudo rpm --import https://artifacts.opensearch.org/publickeys/opensearch.pgp', str(self.tmp_dir.path), True, False)
for project in self.args.projects:
self.filename = os.path.basename(self.args.file_path.get(project))
self.validate_metadata(project)
self.validate_signature()
execute(f'sudo yum remove {project} -y', ".")
execute(f'sudo env OPENSEARCH_INITIAL_ADMIN_PASSWORD={get_password(str(self.args.version))} rpm -ivh {os.path.join(self.tmp_dir.path, self.filename)}', str(self.tmp_dir.path), True, False) # noqa: 501
except:
Expand Down Expand Up @@ -68,3 +70,62 @@ def cleanup(self) -> bool:
except Exception as e:
raise Exception(f'Exception occurred either while attempting to stop cluster or removing OpenSearch/OpenSearch-Dashboards. {str(e)}')
return True

def validate_metadata(self, product_type: str) -> None:
(_, stdout, _) = execute(f'rpm -qip {os.path.join(self.tmp_dir.path, self.filename)}', ".")
logging.info("Meta data for the RPM distribution is: \n" + stdout)
ref_map = {}
ref_map['Name'] = product_type
ref_map['Version'] = self.args.version
ref_map['Architecture'] = self.args.arch
ref_map['Group'] = "Application/Internet"
ref_map['License'] = "Apache-2.0"
ref_map['Relocations'] = "(not relocatable)"
ref_map['URL'] = "https://opensearch.org/"
# The context the meta data should be based on type OpenSearch or OpenSearchDashBoards
if product_type == "opensearch":
ref_map['Summary'] = "An open source distributed and RESTful search engine"
ref_map['Description'] = "OpenSearch makes it easy to ingest, search, visualize, and analyze your data\nFor more information, see: https://opensearch.org/"
else:
ref_map['Summary'] = "Open source visualization dashboards for OpenSearch"
ref_map['Description'] = "OpenSearch Dashboards is the visualization tool for data in OpenSearch\nFor more information, see: https://opensearch.org/"

meta_map = {}
for line in stdout.split('\n'):
key = line.split(':')[0].strip()
if key != 'Description':
meta_map[key] = line.split(':', 2)[1].strip()
else:
description_index = stdout.find(line)
meta_map[key] = stdout[description_index + len(line):].strip()
break

for key, value in ref_map.items():
if key == "Architecture":
if value == 'x64':
assert meta_map.get(key) == 'x86_64'
elif value == 'arm64':
assert meta_map.get(key) == 'aarch64'
else:
if meta_map.get(key) == value:
logging.info(value)
logging.info(f"Meta data for {key} is validated")

logging.info(f"Validation for {product_type} meta data of RPM distribution completed.")

def validate_signature(self) -> None:
(_, stdout, _) = execute(f'rpm -K -v {os.path.join(self.tmp_dir.path, self.filename)}', ".")
logging.info(stdout)
key_list = ["Header V4 RSA/SHA512 Signature, key ID 9310d3fc", "Header SHA256 digest", "Header SHA1 digest", "Payload SHA256 digest", "V4 RSA/SHA512 Signature, key ID 9310d3fc", "MD5 digest"]
present_key = []
for line in stdout.rstrip('\n').split('\n'):
key = line.split(':')[0].strip()
if key != os.path.join(self.tmp_dir.path, self.filename):
if "OK" == line.split(':')[1].strip():
logging.info(f"{key} is validated as: {line}")
present_key.append(key)
logging.info("Validation of all key digests starts: ")
for digest in key_list:
if digest in present_key:
logging.info(f'Key digest "{digest}" is validated to be present.')
logging.info("Validation for signature of RPM distribution completed.")
52 changes: 51 additions & 1 deletion tests/tests_validation_workflow/test_validation_rpm.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ def test_exceptions(self, mock_temporary_directory: Mock, mock_validation_args:
@patch('validation_workflow.rpm.validation_rpm.ValidationArgs')
@patch('system.temporary_directory.TemporaryDirectory')
@patch("validation_workflow.rpm.validation_rpm.execute")
def test_installation(self, mock_temporary_directory: Mock, mock_system: Mock, mock_validation_args: Mock) -> None:
@patch("validation_workflow.rpm.validation_rpm.ValidateRpm.validate_metadata")
@patch("validation_workflow.rpm.validation_rpm.ValidateRpm.validate_signature")
def test_installation(self, mock_validate_signature: Mock, mock_validate_metadata: Mock, mock_temporary_directory: Mock, mock_system: Mock, mock_validation_args: Mock) -> None:
mock_validation_args.return_value.version = '2.3.0'
mock_validation_args.return_value.arch = 'x64'
mock_validation_args.return_value.platform = 'linux'
Expand Down Expand Up @@ -207,3 +209,51 @@ def test_cleanup(self, mock_temporary_directory: Mock, mock_validation_args: Moc

result = validate_rpm.cleanup()
self.assertTrue(result)

@patch('validation_workflow.rpm.validation_rpm.execute')
@patch('validation_workflow.rpm.validation_rpm.logging.info')
@patch('validation_workflow.rpm.validation_rpm.ValidationArgs')
@patch('system.temporary_directory.TemporaryDirectory')
def test_validate_metadata(self, mock_temporary_directory: Mock, mock_validation_args: Mock, mock_logging_info: Mock, mock_execute: Mock) -> None:
mock_execute.return_value = (None, 'Name: opensearch\nVersion: 1.2.3\nArchitecture: x86_64\nDescription: This is a test application\n'
' "OpenSearch makes it easy to ingest, search, visualize, and analyze your data\nFor more information,'
' see: https://opensearch.org/', None)

validate_rpm = ValidateRpm(mock_validation_args.return_value, mock_temporary_directory.return_value)
mock_temporary_directory.return_value.path = "/tmp/trytytyuit/"
validate_rpm.filename = 'example.rpm'
validate_rpm.args.version = '1.3.0'
validate_rpm.args.arch = "x64"

validate_rpm.validate_metadata('opensearch')

mock_logging_info.assert_any_call('Meta data for Name is validated')
mock_logging_info.assert_any_call('Meta data for Version is validated')
mock_logging_info.assert_any_call('Meta data for Architecture is validated')
mock_logging_info.assert_any_call('Meta data for Description is validated')
mock_logging_info.assert_any_call('Validation for opensearch meta data of RPM distribution completed.')

mock_execute.assert_called_once_with(
'rpm -qip /tmp/trytytyuit/example.rpm', '.'
)

@patch('validation_workflow.rpm.validation_rpm.execute')
@patch('validation_workflow.rpm.validation_rpm.logging.info')
@patch('validation_workflow.rpm.validation_rpm.ValidationArgs')
@patch('system.temporary_directory.TemporaryDirectory')
def test_validate_signature(self, mock_temporary_directory: Mock, mock_validation_args: Mock, mock_logging_info: Mock, mock_execute: Mock) -> None:
mock_execute.return_value = (None, 'Header SHA256 digest: OK\nPayload SHA256 digest: OK\n', None)

validate_rpm = ValidateRpm(mock_validation_args.return_value, mock_temporary_directory.return_value)
mock_temporary_directory.return_value.path = "/tmp/trytytyuit/"
validate_rpm.filename = 'example.rpm'

validate_rpm.validate_signature()

mock_logging_info.assert_any_call('Key digest "Header SHA256 digest" is validated to be present.')
mock_logging_info.assert_any_call('Key digest "Payload SHA256 digest" is validated to be present.')
mock_logging_info.assert_any_call('Validation of all key digests starts: ')
mock_logging_info.assert_any_call('Validation for signature of RPM distribution completed.')
mock_execute.assert_called_once_with(
'rpm -K -v /tmp/trytytyuit/example.rpm', '.'
)

0 comments on commit 1f8701e

Please sign in to comment.