From a3e9fb1c69f966d673ab4b8d5587b16667c0cadf Mon Sep 17 00:00:00 2001 From: Peter Turi Date: Wed, 31 Jul 2024 17:12:38 +0200 Subject: [PATCH 1/5] refactor: use windmill event bus --- cmd/server/main.go | 58 ++++----- internal/entitlement/connector.go | 49 +++----- internal/entitlement/events.go | 22 ++++ internal/event/spec/event_type.go | 27 +++++ internal/registry/entitlement.go | 5 +- openmeter/entitlement/adapters.go | 6 +- openmeter/watermill/marshaler/marshaler.go | 134 +++++++++++++++++++++ 7 files changed, 228 insertions(+), 73 deletions(-) create mode 100644 openmeter/watermill/marshaler/marshaler.go diff --git a/cmd/server/main.go b/cmd/server/main.go index 2d9740a40..681aa9124 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -16,6 +16,8 @@ import ( health "github.com/AppsFlyer/go-sundheit" healthhttp "github.com/AppsFlyer/go-sundheit/http" "github.com/ClickHouse/clickhouse-go/v2" + "github.com/ThreeDotsLabs/watermill" + "github.com/ThreeDotsLabs/watermill/components/cqrs" "github.com/ThreeDotsLabs/watermill/message" "github.com/confluentinc/confluent-kafka-go/v2/kafka" "github.com/go-chi/chi/v5" @@ -37,7 +39,6 @@ import ( "github.com/openmeterio/openmeter/config" "github.com/openmeterio/openmeter/internal/debug" "github.com/openmeterio/openmeter/internal/ent/db" - "github.com/openmeterio/openmeter/internal/event/publisher" "github.com/openmeterio/openmeter/internal/ingest" "github.com/openmeterio/openmeter/internal/ingest/ingestdriver" "github.com/openmeterio/openmeter/internal/ingest/kafkaingest" @@ -54,6 +55,7 @@ import ( "github.com/openmeterio/openmeter/internal/streaming/clickhouse_connector" watermillkafka "github.com/openmeterio/openmeter/internal/watermill/driver/kafka" "github.com/openmeterio/openmeter/internal/watermill/driver/noop" + "github.com/openmeterio/openmeter/openmeter/watermill/marshaler" "github.com/openmeterio/openmeter/pkg/contextx" "github.com/openmeterio/openmeter/pkg/errorsx" "github.com/openmeterio/openmeter/pkg/framework/entutils" @@ -226,6 +228,12 @@ func main() { } }() + eventBus, err := initEventBus(eventPublisher.watermillPublisher, conf, logger) + if err != nil { + logger.Error("failed to initialize event bus", "error", err) + os.Exit(1) + } + // Initialize Kafka Ingest ingestCollector, kafkaIngestNamespaceHandler, err := initKafkaIngest( kafkaProducer, @@ -322,7 +330,7 @@ func main() { StreamingConnector: streamingConnector, MeterRepository: meterRepository, Logger: logger, - Publisher: eventPublishers.eventPublisher.ForTopic(conf.Events.SystemEvents.Topic), + EventBus: eventBus, }) } @@ -430,24 +438,9 @@ func main() { } } -type publishers struct { - eventPublisher publisher.Publisher - driver message.Publisher -} - -func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Configuration, metricMeter metric.Meter) (*publishers, error) { +func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Configuration, kafkaProducer *kafka.Producer) (message.Publisher, error) { if !conf.Events.Enabled { - publisher, err := publisher.NewPublisher(publisher.PublisherOptions{ - Publisher: &noop.Publisher{}, - }) - if err != nil { - return nil, fmt.Errorf("failed to create event driver: %w", err) - } - - return &publishers{ - eventPublisher: publisher, - driver: &noop.Publisher{}, - }, nil + return &noop.Publisher{}, nil } provisionTopics := []watermillkafka.AutoProvisionTopic{} @@ -458,7 +451,7 @@ func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Co }) } - eventDriver, err := watermillkafka.NewPublisher(ctx, watermillkafka.PublisherOptions{ + return watermillkafka.NewPublisher(ctx, watermillkafka.PublisherOptions{ KafkaConfig: conf.Ingest.Kafka.KafkaConfiguration, ProvisionTopics: provisionTopics, ClientID: otelName, @@ -466,19 +459,6 @@ func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Co MetricMeter: metricMeter, DebugLogging: conf.Telemetry.Log.Level == slog.LevelDebug, }) - if err != nil { - return nil, fmt.Errorf("failed to create event driver: %w", err) - } - - eventPublisher, err := publisher.NewPublisher(publisher.PublisherOptions{ - Publisher: eventDriver, - Transform: watermillkafka.AddPartitionKeyFromSubject, - }) - - return &publishers{ - eventPublisher: eventPublisher, - driver: eventDriver, - }, err } func initKafkaProducer(ctx context.Context, config config.Configuration, logger *slog.Logger, metricMeter metric.Meter, group *run.Group) (*kafka.Producer, error) { @@ -597,3 +577,15 @@ func initPGClients(config config.PostgresConfig) ( client: dbClient, }, nil } + +func initEventBus(publisher message.Publisher, config config.Configuration, logger *slog.Logger) (*cqrs.EventBus, error) { + return cqrs.NewEventBusWithConfig(publisher, cqrs.EventBusConfig{ + GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) { + // TODO: make it generic between sink / server + return config.Events.SystemEvents.Topic, nil + }, + + Marshaler: marshaler.New(), + Logger: watermill.NewSlogLogger(logger), + }) +} diff --git a/internal/entitlement/connector.go b/internal/entitlement/connector.go index 22cd99355..757eefacd 100644 --- a/internal/entitlement/connector.go +++ b/internal/entitlement/connector.go @@ -5,9 +5,8 @@ import ( "fmt" "time" + "github.com/ThreeDotsLabs/watermill/components/cqrs" eventmodels "github.com/openmeterio/openmeter/internal/event/models" - "github.com/openmeterio/openmeter/internal/event/publisher" - "github.com/openmeterio/openmeter/internal/event/spec" "github.com/openmeterio/openmeter/internal/meter" "github.com/openmeterio/openmeter/internal/productcatalog" "github.com/openmeterio/openmeter/pkg/framework/entutils" @@ -74,7 +73,7 @@ type entitlementConnector struct { featureConnector productcatalog.FeatureConnector meterRepo meter.Repository - publisher publisher.TopicPublisher + eventBus *cqrs.EventBus } func NewEntitlementConnector( @@ -84,7 +83,7 @@ func NewEntitlementConnector( meteredEntitlementConnector SubTypeConnector, staticEntitlementConnector SubTypeConnector, booleanEntitlementConnector SubTypeConnector, - publisher publisher.TopicPublisher, + eventBus *cqrs.EventBus, ) Connector { return &entitlementConnector{ meteredEntitlementConnector: meteredEntitlementConnector, @@ -93,7 +92,7 @@ func NewEntitlementConnector( entitlementRepo: entitlementRepo, featureConnector: featureConnector, meterRepo: meterRepo, - publisher: publisher, + eventBus: eventBus, } } @@ -149,26 +148,16 @@ func (c *entitlementConnector) CreateEntitlement(ctx context.Context, input Crea return nil, err } - event, err := spec.NewCloudEvent( - spec.EventSpec{ - Source: spec.ComposeResourcePath(input.Namespace, spec.EntityEntitlement, ent.ID), - Subject: spec.ComposeResourcePath(input.Namespace, spec.EntitySubjectKey, ent.SubjectKey), + err = c.eventBus.Publish(ctx, EntitlementCreatedEvent{ + Entitlement: *ent, + Namespace: eventmodels.NamespaceID{ + ID: input.Namespace, }, - EntitlementCreatedEvent{ - Entitlement: *ent, - Namespace: eventmodels.NamespaceID{ - ID: input.Namespace, - }, - }, - ) + }) if err != nil { return nil, err } - if err := c.publisher.Publish(event); err != nil { - return nil, err - } - return ent, nil }) @@ -193,26 +182,16 @@ func (c *entitlementConnector) DeleteEntitlement(ctx context.Context, namespace return nil, err } - event, err := spec.NewCloudEvent( - spec.EventSpec{ - Source: spec.ComposeResourcePath(namespace, spec.EntityEntitlement, ent.ID), - Subject: spec.ComposeResourcePath(namespace, spec.EntitySubjectKey, ent.SubjectKey), + err = c.eventBus.Publish(ctx, EntitlementDeletedEvent{ + Entitlement: *ent, + Namespace: eventmodels.NamespaceID{ + ID: namespace, }, - EntitlementDeletedEvent{ - Entitlement: *ent, - Namespace: eventmodels.NamespaceID{ - ID: namespace, - }, - }, - ) + }) if err != nil { return nil, err } - if err := c.publisher.Publish(event); err != nil { - return nil, err - } - return ent, nil }) diff --git a/internal/entitlement/events.go b/internal/entitlement/events.go index 6969cfdef..dbce79935 100644 --- a/internal/entitlement/events.go +++ b/internal/entitlement/events.go @@ -53,6 +53,17 @@ func (e EntitlementCreatedEvent) Validate() error { return entitlementEvent(e).Validate() } +func (e EntitlementCreatedEvent) EventName() string { + return e.Spec().Type() +} + +func (e EntitlementCreatedEvent) EventMetadata() spec.EventMetadata { + return spec.EventMetadata{ + Source: spec.ComposeResourcePath(e.Namespace.ID, spec.EntityEntitlement, e.ID), + Subject: spec.ComposeResourcePath(e.Namespace.ID, spec.EntitySubjectKey, e.SubjectKey), + } +} + type EntitlementDeletedEvent entitlementEvent var entitlementDeletedEventSpec = spec.EventTypeSpec{ @@ -68,3 +79,14 @@ func (e EntitlementDeletedEvent) Spec() *spec.EventTypeSpec { func (e EntitlementDeletedEvent) Validate() error { return entitlementEvent(e).Validate() } + +func (e EntitlementDeletedEvent) EventName() string { + return e.Spec().Type() +} + +func (e EntitlementDeletedEvent) EventMetadata() spec.EventMetadata { + return spec.EventMetadata{ + Source: spec.ComposeResourcePath(e.Namespace.ID, spec.EntityEntitlement, e.ID), + Subject: spec.ComposeResourcePath(e.Namespace.ID, spec.EntitySubjectKey, e.SubjectKey), + } +} diff --git a/internal/event/spec/event_type.go b/internal/event/spec/event_type.go index 5a7b0b061..0f6de621b 100644 --- a/internal/event/spec/event_type.go +++ b/internal/event/spec/event_type.go @@ -1,5 +1,6 @@ package spec +// TODO: move to metadata import ( "fmt" "time" @@ -28,6 +29,8 @@ type EventTypeSpec struct { cloudEventType string } +// TODO: "string constructor" + func (s *EventTypeSpec) Type() string { if s.cloudEventType != "" { return s.cloudEventType @@ -59,3 +62,27 @@ type EventSpec struct { Subject string Source string } + +type EventMetadata struct { + // ID of the event + ID string + + // Time specifies when the event occurred + Time time.Time + + // Subject meta + // Examples for source and subject pairs + // grant: + // source: //openmeter.io/namespace//entitlement//grant/ + // subject: //openmeter.io/namespace//subject/ + // + // entitlement: + // source: //openmeter.io/namespace//entitlement/ + // subject: //openmeter.io/namespace//subject/ + // + // ingest: + // source: //openmeter.io/namespace//event + // subject: //openmeter.io/namespace//subject/ + Subject string + Source string +} diff --git a/internal/registry/entitlement.go b/internal/registry/entitlement.go index 7841722ff..af8a72157 100644 --- a/internal/registry/entitlement.go +++ b/internal/registry/entitlement.go @@ -3,12 +3,12 @@ package registry import ( "log/slog" + "github.com/ThreeDotsLabs/watermill/components/cqrs" "github.com/openmeterio/openmeter/internal/ent/db" "github.com/openmeterio/openmeter/internal/meter" "github.com/openmeterio/openmeter/openmeter/credit" "github.com/openmeterio/openmeter/openmeter/entitlement" meteredentitlement "github.com/openmeterio/openmeter/openmeter/entitlement/metered" - "github.com/openmeterio/openmeter/openmeter/event/publisher" "github.com/openmeterio/openmeter/openmeter/productcatalog" "github.com/openmeterio/openmeter/openmeter/streaming" ) @@ -30,5 +30,6 @@ type EntitlementOptions struct { StreamingConnector streaming.Connector Logger *slog.Logger MeterRepository meter.Repository - Publisher publisher.TopicPublisher + // TODO: let's have an interface for the publisher, instead of watermill deps + EventBus *cqrs.EventBus } diff --git a/openmeter/entitlement/adapters.go b/openmeter/entitlement/adapters.go index a07334d9c..22d187492 100644 --- a/openmeter/entitlement/adapters.go +++ b/openmeter/entitlement/adapters.go @@ -1,8 +1,8 @@ package entitlement import ( + "github.com/ThreeDotsLabs/watermill/components/cqrs" "github.com/openmeterio/openmeter/internal/entitlement" - "github.com/openmeterio/openmeter/openmeter/event/publisher" "github.com/openmeterio/openmeter/openmeter/meter" "github.com/openmeterio/openmeter/openmeter/productcatalog" ) @@ -14,7 +14,7 @@ func NewEntitlementConnector( metered SubTypeConnector, static SubTypeConnector, boolean SubTypeConnector, - publisher publisher.TopicPublisher, + eventBus *cqrs.EventBus, ) EntitlementConnector { - return entitlement.NewEntitlementConnector(edb, fc, meterRepo, metered, static, boolean, publisher) + return entitlement.NewEntitlementConnector(edb, fc, meterRepo, metered, static, boolean, eventBus) } diff --git a/openmeter/watermill/marshaler/marshaler.go b/openmeter/watermill/marshaler/marshaler.go new file mode 100644 index 000000000..db04c8e2f --- /dev/null +++ b/openmeter/watermill/marshaler/marshaler.go @@ -0,0 +1,134 @@ +package marshaler + +import ( + "encoding/json" + "errors" + "fmt" + "time" + + "github.com/ThreeDotsLabs/watermill/components/cqrs" + "github.com/ThreeDotsLabs/watermill/message" + cloudevents "github.com/cloudevents/sdk-go/v2/event" + "github.com/oklog/ulid/v2" + "github.com/openmeterio/openmeter/internal/event/spec" +) + +const ( + CloudEventsHeaderType = "ce_type" + CloudEventsHeaderTime = "ce_time" + CloudEventsHeaderSource = "ce_source" + CloudEventsHeaderSubject = "ce_subject" +) + +type TransformFunc func(watermillIn *message.Message, cloudEvent cloudevents.Event) (*message.Message, error) + +type event interface { + EventName() string + EventMetadata() spec.EventMetadata + Validate() error +} + +type marshaler struct{} + +func New() cqrs.CommandEventMarshaler { + return &marshaler{} +} + +func (m *marshaler) Marshal(v interface{}) (*message.Message, error) { + ev, ok := v.(event) + if !ok { + return nil, errors.New("invalid event type") + } + + // cloud events object + ce, err := NewCloudEvent(ev) + if err != nil { + return nil, err + } + + ceBytes, err := ce.MarshalJSON() + if err != nil { + return nil, err + } + + // watermill message + msg := message.NewMessage(ce.ID(), ceBytes) + + msg.Metadata.Set(CloudEventsHeaderType, ce.Type()) + msg.Metadata.Set(CloudEventsHeaderTime, ce.Time().In(time.UTC).Format(time.RFC3339)) + msg.Metadata.Set(CloudEventsHeaderSource, ce.Source()) + if ce.Subject() != "" { + msg.Metadata.Set(CloudEventsHeaderSubject, ce.Subject()) + } + + /* + // TODO! + if m.transform != nil { + msg, err = m.transform(msg, event) + if err != nil { + return nil, err + } + }*/ + + return msg, nil +} + +func NewCloudEvent(ev event) (cloudevents.Event, error) { + metadata := ev.EventMetadata() + // Mandatory cloud events fields + if metadata.Source == "" { + return cloudevents.Event{}, errors.New("source is required") + } + + cloudEvent := cloudevents.New() + cloudEvent.SetType(ev.EventName()) + cloudEvent.SetSpecVersion("1.0") + + if metadata.Time.IsZero() { + cloudEvent.SetTime(time.Now()) + } else { + cloudEvent.SetTime(metadata.Time) + } + + if metadata.ID == "" { + cloudEvent.SetID(ulid.Make().String()) + } else { + cloudEvent.SetID(metadata.ID) + } + + cloudEvent.SetSource(metadata.Source) + + cloudEvent.SetSubject(metadata.Subject) + + if err := ev.Validate(); err != nil { + return cloudevents.Event{}, err + } + + if err := cloudEvent.SetData("application/json", ev); err != nil { + return cloudevents.Event{}, err + } + return cloudEvent, nil +} + +func (m *marshaler) Unmarshal(msg *message.Message, v interface{}) error { + cloudEvent := cloudevents.Event{} + if err := cloudEvent.UnmarshalJSON(msg.Payload); err != nil { + return fmt.Errorf("failed to unmarshal CloudEvent: %w", err) + } + + return json.Unmarshal(cloudEvent.Data(), v) +} + +func (m *marshaler) Name(v interface{}) string { + ev, ok := v.(event) + if !ok { + // TODO: how to report error + return "TODO" + } + + return ev.EventName() +} + +func (m *marshaler) NameFromMessage(msg *message.Message) string { + return msg.Metadata.Get(CloudEventsHeaderType) +} From 8566b46d04e93e60cfc6cdd92c922b7fa2093b5c Mon Sep 17 00:00:00 2001 From: Peter Turi Date: Fri, 2 Aug 2024 14:27:17 +0200 Subject: [PATCH 2/5] refactor: add publisher interface --- internal/entitlement/connector.go | 12 ++++++------ internal/event/publisher.go | 8 ++++++++ 2 files changed, 14 insertions(+), 6 deletions(-) create mode 100644 internal/event/publisher.go diff --git a/internal/entitlement/connector.go b/internal/entitlement/connector.go index 757eefacd..313b61301 100644 --- a/internal/entitlement/connector.go +++ b/internal/entitlement/connector.go @@ -5,7 +5,7 @@ import ( "fmt" "time" - "github.com/ThreeDotsLabs/watermill/components/cqrs" + "github.com/openmeterio/openmeter/internal/event" eventmodels "github.com/openmeterio/openmeter/internal/event/models" "github.com/openmeterio/openmeter/internal/meter" "github.com/openmeterio/openmeter/internal/productcatalog" @@ -73,7 +73,7 @@ type entitlementConnector struct { featureConnector productcatalog.FeatureConnector meterRepo meter.Repository - eventBus *cqrs.EventBus + publisher event.Publisher } func NewEntitlementConnector( @@ -83,7 +83,7 @@ func NewEntitlementConnector( meteredEntitlementConnector SubTypeConnector, staticEntitlementConnector SubTypeConnector, booleanEntitlementConnector SubTypeConnector, - eventBus *cqrs.EventBus, + publisher event.Publisher, ) Connector { return &entitlementConnector{ meteredEntitlementConnector: meteredEntitlementConnector, @@ -92,7 +92,7 @@ func NewEntitlementConnector( entitlementRepo: entitlementRepo, featureConnector: featureConnector, meterRepo: meterRepo, - eventBus: eventBus, + publisher: publisher, } } @@ -148,7 +148,7 @@ func (c *entitlementConnector) CreateEntitlement(ctx context.Context, input Crea return nil, err } - err = c.eventBus.Publish(ctx, EntitlementCreatedEvent{ + err = c.publisher.Publish(ctx, EntitlementCreatedEvent{ Entitlement: *ent, Namespace: eventmodels.NamespaceID{ ID: input.Namespace, @@ -182,7 +182,7 @@ func (c *entitlementConnector) DeleteEntitlement(ctx context.Context, namespace return nil, err } - err = c.eventBus.Publish(ctx, EntitlementDeletedEvent{ + err = c.publisher.Publish(ctx, EntitlementDeletedEvent{ Entitlement: *ent, Namespace: eventmodels.NamespaceID{ ID: namespace, diff --git a/internal/event/publisher.go b/internal/event/publisher.go new file mode 100644 index 000000000..cff35ac5e --- /dev/null +++ b/internal/event/publisher.go @@ -0,0 +1,8 @@ +package event + +import "context" + +type Publisher interface { + // TODO: can we constraint it to accept only events? + Publish(ctx context.Context, event any) error +} From beacd81916ac4b5ffd4b42ebb38da5e8282d0138 Mon Sep 17 00:00:00 2001 From: Peter Turi Date: Wed, 7 Aug 2024 09:24:04 +0200 Subject: [PATCH 3/5] feat: wire in watermill event bus --- cmd/balance-worker/main.go | 51 +++---- cmd/notification-service/main.go | 50 +++---- cmd/server/main.go | 31 ++--- cmd/sink-worker/main.go | 14 +- internal/credit/connector.go | 6 +- internal/credit/grant.go | 41 ++---- internal/credit/grant/events.go | 57 +++++--- .../balanceworker/entitlementhandler.go | 27 ++-- .../balanceworker/ingesthandler.go | 10 +- internal/entitlement/balanceworker/worker.go | 76 ++++++----- internal/entitlement/events.go | 62 ++++----- internal/entitlement/metered/connector.go | 6 +- internal/entitlement/metered/events.go | 32 +++-- internal/entitlement/metered/reset.go | 28 ++-- internal/entitlement/metered/utils_test.go | 8 +- internal/entitlement/snapshot/event.go | 29 ++-- internal/event/metadata/event_type.go | 59 +++++++++ .../event/{spec => metadata}/resourcepath.go | 2 +- internal/event/publisher.go | 9 +- internal/event/publisher/publisher.go | 124 ------------------ internal/event/publisher/transformer.go | 8 -- internal/event/spec/event_type.go | 88 ------------- internal/event/spec/parser.go | 106 --------------- internal/event/spec/parser_test.go | 80 ----------- internal/notification/consumer/consumer.go | 15 ++- internal/registry/entitlement.go | 5 +- .../ingestnotification/{ => events}/events.go | 37 ++++-- .../ingestnotification/handler.go | 25 +--- internal/watermill/driver/kafka/marshaler.go | 5 + internal/watermill/driver/kafka/publisher.go | 7 +- internal/watermill/eventbus/eventbus.go | 90 +++++++++++++ openmeter/credit/adapters.go | 4 +- openmeter/entitlement/adapters.go | 4 +- openmeter/entitlement/metered/adapters.go | 4 +- openmeter/event/publisher/publisher.go | 22 ---- .../ingestnotification/events/events.go | 14 ++ .../ingestnotification/ingestnotification.go | 12 +- openmeter/watermill/eventbus/eventbus.go | 20 +++ openmeter/watermill/marshaler/marshaler.go | 60 ++++++--- openmeter/watermill/marshaler/source.go | 44 +++++++ test/entitlement/regression/framework_test.go | 10 +- 41 files changed, 588 insertions(+), 794 deletions(-) create mode 100644 internal/event/metadata/event_type.go rename internal/event/{spec => metadata}/resourcepath.go (96%) delete mode 100644 internal/event/publisher/publisher.go delete mode 100644 internal/event/publisher/transformer.go delete mode 100644 internal/event/spec/event_type.go delete mode 100644 internal/event/spec/parser.go delete mode 100644 internal/event/spec/parser_test.go rename internal/sink/flushhandler/ingestnotification/{ => events}/events.go (60%) create mode 100644 internal/watermill/eventbus/eventbus.go delete mode 100644 openmeter/event/publisher/publisher.go create mode 100644 openmeter/sink/flushhandler/ingestnotification/events/events.go create mode 100644 openmeter/watermill/eventbus/eventbus.go create mode 100644 openmeter/watermill/marshaler/source.go diff --git a/cmd/balance-worker/main.go b/cmd/balance-worker/main.go index 8f2d3e649..e58b47947 100644 --- a/cmd/balance-worker/main.go +++ b/cmd/balance-worker/main.go @@ -37,12 +37,13 @@ import ( "github.com/openmeterio/openmeter/internal/ent/db" entitlementpgadapter "github.com/openmeterio/openmeter/internal/entitlement/adapter" "github.com/openmeterio/openmeter/internal/entitlement/balanceworker" - "github.com/openmeterio/openmeter/internal/event/publisher" "github.com/openmeterio/openmeter/internal/meter" "github.com/openmeterio/openmeter/internal/registry" registrybuilder "github.com/openmeterio/openmeter/internal/registry/builder" "github.com/openmeterio/openmeter/internal/streaming/clickhouse_connector" + "github.com/openmeterio/openmeter/internal/watermill/driver/kafka" watermillkafka "github.com/openmeterio/openmeter/internal/watermill/driver/kafka" + "github.com/openmeterio/openmeter/internal/watermill/eventbus" "github.com/openmeterio/openmeter/pkg/contextx" "github.com/openmeterio/openmeter/pkg/framework/entutils" "github.com/openmeterio/openmeter/pkg/framework/operation" @@ -230,7 +231,7 @@ func main() { } // Create publisher - publishers, err := initEventPublisher(ctx, logger, conf, metricMeter) + eventPublisherDriver, err := initEventPublisherDriver(ctx, logger, conf, metricMeter) if err != nil { logger.Error("failed to initialize event publisher", slog.String("error", err.Error())) os.Exit(1) @@ -238,18 +239,29 @@ func main() { defer func() { // We are using sync publishing, so it's fine to close the publisher using defers. - if err := publishers.watermillPublisher.Close(); err != nil { + if err := eventPublisherDriver.Close(); err != nil { logger.Error("failed to close event publisher", slog.String("error", err.Error())) } }() + eventPublisher, err := eventbus.New(eventbus.Options{ + Publisher: eventPublisherDriver, + Config: conf.Events, + Logger: logger, + MarshalerTransformFunc: kafka.AddPartitionKeyFromSubject, + }) + if err != nil { + logger.Error("failed to initialize event publisher", slog.String("error", err.Error())) + os.Exit(1) + } + // Dependencies: entitlement entitlementConnectors := registrybuilder.GetEntitlementRegistry(registry.EntitlementOptions{ DatabaseClient: pgClients.client, StreamingConnector: clickhouseStreamingConnector, MeterRepository: meterRepository, Logger: logger, - Publisher: publishers.eventPublisher.ForTopic(conf.Events.SystemEvents.Topic), + Publisher: eventPublisher, }) // Initialize worker @@ -259,8 +271,8 @@ func main() { Subscriber: wmSubscriber, TargetTopic: conf.Events.SystemEvents.Topic, - Publisher: publishers.watermillPublisher, - Marshaler: publishers.marshaler, + Publisher: eventPublisherDriver, + Marshaler: eventPublisher.Marshaler(), Entitlement: entitlementConnectors, Repo: entitlementpgadapter.NewPostgresEntitlementRepo(pgClients.client), @@ -353,13 +365,7 @@ func initKafkaSubscriber(conf config.Configuration, logger *slog.Logger) (messag return subscriber, nil } -type eventPublishers struct { - watermillPublisher message.Publisher - marshaler publisher.CloudEventMarshaler - eventPublisher publisher.Publisher -} - -func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Configuration, metricMeter metric.Meter) (*eventPublishers, error) { +func initEventPublisherDriver(ctx context.Context, logger *slog.Logger, conf config.Configuration, metricMeter metric.Meter) (message.Publisher, error) { provisionTopics := []watermillkafka.AutoProvisionTopic{} if conf.BalanceWorker.DLQ.AutoProvision.Enabled { provisionTopics = append(provisionTopics, watermillkafka.AutoProvisionTopic{ @@ -368,7 +374,7 @@ func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Co }) } - eventDriver, err := watermillkafka.NewPublisher(ctx, watermillkafka.PublisherOptions{ + return watermillkafka.NewPublisher(ctx, watermillkafka.PublisherOptions{ KafkaConfig: conf.Ingest.Kafka.KafkaConfiguration, ProvisionTopics: provisionTopics, ClientID: otelName, @@ -376,23 +382,6 @@ func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Co MetricMeter: metricMeter, DebugLogging: conf.Telemetry.Log.Level == slog.LevelDebug, }) - if err != nil { - return nil, fmt.Errorf("failed to create event driver: %w", err) - } - - eventPublisher, err := publisher.NewPublisher(publisher.PublisherOptions{ - Publisher: eventDriver, - Transform: watermillkafka.AddPartitionKeyFromSubject, - }) - if err != nil { - return nil, fmt.Errorf("failed to create event publisher: %w", err) - } - - return &eventPublishers{ - watermillPublisher: eventDriver, - marshaler: publisher.NewCloudEventMarshaler(watermillkafka.AddPartitionKeyFromSubject), - eventPublisher: eventPublisher, - }, nil } type pgClients struct { diff --git a/cmd/notification-service/main.go b/cmd/notification-service/main.go index 7d900ceea..b8835a6e1 100644 --- a/cmd/notification-service/main.go +++ b/cmd/notification-service/main.go @@ -35,13 +35,14 @@ import ( "github.com/openmeterio/openmeter/config" "github.com/openmeterio/openmeter/internal/ent/db" - "github.com/openmeterio/openmeter/internal/event/publisher" "github.com/openmeterio/openmeter/internal/meter" "github.com/openmeterio/openmeter/internal/notification/consumer" "github.com/openmeterio/openmeter/internal/registry" registrybuilder "github.com/openmeterio/openmeter/internal/registry/builder" "github.com/openmeterio/openmeter/internal/streaming/clickhouse_connector" + "github.com/openmeterio/openmeter/internal/watermill/driver/kafka" watermillkafka "github.com/openmeterio/openmeter/internal/watermill/driver/kafka" + "github.com/openmeterio/openmeter/internal/watermill/eventbus" "github.com/openmeterio/openmeter/pkg/contextx" "github.com/openmeterio/openmeter/pkg/framework/entutils" "github.com/openmeterio/openmeter/pkg/framework/operation" @@ -229,7 +230,7 @@ func main() { } // Create publisher - publishers, err := initEventPublisher(ctx, logger, conf, metricMeter) + eventPublisherDriver, err := initEventPublisherDriver(ctx, logger, conf, metricMeter) if err != nil { logger.Error("failed to initialize event publisher", slog.String("error", err.Error())) os.Exit(1) @@ -237,18 +238,29 @@ func main() { defer func() { // We are using sync producer, so it is fine to close this as a last step - if err := publishers.watermillPublisher.Close(); err != nil { + if err := eventPublisherDriver.Close(); err != nil { logger.Error("failed to close kafka producer", slog.String("error", err.Error())) } }() + eventPublisher, err := eventbus.New(eventbus.Options{ + Publisher: eventPublisherDriver, + Config: conf.Events, + Logger: logger, + MarshalerTransformFunc: kafka.AddPartitionKeyFromSubject, + }) + if err != nil { + logger.Error("failed to initialize event publisher", slog.String("error", err.Error())) + os.Exit(1) + } + // Dependencies: entitlement entitlementConnectors := registrybuilder.GetEntitlementRegistry(registry.EntitlementOptions{ DatabaseClient: pgClients.client, StreamingConnector: clickhouseStreamingConnector, MeterRepository: meterRepository, Logger: logger, - Publisher: publishers.eventPublisher.ForTopic(conf.Events.SystemEvents.Topic), + Publisher: eventPublisher, }) // Initialize consumer @@ -256,7 +268,8 @@ func main() { SystemEventsTopic: conf.Events.SystemEvents.Topic, Subscriber: wmSubscriber, - Publisher: publishers.watermillPublisher, + Publisher: eventPublisherDriver, + Marshaler: eventPublisher.Marshaler(), Entitlement: entitlementConnectors, @@ -348,13 +361,7 @@ func initKafkaSubscriber(conf config.Configuration, logger *slog.Logger) (messag return subscriber, nil } -type eventPublishers struct { - watermillPublisher message.Publisher - marshaler publisher.CloudEventMarshaler - eventPublisher publisher.Publisher -} - -func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Configuration, metricMeter metric.Meter) (*eventPublishers, error) { +func initEventPublisherDriver(ctx context.Context, logger *slog.Logger, conf config.Configuration, metricMeter metric.Meter) (message.Publisher, error) { provisionTopics := []watermillkafka.AutoProvisionTopic{} if conf.NotificationService.Consumer.DLQ.AutoProvision.Enabled { provisionTopics = append(provisionTopics, watermillkafka.AutoProvisionTopic{ @@ -363,7 +370,7 @@ func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Co }) } - eventDriver, err := watermillkafka.NewPublisher(ctx, watermillkafka.PublisherOptions{ + return watermillkafka.NewPublisher(ctx, watermillkafka.PublisherOptions{ KafkaConfig: conf.Ingest.Kafka.KafkaConfiguration, ProvisionTopics: provisionTopics, ClientID: otelName, @@ -371,23 +378,6 @@ func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Co MetricMeter: metricMeter, DebugLogging: conf.Telemetry.Log.Level == slog.LevelDebug, }) - if err != nil { - return nil, fmt.Errorf("failed to create event driver: %w", err) - } - - eventPublisher, err := publisher.NewPublisher(publisher.PublisherOptions{ - Publisher: eventDriver, - Transform: watermillkafka.AddPartitionKeyFromSubject, - }) - if err != nil { - return nil, fmt.Errorf("failed to create event publisher: %w", err) - } - - return &eventPublishers{ - watermillPublisher: eventDriver, - marshaler: publisher.NewCloudEventMarshaler(watermillkafka.AddPartitionKeyFromSubject), - eventPublisher: eventPublisher, - }, nil } type pgClients struct { diff --git a/cmd/server/main.go b/cmd/server/main.go index 681aa9124..734c93041 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -16,8 +16,6 @@ import ( health "github.com/AppsFlyer/go-sundheit" healthhttp "github.com/AppsFlyer/go-sundheit/http" "github.com/ClickHouse/clickhouse-go/v2" - "github.com/ThreeDotsLabs/watermill" - "github.com/ThreeDotsLabs/watermill/components/cqrs" "github.com/ThreeDotsLabs/watermill/message" "github.com/confluentinc/confluent-kafka-go/v2/kafka" "github.com/go-chi/chi/v5" @@ -55,7 +53,7 @@ import ( "github.com/openmeterio/openmeter/internal/streaming/clickhouse_connector" watermillkafka "github.com/openmeterio/openmeter/internal/watermill/driver/kafka" "github.com/openmeterio/openmeter/internal/watermill/driver/noop" - "github.com/openmeterio/openmeter/openmeter/watermill/marshaler" + "github.com/openmeterio/openmeter/internal/watermill/eventbus" "github.com/openmeterio/openmeter/pkg/contextx" "github.com/openmeterio/openmeter/pkg/errorsx" "github.com/openmeterio/openmeter/pkg/framework/entutils" @@ -215,7 +213,7 @@ func main() { os.Exit(1) } - eventPublishers, err := initEventPublisher(ctx, logger, conf, metricMeter) + eventPublisherDriver, err := initEventPublisherDriver(ctx, logger, conf, metricMeter) if err != nil { logger.Error("failed to initialize event publisher", "error", err) os.Exit(1) @@ -223,12 +221,17 @@ func main() { defer func() { logger.Info("closing event publisher") - if err = eventPublishers.driver.Close(); err != nil { + if err = eventPublisherDriver.Close(); err != nil { logger.Error("failed to close event publisher", "error", err) } }() - eventBus, err := initEventBus(eventPublisher.watermillPublisher, conf, logger) + eventPublisher, err := eventbus.New(eventbus.Options{ + Publisher: eventPublisherDriver, + Config: conf.Events, + Logger: logger, + MarshalerTransformFunc: watermillkafka.AddPartitionKeyFromSubject, + }) if err != nil { logger.Error("failed to initialize event bus", "error", err) os.Exit(1) @@ -330,7 +333,7 @@ func main() { StreamingConnector: streamingConnector, MeterRepository: meterRepository, Logger: logger, - EventBus: eventBus, + Publisher: eventPublisher, }) } @@ -438,7 +441,7 @@ func main() { } } -func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Configuration, kafkaProducer *kafka.Producer) (message.Publisher, error) { +func initEventPublisherDriver(ctx context.Context, logger *slog.Logger, conf config.Configuration, metricMeter metric.Meter) (message.Publisher, error) { if !conf.Events.Enabled { return &noop.Publisher{}, nil } @@ -577,15 +580,3 @@ func initPGClients(config config.PostgresConfig) ( client: dbClient, }, nil } - -func initEventBus(publisher message.Publisher, config config.Configuration, logger *slog.Logger) (*cqrs.EventBus, error) { - return cqrs.NewEventBusWithConfig(publisher, cqrs.EventBusConfig{ - GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) { - // TODO: make it generic between sink / server - return config.Events.SystemEvents.Topic, nil - }, - - Marshaler: marshaler.New(), - Logger: watermill.NewSlogLogger(logger), - }) -} diff --git a/cmd/sink-worker/main.go b/cmd/sink-worker/main.go index 7740c15cd..012999f66 100644 --- a/cmd/sink-worker/main.go +++ b/cmd/sink-worker/main.go @@ -30,12 +30,12 @@ import ( "github.com/openmeterio/openmeter/config" "github.com/openmeterio/openmeter/internal/dedupe" - "github.com/openmeterio/openmeter/internal/event/publisher" "github.com/openmeterio/openmeter/internal/meter" "github.com/openmeterio/openmeter/internal/sink" "github.com/openmeterio/openmeter/internal/sink/flushhandler" "github.com/openmeterio/openmeter/internal/sink/flushhandler/ingestnotification" watermillkafka "github.com/openmeterio/openmeter/internal/watermill/driver/kafka" + "github.com/openmeterio/openmeter/internal/watermill/eventbus" "github.com/openmeterio/openmeter/pkg/gosundheit" pkgkafka "github.com/openmeterio/openmeter/pkg/kafka" "github.com/openmeterio/openmeter/pkg/models" @@ -243,16 +243,16 @@ func initIngestEventPublisher(ctx context.Context, logger *slog.Logger, conf con return nil, err } - eventPublisher, err := publisher.NewPublisher(publisher.PublisherOptions{ - Publisher: eventDriver, - Transform: watermillkafka.AddPartitionKeyFromSubject, + eventPublisher, err := eventbus.New(eventbus.Options{ + Publisher: eventDriver, + Config: conf.Events, + Logger: logger, + MarshalerTransformFunc: watermillkafka.AddPartitionKeyFromSubject, }) if err != nil { return nil, err } - targetTopic := eventPublisher.ForTopic(conf.Events.IngestEvents.Topic) - flushHandlerMux := flushhandler.NewFlushEventHandlers() // We should only close the producer once the ingest events are fully processed flushHandlerMux.OnDrainComplete(func() { @@ -262,7 +262,7 @@ func initIngestEventPublisher(ctx context.Context, logger *slog.Logger, conf con } }) - ingestNotificationHandler, err := ingestnotification.NewHandler(logger, metricMeter, targetTopic, ingestnotification.HandlerConfig{ + ingestNotificationHandler, err := ingestnotification.NewHandler(logger, metricMeter, eventPublisher, ingestnotification.HandlerConfig{ MaxEventsInBatch: conf.Sink.IngestNotifications.MaxEventsInBatch, }) if err != nil { diff --git a/internal/credit/connector.go b/internal/credit/connector.go index 1204d4e0a..c1cee162d 100644 --- a/internal/credit/connector.go +++ b/internal/credit/connector.go @@ -6,8 +6,8 @@ import ( "github.com/openmeterio/openmeter/internal/credit/balance" "github.com/openmeterio/openmeter/internal/credit/grant" - "github.com/openmeterio/openmeter/internal/event/publisher" "github.com/openmeterio/openmeter/internal/streaming" + "github.com/openmeterio/openmeter/internal/watermill/eventbus" ) type CreditConnector interface { @@ -20,7 +20,7 @@ type connector struct { grantRepo grant.Repo balanceSnapshotRepo balance.SnapshotRepo // external dependencies - publisher publisher.TopicPublisher + publisher eventbus.Publisher ownerConnector grant.OwnerConnector streamingConnector streaming.Connector logger *slog.Logger @@ -36,7 +36,7 @@ func NewCreditConnector( streamingConnector streaming.Connector, logger *slog.Logger, granularity time.Duration, - publisher publisher.TopicPublisher, + publisher eventbus.Publisher, ) CreditConnector { return &connector{ grantRepo: grantRepo, diff --git a/internal/credit/grant.go b/internal/credit/grant.go index 5aad85afc..b0ebc5987 100644 --- a/internal/credit/grant.go +++ b/internal/credit/grant.go @@ -7,7 +7,6 @@ import ( "github.com/openmeterio/openmeter/internal/credit/grant" eventmodels "github.com/openmeterio/openmeter/internal/event/models" - "github.com/openmeterio/openmeter/internal/event/spec" "github.com/openmeterio/openmeter/pkg/clock" "github.com/openmeterio/openmeter/pkg/framework/entutils" "github.com/openmeterio/openmeter/pkg/models" @@ -87,22 +86,13 @@ func (m *connector) CreateGrant(ctx context.Context, owner grant.NamespacedOwner return nil, err } - event, err := spec.NewCloudEvent( - spec.EventSpec{ - Source: spec.ComposeResourcePath(owner.Namespace, spec.EntityEntitlement, string(owner.ID), spec.EntityGrant, g.ID), - Subject: spec.ComposeResourcePath(owner.Namespace, spec.EntitySubjectKey, subjectKey), - }, - grant.CreatedEvent{ - Grant: *g, - Namespace: eventmodels.NamespaceID{ID: owner.Namespace}, - Subject: eventmodels.SubjectKeyAndID{Key: subjectKey}, - }, - ) - if err != nil { - return nil, err + event := grant.CreatedEvent{ + Grant: *g, + Namespace: eventmodels.NamespaceID{ID: owner.Namespace}, + Subject: eventmodels.SubjectKeyAndID{Key: subjectKey}, } - if err := m.publisher.Publish(event); err != nil { + if err := m.publisher.Publish(ctx, event); err != nil { return nil, err } @@ -154,22 +144,11 @@ func (m *connector) VoidGrant(ctx context.Context, grantID models.NamespacedID) return nil, err } - event, err := spec.NewCloudEvent( - spec.EventSpec{ - Source: spec.ComposeResourcePath(grantID.Namespace, spec.EntityEntitlement, string(owner.ID), spec.EntityGrant, grantID.ID), - Subject: spec.ComposeResourcePath(grantID.Namespace, spec.EntitySubjectKey, subjectKey), - }, - grant.VoidedEvent{ - Grant: g, - Namespace: eventmodels.NamespaceID{ID: owner.Namespace}, - Subject: eventmodels.SubjectKeyAndID{Key: subjectKey}, - }, - ) - if err != nil { - return nil, err - } - - return nil, m.publisher.Publish(event) + return nil, m.publisher.Publish(ctx, grant.VoidedEvent{ + Grant: g, + Namespace: eventmodels.NamespaceID{ID: owner.Namespace}, + Subject: eventmodels.SubjectKeyAndID{Key: subjectKey}, + }) }) return err } diff --git a/internal/credit/grant/events.go b/internal/credit/grant/events.go index b682d970d..390a5aa0f 100644 --- a/internal/credit/grant/events.go +++ b/internal/credit/grant/events.go @@ -3,17 +3,13 @@ package grant import ( "errors" + "github.com/openmeterio/openmeter/internal/event/metadata" "github.com/openmeterio/openmeter/internal/event/models" - "github.com/openmeterio/openmeter/internal/event/spec" + "github.com/openmeterio/openmeter/openmeter/watermill/marshaler" ) const ( - EventSubsystem spec.EventSubsystem = "credit" -) - -const ( - grantCreatedEventName spec.EventName = "grant.created" - grantVoidedEventName spec.EventName = "grant.voided" + EventSubsystem metadata.EventSubsystem = "credit" ) type grantEvent struct { @@ -45,16 +41,31 @@ func (g grantEvent) Validate() error { return nil } +func (e grantEvent) EventMetadata() metadata.EventMetadata { + return metadata.EventMetadata{ + Source: metadata.ComposeResourcePath(e.Namespace.ID, metadata.EntityEntitlement, string(e.OwnerID), metadata.EntityGrant, e.ID), + Subject: metadata.ComposeResourcePath(e.Namespace.ID, metadata.EntitySubjectKey, e.Subject.Key), + } +} + type CreatedEvent grantEvent -var grantCreatedEventSpec = spec.EventTypeSpec{ - Subsystem: EventSubsystem, - Name: grantCreatedEventName, - Version: "v1", +var ( + _ marshaler.Event = CreatedEvent{} + + grantCreatedEventName = metadata.GetEventName(metadata.EventType{ + Subsystem: EventSubsystem, + Name: "grant.created", + Version: "v1", + }) +) + +func (e CreatedEvent) EventName() string { + return grantCreatedEventName } -func (e CreatedEvent) Spec() *spec.EventTypeSpec { - return &grantCreatedEventSpec +func (e CreatedEvent) EventMetadata() metadata.EventMetadata { + return grantEvent(e).EventMetadata() } func (e CreatedEvent) Validate() error { @@ -63,14 +74,22 @@ func (e CreatedEvent) Validate() error { type VoidedEvent grantEvent -var grantVoidedEventSpec = spec.EventTypeSpec{ - Subsystem: EventSubsystem, - Name: grantVoidedEventName, - Version: "v1", +var ( + _ marshaler.Event = VoidedEvent{} + + grantVoidedEventName = metadata.GetEventName(metadata.EventType{ + Subsystem: EventSubsystem, + Name: "grant.voided", + Version: "v1", + }) +) + +func (e VoidedEvent) EventName() string { + return grantVoidedEventName } -func (e VoidedEvent) Spec() *spec.EventTypeSpec { - return &grantVoidedEventSpec +func (e VoidedEvent) EventMetadata() metadata.EventMetadata { + return grantEvent(e).EventMetadata() } func (e VoidedEvent) Validate() error { diff --git a/internal/entitlement/balanceworker/entitlementhandler.go b/internal/entitlement/balanceworker/entitlementhandler.go index 17e53339a..b7e2b1218 100644 --- a/internal/entitlement/balanceworker/entitlementhandler.go +++ b/internal/entitlement/balanceworker/entitlementhandler.go @@ -10,9 +10,10 @@ import ( "github.com/openmeterio/openmeter/internal/entitlement" entitlementdriver "github.com/openmeterio/openmeter/internal/entitlement/driver" "github.com/openmeterio/openmeter/internal/entitlement/snapshot" + "github.com/openmeterio/openmeter/internal/event/metadata" "github.com/openmeterio/openmeter/internal/event/models" - "github.com/openmeterio/openmeter/internal/event/spec" "github.com/openmeterio/openmeter/internal/productcatalog" + "github.com/openmeterio/openmeter/openmeter/watermill/marshaler" "github.com/openmeterio/openmeter/pkg/convert" ) @@ -34,11 +35,8 @@ func (w *Worker) handleEntitlementDeleteEvent(ctx context.Context, delEvent enti calculationTime := time.Now() - event, err := spec.NewCloudEvent( - spec.EventSpec{ - Source: spec.ComposeResourcePath(namespace, spec.EntityEntitlement, delEvent.ID), - Subject: spec.ComposeResourcePath(namespace, spec.EntitySubjectKey, delEvent.SubjectKey), - }, + event := marshaler.WithSource( + metadata.ComposeResourcePath(namespace, metadata.EntityEntitlement, delEvent.ID), snapshot.SnapshotEvent{ Entitlement: delEvent.Entitlement, Namespace: models.NamespaceID{ @@ -56,11 +54,8 @@ func (w *Worker) handleEntitlementDeleteEvent(ctx context.Context, delEvent enti CurrentUsagePeriod: delEvent.CurrentUsagePeriod, }, ) - if err != nil { - return nil, fmt.Errorf("failed to create cloud event: %w", err) - } - wmMessage, err := w.opts.Marshaler.MarshalEvent(event) + wmMessage, err := w.opts.Marshaler.Marshal(event) if err != nil { return nil, fmt.Errorf("failed to marshal cloud event: %w", err) } @@ -123,11 +118,8 @@ func (w *Worker) createSnapshotEvent(ctx context.Context, entitlementID Namespac } } - event, err := spec.NewCloudEvent( - spec.EventSpec{ - Source: source, - Subject: spec.ComposeResourcePath(entitlementID.Namespace, spec.EntitySubjectKey, entitlement.SubjectKey), - }, + event := marshaler.WithSource( + source, snapshot.SnapshotEvent{ Entitlement: *entitlement, Namespace: models.NamespaceID{ @@ -146,11 +138,8 @@ func (w *Worker) createSnapshotEvent(ctx context.Context, entitlementID Namespac CurrentUsagePeriod: entitlement.CurrentUsagePeriod, }, ) - if err != nil { - return nil, fmt.Errorf("failed to create cloud event: %w", err) - } - wmMessage, err := w.opts.Marshaler.MarshalEvent(event) + wmMessage, err := w.opts.Marshaler.Marshal(event) if err != nil { return nil, fmt.Errorf("failed to marshal cloud event: %w", err) } diff --git a/internal/entitlement/balanceworker/ingesthandler.go b/internal/entitlement/balanceworker/ingesthandler.go index 153306f69..547c32731 100644 --- a/internal/entitlement/balanceworker/ingesthandler.go +++ b/internal/entitlement/balanceworker/ingesthandler.go @@ -7,14 +7,14 @@ import ( "github.com/hashicorp/go-multierror" "github.com/openmeterio/openmeter/internal/entitlement" - "github.com/openmeterio/openmeter/internal/event/spec" + "github.com/openmeterio/openmeter/internal/event/metadata" "github.com/openmeterio/openmeter/internal/productcatalog" - "github.com/openmeterio/openmeter/internal/sink/flushhandler/ingestnotification" + ingestevents "github.com/openmeterio/openmeter/internal/sink/flushhandler/ingestnotification/events" "github.com/openmeterio/openmeter/pkg/slicesx" ) -func (w *Worker) handleBatchedIngestEvent(ctx context.Context, event ingestnotification.EventBatchedIngest) ([]*message.Message, error) { - filters := slicesx.Map(event.Events, func(e ingestnotification.IngestEventData) IngestEventQueryFilter { +func (w *Worker) handleBatchedIngestEvent(ctx context.Context, event ingestevents.EventBatchedIngest) ([]*message.Message, error) { + filters := slicesx.Map(event.Events, func(e ingestevents.IngestEventData) IngestEventQueryFilter { return IngestEventQueryFilter{ Namespace: e.Namespace.ID, SubjectKey: e.SubjectKey, @@ -33,7 +33,7 @@ func (w *Worker) handleBatchedIngestEvent(ctx context.Context, event ingestnotif messages, err := w.handleEntitlementUpdateEvent( ctx, NamespacedID{Namespace: entitlement.Namespace, ID: entitlement.EntitlementID}, - spec.ComposeResourcePath(entitlement.Namespace, spec.EntityEvent), + metadata.ComposeResourcePath(entitlement.Namespace, metadata.EntityEvent), ) if err != nil { // TODO: add error information too diff --git a/internal/entitlement/balanceworker/worker.go b/internal/entitlement/balanceworker/worker.go index 359d6a714..05a35fe97 100644 --- a/internal/entitlement/balanceworker/worker.go +++ b/internal/entitlement/balanceworker/worker.go @@ -15,10 +15,10 @@ import ( "github.com/openmeterio/openmeter/internal/credit/grant" "github.com/openmeterio/openmeter/internal/entitlement" meteredentitlement "github.com/openmeterio/openmeter/internal/entitlement/metered" - "github.com/openmeterio/openmeter/internal/event/publisher" - "github.com/openmeterio/openmeter/internal/event/spec" + "github.com/openmeterio/openmeter/internal/event/metadata" "github.com/openmeterio/openmeter/internal/registry" - "github.com/openmeterio/openmeter/internal/sink/flushhandler/ingestnotification" + ingestevents "github.com/openmeterio/openmeter/internal/sink/flushhandler/ingestnotification/events" + "github.com/openmeterio/openmeter/openmeter/watermill/marshaler" pkgmodels "github.com/openmeterio/openmeter/pkg/models" ) @@ -44,7 +44,7 @@ type WorkerOptions struct { TargetTopic string DLQ *WorkerDLQOptions Publisher message.Publisher - Marshaler publisher.CloudEventMarshaler + Marshaler marshaler.Marshaler Entitlement *registry.Entitlement Repo BalanceWorkerRepository @@ -169,7 +169,7 @@ func (w *Worker) Close() error { func (w *Worker) handleEvent(msg *message.Message) ([]*message.Message, error) { w.opts.Logger.Debug("received system event", w.messageToLogFields(msg)...) - ceType, found := msg.Metadata[publisher.CloudEventsHeaderType] + ceType, found := msg.Metadata[marshaler.CloudEventsHeaderType] if !found { w.opts.Logger.Warn("missing CloudEvents type, ignoring message") return nil, nil @@ -177,70 +177,80 @@ func (w *Worker) handleEvent(msg *message.Message) ([]*message.Message, error) { switch ceType { // Entitlement events - case entitlement.EntitlementCreatedEvent{}.Spec().Type(): - event, err := spec.ParseCloudEventFromBytes[entitlement.EntitlementCreatedEvent](msg.Payload) - if err != nil { + case entitlement.EntitlementCreatedEvent{}.EventName(): + event := entitlement.EntitlementCreatedEvent{} + + if err := w.opts.Marshaler.Unmarshal(msg, &event); err != nil { w.opts.Logger.Error("failed to parse entitlement created event", w.messageToLogFields(msg)...) return nil, err } + return w.handleEntitlementUpdateEvent( msg.Context(), - NamespacedID{Namespace: event.Payload.Namespace.ID, ID: event.Payload.ID}, - spec.ComposeResourcePath(event.Payload.Namespace.ID, spec.EntityEntitlement, event.Payload.ID), + NamespacedID{Namespace: event.Namespace.ID, ID: event.ID}, + metadata.ComposeResourcePath(event.Namespace.ID, metadata.EntityEntitlement, event.ID), ) - case entitlement.EntitlementDeletedEvent{}.Spec().Type(): - event, err := spec.ParseCloudEventFromBytes[entitlement.EntitlementDeletedEvent](msg.Payload) - if err != nil { + + case entitlement.EntitlementDeletedEvent{}.EventName(): + event := entitlement.EntitlementDeletedEvent{} + + if err := w.opts.Marshaler.Unmarshal(msg, &event); err != nil { w.opts.Logger.Error("failed to parse entitlement deleted event", w.messageToLogFields(msg)...) return nil, err } - return w.handleEntitlementDeleteEvent(msg.Context(), event.Payload) + return w.handleEntitlementDeleteEvent(msg.Context(), event) // Grant events - case grant.CreatedEvent{}.Spec().Type(): - event, err := spec.ParseCloudEventFromBytes[grant.CreatedEvent](msg.Payload) - if err != nil { + case grant.CreatedEvent{}.EventName(): + event := grant.CreatedEvent{} + + if err := w.opts.Marshaler.Unmarshal(msg, &event); err != nil { return nil, fmt.Errorf("failed to parse grant created event: %w", err) } return w.handleEntitlementUpdateEvent( msg.Context(), - NamespacedID{Namespace: event.Payload.Namespace.ID, ID: string(event.Payload.OwnerID)}, - spec.ComposeResourcePath(event.Payload.Namespace.ID, spec.EntityEntitlement, string(event.Payload.OwnerID), spec.EntityGrant, event.Payload.ID), + NamespacedID{Namespace: event.Namespace.ID, ID: string(event.OwnerID)}, + metadata.ComposeResourcePath(event.Namespace.ID, metadata.EntityEntitlement, string(event.OwnerID), metadata.EntityGrant, event.ID), ) - case grant.VoidedEvent{}.Spec().Type(): - event, err := spec.ParseCloudEventFromBytes[grant.VoidedEvent](msg.Payload) - if err != nil { + + case grant.VoidedEvent{}.EventName(): + event := grant.VoidedEvent{} + + if err := w.opts.Marshaler.Unmarshal(msg, &event); err != nil { return nil, fmt.Errorf("failed to parse grant voided event: %w", err) } return w.handleEntitlementUpdateEvent( msg.Context(), - NamespacedID{Namespace: event.Payload.Namespace.ID, ID: string(event.Payload.OwnerID)}, - spec.ComposeResourcePath(event.Payload.Namespace.ID, spec.EntityEntitlement, string(event.Payload.OwnerID), spec.EntityGrant, event.Payload.ID), + NamespacedID{Namespace: event.Namespace.ID, ID: string(event.OwnerID)}, + metadata.ComposeResourcePath(event.Namespace.ID, metadata.EntityEntitlement, string(event.OwnerID), metadata.EntityGrant, event.ID), ) // Metered entitlement events - case meteredentitlement.EntitlementResetEvent{}.Spec().Type(): - event, err := spec.ParseCloudEventFromBytes[meteredentitlement.EntitlementResetEvent](msg.Payload) - if err != nil { + case meteredentitlement.EntitlementResetEvent{}.EventName(): + event := meteredentitlement.EntitlementResetEvent{} + + if err := w.opts.Marshaler.Unmarshal(msg, &event); err != nil { return nil, fmt.Errorf("failed to parse reset entitlement event: %w", err) } return w.handleEntitlementUpdateEvent( msg.Context(), - NamespacedID{Namespace: event.Payload.Namespace.ID, ID: event.Payload.EntitlementID}, - spec.ComposeResourcePath(event.Payload.Namespace.ID, spec.EntityEntitlement, event.Payload.EntitlementID), + NamespacedID{Namespace: event.Namespace.ID, ID: event.EntitlementID}, + metadata.ComposeResourcePath(event.Namespace.ID, metadata.EntityEntitlement, event.EntitlementID), ) + // Ingest events - case ingestnotification.EventBatchedIngest{}.Spec().Type(): - event, err := spec.ParseCloudEventFromBytes[ingestnotification.EventBatchedIngest](msg.Payload) - if err != nil { + case ingestevents.EventBatchedIngest{}.EventName(): + event := ingestevents.EventBatchedIngest{} + + if err := w.opts.Marshaler.Unmarshal(msg, &event); err != nil { return nil, fmt.Errorf("failed to parse ingest event: %w", err) } - return w.handleBatchedIngestEvent(msg.Context(), event.Payload) + return w.handleBatchedIngestEvent(msg.Context(), event) } return nil, nil } diff --git a/internal/entitlement/events.go b/internal/entitlement/events.go index dbce79935..0b2593ae6 100644 --- a/internal/entitlement/events.go +++ b/internal/entitlement/events.go @@ -3,17 +3,13 @@ package entitlement import ( "errors" + "github.com/openmeterio/openmeter/internal/event/metadata" "github.com/openmeterio/openmeter/internal/event/models" - "github.com/openmeterio/openmeter/internal/event/spec" + "github.com/openmeterio/openmeter/openmeter/watermill/marshaler" ) const ( - EventSubsystem spec.EventSubsystem = "entitlement" -) - -const ( - entitlementCreatedEventName spec.EventName = "entitlement.created" - entitlementDeletedEventName spec.EventName = "entitlement.deleted" + EventSubsystem metadata.EventSubsystem = "entitlement" ) type entitlementEvent struct { @@ -39,54 +35,54 @@ func (e entitlementEvent) Validate() error { type EntitlementCreatedEvent entitlementEvent -var entitlementCreatedEventSpec = spec.EventTypeSpec{ - Subsystem: EventSubsystem, - Name: entitlementCreatedEventName, - Version: "v1", -} +var ( + _ marshaler.Event = EntitlementCreatedEvent{} -func (e EntitlementCreatedEvent) Spec() *spec.EventTypeSpec { - return &entitlementCreatedEventSpec -} + entitlementCreatedEventName = metadata.GetEventName(metadata.EventType{ + Subsystem: EventSubsystem, + Name: "entitlement.created", + Version: "v1", + }) +) func (e EntitlementCreatedEvent) Validate() error { return entitlementEvent(e).Validate() } func (e EntitlementCreatedEvent) EventName() string { - return e.Spec().Type() + return entitlementCreatedEventName } -func (e EntitlementCreatedEvent) EventMetadata() spec.EventMetadata { - return spec.EventMetadata{ - Source: spec.ComposeResourcePath(e.Namespace.ID, spec.EntityEntitlement, e.ID), - Subject: spec.ComposeResourcePath(e.Namespace.ID, spec.EntitySubjectKey, e.SubjectKey), +func (e EntitlementCreatedEvent) EventMetadata() metadata.EventMetadata { + return metadata.EventMetadata{ + Source: metadata.ComposeResourcePath(e.Namespace.ID, metadata.EntityEntitlement, e.ID), + Subject: metadata.ComposeResourcePath(e.Namespace.ID, metadata.EntitySubjectKey, e.SubjectKey), } } type EntitlementDeletedEvent entitlementEvent -var entitlementDeletedEventSpec = spec.EventTypeSpec{ - Subsystem: EventSubsystem, - Name: entitlementDeletedEventName, - Version: "v1", -} +var ( + _ marshaler.Event = EntitlementDeletedEvent{} -func (e EntitlementDeletedEvent) Spec() *spec.EventTypeSpec { - return &entitlementDeletedEventSpec -} + entitlementDeletedEventName = metadata.GetEventName(metadata.EventType{ + Subsystem: EventSubsystem, + Name: "entitlement.deleted", + Version: "v1", + }) +) func (e EntitlementDeletedEvent) Validate() error { return entitlementEvent(e).Validate() } func (e EntitlementDeletedEvent) EventName() string { - return e.Spec().Type() + return entitlementDeletedEventName } -func (e EntitlementDeletedEvent) EventMetadata() spec.EventMetadata { - return spec.EventMetadata{ - Source: spec.ComposeResourcePath(e.Namespace.ID, spec.EntityEntitlement, e.ID), - Subject: spec.ComposeResourcePath(e.Namespace.ID, spec.EntitySubjectKey, e.SubjectKey), +func (e EntitlementDeletedEvent) EventMetadata() metadata.EventMetadata { + return metadata.EventMetadata{ + Source: metadata.ComposeResourcePath(e.Namespace.ID, metadata.EntityEntitlement, e.ID), + Subject: metadata.ComposeResourcePath(e.Namespace.ID, metadata.EntitySubjectKey, e.SubjectKey), } } diff --git a/internal/entitlement/metered/connector.go b/internal/entitlement/metered/connector.go index 8ede7517c..22b223e40 100644 --- a/internal/entitlement/metered/connector.go +++ b/internal/entitlement/metered/connector.go @@ -11,7 +11,7 @@ import ( "github.com/openmeterio/openmeter/internal/entitlement" "github.com/openmeterio/openmeter/internal/productcatalog" "github.com/openmeterio/openmeter/internal/streaming" - "github.com/openmeterio/openmeter/openmeter/event/publisher" + "github.com/openmeterio/openmeter/internal/watermill/eventbus" "github.com/openmeterio/openmeter/pkg/clock" "github.com/openmeterio/openmeter/pkg/convert" "github.com/openmeterio/openmeter/pkg/defaultx" @@ -63,7 +63,7 @@ type connector struct { entitlementRepo entitlement.EntitlementRepo granularity time.Duration - publisher publisher.TopicPublisher + publisher eventbus.Publisher } func NewMeteredEntitlementConnector( @@ -73,7 +73,7 @@ func NewMeteredEntitlementConnector( grantConnector credit.GrantConnector, grantRepo grant.Repo, entitlementRepo entitlement.EntitlementRepo, - publisher publisher.TopicPublisher, + publisher eventbus.Publisher, ) Connector { return &connector{ streamingConnector: streamingConnector, diff --git a/internal/entitlement/metered/events.go b/internal/entitlement/metered/events.go index 14916dcfe..042c9dee7 100644 --- a/internal/entitlement/metered/events.go +++ b/internal/entitlement/metered/events.go @@ -4,16 +4,13 @@ import ( "errors" "time" + "github.com/openmeterio/openmeter/internal/event/metadata" "github.com/openmeterio/openmeter/internal/event/models" - "github.com/openmeterio/openmeter/internal/event/spec" + "github.com/openmeterio/openmeter/openmeter/watermill/marshaler" ) const ( - EventSubsystem spec.EventSubsystem = "meteredEntitlement" -) - -const ( - resetEntitlementEventName spec.EventName = "entitlement.reset" + EventSubsystem metadata.EventSubsystem = "meteredEntitlement" ) type EntitlementResetEvent struct { @@ -24,14 +21,25 @@ type EntitlementResetEvent struct { RetainAnchor bool `json:"retainAnchor"` } -var resetEntitlementEventSpec = spec.EventTypeSpec{ - Subsystem: EventSubsystem, - Name: resetEntitlementEventName, - Version: "v1", +var ( + _ marshaler.Event = EntitlementResetEvent{} + + resetEntitlementEventName = metadata.GetEventName(metadata.EventType{ + Subsystem: EventSubsystem, + Name: "entitlement.reset", + Version: "v1", + }) +) + +func (e EntitlementResetEvent) EventName() string { + return resetEntitlementEventName } -func (e EntitlementResetEvent) Spec() *spec.EventTypeSpec { - return &resetEntitlementEventSpec +func (e EntitlementResetEvent) EventMetadata() metadata.EventMetadata { + return metadata.EventMetadata{ + Source: metadata.ComposeResourcePath(e.Namespace.ID, metadata.EntityEntitlement, e.EntitlementID), + Subject: metadata.ComposeResourcePath(e.Namespace.ID, metadata.EntitySubjectKey, e.Subject.Key), + } } func (e EntitlementResetEvent) Validate() error { diff --git a/internal/entitlement/metered/reset.go b/internal/entitlement/metered/reset.go index 35d689511..3f793be4c 100644 --- a/internal/entitlement/metered/reset.go +++ b/internal/entitlement/metered/reset.go @@ -10,7 +10,6 @@ import ( "github.com/openmeterio/openmeter/internal/credit/grant" "github.com/openmeterio/openmeter/internal/entitlement" eventmodels "github.com/openmeterio/openmeter/internal/event/models" - "github.com/openmeterio/openmeter/internal/event/spec" "github.com/openmeterio/openmeter/pkg/framework/entutils" "github.com/openmeterio/openmeter/pkg/models" ) @@ -45,28 +44,19 @@ func (e *connector) ResetEntitlementUsage(ctx context.Context, entitlementID mod return nil, err } - event, err := spec.NewCloudEvent( - spec.EventSpec{ - Source: spec.ComposeResourcePath(entitlementID.Namespace, spec.EntityEntitlement, entitlementID.ID), - Subject: spec.ComposeResourcePath(entitlementID.Namespace, spec.EntitySubjectKey, ent.SubjectKey), + event := EntitlementResetEvent{ + EntitlementID: entitlementID.ID, + Namespace: eventmodels.NamespaceID{ + ID: entitlementID.Namespace, }, - EntitlementResetEvent{ - EntitlementID: entitlementID.ID, - Namespace: eventmodels.NamespaceID{ - ID: entitlementID.Namespace, - }, - Subject: eventmodels.SubjectKeyAndID{ - Key: ent.SubjectKey, - }, - ResetAt: params.At, - RetainAnchor: params.RetainAnchor, + Subject: eventmodels.SubjectKeyAndID{ + Key: ent.SubjectKey, }, - ) - if err != nil { - return nil, err + ResetAt: params.At, + RetainAnchor: params.RetainAnchor, } - if err := e.publisher.Publish(event); err != nil { + if err := e.publisher.Publish(ctx, event); err != nil { return nil, err } diff --git a/internal/entitlement/metered/utils_test.go b/internal/entitlement/metered/utils_test.go index c9994ba97..ae50e7ac4 100644 --- a/internal/entitlement/metered/utils_test.go +++ b/internal/entitlement/metered/utils_test.go @@ -16,12 +16,12 @@ import ( "github.com/openmeterio/openmeter/internal/entitlement" entitlement_postgresadapter "github.com/openmeterio/openmeter/internal/entitlement/adapter" meteredentitlement "github.com/openmeterio/openmeter/internal/entitlement/metered" - "github.com/openmeterio/openmeter/internal/event/publisher" "github.com/openmeterio/openmeter/internal/productcatalog" productcatalog_postgresadapter "github.com/openmeterio/openmeter/internal/productcatalog/adapter" streaming_testutils "github.com/openmeterio/openmeter/internal/streaming/testutils" "github.com/openmeterio/openmeter/internal/testutils" "github.com/openmeterio/openmeter/openmeter/meter" + "github.com/openmeterio/openmeter/openmeter/watermill/eventbus" "github.com/openmeterio/openmeter/pkg/models" ) @@ -75,6 +75,8 @@ func setupConnector(t *testing.T) (meteredentitlement.Connector, *dependencies) t.Fatalf("failed to migrate database %s", err) } + mockPublisher := eventbus.NewMock(t) + // build adapters owner := meteredentitlement.NewEntitlementGrantOwnerAdapter( featureRepo, @@ -91,7 +93,7 @@ func setupConnector(t *testing.T) (meteredentitlement.Connector, *dependencies) streamingConnector, testLogger, time.Minute, - publisher.NewMockTopicPublisher(t), + mockPublisher, ) connector := meteredentitlement.NewMeteredEntitlementConnector( @@ -101,7 +103,7 @@ func setupConnector(t *testing.T) (meteredentitlement.Connector, *dependencies) creditConnector, grantRepo, entitlementRepo, - publisher.NewMockTopicPublisher(t), + mockPublisher, ) return connector, &dependencies{ diff --git a/internal/entitlement/snapshot/event.go b/internal/entitlement/snapshot/event.go index 95f50bc3c..784701b29 100644 --- a/internal/entitlement/snapshot/event.go +++ b/internal/entitlement/snapshot/event.go @@ -5,16 +5,13 @@ import ( "time" "github.com/openmeterio/openmeter/internal/entitlement" + "github.com/openmeterio/openmeter/internal/event/metadata" "github.com/openmeterio/openmeter/internal/event/models" - "github.com/openmeterio/openmeter/internal/event/spec" "github.com/openmeterio/openmeter/internal/productcatalog" + "github.com/openmeterio/openmeter/openmeter/watermill/marshaler" "github.com/openmeterio/openmeter/pkg/recurrence" ) -const ( - snapshotEventName spec.EventName = "entitlement.snapshot" -) - type BalanceOperationType string const ( @@ -55,14 +52,24 @@ type SnapshotEvent struct { CurrentUsagePeriod *recurrence.Period `json:"currentUsagePeriod,omitempty"` } -var SnapshotEventSpec = spec.EventTypeSpec{ - Subsystem: entitlement.EventSubsystem, - Name: snapshotEventName, - Version: "v1", +var ( + _ marshaler.Event = SnapshotEvent{} + + snapshotEventName = metadata.GetEventName(metadata.EventType{ + Subsystem: entitlement.EventSubsystem, + Name: "entitlement.snapshot", + Version: "v1", + }) +) + +func (e SnapshotEvent) EventName() string { + return snapshotEventName } -func (e SnapshotEvent) Spec() *spec.EventTypeSpec { - return &SnapshotEventSpec +func (e SnapshotEvent) EventMetadata() metadata.EventMetadata { + return metadata.EventMetadata{ + Subject: metadata.ComposeResourcePath(e.Namespace.ID, metadata.EntitySubjectKey, e.Subject.Key), + } } func (e SnapshotEvent) Validate() error { diff --git a/internal/event/metadata/event_type.go b/internal/event/metadata/event_type.go new file mode 100644 index 000000000..6af3297cc --- /dev/null +++ b/internal/event/metadata/event_type.go @@ -0,0 +1,59 @@ +package metadata + +import ( + "fmt" + "time" +) + +type ( + EventSubsystem string + EventName string + EventVersion string +) + +type EventType struct { + // Subsystem defines which connector/component is responsible for the event (e.g. ingest, entitlements, etc) + Subsystem EventSubsystem + + // Type is the actual event type (e.g. ingestion, flush, etc) + Name EventName + + // Version is the version of the event (e.g. v1, v2, etc) + Version EventVersion +} + +func (s *EventType) EventName() string { + return fmt.Sprintf("io.openmeter.%s.%s.%s", s.Subsystem, s.Version, s.Name) +} + +func (s *EventType) VersionSubsystem() string { + return fmt.Sprintf("io.openmeter.%s.%s", s.Subsystem, s.Version) +} + +func GetEventName(spec EventType) string { + return spec.EventName() +} + +type EventMetadata struct { + // ID of the event + ID string + + // Time specifies when the event occurred + Time time.Time + + // Subject meta + // Examples for source and subject pairs + // grant: + // source: //openmeter.io/namespace//entitlement//grant/ + // subject: //openmeter.io/namespace//subject/ + // + // entitlement: + // source: //openmeter.io/namespace//entitlement/ + // subject: //openmeter.io/namespace//subject/ + // + // ingest: + // source: //openmeter.io/namespace//event + // subject: //openmeter.io/namespace//subject/ + Subject string + Source string +} diff --git a/internal/event/spec/resourcepath.go b/internal/event/metadata/resourcepath.go similarity index 96% rename from internal/event/spec/resourcepath.go rename to internal/event/metadata/resourcepath.go index a8486afc2..8bb0947ad 100644 --- a/internal/event/spec/resourcepath.go +++ b/internal/event/metadata/resourcepath.go @@ -1,4 +1,4 @@ -package spec +package metadata import ( "fmt" diff --git a/internal/event/publisher.go b/internal/event/publisher.go index cff35ac5e..efc1a0970 100644 --- a/internal/event/publisher.go +++ b/internal/event/publisher.go @@ -1,8 +1,11 @@ package event -import "context" +import ( + "context" + + "github.com/openmeterio/openmeter/openmeter/watermill/marshaler" +) type Publisher interface { - // TODO: can we constraint it to accept only events? - Publish(ctx context.Context, event any) error + Publish(ctx context.Context, event marshaler.Event) error } diff --git a/internal/event/publisher/publisher.go b/internal/event/publisher/publisher.go deleted file mode 100644 index dc4754c57..000000000 --- a/internal/event/publisher/publisher.go +++ /dev/null @@ -1,124 +0,0 @@ -package publisher - -import ( - "errors" - "testing" - "time" - - "github.com/ThreeDotsLabs/watermill" - "github.com/ThreeDotsLabs/watermill/message" - "github.com/cloudevents/sdk-go/v2/event" - "github.com/stretchr/testify/assert" - - "github.com/openmeterio/openmeter/internal/watermill/driver/noop" -) - -const ( - CloudEventsHeaderType = "ce_type" - CloudEventsHeaderTime = "ce_time" - CloudEventsHeaderSource = "ce_source" - CloudEventsHeaderSubject = "ce_subject" -) - -type Publisher interface { - ForTopic(topic string) TopicPublisher -} - -type PublisherOptions struct { - // Publisher is the underlying watermill publisher object - Publisher message.Publisher - - // Transform is a function that can be used to transform the message before it is published, mainly used - // for driver specific tweaks. If more are required, we should add a chain function. - Transform TransformFunc -} - -type publisher struct { - publisher message.Publisher - marshaler CloudEventMarshaler -} - -func NewPublisher(opts PublisherOptions) (Publisher, error) { - if opts.Publisher == nil { - return nil, errors.New("publisher is required") - } - - return &publisher{ - publisher: opts.Publisher, - marshaler: NewCloudEventMarshaler(opts.Transform), - }, nil -} - -func NewMockTopicPublisher(t *testing.T) TopicPublisher { - pub, err := NewPublisher(PublisherOptions{ - Publisher: noop.Publisher{}, - }) - - assert.NoError(t, err) - return pub.ForTopic("test") -} - -func (p *publisher) ForTopic(topic string) TopicPublisher { - return &topicPublisher{ - publisher: p.publisher, - topic: topic, - marshaler: p.marshaler, - } -} - -type TopicPublisher interface { - Publish(event event.Event) error -} - -type topicPublisher struct { - publisher message.Publisher - topic string - marshaler CloudEventMarshaler -} - -func (p *topicPublisher) Publish(event event.Event) error { - msg, err := p.marshaler.MarshalEvent(event) - if err != nil { - return err - } - - return p.publisher.Publish(p.topic, msg) -} - -type CloudEventMarshaler interface { - MarshalEvent(event.Event) (*message.Message, error) -} - -type cloudEventMarshaler struct { - transform TransformFunc -} - -func NewCloudEventMarshaler(transform TransformFunc) CloudEventMarshaler { - return &cloudEventMarshaler{ - transform: transform, - } -} - -func (m *cloudEventMarshaler) MarshalEvent(event event.Event) (*message.Message, error) { - payload, err := event.MarshalJSON() - if err != nil { - return nil, err - } - - msg := message.NewMessage(watermill.NewUUID(), payload) - msg.Metadata.Set(CloudEventsHeaderType, event.Type()) - msg.Metadata.Set(CloudEventsHeaderTime, event.Time().In(time.UTC).Format(time.RFC3339)) - msg.Metadata.Set(CloudEventsHeaderSource, event.Source()) - if event.Subject() != "" { - msg.Metadata.Set(CloudEventsHeaderSubject, event.Subject()) - } - - if m.transform != nil { - msg, err = m.transform(msg, event) - if err != nil { - return nil, err - } - } - - return msg, nil -} diff --git a/internal/event/publisher/transformer.go b/internal/event/publisher/transformer.go deleted file mode 100644 index a2a14e6e6..000000000 --- a/internal/event/publisher/transformer.go +++ /dev/null @@ -1,8 +0,0 @@ -package publisher - -import ( - "github.com/ThreeDotsLabs/watermill/message" - "github.com/cloudevents/sdk-go/v2/event" -) - -type TransformFunc func(watermillIn *message.Message, cloudEvent event.Event) (*message.Message, error) diff --git a/internal/event/spec/event_type.go b/internal/event/spec/event_type.go deleted file mode 100644 index 0f6de621b..000000000 --- a/internal/event/spec/event_type.go +++ /dev/null @@ -1,88 +0,0 @@ -package spec - -// TODO: move to metadata -import ( - "fmt" - "time" -) - -type ( - EventSubsystem string - EventName string - EventVersion string - EventSubjectKind string - EventSpecVersion string -) - -type EventTypeSpec struct { - // Subsystem defines which connector/component is responsible for the event (e.g. ingest, entitlements, etc) - Subsystem EventSubsystem - - // Type is the actual event type (e.g. ingestion, flush, etc) - Name EventName - - // Version is the version of the event (e.g. v1, v2, etc) - Version EventVersion - - // cloudEventType is the actual cloud event type, so that we don't have the calculate it - // for each message - cloudEventType string -} - -// TODO: "string constructor" - -func (s *EventTypeSpec) Type() string { - if s.cloudEventType != "" { - return s.cloudEventType - } - s.cloudEventType = fmt.Sprintf("io.openmeter.%s.%s.%s", s.Subsystem, s.Version, s.Name) - return s.cloudEventType -} - -type EventSpec struct { - // ID of the event - ID string - - // Time specifies when the event occurred - Time time.Time - - // Subject meta - // Examples for source and subject pairs - // grant: - // source: //openmeter.io/namespace//entitlement//grant/ - // subject: //openmeter.io/namespace//subject/ - // - // entitlement: - // source: //openmeter.io/namespace//entitlement/ - // subject: //openmeter.io/namespace//subject/ - // - // ingest: - // source: //openmeter.io/namespace//event - // subject: //openmeter.io/namespace//subject/ - Subject string - Source string -} - -type EventMetadata struct { - // ID of the event - ID string - - // Time specifies when the event occurred - Time time.Time - - // Subject meta - // Examples for source and subject pairs - // grant: - // source: //openmeter.io/namespace//entitlement//grant/ - // subject: //openmeter.io/namespace//subject/ - // - // entitlement: - // source: //openmeter.io/namespace//entitlement/ - // subject: //openmeter.io/namespace//subject/ - // - // ingest: - // source: //openmeter.io/namespace//event - // subject: //openmeter.io/namespace//subject/ - Subject string - Source string -} diff --git a/internal/event/spec/parser.go b/internal/event/spec/parser.go deleted file mode 100644 index 10fa2f2cc..000000000 --- a/internal/event/spec/parser.go +++ /dev/null @@ -1,106 +0,0 @@ -package spec - -import ( - "errors" - "fmt" - "time" - - "github.com/cloudevents/sdk-go/v2/event" - "github.com/oklog/ulid/v2" -) - -type CloudEventsPayload interface { - Spec() *EventTypeSpec - Validate() error -} - -// NewCloudEvent creates a new CloudEvent with the given event spec and payload -// example usage: -// -// ev, err := CreateCloudEvent(EventSpec{ -// ID: "123", -// Source: "test", -// }, IngestEvent{...}) -func NewCloudEvent(eventSpec EventSpec, payload CloudEventsPayload) (event.Event, error) { - // Mandatory cloud events fields - if eventSpec.Source == "" { - return event.Event{}, errors.New("source is required") - } - - meta := payload.Spec() - ev := newCloudEventFromSpec(meta, eventSpec) - - if err := payload.Validate(); err != nil { - return event.Event{}, err - } - - if err := ev.SetData("application/json", payload); err != nil { - return event.Event{}, err - } - return ev, nil -} - -// newCloudEventFromSpec generates a new cloudevents without data being set based on the event spec -func newCloudEventFromSpec(meta *EventTypeSpec, spec EventSpec) event.Event { - ev := event.New() - ev.SetType(meta.Type()) - ev.SetSpecVersion(event.CloudEventsVersionV1) - - if spec.Time.IsZero() { - ev.SetTime(time.Now()) - } else { - ev.SetTime(spec.Time) - } - - if spec.ID == "" { - ev.SetID(ulid.Make().String()) - } else { - ev.SetID(spec.ID) - } - - ev.SetSource(spec.Source) - - ev.SetSubject(spec.Subject) - return ev -} - -// ParseCloudEvent unmarshals and validates a single CloudEvent into the given payload -// example usage: -// ingest, err := ParseCloudEvent[schema.IngestEvent](ev) -func ParseCloudEvent[PayloadType CloudEventsPayload](ev event.Event) (PayloadType, error) { - var payload PayloadType - - expectedType := payload.Spec().Type() - if expectedType != ev.Type() { - return payload, fmt.Errorf("cannot parse cloud event type %s as %s (expected by target payload)", ev.Type(), expectedType) - } - - if err := ev.DataAs(&payload); err != nil { - return payload, err - } - - if err := payload.Validate(); err != nil { - return payload, err - } - - return payload, nil -} - -type ParsedCloudEvent[PayloadType CloudEventsPayload] struct { - Event event.Event - Payload PayloadType -} - -func ParseCloudEventFromBytes[PayloadType CloudEventsPayload](data []byte) (*ParsedCloudEvent[PayloadType], error) { - cloudEvent := event.Event{} - if err := cloudEvent.UnmarshalJSON(data); err != nil { - return nil, fmt.Errorf("failed to unmarshal CloudEvent: %w", err) - } - - eventBody, err := ParseCloudEvent[PayloadType](cloudEvent) - if err != nil { - return nil, fmt.Errorf("failed to parse payload: %w", err) - } - - return &ParsedCloudEvent[PayloadType]{Event: cloudEvent, Payload: eventBody}, nil -} diff --git a/internal/event/spec/parser_test.go b/internal/event/spec/parser_test.go deleted file mode 100644 index 31f47df55..000000000 --- a/internal/event/spec/parser_test.go +++ /dev/null @@ -1,80 +0,0 @@ -package spec_test - -import ( - "errors" - "testing" - - "github.com/stretchr/testify/assert" - - "github.com/openmeterio/openmeter/internal/event/spec" -) - -type event struct { - Namespace string -} - -func (e event) Spec() *spec.EventTypeSpec { - return &spec.EventTypeSpec{ - Subsystem: "subsys", - Name: "test", - Version: "v1", - } -} - -var errNamespaceIsRequired = errors.New("namespace is required") - -func (e event) Validate() error { - if e.Namespace == "" { - return errNamespaceIsRequired - } - return nil -} - -func TestParserSanity(t *testing.T) { - cloudEvent, err := spec.NewCloudEvent( - spec.EventSpec{ - ID: "test", - Source: "somesource", - - Subject: spec.ComposeResourcePath("default", "subject", "ID"), - }, - event{ - Namespace: "test", - }) - - assert.NoError(t, err) - assert.Equal(t, "io.openmeter.subsys.v1.test", cloudEvent.Type()) - assert.Equal(t, "//openmeter.io/namespace/default/subject/ID", cloudEvent.Subject()) - assert.Equal(t, "somesource", cloudEvent.Source()) - - // parsing - parsedEvent, err := spec.ParseCloudEvent[event](cloudEvent) - assert.NoError(t, err) - assert.Equal(t, "test", parsedEvent.Namespace) - - // validation support - _, err = spec.NewCloudEvent( - spec.EventSpec{ - ID: "test", - Source: "somesource", - - Subject: spec.ComposeResourcePath("default", "subject", "ID"), - }, - event{}, - ) - - assert.Error(t, err) - assert.Equal(t, errNamespaceIsRequired, err) - - // ID autogeneration - cloudEvent, err = spec.NewCloudEvent( - spec.EventSpec{ - Source: "somesource", - }, - event{ - Namespace: "test", - }) - - assert.NoError(t, err) - assert.NotEmpty(t, cloudEvent.ID()) -} diff --git a/internal/notification/consumer/consumer.go b/internal/notification/consumer/consumer.go index 65d19f37d..5d6d299ce 100644 --- a/internal/notification/consumer/consumer.go +++ b/internal/notification/consumer/consumer.go @@ -11,10 +11,9 @@ import ( "github.com/ThreeDotsLabs/watermill/message/router/middleware" "github.com/openmeterio/openmeter/internal/entitlement/snapshot" - "github.com/openmeterio/openmeter/internal/event/publisher" - "github.com/openmeterio/openmeter/internal/event/spec" "github.com/openmeterio/openmeter/internal/registry" "github.com/openmeterio/openmeter/internal/watermill/nopublisher" + "github.com/openmeterio/openmeter/openmeter/watermill/marshaler" ) type Options struct { @@ -22,6 +21,7 @@ type Options struct { Subscriber message.Subscriber Publisher message.Publisher + Marshaler marshaler.Marshaler DLQ *DLQOptions @@ -113,21 +113,22 @@ func (w *Consumer) Close() error { func (w *Consumer) handleSystemEvent(msg *message.Message) error { w.opts.Logger.Debug("received system event", w.messageToLogFields(msg)...) - ceType, found := msg.Metadata[publisher.CloudEventsHeaderType] + ceType, found := msg.Metadata[marshaler.CloudEventsHeaderType] if !found { w.opts.Logger.Warn("missing CloudEvents type, ignoring message") return nil } switch ceType { - case snapshot.SnapshotEvent{}.Spec().Type(): - event, err := spec.ParseCloudEventFromBytes[snapshot.SnapshotEvent](msg.Payload) - if err != nil { + case snapshot.SnapshotEvent{}.EventName(): + event := snapshot.SnapshotEvent{} + + if err := w.opts.Marshaler.Unmarshal(msg, &event); err != nil { w.opts.Logger.Error("failed to parse entitlement created event", w.messageToLogFields(msg)...) return err } - return w.handleSnapshotEvent(msg.Context(), event.Payload) + return w.handleSnapshotEvent(msg.Context(), event) } return nil } diff --git a/internal/registry/entitlement.go b/internal/registry/entitlement.go index af8a72157..800b98141 100644 --- a/internal/registry/entitlement.go +++ b/internal/registry/entitlement.go @@ -3,9 +3,9 @@ package registry import ( "log/slog" - "github.com/ThreeDotsLabs/watermill/components/cqrs" "github.com/openmeterio/openmeter/internal/ent/db" "github.com/openmeterio/openmeter/internal/meter" + "github.com/openmeterio/openmeter/internal/watermill/eventbus" "github.com/openmeterio/openmeter/openmeter/credit" "github.com/openmeterio/openmeter/openmeter/entitlement" meteredentitlement "github.com/openmeterio/openmeter/openmeter/entitlement/metered" @@ -30,6 +30,5 @@ type EntitlementOptions struct { StreamingConnector streaming.Connector Logger *slog.Logger MeterRepository meter.Repository - // TODO: let's have an interface for the publisher, instead of watermill deps - EventBus *cqrs.EventBus + Publisher eventbus.Publisher } diff --git a/internal/sink/flushhandler/ingestnotification/events.go b/internal/sink/flushhandler/ingestnotification/events/events.go similarity index 60% rename from internal/sink/flushhandler/ingestnotification/events.go rename to internal/sink/flushhandler/ingestnotification/events/events.go index bcffbc77e..cfd512dcf 100644 --- a/internal/sink/flushhandler/ingestnotification/events.go +++ b/internal/sink/flushhandler/ingestnotification/events/events.go @@ -1,32 +1,35 @@ -package ingestnotification +package events import ( "errors" + "github.com/openmeterio/openmeter/internal/event/metadata" "github.com/openmeterio/openmeter/internal/event/models" - "github.com/openmeterio/openmeter/internal/event/spec" + "github.com/openmeterio/openmeter/openmeter/watermill/marshaler" ) const ( - EventSubsystem spec.EventSubsystem = "ingest" -) - -const ( - ingestedEventName spec.EventName = "events.ingested" + EventSubsystem metadata.EventSubsystem = "ingest" ) type EventBatchedIngest struct { Events []IngestEventData `json:"events"` } -var batchIngestEventSpec = spec.EventTypeSpec{ - Subsystem: EventSubsystem, - Name: ingestedEventName, - Version: "v1", -} +var ( + _ marshaler.Event = EventBatchedIngest{} + + batchIngestEventType = metadata.EventType{ + Subsystem: EventSubsystem, + Name: "events.ingested", + Version: "v1", + } + batchIngestEventName = metadata.GetEventName(batchIngestEventType) + EventVersionSubsystem = batchIngestEventType.VersionSubsystem() +) -func (b EventBatchedIngest) Spec() *spec.EventTypeSpec { - return &batchIngestEventSpec +func (b EventBatchedIngest) EventName() string { + return batchIngestEventName } func (b EventBatchedIngest) Validate() error { @@ -45,6 +48,12 @@ func (b EventBatchedIngest) Validate() error { return finalErr } +func (b EventBatchedIngest) EventMetadata() metadata.EventMetadata { + return metadata.EventMetadata{ + Source: metadata.ComposeResourcePathRaw(string(EventSubsystem)), + } +} + type IngestEventData struct { Namespace models.NamespaceID `json:"namespace"` SubjectKey string `json:"subjectKey"` diff --git a/internal/sink/flushhandler/ingestnotification/handler.go b/internal/sink/flushhandler/ingestnotification/handler.go index c5b102be2..7e2af7626 100644 --- a/internal/sink/flushhandler/ingestnotification/handler.go +++ b/internal/sink/flushhandler/ingestnotification/handler.go @@ -8,16 +8,16 @@ import ( "go.opentelemetry.io/otel/metric" eventmodels "github.com/openmeterio/openmeter/internal/event/models" - "github.com/openmeterio/openmeter/internal/event/spec" "github.com/openmeterio/openmeter/internal/sink/flushhandler" + ingestevents "github.com/openmeterio/openmeter/internal/sink/flushhandler/ingestnotification/events" sinkmodels "github.com/openmeterio/openmeter/internal/sink/models" - "github.com/openmeterio/openmeter/openmeter/event/publisher" + "github.com/openmeterio/openmeter/internal/watermill/eventbus" "github.com/openmeterio/openmeter/pkg/models" "github.com/openmeterio/openmeter/pkg/slicesx" ) type handler struct { - publisher publisher.TopicPublisher + publisher eventbus.Publisher logger *slog.Logger config HandlerConfig } @@ -26,7 +26,7 @@ type HandlerConfig struct { MaxEventsInBatch int } -func NewHandler(logger *slog.Logger, metricMeter metric.Meter, publisher publisher.TopicPublisher, config HandlerConfig) (flushhandler.FlushEventHandler, error) { +func NewHandler(logger *slog.Logger, metricMeter metric.Meter, publisher eventbus.Publisher, config HandlerConfig) (flushhandler.FlushEventHandler, error) { handler := &handler{ publisher: publisher, logger: logger, @@ -59,8 +59,8 @@ func (h *handler) OnFlushSuccess(ctx context.Context, events []sinkmodels.SinkMe } // Map the filtered events to the ingest event - iEvents := slicesx.Map(filtered, func(message sinkmodels.SinkMessage) IngestEventData { - return IngestEventData{ + iEvents := slicesx.Map(filtered, func(message sinkmodels.SinkMessage) ingestevents.IngestEventData { + return ingestevents.IngestEventData{ Namespace: eventmodels.NamespaceID{ID: message.Namespace}, SubjectKey: message.Serialized.Subject, MeterSlugs: h.getMeterSlugsFromMeters(message.Meters), @@ -70,18 +70,7 @@ func (h *handler) OnFlushSuccess(ctx context.Context, events []sinkmodels.SinkMe // We need to chunk the events to not exceed message size limits chunkedEvents := slicesx.Chunk(iEvents, h.config.MaxEventsInBatch) for _, chunk := range chunkedEvents { - event, err := spec.NewCloudEvent(spec.EventSpec{ - Source: spec.ComposeResourcePathRaw(string(EventBatchedIngest{}.Spec().Subsystem)), - }, EventBatchedIngest{ - Events: chunk, - }) - if err != nil { - finalErr = errors.Join(finalErr, err) - h.logger.Error("failed to create change notification", "error", err) - continue - } - - if err := h.publisher.Publish(event); err != nil { + if err := h.publisher.Publish(ctx, ingestevents.EventBatchedIngest{Events: chunk}); err != nil { finalErr = errors.Join(finalErr, err) h.logger.Error("failed to publish change notification", "error", err) } diff --git a/internal/watermill/driver/kafka/marshaler.go b/internal/watermill/driver/kafka/marshaler.go index 9a2303013..ab089b810 100644 --- a/internal/watermill/driver/kafka/marshaler.go +++ b/internal/watermill/driver/kafka/marshaler.go @@ -5,6 +5,8 @@ import ( "github.com/ThreeDotsLabs/watermill-kafka/v3/pkg/kafka" "github.com/ThreeDotsLabs/watermill/message" "github.com/cloudevents/sdk-go/v2/event" + + "github.com/openmeterio/openmeter/pkg/slicesx" ) const ( @@ -24,6 +26,9 @@ func (m marshalerWithPartitionKey) Marshal(topic string, msg *message.Message) ( partitionKey := msg.Metadata.Get(PartitionKeyMetadataKey) if partitionKey != "" { kafkaMsg.Key = sarama.ByteEncoder(partitionKey) + kafkaMsg.Headers = slicesx.Filter(kafkaMsg.Headers, func(header sarama.RecordHeader) bool { + return string(header.Key) != PartitionKeyMetadataKey + }) } return kafkaMsg, nil diff --git a/internal/watermill/driver/kafka/publisher.go b/internal/watermill/driver/kafka/publisher.go index 9a0d8f2d7..40a652c44 100644 --- a/internal/watermill/driver/kafka/publisher.go +++ b/internal/watermill/driver/kafka/publisher.go @@ -62,11 +62,14 @@ func NewPublisher(ctx context.Context, in PublisherOptions) (*kafka.Publisher, e Brokers: []string{in.KafkaConfig.Broker}, OverwriteSaramaConfig: sarama.NewConfig(), Marshaler: marshalerWithPartitionKey{}, - OTELEnabled: true, // This relies on the global trace provider + Tracer: kafka.NewOTELSaramaTracer(), // This relies on the global trace provider } wmConfig.OverwriteSaramaConfig.Metadata.RefreshFrequency = in.KafkaConfig.TopicMetadataRefreshInterval.Duration() - wmConfig.OverwriteSaramaConfig.ClientID = "openmeter/balance-worker" + if in.ClientID == "" { + return nil, errors.New("client ID is required") + } + wmConfig.OverwriteSaramaConfig.ClientID = fmt.Sprintf("%s-publisher", in.ClientID) // These are globals, so we cannot append the publisher/subscriber name to them sarama.Logger = &SaramaLoggerAdaptor{ diff --git a/internal/watermill/eventbus/eventbus.go b/internal/watermill/eventbus/eventbus.go new file mode 100644 index 000000000..6696e542e --- /dev/null +++ b/internal/watermill/eventbus/eventbus.go @@ -0,0 +1,90 @@ +package eventbus + +import ( + "context" + "log/slog" + "strings" + "testing" + + "github.com/ThreeDotsLabs/watermill" + "github.com/ThreeDotsLabs/watermill/components/cqrs" + "github.com/ThreeDotsLabs/watermill/message" + "github.com/stretchr/testify/assert" + + "github.com/openmeterio/openmeter/config" + ingestevents "github.com/openmeterio/openmeter/internal/sink/flushhandler/ingestnotification/events" + "github.com/openmeterio/openmeter/internal/watermill/driver/noop" + "github.com/openmeterio/openmeter/openmeter/watermill/marshaler" +) + +type Options struct { + Publisher message.Publisher + Config config.EventsConfiguration + Logger *slog.Logger + MarshalerTransformFunc marshaler.TransformFunc +} + +type Publisher interface { + Publish(ctx context.Context, event marshaler.Event) error + + Marshaler() marshaler.Marshaler +} + +type publisher struct { + eventBus *cqrs.EventBus + marshaler marshaler.Marshaler +} + +func (p publisher) Publish(ctx context.Context, event marshaler.Event) error { + return p.eventBus.Publish(ctx, event) +} + +func (p publisher) Marshaler() marshaler.Marshaler { + return p.marshaler +} + +func New(opts Options) (Publisher, error) { + marshaler := marshaler.New(opts.MarshalerTransformFunc) + + ingestVersionSubsystemPrefix := ingestevents.EventVersionSubsystem + "." + + eventBus, err := cqrs.NewEventBusWithConfig(opts.Publisher, cqrs.EventBusConfig{ + GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) { + switch { + case strings.HasPrefix(params.EventName, ingestVersionSubsystemPrefix): + return opts.Config.IngestEvents.Topic, nil + default: + return opts.Config.SystemEvents.Topic, nil + } + }, + + Marshaler: marshaler, + Logger: watermill.NewSlogLogger(opts.Logger), + }) + if err != nil { + return nil, err + } + + return publisher{ + eventBus: eventBus, + marshaler: marshaler, + }, nil +} + +func NewMock(t *testing.T) Publisher { + eventBus, err := New(Options{ + Publisher: &noop.Publisher{}, + Config: config.EventsConfiguration{ + SystemEvents: config.EventSubsystemConfiguration{ + Topic: "test", + }, + IngestEvents: config.EventSubsystemConfiguration{ + Topic: "test", + }, + }, + Logger: slog.Default(), + }) + + assert.NoError(t, err) + return eventBus +} diff --git a/openmeter/credit/adapters.go b/openmeter/credit/adapters.go index 9e8a528ea..69f645f68 100644 --- a/openmeter/credit/adapters.go +++ b/openmeter/credit/adapters.go @@ -5,7 +5,7 @@ import ( "time" "github.com/openmeterio/openmeter/internal/credit" - "github.com/openmeterio/openmeter/internal/event/publisher" + "github.com/openmeterio/openmeter/internal/watermill/eventbus" "github.com/openmeterio/openmeter/openmeter/streaming" ) @@ -16,7 +16,7 @@ func NewCreditConnector( streamingConnector streaming.Connector, logger *slog.Logger, granularity time.Duration, - publisher publisher.TopicPublisher, + publisher eventbus.Publisher, ) CreditConnector { return credit.NewCreditConnector( grantRepo, diff --git a/openmeter/entitlement/adapters.go b/openmeter/entitlement/adapters.go index 22d187492..61528d84a 100644 --- a/openmeter/entitlement/adapters.go +++ b/openmeter/entitlement/adapters.go @@ -1,8 +1,8 @@ package entitlement import ( - "github.com/ThreeDotsLabs/watermill/components/cqrs" "github.com/openmeterio/openmeter/internal/entitlement" + "github.com/openmeterio/openmeter/internal/watermill/eventbus" "github.com/openmeterio/openmeter/openmeter/meter" "github.com/openmeterio/openmeter/openmeter/productcatalog" ) @@ -14,7 +14,7 @@ func NewEntitlementConnector( metered SubTypeConnector, static SubTypeConnector, boolean SubTypeConnector, - eventBus *cqrs.EventBus, + eventBus eventbus.Publisher, ) EntitlementConnector { return entitlement.NewEntitlementConnector(edb, fc, meterRepo, metered, static, boolean, eventBus) } diff --git a/openmeter/entitlement/metered/adapters.go b/openmeter/entitlement/metered/adapters.go index 7be86cca3..d9e3353ac 100644 --- a/openmeter/entitlement/metered/adapters.go +++ b/openmeter/entitlement/metered/adapters.go @@ -4,9 +4,9 @@ import ( "log/slog" meteredentitlement "github.com/openmeterio/openmeter/internal/entitlement/metered" + "github.com/openmeterio/openmeter/internal/watermill/eventbus" "github.com/openmeterio/openmeter/openmeter/credit" "github.com/openmeterio/openmeter/openmeter/entitlement" - "github.com/openmeterio/openmeter/openmeter/event/publisher" "github.com/openmeterio/openmeter/openmeter/meter" "github.com/openmeterio/openmeter/openmeter/productcatalog" "github.com/openmeterio/openmeter/openmeter/streaming" @@ -19,7 +19,7 @@ func NewMeteredEntitlementConnector( grantConnector credit.GrantConnector, grantRepo credit.GrantRepo, entitlementRepo entitlement.EntitlementRepo, - publisher publisher.TopicPublisher, + publisher eventbus.Publisher, ) Connector { return meteredentitlement.NewMeteredEntitlementConnector( streamingConnector, diff --git a/openmeter/event/publisher/publisher.go b/openmeter/event/publisher/publisher.go deleted file mode 100644 index c5aec9318..000000000 --- a/openmeter/event/publisher/publisher.go +++ /dev/null @@ -1,22 +0,0 @@ -package publisher - -import "github.com/openmeterio/openmeter/internal/event/publisher" - -type ( - Publisher = publisher.Publisher - PublisherOptions = publisher.PublisherOptions - TopicPublisher = publisher.TopicPublisher - CloudEventMarshaler = publisher.CloudEventMarshaler -) - -type ( - TransformFunc = publisher.TransformFunc -) - -func NewPublisher(options PublisherOptions) (Publisher, error) { - return publisher.NewPublisher(options) -} - -func NewCloudEventMarshaler(transform TransformFunc) CloudEventMarshaler { - return publisher.NewCloudEventMarshaler(transform) -} diff --git a/openmeter/sink/flushhandler/ingestnotification/events/events.go b/openmeter/sink/flushhandler/ingestnotification/events/events.go new file mode 100644 index 000000000..c83e35523 --- /dev/null +++ b/openmeter/sink/flushhandler/ingestnotification/events/events.go @@ -0,0 +1,14 @@ +package events + +import "github.com/openmeterio/openmeter/internal/sink/flushhandler/ingestnotification/events" + +const ( + EventSubsystem = events.EventSubsystem +) + +var EventVersionSubsystem = events.EventVersionSubsystem + +type ( + IngestEventData = events.IngestEventData + EventBatchedIngest = events.EventBatchedIngest +) diff --git a/openmeter/sink/flushhandler/ingestnotification/ingestnotification.go b/openmeter/sink/flushhandler/ingestnotification/ingestnotification.go index 9300474c7..51da240d2 100644 --- a/openmeter/sink/flushhandler/ingestnotification/ingestnotification.go +++ b/openmeter/sink/flushhandler/ingestnotification/ingestnotification.go @@ -6,22 +6,16 @@ import ( "go.opentelemetry.io/otel/metric" "github.com/openmeterio/openmeter/internal/sink/flushhandler/ingestnotification" - "github.com/openmeterio/openmeter/openmeter/event/publisher" "github.com/openmeterio/openmeter/openmeter/sink/flushhandler" + "github.com/openmeterio/openmeter/openmeter/watermill/eventbus" ) // Event types -const ( - EventSubsystem = ingestnotification.EventSubsystem -) - type ( - IngestEventData = ingestnotification.IngestEventData - EventBatchedIngest = ingestnotification.EventBatchedIngest - HandlerConfig = ingestnotification.HandlerConfig + HandlerConfig = ingestnotification.HandlerConfig ) // Ingest notification handler -func NewHandler(logger *slog.Logger, metricMeter metric.Meter, publisher publisher.TopicPublisher, config ingestnotification.HandlerConfig) (flushhandler.FlushEventHandler, error) { +func NewHandler(logger *slog.Logger, metricMeter metric.Meter, publisher eventbus.Publisher, config ingestnotification.HandlerConfig) (flushhandler.FlushEventHandler, error) { return ingestnotification.NewHandler(logger, metricMeter, publisher, config) } diff --git a/openmeter/watermill/eventbus/eventbus.go b/openmeter/watermill/eventbus/eventbus.go new file mode 100644 index 000000000..fbb12aa3d --- /dev/null +++ b/openmeter/watermill/eventbus/eventbus.go @@ -0,0 +1,20 @@ +package eventbus + +import ( + "testing" + + "github.com/openmeterio/openmeter/internal/watermill/eventbus" +) + +type ( + Publisher = eventbus.Publisher + Options = eventbus.Options +) + +func New(options Options) (Publisher, error) { + return eventbus.New(options) +} + +func NewMock(t *testing.T) Publisher { + return eventbus.NewMock(t) +} diff --git a/openmeter/watermill/marshaler/marshaler.go b/openmeter/watermill/marshaler/marshaler.go index db04c8e2f..6209b875a 100644 --- a/openmeter/watermill/marshaler/marshaler.go +++ b/openmeter/watermill/marshaler/marshaler.go @@ -10,7 +10,8 @@ import ( "github.com/ThreeDotsLabs/watermill/message" cloudevents "github.com/cloudevents/sdk-go/v2/event" "github.com/oklog/ulid/v2" - "github.com/openmeterio/openmeter/internal/event/spec" + + "github.com/openmeterio/openmeter/internal/event/metadata" ) const ( @@ -20,22 +21,32 @@ const ( CloudEventsHeaderSubject = "ce_subject" ) +const ( + UnknownEventName = "io.openmeter.unknown" +) + type TransformFunc func(watermillIn *message.Message, cloudEvent cloudevents.Event) (*message.Message, error) -type event interface { +type Marshaler = cqrs.CommandEventMarshaler + +type Event interface { EventName() string - EventMetadata() spec.EventMetadata + EventMetadata() metadata.EventMetadata Validate() error } -type marshaler struct{} +type marshaler struct { + transform TransformFunc +} -func New() cqrs.CommandEventMarshaler { - return &marshaler{} +func New(transform TransformFunc) Marshaler { + return &marshaler{ + transform: transform, + } } func (m *marshaler) Marshal(v interface{}) (*message.Message, error) { - ev, ok := v.(event) + ev, ok := v.(Event) if !ok { return nil, errors.New("invalid event type") } @@ -61,19 +72,17 @@ func (m *marshaler) Marshal(v interface{}) (*message.Message, error) { msg.Metadata.Set(CloudEventsHeaderSubject, ce.Subject()) } - /* - // TODO! - if m.transform != nil { - msg, err = m.transform(msg, event) - if err != nil { - return nil, err - } - }*/ + if m.transform != nil { + msg, err = m.transform(msg, ce) + if err != nil { + return nil, err + } + } return msg, nil } -func NewCloudEvent(ev event) (cloudevents.Event, error) { +func NewCloudEvent(ev Event) (cloudevents.Event, error) { metadata := ev.EventMetadata() // Mandatory cloud events fields if metadata.Source == "" { @@ -116,14 +125,25 @@ func (m *marshaler) Unmarshal(msg *message.Message, v interface{}) error { return fmt.Errorf("failed to unmarshal CloudEvent: %w", err) } - return json.Unmarshal(cloudEvent.Data(), v) + err := json.Unmarshal(cloudEvent.Data(), v) + if err != nil { + return err + } + + ev, ok := v.(Event) + if !ok { + return errors.New("invalid event type") + } + + return ev.Validate() } func (m *marshaler) Name(v interface{}) string { - ev, ok := v.(event) + ev, ok := v.(Event) if !ok { - // TODO: how to report error - return "TODO" + // This event name is passed to most of the cqrs functions, but given that we cannot + // return an error here, we are generating a name that's unlikely to match any event. + return UnknownEventName } return ev.EventName() diff --git a/openmeter/watermill/marshaler/source.go b/openmeter/watermill/marshaler/source.go new file mode 100644 index 000000000..08f6ead52 --- /dev/null +++ b/openmeter/watermill/marshaler/source.go @@ -0,0 +1,44 @@ +package marshaler + +import ( + "errors" + + "github.com/openmeterio/openmeter/internal/event/metadata" +) + +type eventWithSource struct { + Event `json:",inline"` + + source string `json:"-"` +} + +// WithSource can be used to add the CloudEvents source field to an event. +func WithSource(source string, ev Event) Event { + return &eventWithSource{ + source: source, + Event: ev, + } +} + +func (e *eventWithSource) EventMetadata() metadata.EventMetadata { + metadata := e.Event.EventMetadata() + metadata.Source = e.source + + return metadata +} + +func (e *eventWithSource) Validate() error { + if err := e.Event.Validate(); err != nil { + return err + } + + if e.source == "" { + return errors.New("source must be set") + } + + return nil +} + +func (e *eventWithSource) EventName() string { + return e.Event.EventName() +} diff --git a/test/entitlement/regression/framework_test.go b/test/entitlement/regression/framework_test.go index 69b8817d3..3680cca7d 100644 --- a/test/entitlement/regression/framework_test.go +++ b/test/entitlement/regression/framework_test.go @@ -16,12 +16,12 @@ import ( booleanentitlement "github.com/openmeterio/openmeter/internal/entitlement/boolean" meteredentitlement "github.com/openmeterio/openmeter/internal/entitlement/metered" staticentitlement "github.com/openmeterio/openmeter/internal/entitlement/static" - "github.com/openmeterio/openmeter/internal/event/publisher" "github.com/openmeterio/openmeter/internal/meter" "github.com/openmeterio/openmeter/internal/productcatalog" productcatalogrepo "github.com/openmeterio/openmeter/internal/productcatalog/adapter" streamingtestutils "github.com/openmeterio/openmeter/internal/streaming/testutils" "github.com/openmeterio/openmeter/internal/testutils" + "github.com/openmeterio/openmeter/internal/watermill/eventbus" "github.com/openmeterio/openmeter/pkg/models" ) @@ -89,6 +89,8 @@ func setupDependencies(t *testing.T) Dependencies { entitlementRepo := entitlementrepo.NewPostgresEntitlementRepo(dbClient) usageResetRepo := entitlementrepo.NewPostgresUsageResetRepo(dbClient) + mockPublisher := eventbus.NewMock(t) + owner := meteredentitlement.NewEntitlementGrantOwnerAdapter( featureRepo, entitlementRepo, @@ -104,7 +106,7 @@ func setupDependencies(t *testing.T) Dependencies { streaming, log, time.Minute, - publisher.NewMockTopicPublisher(t), + mockPublisher, ) meteredEntitlementConnector := meteredentitlement.NewMeteredEntitlementConnector( @@ -114,7 +116,7 @@ func setupDependencies(t *testing.T) Dependencies { creditConnector, grantRepo, entitlementRepo, - publisher.NewMockTopicPublisher(t), + mockPublisher, ) staticEntitlementConnector := staticentitlement.NewStaticEntitlementConnector() @@ -127,7 +129,7 @@ func setupDependencies(t *testing.T) Dependencies { meteredEntitlementConnector, staticEntitlementConnector, booleanEntitlementConnector, - publisher.NewMockTopicPublisher(t), + mockPublisher, ) return Dependencies{ From f680c38a3976f5ad12c80922204d37f0f13fda02 Mon Sep 17 00:00:00 2001 From: Peter Turi Date: Wed, 7 Aug 2024 12:56:57 +0200 Subject: [PATCH 4/5] fix: expose event publisher interface --- openmeter/event/event.go | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 openmeter/event/event.go diff --git a/openmeter/event/event.go b/openmeter/event/event.go new file mode 100644 index 000000000..0df0b1ace --- /dev/null +++ b/openmeter/event/event.go @@ -0,0 +1,5 @@ +package event + +import "github.com/openmeterio/openmeter/internal/event" + +type Publisher = event.Publisher From 5419b3ccf3ff5ee8b6ca2d9943d9fbdbb2ccbac1 Mon Sep 17 00:00:00 2001 From: Peter Turi Date: Wed, 7 Aug 2024 15:31:05 +0200 Subject: [PATCH 5/5] fix: marshaling of WithSubject events If a struct is composed with an interface the go json marshaler treats that as a field, thus we need to have a custom json marshaler for the type. --- openmeter/watermill/marshaler/source.go | 7 +++ openmeter/watermill/marshaler/source_test.go | 47 ++++++++++++++++++++ 2 files changed, 54 insertions(+) create mode 100644 openmeter/watermill/marshaler/source_test.go diff --git a/openmeter/watermill/marshaler/source.go b/openmeter/watermill/marshaler/source.go index 08f6ead52..160b647ea 100644 --- a/openmeter/watermill/marshaler/source.go +++ b/openmeter/watermill/marshaler/source.go @@ -1,6 +1,7 @@ package marshaler import ( + "encoding/json" "errors" "github.com/openmeterio/openmeter/internal/event/metadata" @@ -42,3 +43,9 @@ func (e *eventWithSource) Validate() error { func (e *eventWithSource) EventName() string { return e.Event.EventName() } + +// MarshalJSON marshals the event only, as JSON library embeds the Event name into the output, +// if the composed object is a pointer to an interface. (e.g. we would get "Event": {} in the payload) +func (e *eventWithSource) MarshalJSON() ([]byte, error) { + return json.Marshal(e.Event) +} diff --git a/openmeter/watermill/marshaler/source_test.go b/openmeter/watermill/marshaler/source_test.go new file mode 100644 index 000000000..c5cb04d82 --- /dev/null +++ b/openmeter/watermill/marshaler/source_test.go @@ -0,0 +1,47 @@ +package marshaler + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/openmeterio/openmeter/internal/event/metadata" +) + +type event struct { + Value string `json:"value"` +} + +func (e *event) EventMetadata() metadata.EventMetadata { + return metadata.EventMetadata{} +} + +func (e *event) Validate() error { + return nil +} + +func (e *event) EventName() string { + return "event" +} + +func TestWithSubject(t *testing.T) { + marshaler := New(nil) + + ev := &event{ + Value: "value", + } + + evWithSource := WithSource("source", ev) + msg, err := marshaler.Marshal(evWithSource) + + // Check if the source is set in the metadata + assert.NoError(t, err) + assert.Equal(t, "source", msg.Metadata.Get(CloudEventsHeaderSource)) + + // Check if the event can be unmarshaled + unmarshaledEvent := &event{} + err = marshaler.Unmarshal(msg, unmarshaledEvent) + assert.NoError(t, err) + + assert.Equal(t, ev, unmarshaledEvent) +}