diff --git a/cmd/api-server/main.go b/cmd/api-server/main.go index 8ad5b8c425c..99354160f68 100644 --- a/cmd/api-server/main.go +++ b/cmd/api-server/main.go @@ -383,10 +383,11 @@ func main() { envs[pair[0]] += pair[1] } - nc, err := newNATSConnection(cfg) + nc, err := newNATSEncodedConnection(cfg) if err != nil { exitOnError("Creating NATS connection", err) } + eventBus := bus.NewNATSBus(nc) if cfg.Trace { eventBus.TraceEvents() @@ -764,7 +765,19 @@ func parseDefaultExecutors(cfg *config.Config) (executors []testkube.ExecutorDet return executors, nil } -func newNATSConnection(cfg *config.Config) (*nats.EncodedConn, error) { +func newNATSEncodedConnection(cfg *config.Config) (*nats.EncodedConn, error) { + // if embedded NATS server is enabled, we'll replace connection with one to the embedded server + if cfg.NatsEmbedded { + _, nc, err := event.ServerWithConnection(cfg.NatsEmbeddedStoreDir) + if err != nil { + return nil, err + } + + log.DefaultLogger.Info("Started embedded NATS server") + + return nats.NewEncodedConn(nc, nats.JSON_ENCODER) + } + return bus.NewNATSEncodedConnection(bus.ConnectionConfig{ NatsURI: cfg.NatsURI, NatsSecure: cfg.NatsSecure, diff --git a/internal/config/config.go b/internal/config/config.go index 77f0921a0bd..42dcb32c539 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -37,6 +37,8 @@ type Config struct { ScrapperEnabled bool `envconfig:"SCRAPPERENABLED" default:"false"` LogsBucket string `envconfig:"LOGS_BUCKET" default:""` LogsStorage string `envconfig:"LOGS_STORAGE" default:""` + NatsEmbedded bool `envconfig:"NATS_EMBEDDED" default:"false"` + NatsEmbeddedStoreDir string `envconfig:"NATS_EMBEDDED_STORE_DIR" default:"/app/nats"` NatsURI string `envconfig:"NATS_URI" default:"nats://localhost:4222"` NatsSecure bool `envconfig:"NATS_SECURE" default:"false"` NatsSkipVerify bool `envconfig:"NATS_SKIP_VERIFY" default:"false"` diff --git a/pkg/event/server.go b/pkg/event/server.go new file mode 100644 index 00000000000..ed078a8cbbc --- /dev/null +++ b/pkg/event/server.go @@ -0,0 +1,52 @@ +package event + +import ( + "fmt" + "time" + + "github.com/nats-io/nats-server/v2/server" + "github.com/nats-io/nats.go" +) + +const natsConnectionTimeout = 10 * time.Second + +var ErrNatsEmbeddedServerTimeout = fmt.Errorf("server not ready for connections in " + natsConnectionTimeout.String()) + +// ServerWithConnection starts NATS server with embedded JetStream, wait for readines and returns connection to it +func ServerWithConnection(dir string) (*server.Server, *nats.Conn, error) { + opts := &server.Options{ + JetStream: true, + Port: 4222, + Host: "localhost", + StoreDir: dir, + NoLog: false, + NoSigs: true, + MaxControlLine: 4096, + DisableShortFirstPing: true, + } + + // Initialize new server with options + ns, err := server.NewServer(opts) + if err != nil { + return nil, nil, err + } + + // Start the server via goroutine + ns.Start() + ns.EnableJetStream(&server.JetStreamConfig{ + StoreDir: dir, + }) + + // Wait for server to be ready for connections - this one will block + if !ns.ReadyForConnections(natsConnectionTimeout) { + return nil, nil, ErrNatsEmbeddedServerTimeout + } + + // Connect to server + nc, err := nats.Connect(ns.ClientURL()) + if err != nil { + return nil, nil, err + } + + return ns, nc, nil +} diff --git a/pkg/event/server_test.go b/pkg/event/server_test.go new file mode 100644 index 00000000000..719fe99440b --- /dev/null +++ b/pkg/event/server_test.go @@ -0,0 +1,25 @@ +package event + +import ( + "testing" + + "github.com/nats-io/nats.go" + "github.com/stretchr/testify/assert" +) + +func TestEmbeddedServer_Start(t *testing.T) { + + ns, nc, err := ServerWithConnection(t.TempDir()) + assert.NoError(t, err) + + nc.Subscribe("events", func(msg *nats.Msg) { + t.Logf("Received message: %s", string(msg.Data)) + ns.Shutdown() + }) + + nc.Publish("events", []byte("test")) + + t.Log("Waiting for shutdown") + ns.WaitForShutdown() + +}