Skip to content

Commit

Permalink
Merge pull request #79 from port-labs/PORT-9402-port-api-search-ident…
Browse files Browse the repository at this point in the history
…ifier

Port 9402 port api search identifier
  • Loading branch information
talsabagport authored Sep 1, 2024
2 parents ecc101e + a11424c commit 0f21490
Show file tree
Hide file tree
Showing 13 changed files with 1,362 additions and 479 deletions.
3 changes: 2 additions & 1 deletion pkg/event_handler/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package event_handler

import (
"fmt"
"github.com/port-labs/port-k8s-exporter/pkg/handlers"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/klog/v2"
)
Expand All @@ -22,7 +23,7 @@ func Start(eventListener IListener, initControllerHandler func() (IStoppableRsyn

return eventListener.Run(func() {
klog.Infof("Resync request received. Recreating controllers for the new port configuration")
if controllerHandler != nil {
if controllerHandler != (*handlers.ControllersHandler)(nil) {
controllerHandler.Stop()
}

Expand Down
6 changes: 2 additions & 4 deletions pkg/event_handler/polling/polling.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,8 @@ func (h *Handler) Run(resync func()) {
klog.Infof("Polling event listener iteration after %d seconds. Checking for changes...", h.pollingRate)
configuration, err := integration.GetIntegration(h.portClient, h.stateKey)
if err != nil {
klog.Errorf("error resyncing: %s", err.Error())
}

if reflect.DeepEqual(currentState, configuration) != true {
klog.Errorf("error getting integration: %s", err.Error())
} else if reflect.DeepEqual(currentState, configuration) != true {
klog.Infof("Changes detected. Resyncing...")
currentState = configuration
resync()
Expand Down
90 changes: 56 additions & 34 deletions pkg/handlers/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,19 @@ package handlers

import (
"context"
"time"

"github.com/port-labs/port-k8s-exporter/pkg/config"
"github.com/port-labs/port-k8s-exporter/pkg/port/integration"

"github.com/port-labs/port-k8s-exporter/pkg/crd"
"github.com/port-labs/port-k8s-exporter/pkg/goutils"
"github.com/port-labs/port-k8s-exporter/pkg/k8s"
"github.com/port-labs/port-k8s-exporter/pkg/port"
"github.com/port-labs/port-k8s-exporter/pkg/port/cli"
"github.com/port-labs/port-k8s-exporter/pkg/port/integration"
"github.com/port-labs/port-k8s-exporter/pkg/signal"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/klog/v2"
"sync"
"time"
)

type ControllersHandler struct {
Expand All @@ -24,6 +23,7 @@ type ControllersHandler struct {
stateKey string
portClient *cli.PortClient
stopCh chan struct{}
isStopped bool
}

func NewControllersHandler(exporterConfig *port.Config, portConfig *port.IntegrationAppConfig, k8sClient *k8s.Client, portClient *cli.PortClient) *ControllersHandler {
Expand Down Expand Up @@ -71,59 +71,81 @@ func NewControllersHandler(exporterConfig *port.Config, portConfig *port.Integra
func (c *ControllersHandler) Handle() {
klog.Info("Starting informers")
c.informersFactory.Start(c.stopCh)
klog.Info("Waiting for informers cache sync")

currentEntitiesSets := make([]map[string]interface{}, 0)
shouldDeleteStaleEntities := true
var syncWg sync.WaitGroup

for _, controller := range c.controllers {
controller := controller

go func() {
<-c.stopCh
klog.Info("Shutting down controllers")
controller.Shutdown()
klog.Info("Exporter exiting")
}()

klog.Infof("Waiting for informer cache to sync for resource '%s'", controller.Resource.Kind)
if err := controller.WaitForCacheSync(c.stopCh); err != nil {
klog.Fatalf("Error while waiting for informer cache sync: %s", err.Error())
}
}

currentEntitiesSet := make([]map[string]interface{}, 0)
for _, controller := range c.controllers {
controllerEntitiesSet, rawDataExamples, err := controller.GetEntitiesSet()
if err != nil {
klog.Errorf("error getting controller entities set: %s", err.Error())
}
currentEntitiesSet = append(currentEntitiesSet, controllerEntitiesSet)
if len(rawDataExamples) > 0 {
err = integration.PostIntegrationKindExample(c.portClient, c.stateKey, controller.Resource.Kind, rawDataExamples)
if err != nil {
klog.Warningf("failed to post integration kind example: %s", err.Error())
syncWg.Add(1)
go func() {
defer syncWg.Done()
klog.Infof("Starting full initial resync for resource '%s'", controller.Resource.Kind)
initialSyncResult := controller.RunInitialSync()
klog.Infof("Done full initial resync, starting live events sync for resource '%s'", controller.Resource.Kind)
controller.RunEventsSync(1, c.stopCh)
if initialSyncResult.EntitiesSet != nil {
currentEntitiesSets = append(currentEntitiesSets, initialSyncResult.EntitiesSet)
}
}
}

klog.Info("Deleting stale entities")
c.RunDeleteStaleEntities(currentEntitiesSet)
klog.Info("Starting controllers")
for _, controller := range c.controllers {
controller.Run(1, c.stopCh)
if len(initialSyncResult.RawDataExamples) > 0 {
err := integration.PostIntegrationKindExample(c.portClient, c.stateKey, controller.Resource.Kind, initialSyncResult.RawDataExamples)
if err != nil {
klog.Warningf("failed to post integration kind example: %s", err.Error())
}
}
shouldDeleteStaleEntities = shouldDeleteStaleEntities && initialSyncResult.ShouldDeleteStaleEntities
}()
}
syncWg.Wait()

ctx, cancelCtx := context.WithCancel(context.Background())
defer cancelCtx()
go func() {
<-c.stopCh
klog.Info("Shutting down controllers")
for _, controller := range c.controllers {
controller.Shutdown()
}
klog.Info("Exporter exiting")
cancelCtx()
}()

if shouldDeleteStaleEntities {
klog.Info("Deleting stale entities")
c.runDeleteStaleEntities(ctx, currentEntitiesSets)
klog.Info("Done deleting stale entities")
} else {
klog.Warning("Skipping delete of stale entities due to a failure in getting all current entities from k8s")
}
}

func (c *ControllersHandler) RunDeleteStaleEntities(currentEntitiesSet []map[string]interface{}) {
_, err := c.portClient.Authenticate(context.Background(), c.portClient.ClientID, c.portClient.ClientSecret)
func (c *ControllersHandler) runDeleteStaleEntities(ctx context.Context, currentEntitiesSet []map[string]interface{}) {
_, err := c.portClient.Authenticate(ctx, c.portClient.ClientID, c.portClient.ClientSecret)
if err != nil {
klog.Errorf("error authenticating with Port: %v", err)
}

err = c.portClient.DeleteStaleEntities(context.Background(), c.stateKey, goutils.MergeMaps(currentEntitiesSet...))
err = c.portClient.DeleteStaleEntities(ctx, c.stateKey, goutils.MergeMaps(currentEntitiesSet...))
if err != nil {
klog.Errorf("error deleting stale entities: %s", err.Error())
}
klog.Info("Done deleting stale entities")
}

func (c *ControllersHandler) Stop() {
if c.isStopped {
return
}

klog.Info("Stopping controllers")
close(c.stopCh)
c.isStopped = true
}
Loading

0 comments on commit 0f21490

Please sign in to comment.