Skip to content

Commit

Permalink
feat: Changed to use watch for wait rollout
Browse files Browse the repository at this point in the history
Signed-off-by: Steve Hipwell <[email protected]>
  • Loading branch information
stevehipwell committed Aug 23, 2024
1 parent abed1de commit ea4b9ac
Show file tree
Hide file tree
Showing 3 changed files with 399 additions and 82 deletions.
2 changes: 1 addition & 1 deletion docs/resources/kubectl_manifest.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ YAML
* `override_namespace` - Optional. Override the namespace to apply the kubernetes resource to, ignoring any declared namespace in the `yaml_body`.
* `validate_schema` - Optional. Setting to `false` will mimic `kubectl apply --validate=false` mode. Default `true`.
* `wait` - Optional. Set this flag to wait for finalizers to complete when deleting objects. Default `false`.
* `wait_for_rollout` - Optional. Set this flag to wait or not for Deployments and APIService to complete rollout. Default `true`.
* `wait_for_rollout` - Optional. Set this flag to wait or not for `Deployment`, `DaemonSet`, `StatefulSet` & `APIService` resources to complete rollout. Default `true`.
* `wait_for` - Optional. If set, will wait until either all conditions are satisfied, or until timeout is reached (see [below for nested schema](#wait_for)). Under the hood [gojsonq](https://github.com/thedevsaddam/gojsonq) is used for querying, see the related syntax and examples.
* `delete_cascade` - Optional; `Background` or `Foreground` are valid options. If set this overrides the default provider behaviour which is to use `Background` unless `wait` is `true` when `Foreground` will be used. To duplicate the default behaviour of `kubectl` this should be explicitly set to `Background`.

Expand Down
329 changes: 248 additions & 81 deletions kubernetes/resource_kubectl_manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,16 +611,26 @@ func resourceKubectlManifestApply(ctx context.Context, d *schema.ResourceData, m

switch {
case manifest.GetKind() == "Deployment":
log.Printf("[INFO] %v waiting for deployment rollout for %vmin", manifest, timeout.Minutes())
err = resource.RetryContext(ctx, timeout,
waitForDeploymentReplicasFunc(ctx, meta.(*KubeProvider), manifest.GetNamespace(), manifest.GetName()))
log.Printf("[INFO] %v waiting for Deployment rollout for %vmin", manifest, timeout.Minutes())
err = waitForDeploymentRollout(ctx, meta.(*KubeProvider), manifest.GetNamespace(), manifest.GetName(), timeout)
if err != nil {
return err
}
case manifest.GetKind() == "DaemonSet":
log.Printf("[INFO] %v waiting for DaemonSet rollout for %vmin", manifest, timeout.Minutes())
err = waitForDaemonSetRollout(ctx, meta.(*KubeProvider), manifest.GetNamespace(), manifest.GetName(), timeout)
if err != nil {
return err
}
case manifest.GetKind() == "StatefulSet":
log.Printf("[INFO] %v waiting for v rollout for %vmin", manifest, timeout.Minutes())
err = waitForStatefulSetRollout(ctx, meta.(*KubeProvider), manifest.GetNamespace(), manifest.GetName(), timeout)
if err != nil {
return err
}
case manifest.GetKind() == "APIService" && manifest.GetAPIVersion() == "apiregistration.k8s.io/v1":
log.Printf("[INFO] %v waiting for APIService rollout for %vmin", manifest, timeout.Minutes())
err = resource.RetryContext(ctx, timeout,
waitForAPIServiceAvailableFunc(ctx, meta.(*KubeProvider), manifest.GetName()))
log.Printf("[INFO] %v waiting for APIService for %vmin", manifest, timeout.Minutes())
err = waitForApiService(ctx, meta.(*KubeProvider), manifest.GetName(), timeout)
if err != nil {
return err
}
Expand Down Expand Up @@ -728,20 +738,20 @@ func resourceKubectlManifestDelete(ctx context.Context, d *schema.ResourceData,

log.Printf("[INFO] %s perform delete of manifest", manifest)

waitForDelete := d.Get("wait").(bool)
wait := d.Get("wait").(bool)

var propagationPolicy meta_v1.DeletionPropagation
cascadeInput := d.Get("delete_cascade").(string)
if len(cascadeInput) > 0 {
propagationPolicy = meta_v1.DeletionPropagation(cascadeInput)
} else if waitForDelete {
} else if wait {
propagationPolicy = meta_v1.DeletePropagationForeground
} else {
propagationPolicy = meta_v1.DeletePropagationBackground
}

var resourceVersion string
if waitForDelete {
if wait {
rawResponse, err := restClient.ResourceInterface.Get(ctx, manifest.GetName(), meta_v1.GetOptions{})
if err != nil {
return err
Expand All @@ -758,28 +768,15 @@ func resourceKubectlManifestDelete(ctx context.Context, d *schema.ResourceData,
}

// The rest client doesn't wait for the delete so we need custom logic
if waitForDelete {
if wait {
log.Printf("[INFO] %s waiting for delete of manifest to complete", manifest)

watcher, err := restClient.ResourceInterface.Watch(ctx, meta_v1.ListOptions{Watch: true, FieldSelector: fields.OneTermEqualSelector("metadata.name", manifest.GetName()).String(), ResourceVersion: resourceVersion})
timeout := d.Timeout(schema.TimeoutDelete)

err = waitForDelete(ctx, restClient, manifest.GetName(), resourceVersion, timeout)
if err != nil {
return err
}

defer watcher.Stop()

deleted := false
for !deleted {
select {
case event := <-watcher.ResultChan():
if event.Type == watch.Deleted {
deleted = true
}

case <-ctx.Done():
return fmt.Errorf("%s failed to delete kubernetes resource", manifest)
}
}
}

// Success remove it from state
Expand Down Expand Up @@ -928,9 +925,86 @@ func checkAPIResourceIsPresent(available []*meta_v1.APIResourceList, resource me
return nil, false
}

// GetDeploymentConditionInternal returns the condition with the provided type.
// Borrowed from: https://github.com/kubernetes/kubernetes/blob/master/pkg/controller/deployment/util/deployment_util.go#L135
func GetDeploymentCondition(status apps_v1.DeploymentStatus, condType apps_v1.DeploymentConditionType) *apps_v1.DeploymentCondition {
// waitForDelete blocks until a watch on the named resource reports that it
// has been deleted.
//
// The watch starts at resourceVersion and is opened with a server-side
// timeout of timeout (truncated to whole seconds). It returns nil once a
// Deleted event is received, and an error if the watch cannot be opened,
// reports an Error event, ends before the deletion is observed, or ctx is
// cancelled.
func waitForDelete(ctx context.Context, restClient *RestClientResult, name string, resourceVersion string, timeout time.Duration) error {
	timeoutSeconds := int64(timeout.Seconds())

	watcher, err := restClient.ResourceInterface.Watch(ctx, meta_v1.ListOptions{
		Watch:           true,
		TimeoutSeconds:  &timeoutSeconds,
		FieldSelector:   fields.OneTermEqualSelector("metadata.name", name).String(),
		ResourceVersion: resourceVersion,
	})
	if err != nil {
		return err
	}
	defer watcher.Stop()

	for {
		select {
		case event, ok := <-watcher.ResultChan():
			if !ok {
				// The result channel is closed when the watch ends (for
				// example when the server-side timeout elapses). Without this
				// check the loop would spin on zero-value events.
				return fmt.Errorf("%s watch ended before the resource was deleted", name)
			}

			switch event.Type {
			case watch.Deleted:
				return nil
			case watch.Error:
				return fmt.Errorf("%s watch failed while waiting for delete: %v", name, event.Object)
			}

		case <-ctx.Done():
			return fmt.Errorf("%s failed to delete resource", name)
		}
	}
}

// waitForDeploymentRollout watches the named Deployment in namespace ns and
// returns nil once its rollout is complete.
//
// The rollout is considered complete when the latest generation has been
// observed, every desired replica has been updated, no old replicas remain
// and all updated replicas are available. The watch is opened with a
// server-side timeout of timeout (truncated to whole seconds).
//
// An error is returned if the watch cannot be opened, reports an Error
// event, ends before the rollout completes, the Deployment is deleted, the
// Deployment reports that it exceeded its progress deadline, or ctx is
// cancelled.
func waitForDeploymentRollout(ctx context.Context, provider *KubeProvider, ns string, name string, timeout time.Duration) error {
	// Borrowed from: https://github.com/kubernetes/kubectl/blob/c4be63c54b7188502c1a63bb884a0b05fac51ebd/pkg/polymorphichelpers/rollout_status.go#L59

	timeoutSeconds := int64(timeout.Seconds())

	watcher, err := provider.MainClientset.AppsV1().Deployments(ns).Watch(ctx, meta_v1.ListOptions{
		Watch:          true,
		TimeoutSeconds: &timeoutSeconds,
		FieldSelector:  fields.OneTermEqualSelector("metadata.name", name).String(),
	})
	if err != nil {
		return err
	}
	defer watcher.Stop()

	for {
		select {
		case event, ok := <-watcher.ResultChan():
			if !ok {
				// The result channel is closed when the watch ends (for
				// example when the server-side timeout elapses). Without this
				// check the loop would spin on zero-value events.
				return fmt.Errorf("%s watch ended before Deployment rollout completed", name)
			}

			switch event.Type {
			case watch.Added, watch.Modified:
				// Added carries the current state when the watch starts, so a
				// Deployment that is already rolled out completes immediately
				// instead of waiting for a modification that never comes.
			case watch.Deleted:
				return fmt.Errorf("%s was deleted while waiting for Deployment rollout", name)
			case watch.Error:
				return fmt.Errorf("%s watch failed while waiting for Deployment rollout: %v", name, event.Object)
			default:
				continue
			}

			deployment, ok := event.Object.(*apps_v1.Deployment)
			if !ok {
				return fmt.Errorf("%s could not cast to Deployment", name)
			}

			// The controller has not yet observed the latest spec.
			if deployment.Generation > deployment.Status.ObservedGeneration {
				continue
			}

			// Fail fast rather than waiting for the timeout once the
			// Deployment reports that it exceeded its progress deadline.
			condition := getDeploymentCondition(deployment.Status, apps_v1.DeploymentProgressing)
			if condition != nil && condition.Reason == TimedOutReason {
				return fmt.Errorf("%s exceeded its progress deadline", name)
			}

			if deployment.Spec.Replicas != nil && deployment.Status.UpdatedReplicas < *deployment.Spec.Replicas {
				continue
			}

			// Old replicas are still pending termination.
			if deployment.Status.Replicas > deployment.Status.UpdatedReplicas {
				continue
			}

			if deployment.Status.AvailableReplicas < deployment.Status.UpdatedReplicas {
				continue
			}

			return nil

		case <-ctx.Done():
			return fmt.Errorf("%s failed to rollout Deployment", name)
		}
	}
}

func getDeploymentCondition(status apps_v1.DeploymentStatus, condType apps_v1.DeploymentConditionType) *apps_v1.DeploymentCondition {
// Borrowed from: https://github.com/kubernetes/kubectl/blob/c4be63c54b7188502c1a63bb884a0b05fac51ebd/pkg/util/deployment/deployment.go#L60
for i := range status.Conditions {
c := status.Conditions[i]
if c.Type == condType {
Expand All @@ -940,6 +1014,151 @@ func GetDeploymentCondition(status apps_v1.DeploymentStatus, condType apps_v1.De
return nil
}

// waitForDaemonSetRollout watches the named DaemonSet in namespace ns and
// returns nil once its rollout is complete.
//
// A DaemonSet whose update strategy is not RollingUpdate is treated as
// complete immediately. Otherwise the rollout is complete when the latest
// generation has been observed and both the updated and available pod counts
// have reached the desired number scheduled. The watch is opened with a
// server-side timeout of timeout (truncated to whole seconds).
//
// An error is returned if the watch cannot be opened, reports an Error
// event, ends before the rollout completes, the DaemonSet is deleted, or ctx
// is cancelled.
func waitForDaemonSetRollout(ctx context.Context, provider *KubeProvider, ns string, name string, timeout time.Duration) error {
	// Borrowed from: https://github.com/kubernetes/kubectl/blob/c4be63c54b7188502c1a63bb884a0b05fac51ebd/pkg/polymorphichelpers/rollout_status.go#L95

	timeoutSeconds := int64(timeout.Seconds())

	watcher, err := provider.MainClientset.AppsV1().DaemonSets(ns).Watch(ctx, meta_v1.ListOptions{
		Watch:          true,
		TimeoutSeconds: &timeoutSeconds,
		FieldSelector:  fields.OneTermEqualSelector("metadata.name", name).String(),
	})
	if err != nil {
		return err
	}
	defer watcher.Stop()

	for {
		select {
		case event, ok := <-watcher.ResultChan():
			if !ok {
				// The result channel is closed when the watch ends (for
				// example when the server-side timeout elapses). Without this
				// check the loop would spin on zero-value events.
				return fmt.Errorf("%s watch ended before DaemonSet rollout completed", name)
			}

			switch event.Type {
			case watch.Added, watch.Modified:
				// Added carries the current state when the watch starts, so a
				// DaemonSet that is already rolled out completes immediately.
			case watch.Deleted:
				return fmt.Errorf("%s was deleted while waiting for DaemonSet rollout", name)
			case watch.Error:
				return fmt.Errorf("%s watch failed while waiting for DaemonSet rollout: %v", name, event.Object)
			default:
				continue
			}

			daemon, ok := event.Object.(*apps_v1.DaemonSet)
			if !ok {
				return fmt.Errorf("%s could not cast to DaemonSet", name)
			}

			// Only RollingUpdate is tracked; any other strategy is treated as
			// done straight away.
			if daemon.Spec.UpdateStrategy.Type != apps_v1.RollingUpdateDaemonSetStrategyType {
				return nil
			}

			// The controller has not yet observed the latest spec.
			if daemon.Generation > daemon.Status.ObservedGeneration {
				continue
			}

			if daemon.Status.UpdatedNumberScheduled < daemon.Status.DesiredNumberScheduled {
				continue
			}

			if daemon.Status.NumberAvailable < daemon.Status.DesiredNumberScheduled {
				continue
			}

			return nil

		case <-ctx.Done():
			return fmt.Errorf("%s failed to rollout DaemonSet", name)
		}
	}
}

// waitForStatefulSetRollout watches the named StatefulSet in namespace ns and
// returns nil once its rollout is complete.
//
// A StatefulSet whose update strategy is not RollingUpdate is treated as
// complete immediately. Otherwise the latest generation must have been
// observed and all desired replicas must be ready; then either the
// partitioned update has reached (replicas - partition) updated replicas, or,
// when no rolling-update parameters are set, the update revision must equal
// the current revision. The watch is opened with a server-side timeout of
// timeout (truncated to whole seconds).
//
// An error is returned if the watch cannot be opened, reports an Error
// event, ends before the rollout completes, the StatefulSet is deleted, or
// ctx is cancelled.
func waitForStatefulSetRollout(ctx context.Context, provider *KubeProvider, ns string, name string, timeout time.Duration) error {
	// Borrowed from: https://github.com/kubernetes/kubectl/blob/c4be63c54b7188502c1a63bb884a0b05fac51ebd/pkg/polymorphichelpers/rollout_status.go#L120

	timeoutSeconds := int64(timeout.Seconds())

	watcher, err := provider.MainClientset.AppsV1().StatefulSets(ns).Watch(ctx, meta_v1.ListOptions{
		Watch:          true,
		TimeoutSeconds: &timeoutSeconds,
		FieldSelector:  fields.OneTermEqualSelector("metadata.name", name).String(),
	})
	if err != nil {
		return err
	}
	defer watcher.Stop()

	for {
		select {
		case event, ok := <-watcher.ResultChan():
			if !ok {
				// The result channel is closed when the watch ends (for
				// example when the server-side timeout elapses). Without this
				// check the loop would spin on zero-value events.
				return fmt.Errorf("%s watch ended before StatefulSet rollout completed", name)
			}

			switch event.Type {
			case watch.Added, watch.Modified:
				// Added carries the current state when the watch starts, so a
				// StatefulSet that is already rolled out completes immediately.
			case watch.Deleted:
				return fmt.Errorf("%s was deleted while waiting for StatefulSet rollout", name)
			case watch.Error:
				return fmt.Errorf("%s watch failed while waiting for StatefulSet rollout: %v", name, event.Object)
			default:
				continue
			}

			sts, ok := event.Object.(*apps_v1.StatefulSet)
			if !ok {
				return fmt.Errorf("%s could not cast to StatefulSet", name)
			}

			// Only RollingUpdate is tracked; any other strategy is treated as
			// done straight away.
			if sts.Spec.UpdateStrategy.Type != apps_v1.RollingUpdateStatefulSetStrategyType {
				return nil
			}

			// The controller has not yet observed the latest spec.
			if sts.Status.ObservedGeneration == 0 || sts.Generation > sts.Status.ObservedGeneration {
				continue
			}

			if sts.Spec.Replicas != nil && sts.Status.ReadyReplicas < *sts.Spec.Replicas {
				continue
			}

			// The strategy type is already known to be RollingUpdate here, so
			// only the presence of its parameters needs checking.
			if rolling := sts.Spec.UpdateStrategy.RollingUpdate; rolling != nil {
				if sts.Spec.Replicas != nil && rolling.Partition != nil {
					// Only ordinals at or above the partition are updated.
					if sts.Status.UpdatedReplicas < (*sts.Spec.Replicas - *rolling.Partition) {
						continue
					}
				}

				return nil
			}

			if sts.Status.UpdateRevision != sts.Status.CurrentRevision {
				continue
			}

			return nil

		case <-ctx.Done():
			return fmt.Errorf("%s failed to rollout StatefulSet", name)
		}
	}
}

// waitForApiService watches the named APIService and returns nil once it
// reports an Available condition whose status is True.
//
// The watch is opened with a server-side timeout of timeout (truncated to
// whole seconds). An error is returned if the watch cannot be opened,
// reports an Error event, ends before the APIService becomes available, the
// APIService is deleted, or ctx is cancelled.
func waitForApiService(ctx context.Context, provider *KubeProvider, name string, timeout time.Duration) error {
	timeoutSeconds := int64(timeout.Seconds())

	watcher, err := provider.AggregatorClientset.ApiregistrationV1().APIServices().Watch(ctx, meta_v1.ListOptions{
		Watch:          true,
		TimeoutSeconds: &timeoutSeconds,
		FieldSelector:  fields.OneTermEqualSelector("metadata.name", name).String(),
	})
	if err != nil {
		return err
	}
	defer watcher.Stop()

	for {
		select {
		case event, ok := <-watcher.ResultChan():
			if !ok {
				// The result channel is closed when the watch ends (for
				// example when the server-side timeout elapses). Without this
				// check the loop would spin on zero-value events.
				return fmt.Errorf("%s watch ended before APIService became available", name)
			}

			switch event.Type {
			case watch.Added, watch.Modified:
				// Added carries the current state when the watch starts, so an
				// APIService that is already available completes immediately.
			case watch.Deleted:
				return fmt.Errorf("%s was deleted while waiting for APIService", name)
			case watch.Error:
				return fmt.Errorf("%s watch failed while waiting for APIService: %v", name, event.Object)
			default:
				continue
			}

			apiService, ok := event.Object.(*apiregistration.APIService)
			if !ok {
				return fmt.Errorf("%s could not cast to APIService", name)
			}

			for i := range apiService.Status.Conditions {
				condition := &apiService.Status.Conditions[i]
				// The mere presence of an Available condition is not enough:
				// its status must be True for the service to be usable.
				if condition.Type == apiregistration.Available && condition.Status == apiregistration.ConditionTrue {
					return nil
				}
			}

		case <-ctx.Done():
			return fmt.Errorf("%s failed to wait for APIService", name)
		}
	}
}

func waitForFields(ctx context.Context, provider *RestClientResult, conditions []types.WaitForField, ns, name string) resource.RetryFunc {
return func() *resource.RetryError {
rawResponse, err := provider.ResourceInterface.Get(ctx, name, meta_v1.GetOptions{})
Expand Down Expand Up @@ -982,58 +1201,6 @@ func waitForFields(ctx context.Context, provider *RestClientResult, conditions [
}
}

// waitForDeploymentReplicasFunc returns a retry function that fetches the
// named Deployment in namespace ns and reports whether its rollout is
// complete.
//
// The returned function yields nil when the rollout is complete, a retryable
// error while the rollout is still in progress or the latest generation has
// not been observed yet, and a non-retryable error when the Deployment cannot
// be fetched or has exceeded its progress deadline.
func waitForDeploymentReplicasFunc(ctx context.Context, provider *KubeProvider, ns, name string) resource.RetryFunc {
	return func() *resource.RetryError {

		// Query the deployment to get a status update.
		dply, err := provider.MainClientset.AppsV1().Deployments(ns).Get(ctx, name, meta_v1.GetOptions{})
		if err != nil {
			return resource.NonRetryableError(err)
		}

		// The status only describes the current spec once the controller has
		// observed the latest generation; until then keep retrying instead of
		// reporting success from stale status.
		if dply.Generation > dply.Status.ObservedGeneration {
			if dply.Status.ObservedGeneration == 0 {
				return resource.RetryableError(fmt.Errorf("Waiting for rollout to start"))
			}
			return resource.RetryableError(fmt.Errorf("Waiting for deployment spec update to be observed"))
		}

		cond := GetDeploymentCondition(dply.Status, apps_v1.DeploymentProgressing)
		if cond != nil && cond.Reason == TimedOutReason {
			err := fmt.Errorf("Deployment exceeded its progress deadline: %v", cond.String())
			return resource.NonRetryableError(err)
		}

		// Spec.Replicas is a pointer: guard against nil and dereference it for
		// the message so the count is printed rather than an address.
		if dply.Spec.Replicas != nil && dply.Status.UpdatedReplicas < *dply.Spec.Replicas {
			return resource.RetryableError(fmt.Errorf("Waiting for rollout to finish: %d out of %d new replicas have been updated...", dply.Status.UpdatedReplicas, *dply.Spec.Replicas))
		}

		if dply.Status.Replicas > dply.Status.UpdatedReplicas {
			return resource.RetryableError(fmt.Errorf("Waiting for rollout to finish: %d old replicas are pending termination...", dply.Status.Replicas-dply.Status.UpdatedReplicas))
		}

		if dply.Status.AvailableReplicas < dply.Status.UpdatedReplicas {
			return resource.RetryableError(fmt.Errorf("Waiting for rollout to finish: %d of %d updated replicas are available...", dply.Status.AvailableReplicas, dply.Status.UpdatedReplicas))
		}

		return nil
	}
}

// waitForAPIServiceAvailableFunc returns a retry function that fetches the
// named APIService and reports whether it is available.
//
// The returned function yields nil when the APIService has an Available
// condition whose status is True, a retryable error while it does not, and a
// non-retryable error when the APIService cannot be fetched.
func waitForAPIServiceAvailableFunc(ctx context.Context, provider *KubeProvider, name string) resource.RetryFunc {
	return func() *resource.RetryError {

		apiService, err := provider.AggregatorClientset.ApiregistrationV1().APIServices().Get(ctx, name, meta_v1.GetOptions{})
		if err != nil {
			return resource.NonRetryableError(err)
		}

		for i := range apiService.Status.Conditions {
			condition := &apiService.Status.Conditions[i]
			// The mere presence of an Available condition is not enough: its
			// status must be True for the service to be usable.
			if condition.Type == apiregistration.Available && condition.Status == apiregistration.ConditionTrue {
				return nil
			}
		}

		return resource.RetryableError(fmt.Errorf("Waiting for APIService %v to be Available", name))
	}
}

// Takes the result of flatmap.Expand for an array of strings
// and returns a []*string
func expandStringList(configured []interface{}) []string {
Expand Down
Loading

0 comments on commit ea4b9ac

Please sign in to comment.