From e45318ea94115341756b3ed2b5d0fe1bef94c87c Mon Sep 17 00:00:00 2001
From: Jacek Wysocki
Date: Mon, 10 Jun 2024 22:37:14 +0200
Subject: [PATCH] fix: get pods by name in logs proxy instead of labels (#5559)

---
 pkg/executor/common.go    | 16 ++++++++++++++
 pkg/logs/sidecar/proxy.go | 45 ++++++++++++++++++---------------------
 2 files changed, 37 insertions(+), 24 deletions(-)

diff --git a/pkg/executor/common.go b/pkg/executor/common.go
index 25de90b35df..33720f81d90 100644
--- a/pkg/executor/common.go
+++ b/pkg/executor/common.go
@@ -339,6 +339,22 @@ func IsPodFailed(pod *corev1.Pod) (err error) {
 	return
 }
 
+// GetPodByName returns the job pod with the given name, retrying with linear backoff until retryCount is reached
+func GetPodByName(ctx context.Context, podsClient tcorev1.PodInterface, podName string, retryNr, retryCount int) (*corev1.Pod, error) {
+	pod, err := podsClient.Get(ctx, podName, metav1.GetOptions{})
+	if err != nil {
+		return nil, err
+	}
+	if pod == nil {
+		if retryNr == retryCount {
+			return nil, fmt.Errorf("retry count exceeded, there is no active pod with given name=%s", podName)
+		}
+		time.Sleep(time.Duration(retryNr) * 500 * time.Millisecond) // linear backoff: wait longer before each retry
+		return GetPodByName(ctx, podsClient, podName, retryNr+1, retryCount)
+	}
+	return pod, nil
+}
+
 // GetJobPods returns job pods
 func GetJobPods(ctx context.Context, podsClient tcorev1.PodInterface, jobName string, retryNr, retryCount int) (*corev1.PodList, error) {
 	pods, err := podsClient.List(ctx, metav1.ListOptions{LabelSelector: "job-name=" + jobName})
diff --git a/pkg/logs/sidecar/proxy.go b/pkg/logs/sidecar/proxy.go
index cbad9ab94a5..be1138db381 100644
--- a/pkg/logs/sidecar/proxy.go
+++ b/pkg/logs/sidecar/proxy.go
@@ -107,41 +107,38 @@ func (p *Proxy) Run(ctx context.Context) error {
 }
 
 func (p *Proxy) streamLogs(ctx context.Context, logs chan *events.Log) (err error) {
-	pods, err := executor.GetJobPods(ctx, p.podsClient, p.getPodName(), 1, 10)
+	pod, err := executor.GetPodByName(ctx, p.podsClient, p.getPodName(), 1, 10)
 	if err != nil {
 		p.handleError(err, "error getting job pods")
 		return err
 	}
 
-	for _, pod := range pods.Items {
-		l := p.log.With("namespace", pod.Namespace, "podName", pod.Name, "podStatus", pod.Status)
+	l := p.log.With("namespace", pod.Namespace, "podName", pod.Name, "podStatus", pod.Status)
 
-		switch pod.Status.Phase {
+	switch pod.Status.Phase {
 
-		case corev1.PodRunning:
-			l.Debug("streaming pod logs: immediately")
-			return p.streamLogsFromPod(pod, logs)
+	case corev1.PodRunning:
+		l.Debug("streaming pod logs: immediately")
+		return p.streamLogsFromPod(*pod, logs)
 
-		case corev1.PodFailed:
-			err := fmt.Errorf("can't get pod logs, pod failed: %s/%s", pod.Namespace, pod.Name)
-			p.handleError(err, "streaming pod logs: pod failed")
-			return err
+	case corev1.PodFailed:
+		err := fmt.Errorf("can't get pod logs, pod failed: %s/%s", pod.Namespace, pod.Name)
+		p.handleError(err, "streaming pod logs: pod failed")
+		return err
 
-		default:
-			l.Debugw("streaming pod logs: waiting for pod to be ready")
-			testFunc := p.isPodLoggable(pod.Name)
-			if err = wait.PollUntilContextTimeout(ctx, pollInterval, podStartTimeout, true, testFunc); err != nil {
-				// try to get pod container statuses from Waiting and Terminated states
-				status := p.getPodContainerStatuses(pod)
-				p.handleError(err, "can't get pod container status after pod failure")
-				return errors.Wrap(err, status)
-			}
+	default:
+		l.Debugw("streaming pod logs: waiting for pod to be ready")
+		testFunc := p.isPodLoggable(pod.Name)
+		if err = wait.PollUntilContextTimeout(ctx, pollInterval, podStartTimeout, true, testFunc); err != nil {
+			// try to get pod container statuses from Waiting and Terminated states
+			status := p.getPodContainerStatuses(*pod)
+			p.handleError(err, "can't get pod container status after pod failure")
+			return errors.Wrap(err, status)
+		}
 
-			l.Debug("streaming pod logs: pod is loggable")
-			return p.streamLogsFromPod(pod, logs)
-		}
+		l.Debug("streaming pod logs: pod is loggable")
+		return p.streamLogsFromPod(*pod, logs)
 	}
-	return
 }
 
 func (p *Proxy) streamLogsFromPod(pod corev1.Pod, logs chan *events.Log) (err error) {
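For a quick smoke test of the new helper, it can be driven with client-go's fake clientset. A minimal sketch, assuming the helper is importable as github.com/kubeshop/testkube/pkg/executor per the patch; the pod name and namespace are illustrative only:

```go
package executor_test

import (
	"context"
	"testing"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"

	"github.com/kubeshop/testkube/pkg/executor"
)

// TestGetPodByName drives the new helper against an in-memory clientset.
func TestGetPodByName(t *testing.T) {
	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "logs-job-abc", Namespace: "testkube"},
		Status:     corev1.PodStatus{Phase: corev1.PodRunning},
	}
	podsClient := fake.NewSimpleClientset(pod).CoreV1().Pods("testkube")

	// retryNr starts at 1, matching the call site in streamLogs.
	got, err := executor.GetPodByName(context.Background(), podsClient, "logs-job-abc", 1, 10)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if got.Name != "logs-job-abc" {
		t.Fatalf("got pod %q, want %q", got.Name, "logs-job-abc")
	}
}
```

The fake clientset returns the registered pod immediately, so this only exercises the happy path; covering the backoff branch would need a client that can temporarily report the pod as missing.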
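On the proxy side, the testFunc handed to wait.PollUntilContextTimeout must satisfy wait.ConditionWithContextFunc. The patch does not show isPodLoggable itself; the sketch below is one plausible shape for such a condition, assuming it re-reads the pod through the same typed client, and is a hypothetical stand-in rather than the actual Testkube implementation:

```go
package sidecar

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	tcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
)

// isPodLoggableSketch returns a poll condition that reports true once the
// pod has reached a phase where the kubelet can serve its logs.
// Hypothetical stand-in for the proxy's isPodLoggable.
func isPodLoggableSketch(podsClient tcorev1.PodInterface, podName string) wait.ConditionWithContextFunc {
	return func(ctx context.Context) (bool, error) {
		pod, err := podsClient.Get(ctx, podName, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		switch pod.Status.Phase {
		case corev1.PodRunning, corev1.PodSucceeded:
			return true, nil // logs are (or were) being produced
		case corev1.PodFailed:
			return false, fmt.Errorf("pod failed before logs could be streamed: %s", podName)
		default:
			return false, nil // Pending/Unknown: keep polling until podStartTimeout
		}
	}
}
```

PollUntilContextTimeout stops as soon as the condition returns true or a non-nil error, which is why the PodFailed branch surfaces an error immediately instead of polling until the timeout expires.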