Skip to content

Commit

Permalink
feat: added embedded nats server (#5564)
Browse files Browse the repository at this point in the history
* feat: added embedded nats server

* chore: added comments to the code
  • Loading branch information
exu authored Jun 11, 2024
1 parent 2e9e28d commit 1b44ba3
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 2 deletions.
17 changes: 15 additions & 2 deletions cmd/api-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,10 +383,11 @@ func main() {
envs[pair[0]] += pair[1]
}

nc, err := newNATSConnection(cfg)
nc, err := newNATSEncodedConnection(cfg)
if err != nil {
exitOnError("Creating NATS connection", err)
}

eventBus := bus.NewNATSBus(nc)
if cfg.Trace {
eventBus.TraceEvents()
Expand Down Expand Up @@ -764,7 +765,19 @@ func parseDefaultExecutors(cfg *config.Config) (executors []testkube.ExecutorDet
return executors, nil
}

func newNATSConnection(cfg *config.Config) (*nats.EncodedConn, error) {
func newNATSEncodedConnection(cfg *config.Config) (*nats.EncodedConn, error) {
// if embedded NATS server is enabled, we'll replace connection with one to the embedded server
if cfg.NatsEmbedded {
_, nc, err := event.ServerWithConnection(cfg.NatsEmbeddedStoreDir)
if err != nil {
return nil, err
}

log.DefaultLogger.Info("Started embedded NATS server")

return nats.NewEncodedConn(nc, nats.JSON_ENCODER)
}

return bus.NewNATSEncodedConnection(bus.ConnectionConfig{
NatsURI: cfg.NatsURI,
NatsSecure: cfg.NatsSecure,
Expand Down
2 changes: 2 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type Config struct {
ScrapperEnabled bool `envconfig:"SCRAPPERENABLED" default:"false"`
LogsBucket string `envconfig:"LOGS_BUCKET" default:""`
LogsStorage string `envconfig:"LOGS_STORAGE" default:""`
NatsEmbedded bool `envconfig:"NATS_EMBEDDED" default:"false"`
NatsEmbeddedStoreDir string `envconfig:"NATS_EMBEDDED_STORE_DIR" default:"/app/nats"`
NatsURI string `envconfig:"NATS_URI" default:"nats://localhost:4222"`
NatsSecure bool `envconfig:"NATS_SECURE" default:"false"`
NatsSkipVerify bool `envconfig:"NATS_SKIP_VERIFY" default:"false"`
Expand Down
52 changes: 52 additions & 0 deletions pkg/event/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package event

import (
"fmt"
"time"

"github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"
)

const natsConnectionTimeout = 10 * time.Second

var ErrNatsEmbeddedServerTimeout = fmt.Errorf("server not ready for connections in " + natsConnectionTimeout.String())

// ServerWithConnection starts NATS server with embedded JetStream, wait for readines and returns connection to it
func ServerWithConnection(dir string) (*server.Server, *nats.Conn, error) {
opts := &server.Options{
JetStream: true,
Port: 4222,
Host: "localhost",
StoreDir: dir,
NoLog: false,
NoSigs: true,
MaxControlLine: 4096,
DisableShortFirstPing: true,
}

// Initialize new server with options
ns, err := server.NewServer(opts)
if err != nil {
return nil, nil, err
}

// Start the server via goroutine
ns.Start()
ns.EnableJetStream(&server.JetStreamConfig{
StoreDir: dir,
})

// Wait for server to be ready for connections - this one will block
if !ns.ReadyForConnections(natsConnectionTimeout) {
return nil, nil, ErrNatsEmbeddedServerTimeout
}

// Connect to server
nc, err := nats.Connect(ns.ClientURL())
if err != nil {
return nil, nil, err
}

return ns, nc, nil
}
25 changes: 25 additions & 0 deletions pkg/event/server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package event

import (
"testing"

"github.com/nats-io/nats.go"
"github.com/stretchr/testify/assert"
)

func TestEmbeddedServer_Start(t *testing.T) {

ns, nc, err := ServerWithConnection(t.TempDir())
assert.NoError(t, err)

nc.Subscribe("events", func(msg *nats.Msg) {
t.Logf("Received message: %s", string(msg.Data))
ns.Shutdown()
})

nc.Publish("events", []byte("test"))

t.Log("Waiting for shutdown")
ns.WaitForShutdown()

}

0 comments on commit 1b44ba3

Please sign in to comment.