Skip to content

Commit

Permalink
chore: expose event subsystem for the cloud deployment
Browse files Browse the repository at this point in the history
  • Loading branch information
turip committed Jul 26, 2024
1 parent d9f2427 commit 4acd15c
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 1 deletion.
2 changes: 2 additions & 0 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,8 @@ func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Co
return nil, fmt.Errorf("failed to create Kafka admin client: %w", err)
}

defer adminClient.Close()

if err := pkgkafka.ProvisionTopic(ctx, adminClient, logger, conf.Events.SystemEvents.Topic, conf.Events.SystemEvents.AutoProvision.Partitions); err != nil {
return nil, fmt.Errorf("failed to auto provision topic: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/watermill/driver/kafka/publisher.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package noop
package kafka

import (
"github.com/ThreeDotsLabs/watermill/message"
Expand Down
17 changes: 17 additions & 0 deletions openmeter/credit/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package credit

import "github.com/openmeterio/openmeter/internal/credit"

const (
EventSubsystem = credit.EventSubsystem
)

const (
EventCreateGrant = credit.EventCreateGrant
EventVoidGrant = credit.EventVoidGrant
)

type (
GrantCreatedEvent = credit.GrantCreatedEvent
GrantVoidedEvent = credit.GrantVoidedEvent
)
17 changes: 17 additions & 0 deletions openmeter/entitlement/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package entitlement

import "github.com/openmeterio/openmeter/internal/entitlement"

const (
EventSubsystem = entitlement.EventSubsystem
)

const (
EventCreateEntitlement = entitlement.EventCreateEntitlement
EventDeleteEntitlement = entitlement.EventDeleteEntitlement
)

type (
EntitlementCreatedEvent = entitlement.EntitlementCreatedEvent
EntitlementDeletedEvent = entitlement.EntitlementDeletedEvent
)
15 changes: 15 additions & 0 deletions openmeter/entitlement/metered/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package meteredentitlement

import meteredentitlement "github.com/openmeterio/openmeter/internal/entitlement/metered"

const (
EventSubsystem = meteredentitlement.EventSubsystem
)

const (
EventResetEntitlementUsage = meteredentitlement.EventResetEntitlementUsage
)

type (
ResetEntitlementEvent = meteredentitlement.ResetEntitlementEvent
)
4 changes: 4 additions & 0 deletions openmeter/event/publisher/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,7 @@ type (
PublisherOptions = publisher.PublisherOptions
TopicPublisher = publisher.TopicPublisher
)

func NewPublisher(options PublisherOptions) (Publisher, error) {
return publisher.NewPublisher(options)
}
25 changes: 25 additions & 0 deletions openmeter/watermill/driver/kafka/driver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package kafka

import (
"github.com/ThreeDotsLabs/watermill/message"
"github.com/cloudevents/sdk-go/v2/event"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"

watermillkafka "github.com/openmeterio/openmeter/internal/watermill/driver/kafka"
)

const (
PartitionKeyMetadataKey = watermillkafka.PartitionKeyMetadataKey
)

type (
Publisher = watermillkafka.Publisher
)

func NewPublisher(producer *kafka.Producer) *Publisher {
return watermillkafka.NewPublisher(producer)
}

func AddPartitionKeyFromSubject(watermillIn *message.Message, cloudEvent event.Event) (*message.Message, error) {
return watermillkafka.AddPartitionKeyFromSubject(watermillIn, cloudEvent)
}
7 changes: 7 additions & 0 deletions openmeter/watermill/driver/noop/driver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package noop

import "github.com/openmeterio/openmeter/internal/watermill/driver/noop"

type (
Publisher = noop.Publisher
)

0 comments on commit 4acd15c

Please sign in to comment.