
Commit

Testing stuff
Signed-off-by: manuelbuil <[email protected]>
manuelbuil committed Sep 24, 2024
1 parent 40eda6a commit 180b6c4
Showing 3 changed files with 94 additions and 99 deletions.
2 changes: 1 addition & 1 deletion pkg/cloudprovider/cloudprovider.go
@@ -106,7 +106,7 @@ func (k *k3s) Initialize(clientBuilder cloudprovider.ControllerClientBuilder, st
k.podCache = lbCoreFactory.Core().V1().Pod().Cache()
k.workqueue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

if err := k.Register(ctx, coreFactory.Core().V1().Node(), lbCoreFactory.Core().V1().Pod(), lbDiscFactory.Discovery().V1().EndpointSlice()); err != nil {
if err := k.Register(ctx, coreFactory.Core().V1().Node(), lbCoreFactory.Core().V1().Pod(), lbDiscFactory.Discovery().V1().EndpointSlice(), lbCoreFactory.Core().V1().Service()); err != nil {
logrus.Panicf("failed to register %s handlers: %v", controllerName, err)
}

26 changes: 15 additions & 11 deletions pkg/cloudprovider/loadbalancer.go
@@ -4,21 +4,23 @@ import (
"context"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
// apierrors "k8s.io/apimachinery/pkg/api/errors"
cloudprovider "k8s.io/cloud-provider"

"github.com/sirupsen/logrus"
)

var _ cloudprovider.LoadBalancer = &k3s{}

// GetLoadBalancer returns whether the specified load balancer exists, and if so, what its status is.
func (k *k3s) GetLoadBalancer(ctx context.Context, clusterName string, service *corev1.Service) (*corev1.LoadBalancerStatus, bool, error) {
if _, err := k.getDaemonSet(service); err != nil {
if apierrors.IsNotFound(err) {
return nil, false, nil
}
return nil, false, err
}

//logrus.Infof("MANU - CloudProvider GetLoadBalancer called. This is service: %v", service)
// if _, err := k.getDaemonSet(service); err != nil {
//if apierrors.IsNotFound(err) {
// return nil, false, nil
//}
// return nil, false, err
// }
status, err := k.getStatus(service)
return status, true, err
}
@@ -32,9 +34,11 @@ func (k *k3s) GetLoadBalancerName(ctx context.Context, clusterName string, servi
// The node list is unused; see the comment on UpdateLoadBalancer for information on why.
// This is called when the Service is created or changes.
func (k *k3s) EnsureLoadBalancer(ctx context.Context, clusterName string, service *corev1.Service, nodes []*corev1.Node) (*corev1.LoadBalancerStatus, error) {
if err := k.deployDaemonSet(ctx, service); err != nil {
return nil, err
}
//logrus.Info("MANU - CloudProvider EnsureLoadBalancer called")
//if err := k.deployDaemonSet(ctx, service); err != nil {
// return nil, err
// }
logrus.Info("MANU - EnsureLoadBalancer was called")
return nil, cloudprovider.ImplementedElsewhere
}
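
Returning cloudprovider.ImplementedElsewhere is the sentinel defined by k8s.io/cloud-provider for telling the service controller in cloud-controller-manager that load-balancer handling for this Service is owned by a different controller, so it should skip its own provisioning. A minimal sketch of the caller-side check, for illustration only (lb here is a hypothetical cloudprovider.LoadBalancer value; the in-tree service controller performs the equivalent comparison internally):

// Assumes the same import used above: cloudprovider "k8s.io/cloud-provider".
_, err := lb.EnsureLoadBalancer(ctx, clusterName, svc, nodes)
if err == cloudprovider.ImplementedElsewhere {
	// Another controller owns this Service; treat it as handled, not failed.
	return nil
}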

165 changes: 78 additions & 87 deletions pkg/cloudprovider/servicelb.go
@@ -2,12 +2,12 @@ package cloudprovider

import (
"context"
"encoding/json"
"fmt"
"sort"
"strconv"
"strings"
"time"
"encoding/json"

"sigs.k8s.io/yaml"

"github.com/k3s-io/k3s/pkg/util"
@@ -16,7 +16,8 @@ import (
coreclient "github.com/rancher/wrangler/v3/pkg/generated/controllers/core/v1"
discoveryclient "github.com/rancher/wrangler/v3/pkg/generated/controllers/discovery/v1"
"github.com/rancher/wrangler/v3/pkg/merr"
"github.com/rancher/wrangler/v3/pkg/objectset"

// "github.com/rancher/wrangler/v3/pkg/objectset"
"github.com/sirupsen/logrus"
apps "k8s.io/api/apps/v1"
core "k8s.io/api/core/v1"
@@ -55,17 +56,19 @@
)

var (
DefaultLBImage = "rancher/klipper-lb:v0.4.9"
DefaultLBImage = "mbuilsuse/klipperlb:20240920-testing2"
)

func (k *k3s) Register(ctx context.Context,
nodes coreclient.NodeController,
pods coreclient.PodController,
endpointslices discoveryclient.EndpointSliceController,
services coreclient.ServiceController,
) error {
nodes.OnChange(ctx, controllerName, k.onChangeNode)
pods.OnChange(ctx, controllerName, k.onChangePod)
endpointslices.OnChange(ctx, controllerName, k.onChangeEndpointSlice)
services.OnChange(ctx, controllerName, k.onChangeService)

if err := k.ensureServiceLBNamespace(ctx); err != nil {
return err
@@ -119,6 +122,7 @@ func (k *k3s) ensureServiceLBServiceAccount(ctx context.Context) error {
// If the pod has labels that tie it to a service, and the pod has an IP assigned,
// enqueue an update to the service's status.
func (k *k3s) onChangePod(key string, pod *core.Pod) (*core.Pod, error) {
logrus.Info("MANU - onChangePod called")
if pod == nil {
return nil, nil
}
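
The rest of onChangePod is elided by the diff view; it maps the Pod back to its Service via the klipper-lb labels and enqueues a status update. A rough sketch of that flow, using the svcNameLabel and svcNamespaceLabel constants defined earlier in this file (an assumption about the elided body, not a verbatim copy):

serviceName := pod.Labels[svcNameLabel]
serviceNamespace := pod.Labels[svcNamespaceLabel]
if serviceName == "" || serviceNamespace == "" || pod.Status.PodIP == "" {
	return pod, nil
}
// Key format matches what the worker dequeues: "<namespace>/<name>".
k.workqueue.Add(serviceNamespace + "/" + serviceName)
return pod, nil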
@@ -144,6 +148,7 @@ func (k *k3s) onChangePod(key string, pod *core.Pod) (*core.Pod, error) {
// onChangeNode handles changes to Nodes. We need to handle this as we may need to kick the DaemonSet
// to add or remove pods from nodes if labels have changed.
func (k *k3s) onChangeNode(key string, node *core.Node) (*core.Node, error) {
logrus.Info("MANU - onChangeNode called")
if node == nil {
return nil, nil
}
@@ -161,6 +166,7 @@ func (k *k3s) onChangeNode(key string, node *core.Node) (*core.Node, error) {
// onChangeEndpointSlice handles changes to EndpointSlices. This is used to ensure that LoadBalancer
// addresses only list Nodes with ready Pods, when their ExternalTrafficPolicy is set to Local.
func (k *k3s) onChangeEndpointSlice(key string, eps *discovery.EndpointSlice) (*discovery.EndpointSlice, error) {
logrus.Info("MANU - onChangeEndpointSlice called")
if eps == nil {
return nil, nil
}
@@ -174,12 +180,30 @@ func (k *k3s) onChangeEndpointSlice(key string, eps *discovery.EndpointSlice) (*
return eps, nil
}

// onChangeService handles changes to Services.
// If the service is of type LoadBalancer, enqueue an update to the service's status.
// This covers services whose type was changed to LoadBalancer after creation.
func (k *k3s) onChangeService(key string, svc *core.Service) (*core.Service, error) {
logrus.Info("MANU - onChangeService called")
if svc == nil {
return nil, nil
}

if svc.Spec.Type != core.ServiceTypeLoadBalancer {
return svc, nil
}

k.workqueue.Add(svc.Namespace + "/" + svc.Name)
return svc, nil
}
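
The "<namespace>/<name>" key format mirrors what the dequeue side expects. A sketch of that side, assuming the standard cache.SplitMetaNamespaceKey helper from k8s.io/client-go/tools/cache (the actual processSingleItem below may differ):

// key is the string item taken off the workqueue.
ns, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
	return err // malformed key; nothing sensible to retry
}
return k.updateStatus(ns, name)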

// runWorker dequeues Service changes from the work queue
// We run a lightweight work queue to handle service updates. We don't need the full overhead
// of a wrangler service controller and shared informer cache, but we do want to run changes
// through a keyed queue to reduce thrashing when pods are updated. Much of this is cribbed from
// https://github.com/rancher/lasso/blob/release/v2.5/pkg/controller/controller.go#L173-L215
func (k *k3s) runWorker() {
logrus.Info("MANU - runWorker called")
for k.processNextWorkItem() {
}
}
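
For reference, the standard client-go workqueue consume loop that runWorker drives looks roughly like this; a sketch of the pattern from the lasso code linked above, not the exact k3s implementation (in the real code, processSingleItem itself decides between Forget on success and AddRateLimited for retries):

func (k *k3s) processNextWorkItemSketch() bool {
	obj, shutdown := k.workqueue.Get()
	if shutdown {
		return false
	}
	defer k.workqueue.Done(obj)
	if err := k.processSingleItem(obj); err != nil {
		logrus.Errorf("error processing %v: %v", obj, err)
	}
	return true
}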
@@ -230,6 +254,7 @@ func (k *k3s) processSingleItem(obj interface{}) error {
// LoadBalancer service. The patchStatus function handles checking to see if status needs updating.
func (k *k3s) updateStatus(namespace, name string) error {
svc, err := k.client.CoreV1().Services(namespace).Get(context.TODO(), name, meta.GetOptions{})
logrus.Info("MANU - In updateStatus. This is type: %v", svc.Spec.Type)

if err != nil {
if apierrors.IsNotFound(err) {
return nil
@@ -280,15 +305,35 @@ func (k *k3s) getStatus(svc *core.Service) (*core.LoadBalancerStatus, error) {
}
}

pods, err := k.podCache.List(k.LBNamespace, labels.SelectorFromSet(labels.Set{
svcNameLabel: svc.Name,
svcNamespaceLabel: svc.Namespace,
}))
nodes, err := k.nodeCache.List(labels.Everything())
if err != nil {
return nil, err
}

nodeAddresses := []string{}

for _, node := range nodes {
for _, addresses := range node.Status.Addresses {
nodeAddresses = append(nodeAddresses, addresses.Address)
}
}

expectedIPs, err := filterByIPFamily(nodeAddresses, svc)
if err != nil {
return nil, err
}

expectedIPs, err := k.podIPs(pods, svc, readyNodes)
//pods, err := k.podCache.List(k.LBNamespace, labels.SelectorFromSet(labels.Set{
// svcNameLabel: svc.Name,
// svcNamespaceLabel: svc.Namespace,
// }))
// if err != nil {
// return nil, err
// }

//expectedIPs, err := k.podIPs(pods, svc, readyNodes)
// logrus.Infof("MANU - In getStatus. These are the expectedIPs: %v", expectedIPs)
// logrus.Infof("MANU - In getStatus. These are the addresses: %v", addresses)
if err != nil {
return nil, err
}
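
The rewritten getStatus advertises node addresses instead of klipper-lb pod IPs, narrowed by filterByIPFamily (defined later in this file) so that only addresses matching the Service's spec.ipFamilies are published. A rough sketch of what such a filter does, illustrative rather than the real implementation:

// Requires "net" from the standard library.
func filterByIPFamilySketch(ips []string, svc *core.Service) ([]string, error) {
	var filtered []string
	for _, family := range svc.Spec.IPFamilies {
		for _, ip := range ips {
			parsed := net.ParseIP(ip)
			if parsed == nil {
				continue // node addresses can include hostnames; skip those
			}
			isIPv4 := parsed.To4() != nil
			if (family == core.IPv4Protocol && isIPv4) ||
				(family == core.IPv6Protocol && !isIPv4) {
				filtered = append(filtered, ip)
			}
		}
	}
	return filtered, nil
}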
@@ -300,11 +345,14 @@ func (k *k3s) getStatus(svc *core.Service) (*core.LoadBalancerStatus, error) {
})
}

logrus.Infof("MANU - End of getStatus. This is the LoadBalancerStatus: %v", loadbalancer)
return loadbalancer, nil
}

// patchStatus patches the service status. If the status has not changed, this function is a no-op.
func (k *k3s) patchStatus(svc *core.Service, previousStatus, newStatus *core.LoadBalancerStatus) error {
logrus.Info("MANU - CloudProvider patchStatus called")
logrus.Infof("MANU - This is the previousStatus: %v and this is the new status: %v", previousStatus, newStatus)
if servicehelper.LoadBalancerStatusEqual(previousStatus, newStatus) {
return nil
}
@@ -372,6 +420,7 @@ func (k *k3s) podIPs(pods []*core.Pod, svc *core.Service, readyNodes map[string]
return []string{"127.0.0.1"}, nil
}

logrus.Infof("MANU - podIPs will return the ips: %v", ips)
return ips, nil
}

@@ -405,15 +454,17 @@ func filterByIPFamily(ips []string, svc *core.Service) ([]string, error) {
}

// deployDaemonSet ensures that there is a DaemonSet for the service.
func (k *k3s) deployDaemonSet(ctx context.Context, svc *core.Service) error {
ds, err := k.newDaemonSet(svc)
if err != nil {
return err
}

defer k.recorder.Eventf(svc, core.EventTypeNormal, "AppliedDaemonSet", "Applied LoadBalancer DaemonSet %s/%s", ds.Namespace, ds.Name)
return k.processor.WithContext(ctx).WithOwner(svc).Apply(objectset.NewObjectSet(ds))
}
//func (k *k3s) deployDaemonSet(ctx context.Context, svc *core.Service) error {
// logrus.Info("MANU CloudProvider - deployDaemonSet")
// ds, err := k.newDaemonSet(svc)
// if err != nil {
// return err
// }
// logrus.Infof("MANU CloudProvider - This is the DaemonSet: %v" , ds)

// defer k.recorder.Eventf(svc, core.EventTypeNormal, "AppliedDaemonSet", "Applied LoadBalancer DaemonSet %s/%s", ds.Namespace, ds.Name)
// return k.processor.WithContext(ctx).WithOwner(svc).Apply(objectset.NewObjectSet(ds))
//}

// deleteDaemonSet ensures that there are no DaemonSets for the given service.
func (k *k3s) deleteDaemonSet(ctx context.Context, svc *core.Service) error {
@@ -434,7 +485,7 @@ func (k *k3s) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) {
name := generateName(svc)
oneInt := intstr.FromInt(1)
priorityClassName := k.getPriorityClassName(svc)
localTraffic := servicehelper.RequestsOnlyLocalTraffic(svc)
//localTraffic := servicehelper.RequestsOnlyLocalTraffic(svc)
sourceRangesSet, err := servicehelper.GetLoadBalancerSourceRanges(svc)
if err != nil {
return nil, err
@@ -516,74 +567,14 @@ func (k *k3s) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) {
},
}

for _, port := range svc.Spec.Ports {
portName := fmt.Sprintf("lb-%s-%d", strings.ToLower(string(port.Protocol)), port.Port)
container := core.Container{
Name: portName,
Image: k.LBImage,
ImagePullPolicy: core.PullIfNotPresent,
Ports: []core.ContainerPort{
{
Name: portName,
ContainerPort: port.Port,
HostPort: port.Port,
Protocol: port.Protocol,
},
},
Env: []core.EnvVar{
{
Name: "SRC_PORT",
Value: strconv.Itoa(int(port.Port)),
},
{
Name: "SRC_RANGES",
Value: sourceRanges,
},
{
Name: "DEST_PROTO",
Value: string(port.Protocol),
},
},
SecurityContext: &core.SecurityContext{
Capabilities: &core.Capabilities{
Add: []core.Capability{
"NET_ADMIN",
},
},
},
}

if localTraffic {
container.Env = append(container.Env,
core.EnvVar{
Name: "DEST_PORT",
Value: strconv.Itoa(int(port.NodePort)),
},
core.EnvVar{
Name: "DEST_IPS",
ValueFrom: &core.EnvVarSource{
FieldRef: &core.ObjectFieldSelector{
FieldPath: getHostIPsFieldPath(),
},
},
},
)
} else {
container.Env = append(container.Env,
core.EnvVar{
Name: "DEST_PORT",
Value: strconv.Itoa(int(port.Port)),
},
core.EnvVar{
Name: "DEST_IPS",
Value: strings.Join(svc.Spec.ClusterIPs, ","),
},
)
}

ds.Spec.Template.Spec.Containers = append(ds.Spec.Template.Spec.Containers, container)
container := core.Container{
Name: "testing",
Image: k.LBImage,
ImagePullPolicy: core.PullIfNotPresent,
}

ds.Spec.Template.Spec.Containers = append(ds.Spec.Template.Spec.Containers, container)

// Add node selector only if label "svccontroller.k3s.cattle.io/enablelb" exists on the nodes
enableNodeSelector, err := k.nodeHasDaemonSetLabel()
if err != nil {
@@ -710,8 +701,8 @@ func (k *k3s) getPriorityClassName(svc *core.Service) string {
return k.LBDefaultPriorityClassName
}

// getTolerations retrieves the tolerations from a service's annotations.
// It parses the tolerations from a JSON or YAML string stored in the annotations.
func (k *k3s) getTolerations(svc *core.Service) ([]core.Toleration, error) {
tolerationsStr, ok := svc.Annotations[tolerationsAnnotation]
if !ok {
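
sigs.k8s.io/yaml is already imported at the top of this file and accepts JSON as a subset of YAML, so the elided body of getTolerations presumably reduces to a single unmarshal. A sketch under that assumption:

var tolerations []core.Toleration
if err := yaml.Unmarshal([]byte(tolerationsStr), &tolerations); err != nil {
	return nil, fmt.Errorf("failed to parse tolerations from annotation %s: %w", tolerationsAnnotation, err)
}
return tolerations, nil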
