From 5e2565695645558adf3681ef34ccca4a36041aaf Mon Sep 17 00:00:00 2001 From: Steve Hipwell Date: Thu, 22 Aug 2024 17:20:48 +0100 Subject: [PATCH] feat: Changed to use watch for wait rollout Signed-off-by: Steve Hipwell --- docs/resources/kubectl_manifest.md | 2 +- kubernetes/resource_kubectl_manifest.go | 323 ++++++++++++++----- kubernetes/resource_kubectl_manifest_test.go | 150 +++++++++ 3 files changed, 393 insertions(+), 82 deletions(-) diff --git a/docs/resources/kubectl_manifest.md b/docs/resources/kubectl_manifest.md index 35c55c34..be1c93b5 100644 --- a/docs/resources/kubectl_manifest.md +++ b/docs/resources/kubectl_manifest.md @@ -88,7 +88,7 @@ YAML * `override_namespace` - Optional. Override the namespace to apply the kubernetes resource to, ignoring any declared namespace in the `yaml_body`. * `validate_schema` - Optional. Setting to `false` will mimic `kubectl apply --validate=false` mode. Default `true`. * `wait` - Optional. Set this flag to wait or not for finalized to complete for deleted objects. Default `false`. -* `wait_for_rollout` - Optional. Set this flag to wait or not for Deployments and APIService to complete rollout. Default `true`. +* `wait_for_rollout` - Optional. Set this flag to wait or not for `Deployment`, `DaemonSet`, `StatefulSet` & `APIService` resources to complete rollout. Default `true`. * `wait_for` - Optional. If set, will wait until either all conditions are satisfied, or until timeout is reached (see [below for nested schema](#wait_for)). Under the hood [gojsonq](https://github.com/thedevsaddam/gojsonq) is used for querying, see the related syntax and examples. * `delete_cascade` - Optional; `Background` or `Foreground` are valid options. If set this overrides the default provider behaviour which is to use `Background` unless `wait` is `true` when `Foreground` will be used. To duplicate the default behaviour of `kubectl` this should be explicitly set to `Background`. diff --git a/kubernetes/resource_kubectl_manifest.go b/kubernetes/resource_kubectl_manifest.go index 3c370495..2403bcf3 100644 --- a/kubernetes/resource_kubectl_manifest.go +++ b/kubernetes/resource_kubectl_manifest.go @@ -611,16 +611,26 @@ func resourceKubectlManifestApply(ctx context.Context, d *schema.ResourceData, m switch { case manifest.GetKind() == "Deployment": - log.Printf("[INFO] %v waiting for deployment rollout for %vmin", manifest, timeout.Minutes()) - err = resource.RetryContext(ctx, timeout, - waitForDeploymentReplicasFunc(ctx, meta.(*KubeProvider), manifest.GetNamespace(), manifest.GetName())) + log.Printf("[INFO] %v waiting for Deployment rollout for %vmin", manifest, timeout.Minutes()) + err = waitForDeploymentRollout(ctx, meta.(*KubeProvider), manifest.GetNamespace(), manifest.GetName(), timeout) + if err != nil { + return err + } + case manifest.GetKind() == "DaemonSet": + log.Printf("[INFO] %v waiting for DaemonSet rollout for %vmin", manifest, timeout.Minutes()) + err = waitForDaemonSetRollout(ctx, meta.(*KubeProvider), manifest.GetNamespace(), manifest.GetName(), timeout) + if err != nil { + return err + } + case manifest.GetKind() == "StatefulSet": + log.Printf("[INFO] %v waiting for v rollout for %vmin", manifest, timeout.Minutes()) + err = waitForStatefulSetRollout(ctx, meta.(*KubeProvider), manifest.GetNamespace(), manifest.GetName(), timeout) if err != nil { return err } case manifest.GetKind() == "APIService" && manifest.GetAPIVersion() == "apiregistration.k8s.io/v1": - log.Printf("[INFO] %v waiting for APIService rollout for %vmin", manifest, timeout.Minutes()) - err = resource.RetryContext(ctx, timeout, - waitForAPIServiceAvailableFunc(ctx, meta.(*KubeProvider), manifest.GetName())) + log.Printf("[INFO] %v waiting for APIService for %vmin", manifest, timeout.Minutes()) + err = waitForApiService(ctx, meta.(*KubeProvider), manifest.GetName(), timeout) if err != nil { return err } @@ -728,20 +738,20 @@ func resourceKubectlManifestDelete(ctx context.Context, d *schema.ResourceData, log.Printf("[INFO] %s perform delete of manifest", manifest) - waitForDelete := d.Get("wait").(bool) + wait := d.Get("wait").(bool) var propagationPolicy meta_v1.DeletionPropagation cascadeInput := d.Get("delete_cascade").(string) if len(cascadeInput) > 0 { propagationPolicy = meta_v1.DeletionPropagation(cascadeInput) - } else if waitForDelete { + } else if wait { propagationPolicy = meta_v1.DeletePropagationForeground } else { propagationPolicy = meta_v1.DeletePropagationBackground } var resourceVersion string - if waitForDelete { + if wait { rawResponse, err := restClient.ResourceInterface.Get(ctx, manifest.GetName(), meta_v1.GetOptions{}) if err != nil { return err @@ -758,28 +768,15 @@ func resourceKubectlManifestDelete(ctx context.Context, d *schema.ResourceData, } // The rest client doesn't wait for the delete so we need custom logic - if waitForDelete { + if wait { log.Printf("[INFO] %s waiting for delete of manifest to complete", manifest) - watcher, err := restClient.ResourceInterface.Watch(ctx, meta_v1.ListOptions{Watch: true, FieldSelector: fields.OneTermEqualSelector("metadata.name", manifest.GetName()).String(), ResourceVersion: resourceVersion}) + timeout := d.Timeout(schema.TimeoutDelete) + + err = waitForDelete(ctx, restClient, manifest.GetName(), resourceVersion, timeout) if err != nil { return err } - - defer watcher.Stop() - - deleted := false - for !deleted { - select { - case event := <-watcher.ResultChan(): - if event.Type == watch.Deleted { - deleted = true - } - - case <-ctx.Done(): - return fmt.Errorf("%s failed to delete kubernetes resource", manifest) - } - } } // Success remove it from state @@ -928,9 +925,84 @@ func checkAPIResourceIsPresent(available []*meta_v1.APIResourceList, resource me return nil, false } -// GetDeploymentConditionInternal returns the condition with the provided type. -// Borrowed from: https://github.com/kubernetes/kubernetes/blob/master/pkg/controller/deployment/util/deployment_util.go#L135 -func GetDeploymentCondition(status apps_v1.DeploymentStatus, condType apps_v1.DeploymentConditionType) *apps_v1.DeploymentCondition { +func waitForDelete(ctx context.Context, restClient *RestClientResult, name string, resourceVersion string, timeout time.Duration) error { + timeoutSeconds := int64(timeout.Seconds()) + + watcher, err := restClient.ResourceInterface.Watch(ctx, meta_v1.ListOptions{Watch: true, TimeoutSeconds: &timeoutSeconds, FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String(), ResourceVersion: resourceVersion}) + if err != nil { + return err + } + + defer watcher.Stop() + + deleted := false + for !deleted { + select { + case event := <-watcher.ResultChan(): + if event.Type == watch.Deleted { + deleted = true + } + + case <-ctx.Done(): + return fmt.Errorf("%s failed to delete resource", name) + } + } + + return nil +} + +func waitForDeploymentRollout(ctx context.Context, provider *KubeProvider, ns string, name string, timeout time.Duration) error { + timeoutSeconds := int64(timeout.Seconds()) + + watcher, err := provider.MainClientset.AppsV1().Deployments(ns).Watch(ctx, meta_v1.ListOptions{Watch: true, TimeoutSeconds: &timeoutSeconds, FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String()}) + if err != nil { + return err + } + + defer watcher.Stop() + + done := false + for !done { + select { + case event := <-watcher.ResultChan(): + if event.Type == watch.Modified { + deployment, ok := event.Object.(*apps_v1.Deployment) + if !ok { + return fmt.Errorf("%s could not cast to Deployment", name) + } + + if deployment.Generation <= deployment.Status.ObservedGeneration { + condition := getDeploymentCondition(deployment.Status, apps_v1.DeploymentProgressing) + if condition != nil && condition.Reason == TimedOutReason { + continue + } + + if deployment.Spec.Replicas != nil && deployment.Status.UpdatedReplicas < *deployment.Spec.Replicas { + continue + } + + if deployment.Status.Replicas > deployment.Status.UpdatedReplicas { + continue + } + + if deployment.Status.AvailableReplicas < deployment.Status.UpdatedReplicas { + continue + } + + done = true + } + } + + case <-ctx.Done(): + return fmt.Errorf("%s failed to rollout Deployment", name) + } + } + + return nil +} + +func getDeploymentCondition(status apps_v1.DeploymentStatus, condType apps_v1.DeploymentConditionType) *apps_v1.DeploymentCondition { + // Borrowed from: https://github.com/kubernetes/kubernetes/blob/master/pkg/controller/deployment/util/deployment_util.go#L135 for i := range status.Conditions { c := status.Conditions[i] if c.Type == condType { @@ -940,6 +1012,147 @@ func GetDeploymentCondition(status apps_v1.DeploymentStatus, condType apps_v1.De return nil } +func waitForDaemonSetRollout(ctx context.Context, provider *KubeProvider, ns string, name string, timeout time.Duration) error { + timeoutSeconds := int64(timeout.Seconds()) + + watcher, err := provider.MainClientset.AppsV1().DaemonSets(ns).Watch(ctx, meta_v1.ListOptions{Watch: true, TimeoutSeconds: &timeoutSeconds, FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String()}) + if err != nil { + return err + } + + defer watcher.Stop() + + done := false + for !done { + select { + case event := <-watcher.ResultChan(): + if event.Type == watch.Modified { + daemon, ok := event.Object.(*apps_v1.DaemonSet) + if !ok { + return fmt.Errorf("%s could not cast to DaemonSet", name) + } + + if daemon.Spec.UpdateStrategy.Type != apps_v1.RollingUpdateDaemonSetStrategyType { + done = true + continue + } + + if daemon.Generation <= daemon.Status.ObservedGeneration { + if daemon.Status.UpdatedNumberScheduled < daemon.Status.DesiredNumberScheduled { + continue + } + + if daemon.Status.NumberAvailable < daemon.Status.DesiredNumberScheduled { + continue + } + + done = true + } + } + + case <-ctx.Done(): + return fmt.Errorf("%s failed to rollout DaemonSet", name) + } + } + + return nil +} + +func waitForStatefulSetRollout(ctx context.Context, provider *KubeProvider, ns string, name string, timeout time.Duration) error { + timeoutSeconds := int64(timeout.Seconds()) + + watcher, err := provider.MainClientset.AppsV1().StatefulSets(ns).Watch(ctx, meta_v1.ListOptions{Watch: true, TimeoutSeconds: &timeoutSeconds, FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String()}) + if err != nil { + return err + } + + defer watcher.Stop() + + done := false + for !done { + select { + case event := <-watcher.ResultChan(): + if event.Type == watch.Modified { + sts, ok := event.Object.(*apps_v1.StatefulSet) + if !ok { + return fmt.Errorf("%s could not cast to StatefulSet", name) + } + + if sts.Spec.UpdateStrategy.Type != apps_v1.RollingUpdateStatefulSetStrategyType { + done = true + continue + } + + if sts.Status.ObservedGeneration == 0 || sts.Generation > sts.Status.ObservedGeneration { + continue + } + + if sts.Spec.Replicas != nil && sts.Status.ReadyReplicas < *sts.Spec.Replicas { + continue + } + + if sts.Spec.UpdateStrategy.Type == apps_v1.RollingUpdateStatefulSetStrategyType && sts.Spec.UpdateStrategy.RollingUpdate != nil { + if sts.Spec.Replicas != nil && sts.Spec.UpdateStrategy.RollingUpdate.Partition != nil { + if sts.Status.UpdatedReplicas < (*sts.Spec.Replicas - *sts.Spec.UpdateStrategy.RollingUpdate.Partition) { + continue + } + } + + done = true + continue + } + + if sts.Status.UpdateRevision != sts.Status.CurrentRevision { + continue + } + + done = true + } + + case <-ctx.Done(): + return fmt.Errorf("%s failed to rollout StatefulSet", name) + } + } + + return nil +} + +func waitForApiService(ctx context.Context, provider *KubeProvider, name string, timeout time.Duration) error { + timeoutSeconds := int64(timeout.Seconds()) + + watcher, err := provider.AggregatorClientset.ApiregistrationV1().APIServices().Watch(ctx, meta_v1.ListOptions{Watch: true, TimeoutSeconds: &timeoutSeconds, FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String()}) + if err != nil { + return err + } + + defer watcher.Stop() + + done := false + for !done { + select { + case event := <-watcher.ResultChan(): + if event.Type == watch.Modified { + apiService, ok := event.Object.(*apiregistration.APIService) + if !ok { + return fmt.Errorf("%s could not cast to APIService", name) + } + + for i := range apiService.Status.Conditions { + if apiService.Status.Conditions[i].Type == apiregistration.Available { + done = true + continue + } + } + } + + case <-ctx.Done(): + return fmt.Errorf("%s failed to wait for APIService", name) + } + } + + return nil +} + func waitForFields(ctx context.Context, provider *RestClientResult, conditions []types.WaitForField, ns, name string) resource.RetryFunc { return func() *resource.RetryError { rawResponse, err := provider.ResourceInterface.Get(ctx, name, meta_v1.GetOptions{}) @@ -982,58 +1195,6 @@ func waitForFields(ctx context.Context, provider *RestClientResult, conditions [ } } -func waitForDeploymentReplicasFunc(ctx context.Context, provider *KubeProvider, ns, name string) resource.RetryFunc { - return func() *resource.RetryError { - - // Query the deployment to get a status update. - dply, err := provider.MainClientset.AppsV1().Deployments(ns).Get(ctx, name, meta_v1.GetOptions{}) - if err != nil { - return resource.NonRetryableError(err) - } - - if dply.Generation <= dply.Status.ObservedGeneration { - cond := GetDeploymentCondition(dply.Status, apps_v1.DeploymentProgressing) - if cond != nil && cond.Reason == TimedOutReason { - err := fmt.Errorf("Deployment exceeded its progress deadline: %v", cond.String()) - return resource.NonRetryableError(err) - } - - if dply.Status.UpdatedReplicas < *dply.Spec.Replicas { - return resource.RetryableError(fmt.Errorf("Waiting for rollout to finish: %d out of %d new replicas have been updated...", dply.Status.UpdatedReplicas, dply.Spec.Replicas)) - } - - if dply.Status.Replicas > dply.Status.UpdatedReplicas { - return resource.RetryableError(fmt.Errorf("Waiting for rollout to finish: %d old replicas are pending termination...", dply.Status.Replicas-dply.Status.UpdatedReplicas)) - } - - if dply.Status.AvailableReplicas < dply.Status.UpdatedReplicas { - return resource.RetryableError(fmt.Errorf("Waiting for rollout to finish: %d of %d updated replicas are available...", dply.Status.AvailableReplicas, dply.Status.UpdatedReplicas)) - } - } else if dply.Status.ObservedGeneration == 0 { - return resource.RetryableError(fmt.Errorf("Waiting for rollout to start")) - } - return nil - } -} - -func waitForAPIServiceAvailableFunc(ctx context.Context, provider *KubeProvider, name string) resource.RetryFunc { - return func() *resource.RetryError { - - apiService, err := provider.AggregatorClientset.ApiregistrationV1().APIServices().Get(ctx, name, meta_v1.GetOptions{}) - if err != nil { - return resource.NonRetryableError(err) - } - - for i := range apiService.Status.Conditions { - if apiService.Status.Conditions[i].Type == apiregistration.Available { - return nil - } - } - - return resource.RetryableError(fmt.Errorf("Waiting for APIService %v to be Available", name)) - } -} - // Takes the result of flatmap.Expand for an array of strings // and returns a []*string func expandStringList(configured []interface{}) []string { diff --git a/kubernetes/resource_kubectl_manifest_test.go b/kubernetes/resource_kubectl_manifest_test.go index d3d4e4b8..5f66ed08 100644 --- a/kubernetes/resource_kubectl_manifest_test.go +++ b/kubernetes/resource_kubectl_manifest_test.go @@ -224,6 +224,156 @@ YAML }) } +func TestAccKubectl_WaitForRolloutDeployment(t *testing.T) { + //language=hcl + config := ` +resource "kubectl_manifest" "test" { + wait_for_rollout = true + yaml_body = <