Skip to content

Commit

Permalink
Merge pull request #147 from uhurutec/do-not-copy-app.kubernetes.io-l…
Browse files Browse the repository at this point in the history
…abels

Avoid conflicts when deployed by ArgoCD
  • Loading branch information
zakkg3 authored Oct 11, 2024
2 parents 0df0443 + 2330b7d commit af0c823
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 15 deletions.
3 changes: 2 additions & 1 deletion src/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@

CLUSTER_SECRET_LABEL = "clustersecret.io"

BLACK_LISTED_ANNOTATIONS = ["kopf.zalando.org", "kubectl.kubernetes.io"]
BLACK_LISTED_ANNOTATIONS = ["kopf.zalando.org", "kubectl.kubernetes.io"]
BLACK_LISTED_LABELS = ["app.kubernetes.io"]
35 changes: 22 additions & 13 deletions src/kubernetes_utils.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import logging
from datetime import datetime
from typing import Optional, Dict, Any, List, Mapping
from typing import Optional, Dict, Any, List, Mapping, Tuple, Iterator
import re

import kopf
from kubernetes.client import CoreV1Api, CustomObjectsApi, exceptions, V1ObjectMeta, rest, V1Secret

from os_utils import get_replace_existing, get_version
from consts import CREATE_BY_ANNOTATION, LAST_SYNC_ANNOTATION, VERSION_ANNOTATION, BLACK_LISTED_ANNOTATIONS, \
CREATE_BY_AUTHOR, CLUSTER_SECRET_LABEL
BLACK_LISTED_LABELS, CREATE_BY_AUTHOR, CLUSTER_SECRET_LABEL


def patch_clustersecret_status(
Expand Down Expand Up @@ -286,27 +286,36 @@ def create_secret_metadata(
Kubernetes metadata object with ClusterSecret annotations.
"""

_labels = {
def filter_dict(
prefixes: List[str],
base: Dict[str, str],
source: Optional[Mapping[str, str]] = None
) -> Iterator[Tuple[str, str]]:
""" Remove potential useless / dangerous annotations and labels"""
for item in base.items():
yield item
if source is not None:
for item in source.items():
key, _ = item
if not any(key.startswith(prefix) for prefix in prefixes):
yield item

base_labels = {
CLUSTER_SECRET_LABEL: 'true'
}
_labels.update(labels or {})

_annotations = {
base_annotations = {
CREATE_BY_ANNOTATION: CREATE_BY_AUTHOR,
VERSION_ANNOTATION: get_version(),
LAST_SYNC_ANNOTATION: datetime.now().isoformat(),
}
_annotations.update(annotations or {})

# Remove potential useless / dangerous annotations
_annotations = {key: value for key, value in _annotations.items() if
not any(key.startswith(prefix) for prefix in BLACK_LISTED_ANNOTATIONS)}

_annotations = filter_dict(BLACK_LISTED_ANNOTATIONS, base_annotations, annotations)
_labels = filter_dict(BLACK_LISTED_LABELS, base_labels, labels)
return V1ObjectMeta(
name=name,
namespace=namespace,
annotations=_annotations,
labels=_labels,
annotations=dict(_annotations),
labels=dict(_labels),
)


Expand Down
101 changes: 100 additions & 1 deletion src/tests/test_kubernetes_utils.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,30 @@
import datetime
import logging
import unittest
from typing import Tuple, Callable, Union
from unittest.mock import Mock

from kubernetes.client import V1ObjectMeta
from kubernetes_utils import get_ns_list

from consts import CREATE_BY_ANNOTATION, LAST_SYNC_ANNOTATION, VERSION_ANNOTATION, BLACK_LISTED_ANNOTATIONS, \
BLACK_LISTED_LABELS, CREATE_BY_AUTHOR, CLUSTER_SECRET_LABEL
from kubernetes_utils import get_ns_list, create_secret_metadata
from os_utils import get_version

USER_NAMESPACE_COUNT = 10
initial_namespaces = ['default', 'kube-node-lease', 'kube-public', 'kube-system']
user_namespaces = [f'example-{index}' for index in range(USER_NAMESPACE_COUNT)]


def is_iso_format(date: str) -> bool:
"""check whether a date string parses correctly as ISO 8601 format"""
try:
datetime.datetime.fromisoformat(date)
return True
except (TypeError, ValueError):
return False


class TestClusterSecret(unittest.TestCase):

def test_get_ns_list(self):
Expand Down Expand Up @@ -69,3 +84,87 @@ def test_get_ns_list(self):
)),
msg=case['name'],
)

def test_create_secret_metadata(self) -> None:

expected_base_label_key = CLUSTER_SECRET_LABEL
expected_base_label_value = 'true'

# key, value pairs, where the value can be a string or validation function
expected_base_annotations: list[Tuple[str, Union[str, Callable[[str], bool]]]] = [
(CREATE_BY_ANNOTATION, CREATE_BY_AUTHOR),
(VERSION_ANNOTATION, get_version()),
# Since LAST_SYNC_ANNOTATION is a date string which isn't easily validated by string equality
# have the function 'is_iso_format' validate the value of this annotation.
(LAST_SYNC_ANNOTATION, is_iso_format)
]

attributes_black_lists = dict(
labels=BLACK_LISTED_LABELS,
annotations=BLACK_LISTED_ANNOTATIONS,
)

test_cases: list[Tuple[dict[str, str], dict[str, str]]] = [
# Annotations, Labels
(
{},
{}
),
(
{},
{"modifiedAt": "1692462880",
"name": "prometheus-operator",
"owner": "helm",
"status": "superseded",
"version": "1"}
),
(
{"managed-by": "argocd.argoproj.io"},
{"argocd.argoproj.io/secret-type": "repository"}
),
(
{"argocd.argoproj.io/compare-options": "ServerSideDiff=true",
"argocd.argoproj.io/sync-wave": "4"},
{"app.kubernetes.io/instance": "cluster-secret"}
)
]

for annotations, labels in test_cases:

subject: V1ObjectMeta = create_secret_metadata(
name='test_secret',
namespace='test_namespace',
annotations=annotations,
labels=labels
)

self.assertIsInstance(obj=subject, cls=V1ObjectMeta, msg='returned value has correct type')

for attribute, black_list in attributes_black_lists.items():
attribute_object = subject.__getattribute__(attribute)
self.assertIsNotNone(obj=attribute_object, msg=f'attribute "{attribute}" is not None')

for key in attribute_object.keys():
self.assertIsInstance(obj=key, cls=str, msg=f'the {attribute} key is a string')
for black_listed_label_prefix in black_list:
self.assertFalse(
expr=key.startswith(black_listed_label_prefix),
msg=f'{attribute} key does not match black listed prefix'
)

# This tells mypy that those attributes are not None
assert subject.labels is not None
assert subject.annotations is not None

self.assertEqual(
first=subject.labels[expected_base_label_key],
second=expected_base_label_value,
msg='expected base label is present'
)
for key, value_expectation in expected_base_annotations:
validator = value_expectation if callable(value_expectation) else value_expectation.__eq__
value = subject.annotations[key]
self.assertTrue(
expr=validator(value),
msg=f'expected base annotation with key {key} is present and its value {value} is as expected'
)

0 comments on commit af0c823

Please sign in to comment.