Skip to content
This repository has been archived by the owner on Sep 12, 2024. It is now read-only.

Fixes the wrong use of k8s client-go #58

Merged
merged 5 commits into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ linters:
- dupword # It interferes with duplicate words in comments
- wsl
linters-settings:
mnd:
ignored-functions:
- "net.IPv4"
govet:
disable:
- lostcancel # Not tracking cancel usage correctly.
2 changes: 1 addition & 1 deletion cmd/rancher-desktop-guestagent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func main() {
k8sServiceListenerIP := net.ParseIP(*k8sServiceListenerAddr)

if k8sServiceListenerIP == nil || !(k8sServiceListenerIP.Equal(net.IPv4zero) ||
k8sServiceListenerIP.Equal(net.IPv4(127, 0, 0, 1))) { //nolint:gomnd // IPv4 addr localhost
k8sServiceListenerIP.Equal(net.IPv4(127, 0, 0, 1))) {
log.Fatalf("empty or none valid input for Kubernetes service listener IP address %s. "+
"Valid options are 0.0.0.0 and 127.0.0.1.", *k8sServiceListenerAddr)
}
Expand Down
10 changes: 7 additions & 3 deletions pkg/kube/servicewatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"golang.org/x/sys/unix"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
Expand All @@ -50,12 +50,15 @@ func watchServices(ctx context.Context, client *kubernetes.Clientset) (<-chan ev
sharedInformer := serviceInformer.Informer()
_, _ = sharedInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
log.Debugf("Service Informer: Add func called with: %+v", obj)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not that it matters, but these logs should be trace?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAIK for the trace logging to work it needs to be handled from the main. I don't think just changing these to trace would work 🤔
Also, I don't recall if we handle trace logging at all in RD anywhere.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, trace would mean it's basically off and needs a code change to turn on. We don't really want to log everything in release builds (even with debug turned on).

handleUpdate(nil, obj, eventCh)
},
DeleteFunc: func(obj interface{}) {
log.Debugf("Service Informer: Del func called with: %+v", obj)
handleUpdate(obj, nil, eventCh)
},
UpdateFunc: func(oldObj, newObj interface{}) {
log.Debugf("Service Informer: Update func called with old object %+v and new Object: %+v", oldObj, newObj)
handleUpdate(oldObj, newObj, eventCh)
},
})
Expand Down Expand Up @@ -101,15 +104,16 @@ func watchServices(ctx context.Context, client *kubernetes.Clientset) (<-chan ev
informerFactory.WaitForCacheSync(ctx.Done())
informerFactory.Start(ctx.Done())

services, err := serviceInformer.Lister().List(labels.Everything())
services, err := client.CoreV1().Services(corev1.NamespaceAll).List(ctx, v1.ListOptions{})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed with Nino: This manages to return an error (good!), but we don't handle it (bad!) and exit the whole process. At least init restarts us, but we should avoid needing to do that.

if err != nil {
return nil, nil, fmt.Errorf("error listing services: %w", err)
}
log.Debugf("coreV1 services list :%+v", services.Items)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here — trace?


// List the initial set of services asynchronously, so that we don't have to
// worry about the channel blocking.
go func() {
for _, svc := range services {
for _, svc := range services.Items {
handleUpdate(nil, svc, eventCh)
}
}()
Expand Down
26 changes: 20 additions & 6 deletions pkg/kube/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ import (
"io/fs"
"net"
"strconv"
"strings"
"time"

"github.com/Masterminds/log-go"
"github.com/docker/go-connections/nat"
"github.com/rancher-sandbox/rancher-desktop-agent/pkg/tracker"
"golang.org/x/sys/unix"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
Expand Down Expand Up @@ -120,14 +122,18 @@ func WatchForServices(

eventCh, errorCh, err = watchServices(watchContext, clientset)
if err != nil {
if isTimeout(err) {
// If it's a time out, the server may not be running yet
time.Sleep(time.Second)

continue
switch {
default:
return err
case isTimeout(err):
case errors.Is(err, unix.ENETUNREACH):
case errors.Is(err, unix.ECONNREFUSED):
case isAPINotReady(err):
}
// sleep and continue for all the expected case
time.Sleep(time.Second)

return err
continue
}

log.Debugf("watching kubernetes services")
Expand Down Expand Up @@ -258,6 +264,14 @@ func isTimeout(err error) bool {
return false
}

// This is a k3s error that is received over
// the HTTP, Also, it is worth noting that this
// error is wrapped. This is why we are not testing
// against the real error object using errors.Is().
func isAPINotReady(err error) bool {
return strings.Contains(err.Error(), "apiserver not ready")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to have a comment about where this comes from (k3s, over HTTP so we can't test for a specific error object). Ideally we also check that err is of the right type here, but I guess that can be optional.

}

func createPortMapping(ports map[int32]corev1.Protocol, k8sServiceListenerIP net.IP) (nat.PortMap, error) {
portMap := make(nat.PortMap)

Expand Down
Loading