Skip to content

Commit

Permalink
fix: get pods by name in logs proxy instead of labels (#5559)
Browse files Browse the repository at this point in the history
  • Loading branch information
exu authored Jun 10, 2024
1 parent c23bd57 commit e45318e
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 24 deletions.
16 changes: 16 additions & 0 deletions pkg/executor/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,22 @@ func IsPodFailed(pod *corev1.Pod) (err error) {
return
}

// GetPodByName fetches the pod called podName through podsClient, retrying
// with a linearly growing backoff while the client reports neither a pod nor
// an error.
//
// retryNr is the number of the current attempt and retryCount is the last
// attempt that may be made. Any error returned by the client's Get call is
// returned to the caller unchanged and is not retried. If the context is
// cancelled while waiting between attempts, the context's error is returned.
func GetPodByName(ctx context.Context, podsClient tcorev1.PodInterface, podName string, retryNr, retryCount int) (*corev1.Pod, error) {
	for attempt := retryNr; ; attempt++ {
		pod, err := podsClient.Get(ctx, podName, metav1.GetOptions{})
		if err != nil {
			return nil, err
		}

		// Check for success before looking at the retry budget, so a pod
		// fetched on the final attempt is returned instead of being discarded.
		if pod != nil {
			return pod, nil
		}

		// Use >= rather than == so a caller passing retryNr > retryCount
		// cannot loop without bound.
		if attempt >= retryCount {
			return nil, fmt.Errorf("retry count exceeeded, there is no active pod with given name=%s", podName)
		}

		// Increase the backoff with every attempt, but stop waiting as soon
		// as the context is done.
		backoff := time.NewTimer(time.Duration(attempt) * 500 * time.Millisecond)
		select {
		case <-ctx.Done():
			backoff.Stop()
			return nil, ctx.Err()
		case <-backoff.C:
		}
	}
}

// GetJobPods returns job pods
func GetJobPods(ctx context.Context, podsClient tcorev1.PodInterface, jobName string, retryNr, retryCount int) (*corev1.PodList, error) {
pods, err := podsClient.List(ctx, metav1.ListOptions{LabelSelector: "job-name=" + jobName})
Expand Down
45 changes: 21 additions & 24 deletions pkg/logs/sidecar/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,41 +107,38 @@ func (p *Proxy) Run(ctx context.Context) error {
}

func (p *Proxy) streamLogs(ctx context.Context, logs chan *events.Log) (err error) {
pods, err := executor.GetJobPods(ctx, p.podsClient, p.getPodName(), 1, 10)
pod, err := executor.GetPodByName(ctx, p.podsClient, p.getPodName(), 1, 10)
if err != nil {
p.handleError(err, "error getting job pods")
return err
}

for _, pod := range pods.Items {
l := p.log.With("namespace", pod.Namespace, "podName", pod.Name, "podStatus", pod.Status)
l := p.log.With("namespace", pod.Namespace, "podName", pod.Name, "podStatus", pod.Status)

switch pod.Status.Phase {
switch pod.Status.Phase {

case corev1.PodRunning:
l.Debug("streaming pod logs: immediately")
return p.streamLogsFromPod(pod, logs)
case corev1.PodRunning:
l.Debug("streaming pod logs: immediately")
return p.streamLogsFromPod(*pod, logs)

case corev1.PodFailed:
err := fmt.Errorf("can't get pod logs, pod failed: %s/%s", pod.Namespace, pod.Name)
p.handleError(err, "streaming pod logs: pod failed")
return err

default:
l.Debugw("streaming pod logs: waiting for pod to be ready")
testFunc := p.isPodLoggable(pod.Name)
if err = wait.PollUntilContextTimeout(ctx, pollInterval, podStartTimeout, true, testFunc); err != nil {
// try to get pod container statuses from Waiting and Terminated states
status := p.getPodContainerStatuses(pod)
p.handleError(err, "can't get pod container status after pod failure")
return errors.Wrap(err, status)
}
case corev1.PodFailed:
err := fmt.Errorf("can't get pod logs, pod failed: %s/%s", pod.Namespace, pod.Name)
p.handleError(err, "streaming pod logs: pod failed")
return err

l.Debug("streaming pod logs: pod is loggable")
return p.streamLogsFromPod(pod, logs)
default:
l.Debugw("streaming pod logs: waiting for pod to be ready")
testFunc := p.isPodLoggable(pod.Name)
if err = wait.PollUntilContextTimeout(ctx, pollInterval, podStartTimeout, true, testFunc); err != nil {
// try to get pod container statuses from Waiting and Terminated states
status := p.getPodContainerStatuses(*pod)
p.handleError(err, "can't get pod container status after pod failure")
return errors.Wrap(err, status)
}

l.Debug("streaming pod logs: pod is loggable")
return p.streamLogsFromPod(*pod, logs)
}
return
}

func (p *Proxy) streamLogsFromPod(pod corev1.Pod, logs chan *events.Log) (err error) {
Expand Down

0 comments on commit e45318e

Please sign in to comment.