Skip to content

Commit

Permalink
Merge pull request #14 from port-labs/PORT-3942-k-8-s-exporter-not-sk…
Browse files Browse the repository at this point in the history
…ipping-delete-phase-on-error

Map only identifier and blueprint in get entities for delete stale entities, and aggregate resources by kind
  • Loading branch information
talsabagport authored Jun 8, 2023
2 parents ad3a14c + acaa75a commit 175181c
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 29 deletions.
10 changes: 10 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,16 @@ type Config struct {
StateKey string
}

type KindConfig struct {
Selector Selector
Port Port
}

type AggregatedResource struct {
Kind string
KindConfigs []KindConfig
}

func New(filepath string, resyncInterval uint, stateKey string) (*Config, error) {
c := &Config{
ResyncInterval: resyncInterval,
Expand Down
19 changes: 15 additions & 4 deletions pkg/handlers/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,29 @@ type ControllersHandler struct {
func NewControllersHandler(exporterConfig *config.Config, k8sClient *k8s.Client, portClient *cli.PortClient) *ControllersHandler {
resync := time.Minute * time.Duration(exporterConfig.ResyncInterval)
informersFactory := dynamicinformer.NewDynamicSharedInformerFactory(k8sClient.DynamicClient, resync)
controllers := make([]*k8s.Controller, 0, len(exporterConfig.Resources))

aggResources := make(map[string][]config.KindConfig)
for _, resource := range exporterConfig.Resources {
kindConfig := config.KindConfig{Selector: resource.Selector, Port: resource.Port}
if _, ok := aggResources[resource.Kind]; ok {
aggResources[resource.Kind] = append(aggResources[resource.Kind], kindConfig)
} else {
aggResources[resource.Kind] = []config.KindConfig{kindConfig}
}
}

controllers := make([]*k8s.Controller, 0, len(exporterConfig.Resources))

for kind, kindConfigs := range aggResources {
var gvr schema.GroupVersionResource
gvr, err := k8s.GetGVRFromResource(k8sClient.DiscoveryMapper, resource.Kind)
gvr, err := k8s.GetGVRFromResource(k8sClient.DiscoveryMapper, kind)
if err != nil {
klog.Errorf("Error getting GVR, skip handling for resource '%s': %s.", resource.Kind, err.Error())
klog.Errorf("Error getting GVR, skip handling for resource '%s': %s.", kind, err.Error())
continue
}

informer := informersFactory.ForResource(gvr)
controller := k8s.NewController(resource, portClient, informer)
controller := k8s.NewController(config.AggregatedResource{Kind: kind, KindConfigs: kindConfigs}, portClient, informer)
controllers = append(controllers, controller)
}

Expand Down
65 changes: 41 additions & 24 deletions pkg/k8s/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ type EventItem struct {
}

type Controller struct {
resource config.Resource
resource config.AggregatedResource
portClient *cli.PortClient
informer cache.SharedIndexInformer
lister cache.GenericLister
workqueue workqueue.RateLimitingInterface
}

func NewController(resource config.Resource, portClient *cli.PortClient, informer informers.GenericInformer) *Controller {
func NewController(resource config.AggregatedResource, portClient *cli.PortClient, informer informers.GenericInformer) *Controller {
controller := &Controller{
resource: resource,
portClient: portClient,
Expand Down Expand Up @@ -177,28 +177,35 @@ func (c *Controller) syncHandler(item EventItem) error {
}

func (c *Controller) objectHandler(obj interface{}, item EventItem) error {
portEntities, err := c.getObjectEntities(obj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("error getting entities for object key '%s': %v", item.Key, err))
return nil
}

_, err = c.portClient.Authenticate(context.Background(), c.portClient.ClientID, c.portClient.ClientSecret)
_, err := c.portClient.Authenticate(context.Background(), c.portClient.ClientID, c.portClient.ClientSecret)
if err != nil {
return fmt.Errorf("error authenticating with Port: %v", err)
}

for _, portEntity := range portEntities {
err = c.entityHandler(portEntity, item.ActionType)
errors := make([]error, 0)
for _, kindConfig := range c.resource.KindConfigs {
portEntities, err := c.getObjectEntities(obj, kindConfig.Selector, kindConfig.Port.Entity.Mappings)
if err != nil {
return fmt.Errorf("error handling entity for object key '%s': %v", item.Key, err)
utilruntime.HandleError(fmt.Errorf("error getting entities for object key '%s': %v", item.Key, err))
continue
}

for _, portEntity := range portEntities {
err = c.entityHandler(portEntity, item.ActionType)
if err != nil {
errors = append(errors, err)
}
}
}

if len(errors) > 0 {
return fmt.Errorf("error handling entity for object key '%s': %v", item.Key, errors)
}

return nil
}

func (c *Controller) getObjectEntities(obj interface{}) ([]port.Entity, error) {
func (c *Controller) getObjectEntities(obj interface{}, selector config.Selector, mappings []port.EntityMapping) ([]port.Entity, error) {
unstructuredObj, ok := obj.(*unstructured.Unstructured)
if !ok {
return nil, fmt.Errorf("error casting to unstructured")
Expand All @@ -210,18 +217,18 @@ func (c *Controller) getObjectEntities(obj interface{}) ([]port.Entity, error) {
}

var selectorResult = true
if c.resource.Selector.Query != "" {
selectorResult, err = jq.ParseBool(c.resource.Selector.Query, structuredObj)
if selector.Query != "" {
selectorResult, err = jq.ParseBool(selector.Query, structuredObj)
if err != nil {
return nil, fmt.Errorf("invalid selector query '%s': %v", c.resource.Selector.Query, err)
return nil, fmt.Errorf("invalid selector query '%s': %v", selector.Query, err)
}
}
if !selectorResult {
return nil, nil
}

entities := make([]port.Entity, 0, len(c.resource.Port.Entity.Mappings))
for _, entityMapping := range c.resource.Port.Entity.Mappings {
entities := make([]port.Entity, 0, len(mappings))
for _, entityMapping := range mappings {
var portEntity *port.Entity
portEntity, err = mapping.NewEntity(structuredObj, entityMapping)
if err != nil {
Expand Down Expand Up @@ -258,13 +265,23 @@ func (c *Controller) GetEntitiesSet() (map[string]interface{}, error) {
if err != nil {
return nil, fmt.Errorf("error listing K8s objects of resource '%s': %v", c.resource.Kind, err)
}

for _, obj := range objects {
entities, err := c.getObjectEntities(obj)
if err != nil {
return nil, fmt.Errorf("error getting entities of object: %v", err)
}
for _, entity := range entities {
k8sEntitiesSet[c.portClient.GetEntityIdentifierKey(&entity)] = nil
for _, kindConfig := range c.resource.KindConfigs {
mappings := make([]port.EntityMapping, 0, len(kindConfig.Port.Entity.Mappings))
for _, m := range kindConfig.Port.Entity.Mappings {
mappings = append(mappings, port.EntityMapping{
Identifier: m.Identifier,
Blueprint: m.Blueprint,
})
}
entities, err := c.getObjectEntities(obj, kindConfig.Selector, mappings)
if err != nil {
return nil, fmt.Errorf("error getting entities of object: %v", err)
}
for _, entity := range entities {
k8sEntitiesSet[c.portClient.GetEntityIdentifierKey(&entity)] = nil
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/k8s/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ func newController(resource config.Resource, objects []runtime.Object, portClien
s := strings.SplitN(resource.Kind, "/", 3)
gvr := schema.GroupVersionResource{Group: s[0], Version: s[1], Resource: s[2]}
informer := k8sI.ForResource(gvr)
c := NewController(resource, portClient, informer)
kindConfig := config.KindConfig{Selector: resource.Selector, Port: resource.Port}
c := NewController(config.AggregatedResource{Kind: resource.Kind, KindConfigs: []config.KindConfig{kindConfig}}, portClient, informer)

for _, d := range objects {
informer.Informer().GetIndexer().Add(d)
Expand Down

0 comments on commit 175181c

Please sign in to comment.