-
Notifications
You must be signed in to change notification settings - Fork 1
/
main.py
116 lines (102 loc) · 3.92 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import kopf
import pykube
import preflight
from custom_resources import VirtualServiceResource
from handlers.kube_handler import KubernetesHandler
from schemas import (
DeployConfig,
ServiceConfig,
VirtualServiceConfig,
)
from config import EnvConfig
kubernetes_handler = KubernetesHandler()
api = kubernetes_handler.get_api()
@kopf.on.login()
def custom_login_fn(**kwargs):
"""
The custom_login_fn function is used to determine how the operator will authenticate with the Kubernetes API.
If we are running in a development environment, then we want to use our local kubeconfig file.
Otherwise, if we are running in a production environment (i.e., on OpenShift), then we want to use the service account token.
:param **kwargs: Pass a variable number of keyword arguments to the function
:return: A function
"""
if EnvConfig.ENV == "dev":
return kopf.login_with_kubeconfig(**kwargs)
else:
return kopf.login_with_service_account(**kwargs)
@kopf.on.startup()
def prepare_fn(**_):
"""
The prepare_fn function is called when the operator starts up.
It is used to prepare the environment for the operator to run in.
:param **_: Pass a variable number of keyword arguments to the function
:return: None
"""
preflight.run()
@kopf.on.create("imran.dev.io", "v1beta1", "microservices")
def create_fn_v1alpha2(spec, **kwargs):
deploy_config = DeployConfig(
**spec,
namespace=kwargs["body"]["metadata"]["namespace"],
name=kwargs["body"]["metadata"]["name"],
)
deployment = kubernetes_handler.create_deployment(deploy_config)
service = kubernetes_handler.create_service(
ServiceConfig(**deploy_config.model_dump())
)
kopf.adopt(deployment)
kopf.adopt(service)
deployment["metadata"]["name"] = service["metadata"]["name"] = kwargs["body"][
"metadata"
]["name"]
pykube.Deployment(api, deployment).create()
pykube.Service(api, service).create()
children = [deployment["metadata"], service["metadata"]]
# if spec.get("path"):
# virtual_service = kubernetes_handler.create_virtual_service(
# VirtualServiceConfig(**deploy_config.model_dump())
# )
# kopf.adopt(virtual_service)
# virtual_service["metadata"]["name"] = kwargs["body"]["metadata"]["name"]
# VirtualServiceResource(api, virtual_service).create()
# children.append(virtual_service["metadata"])
api.session.close()
return {
"children": children,
}
@kopf.on.update("imran.dev.io", "v1beta1", "microservices")
def update_fn_v1alpha2(spec, **kwargs):
deploy_config = DeployConfig(
**spec,
namespace=kwargs["body"]["metadata"]["namespace"],
name=kwargs["body"]["metadata"]["name"],
)
deployment = kubernetes_handler.update_deployment(deploy_config)
service = kubernetes_handler.update_service(
ServiceConfig(**deploy_config.model_dump())
)
children = [
deployment.obj["metadata"],
service.obj["metadata"],
]
# if dict(spec).get("path"):
# virtual_service, is_update = kubernetes_handler.update_virtual_service(
# VirtualServiceConfig(**deploy_config.model_dump())
# )
# if is_update:
# children.append(virtual_service.obj["metadata"])
# else:
# kopf.adopt(virtual_service)
# virtual_service["metadata"]["name"] = kwargs["body"]["metadata"]["name"]
# VirtualServiceResource(api, virtual_service).create()
# children.append(virtual_service["metadata"])
# elif dict(spec).get("path") is None:
# if virtual_service := kubernetes_handler.get_virtual_service_by_name(
# kwargs["body"]["metadata"]["name"],
# namespace=kwargs["body"]["metadata"]["namespace"],
# ):
# virtual_service.delete()
api.session.close()
return {
"children": children,
}