Skip to content

Commit

Permalink
dependency injection and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
yairsimantov20 committed Dec 18, 2023
1 parent 3e3c91c commit 80421b1
Show file tree
Hide file tree
Showing 10 changed files with 351 additions and 114 deletions.
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ go 1.19
require (
github.com/confluentinc/confluent-kafka-go v1.9.2
github.com/go-resty/resty/v2 v2.7.0
github.com/google/uuid v1.3.0
github.com/itchyny/gojq v0.12.9
github.com/stretchr/testify v1.8.2
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.25.2
k8s.io/apimachinery v0.25.2
Expand Down Expand Up @@ -38,8 +40,8 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/testify v1.8.2 // indirect
golang.org/x/net v0.19.0 // indirect
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect
golang.org/x/sys v0.15.0 // indirect
Expand Down
24 changes: 22 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"github.com/port-labs/port-k8s-exporter/pkg/config"
"github.com/port-labs/port-k8s-exporter/pkg/event_listener"
"github.com/port-labs/port-k8s-exporter/pkg/event_listener/consumer"
"github.com/port-labs/port-k8s-exporter/pkg/event_listener/polling"
"github.com/port-labs/port-k8s-exporter/pkg/handlers"
"github.com/port-labs/port-k8s-exporter/pkg/k8s"
"github.com/port-labs/port-k8s-exporter/pkg/port"
Expand All @@ -28,6 +30,19 @@ func initiateHandler(exporterConfig *port.Config, k8sClient *k8s.Client, portCli
return newHandler, nil
}

func CreateEventListener(portClient *cli.PortClient) (event_listener.IEventListener, error) {
klog.Infof("Received event listener type: %s", config.ApplicationConfig.EventListenerType)
switch config.ApplicationConfig.EventListenerType {
case "KAFKA":
return consumer.NewEventListener(portClient)
case "POLLING":
return polling.NewEventListener(portClient), nil
default:
return nil, fmt.Errorf("unknown event listener type: %s", config.ApplicationConfig.EventListenerType)
}

}

func main() {
klog.InitFlags(nil)

Expand Down Expand Up @@ -65,13 +80,18 @@ func main() {
}
}

eventListener, err := CreateEventListener(portClient)
if err != nil {
klog.Fatalf("Error creating event listener: %s", err.Error())
}

klog.Info("Starting controllers handler")
handler, _ := initiateHandler(exporterConfig, k8sClient, portClient)
eventListener := event_listener.NewEventListener(config.ApplicationConfig.StateKey, config.ApplicationConfig.EventListenerType, handler, portClient)
err = eventListener.Start(func(handler *handlers.ControllersHandler) (*handlers.ControllersHandler, error) {
err = event_listener.StartEventHandler(eventListener, handler, func(handler *handlers.ControllersHandler) (*handlers.ControllersHandler, error) {
handler.Stop()
return initiateHandler(exporterConfig, k8sClient, portClient)
})

if err != nil {
klog.Fatalf("Error starting event listener: %s", err.Error())
}
Expand Down
86 changes: 86 additions & 0 deletions pkg/event_listener/consumer/event_listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package consumer

import (
"encoding/json"
"fmt"
"github.com/port-labs/port-k8s-exporter/pkg/config"
"github.com/port-labs/port-k8s-exporter/pkg/port/cli"
"github.com/port-labs/port-k8s-exporter/pkg/port/kafka_credentials"
"github.com/port-labs/port-k8s-exporter/pkg/port/org_details"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/klog/v2"
)

type EventListener struct {
stateKey string
portClient *cli.PortClient
topic string
consumer *Consumer
}

type IncomingMessage struct {
Diff *struct {
After *struct {
Identifier string `json:"installationId"`
} `json:"after"`
} `json:"diff"`
}

func NewEventListener(portClient *cli.PortClient) (*EventListener, error) {
klog.Infof("Getting Consumer Information")
credentials, err := kafka_credentials.GetKafkaCredentials(portClient)
if err != nil {
return nil, err
}
orgId, err := org_details.GetOrgId(portClient)
if err != nil {
return nil, err
}

c := &config.KafkaConfiguration{
Brokers: config.KafkaConfig.Brokers,
SecurityProtocol: config.KafkaConfig.SecurityProtocol,
AuthenticationMechanism: config.KafkaConfig.AuthenticationMechanism,
Username: credentials.Username,
Password: credentials.Password,
GroupID: orgId + ".k8s." + config.ApplicationConfig.StateKey,
}

topic := orgId + ".change.log"
instance, err := NewConsumer(c, nil)
if err != nil {
return nil, err
}

return &EventListener{
stateKey: config.ApplicationConfig.StateKey,
portClient: portClient,
topic: topic,
consumer: instance,
}, nil
}

func shouldResync(stateKey string, message *IncomingMessage) bool {
return message.Diff != nil &&
message.Diff.After != nil &&
message.Diff.After.Identifier != "" &&
message.Diff.After.Identifier == stateKey
}

func (l *EventListener) Run(resync func()) error {
klog.Infof("Starting Kafka event listener")

klog.Infof("Starting consumer for topic %s", l.topic)
l.consumer.Consume(l.topic, func(value []byte) {
incomingMessage := &IncomingMessage{}
parsingError := json.Unmarshal(value, &incomingMessage)
if parsingError != nil {
utilruntime.HandleError(fmt.Errorf("error handling message: %s", parsingError.Error()))
} else if shouldResync(l.stateKey, incomingMessage) {
klog.Infof("Changes detected. Resyncing...")
resync()
}
}, nil)

return nil
}
115 changes: 11 additions & 104 deletions pkg/event_listener/event_listener_handler.go
Original file line number Diff line number Diff line change
@@ -1,126 +1,33 @@
package event_listener

import (
"encoding/json"
"fmt"
"github.com/port-labs/port-k8s-exporter/pkg/config"
"github.com/port-labs/port-k8s-exporter/pkg/event_listener/consumer"
"github.com/port-labs/port-k8s-exporter/pkg/event_listener/polling"
"github.com/port-labs/port-k8s-exporter/pkg/handlers"
"github.com/port-labs/port-k8s-exporter/pkg/port"
"github.com/port-labs/port-k8s-exporter/pkg/port/cli"
"github.com/port-labs/port-k8s-exporter/pkg/port/kafka_credentials"
"github.com/port-labs/port-k8s-exporter/pkg/port/org_details"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/klog/v2"
)

type IncomingMessage struct {
Diff *struct {
After *struct {
Identifier string `json:"installationId"`
} `json:"after"`
} `json:"diff"`
type IEventListener interface {
Run(resync func()) error
}

type EventListener struct {
settings port.EventListenerSettings
stateKey string
type Handler struct {
eventListener IEventListener
controllerHandler *handlers.ControllersHandler
portClient *cli.PortClient
}

func shouldResync(stateKey string, message *IncomingMessage) bool {
return message.Diff != nil &&
message.Diff.After != nil &&
message.Diff.After.Identifier != "" &&
message.Diff.After.Identifier == stateKey
}

func NewEventListener(stateKey string, eventListenerType string, controllerHandler *handlers.ControllersHandler, client *cli.PortClient) *EventListener {
eventListener := &EventListener{
settings: port.EventListenerSettings{
Type: eventListenerType,
},
stateKey: stateKey,
controllerHandler: controllerHandler,
portClient: client,
}

return eventListener
}

func startKafkaEventListener(l *EventListener, resync func()) error {
klog.Infof("Starting Kafka event listener")
klog.Infof("Getting Consumer Information")
credentials, err := kafka_credentials.GetKafkaCredentials(l.portClient)
if err != nil {
return err
}
orgId, err := org_details.GetOrgId(l.portClient)
if err != nil {
return err
}

c := &config.KafkaConfiguration{
Brokers: config.KafkaConfig.Brokers,
SecurityProtocol: config.KafkaConfig.SecurityProtocol,
AuthenticationMechanism: config.KafkaConfig.AuthenticationMechanism,
Username: credentials.Username,
Password: credentials.Password,
GroupID: orgId + ".k8s." + l.stateKey,
}

topic := orgId + ".change.log"
instance, err := consumer.NewConsumer(c, nil)

if err != nil {
return err
}

klog.Infof("Starting consumer for topic %s and groupId %s", topic, c.GroupID)
instance.Consume(topic, func(value []byte) {
incomingMessage := &IncomingMessage{}
parsingError := json.Unmarshal(value, &incomingMessage)
if parsingError != nil {
utilruntime.HandleError(fmt.Errorf("error handling message: %s", parsingError.Error()))
} else if shouldResync(l.stateKey, incomingMessage) {
klog.Infof("Changes detected. Resyncing...")
resync()
}
}, nil)

return nil
}

func startPollingEventListener(l *EventListener, resync func()) {
klog.Infof("Starting polling event listener")
klog.Infof("Polling rate set to %d seconds", config.PollingListenerRate)
pollingHandler := polling.NewPollingHandler(config.PollingListenerRate, l.stateKey, l.portClient)
pollingHandler.Run(resync)
}

func (l *EventListener) Start(resync func(*handlers.ControllersHandler) (*handlers.ControllersHandler, error)) error {
wrappedResync := func() {
func StartEventHandler(eventListener IEventListener, controllerHandler *handlers.ControllersHandler, resync func(*handlers.ControllersHandler) (*handlers.ControllersHandler, error)) error {
err := eventListener.Run(func() {
klog.Infof("Resync request received. Recreating controllers for the new port configuration")
newController, resyncErr := resync(l.controllerHandler)
l.controllerHandler = newController
newController, resyncErr := resync(controllerHandler)
controllerHandler = newController

if resyncErr != nil {
utilruntime.HandleError(fmt.Errorf("error resyncing: %s", resyncErr.Error()))
}
}
klog.Infof("Received event listener type: %s", l.settings.Type)
switch l.settings.Type {
case "KAFKA":
err := startKafkaEventListener(l, wrappedResync)
if err != nil {
return err
}
case "POLLING":
startPollingEventListener(l, wrappedResync)
default:
return fmt.Errorf("unknown event listener type: %s", l.settings.Type)
})
if err != nil {
return err
}

return nil
Expand Down
28 changes: 28 additions & 0 deletions pkg/event_listener/polling/event_listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package polling

import (
"github.com/port-labs/port-k8s-exporter/pkg/config"
"github.com/port-labs/port-k8s-exporter/pkg/port/cli"
"k8s.io/klog/v2"
)

type EventListener struct {
stateKey string
portClient *cli.PortClient
}

func NewEventListener(portClient *cli.PortClient) *EventListener {
return &EventListener{
stateKey: config.ApplicationConfig.StateKey,
portClient: portClient,
}
}

func (l *EventListener) Run(resync func()) error {
klog.Infof("Starting polling event listener")
klog.Infof("Polling rate set to %d seconds", config.PollingListenerRate)
pollingHandler := NewPollingHandler(config.PollingListenerRate, l.stateKey, l.portClient, nil)
pollingHandler.Run(resync)

return nil
}
38 changes: 32 additions & 6 deletions pkg/event_listener/polling/polling.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,42 @@ import (
"time"
)

type ITicker interface {
GetC() <-chan time.Time
}

type Ticker struct {
ticker *time.Ticker
}

func NewTicker(d time.Duration) *Ticker {
return &Ticker{
ticker: time.NewTicker(d),
}
}

func (t *Ticker) Stop() {
t.ticker.Stop()
}

func (t *Ticker) GetC() <-chan time.Time {
return t.ticker.C
}

type HandlerSettings struct {
ticker *time.Ticker
ticker ITicker
stateKey string
portClient *cli.PortClient
pollingRate uint
}

func NewPollingHandler(pollingRate uint, stateKey string, portClient *cli.PortClient) *HandlerSettings {
func NewPollingHandler(pollingRate uint, stateKey string, portClient *cli.PortClient, tickerOverride ITicker) *HandlerSettings {
ticker := tickerOverride
if ticker == nil {
ticker = NewTicker(time.Second * time.Duration(pollingRate))
}
rv := &HandlerSettings{
ticker: time.NewTicker(time.Second * time.Duration(pollingRate)),
ticker: ticker,
stateKey: stateKey,
portClient: portClient,
pollingRate: pollingRate,
Expand All @@ -30,7 +56,7 @@ func NewPollingHandler(pollingRate uint, stateKey string, portClient *cli.PortCl

func (h *HandlerSettings) Run(resync func()) {
klog.Infof("Starting polling handler")
currentState, err := integration.GetIntegrationConfig(h.portClient, h.stateKey)
currentState, err := integration.GetIntegration(h.portClient, h.stateKey)
if err != nil {
klog.Errorf("Error fetching the first AppConfig state: %s", err.Error())
}
Expand All @@ -45,9 +71,9 @@ func (h *HandlerSettings) Run(resync func()) {
case sig := <-sigChan:
klog.Infof("Received signal %v: terminating\n", sig)
run = false
case <-h.ticker.C:
case <-h.ticker.GetC():
klog.Infof("Polling event listener iteration after %d seconds. Checking for changes...", h.pollingRate)
configuration, err := integration.GetIntegrationConfig(h.portClient, h.stateKey)
configuration, err := integration.GetIntegration(h.portClient, h.stateKey)
if err != nil {
klog.Errorf("error resyncing: %s", err.Error())
}
Expand Down
Loading

0 comments on commit 80421b1

Please sign in to comment.