Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(terraform): add module check for commit hash revision usage #5261

Merged
merged 3 commits into from
Jun 27, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 24 additions & 4 deletions checkov/common/goget/github/get_git.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,45 @@
from checkov.common.goget.base_getter import BaseGetter
from checkov.common.util.contextmanagers import temp_environ

TAG_PATTERN = re.compile(r'\?(ref=)(?P<tag>(.*))')
try:
from git import Repo
git_import_error = None
except ImportError as e:
git_import_error = e

COMMIT_ID_PATTERN = re.compile(r"\?(ref=)(?P<commit_id>([0-9a-f]{40}))")
TAG_PATTERN = re.compile(r'\?(ref=)(?P<tag>(.*))')

class GitGetter(BaseGetter):
def __init__(self, url: str, create_clone_and_result_dirs: bool = True) -> None:
self.logger = logging.getLogger(__name__)
self.create_clone_and_res_dirs = create_clone_and_result_dirs
self.tag = ''
self.commit_id: str | None = None

if "?ref" in url:
url = self.extract_git_ref(url=url)

super().__init__(url)

def extract_git_ref(self, url: str) -> str:
search_commit_id = re.search(COMMIT_ID_PATTERN, url)
if search_commit_id:
self.commit_id = search_commit_id.group("commit_id")
url = re.sub(COMMIT_ID_PATTERN, '', url)
return url

search_tag = re.search(TAG_PATTERN, url)
if search_tag:
self.tag = search_tag.group("tag")
# remove tag/ or tags/ from ref= to get actual branch name
self.tag = re.sub('tag.*/', '', self.tag)
url = re.sub(TAG_PATTERN, '', url)
url = re.sub(TAG_PATTERN, '', url)
return url

super().__init__(url)
logging.info(f"Module URL has an unknown ref: {url}")
gruebel marked this conversation as resolved.
Dismissed
Show resolved Hide resolved

return url

def do_get(self) -> str:
if git_import_error is not None:
Expand All @@ -53,7 +70,10 @@
def _clone(self, git_url: str, clone_dir: str) -> None:
self.logger.debug(f"cloning {self.url if '@' not in self.url else self.url.split('@')[1]} to {clone_dir}")
with temp_environ(GIT_TERMINAL_PROMPT="0"): # disables user prompts originating from GIT
if self.tag:
if self.commit_id:
repo = Repo.clone_from(git_url, clone_dir, no_checkout=True) # need to be a full git clone
repo.git.checkout(self.commit_id)
elif self.tag:
Repo.clone_from(git_url, clone_dir, depth=1, b=self.tag)
else:
Repo.clone_from(git_url, clone_dir, depth=1)
Expand Down
1 change: 1 addition & 0 deletions checkov/terraform/checks/module/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from checkov.terraform.checks.module.generic import * # noqa
12 changes: 10 additions & 2 deletions checkov/terraform/checks/module/base_module_check.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from __future__ import annotations

from abc import abstractmethod
from collections.abc import Iterable
from typing import List, Optional, Dict, Any

from checkov.common.checks.base_check import BaseCheck
Expand All @@ -8,8 +11,13 @@

