Skip to content

Commit

Permalink
Add exceptional conditions
Browse files Browse the repository at this point in the history
Signed-off-by: Divya Madala <[email protected]>
  • Loading branch information
Divyaasm committed Sep 12, 2024
1 parent 1f8701e commit 59c8575
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 27 deletions.
32 changes: 15 additions & 17 deletions src/validation_workflow/rpm/validation_rpm.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,7 @@ def start_cluster(self) -> bool:
try:
for project in self.args.projects:
execute(f'sudo systemctl start {project}', ".")
(stdout, stderr, status) = execute(f'sudo systemctl status {project}', ".")
if(status == 0):
logging.info(stdout)
else:
logging.info(stderr)

(status, _, _) = execute(f'sudo systemctl status {project}', ".")
except:
raise Exception('Failed to Start Cluster')
return True
Expand Down Expand Up @@ -85,16 +80,18 @@ def validate_metadata(self, product_type: str) -> None:
# The context the meta data should be based on type OpenSearch or OpenSearchDashBoards
if product_type == "opensearch":
ref_map['Summary'] = "An open source distributed and RESTful search engine"
ref_map['Description'] = "OpenSearch makes it easy to ingest, search, visualize, and analyze your data\nFor more information, see: https://opensearch.org/"
ref_map[
'Description'] = "OpenSearch makes it easy to ingest, search, visualize, and analyze your data\nFor more information, see: https://opensearch.org/"
else:
ref_map['Summary'] = "Open source visualization dashboards for OpenSearch"
ref_map['Description'] = "OpenSearch Dashboards is the visualization tool for data in OpenSearch\nFor more information, see: https://opensearch.org/"
ref_map[
'Description'] = "OpenSearch Dashboards is the visualization tool for data in OpenSearch\nFor more information, see: https://opensearch.org/"

meta_map = {}
for line in stdout.split('\n'):
key = line.split(':')[0].strip()
if key != 'Description':
meta_map[key] = line.split(':', 2)[1].strip()
meta_map[key] = line.split(':', 1)[1].strip()
else:
description_index = stdout.find(line)
meta_map[key] = stdout[description_index + len(line):].strip()
Expand All @@ -107,25 +104,26 @@ def validate_metadata(self, product_type: str) -> None:
elif value == 'arm64':
assert meta_map.get(key) == 'aarch64'
else:
if meta_map.get(key) == value:
logging.info(value)
logging.info(f"Meta data for {key} is validated")

assert meta_map.get(key) == value
logging.info(f"Meta data for {key} -> {value} is validated")
logging.info(f"Validation for {product_type} meta data of RPM distribution completed.")

