Skip to content

Commit

Permalink
Method to Wait for Pods to be ready which are not included in the pip…
Browse files Browse the repository at this point in the history
…eline (#9)

* Added Pod Waiting

* Make pod ready

* Pod ready watcher

* Addde timeout

* Added timeout args variadic function
  • Loading branch information
shubhamdixit863 committed Nov 27, 2023
1 parent a6bca1d commit 6f98374
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 2 deletions.
61 changes: 61 additions & 0 deletions testing/fixtures/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"context"
"fmt"
"io"
"k8s.io/apimachinery/pkg/watch"
"os"
"os/exec"
"regexp"
Expand Down Expand Up @@ -234,6 +235,66 @@ func WaitForPipelineRunning(ctx context.Context, pipelineClient flowpkg.Pipeline
}
}

func WaitForPodToBeReady(ctx context.Context, kubeClient kubernetes.Interface, timeout time.Duration, namespace, podName string) error {
labelSelector := fmt.Sprintf("name=%s", podName)

// Define a ListOptions with the labelSelector
opts := metav1.ListOptions{LabelSelector: labelSelector}

// Create a context with timeout
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

// Start watching for Pod updates before listing to prevent missing any updates
watchInterface, err := kubeClient.CoreV1().Pods(namespace).Watch(ctx, opts)
if err != nil {
return fmt.Errorf("failed to start watching for pod readiness: %v", err)
}
defer watchInterface.Stop()

for {
select {
case <-ctx.Done():
return fmt.Errorf("timeout after %v waiting for pod %s to be ready", timeout, podName)
case event, ok := <-watchInterface.ResultChan():
if !ok {
return fmt.Errorf("watch channel for pod %s closed", podName)
}

if event.Type == watch.Error {
return fmt.Errorf("error watching pod %s", podName)
}

pod, ok := event.Object.(*corev1.Pod)
if !ok {
return fmt.Errorf("unexpected type when watching pod")
}

if pod.Name != podName {
continue // Skip if it's not the pod we're waiting for
}

if isPodReady(pod) {
fmt.Printf("Pod %s is now ready\n", podName)
return nil
}
}
}
}

// isPodReady checks if a Pod is in a Running state and all containers are ready.
func isPodReady(pod *corev1.Pod) bool {
if pod.Status.Phase != corev1.PodRunning {
return false
}
for _, cond := range pod.Status.Conditions {
if cond.Type == corev1.PodReady && cond.Status == corev1.ConditionTrue {
return true
}
}
return false
}

func WaitForVertexPodRunning(kubeClient kubernetes.Interface, vertexClient flowpkg.VertexInterface, namespace, pipelineName, vertexName string, timeout time.Duration) error {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
Expand Down
16 changes: 14 additions & 2 deletions testing/fixtures/when.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (w *When) CreatePipelineAndWait() *When {
return w
}

func (w *When) DeletePipelineAndWait() *When {
func (w *When) DeletePipelineAndWait(timeoutArgs ...time.Duration) *When {
w.t.Helper()
if w.pipeline == nil {
w.t.Fatal("No Pipeline to delete")
Expand All @@ -131,8 +131,10 @@ func (w *When) DeletePipelineAndWait() *When {
if err := w.pipelineClient.Delete(ctx, w.pipeline.Name, metav1.DeleteOptions{}); err != nil {
w.t.Fatal(err)
}

timeout := defaultTimeout
if len(timeoutArgs) > 0 {
timeout = timeoutArgs[0]
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
labelSelector := fmt.Sprintf("%s=%s", dfv1.KeyPipelineName, w.pipeline.Name)
Expand Down Expand Up @@ -178,6 +180,16 @@ func (w *When) WaitForStatefulSetReady(labelSelector string) *When {
return w
}

func (w *When) WaitForPodReady(podName string) *When {
w.t.Helper()
ctx := context.Background()
if err := WaitForPodToBeReady(ctx, w.kubeClient, 5*time.Minute, Namespace, podName); err != nil {
w.t.Fatal(err)
}

return w
}

func (w *When) VertexPodPortForward(vertexName string, localPort, remotePort int) *When {
w.t.Helper()
labelSelector := fmt.Sprintf("%s=%s,%s=%s", dfv1.KeyPipelineName, w.pipeline.Name, dfv1.KeyVertexName, vertexName)
Expand Down

0 comments on commit 6f98374

Please sign in to comment.