class BaseModuleCheck(BaseCheck):
def __init__(
self, name: str, id: str, categories: List[CheckCategories], supported_resources: Optional[List[str]] = None,
guideline=None) -> None:
self,
name: str,
id: str,
categories: Iterable[CheckCategories],
supported_resources: Iterable[str] | None = None,
guideline: str | None = None
) -> None:
"""
Base class for terraform module call related checks.

Expand Down
28 changes: 28 additions & 0 deletions checkov/terraform/checks/module/generic/RevisionHash.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from __future__ import annotations

import re
from typing import Any

from checkov.common.models.enums import CheckResult, CheckCategories
from checkov.terraform.checks.module.base_module_check import BaseModuleCheck

COMMIT_ID_PATTERN = re.compile(r"\?(ref=)(?P<commit_id>([0-9a-f]{5,40}))")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using the general git source. This broke the tag reference.



class RevisionHash(BaseModuleCheck):
def __init__(self) -> None:
name = "Ensure module source uses a commit hash"
gruebel marked this conversation as resolved.
Show resolved Hide resolved
id = "CKV_TF_1"
categories = [CheckCategories.SUPPLY_CHAIN]
super().__init__(name=name, id=id, categories=categories)

def scan_module_conf(self, conf: dict[str, list[Any]]) -> CheckResult:
source = conf.get("source")
if source and isinstance(source, list) and "?ref" in source[0] and re.search(COMMIT_ID_PATTERN, source[0]):
# do first a quick lookup, if '?ref' exists in the string before actually searching for the commit hash
return CheckResult.PASSED

return CheckResult.FAILED


check = RevisionHash()
4 changes: 4 additions & 0 deletions checkov/terraform/checks/module/generic/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from pathlib import Path

modules = Path(__file__).parent.glob("*.py")
__all__ = [f.stem for f in modules if f.is_file() and not f.stem == "__init__"]
Empty file.
59 changes: 59 additions & 0 deletions tests/terraform/checks/module/generic/example_RevisionHash/main.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# pass

module "hash" {
source = "git::https://github.com/terraform-aws-modules/terraform-aws-vpc.git?ref=26c38a66f12e7c6c93b6a2ba127ad68981a48671"

name = "my-vpc"
cidr = "10.0.0.0/16"

azs = ["eu-west-1a", "eu-west-1b", "eu-west-1c"]
private_subnets = ["10.0.1.0/24", "10.0.2.0/24", "10.0.3.0/24"]
public_subnets = ["10.0.101.0/24", "10.0.102.0/24", "10.0.103.0/24"]

enable_nat_gateway = true
enable_vpn_gateway = true

tags = {
Terraform = "true"
Environment = "dev"
}
}

module "sub_dir_hash" {
source = "git::https://github.com/terraform-aws-modules/terraform-aws-cloudwatch.git//modules/log-group?ref=60cf981e0f1ae033699e5b274440867e48289967"

name = "git"
retention_in_days = 120
}

# fail

module "tf_registry" {
source = "terraform-aws-modules/cloudwatch/aws//modules/log-group"
version = "4.3.0"

name = "normal"
retention_in_days = 120
}

module "tag" {
source = "git::https://github.com/terraform-aws-modules/terraform-aws-vpc.git?ref=v5.0.0"

name = "my-vpc"
cidr = "10.0.0.0/16"

azs = ["eu-west-1a", "eu-west-1b", "eu-west-1c"]
private_subnets = ["10.0.1.0/24", "10.0.2.0/24", "10.0.3.0/24"]
public_subnets = ["10.0.101.0/24", "10.0.102.0/24", "10.0.103.0/24"]

enable_nat_gateway = true
enable_vpn_gateway = true

tags = {
Terraform = "true"
Environment = "dev"
}
}

# unknown

42 changes: 42 additions & 0 deletions tests/terraform/checks/module/generic/test_RevisionHash.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import unittest
from pathlib import Path

from checkov.runner_filter import RunnerFilter
from checkov.terraform.checks.module.generic.RevisionHash import check
from checkov.terraform.runner import Runner


class TestRevisionHash(unittest.TestCase):
def test(self):
# given
test_files_dir = Path(__file__).parent / "example_RevisionHash"

# when
report = Runner().run(root_folder=str(test_files_dir), runner_filter=RunnerFilter(checks=[check.id]))

# then
summary = report.get_summary()

passing_resources = {
"hash",
"sub_dir_hash",
}
failing_resources = {
"tag",
"tf_registry",
}

passed_check_resources = {c.resource for c in report.passed_checks}
failed_check_resources = {c.resource for c in report.failed_checks}

self.assertEqual(summary["passed"], len(passing_resources))
self.assertEqual(summary["failed"], len(failing_resources))
self.assertEqual(summary["skipped"], 0)
self.assertEqual(summary["parsing_errors"], 0)

self.assertEqual(passing_resources, passed_check_resources)
self.assertEqual(failing_resources, failed_check_resources)


if __name__ == "__main__":
unittest.main()
Loading