Skip to content

Commit

Permalink
Merge pull request #1742 from openmeterio/refactor/fix-circular-depen…
Browse files Browse the repository at this point in the history
…dency

refactor: fix circular dependency
  • Loading branch information
chrisgacsal authored Oct 23, 2024
2 parents 19c721b + e80ee11 commit 5cef1dc
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 30 deletions.
2 changes: 1 addition & 1 deletion app/common/watermill.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func NewEventBusPublisher(
) (eventbus.Publisher, error) {
eventBusPublisher, err := eventbus.New(eventbus.Options{
Publisher: publisher,
Config: conf,
TopicMapping: conf.EventBusTopicMapping(),
Logger: logger,
MarshalerTransformFunc: watermillkafka.AddPartitionKeyFromSubject,
})
Expand Down
8 changes: 8 additions & 0 deletions app/config/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/spf13/viper"

"github.com/openmeterio/openmeter/openmeter/watermill/eventbus"
"github.com/openmeterio/openmeter/pkg/errorsx"
)

Expand All @@ -19,6 +20,13 @@ func (c EventsConfiguration) Validate() error {
return c.SystemEvents.Validate()
}

func (c EventsConfiguration) EventBusTopicMapping() eventbus.TopicMapping {
return eventbus.TopicMapping{
IngestEventsTopic: c.IngestEvents.Topic,
SystemEventsTopic: c.SystemEvents.Topic,
}
}

type EventSubsystemConfiguration struct {
Enabled bool
Topic string
Expand Down
2 changes: 1 addition & 1 deletion cmd/jobs/entitlement/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func initEntitlements(ctx context.Context, conf config.Configuration, logger *sl

eventPublisher, err := eventbus.New(eventbus.Options{
Publisher: eventPublisherDriver,
Config: conf.Events,
TopicMapping: conf.Events.EventBusTopicMapping(),
Logger: logger,
MarshalerTransformFunc: watermillkafka.AddPartitionKeyFromSubject,
})
Expand Down
15 changes: 15 additions & 0 deletions openmeter/testutils/logger.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package testutils

import (
"context"
"log/slog"
"testing"
)
Expand All @@ -9,3 +10,17 @@ func NewLogger(t testing.TB) *slog.Logger {
t.Helper()
return slog.Default()
}

// discardHandler is a slog.Handler implementation which does not emit log messages
// See: https://go-review.googlesource.com/c/go/+/548335/5/src/log/slog/example_discard_test.go#14
type discardHandler struct {
slog.JSONHandler
}

func (d *discardHandler) Enabled(context.Context, slog.Level) bool { return false }

func NewDiscardLogger(t testing.TB) *slog.Logger {
t.Helper()

return slog.New(&discardHandler{})
}
56 changes: 45 additions & 11 deletions openmeter/watermill/eventbus/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package eventbus

import (
"context"
"errors"
"fmt"
"log/slog"
"strings"
"testing"
Expand All @@ -11,19 +13,51 @@ import (
"github.com/ThreeDotsLabs/watermill/message"
"github.com/stretchr/testify/assert"

"github.com/openmeterio/openmeter/app/config"
ingestevents "github.com/openmeterio/openmeter/openmeter/sink/flushhandler/ingestnotification/events"
"github.com/openmeterio/openmeter/openmeter/watermill/driver/noop"
"github.com/openmeterio/openmeter/openmeter/watermill/marshaler"
)

type TopicMapping struct {
IngestEventsTopic string
SystemEventsTopic string
}

func (t TopicMapping) Validate() error {
if t.IngestEventsTopic == "" {
return errors.New("ingest events topic is required")
}

if t.SystemEventsTopic == "" {
return errors.New("system events topic is required")
}

return nil
}

type Options struct {
Publisher message.Publisher
Config config.EventsConfiguration
TopicMapping TopicMapping
Logger *slog.Logger
MarshalerTransformFunc marshaler.TransformFunc
}

func (o Options) Validate() error {
if o.Publisher == nil {
return errors.New("publisher is required")
}

if err := o.TopicMapping.Validate(); err != nil {
return fmt.Errorf("topic mapping: %w", err)
}

if o.Logger == nil {
return errors.New("logger is required")
}

return nil
}

type Publisher interface {
// Publish publishes an event to the event bus
Publish(ctx context.Context, event marshaler.Event) error
Expand Down Expand Up @@ -82,6 +116,10 @@ func (p contextPublisher) PublishIfNoError(event marshaler.Event, err error) err
}

func New(opts Options) (Publisher, error) {
if err := opts.Validate(); err != nil {
return nil, err
}

marshaler := marshaler.New(opts.MarshalerTransformFunc)

ingestVersionSubsystemPrefix := ingestevents.EventVersionSubsystem + "."
Expand All @@ -90,9 +128,9 @@ func New(opts Options) (Publisher, error) {
GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) {
switch {
case strings.HasPrefix(params.EventName, ingestVersionSubsystemPrefix):
return opts.Config.IngestEvents.Topic, nil
return opts.TopicMapping.IngestEventsTopic, nil
default:
return opts.Config.SystemEvents.Topic, nil
return opts.TopicMapping.SystemEventsTopic, nil
}
},

Expand All @@ -112,13 +150,9 @@ func New(opts Options) (Publisher, error) {
func NewMock(t *testing.T) Publisher {
eventBus, err := New(Options{
Publisher: &noop.Publisher{},
Config: config.EventsConfiguration{
SystemEvents: config.EventSubsystemConfiguration{
Topic: "test",
},
IngestEvents: config.EventSubsystemConfiguration{
Topic: "test",
},
TopicMapping: TopicMapping{
IngestEventsTopic: "test",
SystemEventsTopic: "test",
},
Logger: slog.Default(),
})
Expand Down
20 changes: 3 additions & 17 deletions pkg/kafka/topicprovisioner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,16 @@ package kafka

import (
"context"
"log/slog"
"testing"
"time"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/metric/noop"
)

// FIXME(chrisgacsal): move discardHandler to 'testutils' pkg after import cycle is resolved.
// discardHandler is a slog.Handler implementation which does not emit log messages
// See: https://go-review.googlesource.com/c/go/+/548335/5/src/log/slog/example_discard_test.go#14
type discardHandler struct {
slog.JSONHandler
}

func (d *discardHandler) Enabled(context.Context, slog.Level) bool { return false }

func NewDiscardLogger(t testing.TB) *slog.Logger {
t.Helper()

return slog.New(&discardHandler{})
}
"github.com/openmeterio/openmeter/openmeter/testutils"
)

var _ AdminClient = (*mockTopicProvisioner)(nil)

Expand Down Expand Up @@ -114,7 +100,7 @@ func TestTopicProvisioner(t *testing.T) {

adminClient := &mockTopicProvisioner{}
meter := noop.NewMeterProvider().Meter("test")
logger := NewDiscardLogger(t)
logger := testutils.NewDiscardLogger(t)

provisioner, err := NewTopicProvisioner(TopicProvisionerConfig{
AdminClient: adminClient,
Expand Down

0 comments on commit 5cef1dc

Please sign in to comment.