diff --git a/go.mod b/go.mod index f89b0bb..870f225 100644 --- a/go.mod +++ b/go.mod @@ -86,6 +86,7 @@ require ( github.com/monochromegane/go-gitignore v0.0.0-20200626010858-205db1a8cc00 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect + github.com/onsi/gomega v1.31.1 // indirect github.com/opencontainers/go-digest v1.0.1-0.20231025023718-d50d2fec9c98 // indirect github.com/opencontainers/image-spec v1.1.0 // indirect github.com/peterbourgon/diskv v2.0.1+incompatible // indirect @@ -136,6 +137,7 @@ require ( github.com/fluxcd/image-reflector-controller/api v0.31.2 github.com/fluxcd/kustomize-controller/api v1.2.2 github.com/fluxcd/notification-controller/api v1.2.4 + github.com/fluxcd/pkg/runtime v0.44.1 github.com/fluxcd/source-controller/api v1.2.4 github.com/go-chi/chi v1.5.5 github.com/go-logr/logr v1.4.1 // indirect diff --git a/go.sum b/go.sum index ebfadc3..e16c55a 100644 --- a/go.sum +++ b/go.sum @@ -111,6 +111,8 @@ github.com/fluxcd/pkg/apis/kustomize v1.3.0 h1:qvB46CfaOWcL1SyR2RiVWN/j7/035D0Ot github.com/fluxcd/pkg/apis/kustomize v1.3.0/go.mod h1:PCXf5kktTzNav0aH2Ns3jsowqwmA9xTcsrEOoPzx/K8= github.com/fluxcd/pkg/apis/meta v1.3.0 h1:KxeEc6olmSZvQ5pBONPE4IKxyoWQbqTJF1X6K5nIXpU= github.com/fluxcd/pkg/apis/meta v1.3.0/go.mod h1:3Ui8xFkoU4sYehqmscjpq7NjqH2YN1A2iX2okbO3/yA= +github.com/fluxcd/pkg/runtime v0.44.1 h1:XuPTcNIgn/NsoIo/A6qfPZaD9E7cbnJTDbeNw8O1SZQ= +github.com/fluxcd/pkg/runtime v0.44.1/go.mod h1:s1AhSOTCEBPaTfz/GdBD/Ws66uOByIuNP4Znrq+is9M= github.com/fluxcd/source-controller/api v1.2.4 h1:XjKTWhSSeLGsogWnTcLl5sUnyMlC5TKDbbBgP9SyJ5c= github.com/fluxcd/source-controller/api v1.2.4/go.mod h1:j3QSHpIPBP5sjaGIkVtsgWCx8JcOmcsutRmdJmRMOZg= github.com/foxcpp/go-mockdns v1.0.0 h1:7jBqxd3WDWwi/6WhDvacvH1XsN3rOLXyHM1uhvIx6FI= @@ -282,6 +284,8 @@ github.com/onsi/ginkgo/v2 v2.14.0 h1:vSmGj2Z5YPb9JwCWT6z6ihcUvDhuXLc3sJiqd3jMKAY github.com/onsi/ginkgo/v2 v2.14.0/go.mod h1:JkUdW7JkN0V6rFvsHcJ478egV3XH9NxpD27Hal/PhZw= github.com/onsi/gomega v1.30.0 h1:hvMK7xYz4D3HapigLTeGdId/NcfQx1VHMJc60ew99+8= github.com/onsi/gomega v1.30.0/go.mod h1:9sxs+SwGrKI0+PWe4Fxa9tFQQBG5xSsSbMXOI8PPpoQ= +github.com/onsi/gomega v1.31.1 h1:KYppCUK+bUgAZwHOu7EXVBKyQA6ILvOESHkn/tgoqvo= +github.com/onsi/gomega v1.31.1/go.mod h1:y40C95dwAD1Nz36SsEnxvfFe8FFfNxzI5eJ0EYGyAy0= github.com/opencontainers/go-digest v1.0.1-0.20231025023718-d50d2fec9c98 h1:H55sU3giNgBkIvmAo0vI/AAFwVTwfWsf6MN3+9H6U8o= github.com/opencontainers/go-digest v1.0.1-0.20231025023718-d50d2fec9c98/go.mod h1:RqnyioA3pIEZMkSbOIcrw32YSgETfn/VrLuEikEdPNU= github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug= diff --git a/pkg/api/api.go b/pkg/api/api.go index 6c9e710..c8ee2e7 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -203,6 +203,32 @@ func stopLogs(w http.ResponseWriter, r *http.Request) { w.Write([]byte("{}")) } +func suspend(w http.ResponseWriter, r *http.Request) { + resource := r.URL.Query().Get("resource") + namespace := r.URL.Query().Get("namespace") + name := r.URL.Query().Get("name") + config, _ := r.Context().Value("config").(*rest.Config) + + reconcileCommand := flux.NewSuspendCommand(resource) + go reconcileCommand.Run(config, namespace, name) + + w.WriteHeader(http.StatusOK) + w.Write([]byte("{}")) +} + +func resume(w http.ResponseWriter, r *http.Request) { + resource := r.URL.Query().Get("resource") + namespace := r.URL.Query().Get("namespace") + name := r.URL.Query().Get("name") + config, _ := r.Context().Value("config").(*rest.Config) + + reconcileCommand := flux.NewResumeCommand(resource) + go reconcileCommand.Run(config, namespace, name) + + w.WriteHeader(http.StatusOK) + w.Write([]byte("{}")) +} + func reconcile(w http.ResponseWriter, r *http.Request) { resource := r.URL.Query().Get("resource") namespace := r.URL.Query().Get("namespace") diff --git a/pkg/api/router.go b/pkg/api/router.go index ca6731b..e0da429 100644 --- a/pkg/api/router.go +++ b/pkg/api/router.go @@ -39,6 +39,8 @@ func SetupRouter( r.Get("/api/describePod", describePod) r.Get("/api/logs", streamLogs) r.Get("/api/stopLogs", stopLogs) + r.Post("/api/suspend", suspend) + r.Post("/api/resume", resume) r.Post("/api/reconcile", reconcile) r.Get("/ws/", func(w http.ResponseWriter, r *http.Request) { streaming.ServeWs(clientHub, w, r) diff --git a/pkg/flux/helmrelease.go b/pkg/flux/helmrelease.go index 4ca4c80..24031f7 100644 --- a/pkg/flux/helmrelease.go +++ b/pkg/flux/helmrelease.go @@ -32,10 +32,30 @@ func (h helmReleaseAdapter) asClientObject() client.Object { return h.HelmRelease } +func (h helmReleaseAdapter) deepCopyClientObject() client.Object { + return h.HelmRelease.DeepCopy() +} + func (obj helmReleaseAdapter) isSuspended() bool { return obj.HelmRelease.Spec.Suspend } +func (obj helmReleaseAdapter) setSuspended() { + obj.HelmRelease.Spec.Suspend = true +} + +func (obj helmReleaseAdapter) setUnsuspended() { + obj.HelmRelease.Spec.Suspend = false +} + +func (obj helmReleaseAdapter) getObservedGeneration() int64 { + return obj.HelmRelease.Status.ObservedGeneration +} + +func (obj helmReleaseAdapter) isStatic() bool { + return false +} + func (obj helmReleaseAdapter) lastHandledReconcileRequest() string { return obj.Status.GetLastHandledReconcileRequest() } @@ -43,3 +63,23 @@ func (obj helmReleaseAdapter) lastHandledReconcileRequest() string { func (obj helmReleaseAdapter) successMessage() string { return fmt.Sprintf("applied revision %s", obj.Status.LastAppliedRevision) } + +type helmReleaseListAdapter struct { + *helmv2beta1.HelmReleaseList +} + +func (h helmReleaseListAdapter) asClientList() client.ObjectList { + return h.HelmReleaseList +} + +func (h helmReleaseListAdapter) len() int { + return len(h.HelmReleaseList.Items) +} + +func (a helmReleaseListAdapter) item(i int) suspendable { + return &helmReleaseAdapter{&a.HelmReleaseList.Items[i]} +} + +func (a helmReleaseListAdapter) resumeItem(i int) resumable { + return &helmReleaseAdapter{&a.HelmReleaseList.Items[i]} +} diff --git a/pkg/flux/kustomization.go b/pkg/flux/kustomization.go index cfceaf6..b06385b 100644 --- a/pkg/flux/kustomization.go +++ b/pkg/flux/kustomization.go @@ -32,10 +32,30 @@ func (a kustomizationAdapter) asClientObject() client.Object { return a.Kustomization } +func (a kustomizationAdapter) deepCopyClientObject() client.Object { + return a.Kustomization.DeepCopy() +} + func (obj kustomizationAdapter) isSuspended() bool { return obj.Kustomization.Spec.Suspend } +func (obj kustomizationAdapter) setSuspended() { + obj.Kustomization.Spec.Suspend = true +} + +func (obj kustomizationAdapter) setUnsuspended() { + obj.Kustomization.Spec.Suspend = false +} + +func (obj kustomizationAdapter) getObservedGeneration() int64 { + return obj.Kustomization.Status.ObservedGeneration +} + +func (obj kustomizationAdapter) isStatic() bool { + return false +} + func (obj kustomizationAdapter) lastHandledReconcileRequest() string { return obj.Status.GetLastHandledReconcileRequest() } @@ -43,3 +63,23 @@ func (obj kustomizationAdapter) lastHandledReconcileRequest() string { func (obj kustomizationAdapter) successMessage() string { return fmt.Sprintf("applied revision %s", obj.Status.LastAppliedRevision) } + +type kustomizationListAdapter struct { + *kustomizationv1.KustomizationList +} + +func (a kustomizationListAdapter) asClientList() client.ObjectList { + return a.KustomizationList +} + +func (a kustomizationListAdapter) len() int { + return len(a.KustomizationList.Items) +} + +func (a kustomizationListAdapter) item(i int) suspendable { + return &kustomizationAdapter{&a.KustomizationList.Items[i]} +} + +func (a kustomizationListAdapter) resumeItem(i int) resumable { + return &kustomizationAdapter{&a.KustomizationList.Items[i]} +} diff --git a/pkg/flux/reconcile.go b/pkg/flux/reconcile.go index c3a6961..36b31cc 100644 --- a/pkg/flux/reconcile.go +++ b/pkg/flux/reconcile.go @@ -31,7 +31,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" - apiruntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" @@ -104,7 +103,7 @@ func NewReconcileCommand(resource string) *reconcileCommand { } func (r *reconcileCommand) Run(config *rest.Config, namespace, name string) { - scheme := apiruntime.NewScheme() + scheme := runtime.NewScheme() sourcev1.AddToScheme(scheme) sourcev1beta2.AddToScheme(scheme) kustomizationv1.AddToScheme(scheme) diff --git a/pkg/flux/resume.go b/pkg/flux/resume.go new file mode 100644 index 0000000..29a316e --- /dev/null +++ b/pkg/flux/resume.go @@ -0,0 +1,295 @@ +/* +Copyright 2020 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +Original version: https://github.com/fluxcd/flux2/blob/437a94367784541695fa68deba7a52b188d97ea8/cmd/flux/resume.go +*/ + +package flux + +import ( + "context" + "fmt" + "time" + + helmv2beta1 "github.com/fluxcd/helm-controller/api/v2beta1" + kustomizationv1 "github.com/fluxcd/kustomize-controller/api/v1" + "github.com/fluxcd/pkg/apis/meta" + "github.com/fluxcd/pkg/runtime/object" + "github.com/fluxcd/pkg/runtime/patch" + sourcev1 "github.com/fluxcd/source-controller/api/v1" + sourcev1beta2 "github.com/fluxcd/source-controller/api/v1beta2" + "github.com/sirupsen/logrus" + apimeta "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/rest" + kstatus "sigs.k8s.io/cli-utils/pkg/kstatus/status" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// objectStatusType is the type of object in terms of status when computing the +// readiness of an object. Readiness check method depends on the type of object. +// For a dynamic object, Ready status condition is considered only for the +// latest generation of the object. For a static object that don't have any +// condition, the object generation is not considered. +type objectStatusType int + +const ( + objectStatusDynamic objectStatusType = iota + objectStatusStatic +) + +type resumeCommand struct { + kind string + groupVersion schema.GroupVersion + list listResumable +} + +type listResumable interface { + asClientList() client.ObjectList + len() int + resumeItem(i int) resumable +} + +type resumable interface { + asClientObject() client.Object + deepCopyClientObject() client.Object + GetGeneration() int64 + getObservedGeneration() int64 + setUnsuspended() + isStatic() bool + successMessage() string +} + +func NewResumeCommand(resource string) *resumeCommand { + switch resource { + case "kustomization": + return &resumeCommand{ + kind: kustomizationv1.KustomizationKind, + groupVersion: kustomizationv1.GroupVersion, + list: kustomizationListAdapter{&kustomizationv1.KustomizationList{}}, + } + case "helmrelease": + return &resumeCommand{ + kind: helmv2beta1.HelmReleaseKind, + groupVersion: helmv2beta1.GroupVersion, + list: helmReleaseListAdapter{&helmv2beta1.HelmReleaseList{}}, + } + case sourcev1.GitRepositoryKind: + return &resumeCommand{ + kind: sourcev1.GitRepositoryKind, + groupVersion: sourcev1.GroupVersion, + list: gitRepositoryListAdapter{&sourcev1.GitRepositoryList{}}, + } + case sourcev1beta2.OCIRepositoryKind: + return &resumeCommand{ + kind: sourcev1beta2.OCIRepositoryKind, + groupVersion: sourcev1beta2.GroupVersion, + list: ociRepositoryListAdapter{&sourcev1beta2.OCIRepositoryList{}}, + } + case sourcev1beta2.BucketKind: + return &resumeCommand{ + kind: sourcev1beta2.BucketKind, + groupVersion: sourcev1beta2.GroupVersion, + list: bucketListAdapter{&sourcev1beta2.BucketList{}}, + } + } + + return nil +} + +func (r *resumeCommand) Run(config *rest.Config, namespace, name string) { + scheme := runtime.NewScheme() + sourcev1.AddToScheme(scheme) + sourcev1beta2.AddToScheme(scheme) + kustomizationv1.AddToScheme(scheme) + helmv2beta1.AddToScheme(scheme) + + kubeClient, err := client.NewWithWatch(config, client.Options{ + Scheme: scheme, + }) + if err != nil { + logrus.Errorf("kubernetes client initialization failed: %s", err) + return + } + + listOpts := []client.ListOption{ + client.InNamespace(namespace), + client.MatchingFields{ + "metadata.name": name, + }, + } + + obj, err := r.patch(context.TODO(), kubeClient, listOpts, namespace) + if err != nil { + if err == ErrNoObjectsFound { + logrus.Errorf("%s %s not found in %s namespace", r.kind, name, namespace) + } else { + logrus.Errorf("failed suspending %s %s in %s namespace: %s", r.kind, name, namespace, err.Error()) + } + } + r.reconcile(kubeClient, obj, namespace) +} + +// Patches resumable object by setting status to unsuspended. +// Returns a resumable that have been patched and any error encountered during patching. +func (r resumeCommand) patch(ctx context.Context, kubeClient client.WithWatch, listOpts []client.ListOption, namespace string) (resumable, error) { + if err := kubeClient.List(ctx, r.list.asClientList(), listOpts...); err != nil { + return nil, err + } + + if r.list.len() == 0 { + logrus.Errorf("no %s objects found in %s namespace", r.kind, namespace) + return nil, nil + } + + var resumables []resumable + for i := 0; i < r.list.len(); i++ { + obj := r.list.resumeItem(i) + logrus.Infof("resuming %s %s in %s namespace", r.kind, obj.asClientObject().GetName(), namespace) + + patch := client.MergeFrom(obj.deepCopyClientObject()) + obj.setUnsuspended() + if err := kubeClient.Patch(ctx, obj.asClientObject(), patch); err != nil { + return nil, err + } + + resumables = append(resumables, obj) + + logrus.Infof("%s resumed", r.kind) + } + + return resumables[0], nil +} + +// Waits for resumable object to be reconciled and returns the object and any error encountered while waiting. +func (r resumeCommand) reconcile(kubeClient client.WithWatch, res resumable, namespace string) { + namespacedName := types.NamespacedName{ + Name: res.asClientObject().GetName(), + Namespace: namespace, + } + + logrus.Infof("waiting for %s reconciliation", r.kind) + + readyConditionFunc := isObjectReadyConditionFunc(kubeClient, namespacedName, res.asClientObject()) + if res.isStatic() { + readyConditionFunc = isStaticObjectReadyConditionFunc(kubeClient, namespacedName, res.asClientObject()) + } + + if err := wait.PollUntilContextTimeout(context.TODO(), 2*time.Second, 5*time.Minute, true, readyConditionFunc); err != nil { + logrus.Error(err) + return + } + + logrus.Infof("%s %s reconciliation completed", r.kind, res.asClientObject().GetName()) + logrus.Infof(res.successMessage()) +} + +// isObjectReady determines if an object is ready using the kstatus.Compute() +// result. statusType helps differenciate between static and dynamic objects to +// accurately check the object's readiness. A dynamic object may have some extra +// considerations depending on the object. +func isObjectReady(obj client.Object, statusType objectStatusType) (bool, error) { + observedGen, err := object.GetStatusObservedGeneration(obj) + if err != nil && err != object.ErrObservedGenerationNotFound { + return false, err + } + + if statusType == objectStatusDynamic { + // Object not reconciled yet. + if observedGen < 1 { + return false, nil + } + + cobj, ok := obj.(meta.ObjectWithConditions) + if !ok { + return false, fmt.Errorf("unable to get conditions from object") + } + + if c := apimeta.FindStatusCondition(cobj.GetConditions(), meta.ReadyCondition); c != nil { + // Ensure that the ready condition is for the latest generation of + // the object. + // NOTE: Some APIs like ImageUpdateAutomation and HelmRelease don't + // support per condition observed generation yet. Per condition + // observed generation for them are always zero. + // There are two strategies used across different object kinds to + // check the latest ready condition: + // - check that the ready condition's generation matches the + // object's generation. + // - check that the observed generation of the object in the + // status matches the object's generation. + // + // TODO: Once ImageUpdateAutomation and HelmRelease APIs have per + // condition observed generation, remove the object's observed + // generation and object's generation check (the second condition + // below). Also, try replacing this readiness check function with + // fluxcd/pkg/ssa's ResourceManager.Wait(), which uses kstatus + // internally to check readiness of the objects. + if c.ObservedGeneration != 0 && c.ObservedGeneration != obj.GetGeneration() { + return false, nil + } + if c.ObservedGeneration == 0 && observedGen != obj.GetGeneration() { + return false, nil + } + } else { + return false, nil + } + } + + u, err := patch.ToUnstructured(obj) + if err != nil { + return false, err + } + result, err := kstatus.Compute(u) + if err != nil { + return false, err + } + switch result.Status { + case kstatus.CurrentStatus: + return true, nil + case kstatus.InProgressStatus: + return false, nil + default: + return false, fmt.Errorf(result.Message) + } +} + +// isObjectReadyConditionFunc returns a wait.ConditionFunc to be used with +// wait.Poll* while polling for an object with dynamic status to be ready. +func isObjectReadyConditionFunc(kubeClient client.Client, namespaceName types.NamespacedName, obj client.Object) wait.ConditionWithContextFunc { + return func(ctx context.Context) (bool, error) { + err := kubeClient.Get(ctx, namespaceName, obj) + if err != nil { + return false, err + } + + return isObjectReady(obj, objectStatusDynamic) + } +} + +// isStaticObjectReadyConditionFunc returns a wait.ConditionFunc to be used with +// wait.Poll* while polling for an object with static or no status to be +// ready. +func isStaticObjectReadyConditionFunc(kubeClient client.Client, namespaceName types.NamespacedName, obj client.Object) wait.ConditionWithContextFunc { + return func(ctx context.Context) (bool, error) { + err := kubeClient.Get(ctx, namespaceName, obj) + if err != nil { + return false, err + } + + return isObjectReady(obj, objectStatusStatic) + } +} diff --git a/pkg/flux/source.go b/pkg/flux/source.go index 108effa..520df93 100644 --- a/pkg/flux/source.go +++ b/pkg/flux/source.go @@ -21,6 +21,7 @@ import ( "fmt" sourcev1 "github.com/fluxcd/source-controller/api/v1" + sourcev1b2 "github.com/fluxcd/source-controller/api/v1beta2" sourcev1beta2 "github.com/fluxcd/source-controller/api/v1beta2" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -33,10 +34,30 @@ func (a gitRepositoryAdapter) asClientObject() client.Object { return a.GitRepository } +func (a gitRepositoryAdapter) deepCopyClientObject() client.Object { + return a.GitRepository.DeepCopy() +} + func (obj gitRepositoryAdapter) isSuspended() bool { return obj.GitRepository.Spec.Suspend } +func (obj gitRepositoryAdapter) setSuspended() { + obj.GitRepository.Spec.Suspend = true +} + +func (obj gitRepositoryAdapter) setUnsuspended() { + obj.GitRepository.Spec.Suspend = false +} + +func (obj gitRepositoryAdapter) getObservedGeneration() int64 { + return obj.GitRepository.Status.ObservedGeneration +} + +func (obj gitRepositoryAdapter) isStatic() bool { + return false +} + func (obj gitRepositoryAdapter) lastHandledReconcileRequest() string { return obj.Status.GetLastHandledReconcileRequest() } @@ -45,6 +66,26 @@ func (obj gitRepositoryAdapter) successMessage() string { return fmt.Sprintf("fetched revision %s", obj.Status.Artifact.Revision) } +type gitRepositoryListAdapter struct { + *sourcev1.GitRepositoryList +} + +func (a gitRepositoryListAdapter) asClientList() client.ObjectList { + return a.GitRepositoryList +} + +func (a gitRepositoryListAdapter) len() int { + return len(a.GitRepositoryList.Items) +} + +func (a gitRepositoryListAdapter) item(i int) suspendable { + return &gitRepositoryAdapter{&a.GitRepositoryList.Items[i]} +} + +func (a gitRepositoryListAdapter) resumeItem(i int) resumable { + return &gitRepositoryAdapter{&a.GitRepositoryList.Items[i]} +} + type ociRepositoryAdapter struct { *sourcev1beta2.OCIRepository } @@ -53,10 +94,30 @@ func (a ociRepositoryAdapter) asClientObject() client.Object { return a.OCIRepository } +func (a ociRepositoryAdapter) deepCopyClientObject() client.Object { + return a.OCIRepository.DeepCopy() +} + func (obj ociRepositoryAdapter) isSuspended() bool { return obj.OCIRepository.Spec.Suspend } +func (obj ociRepositoryAdapter) setSuspended() { + obj.OCIRepository.Spec.Suspend = true +} + +func (obj ociRepositoryAdapter) setUnsuspended() { + obj.OCIRepository.Spec.Suspend = false +} + +func (obj ociRepositoryAdapter) getObservedGeneration() int64 { + return obj.OCIRepository.Status.ObservedGeneration +} + +func (obj ociRepositoryAdapter) isStatic() bool { + return false +} + func (obj ociRepositoryAdapter) lastHandledReconcileRequest() string { return obj.Status.GetLastHandledReconcileRequest() } @@ -65,6 +126,26 @@ func (obj ociRepositoryAdapter) successMessage() string { return fmt.Sprintf("fetched revision %s", obj.Status.Artifact.Revision) } +type ociRepositoryListAdapter struct { + *sourcev1b2.OCIRepositoryList +} + +func (a ociRepositoryListAdapter) asClientList() client.ObjectList { + return a.OCIRepositoryList +} + +func (a ociRepositoryListAdapter) len() int { + return len(a.OCIRepositoryList.Items) +} + +func (a ociRepositoryListAdapter) item(i int) suspendable { + return &ociRepositoryAdapter{&a.OCIRepositoryList.Items[i]} +} + +func (a ociRepositoryListAdapter) resumeItem(i int) resumable { + return &ociRepositoryAdapter{&a.OCIRepositoryList.Items[i]} +} + type bucketAdapter struct { *sourcev1beta2.Bucket } @@ -73,10 +154,30 @@ func (a bucketAdapter) asClientObject() client.Object { return a.Bucket } +func (a bucketAdapter) deepCopyClientObject() client.Object { + return a.Bucket.DeepCopy() +} + func (obj bucketAdapter) isSuspended() bool { return obj.Bucket.Spec.Suspend } +func (obj bucketAdapter) setSuspended() { + obj.Bucket.Spec.Suspend = true +} + +func (obj bucketAdapter) setUnsuspended() { + obj.Bucket.Spec.Suspend = false +} + +func (obj bucketAdapter) getObservedGeneration() int64 { + return obj.Bucket.Status.ObservedGeneration +} + +func (obj bucketAdapter) isStatic() bool { + return false +} + func (obj bucketAdapter) lastHandledReconcileRequest() string { return obj.Status.GetLastHandledReconcileRequest() } @@ -85,6 +186,26 @@ func (obj bucketAdapter) successMessage() string { return fmt.Sprintf("fetched revision %s", obj.Status.Artifact.Revision) } +type bucketListAdapter struct { + *sourcev1b2.BucketList +} + +func (a bucketListAdapter) asClientList() client.ObjectList { + return a.BucketList +} + +func (a bucketListAdapter) len() int { + return len(a.BucketList.Items) +} + +func (a bucketListAdapter) item(i int) suspendable { + return &bucketAdapter{&a.BucketList.Items[i]} +} + +func (a bucketListAdapter) resumeItem(i int) resumable { + return &bucketAdapter{&a.BucketList.Items[i]} +} + type helmRepositoryAdapter struct { *sourcev1beta2.HelmRepository } diff --git a/pkg/flux/suspend.go b/pkg/flux/suspend.go new file mode 100644 index 0000000..b0ea73f --- /dev/null +++ b/pkg/flux/suspend.go @@ -0,0 +1,153 @@ +/* +Copyright 2020 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +Original version: https://github.com/fluxcd/flux2/blob/437a94367784541695fa68deba7a52b188d97ea8/cmd/flux/suspend.go +*/ + +package flux + +import ( + "context" + "errors" + + helmv2beta1 "github.com/fluxcd/helm-controller/api/v2beta1" + kustomizationv1 "github.com/fluxcd/kustomize-controller/api/v1" + sourcev1 "github.com/fluxcd/source-controller/api/v1" + sourcev1beta2 "github.com/fluxcd/source-controller/api/v1beta2" + "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type suspendCommand struct { + kind string + groupVersion schema.GroupVersion + list listSuspendable + object suspendable +} + +type listSuspendable interface { + asClientList() client.ObjectList + len() int + item(i int) suspendable +} + +type suspendable interface { + asClientObject() client.Object + deepCopyClientObject() client.Object + isSuspended() bool + setSuspended() +} + +func NewSuspendCommand(resource string) *suspendCommand { + switch resource { + case "kustomization": + return &suspendCommand{ + kind: kustomizationv1.KustomizationKind, + groupVersion: kustomizationv1.GroupVersion, + object: kustomizationAdapter{&kustomizationv1.Kustomization{}}, + list: &kustomizationListAdapter{&kustomizationv1.KustomizationList{}}, + } + case "helmrelease": + return &suspendCommand{ + kind: helmv2beta1.HelmReleaseKind, + groupVersion: helmv2beta1.GroupVersion, + object: &helmReleaseAdapter{&helmv2beta1.HelmRelease{}}, + list: &helmReleaseListAdapter{&helmv2beta1.HelmReleaseList{}}, + } + case sourcev1.GitRepositoryKind: + return &suspendCommand{ + kind: sourcev1.GitRepositoryKind, + groupVersion: sourcev1.GroupVersion, + object: gitRepositoryAdapter{&sourcev1.GitRepository{}}, + list: gitRepositoryListAdapter{&sourcev1.GitRepositoryList{}}, + } + case sourcev1beta2.OCIRepositoryKind: + return &suspendCommand{ + kind: sourcev1beta2.OCIRepositoryKind, + groupVersion: sourcev1beta2.GroupVersion, + object: ociRepositoryAdapter{&sourcev1beta2.OCIRepository{}}, + list: ociRepositoryListAdapter{&sourcev1beta2.OCIRepositoryList{}}, + } + case sourcev1beta2.BucketKind: + return &suspendCommand{ + kind: sourcev1beta2.BucketKind, + groupVersion: sourcev1beta2.GroupVersion, + object: bucketAdapter{&sourcev1beta2.Bucket{}}, + list: bucketListAdapter{&sourcev1beta2.BucketList{}}, + } + } + + return nil +} + +func (s *suspendCommand) Run(config *rest.Config, namespace, name string) { + scheme := runtime.NewScheme() + sourcev1.AddToScheme(scheme) + sourcev1beta2.AddToScheme(scheme) + kustomizationv1.AddToScheme(scheme) + helmv2beta1.AddToScheme(scheme) + + kubeClient, err := client.NewWithWatch(config, client.Options{ + Scheme: scheme, + }) + if err != nil { + logrus.Errorf("kubernetes client initialization failed: %s", err) + return + } + + listOpts := []client.ListOption{ + client.InNamespace(namespace), + client.MatchingFields{ + "metadata.name": name, + }, + } + + if err := s.patch(context.TODO(), kubeClient, listOpts, namespace); err != nil { + if err == ErrNoObjectsFound { + logrus.Errorf("%s %s not found in %s namespace", s.kind, name, namespace) + } else { + logrus.Errorf("failed suspending %s %s in %s namespace: %s", s.kind, name, namespace, err.Error()) + } + } +} + +var ErrNoObjectsFound = errors.New("no objects found") + +func (s suspendCommand) patch(ctx context.Context, kubeClient client.WithWatch, listOpts []client.ListOption, namespace string) error { + if err := kubeClient.List(ctx, s.list.asClientList(), listOpts...); err != nil { + return err + } + + if s.list.len() == 0 { + return ErrNoObjectsFound + } + + for i := 0; i < s.list.len(); i++ { + logrus.Infof("suspending %s %s in %s namespace", s.kind, s.list.item(i).asClientObject().GetName(), namespace) + + obj := s.list.item(i) + patch := client.MergeFrom(obj.deepCopyClientObject()) + obj.setSuspended() + if err := kubeClient.Patch(ctx, obj.asClientObject(), patch); err != nil { + return err + } + + logrus.Infof("%s suspended", s.kind) + } + + return nil +} diff --git a/web/src/HelmRelease.jsx b/web/src/HelmRelease.jsx index 1126941..90d850e 100644 --- a/web/src/HelmRelease.jsx +++ b/web/src/HelmRelease.jsx @@ -18,7 +18,7 @@ export function HelmRelease(props) { return (
@@ -35,7 +35,20 @@ export function HelmRelease(props) {
-
+
+