Skip to content
This repository has been archived by the owner on Sep 12, 2024. It is now read-only.

Commit

Permalink
Merge pull request #58 from rancher-sandbox/fix-connect-refuse-k8s-cl…
Browse files Browse the repository at this point in the history
…ient

Fixes the wrong use of k8s client-go
  • Loading branch information
mook-as authored May 7, 2024
2 parents 078aaed + ae6abd7 commit afce3ee
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 10 deletions.
3 changes: 3 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ linters:
- dupword # It interferes with duplicate words in comments
- wsl
linters-settings:
mnd:
ignored-functions:
- "net.IPv4"
govet:
disable:
- lostcancel # Not tracking cancel usage correctly.
2 changes: 1 addition & 1 deletion cmd/rancher-desktop-guestagent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func main() {
k8sServiceListenerIP := net.ParseIP(*k8sServiceListenerAddr)

if k8sServiceListenerIP == nil || !(k8sServiceListenerIP.Equal(net.IPv4zero) ||
k8sServiceListenerIP.Equal(net.IPv4(127, 0, 0, 1))) { //nolint:gomnd // IPv4 addr localhost
k8sServiceListenerIP.Equal(net.IPv4(127, 0, 0, 1))) {
log.Fatalf("empty or none valid input for Kubernetes service listener IP address %s. "+
"Valid options are 0.0.0.0 and 127.0.0.1.", *k8sServiceListenerAddr)
}
Expand Down
10 changes: 7 additions & 3 deletions pkg/kube/servicewatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"golang.org/x/sys/unix"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
Expand All @@ -50,12 +50,15 @@ func watchServices(ctx context.Context, client *kubernetes.Clientset) (<-chan ev
sharedInformer := serviceInformer.Informer()
_, _ = sharedInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
log.Debugf("Service Informer: Add func called with: %+v", obj)
handleUpdate(nil, obj, eventCh)
},
DeleteFunc: func(obj interface{}) {
log.Debugf("Service Informer: Del func called with: %+v", obj)
handleUpdate(obj, nil, eventCh)
},
UpdateFunc: func(oldObj, newObj interface{}) {
log.Debugf("Service Informer: Update func called with old object %+v and new Object: %+v", oldObj, newObj)
handleUpdate(oldObj, newObj, eventCh)
},
})
Expand Down Expand Up @@ -101,15 +104,16 @@ func watchServices(ctx context.Context, client *kubernetes.Clientset) (<-chan ev
informerFactory.WaitForCacheSync(ctx.Done())
informerFactory.Start(ctx.Done())

services, err := serviceInformer.Lister().List(labels.Everything())
services, err := client.CoreV1().Services(corev1.NamespaceAll).List(ctx, v1.ListOptions{})
if err != nil {
return nil, nil, fmt.Errorf("error listing services: %w", err)
}
log.Debugf("coreV1 services list :%+v", services.Items)

// List the initial set of services asynchronously, so that we don't have to
// worry about the channel blocking.
go func() {
for _, svc := range services {
for _, svc := range services.Items {
handleUpdate(nil, svc, eventCh)
}
}()
Expand Down
26 changes: 20 additions & 6 deletions pkg/kube/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ import (
"io/fs"
"net"
"strconv"
"strings"
"time"

"github.com/Masterminds/log-go"
"github.com/docker/go-connections/nat"
"github.com/rancher-sandbox/rancher-desktop-agent/pkg/tracker"
"golang.org/x/sys/unix"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
Expand Down Expand Up @@ -120,14 +122,18 @@ func WatchForServices(

eventCh, errorCh, err = watchServices(watchContext, clientset)
if err != nil {
if isTimeout(err) {
// If it's a time out, the server may not be running yet
time.Sleep(time.Second)

continue
switch {
default:
return err
case isTimeout(err):
case errors.Is(err, unix.ENETUNREACH):
case errors.Is(err, unix.ECONNREFUSED):
case isAPINotReady(err):
}
// sleep and continue for all the expected case
time.Sleep(time.Second)

return err
continue
}

log.Debugf("watching kubernetes services")
Expand Down Expand Up @@ -258,6 +264,14 @@ func isTimeout(err error) bool {
return false
}

// This is a k3s error that is received over
// the HTTP, Also, it is worth noting that this
// error is wrapped. This is why we are not testing
// against the real error object using errors.Is().
func isAPINotReady(err error) bool {
return strings.Contains(err.Error(), "apiserver not ready")
}

func createPortMapping(ports map[int32]corev1.Protocol, k8sServiceListenerIP net.IP) (nat.PortMap, error) {
portMap := make(nat.PortMap)

Expand Down

0 comments on commit afce3ee

Please sign in to comment.