diff --git a/go.mod b/go.mod index 152d8fd..2ce19e9 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,9 @@ go 1.19 require ( github.com/confluentinc/confluent-kafka-go v1.9.2 github.com/go-resty/resty/v2 v2.7.0 + github.com/google/uuid v1.3.0 github.com/itchyny/gojq v0.12.9 + github.com/stretchr/testify v1.8.2 gopkg.in/yaml.v2 v2.4.0 k8s.io/api v0.25.2 k8s.io/apimachinery v0.25.2 @@ -38,8 +40,8 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/spf13/pflag v1.0.5 // indirect - github.com/stretchr/testify v1.8.2 // indirect golang.org/x/net v0.19.0 // indirect golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect golang.org/x/sys v0.15.0 // indirect diff --git a/main.go b/main.go index 353a914..99698a0 100644 --- a/main.go +++ b/main.go @@ -5,6 +5,8 @@ import ( "fmt" "github.com/port-labs/port-k8s-exporter/pkg/config" "github.com/port-labs/port-k8s-exporter/pkg/event_listener" + "github.com/port-labs/port-k8s-exporter/pkg/event_listener/consumer" + "github.com/port-labs/port-k8s-exporter/pkg/event_listener/polling" "github.com/port-labs/port-k8s-exporter/pkg/handlers" "github.com/port-labs/port-k8s-exporter/pkg/k8s" "github.com/port-labs/port-k8s-exporter/pkg/port" @@ -28,6 +30,19 @@ func initiateHandler(exporterConfig *port.Config, k8sClient *k8s.Client, portCli return newHandler, nil } +func CreateEventListener(portClient *cli.PortClient) (event_listener.IEventListener, error) { + klog.Infof("Received event listener type: %s", config.ApplicationConfig.EventListenerType) + switch config.ApplicationConfig.EventListenerType { + case "KAFKA": + return consumer.NewEventListener(portClient) + case "POLLING": + return polling.NewEventListener(portClient), nil + default: + return nil, fmt.Errorf("unknown event listener type: %s", config.ApplicationConfig.EventListenerType) + } + +} + func main() { klog.InitFlags(nil) @@ -65,13 +80,18 @@ func main() { } } + eventListener, err := CreateEventListener(portClient) + if err != nil { + klog.Fatalf("Error creating event listener: %s", err.Error()) + } + klog.Info("Starting controllers handler") handler, _ := initiateHandler(exporterConfig, k8sClient, portClient) - eventListener := event_listener.NewEventListener(config.ApplicationConfig.StateKey, config.ApplicationConfig.EventListenerType, handler, portClient) - err = eventListener.Start(func(handler *handlers.ControllersHandler) (*handlers.ControllersHandler, error) { + err = event_listener.StartEventHandler(eventListener, handler, func(handler *handlers.ControllersHandler) (*handlers.ControllersHandler, error) { handler.Stop() return initiateHandler(exporterConfig, k8sClient, portClient) }) + if err != nil { klog.Fatalf("Error starting event listener: %s", err.Error()) } diff --git a/pkg/event_listener/consumer/event_listener.go b/pkg/event_listener/consumer/event_listener.go new file mode 100644 index 0000000..26e8cbe --- /dev/null +++ b/pkg/event_listener/consumer/event_listener.go @@ -0,0 +1,86 @@ +package consumer + +import ( + "encoding/json" + "fmt" + "github.com/port-labs/port-k8s-exporter/pkg/config" + "github.com/port-labs/port-k8s-exporter/pkg/port/cli" + "github.com/port-labs/port-k8s-exporter/pkg/port/kafka_credentials" + "github.com/port-labs/port-k8s-exporter/pkg/port/org_details" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/klog/v2" +) + +type EventListener struct { + stateKey string + portClient *cli.PortClient + topic string + consumer *Consumer +} + +type IncomingMessage struct { + Diff *struct { + After *struct { + Identifier string `json:"installationId"` + } `json:"after"` + } `json:"diff"` +} + +func NewEventListener(portClient *cli.PortClient) (*EventListener, error) { + klog.Infof("Getting Consumer Information") + credentials, err := kafka_credentials.GetKafkaCredentials(portClient) + if err != nil { + return nil, err + } + orgId, err := org_details.GetOrgId(portClient) + if err != nil { + return nil, err + } + + c := &config.KafkaConfiguration{ + Brokers: config.KafkaConfig.Brokers, + SecurityProtocol: config.KafkaConfig.SecurityProtocol, + AuthenticationMechanism: config.KafkaConfig.AuthenticationMechanism, + Username: credentials.Username, + Password: credentials.Password, + GroupID: orgId + ".k8s." + config.ApplicationConfig.StateKey, + } + + topic := orgId + ".change.log" + instance, err := NewConsumer(c, nil) + if err != nil { + return nil, err + } + + return &EventListener{ + stateKey: config.ApplicationConfig.StateKey, + portClient: portClient, + topic: topic, + consumer: instance, + }, nil +} + +func shouldResync(stateKey string, message *IncomingMessage) bool { + return message.Diff != nil && + message.Diff.After != nil && + message.Diff.After.Identifier != "" && + message.Diff.After.Identifier == stateKey +} + +func (l *EventListener) Run(resync func()) error { + klog.Infof("Starting Kafka event listener") + + klog.Infof("Starting consumer for topic %s", l.topic) + l.consumer.Consume(l.topic, func(value []byte) { + incomingMessage := &IncomingMessage{} + parsingError := json.Unmarshal(value, &incomingMessage) + if parsingError != nil { + utilruntime.HandleError(fmt.Errorf("error handling message: %s", parsingError.Error())) + } else if shouldResync(l.stateKey, incomingMessage) { + klog.Infof("Changes detected. Resyncing...") + resync() + } + }, nil) + + return nil +} diff --git a/pkg/event_listener/event_listener_handler.go b/pkg/event_listener/event_listener_handler.go index 55647b6..68192f7 100644 --- a/pkg/event_listener/event_listener_handler.go +++ b/pkg/event_listener/event_listener_handler.go @@ -1,126 +1,33 @@ package event_listener import ( - "encoding/json" "fmt" - "github.com/port-labs/port-k8s-exporter/pkg/config" - "github.com/port-labs/port-k8s-exporter/pkg/event_listener/consumer" - "github.com/port-labs/port-k8s-exporter/pkg/event_listener/polling" "github.com/port-labs/port-k8s-exporter/pkg/handlers" - "github.com/port-labs/port-k8s-exporter/pkg/port" - "github.com/port-labs/port-k8s-exporter/pkg/port/cli" - "github.com/port-labs/port-k8s-exporter/pkg/port/kafka_credentials" - "github.com/port-labs/port-k8s-exporter/pkg/port/org_details" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/klog/v2" ) -type IncomingMessage struct { - Diff *struct { - After *struct { - Identifier string `json:"installationId"` - } `json:"after"` - } `json:"diff"` +type IEventListener interface { + Run(resync func()) error } -type EventListener struct { - settings port.EventListenerSettings - stateKey string +type Handler struct { + eventListener IEventListener controllerHandler *handlers.ControllersHandler - portClient *cli.PortClient } -func shouldResync(stateKey string, message *IncomingMessage) bool { - return message.Diff != nil && - message.Diff.After != nil && - message.Diff.After.Identifier != "" && - message.Diff.After.Identifier == stateKey -} - -func NewEventListener(stateKey string, eventListenerType string, controllerHandler *handlers.ControllersHandler, client *cli.PortClient) *EventListener { - eventListener := &EventListener{ - settings: port.EventListenerSettings{ - Type: eventListenerType, - }, - stateKey: stateKey, - controllerHandler: controllerHandler, - portClient: client, - } - - return eventListener -} - -func startKafkaEventListener(l *EventListener, resync func()) error { - klog.Infof("Starting Kafka event listener") - klog.Infof("Getting Consumer Information") - credentials, err := kafka_credentials.GetKafkaCredentials(l.portClient) - if err != nil { - return err - } - orgId, err := org_details.GetOrgId(l.portClient) - if err != nil { - return err - } - - c := &config.KafkaConfiguration{ - Brokers: config.KafkaConfig.Brokers, - SecurityProtocol: config.KafkaConfig.SecurityProtocol, - AuthenticationMechanism: config.KafkaConfig.AuthenticationMechanism, - Username: credentials.Username, - Password: credentials.Password, - GroupID: orgId + ".k8s." + l.stateKey, - } - - topic := orgId + ".change.log" - instance, err := consumer.NewConsumer(c, nil) - - if err != nil { - return err - } - - klog.Infof("Starting consumer for topic %s and groupId %s", topic, c.GroupID) - instance.Consume(topic, func(value []byte) { - incomingMessage := &IncomingMessage{} - parsingError := json.Unmarshal(value, &incomingMessage) - if parsingError != nil { - utilruntime.HandleError(fmt.Errorf("error handling message: %s", parsingError.Error())) - } else if shouldResync(l.stateKey, incomingMessage) { - klog.Infof("Changes detected. Resyncing...") - resync() - } - }, nil) - - return nil -} - -func startPollingEventListener(l *EventListener, resync func()) { - klog.Infof("Starting polling event listener") - klog.Infof("Polling rate set to %d seconds", config.PollingListenerRate) - pollingHandler := polling.NewPollingHandler(config.PollingListenerRate, l.stateKey, l.portClient) - pollingHandler.Run(resync) -} - -func (l *EventListener) Start(resync func(*handlers.ControllersHandler) (*handlers.ControllersHandler, error)) error { - wrappedResync := func() { +func StartEventHandler(eventListener IEventListener, controllerHandler *handlers.ControllersHandler, resync func(*handlers.ControllersHandler) (*handlers.ControllersHandler, error)) error { + err := eventListener.Run(func() { klog.Infof("Resync request received. Recreating controllers for the new port configuration") - newController, resyncErr := resync(l.controllerHandler) - l.controllerHandler = newController + newController, resyncErr := resync(controllerHandler) + controllerHandler = newController if resyncErr != nil { utilruntime.HandleError(fmt.Errorf("error resyncing: %s", resyncErr.Error())) } - } - klog.Infof("Received event listener type: %s", l.settings.Type) - switch l.settings.Type { - case "KAFKA": - err := startKafkaEventListener(l, wrappedResync) - if err != nil { - return err - } - case "POLLING": - startPollingEventListener(l, wrappedResync) - default: - return fmt.Errorf("unknown event listener type: %s", l.settings.Type) + }) + if err != nil { + return err } return nil diff --git a/pkg/event_listener/polling/event_listener.go b/pkg/event_listener/polling/event_listener.go new file mode 100644 index 0000000..546f052 --- /dev/null +++ b/pkg/event_listener/polling/event_listener.go @@ -0,0 +1,28 @@ +package polling + +import ( + "github.com/port-labs/port-k8s-exporter/pkg/config" + "github.com/port-labs/port-k8s-exporter/pkg/port/cli" + "k8s.io/klog/v2" +) + +type EventListener struct { + stateKey string + portClient *cli.PortClient +} + +func NewEventListener(portClient *cli.PortClient) *EventListener { + return &EventListener{ + stateKey: config.ApplicationConfig.StateKey, + portClient: portClient, + } +} + +func (l *EventListener) Run(resync func()) error { + klog.Infof("Starting polling event listener") + klog.Infof("Polling rate set to %d seconds", config.PollingListenerRate) + pollingHandler := NewPollingHandler(config.PollingListenerRate, l.stateKey, l.portClient, nil) + pollingHandler.Run(resync) + + return nil +} diff --git a/pkg/event_listener/polling/polling.go b/pkg/event_listener/polling/polling.go index 4c93014..24c5b38 100644 --- a/pkg/event_listener/polling/polling.go +++ b/pkg/event_listener/polling/polling.go @@ -11,16 +11,42 @@ import ( "time" ) +type ITicker interface { + GetC() <-chan time.Time +} + +type Ticker struct { + ticker *time.Ticker +} + +func NewTicker(d time.Duration) *Ticker { + return &Ticker{ + ticker: time.NewTicker(d), + } +} + +func (t *Ticker) Stop() { + t.ticker.Stop() +} + +func (t *Ticker) GetC() <-chan time.Time { + return t.ticker.C +} + type HandlerSettings struct { - ticker *time.Ticker + ticker ITicker stateKey string portClient *cli.PortClient pollingRate uint } -func NewPollingHandler(pollingRate uint, stateKey string, portClient *cli.PortClient) *HandlerSettings { +func NewPollingHandler(pollingRate uint, stateKey string, portClient *cli.PortClient, tickerOverride ITicker) *HandlerSettings { + ticker := tickerOverride + if ticker == nil { + ticker = NewTicker(time.Second * time.Duration(pollingRate)) + } rv := &HandlerSettings{ - ticker: time.NewTicker(time.Second * time.Duration(pollingRate)), + ticker: ticker, stateKey: stateKey, portClient: portClient, pollingRate: pollingRate, @@ -30,7 +56,7 @@ func NewPollingHandler(pollingRate uint, stateKey string, portClient *cli.PortCl func (h *HandlerSettings) Run(resync func()) { klog.Infof("Starting polling handler") - currentState, err := integration.GetIntegrationConfig(h.portClient, h.stateKey) + currentState, err := integration.GetIntegration(h.portClient, h.stateKey) if err != nil { klog.Errorf("Error fetching the first AppConfig state: %s", err.Error()) } @@ -45,9 +71,9 @@ func (h *HandlerSettings) Run(resync func()) { case sig := <-sigChan: klog.Infof("Received signal %v: terminating\n", sig) run = false - case <-h.ticker.C: + case <-h.ticker.GetC(): klog.Infof("Polling event listener iteration after %d seconds. Checking for changes...", h.pollingRate) - configuration, err := integration.GetIntegrationConfig(h.portClient, h.stateKey) + configuration, err := integration.GetIntegration(h.portClient, h.stateKey) if err != nil { klog.Errorf("error resyncing: %s", err.Error()) } diff --git a/pkg/event_listener/polling/polling_test.go b/pkg/event_listener/polling/polling_test.go new file mode 100644 index 0000000..6476c3b --- /dev/null +++ b/pkg/event_listener/polling/polling_test.go @@ -0,0 +1,82 @@ +package polling + +import ( + "flag" + "fmt" + guuid "github.com/google/uuid" + "github.com/port-labs/port-k8s-exporter/pkg/config" + "github.com/port-labs/port-k8s-exporter/pkg/port" + "github.com/port-labs/port-k8s-exporter/pkg/port/cli" + "github.com/port-labs/port-k8s-exporter/pkg/port/integration" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +type Fixture struct { + t *testing.T + ticker MockTicker + portClient *cli.PortClient + stateKey string +} + +type MockTicker struct { + c chan time.Time +} + +func (m *MockTicker) GetC() <-chan time.Time { + return m.c +} + +func NewFixture(t *testing.T, c chan time.Time) *Fixture { + config.Init() + flag.Parse() + stateKey := guuid.NewString() + portClient, err := cli.New("https://api.getport.io", cli.WithHeader("User-Agent", fmt.Sprintf("port-k8s-exporter/0.1 (statekey/%s)", stateKey)), + cli.WithClientID(config.ApplicationConfig.PortClientId), cli.WithClientSecret(config.ApplicationConfig.PortClientSecret)) + if err != nil { + t.Errorf("Error building Port client: %s", err.Error()) + } + + _ = integration.DeleteIntegration(portClient, stateKey) + err = integration.NewIntegration(portClient, &port.Config{ + StateKey: stateKey, + }, []port.Resource{}) + if err != nil { + t.Errorf("Error creating Port integration: %s", err.Error()) + } + return &Fixture{ + t: t, + ticker: MockTicker{c: c}, + portClient: portClient, + stateKey: stateKey, + } +} + +func (f *Fixture) CleanIntegration() { + _ = integration.DeleteIntegration(f.portClient, f.stateKey) +} + +func TestPolling_DifferentConfiguration(t *testing.T) { + called := false + c := make(chan time.Time) + fixture := NewFixture(t, c) + defer fixture.CleanIntegration() + handler := NewPollingHandler(uint(1), fixture.stateKey, fixture.portClient, &fixture.ticker) + go handler.Run(func() { + called = true + }) + + c <- time.Now() + time.Sleep(time.Millisecond * 500) + assert.False(t, called) + + _ = integration.UpdateIntegrationConfig(fixture.portClient, fixture.stateKey, &port.AppConfig{ + Resources: []port.Resource{}, + }) + + c <- time.Now() + time.Sleep(time.Millisecond * 500) + + assert.True(t, called) +} diff --git a/pkg/port/cli/integration.go b/pkg/port/cli/integration.go index f6e108e..df38d9a 100644 --- a/pkg/port/cli/integration.go +++ b/pkg/port/cli/integration.go @@ -38,6 +38,20 @@ func (c *PortClient) CreateIntegration(i *port.Integration) (*port.Integration, return &pb.Integration, nil } +func (c *PortClient) GetIntegration(stateKey string) (*port.Integration, error) { + pb := &port.ResponseBody{} + resp, err := c.Client.R(). + SetResult(&pb). + Get(fmt.Sprintf("v1/integration/%s", stateKey)) + if err != nil { + return nil, err + } + if !pb.OK { + return nil, fmt.Errorf("failed to get integration, got: %s", resp.Body()) + } + return &pb.Integration, nil +} + func (c *PortClient) GetIntegrationConfig(stateKey string) (*port.AppConfig, error) { pb := &port.ResponseBody{} resp, err := c.Client.R(). @@ -51,3 +65,35 @@ func (c *PortClient) GetIntegrationConfig(stateKey string) (*port.AppConfig, err } return pb.Integration.Config, nil } + +func (c *PortClient) DeleteIntegration(stateKey string) error { + resp, err := c.Client.R(). + Delete(fmt.Sprintf("v1/integration/%s", stateKey)) + if err != nil { + return err + } + if resp.StatusCode() != 200 { + return fmt.Errorf("failed to delete integration, got: %s", resp.Body()) + } + return nil +} + +func (c *PortClient) UpdateConfig(stateKey string, config *port.AppConfig) error { + type Config struct { + Config *port.AppConfig `json:"config"` + } + pb := &port.ResponseBody{} + resp, err := c.Client.R(). + SetBody(&Config{ + Config: config, + }). + SetResult(&pb). + Patch(fmt.Sprintf("v1/integration/%s/config", stateKey)) + if err != nil { + return err + } + if !pb.OK { + return fmt.Errorf("failed to update config, got: %s", resp.Body()) + } + return nil +} diff --git a/pkg/port/integration/integration.go b/pkg/port/integration/integration.go index 0c39090..06241e6 100644 --- a/pkg/port/integration/integration.go +++ b/pkg/port/integration/integration.go @@ -43,5 +43,44 @@ func GetIntegrationConfig(portClient *cli.PortClient, stateKey string) (*port.Ap } return apiConfig, nil +} + +func GetIntegration(portClient *cli.PortClient, stateKey string) (*port.Integration, error) { + _, err := portClient.Authenticate(context.Background(), portClient.ClientID, portClient.ClientSecret) + if err != nil { + return nil, fmt.Errorf("error authenticating with Port: %v", err) + } + apiIntegration, err := portClient.GetIntegration(stateKey) + if err != nil { + return nil, fmt.Errorf("error getting Port integration: %v", err) + } + + return apiIntegration, nil +} + +func DeleteIntegration(portClient *cli.PortClient, stateKey string) error { + _, err := portClient.Authenticate(context.Background(), portClient.ClientID, portClient.ClientSecret) + if err != nil { + return fmt.Errorf("error authenticating with Port: %v", err) + } + + err = portClient.DeleteIntegration(stateKey) + if err != nil { + return fmt.Errorf("error deleting Port integration: %v", err) + } + return nil +} + +func UpdateIntegrationConfig(portClient *cli.PortClient, stateKey string, config *port.AppConfig) error { + _, err := portClient.Authenticate(context.Background(), portClient.ClientID, portClient.ClientSecret) + if err != nil { + return fmt.Errorf("error authenticating with Port: %v", err) + } + + err = portClient.UpdateConfig(stateKey, config) + if err != nil { + return fmt.Errorf("error updating Port integration config: %v", err) + } + return nil } diff --git a/pkg/port/models.go b/pkg/port/models.go index 8b7000c..2f06df7 100644 --- a/pkg/port/models.go +++ b/pkg/port/models.go @@ -28,12 +28,13 @@ type ( } Integration struct { - InstallationId string `json:"installationId"` + InstallationId string `json:"installationId,omitempty"` Title string `json:"title,omitempty"` Version string `json:"version,omitempty"` InstallationAppType string `json:"installationAppType,omitempty"` EventListener EventListenerSettings `json:"changelogDestination,omitempty"` Config *AppConfig `json:"config,omitempty"` + UpdatedAt *time.Time `json:"updatedAt,omitempty"` } BlueprintProperty struct {