From b0bef88c67547d6767263a856d6ed77bd946112f Mon Sep 17 00:00:00 2001 From: Andy Loughran <104910888+andyloughran@users.noreply.github.com> Date: Wed, 11 Oct 2023 16:11:30 +0100 Subject: [PATCH] feat(terraform): add CKV_AWS_364 to ensure that AWS Lambda Permissions are not global when granting permission to an AWS Service. (#4375) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Added lambda permission check against services defined in principal * Added LambaServicePermission tests * fixed imports and style-guide compatibility * Updated to fix tests and add ID * Added terraform test to get a CKV id for my Cloudformation test * Fixed styling errors * Flipped CFN logic to match terraform logic * Updated to add newline at end of file to pass flake8 check * Updated to harden service principal checks * Added additional tests examples to demonstrate alternative service principals * Fixed the syntax to pass * bump CKV check version to 287 * Updated CKV to 293 * Fixed className and added missing comma from test set * Update checkov/cloudformation/checks/resource/aws/LambdaServicePermission.py Co-authored-by: Anton Grübel * Update checkov/terraform/checks/resource/aws/LambdaServicePermission.py Keeping descriptions identical Co-authored-by: Anton Grübel * Update checkov/terraform/checks/resource/aws/LambdaServicePermission.py Co-authored-by: Anton Grübel * Update checkov/terraform/checks/resource/aws/LambdaServicePermission.py Co-authored-by: Anton Grübel * Update checkov/terraform/checks/resource/aws/LambdaServicePermission.py Co-authored-by: Anton Grübel * Update checkov/cloudformation/checks/resource/aws/LambdaServicePermission.py Co-authored-by: Anton Grübel * Update checkov/cloudformation/checks/resource/aws/LambdaServicePermission.py Co-authored-by: Anton Grübel * Update checkov/terraform/checks/resource/aws/LambdaServicePermission.py Co-authored-by: Anton Grübel * Updated rule id * Applying self.get_evaluated_keys to terraform * Updated tests to align terraform and cloudformation. Moved two examples to unknown, rather than pass/fail * Updated cfn to ensure passed cfn-lint PR checks * Updated terraform checkid * Fixed potentially calling split on non-string * adjust logic to fix test --------- Co-authored-by: Anton Grübel --- .../resource/aws/LambdaServicePermission.py | 39 ++++++++++++++++ .../resource/aws/LambdaServicePermission.py | 38 +++++++++++++++ .../LambdaServicePermission_Fail.yml | 9 ++++ .../LambdaServicePermission_Pass.yml | 42 +++++++++++++++++ .../aws/test_LambdaServicePermission.py | 43 +++++++++++++++++ .../LambdaServicePermission.tf | 37 +++++++++++++++ .../aws/test_LambdaServicePermission.py | 46 +++++++++++++++++++ 7 files changed, 254 insertions(+) create mode 100644 checkov/cloudformation/checks/resource/aws/LambdaServicePermission.py create mode 100644 checkov/terraform/checks/resource/aws/LambdaServicePermission.py create mode 100644 tests/cloudformation/checks/resource/aws/example_LambdaServicePermission/LambdaServicePermission_Fail.yml create mode 100644 tests/cloudformation/checks/resource/aws/example_LambdaServicePermission/LambdaServicePermission_Pass.yml create mode 100644 tests/cloudformation/checks/resource/aws/test_LambdaServicePermission.py create mode 100644 tests/terraform/checks/resource/aws/example_LambdaServicePermission/LambdaServicePermission.tf create mode 100644 tests/terraform/checks/resource/aws/test_LambdaServicePermission.py diff --git a/checkov/cloudformation/checks/resource/aws/LambdaServicePermission.py b/checkov/cloudformation/checks/resource/aws/LambdaServicePermission.py new file mode 100644 index 00000000000..78d38557da4 --- /dev/null +++ b/checkov/cloudformation/checks/resource/aws/LambdaServicePermission.py @@ -0,0 +1,39 @@ +from __future__ import annotations + +from typing import List, Any + +from checkov.cloudformation.checks.resource.base_resource_check import BaseResourceCheck +from checkov.common.models.enums import CheckCategories, CheckResult + + +class LambdaServicePermission(BaseResourceCheck): + def __init__(self) -> None: + name = "Ensure that AWS Lambda function permissions delegated to AWS services are limited by SourceArn or SourceAccount" + id = "CKV_AWS_364" + supported_resources = ("AWS::Lambda::Permission",) + categories = (CheckCategories.IAM,) + super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources) + + def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: + properties = conf.get('Properties') + if properties and isinstance(properties, dict): + principal = properties.get('Principal') + if principal and isinstance(principal, str): + principal_parts = principal.split('.') + try: + if principal_parts[1] == 'amazonaws' and principal_parts[2] == 'com': + if properties.get('SourceArn') or properties.get('SourceAccount'): + return CheckResult.PASSED + else: + return CheckResult.FAILED + except IndexError: + print("Not a service principal") + # Not a service principal, so pass. + return CheckResult.UNKNOWN + return CheckResult.UNKNOWN + + def get_evaluated_keys(self) -> List[str]: + return ['Properties/Principal', 'Properties/SourceArn', 'Properties/SourceAccount'] + + +check = LambdaServicePermission() diff --git a/checkov/terraform/checks/resource/aws/LambdaServicePermission.py b/checkov/terraform/checks/resource/aws/LambdaServicePermission.py new file mode 100644 index 00000000000..254a1a8e47f --- /dev/null +++ b/checkov/terraform/checks/resource/aws/LambdaServicePermission.py @@ -0,0 +1,38 @@ +from __future__ import annotations + +from typing import List, Any + +from checkov.common.models.enums import CheckResult, CheckCategories +from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck + + +class LambdaServicePermission(BaseResourceCheck): + def __init__(self) -> None: + description = "Ensure that AWS Lambda function permissions delegated to AWS services are limited by SourceArn or SourceAccount" + id = "CKV_AWS_364" + supported_resources = ('aws_lambda_permission',) + categories = (CheckCategories.IAM,) + super().__init__(name=description, id=id, categories=categories, supported_resources=supported_resources) + + def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult: + # Replace this with the custom logic for your check + principal = conf.get("principal") + if principal and isinstance(principal, list) and isinstance(principal[0], str): + principal_parts = principal[0].split('.') + try: + if principal_parts[1] == 'amazonaws' and principal_parts[2] == 'com': # This confirms that the principal is set as a service principal. + if 'source_arn' in conf or 'source_account' in conf: # If either of these are set, we're good and the check should pass. + self.evaluated_keys = self.get_evaluated_keys() + return CheckResult.PASSED + else: + self.evaluated_keys = self.get_evaluated_keys() + return CheckResult.FAILED + except IndexError: + return CheckResult.UNKNOWN + return CheckResult.UNKNOWN + + def get_evaluated_keys(self) -> List[str]: + return ["principal", "source_arn", "source_account"] + + +check = LambdaServicePermission() diff --git a/tests/cloudformation/checks/resource/aws/example_LambdaServicePermission/LambdaServicePermission_Fail.yml b/tests/cloudformation/checks/resource/aws/example_LambdaServicePermission/LambdaServicePermission_Fail.yml new file mode 100644 index 00000000000..c8f9ad10758 --- /dev/null +++ b/tests/cloudformation/checks/resource/aws/example_LambdaServicePermission/LambdaServicePermission_Fail.yml @@ -0,0 +1,9 @@ +AWSTemplateFormatVersion: "2010-09-09" +Resources: + FunctionFailPermission: + Type: AWS::Lambda::Permission + Properties: + Action: lambda:InvokeFunction + FunctionName: TestFunction + Principal: apigateway.amazonaws.com + diff --git a/tests/cloudformation/checks/resource/aws/example_LambdaServicePermission/LambdaServicePermission_Pass.yml b/tests/cloudformation/checks/resource/aws/example_LambdaServicePermission/LambdaServicePermission_Pass.yml new file mode 100644 index 00000000000..16ac320cc11 --- /dev/null +++ b/tests/cloudformation/checks/resource/aws/example_LambdaServicePermission/LambdaServicePermission_Pass.yml @@ -0,0 +1,42 @@ +AWSTemplateFormatVersion: "2010-09-09" +Resources: + FunctionPassingArnPermission: + Type: AWS::Lambda::Permission + Properties: + Action: lambda:InvokeFunction + FunctionName: TestFunction + Principal: apigateway.amazonaws.com + SourceArn: arn:aws:apigateway:us-east-1::/apis/qwerty + + FunctionPassingAccountPermission: + Type: AWS::Lambda::Permission + Properties: + Action: lambda:InvokeFunction + FunctionName: TestFunction + Principal: apigateway.amazonaws.com + SourceAccount: 901234567812 + + FunctionStringPrincipallPermission: + Type: AWS::Lambda::Permission + Properties: + Action: lambda:InvokeFunction + FunctionName: TestFunction + Principal: 901234567812 + + ExampleS3ServicePermission: + Type: AWS::Lambda::Permission + Properties: + FunctionName: TestFunction + Action: lambda:InvokeFunction + Principal: s3.amazonaws.com + SourceAccount: 901234567812 + SourceArn: arn:aws:apigateway:us-east-1::/apis/qwerty + + ExampleEventsServicePermission: + Type: AWS::Lambda::Permission + Properties: + FunctionName: TestFunction + Action: lambda:InvokeFunction + Principal: events.amazonaws.com + SourceAccount: 901234567812 + SourceArn: arn:aws:apigateway:us-east-1::/apis/qwerty \ No newline at end of file diff --git a/tests/cloudformation/checks/resource/aws/test_LambdaServicePermission.py b/tests/cloudformation/checks/resource/aws/test_LambdaServicePermission.py new file mode 100644 index 00000000000..4d74a29a96e --- /dev/null +++ b/tests/cloudformation/checks/resource/aws/test_LambdaServicePermission.py @@ -0,0 +1,43 @@ +import os +import unittest + +from checkov.cloudformation.checks.resource.aws.LambdaServicePermission import check +from checkov.cloudformation.runner import Runner +from checkov.runner_filter import RunnerFilter + + +class TestLambdaServicePermission(unittest.TestCase): + def test_summary(self): + runner = Runner() + current_dir = os.path.dirname(os.path.realpath(__file__)) + + test_files_dir = current_dir + "/example_LambdaServicePermission" + report = runner.run(root_folder=test_files_dir, runner_filter=RunnerFilter(checks=[check.id])) + summary = report.get_summary() + + passing_resources = { + "AWS::Lambda::Permission.FunctionPassingArnPermission", + "AWS::Lambda::Permission.FunctionPassingAccountPermission", + "AWS::Lambda::Permission.ExampleS3ServicePermission", + "AWS::Lambda::Permission.ExampleEventsServicePermission" + } + failing_resources = { + "AWS::Lambda::Permission.FunctionFailPermission", + } + unknown_resources = { + "AWS::Lambda::Permission.FunctionStringPrincipallPermission", + } + + passed_check_resources = {c.resource for c in report.passed_checks} + failed_check_resources = {c.resource for c in report.failed_checks} + + self.assertEqual(summary['passed'], 4) + self.assertEqual(summary['failed'], 1) + self.assertEqual(summary['skipped'], 0) + self.assertEqual(summary['parsing_errors'], 0) + self.assertEqual(passing_resources, passed_check_resources) + self.assertEqual(failing_resources, failed_check_resources) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/terraform/checks/resource/aws/example_LambdaServicePermission/LambdaServicePermission.tf b/tests/terraform/checks/resource/aws/example_LambdaServicePermission/LambdaServicePermission.tf new file mode 100644 index 00000000000..d17f2b319f0 --- /dev/null +++ b/tests/terraform/checks/resource/aws/example_LambdaServicePermission/LambdaServicePermission.tf @@ -0,0 +1,37 @@ +## SHOULD PASS: This permission specifies a source_arn, therefore is not globally available. +resource "aws_lambda_permission" "ckv_unittest_pass_source_arn" { + statement_id = "AllowMyDemoAPIInvoke" + action = "lambda:InvokeFunction" + function_name = "MyDemoFunction" + principal = "apigateway.amazonaws.com" + + # The /*/*/* part allows invocation from any stage, method and resource path + # within API Gateway REST API. + source_arn = "${aws_api_gateway_rest_api.MyDemoAPI.execution_arn}/*/*/*" +} + +## SHOULD PASS: This permission specifies a source_account, therefore is not globally available. +resource "aws_lambda_permission" "ckv_unittest_pass_source_account" { + statement_id = "AllowMyDemoAPIInvoke" + action = "lambda:InvokeFunction" + function_name = "MyDemoFunction" + principal = "apigateway.amazonaws.com" + + source_account = "901234678" +} + +## SHOULD UNKNOWN: This permission specifies a principal as an account ID. +resource "aws_lambda_permission" "ckv_unittest_unknown_principal" { + statement_id = "AllowMyDemoAPIInvoke" + action = "lambda:InvokeFunction" + function_name = "MyDemoFunction" + principal = "901234678" +} + +## SHOULD FAIL: This allows any serviceprincpal across all accounts to access +resource "aws_lambda_permission" "ckv_unittest_fail" { + statement_id = "AllowMyDemoAPIInvoke" + action = "lambda:InvokeFunction" + function_name = "MyDemoFunction" + principal = "apigateway.amazonaws.com" +} \ No newline at end of file diff --git a/tests/terraform/checks/resource/aws/test_LambdaServicePermission.py b/tests/terraform/checks/resource/aws/test_LambdaServicePermission.py new file mode 100644 index 00000000000..9687142adf7 --- /dev/null +++ b/tests/terraform/checks/resource/aws/test_LambdaServicePermission.py @@ -0,0 +1,46 @@ +import os +import unittest + +from checkov.runner_filter import RunnerFilter +from checkov.terraform.runner import Runner +from checkov.terraform.checks.resource.aws.LambdaServicePermission import check + + +class TestLambdaServicePermission(unittest.TestCase): + + def test(self): + runner = Runner() + current_dir = os.path.dirname(os.path.realpath(__file__)) + + test_files_dir = os.path.join(current_dir, "example_LambdaServicePermission") + report = runner.run(root_folder=test_files_dir, + runner_filter=RunnerFilter(checks=[check.id])) + summary = report.get_summary() + + passing_resources = { + 'aws_lambda_permission.ckv_unittest_pass_source_arn', + 'aws_lambda_permission.ckv_unittest_pass_source_account' + } + failing_resources = { + 'aws_lambda_permission.ckv_unittest_fail', + } + unknown_resources = { + 'aws_lambda_permission.ckv_unittest_pass_principal', + } + + skipped_resources = {} + + passed_check_resources = set([c.resource for c in report.passed_checks]) + failed_check_resources = set([c.resource for c in report.failed_checks]) + + self.assertEqual(summary['passed'], len(passing_resources)) + self.assertEqual(summary['failed'], len(failing_resources)) + self.assertEqual(summary['skipped'], len(skipped_resources)) + self.assertEqual(summary['parsing_errors'], 0) + + self.assertEqual(passing_resources, passed_check_resources) + self.assertEqual(failing_resources, failed_check_resources) + + +if __name__ == '__main__': + unittest.main() \ No newline at end of file