Skip to content

Commit

Permalink
feat(terraform): add CKV_AWS_364 to ensure that AWS Lambda Permission…
Browse files Browse the repository at this point in the history
…s are not global when granting permission to an AWS Service. (#4375)

* Added lambda permission check against services defined in principal

* Added LambaServicePermission tests

* fixed imports and style-guide compatibility

* Updated to fix tests and add ID

* Added terraform test to get a CKV id for my Cloudformation test

* Fixed styling errors

* Flipped CFN logic to match terraform logic

* Updated to add newline at end of file to pass flake8 check

* Updated to harden service principal checks

* Added additional tests examples to demonstrate alternative service principals

* Fixed the syntax to pass

* bump CKV check version to 287

* Updated CKV to 293

* Fixed className and added missing comma from test set

* Update checkov/cloudformation/checks/resource/aws/LambdaServicePermission.py

Co-authored-by: Anton Grübel <[email protected]>

* Update checkov/terraform/checks/resource/aws/LambdaServicePermission.py

Keeping descriptions identical

Co-authored-by: Anton Grübel <[email protected]>

* Update checkov/terraform/checks/resource/aws/LambdaServicePermission.py

Co-authored-by: Anton Grübel <[email protected]>

* Update checkov/terraform/checks/resource/aws/LambdaServicePermission.py

Co-authored-by: Anton Grübel <[email protected]>

* Update checkov/terraform/checks/resource/aws/LambdaServicePermission.py

Co-authored-by: Anton Grübel <[email protected]>

* Update checkov/cloudformation/checks/resource/aws/LambdaServicePermission.py

Co-authored-by: Anton Grübel <[email protected]>

* Update checkov/cloudformation/checks/resource/aws/LambdaServicePermission.py

Co-authored-by: Anton Grübel <[email protected]>

* Update checkov/terraform/checks/resource/aws/LambdaServicePermission.py

Co-authored-by: Anton Grübel <[email protected]>

* Updated rule id

* Applying self.get_evaluated_keys to terraform

* Updated tests to align terraform and cloudformation.  Moved two examples to unknown, rather than pass/fail

* Updated cfn to ensure passed cfn-lint PR checks

* Updated terraform checkid

* Fixed potentially calling split on non-string

* adjust logic to fix test

---------

Co-authored-by: Anton Grübel <[email protected]>
  • Loading branch information
andyloughran and gruebel committed Oct 11, 2023
1 parent c469868 commit b0bef88
Show file tree
Hide file tree
Showing 7 changed files with 254 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from __future__ import annotations

from typing import List, Any

from checkov.cloudformation.checks.resource.base_resource_check import BaseResourceCheck
from checkov.common.models.enums import CheckCategories, CheckResult


class LambdaServicePermission(BaseResourceCheck):
def __init__(self) -> None:
name = "Ensure that AWS Lambda function permissions delegated to AWS services are limited by SourceArn or SourceAccount"
id = "CKV_AWS_364"
supported_resources = ("AWS::Lambda::Permission",)
categories = (CheckCategories.IAM,)
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult:
properties = conf.get('Properties')
if properties and isinstance(properties, dict):
principal = properties.get('Principal')
if principal and isinstance(principal, str):
principal_parts = principal.split('.')
try:
if principal_parts[1] == 'amazonaws' and principal_parts[2] == 'com':
if properties.get('SourceArn') or properties.get('SourceAccount'):
return CheckResult.PASSED
else:
return CheckResult.FAILED
except IndexError:
print("Not a service principal")
# Not a service principal, so pass.
return CheckResult.UNKNOWN
return CheckResult.UNKNOWN

def get_evaluated_keys(self) -> List[str]:
return ['Properties/Principal', 'Properties/SourceArn', 'Properties/SourceAccount']


check = LambdaServicePermission()
38 changes: 38 additions & 0 deletions checkov/terraform/checks/resource/aws/LambdaServicePermission.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from __future__ import annotations

from typing import List, Any

from checkov.common.models.enums import CheckResult, CheckCategories
from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck


class LambdaServicePermission(BaseResourceCheck):
def __init__(self) -> None:
description = "Ensure that AWS Lambda function permissions delegated to AWS services are limited by SourceArn or SourceAccount"
id = "CKV_AWS_364"
supported_resources = ('aws_lambda_permission',)
categories = (CheckCategories.IAM,)
super().__init__(name=description, id=id, categories=categories, supported_resources=supported_resources)

def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:
# Replace this with the custom logic for your check
principal = conf.get("principal")
if principal and isinstance(principal, list) and isinstance(principal[0], str):
principal_parts = principal[0].split('.')
try:
if principal_parts[1] == 'amazonaws' and principal_parts[2] == 'com': # This confirms that the principal is set as a service principal.
if 'source_arn' in conf or 'source_account' in conf: # If either of these are set, we're good and the check should pass.
self.evaluated_keys = self.get_evaluated_keys()
return CheckResult.PASSED
else:
self.evaluated_keys = self.get_evaluated_keys()
return CheckResult.FAILED
except IndexError:
return CheckResult.UNKNOWN
return CheckResult.UNKNOWN

def get_evaluated_keys(self) -> List[str]:
return ["principal", "source_arn", "source_account"]


check = LambdaServicePermission()
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
AWSTemplateFormatVersion: "2010-09-09"
Resources:
FunctionFailPermission:
Type: AWS::Lambda::Permission
Properties:
Action: lambda:InvokeFunction
FunctionName: TestFunction
Principal: apigateway.amazonaws.com

Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
AWSTemplateFormatVersion: "2010-09-09"
Resources:
FunctionPassingArnPermission:
Type: AWS::Lambda::Permission
Properties:
Action: lambda:InvokeFunction
FunctionName: TestFunction
Principal: apigateway.amazonaws.com
SourceArn: arn:aws:apigateway:us-east-1::/apis/qwerty

FunctionPassingAccountPermission:
Type: AWS::Lambda::Permission
Properties:
Action: lambda:InvokeFunction
FunctionName: TestFunction
Principal: apigateway.amazonaws.com
SourceAccount: 901234567812

FunctionStringPrincipallPermission:
Type: AWS::Lambda::Permission
Properties:
Action: lambda:InvokeFunction
FunctionName: TestFunction
Principal: 901234567812

ExampleS3ServicePermission:
Type: AWS::Lambda::Permission
Properties:
FunctionName: TestFunction
Action: lambda:InvokeFunction
Principal: s3.amazonaws.com
SourceAccount: 901234567812
SourceArn: arn:aws:apigateway:us-east-1::/apis/qwerty

ExampleEventsServicePermission:
Type: AWS::Lambda::Permission
Properties:
FunctionName: TestFunction
Action: lambda:InvokeFunction
Principal: events.amazonaws.com
SourceAccount: 901234567812
SourceArn: arn:aws:apigateway:us-east-1::/apis/qwerty
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import os
import unittest

from checkov.cloudformation.checks.resource.aws.LambdaServicePermission import check
from checkov.cloudformation.runner import Runner
from checkov.runner_filter import RunnerFilter


class TestLambdaServicePermission(unittest.TestCase):
def test_summary(self):
runner = Runner()
current_dir = os.path.dirname(os.path.realpath(__file__))

test_files_dir = current_dir + "/example_LambdaServicePermission"
report = runner.run(root_folder=test_files_dir, runner_filter=RunnerFilter(checks=[check.id]))
summary = report.get_summary()

passing_resources = {
"AWS::Lambda::Permission.FunctionPassingArnPermission",
"AWS::Lambda::Permission.FunctionPassingAccountPermission",
"AWS::Lambda::Permission.ExampleS3ServicePermission",
"AWS::Lambda::Permission.ExampleEventsServicePermission"
}
failing_resources = {
"AWS::Lambda::Permission.FunctionFailPermission",
}
unknown_resources = {
"AWS::Lambda::Permission.FunctionStringPrincipallPermission",
}

passed_check_resources = {c.resource for c in report.passed_checks}
failed_check_resources = {c.resource for c in report.failed_checks}

self.assertEqual(summary['passed'], 4)
self.assertEqual(summary['failed'], 1)
self.assertEqual(summary['skipped'], 0)
self.assertEqual(summary['parsing_errors'], 0)
self.assertEqual(passing_resources, passed_check_resources)
self.assertEqual(failing_resources, failed_check_resources)


if __name__ == '__main__':
unittest.main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
## SHOULD PASS: This permission specifies a source_arn, therefore is not globally available.
resource "aws_lambda_permission" "ckv_unittest_pass_source_arn" {
statement_id = "AllowMyDemoAPIInvoke"
action = "lambda:InvokeFunction"
function_name = "MyDemoFunction"
principal = "apigateway.amazonaws.com"

# The /*/*/* part allows invocation from any stage, method and resource path
# within API Gateway REST API.
source_arn = "${aws_api_gateway_rest_api.MyDemoAPI.execution_arn}/*/*/*"
}

## SHOULD PASS: This permission specifies a source_account, therefore is not globally available.
resource "aws_lambda_permission" "ckv_unittest_pass_source_account" {
statement_id = "AllowMyDemoAPIInvoke"
action = "lambda:InvokeFunction"
function_name = "MyDemoFunction"
principal = "apigateway.amazonaws.com"

source_account = "901234678"
}

## SHOULD UNKNOWN: This permission specifies a principal as an account ID.
resource "aws_lambda_permission" "ckv_unittest_unknown_principal" {
statement_id = "AllowMyDemoAPIInvoke"
action = "lambda:InvokeFunction"
function_name = "MyDemoFunction"
principal = "901234678"
}

## SHOULD FAIL: This allows any serviceprincpal across all accounts to access
resource "aws_lambda_permission" "ckv_unittest_fail" {
statement_id = "AllowMyDemoAPIInvoke"
action = "lambda:InvokeFunction"
function_name = "MyDemoFunction"
principal = "apigateway.amazonaws.com"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import os
import unittest

from checkov.runner_filter import RunnerFilter
from checkov.terraform.runner import Runner
from checkov.terraform.checks.resource.aws.LambdaServicePermission import check


class TestLambdaServicePermission(unittest.TestCase):

def test(self):
runner = Runner()
current_dir = os.path.dirname(os.path.realpath(__file__))

test_files_dir = os.path.join(current_dir, "example_LambdaServicePermission")
report = runner.run(root_folder=test_files_dir,
runner_filter=RunnerFilter(checks=[check.id]))
summary = report.get_summary()

passing_resources = {
'aws_lambda_permission.ckv_unittest_pass_source_arn',
'aws_lambda_permission.ckv_unittest_pass_source_account'
}
failing_resources = {
'aws_lambda_permission.ckv_unittest_fail',
}
unknown_resources = {
'aws_lambda_permission.ckv_unittest_pass_principal',
}

skipped_resources = {}

passed_check_resources = set([c.resource for c in report.passed_checks])
failed_check_resources = set([c.resource for c in report.failed_checks])

self.assertEqual(summary['passed'], len(passing_resources))
self.assertEqual(summary['failed'], len(failing_resources))
self.assertEqual(summary['skipped'], len(skipped_resources))
self.assertEqual(summary['parsing_errors'], 0)

self.assertEqual(passing_resources, passed_check_resources)
self.assertEqual(failing_resources, failed_check_resources)


if __name__ == '__main__':
unittest.main()

0 comments on commit b0bef88

Please sign in to comment.