def validate_signature(self) -> None:
(_, stdout, _) = execute(f'rpm -K -v {os.path.join(self.tmp_dir.path, self.filename)}', ".")
logging.info(stdout)
key_list = ["Header V4 RSA/SHA512 Signature, key ID 9310d3fc", "Header SHA256 digest", "Header SHA1 digest", "Payload SHA256 digest", "V4 RSA/SHA512 Signature, key ID 9310d3fc", "MD5 digest"]
key_list = ["Header V4 RSA/SHA512 Signature, key ID 9310d3fc", "Header SHA256 digest", "Header SHA1 digest",
"Payload SHA256 digest", "V4 RSA/SHA512 Signature, key ID 9310d3fc", "MD5 digest"]
present_key = []
for line in stdout.rstrip('\n').split('\n'):
key = line.split(':')[0].strip()
if key != os.path.join(self.tmp_dir.path, self.filename):
if "OK" == line.split(':')[1].strip():
logging.info(f"{key} is validated as: {line}")
present_key.append(key)
assert "OK" == line.split(':')[1].strip()
logging.info(f"{key} is validated as: {line}")
present_key.append(key)
logging.info("Validation of all key digests starts: ")
for digest in key_list:
if digest in present_key:
logging.info(f'Key digest "{digest}" is validated to be present.')
else:
raise ValueError(f'Key digest "{digest}" is not found')
logging.info("Validation for signature of RPM distribution completed.")
99 changes: 89 additions & 10 deletions tests/tests_validation_workflow/test_validation_rpm.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,20 @@ def test_cleanup(self, mock_temporary_directory: Mock, mock_validation_args: Moc
@patch('validation_workflow.rpm.validation_rpm.logging.info')
@patch('validation_workflow.rpm.validation_rpm.ValidationArgs')
@patch('system.temporary_directory.TemporaryDirectory')
def test_validate_metadata(self, mock_temporary_directory: Mock, mock_validation_args: Mock, mock_logging_info: Mock, mock_execute: Mock) -> None:
mock_execute.return_value = (None, 'Name: opensearch\nVersion: 1.2.3\nArchitecture: x86_64\nDescription: This is a test application\n'
' "OpenSearch makes it easy to ingest, search, visualize, and analyze your data\nFor more information,'
' see: https://opensearch.org/', None)
def test_validate_metadata(self, mock_temporary_directory: Mock, mock_validation_args: Mock,
mock_logging_info: Mock, mock_execute: Mock) -> None:
mock_execute.return_value = (None,
'Name : opensearch\n'
'Version : 1.3.0\n'
'Architecture : x86_64\n'
'Group : Application/Internet\n'
'License : Apache-2.0\n'
'Relocations : (not relocatable)\n'
'URL : https://opensearch.org/\n'
'Summary : An open source distributed and RESTful search engine\n'
'Description:\nOpenSearch makes it easy to ingest, search, visualize, and analyze your data\n'
'For more information, see: https://opensearch.org/',
None)

validate_rpm = ValidateRpm(mock_validation_args.return_value, mock_temporary_directory.return_value)
mock_temporary_directory.return_value.path = "/tmp/trytytyuit/"
Expand All @@ -227,11 +237,34 @@ def test_validate_metadata(self, mock_temporary_directory: Mock, mock_validation

validate_rpm.validate_metadata('opensearch')

mock_logging_info.assert_any_call('Meta data for Name is validated')
mock_logging_info.assert_any_call('Meta data for Version is validated')
mock_logging_info.assert_any_call('Meta data for Architecture is validated')
mock_logging_info.assert_any_call('Meta data for Description is validated')
mock_logging_info.assert_any_call('Validation for opensearch meta data of RPM distribution completed.')
mock_logging_info.assert_any_call("Meta data for Name -> opensearch is validated")
mock_logging_info.assert_any_call("Meta data for Version -> 1.3.0 is validated")
mock_logging_info.assert_any_call(
"Meta data for Description -> OpenSearch makes it easy to ingest, search, visualize, and analyze your data\nFor more information, see: https://opensearch.org/ is validated")
mock_logging_info.assert_any_call("Validation for opensearch meta data of RPM distribution completed.")

mock_execute.assert_called_once_with(
'rpm -qip /tmp/trytytyuit/example.rpm', '.'
)

@patch('validation_workflow.rpm.validation_rpm.execute')
@patch('validation_workflow.rpm.validation_rpm.logging.info')
@patch('validation_workflow.rpm.validation_rpm.ValidationArgs')
@patch('system.temporary_directory.TemporaryDirectory')
def test_validate_metadata_exception(self, mock_temporary_directory: Mock, mock_validation_args: Mock,
mock_logging_info: Mock, mock_execute: Mock) -> None:
mock_execute.return_value = (None, 'Name: opensearch\nVersion: 1.2.3\nArchitecture: x86_64\nURL: https://opensearch.org/Summary: '
'An open source distributed and RESTful search engine\nDescription: This is a test application\n'
' "OpenSearch makes it easy to ingest, search, visualize, and analyze your data\nFor more information, see: https://opensearch.org/', None)

validate_rpm = ValidateRpm(mock_validation_args.return_value, mock_temporary_directory.return_value)
mock_temporary_directory.return_value.path = "/tmp/trytytyuit/"
validate_rpm.filename = 'example.rpm'
validate_rpm.args.version = '1.3.0'
validate_rpm.args.arch = "x64"
with self.assertRaises(AssertionError) as context:
validate_rpm.validate_metadata('opensearch')
self.assertIsInstance(context.exception, AssertionError)

mock_execute.assert_called_once_with(
'rpm -qip /tmp/trytytyuit/example.rpm', '.'
Expand All @@ -242,7 +275,8 @@ def test_validate_metadata(self, mock_temporary_directory: Mock, mock_validation
@patch('validation_workflow.rpm.validation_rpm.ValidationArgs')
@patch('system.temporary_directory.TemporaryDirectory')
def test_validate_signature(self, mock_temporary_directory: Mock, mock_validation_args: Mock, mock_logging_info: Mock, mock_execute: Mock) -> None:
mock_execute.return_value = (None, 'Header SHA256 digest: OK\nPayload SHA256 digest: OK\n', None)
mock_execute.return_value = (None, 'Header V4 RSA/SHA512 Signature, key ID 9310d3fc: OK\nHeader SHA1 digest: OK\nV4 RSA/SHA512 Signature, key ID 9310d3fc: OK\n'
'MD5 digest: OK\nHeader SHA256 digest: OK\nPayload SHA256 digest: OK\n', None)

validate_rpm = ValidateRpm(mock_validation_args.return_value, mock_temporary_directory.return_value)
mock_temporary_directory.return_value.path = "/tmp/trytytyuit/"
Expand All @@ -257,3 +291,48 @@ def test_validate_signature(self, mock_temporary_directory: Mock, mock_validatio
mock_execute.assert_called_once_with(
'rpm -K -v /tmp/trytytyuit/example.rpm', '.'
)

@patch('validation_workflow.rpm.validation_rpm.execute')
@patch('validation_workflow.rpm.validation_rpm.logging.info')
@patch('validation_workflow.rpm.validation_rpm.ValidationArgs')
@patch('system.temporary_directory.TemporaryDirectory')
def test_validate_signature_exception(self, mock_temporary_directory: Mock, mock_validation_args: Mock, mock_logging_info: Mock, mock_execute: Mock) -> None:
mock_execute.return_value = (None, 'Header V4 RSA/SHA512 Signature, key ID 9310d3fc: OK\nHeader SHA1 digest: OK\nV4 RSA/SHA512 Signature, key ID 9310d3fc: OK\n'
'MD5 digest: not OK\nHeader SHA256 digest: OK\nPayload SHA256 digest: OK\n', None)

validate_rpm = ValidateRpm(mock_validation_args.return_value, mock_temporary_directory.return_value)
mock_temporary_directory.return_value.path = "/tmp/trytytyuit/"
validate_rpm.filename = 'example.rpm'

with self.assertRaises(AssertionError) as context:
validate_rpm.validate_signature()
self.assertIsInstance(context.exception, AssertionError)

mock_execute.assert_called_once_with(
'rpm -K -v /tmp/trytytyuit/example.rpm', '.'
)

@patch('validation_workflow.rpm.validation_rpm.execute')
@patch('validation_workflow.rpm.validation_rpm.logging.info')
@patch('validation_workflow.rpm.validation_rpm.ValidationArgs')
@patch('system.temporary_directory.TemporaryDirectory')
def test_validate_signature_except(self, mock_temporary_directory: Mock, mock_validation_args: Mock, mock_logging_info: Mock, mock_execute: Mock) -> None:
mock_execute.return_value = (None,
'Header V4 RSA/SHA512 Signature, key ID 9310d3fc: OK\nHeader SHA256 digest: OK\n'
'Header SHA1 digest: OK\nPayload SHA256 digest: OK\nV4 RSA/SHA512 Signature, key ID 9310d3fc: OK\n',
None)

validate_rpm = ValidateRpm(mock_validation_args.return_value, mock_temporary_directory.return_value)
mock_temporary_directory.return_value.path = "/tmp/trytytyuit/"
validate_rpm.filename = 'example.rpm'

with self.assertRaises(Exception) as context:
validate_rpm.validate_signature()
mock_logging_info.assert_any_call('Key digest "Header SHA256 digest" is validated to be present.')
mock_logging_info.assert_any_call('Key digest "Payload SHA256 digest" is validated to be present.')
mock_logging_info.assert_any_call('Validation of all key digests starts: ')
self.assertEqual(str(context.exception), 'Key digest "MD5 digest" is not found')

mock_execute.assert_called_once_with(
'rpm -K -v /tmp/trytytyuit/example.rpm', '.'
)

0 comments on commit 59c8575

Please sign in to comment.