Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add meta and signature validation to RPM dist in Validation workflow #5006

Merged
merged 4 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions src/validation_workflow/rpm/validation_rpm.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ def installation(self) -> bool:
execute('sudo rpm --import https://artifacts.opensearch.org/publickeys/opensearch.pgp', str(self.tmp_dir.path), True, False)
for project in self.args.projects:
self.filename = os.path.basename(self.args.file_path.get(project))
self.validate_metadata(project)
self.validate_signature()
execute(f'sudo yum remove {project} -y', ".")
execute(f'sudo env OPENSEARCH_INITIAL_ADMIN_PASSWORD={get_password(str(self.args.version))} rpm -ivh {os.path.join(self.tmp_dir.path, self.filename)}', str(self.tmp_dir.path), True, False) # noqa: 501
except:
Expand Down Expand Up @@ -68,3 +70,62 @@ def cleanup(self) -> bool:
except Exception as e:
raise Exception(f'Exception occurred either while attempting to stop cluster or removing OpenSearch/OpenSearch-Dashboards. {str(e)}')
return True

def validate_metadata(self, product_type: str) -> None:
(_, stdout, _) = execute(f'rpm -qip {os.path.join(self.tmp_dir.path, self.filename)}', ".")
logging.info("Meta data for the RPM distribution is: \n" + stdout)
ref_map = {}
ref_map['Name'] = product_type
ref_map['Version'] = self.args.version
ref_map['Architecture'] = self.args.arch
ref_map['Group'] = "Application/Internet"
ref_map['License'] = "Apache-2.0"
ref_map['Relocations'] = "(not relocatable)"
ref_map['URL'] = "https://opensearch.org/"
# The context the meta data should be based on type OpenSearch or OpenSearchDashBoards
if product_type == "opensearch":
ref_map['Summary'] = "An open source distributed and RESTful search engine"
ref_map['Description'] = "OpenSearch makes it easy to ingest, search, visualize, and analyze your data\nFor more information, see: https://opensearch.org/"
else:
ref_map['Summary'] = "Open source visualization dashboards for OpenSearch"
ref_map['Description'] = "OpenSearch Dashboards is the visualization tool for data in OpenSearch\nFor more information, see: https://opensearch.org/"

meta_map = {}
for line in stdout.split('\n'):
key = line.split(':')[0].strip()
if key != 'Description':
meta_map[key] = line.split(':', 2)[1].strip()
else:
description_index = stdout.find(line)
meta_map[key] = stdout[description_index + len(line):].strip()
break

for key, value in ref_map.items():
if key == "Architecture":
if value == 'x64':
assert meta_map.get(key) == 'x86_64'
elif value == 'arm64':
assert meta_map.get(key) == 'aarch64'
else:
if meta_map.get(key) == value:
logging.info(value)
logging.info(f"Meta data for {key} is validated")
peterzhuamazon marked this conversation as resolved.
Show resolved Hide resolved

logging.info(f"Validation for {product_type} meta data of RPM distribution completed.")

