diff --git a/cmd/api-server/commons/commons.go b/cmd/api-server/commons/commons.go index 12b7d70c8d..1b68a3ed17 100644 --- a/cmd/api-server/commons/commons.go +++ b/cmd/api-server/commons/commons.go @@ -11,6 +11,7 @@ import ( "syscall" "time" + "github.com/nats-io/nats.go" "github.com/pkg/errors" "go.mongodb.org/mongo-driver/mongo" "google.golang.org/grpc/metadata" @@ -25,6 +26,9 @@ import ( "github.com/kubeshop/testkube/pkg/cloud" "github.com/kubeshop/testkube/pkg/configmap" "github.com/kubeshop/testkube/pkg/dbmigrator" + "github.com/kubeshop/testkube/pkg/event" + "github.com/kubeshop/testkube/pkg/event/bus" + "github.com/kubeshop/testkube/pkg/event/kind/slack" kubeexecutor "github.com/kubeshop/testkube/pkg/executor" "github.com/kubeshop/testkube/pkg/featureflags" "github.com/kubeshop/testkube/pkg/imageinspector" @@ -317,6 +321,48 @@ func ReadProContext(ctx context.Context, cfg *config.Config, grpcClient cloud.Te return proContext } +func MustCreateSlackLoader(cfg *config.Config, envs map[string]string) *slack.SlackLoader { + slackTemplate, err := parser.LoadConfigFromStringOrFile( + cfg.SlackTemplate, + cfg.TestkubeConfigDir, + "slack-template.json", + "slack template", + ) + ExitOnError("Creating slack loader", err) + + slackConfig, err := parser.LoadConfigFromStringOrFile(cfg.SlackConfig, cfg.TestkubeConfigDir, "slack-config.json", "slack config") + ExitOnError("Creating slack loader", err) + + return slack.NewSlackLoader(slackTemplate, slackConfig, cfg.TestkubeClusterName, cfg.TestkubeDashboardURI, + testkube.AllEventTypes, envs) +} + +func MustCreateNATSConnection(cfg *config.Config) *nats.EncodedConn { + // if embedded NATS server is enabled, we'll replace connection with one to the embedded server + if cfg.NatsEmbedded { + _, nc, err := event.ServerWithConnection(cfg.NatsEmbeddedStoreDir) + ExitOnError("Creating NATS connection", err) + + log.DefaultLogger.Info("Started embedded NATS server") + + conn, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER) + ExitOnError("Creating NATS connection", err) + return conn + } + + conn, err := bus.NewNATSEncodedConnection(bus.ConnectionConfig{ + NatsURI: cfg.NatsURI, + NatsSecure: cfg.NatsSecure, + NatsSkipVerify: cfg.NatsSkipVerify, + NatsCertFile: cfg.NatsCertFile, + NatsKeyFile: cfg.NatsKeyFile, + NatsCAFile: cfg.NatsCAFile, + NatsConnectTimeout: cfg.NatsConnectTimeout, + }) + ExitOnError("Creating NATS connection", err) + return conn +} + // Components func CreateImageInspector(cfg *config.Config, configMapClient configmap.Interface, secretClient secret.Interface) imageinspector.Inspector { diff --git a/cmd/api-server/main.go b/cmd/api-server/main.go index 9f9d319fa8..e3568e67ec 100644 --- a/cmd/api-server/main.go +++ b/cmd/api-server/main.go @@ -6,7 +6,6 @@ import ( "fmt" "github.com/gofiber/fiber/v2/middleware/cors" - "github.com/nats-io/nats.go" "google.golang.org/grpc" executorsclientv1 "github.com/kubeshop/testkube-operator/pkg/client/executors/v1" @@ -33,12 +32,9 @@ import ( domainstorage "github.com/kubeshop/testkube/pkg/storage" "github.com/kubeshop/testkube/pkg/storage/minio" - "github.com/kubeshop/testkube/pkg/api/v1/testkube" - "github.com/kubeshop/testkube/pkg/event/kind/slack" - "github.com/kubeshop/testkube/internal/common" - "github.com/kubeshop/testkube/internal/config" parser "github.com/kubeshop/testkube/internal/template" + "github.com/kubeshop/testkube/pkg/api/v1/testkube" "github.com/kubeshop/testkube/pkg/version" "golang.org/x/sync/errgroup" @@ -121,8 +117,7 @@ func main() { } secretManager := secretmanager.New(clientset, secretConfig) - nc, err := newNATSEncodedConnection(cfg) - commons.ExitOnError("Creating NATS connection", err) + nc := commons.MustCreateNATSConnection(cfg) eventBus := bus.NewNATSBus(nc) if cfg.Trace { eventBus.TraceEvents() @@ -158,8 +153,7 @@ func main() { inspector := commons.CreateImageInspector(cfg, configMapClient, secretClient) - slackLoader, err := newSlackLoader(cfg, envs) - commons.ExitOnError("Creating slack loader", err) + slackLoader := commons.MustCreateSlackLoader(cfg, envs) var deprecatedRepositories commons.DeprecatedRepositories var testWorkflowResultsRepository testworkflow2.Repository @@ -548,47 +542,3 @@ func main() { log.DefaultLogger.Fatalf("Testkube is shutting down: %v", err) } } - -func newNATSEncodedConnection(cfg *config.Config) (*nats.EncodedConn, error) { - // if embedded NATS server is enabled, we'll replace connection with one to the embedded server - if cfg.NatsEmbedded { - _, nc, err := event.ServerWithConnection(cfg.NatsEmbeddedStoreDir) - if err != nil { - return nil, err - } - - log.DefaultLogger.Info("Started embedded NATS server") - - return nats.NewEncodedConn(nc, nats.JSON_ENCODER) - } - - return bus.NewNATSEncodedConnection(bus.ConnectionConfig{ - NatsURI: cfg.NatsURI, - NatsSecure: cfg.NatsSecure, - NatsSkipVerify: cfg.NatsSkipVerify, - NatsCertFile: cfg.NatsCertFile, - NatsKeyFile: cfg.NatsKeyFile, - NatsCAFile: cfg.NatsCAFile, - NatsConnectTimeout: cfg.NatsConnectTimeout, - }) -} - -func newSlackLoader(cfg *config.Config, envs map[string]string) (*slack.SlackLoader, error) { - slackTemplate, err := parser.LoadConfigFromStringOrFile( - cfg.SlackTemplate, - cfg.TestkubeConfigDir, - "slack-template.json", - "slack template", - ) - if err != nil { - return nil, err - } - - slackConfig, err := parser.LoadConfigFromStringOrFile(cfg.SlackConfig, cfg.TestkubeConfigDir, "slack-config.json", "slack config") - if err != nil { - return nil, err - } - - return slack.NewSlackLoader(slackTemplate, slackConfig, cfg.TestkubeClusterName, cfg.TestkubeDashboardURI, - testkube.AllEventTypes, envs), nil -}