diff --git a/cmd/server/main.go b/cmd/server/main.go index a2c9bb02b..eb3bcf773 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -499,6 +499,8 @@ func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Co return nil, fmt.Errorf("failed to create Kafka admin client: %w", err) } + defer adminClient.Close() + if err := pkgkafka.ProvisionTopic(ctx, adminClient, logger, conf.Events.SystemEvents.Topic, conf.Events.SystemEvents.AutoProvision.Partitions); err != nil { return nil, fmt.Errorf("failed to auto provision topic: %w", err) } diff --git a/internal/watermill/driver/kafka/publisher.go b/internal/watermill/driver/kafka/publisher.go index c5ae308f4..60ce6a123 100644 --- a/internal/watermill/driver/kafka/publisher.go +++ b/internal/watermill/driver/kafka/publisher.go @@ -1,4 +1,4 @@ -package noop +package kafka import ( "github.com/ThreeDotsLabs/watermill/message" diff --git a/openmeter/credit/events.go b/openmeter/credit/events.go new file mode 100644 index 000000000..895ba8846 --- /dev/null +++ b/openmeter/credit/events.go @@ -0,0 +1,17 @@ +package credit + +import "github.com/openmeterio/openmeter/internal/credit" + +const ( + EventSubsystem = credit.EventSubsystem +) + +const ( + EventCreateGrant = credit.EventCreateGrant + EventVoidGrant = credit.EventVoidGrant +) + +type ( + GrantCreatedEvent = credit.GrantCreatedEvent + GrantVoidedEvent = credit.GrantVoidedEvent +) diff --git a/openmeter/entitlement/events.go b/openmeter/entitlement/events.go new file mode 100644 index 000000000..ae0ca3ad4 --- /dev/null +++ b/openmeter/entitlement/events.go @@ -0,0 +1,17 @@ +package entitlement + +import "github.com/openmeterio/openmeter/internal/entitlement" + +const ( + EventSubsystem = entitlement.EventSubsystem +) + +const ( + EventCreateEntitlement = entitlement.EventCreateEntitlement + EventDeleteEntitlement = entitlement.EventDeleteEntitlement +) + +type ( + EntitlementCreatedEvent = entitlement.EntitlementCreatedEvent + EntitlementDeletedEvent = entitlement.EntitlementDeletedEvent +) diff --git a/openmeter/entitlement/metered/events.go b/openmeter/entitlement/metered/events.go new file mode 100644 index 000000000..a306bf349 --- /dev/null +++ b/openmeter/entitlement/metered/events.go @@ -0,0 +1,15 @@ +package meteredentitlement + +import meteredentitlement "github.com/openmeterio/openmeter/internal/entitlement/metered" + +const ( + EventSubsystem = meteredentitlement.EventSubsystem +) + +const ( + EventResetEntitlementUsage = meteredentitlement.EventResetEntitlementUsage +) + +type ( + ResetEntitlementEvent = meteredentitlement.ResetEntitlementEvent +) diff --git a/openmeter/event/publisher/publisher.go b/openmeter/event/publisher/publisher.go index 37d72a277..ddf243c4f 100644 --- a/openmeter/event/publisher/publisher.go +++ b/openmeter/event/publisher/publisher.go @@ -7,3 +7,7 @@ type ( PublisherOptions = publisher.PublisherOptions TopicPublisher = publisher.TopicPublisher ) + +func NewPublisher(options PublisherOptions) (Publisher, error) { + return publisher.NewPublisher(options) +} diff --git a/openmeter/watermill/driver/kafka/driver.go b/openmeter/watermill/driver/kafka/driver.go new file mode 100644 index 000000000..1c6a1b62d --- /dev/null +++ b/openmeter/watermill/driver/kafka/driver.go @@ -0,0 +1,25 @@ +package kafka + +import ( + "github.com/ThreeDotsLabs/watermill/message" + "github.com/cloudevents/sdk-go/v2/event" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" + + watermillkafka "github.com/openmeterio/openmeter/internal/watermill/driver/kafka" +) + +const ( + PartitionKeyMetadataKey = watermillkafka.PartitionKeyMetadataKey +) + +type ( + Publisher = watermillkafka.Publisher +) + +func NewPublisher(producer *kafka.Producer) *Publisher { + return watermillkafka.NewPublisher(producer) +} + +func AddPartitionKeyFromSubject(watermillIn *message.Message, cloudEvent event.Event) (*message.Message, error) { + return watermillkafka.AddPartitionKeyFromSubject(watermillIn, cloudEvent) +} diff --git a/openmeter/watermill/driver/noop/driver.go b/openmeter/watermill/driver/noop/driver.go new file mode 100644 index 000000000..dad5a47ee --- /dev/null +++ b/openmeter/watermill/driver/noop/driver.go @@ -0,0 +1,7 @@ +package noop + +import "github.com/openmeterio/openmeter/internal/watermill/driver/noop" + +type ( + Publisher = noop.Publisher +)