From 609d8b3ce2233b18f9c6debfc4c8ec65ee067dfa Mon Sep 17 00:00:00 2001 From: Dillen Padhiar <38965141+dpadhiar@users.noreply.github.com> Date: Wed, 12 Jul 2023 17:27:47 -0700 Subject: [PATCH] feat: implement optional validation webhook. Fixes #817. (#832) Signed-off-by: Dillen Padhiar --- Makefile | 1 + cmd/commands/root.go | 1 + cmd/commands/webhook.go | 34 ++ config/extensions/webhook/kustomization.yaml | 14 + .../webhook/numaflow-webhook-deployment.yaml | 36 ++ .../webhook/numaflow-webhook-sa.yaml | 4 + .../webhook/numaflow-webhook-service.yaml | 11 + .../webhook/rbac/kustomization.yaml | 6 + ...numaflow-webhook-cluster-role-binding.yaml | 12 + .../rbac/numaflow-webhook-cluster-role.yaml | 60 +++ config/validating-webhook-install.yaml | 130 ++++++ docs/operations/validating-webhook.md | 48 +++ go.mod | 2 +- mkdocs.yml | 1 + test/manifests/kustomization.yaml | 9 +- webhook/cmd/start.go | 90 +++++ webhook/validator/isbsvc.go | 51 +++ webhook/validator/isbsvc_test.go | 25 ++ webhook/validator/pipeline.go | 46 +++ webhook/validator/pipeline_test.go | 25 ++ webhook/validator/validator.go | 82 ++++ webhook/validator/validator_test.go | 113 ++++++ webhook/webhook.go | 375 ++++++++++++++++++ webhook/webhook_test.go | 160 ++++++++ 24 files changed, 1334 insertions(+), 2 deletions(-) create mode 100644 cmd/commands/webhook.go create mode 100644 config/extensions/webhook/kustomization.yaml create mode 100644 config/extensions/webhook/numaflow-webhook-deployment.yaml create mode 100644 config/extensions/webhook/numaflow-webhook-sa.yaml create mode 100644 config/extensions/webhook/numaflow-webhook-service.yaml create mode 100644 config/extensions/webhook/rbac/kustomization.yaml create mode 100644 config/extensions/webhook/rbac/numaflow-webhook-cluster-role-binding.yaml create mode 100644 config/extensions/webhook/rbac/numaflow-webhook-cluster-role.yaml create mode 100644 config/validating-webhook-install.yaml create mode 100644 docs/operations/validating-webhook.md create mode 100644 webhook/cmd/start.go create mode 100644 webhook/validator/isbsvc.go create mode 100644 webhook/validator/isbsvc_test.go create mode 100644 webhook/validator/pipeline.go create mode 100644 webhook/validator/pipeline_test.go create mode 100644 webhook/validator/validator.go create mode 100644 webhook/validator/validator_test.go create mode 100644 webhook/webhook.go create mode 100644 webhook/webhook_test.go diff --git a/Makefile b/Makefile index bc5c4690b6..e4a851319c 100644 --- a/Makefile +++ b/Makefile @@ -201,6 +201,7 @@ manifests: crds kubectl kustomize config/advanced-install/namespaced-numaflow-server > config/advanced-install/namespaced-numaflow-server.yaml kubectl kustomize config/advanced-install/numaflow-server > config/advanced-install/numaflow-server.yaml kubectl kustomize config/advanced-install/minimal-crds > config/advanced-install/minimal-crds.yaml + kubectl kustomize config/extensions/webhook > config/validating-webhook-install.yaml $(GOPATH)/bin/golangci-lint: curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b `go env GOPATH`/bin v1.49.0 diff --git a/cmd/commands/root.go b/cmd/commands/root.go index 4bf9270188..bf3c4914a0 100644 --- a/cmd/commands/root.go +++ b/cmd/commands/root.go @@ -45,4 +45,5 @@ func init() { rootCmd.AddCommand(NewDaemonServerCommand()) rootCmd.AddCommand(NewServerCommand()) rootCmd.AddCommand(NewServerInitCommand()) + rootCmd.AddCommand(NewWebhookCommand()) } diff --git a/cmd/commands/webhook.go b/cmd/commands/webhook.go new file mode 100644 index 0000000000..6bf24317a0 --- /dev/null +++ b/cmd/commands/webhook.go @@ -0,0 +1,34 @@ +/* +Copyright 2022 The Numaproj Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package commands + +import ( + "github.com/spf13/cobra" + + webhookcmd "github.com/numaproj/numaflow/webhook/cmd" +) + +func NewWebhookCommand() *cobra.Command { + command := &cobra.Command{ + Use: "webhook-service", + Short: "Start validating Numaflow webhook server", + Run: func(cmd *cobra.Command, args []string) { + webhookcmd.Start() + }, + } + return command +} diff --git a/config/extensions/webhook/kustomization.yaml b/config/extensions/webhook/kustomization.yaml new file mode 100644 index 0000000000..3acf097bde --- /dev/null +++ b/config/extensions/webhook/kustomization.yaml @@ -0,0 +1,14 @@ +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: Kustomization + +resources: +- numaflow-webhook-sa.yaml +- rbac +- numaflow-webhook-deployment.yaml +- numaflow-webhook-service.yaml + +namespace: numaflow-system + +images: + - name: quay.io/numaproj/numaflow + newTag: latest \ No newline at end of file diff --git a/config/extensions/webhook/numaflow-webhook-deployment.yaml b/config/extensions/webhook/numaflow-webhook-deployment.yaml new file mode 100644 index 0000000000..914e361c10 --- /dev/null +++ b/config/extensions/webhook/numaflow-webhook-deployment.yaml @@ -0,0 +1,36 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: numaflow-webhook +spec: + replicas: 1 + selector: + matchLabels: + app.kubernetes.io/part-of: numaflow + app.kubernetes.io/component: numaflow-webhook + template: + metadata: + labels: + app.kubernetes.io/part-of: numaflow + app.kubernetes.io/component: numaflow-webhook + spec: + containers: + - name: webhook + image: quay.io/numaproj/numaflow:latest + imagePullPolicy: Always + args: + - webhook-service + env: + - name: NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: PORT + value: "443" + - name: DEPLOYMENT_NAME + value: numaflow-webhook + - name: SERVICE_NAME + value: numaflow-webhook + - name: CLUSTER_ROLE_NAME + value: numaflow-webhook + serviceAccountName: numaflow-webhook-sa diff --git a/config/extensions/webhook/numaflow-webhook-sa.yaml b/config/extensions/webhook/numaflow-webhook-sa.yaml new file mode 100644 index 0000000000..43b50f2faf --- /dev/null +++ b/config/extensions/webhook/numaflow-webhook-sa.yaml @@ -0,0 +1,4 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: numaflow-webhook-sa \ No newline at end of file diff --git a/config/extensions/webhook/numaflow-webhook-service.yaml b/config/extensions/webhook/numaflow-webhook-service.yaml new file mode 100644 index 0000000000..1d2aa59516 --- /dev/null +++ b/config/extensions/webhook/numaflow-webhook-service.yaml @@ -0,0 +1,11 @@ +apiVersion: v1 +kind: Service +metadata: + name: numaflow-webhook +spec: + ports: + - port: 443 + targetPort: 443 + selector: + app.kubernetes.io/part-of: numaflow + app.kubernetes.io/component: numaflow-webhook diff --git a/config/extensions/webhook/rbac/kustomization.yaml b/config/extensions/webhook/rbac/kustomization.yaml new file mode 100644 index 0000000000..ed2530bfe0 --- /dev/null +++ b/config/extensions/webhook/rbac/kustomization.yaml @@ -0,0 +1,6 @@ +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: Kustomization + +resources: +- numaflow-webhook-cluster-role.yaml +- numaflow-webhook-cluster-role-binding.yaml \ No newline at end of file diff --git a/config/extensions/webhook/rbac/numaflow-webhook-cluster-role-binding.yaml b/config/extensions/webhook/rbac/numaflow-webhook-cluster-role-binding.yaml new file mode 100644 index 0000000000..12911def6d --- /dev/null +++ b/config/extensions/webhook/rbac/numaflow-webhook-cluster-role-binding.yaml @@ -0,0 +1,12 @@ +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: numaflow-webhook-binding +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: numaflow-webhook +subjects: + - kind: ServiceAccount + name: numaflow-webhook-sa + namespace: numaflow-system diff --git a/config/extensions/webhook/rbac/numaflow-webhook-cluster-role.yaml b/config/extensions/webhook/rbac/numaflow-webhook-cluster-role.yaml new file mode 100644 index 0000000000..3303d13d3d --- /dev/null +++ b/config/extensions/webhook/rbac/numaflow-webhook-cluster-role.yaml @@ -0,0 +1,60 @@ +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: numaflow-webhook +rules: + - apiGroups: + - "" + resources: + - secrets + verbs: + - get + - list + - create + - update + - delete + - patch + - watch + - apiGroups: + - "" + resources: + - configmaps + verbs: + - get + - list + - watch + - apiGroups: + - apps + resources: + - deployments + verbs: + - get + - list + - apiGroups: + - admissionregistration.k8s.io + resources: + - validatingwebhookconfigurations + verbs: + - get + - list + - create + - update + - delete + - patch + - watch + - apiGroups: + - numaproj.io + verbs: + - get + - list + - watch + resources: + - interstepbufferservices + - pipelines + - apiGroups: + - rbac.authorization.k8s.io + resources: + - clusterroles + verbs: + - get + - list \ No newline at end of file diff --git a/config/validating-webhook-install.yaml b/config/validating-webhook-install.yaml new file mode 100644 index 0000000000..732dfe0735 --- /dev/null +++ b/config/validating-webhook-install.yaml @@ -0,0 +1,130 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: numaflow-webhook-sa + namespace: numaflow-system +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: numaflow-webhook +rules: +- apiGroups: + - "" + resources: + - secrets + verbs: + - get + - list + - create + - update + - delete + - patch + - watch +- apiGroups: + - "" + resources: + - configmaps + verbs: + - get + - list + - watch +- apiGroups: + - apps + resources: + - deployments + verbs: + - get + - list +- apiGroups: + - admissionregistration.k8s.io + resources: + - validatingwebhookconfigurations + verbs: + - get + - list + - create + - update + - delete + - patch + - watch +- apiGroups: + - numaproj.io + resources: + - interstepbufferservices + - pipelines + verbs: + - get + - list + - watch +- apiGroups: + - rbac.authorization.k8s.io + resources: + - clusterroles + verbs: + - get + - list +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: numaflow-webhook-binding +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: numaflow-webhook +subjects: +- kind: ServiceAccount + name: numaflow-webhook-sa + namespace: numaflow-system +--- +apiVersion: v1 +kind: Service +metadata: + name: numaflow-webhook + namespace: numaflow-system +spec: + ports: + - port: 443 + targetPort: 443 + selector: + app.kubernetes.io/component: numaflow-webhook + app.kubernetes.io/part-of: numaflow +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: numaflow-webhook + namespace: numaflow-system +spec: + replicas: 1 + selector: + matchLabels: + app.kubernetes.io/component: numaflow-webhook + app.kubernetes.io/part-of: numaflow + template: + metadata: + labels: + app.kubernetes.io/component: numaflow-webhook + app.kubernetes.io/part-of: numaflow + spec: + containers: + - args: + - webhook-service + env: + - name: NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: PORT + value: "443" + - name: DEPLOYMENT_NAME + value: numaflow-webhook + - name: SERVICE_NAME + value: numaflow-webhook + - name: CLUSTER_ROLE_NAME + value: numaflow-webhook + image: quay.io/numaproj/numaflow:latest + imagePullPolicy: Always + name: webhook + serviceAccountName: numaflow-webhook-sa diff --git a/docs/operations/validating-webhook.md b/docs/operations/validating-webhook.md new file mode 100644 index 0000000000..7df88fd449 --- /dev/null +++ b/docs/operations/validating-webhook.md @@ -0,0 +1,48 @@ +# Validating Admission Webhook + +This validating webhook will prevent disallowed spec changes to immutable fields of Numaflow CRDs including Pipelines and InterStepBufferServices. +It also prevents creating a CRD with a faulty spec. +The user sees an error immediately returned by the server explaining why the request was denied. + +## Installation + +To install the validating webhook, run the following command line: + +```shell +kubectl apply -n numaflow-system -f https://raw.githubusercontent.com/numaproj/numaflow/stable/config/validating-webhook-install.yaml +``` + +## Examples + +Currently, the validating webhook prevents updating the type of an InterStepBufferService from JetStream to Redis for example. + +Example spec: +```yaml +apiVersion: numaflow.numaproj.io/v1alpha1 +kind: InterStepBufferService +metadata: + name: default +spec: + jetstream: // change to redis and reapply will cause below error + version: latest +``` + +```shell +Error from server (BadRequest): error when applying patch: +{"metadata":{"annotations":{"kubectl.kubernetes.io/last-applied-configuration":"{\"apiVersion\":\"numaflow.numaproj.io/v1alpha1\",\"kind\":\"InterStepBufferService\",\"metadata\":{\"annotations\":{},\"name\":\"default\",\"namespace\":\"numaflow-system\"},\"spec\":{\"redis\":{\"native\":{\"version\":\"7.0.11\"}}}}\n"}},"spec":{"jetstream":null,"redis":{"native":{"version":"7.0.11"}}}} +to: +Resource: "numaflow.numaproj.io/v1alpha1, Resource=interstepbufferservices", GroupVersionKind: "numaflow.numaproj.io/v1alpha1, Kind=InterStepBufferService" +Name: "default", Namespace: "numaflow-system" +for: "redis.yaml": error when patching "redis.yaml": admission webhook "webhook.numaflow.numaproj.io" denied the request: Can not change ISB Service type from Jetstream to Redis +``` + +There is also validation that prevents the `interStepBufferServiceName` of a Pipeline from being updated. + +```shell +Error from server (BadRequest): error when applying patch: +{"metadata":{"annotations":{"kubectl.kubernetes.io/last-applied-configuration":"{\"apiVersion\":\"numaflow.numaproj.io/v1alpha1\",\"kind\":\"Pipeline\",\"metadata\":{\"annotations\":{},\"name\":\"simple-pipeline\",\"namespace\":\"numaflow-system\"},\"spec\":{\"edges\":[{\"from\":\"in\",\"to\":\"cat\"},{\"from\":\"cat\",\"to\":\"out\"}],\"interStepBufferServiceName\":\"change\",\"vertices\":[{\"name\":\"in\",\"source\":{\"generator\":{\"duration\":\"1s\",\"rpu\":5}}},{\"name\":\"cat\",\"udf\":{\"builtin\":{\"name\":\"cat\"}}},{\"name\":\"out\",\"sink\":{\"log\":{}}}]}}\n"}},"spec":{"interStepBufferServiceName":"change","vertices":[{"name":"in","source":{"generator":{"duration":"1s","rpu":5}}},{"name":"cat","udf":{"builtin":{"name":"cat"}}},{"name":"out","sink":{"log":{}}}]}} +to: +Resource: "numaflow.numaproj.io/v1alpha1, Resource=pipelines", GroupVersionKind: "numaflow.numaproj.io/v1alpha1, Kind=Pipeline" +Name: "simple-pipeline", Namespace: "numaflow-system" +for: "examples/1-simple-pipeline.yaml": error when patching "examples/1-simple-pipeline.yaml": admission webhook "webhook.numaflow.numaproj.io" denied the request: Cannot update pipeline with different interStepBufferServiceName +``` \ No newline at end of file diff --git a/go.mod b/go.mod index 1fdde2e1c3..66e27bdf86 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/gavv/httpexpect/v2 v2.3.1 github.com/gin-contrib/static v0.0.2-0.20220606235426-ae09b2ea7e39 github.com/gin-gonic/gin v1.9.1 + github.com/go-openapi/inflect v0.19.0 github.com/go-swagger/go-swagger v0.28.0 github.com/goccy/go-json v0.10.2 github.com/gogo/protobuf v1.3.2 @@ -84,7 +85,6 @@ require ( github.com/go-logr/logr v1.2.3 // indirect github.com/go-openapi/analysis v0.20.1 // indirect github.com/go-openapi/errors v0.20.1 // indirect - github.com/go-openapi/inflect v0.19.0 // indirect github.com/go-openapi/jsonpointer v0.19.5 // indirect github.com/go-openapi/jsonreference v0.19.6 // indirect github.com/go-openapi/loads v0.20.3 // indirect diff --git a/mkdocs.yml b/mkdocs.yml index a7339d668c..726bf10521 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -98,6 +98,7 @@ nav: - Operator Manual: - Releases ⧉: "operations/releases.md" - operations/installation.md + - Validating Webhook: operations/validating-webhook.md - Configuration: - Controller Configuration: "operations/controller-configmap.md" - UI Server Access Path: "operations/ui-access-path.md" diff --git a/test/manifests/kustomization.yaml b/test/manifests/kustomization.yaml index 3a29a331a6..63f693efa7 100644 --- a/test/manifests/kustomization.yaml +++ b/test/manifests/kustomization.yaml @@ -3,7 +3,7 @@ kind: Kustomization resources: - ../../config/cluster-install - + - ../../config/extensions/webhook patches: - patch: |- - op: add @@ -37,6 +37,13 @@ patches: target: kind: Deployment name: numaflow-server + - patch: |- + - op: add + path: /spec/template/spec/containers/0/imagePullPolicy + value: IfNotPresent + target: + kind: Deployment + name: numaflow-webhook namespace: numaflow-system commonLabels: diff --git a/webhook/cmd/start.go b/webhook/cmd/start.go new file mode 100644 index 0000000000..bb13006491 --- /dev/null +++ b/webhook/cmd/start.go @@ -0,0 +1,90 @@ +package cmd + +import ( + "crypto/tls" + "os" + "strconv" + + dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" + "github.com/numaproj/numaflow/pkg/client/clientset/versioned" + "github.com/numaproj/numaflow/pkg/shared/logging" + sharedutil "github.com/numaproj/numaflow/pkg/shared/util" + "github.com/numaproj/numaflow/webhook" + "go.uber.org/zap" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + "sigs.k8s.io/controller-runtime/pkg/manager/signals" +) + +const ( + serviceNameEnvVar = "SERVICE_NAME" + deploymentNameEnvVar = "DEPLOYMENT_NAME" + clusterRoleNameEnvVar = "CLUSTER_ROLE_NAME" + namespaceEnvVar = "NAMESPACE" + portEnvVar = "PORT" +) + +func Start() { + logger := logging.NewLogger().Named("webhook") + var restConfig *rest.Config + var err error + kubeconfig := os.Getenv("KUBECONFIG") + if kubeconfig == "" { + home, _ := os.UserHomeDir() + kubeconfig = home + "/.kube/config" + if _, err := os.Stat(kubeconfig); err != nil && os.IsNotExist(err) { + kubeconfig = "" + } + } + if kubeconfig != "" { + restConfig, err = clientcmd.BuildConfigFromFlags("", kubeconfig) + } else { + restConfig, err = rest.InClusterConfig() + } + if err != nil { + logger.Fatalw("Failed to get kubeconfig", zap.Error(err)) + } + + namespace, defined := os.LookupEnv(namespaceEnvVar) + if !defined { + logger.Fatal("Required namespace variable isn't set") + } + + kubeClient := kubernetes.NewForConfigOrDie(restConfig) + isbSvcClient := versioned.NewForConfigOrDie(restConfig).NumaflowV1alpha1().InterStepBufferServices(namespace) + + portStr := sharedutil.LookupEnvStringOr(portEnvVar, "443") + port, err := strconv.Atoi(portStr) + if err != nil { + logger.Fatal("port should be a number, not valid") + } + + options := webhook.Options{ + ServiceName: sharedutil.LookupEnvStringOr(serviceNameEnvVar, "numaflow-webhook"), + DeploymentName: sharedutil.LookupEnvStringOr(deploymentNameEnvVar, "numaflow-webhook"), + ClusterRoleName: sharedutil.LookupEnvStringOr(clusterRoleNameEnvVar, "numaflow-webhook"), + Namespace: namespace, + Port: port, + SecretName: "numaflow-webhook-certs", + WebhookName: "webhook.numaflow.numaproj.io", + ClientAuth: tls.VerifyClientCertIfGiven, + } + controller := webhook.AdmissionController{ + Client: kubeClient, + ISBSVCClient: isbSvcClient, + Options: options, + Handlers: map[schema.GroupVersionKind]runtime.Object{ + {Group: "numaflow.numaproj.io", Version: "v1alpha1", Kind: "InterStepBufferService"}: &dfv1.InterStepBufferService{}, + {Group: "numaflow.numaproj.io", Version: "v1alpha1", Kind: "Pipeline"}: &dfv1.Pipeline{}, + }, + Logger: logger, + } + ctx := logging.WithLogger(signals.SetupSignalHandler(), logger) + if err := controller.Run(ctx); err != nil { + logger.Fatalw("Failed to create admission controller", zap.Error(err)) + } + +} diff --git a/webhook/validator/isbsvc.go b/webhook/validator/isbsvc.go new file mode 100644 index 0000000000..3f9acf9006 --- /dev/null +++ b/webhook/validator/isbsvc.go @@ -0,0 +1,51 @@ +package validator + +import ( + "context" + + dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" + "github.com/numaproj/numaflow/pkg/client/clientset/versioned/typed/numaflow/v1alpha1" + isbsvccontroller "github.com/numaproj/numaflow/pkg/reconciler/isbsvc" + admissionv1 "k8s.io/api/admission/v1" + "k8s.io/client-go/kubernetes" +) + +type isbsvcValidator struct { + client kubernetes.Interface + isbscv v1alpha1.InterStepBufferServiceInterface + + oldISBService *dfv1.InterStepBufferService + newISBService *dfv1.InterStepBufferService +} + +// returns ISBService validator +func NewISBServiceValidator(client kubernetes.Interface, isbsvc v1alpha1.InterStepBufferServiceInterface, old, new *dfv1.InterStepBufferService) Validator { + return &isbsvcValidator{client: client, isbscv: isbsvc, oldISBService: old, newISBService: new} +} + +func (v *isbsvcValidator) ValidateCreate(ctx context.Context) *admissionv1.AdmissionResponse { + if err := isbsvccontroller.ValidateInterStepBufferService(v.newISBService); err != nil { + return DeniedResponse(err.Error()) + } + return AllowedResponse() +} + +func (v *isbsvcValidator) ValidateUpdate(ctx context.Context) *admissionv1.AdmissionResponse { + + if err := isbsvccontroller.ValidateInterStepBufferService(v.newISBService); err != nil { + return DeniedResponse(err.Error()) + } + + switch { + case v.oldISBService.Spec.JetStream != nil: + if v.newISBService.Spec.Redis != nil { + return DeniedResponse("Can not change ISB Service type from Jetstream to Redis") + } + case v.oldISBService.Spec.Redis != nil: + if v.newISBService.Spec.JetStream != nil { + return DeniedResponse("Can not change ISB Service type from Redis to Jetstream") + } + } + + return AllowedResponse() +} diff --git a/webhook/validator/isbsvc_test.go b/webhook/validator/isbsvc_test.go new file mode 100644 index 0000000000..866c5fb1f7 --- /dev/null +++ b/webhook/validator/isbsvc_test.go @@ -0,0 +1,25 @@ +package validator + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestValidateISBServiceCreate(t *testing.T) { + isbsvc := fakeISBSvc() + v := NewISBServiceValidator(fakeK8sClient, &fakeISBSvcClient, nil, isbsvc) + r := v.ValidateCreate(contextWithLogger(t)) + assert.True(t, r.Allowed) +} + +func TestValidateISBServiceUpdate(t *testing.T) { + isbsvc := fakeISBSvc() + t.Run("test ISBSvc spec change", func(t *testing.T) { + JetStreamISBSvc := fakeJetStreamISBSvc() + newISBSvc := JetStreamISBSvc.DeepCopy() + v := NewISBServiceValidator(fakeK8sClient, &fakeISBSvcClient, isbsvc, newISBSvc) + r := v.ValidateUpdate(contextWithLogger(t)) + assert.False(t, r.Allowed) + }) +} diff --git a/webhook/validator/pipeline.go b/webhook/validator/pipeline.go new file mode 100644 index 0000000000..2c41cfecb1 --- /dev/null +++ b/webhook/validator/pipeline.go @@ -0,0 +1,46 @@ +package validator + +import ( + "context" + + dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" + "github.com/numaproj/numaflow/pkg/client/clientset/versioned/typed/numaflow/v1alpha1" + pipelinecontroller "github.com/numaproj/numaflow/pkg/reconciler/pipeline" + admissionv1 "k8s.io/api/admission/v1" + "k8s.io/client-go/kubernetes" +) + +type pipelineValidator struct { + client kubernetes.Interface + pipeline v1alpha1.PipelineInterface + + oldPipeline *dfv1.Pipeline + newPipeline *dfv1.Pipeline +} + +// return PipelineValidator +func NewPipelineValidator(client kubernetes.Interface, pipeline v1alpha1.PipelineInterface, old, new *dfv1.Pipeline) Validator { + return &pipelineValidator{client: client, pipeline: pipeline, oldPipeline: old, newPipeline: new} +} + +func (v *pipelineValidator) ValidateCreate(ctx context.Context) *admissionv1.AdmissionResponse { + if err := pipelinecontroller.ValidatePipeline(v.newPipeline); err != nil { + return DeniedResponse(err.Error()) + } + return AllowedResponse() +} + +func (v *pipelineValidator) ValidateUpdate(ctx context.Context) *admissionv1.AdmissionResponse { + + // check that update is valid pipeline + if err := pipelinecontroller.ValidatePipeline(v.newPipeline); err != nil { + return DeniedResponse(err.Error()) + } + + // can't change pipeline's isbsvc name + if v.newPipeline.Spec.InterStepBufferServiceName != v.oldPipeline.Spec.InterStepBufferServiceName { + return DeniedResponse("Cannot update pipeline with different interStepBufferServiceName") + } + + return AllowedResponse() +} diff --git a/webhook/validator/pipeline_test.go b/webhook/validator/pipeline_test.go new file mode 100644 index 0000000000..b0bc542e53 --- /dev/null +++ b/webhook/validator/pipeline_test.go @@ -0,0 +1,25 @@ +package validator + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestValidatePipelineCreate(t *testing.T) { + pipeline := fakePipeline() + v := NewPipelineValidator(fakeK8sClient, &fakePipelineClient, nil, pipeline) + r := v.ValidateCreate(contextWithLogger(t)) + assert.True(t, r.Allowed) +} + +func TestValidatePipelineUpdate(t *testing.T) { + pipeline := fakePipeline() + t.Run("test Pipeline interStepBufferServiceName change", func(t *testing.T) { + newPipeline := pipeline.DeepCopy() + newPipeline.Spec.InterStepBufferServiceName = "change-name" + v := NewPipelineValidator(fakeK8sClient, &fakePipelineClient, pipeline, newPipeline) + r := v.ValidateUpdate(contextWithLogger(t)) + assert.False(t, r.Allowed) + }) +} diff --git a/webhook/validator/validator.go b/webhook/validator/validator.go new file mode 100644 index 0000000000..d1c3b2f2f7 --- /dev/null +++ b/webhook/validator/validator.go @@ -0,0 +1,82 @@ +package validator + +import ( + "context" + "encoding/json" + "fmt" + + dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" + "github.com/numaproj/numaflow/pkg/client/clientset/versioned/typed/numaflow/v1alpha1" + "github.com/numaproj/numaflow/pkg/shared/logging" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + + admissionv1 "k8s.io/api/admission/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" +) + +type Validator interface { + ValidateCreate(context.Context) *admissionv1.AdmissionResponse + ValidateUpdate(context.Context) *admissionv1.AdmissionResponse +} + +// GetValidator returns a Validator instance +func GetValidator(ctx context.Context, client kubernetes.Interface, ISBSVCClient v1alpha1.InterStepBufferServiceInterface, PipelineClient v1alpha1.PipelineInterface, kind metav1.GroupVersionKind, oldBytes []byte, newBytes []byte) (Validator, error) { + log := logging.FromContext(ctx) + switch kind.Kind { + case dfv1.ISBGroupVersionKind.Kind: + var new *dfv1.InterStepBufferService + if len(newBytes) > 0 { + new = &dfv1.InterStepBufferService{} + if err := json.Unmarshal(newBytes, new); err != nil { + log.Errorf("Could not unmarshal new raw object: %v", err) + return nil, err + } + } + var old *dfv1.InterStepBufferService + if len(oldBytes) > 0 { + old = &dfv1.InterStepBufferService{} + if err := json.Unmarshal(oldBytes, old); err != nil { + log.Errorf("Could not unmarshal old raw object: %v", err) + return nil, err + } + } + return NewISBServiceValidator(client, ISBSVCClient, old, new), nil + case dfv1.PipelineGroupVersionKind.Kind: + var new *dfv1.Pipeline + if len(newBytes) > 0 { + new = &dfv1.Pipeline{} + if err := json.Unmarshal(newBytes, new); err != nil { + log.Errorf("Could not unmarshal new raw object: %v", err) + return nil, err + } + } + var old *dfv1.Pipeline + if len(oldBytes) > 0 { + old = &dfv1.Pipeline{} + if err := json.Unmarshal(oldBytes, old); err != nil { + log.Errorf("Could not unmarshal old raw object: %v", err) + return nil, err + } + } + return NewPipelineValidator(client, PipelineClient, old, new), nil + default: + return nil, fmt.Errorf("Unrecognized kind: %v", kind) + } +} + +// DeniedResponse constructs a denied AdmissionResonse +func DeniedResponse(reason string, args ...interface{}) *admissionv1.AdmissionResponse { + result := apierrors.NewBadRequest(fmt.Sprintf(reason, args...)).Status() + return &admissionv1.AdmissionResponse{ + Result: &result, + Allowed: false, + } +} + +// AllowedResponse constructs an allowed AdmissionResonse +func AllowedResponse() *admissionv1.AdmissionResponse { + return &admissionv1.AdmissionResponse{ + Allowed: true, + } +} diff --git a/webhook/validator/validator_test.go b/webhook/validator/validator_test.go new file mode 100644 index 0000000000..181b767424 --- /dev/null +++ b/webhook/validator/validator_test.go @@ -0,0 +1,113 @@ +package validator + +import ( + "context" + "encoding/json" + "testing" + + dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" + fake "github.com/numaproj/numaflow/pkg/client/clientset/versioned/typed/numaflow/v1alpha1/fake" + "github.com/numaproj/numaflow/pkg/shared/logging" + "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + fakeClient "k8s.io/client-go/kubernetes/fake" +) + +const ( + testNamespace = "test-ns" +) + +var ( + fakeK8sClient = fakeClient.NewSimpleClientset() + fakeISBSvcClient = fake.FakeInterStepBufferServices{} + fakePipelineClient = fake.FakePipelines{} +) + +func contextWithLogger(t *testing.T) context.Context { + t.Helper() + return logging.WithLogger(context.Background(), logging.NewLogger()) +} + +func fakeISBSvc() *dfv1.InterStepBufferService { + return &dfv1.InterStepBufferService{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: testNamespace, + Name: dfv1.DefaultISBSvcName, + }, + Spec: dfv1.InterStepBufferServiceSpec{ + Redis: &dfv1.RedisBufferService{ + Native: &dfv1.NativeRedis{ + Version: "6.2.6", + }, + }, + }, + } +} + +func fakeJetStreamISBSvc() *dfv1.InterStepBufferService { + return &dfv1.InterStepBufferService{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "test-ns", + Name: dfv1.DefaultISBSvcName, + }, + Spec: dfv1.InterStepBufferServiceSpec{ + JetStream: &dfv1.JetStreamBufferService{ + Version: "1.1.1", + }, + }, + } +} + +func fakePipeline() *dfv1.Pipeline { + return &dfv1.Pipeline{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pl", + Namespace: "test-ns", + }, + Spec: dfv1.PipelineSpec{ + Vertices: []dfv1.AbstractVertex{ + { + Name: "input", + Source: &dfv1.Source{ + UDTransformer: &dfv1.UDTransformer{ + Builtin: &dfv1.Transformer{Name: "filter"}, + }}, + }, + { + Name: "p1", + UDF: &dfv1.UDF{ + Builtin: &dfv1.Function{Name: "cat"}, + }, + }, + { + Name: "output", + Sink: &dfv1.Sink{}, + }, + }, + Edges: []dfv1.Edge{ + {From: "input", To: "p1"}, + {From: "p1", To: "output"}, + }, + }, + } +} + +func TestGetValidator(t *testing.T) { + t.Run("test get InterStepBufferService validator", func(t *testing.T) { + bytes, err := json.Marshal(fakeISBSvc()) + assert.NoError(t, err) + assert.NotNil(t, bytes) + v, err := GetValidator(contextWithLogger(t), fakeK8sClient, &fakeISBSvcClient, &fakePipelineClient, metav1.GroupVersionKind{Group: "numaflow.numaproj.io", Version: "v1alpha1", Kind: "InterStepBufferService"}, nil, bytes) + assert.NoError(t, err) + assert.NotNil(t, v) + }) + + t.Run("test get Pipeline validator", func(t *testing.T) { + bytes, err := json.Marshal(fakePipeline()) + assert.NoError(t, err) + assert.NotNil(t, bytes) + v, err := GetValidator(contextWithLogger(t), fakeK8sClient, &fakeISBSvcClient, &fakePipelineClient, metav1.GroupVersionKind{Group: "numaflow.numaproj.io", Version: "v1alpha1", Kind: "Pipeline"}, nil, bytes) + assert.NoError(t, err) + assert.NotNil(t, v) + }) +} diff --git a/webhook/webhook.go b/webhook/webhook.go new file mode 100644 index 0000000000..edf43321dd --- /dev/null +++ b/webhook/webhook.go @@ -0,0 +1,375 @@ +package webhook + +import ( + "context" + "crypto/tls" + "crypto/x509" + "encoding/json" + "fmt" + "net/http" + "reflect" + "sort" + "strings" + "time" + + "github.com/go-openapi/inflect" + "github.com/numaproj/numaflow/pkg/client/clientset/versioned/typed/numaflow/v1alpha1" + "github.com/numaproj/numaflow/pkg/shared/logging" + commontls "github.com/numaproj/numaflow/pkg/shared/tls" + "github.com/numaproj/numaflow/webhook/validator" + "go.uber.org/zap" + admissionv1 "k8s.io/api/admission/v1" + admissionregistrationv1 "k8s.io/api/admissionregistration/v1" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/kubernetes" + clientadmissionregistrationv1 "k8s.io/client-go/kubernetes/typed/admissionregistration/v1" +) + +const ( + secretServerKey = "server-key.pem" + secretServerCert = "server-cert.pem" + secretCACert = "ca-cert.pem" + + certOrg = "io.numaproj" +) + +// Webhook configuration +type Options struct { + WebhookName string + ServiceName string + DeploymentName string + ClusterRoleName string + SecretName string + Namespace string + Port int + ClientAuth tls.ClientAuthType +} + +// Controller for validation webhook +type AdmissionController struct { + Client kubernetes.Interface + ISBSVCClient v1alpha1.InterStepBufferServiceInterface + PipelineClient v1alpha1.PipelineInterface + + Options Options + Handlers map[schema.GroupVersionKind]runtime.Object + + Logger *zap.SugaredLogger +} + +func (ac *AdmissionController) Run(ctx context.Context) error { + logger := ac.Logger + tlsConfig, caCert, err := ac.configureCerts(ctx, ac.Options.ClientAuth) + if err != nil { + logger.Errorw("Could not configure admission webhook certs", zap.Error(err)) + } + server := &http.Server{ + Handler: ac, + Addr: fmt.Sprintf(":%v", ac.Options.Port), + TLSConfig: tlsConfig, + } + cl := ac.Client.AdmissionregistrationV1().ValidatingWebhookConfigurations() + if err := ac.register(ctx, cl, caCert); err != nil { + logger.Errorw("Failed to register webhook", zap.Error(err)) + return err + } + + logger.Info("Successfully registered webhook") + + serverStartErrCh := make(chan struct{}) + go func() { + if err := server.ListenAndServeTLS("", ""); err != nil { + logger.Errorw("ListenAndServeTLS for admission webhook erorred out", zap.Error(err)) + close(serverStartErrCh) + } + }() + select { + case <-ctx.Done(): + return server.Close() + case <-serverStartErrCh: + return fmt.Errorf("webhook server failed to start") + } + +} + +// Register registers the validating admission webhook +func (ac *AdmissionController) register(ctx context.Context, client clientadmissionregistrationv1.ValidatingWebhookConfigurationInterface, caCert []byte) error { + failurePolicy := admissionregistrationv1.Ignore + + var rules []admissionregistrationv1.RuleWithOperations + for gvk := range ac.Handlers { + plural := strings.ToLower(inflect.Pluralize(gvk.Kind)) + + rules = append(rules, admissionregistrationv1.RuleWithOperations{ + Operations: []admissionregistrationv1.OperationType{ + admissionregistrationv1.Create, + admissionregistrationv1.Update, + admissionregistrationv1.Delete, + }, + Rule: admissionregistrationv1.Rule{ + APIGroups: []string{gvk.Group}, + APIVersions: []string{gvk.Version}, + Resources: []string{plural}, + }, + }) + } + + sort.Slice(rules, func(i, j int) bool { + lhs, rhs := rules[i], rules[j] + if lhs.APIGroups[0] != rhs.APIGroups[0] { + return lhs.APIGroups[0] < rhs.APIGroups[0] + } + if lhs.APIVersions[0] != rhs.APIVersions[0] { + return lhs.APIVersions[0] < rhs.APIVersions[0] + } + return lhs.Resources[0] < rhs.Resources[0] + }) + + sideEffects := admissionregistrationv1.SideEffectClassNone + + port := int32(ac.Options.Port) + webhook := &admissionregistrationv1.ValidatingWebhookConfiguration{ + ObjectMeta: metav1.ObjectMeta{ + Name: ac.Options.WebhookName, + }, + Webhooks: []admissionregistrationv1.ValidatingWebhook{{ + Name: ac.Options.WebhookName, + Rules: rules, + SideEffects: &sideEffects, + AdmissionReviewVersions: []string{"v1", "v1beta1"}, + ClientConfig: admissionregistrationv1.WebhookClientConfig{ + Service: &admissionregistrationv1.ServiceReference{ + Namespace: ac.Options.Namespace, + Name: ac.Options.ServiceName, + Port: &port, + }, + CABundle: caCert, + }, + FailurePolicy: &failurePolicy, + }}, + } + clusterRole, err := ac.Client.RbacV1().ClusterRoles().Get(ctx, ac.Options.ClusterRoleName, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("failed to fetch webhook cluster role, %w", err) + } + clusterRoleRef := metav1.NewControllerRef(clusterRole, rbacv1.SchemeGroupVersion.WithKind("ClusterRole")) + webhook.OwnerReferences = append(webhook.OwnerReferences, *clusterRoleRef) + + _, err = client.Create(ctx, webhook, metav1.CreateOptions{}) + if err != nil { + if !apierrors.IsAlreadyExists(err) { + return fmt.Errorf("failed to create a webhook, %w", err) + } + ac.Logger.Info("Webhook already exists") + configuredWebhook, err := client.Get(ctx, ac.Options.WebhookName, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("failed to retrieve webhook, %w", err) + } + if !reflect.DeepEqual(configuredWebhook.Webhooks, webhook.Webhooks) { + ac.Logger.Info("Updating webhook") + // Set the ResourceVersion as required by update. + webhook.ObjectMeta.ResourceVersion = configuredWebhook.ObjectMeta.ResourceVersion + if _, err := client.Update(ctx, webhook, metav1.UpdateOptions{}); err != nil { + return fmt.Errorf("failed to update webhook, %w", err) + } + } else { + ac.Logger.Info("Webhook is valid") + } + } else { + ac.Logger.Info("Created a webhook") + } + return nil +} + +// ServeHTTP implements the validating admission webhook +func (ac *AdmissionController) ServeHTTP(w http.ResponseWriter, r *http.Request) { + ac.Logger.Infof("Webhook ServeHTTP request=%#v", r) + + // content type validation + contentType := r.Header.Get("Content-Type") + if contentType != "application/json" { + http.Error(w, "invalid Content-Type, want `application/json`", http.StatusUnsupportedMediaType) + return + } + + var review admissionv1.AdmissionReview + defer r.Body.Close() + if err := json.NewDecoder(r.Body).Decode(&review); err != nil { + http.Error(w, fmt.Sprintf("could not decode body: %v", err), http.StatusBadRequest) + return + } + logger := ac.Logger.With("kind", fmt.Sprint(review.Request.Kind)). + With("namespace", review.Request.Namespace). + With("name", review.Request.Name). + With("operation", fmt.Sprint(review.Request.Operation)). + With("resource", fmt.Sprint(review.Request.Resource)). + With("subResource", fmt.Sprint(review.Request.SubResource)). + With("userInfo", fmt.Sprint(review.Request.UserInfo)) + + reviewResponse := ac.admit(logging.WithLogger(r.Context(), logger), review.Request) + response := admissionv1.AdmissionReview{ + TypeMeta: metav1.TypeMeta{ + Kind: "AdmissionReview", + APIVersion: "admission.k8s.io/v1", + }, + } + if reviewResponse != nil { + response.Response = reviewResponse + response.Response.UID = review.Request.UID + } + + logger.Infof("AdmissionReview for %s: %v/%v response=%v", + review.Request.Kind, review.Request.Namespace, review.Request.Name, reviewResponse) + + if err := json.NewEncoder(w).Encode(response); err != nil { + http.Error(w, fmt.Sprintf("could not encode response: %v", err), http.StatusInternalServerError) + return + } +} + +func (ac *AdmissionController) admit(ctx context.Context, request *admissionv1.AdmissionRequest) *admissionv1.AdmissionResponse { + log := logging.FromContext(ctx) + switch request.Operation { + case admissionv1.Create, admissionv1.Update: + default: + log.Infof("Operation not interested: %v %v", request.Kind, request.Operation) + return &admissionv1.AdmissionResponse{Allowed: true} + } + v, err := validator.GetValidator(ctx, ac.Client, ac.ISBSVCClient, ac.PipelineClient, request.Kind, request.OldObject.Raw, request.Object.Raw) + if err != nil { + return validator.DeniedResponse("failed to get a validator: %v", err) + } + + switch request.Operation { + case admissionv1.Create: + return v.ValidateCreate(ctx) + case admissionv1.Update: + return v.ValidateUpdate(ctx) + default: + return validator.AllowedResponse() + } +} + +// Generate cert secret +func (ac *AdmissionController) generateSecret(ctx context.Context) (*corev1.Secret, error) { + hosts := []string{} + hosts = append(hosts, fmt.Sprintf("%s.%s.svc.cluster.local", ac.Options.ServiceName, ac.Options.Namespace)) + hosts = append(hosts, fmt.Sprintf("%s.%s.svc", ac.Options.ServiceName, ac.Options.Namespace)) + serverKey, serverCert, caCert, err := commontls.CreateCerts(certOrg, hosts, time.Now().Add(10*365*24*time.Hour), true, false) + if err != nil { + return nil, err + } + deployment, err := ac.Client.AppsV1().Deployments(ac.Options.Namespace).Get(ctx, ac.Options.DeploymentName, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("Failed to fetch webhook deployment, %w", err) + } + deploymentRef := metav1.NewControllerRef(deployment, appsv1.SchemeGroupVersion.WithKind("Deployment")) + secret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: ac.Options.SecretName, + Namespace: ac.Options.Namespace, + }, + Data: map[string][]byte{ + secretServerKey: serverKey, + secretServerCert: serverCert, + secretCACert: caCert, + }, + } + secret.OwnerReferences = append(secret.OwnerReferences, *deploymentRef) + return secret, nil +} + +// getOrGenerateKeyCertsFromSecret creates CERTs if not existing and store in a secret +func (ac *AdmissionController) getOrGenerateKeyCertsFromSecret(ctx context.Context) (serverKey, serverCert, caCert []byte, err error) { + secret, err := ac.Client.CoreV1().Secrets(ac.Options.Namespace).Get(ctx, ac.Options.SecretName, metav1.GetOptions{}) + if err != nil { + if !apierrors.IsNotFound(err) { + return nil, nil, nil, err + } + // No existing secret, creating one + newSecret, err := ac.generateSecret(ctx) + if err != nil { + return nil, nil, nil, err + } + _, err = ac.Client.CoreV1().Secrets(newSecret.Namespace).Create(ctx, newSecret, metav1.CreateOptions{}) + if err != nil && !apierrors.IsAlreadyExists(err) { + return nil, nil, nil, err + } + // Something else might have created, try fetching it one more time + secret, err = ac.Client.CoreV1().Secrets(ac.Options.Namespace).Get(ctx, ac.Options.SecretName, metav1.GetOptions{}) + if err != nil { + return nil, nil, nil, err + } + } + + var ok bool + if serverKey, ok = secret.Data[secretServerKey]; !ok { + return nil, nil, nil, fmt.Errorf("server key missing") + } + if serverCert, ok = secret.Data[secretServerCert]; !ok { + return nil, nil, nil, fmt.Errorf("server cert missing") + } + if caCert, ok = secret.Data[secretCACert]; !ok { + return nil, nil, nil, fmt.Errorf("ca cert missing") + } + return serverKey, serverCert, caCert, nil +} + +// GetAPIServerExtensionCACert gets the K8s aggregate apiserver +// client CA cert used by validator. This certificate is provided by +// kubernetes. +func (ac *AdmissionController) getAPIServerExtensionCACert(ctx context.Context) ([]byte, error) { + const name = "extension-apiserver-authentication" + c, err := ac.Client.CoreV1().ConfigMaps(metav1.NamespaceSystem).Get(ctx, name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + const caFileName = "requestheader-client-ca-file" + pem, ok := c.Data[caFileName] + if !ok { + return nil, fmt.Errorf("cannot find %s in ConfigMap %s", caFileName, name) + } + return []byte(pem), nil +} + +func (ac *AdmissionController) configureCerts(ctx context.Context, clientAuth tls.ClientAuthType) (*tls.Config, []byte, error) { + var apiServerCACert []byte + if clientAuth >= tls.VerifyClientCertIfGiven { + var err error + apiServerCACert, err = ac.getAPIServerExtensionCACert(ctx) + if err != nil { + return nil, nil, err + } + } + + serverKey, serverCert, caCert, err := ac.getOrGenerateKeyCertsFromSecret(ctx) + if err != nil { + return nil, nil, err + } + tlsConfig, err := makeTLSConfig(serverCert, serverKey, apiServerCACert, clientAuth) + if err != nil { + return nil, nil, err + } + return tlsConfig, caCert, nil +} + +// makeTLSConfig makes a TLS configuration +func makeTLSConfig(serverCert, serverKey, caCert []byte, clientAuthType tls.ClientAuthType) (*tls.Config, error) { + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + cert, err := tls.X509KeyPair(serverCert, serverKey) + if err != nil { + return nil, err + } + return &tls.Config{ + Certificates: []tls.Certificate{cert}, + ClientCAs: caCertPool, + ClientAuth: clientAuthType, + }, nil +} diff --git a/webhook/webhook_test.go b/webhook/webhook_test.go new file mode 100644 index 0000000000..02516f14f5 --- /dev/null +++ b/webhook/webhook_test.go @@ -0,0 +1,160 @@ +package webhook + +import ( + "context" + "crypto/tls" + "fmt" + "net" + "testing" + + dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" + "github.com/numaproj/numaflow/pkg/shared/logging" + "github.com/stretchr/testify/assert" + admissionv1 "k8s.io/api/admission/v1" + admissionregistrationv1 "k8s.io/api/admissionregistration/v1" + appsv1 "k8s.io/api/apps/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + fakeClient "k8s.io/client-go/kubernetes/fake" + "sigs.k8s.io/controller-runtime/pkg/manager/signals" +) + +func fakeOptions() Options { + return Options{ + Namespace: "test-ns", + DeploymentName: "numaflow-webhook", + ClusterRoleName: "numaflow-webhook", + ServiceName: "webhook", + Port: 8443, + SecretName: "webhook-certs", + WebhookName: "webhook.numaflow.numaproj.io", + } +} + +func fakeValidatingWebhookConfig(opts Options) *admissionregistrationv1.ValidatingWebhookConfiguration { + sideEffects := admissionregistrationv1.SideEffectClassNone + failurePolicy := admissionregistrationv1.Ignore + return &admissionregistrationv1.ValidatingWebhookConfiguration{ + ObjectMeta: metav1.ObjectMeta{ + Name: opts.WebhookName, + }, + Webhooks: []admissionregistrationv1.ValidatingWebhook{ + { + Name: opts.WebhookName, + Rules: []admissionregistrationv1.RuleWithOperations{{}}, + SideEffects: &sideEffects, + FailurePolicy: &failurePolicy, + ClientConfig: admissionregistrationv1.WebhookClientConfig{}, + }, + }, + } +} + +func contextWithLogger(t *testing.T) context.Context { + t.Helper() + return logging.WithLogger(context.Background(), logging.NewLogger()) +} + +func contextWithLoggerAndCancel(t *testing.T) context.Context { + t.Helper() + return logging.WithLogger(signals.SetupSignalHandler(), logging.NewLogger()) +} + +func fakeAdmissionController(t *testing.T, options Options) *AdmissionController { + t.Helper() + ac := &AdmissionController{ + Client: fakeClient.NewSimpleClientset(), + Options: options, + Handlers: map[schema.GroupVersionKind]runtime.Object{ + {Group: "numaflow.numaproj.io", Version: "v1alpha1", Kind: "InterStepBufferService"}: &dfv1.InterStepBufferService{}, + }, + Logger: logging.NewLogger(), + } + return ac +} + +func TestConnectAllowed(t *testing.T) { + ac := fakeAdmissionController(t, fakeOptions()) + t.Run("test CONNECT allowed", func(t *testing.T) { + req := &admissionv1.AdmissionRequest{ + Operation: admissionv1.Connect, + } + resp := ac.admit(contextWithLogger(t), req) + assert.True(t, resp.Allowed) + }) +} + +func TestDeleteAllowed(t *testing.T) { + ac := fakeAdmissionController(t, fakeOptions()) + t.Run("test DELETE allowed", func(t *testing.T) { + req := &admissionv1.AdmissionRequest{ + Operation: admissionv1.Delete, + } + resp := ac.admit(contextWithLogger(t), req) + assert.True(t, resp.Allowed) + }) +} + +func TestDefaultClientAuth(t *testing.T) { + opts := fakeOptions() + assert.Equal(t, opts.ClientAuth, tls.NoClientCert) +} + +func createDeployment(ac *AdmissionController) { + opts := fakeOptions() + deployment := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: opts.DeploymentName, + Namespace: opts.Namespace, + }, + } + _, _ = ac.Client.AppsV1().Deployments(opts.Namespace).Create(context.TODO(), deployment, metav1.CreateOptions{}) +} + +func createWebhook(ac *AdmissionController, wh *admissionregistrationv1.ValidatingWebhookConfiguration) { + client := ac.Client.AdmissionregistrationV1().ValidatingWebhookConfigurations() + _, err := client.Create(context.TODO(), wh, metav1.CreateOptions{}) + if err != nil { + panic(fmt.Errorf("failed to create test webhook, %w", err)) + } +} + +func TestRun(t *testing.T) { + opts := fakeOptions() + ac := fakeAdmissionController(t, opts) + createDeployment(ac) + webhook := fakeValidatingWebhookConfig(opts) + createWebhook(ac, webhook) + + ctx := contextWithLoggerAndCancel(t) + go func() { + _ = ac.Run(ctx) + }() + _, err := net.Dial("tcp", fmt.Sprintf(":%d", opts.Port)) + assert.NotNil(t, err) +} + +func TestConfigureCertWithExistingSecret(t *testing.T) { + t.Run("test configure cert with existing secret", func(t *testing.T) { + opts := fakeOptions() + ac := fakeAdmissionController(t, opts) + createDeployment(ac) + ctx := contextWithLogger(t) + newSecret, err := ac.generateSecret(ctx) + assert.Nil(t, err) + _, err = ac.Client.CoreV1().Secrets(opts.Namespace).Create(context.TODO(), newSecret, metav1.CreateOptions{}) + assert.Nil(t, err) + + tlsConfig, caCert, err := ac.configureCerts(ctx, tls.NoClientCert) + assert.Nil(t, err) + assert.NotNil(t, tlsConfig) + + expectedCert, err := tls.X509KeyPair(newSecret.Data[secretServerCert], newSecret.Data[secretServerKey]) + assert.Nil(t, err) + assert.True(t, len(tlsConfig.Certificates) >= 1) + + assert.Equal(t, expectedCert.Certificate, tlsConfig.Certificates[0].Certificate) + assert.Equal(t, newSecret.Data[secretCACert], caCert) + }) +}