diff --git a/app/common/watermill.go b/app/common/watermill.go index a7403d919..2bfe6cdae 100644 --- a/app/common/watermill.go +++ b/app/common/watermill.go @@ -57,7 +57,7 @@ func NewEventBusPublisher( ) (eventbus.Publisher, error) { eventBusPublisher, err := eventbus.New(eventbus.Options{ Publisher: publisher, - Config: conf, + TopicMapping: conf.EventBusTopicMapping(), Logger: logger, MarshalerTransformFunc: watermillkafka.AddPartitionKeyFromSubject, }) diff --git a/app/config/events.go b/app/config/events.go index 655983628..4e89720cb 100644 --- a/app/config/events.go +++ b/app/config/events.go @@ -6,6 +6,7 @@ import ( "github.com/spf13/viper" + "github.com/openmeterio/openmeter/openmeter/watermill/eventbus" "github.com/openmeterio/openmeter/pkg/errorsx" ) @@ -19,6 +20,13 @@ func (c EventsConfiguration) Validate() error { return c.SystemEvents.Validate() } +func (c EventsConfiguration) EventBusTopicMapping() eventbus.TopicMapping { + return eventbus.TopicMapping{ + IngestEventsTopic: c.IngestEvents.Topic, + SystemEventsTopic: c.SystemEvents.Topic, + } +} + type EventSubsystemConfiguration struct { Enabled bool Topic string diff --git a/cmd/jobs/entitlement/init.go b/cmd/jobs/entitlement/init.go index 89c2ee12e..9f7bf6e11 100644 --- a/cmd/jobs/entitlement/init.go +++ b/cmd/jobs/entitlement/init.go @@ -78,7 +78,7 @@ func initEntitlements(ctx context.Context, conf config.Configuration, logger *sl eventPublisher, err := eventbus.New(eventbus.Options{ Publisher: eventPublisherDriver, - Config: conf.Events, + TopicMapping: conf.Events.EventBusTopicMapping(), Logger: logger, MarshalerTransformFunc: watermillkafka.AddPartitionKeyFromSubject, }) diff --git a/openmeter/testutils/logger.go b/openmeter/testutils/logger.go index d3d7d4297..e3998552f 100644 --- a/openmeter/testutils/logger.go +++ b/openmeter/testutils/logger.go @@ -1,6 +1,7 @@ package testutils import ( + "context" "log/slog" "testing" ) @@ -9,3 +10,17 @@ func NewLogger(t testing.TB) *slog.Logger { t.Helper() return slog.Default() } + +// discardHandler is a slog.Handler implementation which does not emit log messages +// See: https://go-review.googlesource.com/c/go/+/548335/5/src/log/slog/example_discard_test.go#14 +type discardHandler struct { + slog.JSONHandler +} + +func (d *discardHandler) Enabled(context.Context, slog.Level) bool { return false } + +func NewDiscardLogger(t testing.TB) *slog.Logger { + t.Helper() + + return slog.New(&discardHandler{}) +} diff --git a/openmeter/watermill/eventbus/eventbus.go b/openmeter/watermill/eventbus/eventbus.go index cbce9e2ac..6c2a3749a 100644 --- a/openmeter/watermill/eventbus/eventbus.go +++ b/openmeter/watermill/eventbus/eventbus.go @@ -2,6 +2,8 @@ package eventbus import ( "context" + "errors" + "fmt" "log/slog" "strings" "testing" @@ -11,19 +13,51 @@ import ( "github.com/ThreeDotsLabs/watermill/message" "github.com/stretchr/testify/assert" - "github.com/openmeterio/openmeter/app/config" ingestevents "github.com/openmeterio/openmeter/openmeter/sink/flushhandler/ingestnotification/events" "github.com/openmeterio/openmeter/openmeter/watermill/driver/noop" "github.com/openmeterio/openmeter/openmeter/watermill/marshaler" ) +type TopicMapping struct { + IngestEventsTopic string + SystemEventsTopic string +} + +func (t TopicMapping) Validate() error { + if t.IngestEventsTopic == "" { + return errors.New("ingest events topic is required") + } + + if t.SystemEventsTopic == "" { + return errors.New("system events topic is required") + } + + return nil +} + type Options struct { Publisher message.Publisher - Config config.EventsConfiguration + TopicMapping TopicMapping Logger *slog.Logger MarshalerTransformFunc marshaler.TransformFunc } +func (o Options) Validate() error { + if o.Publisher == nil { + return errors.New("publisher is required") + } + + if err := o.TopicMapping.Validate(); err != nil { + return fmt.Errorf("topic mapping: %w", err) + } + + if o.Logger == nil { + return errors.New("logger is required") + } + + return nil +} + type Publisher interface { // Publish publishes an event to the event bus Publish(ctx context.Context, event marshaler.Event) error @@ -82,6 +116,10 @@ func (p contextPublisher) PublishIfNoError(event marshaler.Event, err error) err } func New(opts Options) (Publisher, error) { + if err := opts.Validate(); err != nil { + return nil, err + } + marshaler := marshaler.New(opts.MarshalerTransformFunc) ingestVersionSubsystemPrefix := ingestevents.EventVersionSubsystem + "." @@ -90,9 +128,9 @@ func New(opts Options) (Publisher, error) { GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) { switch { case strings.HasPrefix(params.EventName, ingestVersionSubsystemPrefix): - return opts.Config.IngestEvents.Topic, nil + return opts.TopicMapping.IngestEventsTopic, nil default: - return opts.Config.SystemEvents.Topic, nil + return opts.TopicMapping.SystemEventsTopic, nil } }, @@ -112,13 +150,9 @@ func New(opts Options) (Publisher, error) { func NewMock(t *testing.T) Publisher { eventBus, err := New(Options{ Publisher: &noop.Publisher{}, - Config: config.EventsConfiguration{ - SystemEvents: config.EventSubsystemConfiguration{ - Topic: "test", - }, - IngestEvents: config.EventSubsystemConfiguration{ - Topic: "test", - }, + TopicMapping: TopicMapping{ + IngestEventsTopic: "test", + SystemEventsTopic: "test", }, Logger: slog.Default(), }) diff --git a/pkg/kafka/topicprovisioner_test.go b/pkg/kafka/topicprovisioner_test.go index a31cb8e61..751afc224 100644 --- a/pkg/kafka/topicprovisioner_test.go +++ b/pkg/kafka/topicprovisioner_test.go @@ -2,7 +2,6 @@ package kafka import ( "context" - "log/slog" "testing" "time" @@ -10,22 +9,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/metric/noop" -) - -// FIXME(chrisgacsal): move discardHandler to 'testutils' pkg after import cycle is resolved. -// discardHandler is a slog.Handler implementation which does not emit log messages -// See: https://go-review.googlesource.com/c/go/+/548335/5/src/log/slog/example_discard_test.go#14 -type discardHandler struct { - slog.JSONHandler -} - -func (d *discardHandler) Enabled(context.Context, slog.Level) bool { return false } -func NewDiscardLogger(t testing.TB) *slog.Logger { - t.Helper() - - return slog.New(&discardHandler{}) -} + "github.com/openmeterio/openmeter/openmeter/testutils" +) var _ AdminClient = (*mockTopicProvisioner)(nil) @@ -114,7 +100,7 @@ func TestTopicProvisioner(t *testing.T) { adminClient := &mockTopicProvisioner{} meter := noop.NewMeterProvider().Meter("test") - logger := NewDiscardLogger(t) + logger := testutils.NewDiscardLogger(t) provisioner, err := NewTopicProvisioner(TopicProvisionerConfig{ AdminClient: adminClient,