def validate_signature(self) -> None:
(_, stdout, _) = execute(f'rpm -K -v {os.path.join(self.tmp_dir.path, self.filename)}', ".")
logging.info(stdout)
key_list = ["Header V4 RSA/SHA512 Signature, key ID 9310d3fc", "Header SHA256 digest", "Header SHA1 digest", "Payload SHA256 digest", "V4 RSA/SHA512 Signature, key ID 9310d3fc", "MD5 digest"]
present_key = []
for line in stdout.rstrip('\n').split('\n'):
key = line.split(':')[0].strip()
if key != os.path.join(self.tmp_dir.path, self.filename):
peterzhuamazon marked this conversation as resolved.
Show resolved Hide resolved
if "OK" == line.split(':')[1].strip():
logging.info(f"{key} is validated as: {line}")
present_key.append(key)
peterzhuamazon marked this conversation as resolved.
Show resolved Hide resolved
logging.info("Validation of all key digests starts: ")
for digest in key_list:
if digest in present_key:
logging.info(f'Key digest "{digest}" is validated to be present.')
logging.info("Validation for signature of RPM distribution completed.")
peterzhuamazon marked this conversation as resolved.
Show resolved Hide resolved
52 changes: 51 additions & 1 deletion tests/tests_validation_workflow/test_validation_rpm.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ def test_exceptions(self, mock_temporary_directory: Mock, mock_validation_args:
@patch('validation_workflow.rpm.validation_rpm.ValidationArgs')
@patch('system.temporary_directory.TemporaryDirectory')
@patch("validation_workflow.rpm.validation_rpm.execute")
def test_installation(self, mock_temporary_directory: Mock, mock_system: Mock, mock_validation_args: Mock) -> None:
@patch("validation_workflow.rpm.validation_rpm.ValidateRpm.validate_metadata")
@patch("validation_workflow.rpm.validation_rpm.ValidateRpm.validate_signature")
def test_installation(self, mock_validate_signature: Mock, mock_validate_metadata: Mock, mock_temporary_directory: Mock, mock_system: Mock, mock_validation_args: Mock) -> None:
mock_validation_args.return_value.version = '2.3.0'
mock_validation_args.return_value.arch = 'x64'
mock_validation_args.return_value.platform = 'linux'
Expand Down Expand Up @@ -207,3 +209,51 @@ def test_cleanup(self, mock_temporary_directory: Mock, mock_validation_args: Moc

result = validate_rpm.cleanup()
self.assertTrue(result)

peterzhuamazon marked this conversation as resolved.
Show resolved Hide resolved
@patch('validation_workflow.rpm.validation_rpm.execute')
@patch('validation_workflow.rpm.validation_rpm.logging.info')
@patch('validation_workflow.rpm.validation_rpm.ValidationArgs')
@patch('system.temporary_directory.TemporaryDirectory')
def test_validate_metadata(self, mock_temporary_directory: Mock, mock_validation_args: Mock, mock_logging_info: Mock, mock_execute: Mock) -> None:
mock_execute.return_value = (None, 'Name: opensearch\nVersion: 1.2.3\nArchitecture: x86_64\nDescription: This is a test application\n'
' "OpenSearch makes it easy to ingest, search, visualize, and analyze your data\nFor more information,'
' see: https://opensearch.org/', None)

validate_rpm = ValidateRpm(mock_validation_args.return_value, mock_temporary_directory.return_value)
mock_temporary_directory.return_value.path = "/tmp/trytytyuit/"
validate_rpm.filename = 'example.rpm'
validate_rpm.args.version = '1.3.0'
validate_rpm.args.arch = "x64"

validate_rpm.validate_metadata('opensearch')

mock_logging_info.assert_any_call('Meta data for Name is validated')
mock_logging_info.assert_any_call('Meta data for Version is validated')
mock_logging_info.assert_any_call('Meta data for Architecture is validated')
mock_logging_info.assert_any_call('Meta data for Description is validated')
mock_logging_info.assert_any_call('Validation for opensearch meta data of RPM distribution completed.')

mock_execute.assert_called_once_with(
'rpm -qip /tmp/trytytyuit/example.rpm', '.'
)

@patch('validation_workflow.rpm.validation_rpm.execute')
@patch('validation_workflow.rpm.validation_rpm.logging.info')
@patch('validation_workflow.rpm.validation_rpm.ValidationArgs')
@patch('system.temporary_directory.TemporaryDirectory')
def test_validate_signature(self, mock_temporary_directory: Mock, mock_validation_args: Mock, mock_logging_info: Mock, mock_execute: Mock) -> None:
mock_execute.return_value = (None, 'Header SHA256 digest: OK\nPayload SHA256 digest: OK\n', None)

validate_rpm = ValidateRpm(mock_validation_args.return_value, mock_temporary_directory.return_value)
mock_temporary_directory.return_value.path = "/tmp/trytytyuit/"
validate_rpm.filename = 'example.rpm'

validate_rpm.validate_signature()

mock_logging_info.assert_any_call('Key digest "Header SHA256 digest" is validated to be present.')
mock_logging_info.assert_any_call('Key digest "Payload SHA256 digest" is validated to be present.')
mock_logging_info.assert_any_call('Validation of all key digests starts: ')
mock_logging_info.assert_any_call('Validation for signature of RPM distribution completed.')
mock_execute.assert_called_once_with(
'rpm -K -v /tmp/trytytyuit/example.rpm', '.'
)
Loading