Skip to content

Commit

Permalink
Merge pull request #1314 from openmeterio/refactor/use-watermill-even…
Browse files Browse the repository at this point in the history
…t-bus

refactor: use watermill event bus
  • Loading branch information
turip authored Aug 7, 2024
2 parents 034e4cd + 5419b3c commit 7f7da0b
Show file tree
Hide file tree
Showing 43 changed files with 755 additions and 739 deletions.
51 changes: 20 additions & 31 deletions cmd/balance-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,13 @@ import (
"github.com/openmeterio/openmeter/internal/ent/db"
entitlementpgadapter "github.com/openmeterio/openmeter/internal/entitlement/adapter"
"github.com/openmeterio/openmeter/internal/entitlement/balanceworker"
"github.com/openmeterio/openmeter/internal/event/publisher"
"github.com/openmeterio/openmeter/internal/meter"
"github.com/openmeterio/openmeter/internal/registry"
registrybuilder "github.com/openmeterio/openmeter/internal/registry/builder"
"github.com/openmeterio/openmeter/internal/streaming/clickhouse_connector"
"github.com/openmeterio/openmeter/internal/watermill/driver/kafka"
watermillkafka "github.com/openmeterio/openmeter/internal/watermill/driver/kafka"
"github.com/openmeterio/openmeter/internal/watermill/eventbus"
"github.com/openmeterio/openmeter/pkg/contextx"
"github.com/openmeterio/openmeter/pkg/framework/entutils"
"github.com/openmeterio/openmeter/pkg/framework/operation"
Expand Down Expand Up @@ -230,26 +231,37 @@ func main() {
}

// Create publisher
publishers, err := initEventPublisher(ctx, logger, conf, metricMeter)
eventPublisherDriver, err := initEventPublisherDriver(ctx, logger, conf, metricMeter)
if err != nil {
logger.Error("failed to initialize event publisher", slog.String("error", err.Error()))
os.Exit(1)
}

defer func() {
// We are using sync publishing, so it's fine to close the publisher using defers.
if err := publishers.watermillPublisher.Close(); err != nil {
if err := eventPublisherDriver.Close(); err != nil {
logger.Error("failed to close event publisher", slog.String("error", err.Error()))
}
}()

eventPublisher, err := eventbus.New(eventbus.Options{
Publisher: eventPublisherDriver,
Config: conf.Events,
Logger: logger,
MarshalerTransformFunc: kafka.AddPartitionKeyFromSubject,
})
if err != nil {
logger.Error("failed to initialize event publisher", slog.String("error", err.Error()))
os.Exit(1)
}

// Dependencies: entitlement
entitlementConnectors := registrybuilder.GetEntitlementRegistry(registry.EntitlementOptions{
DatabaseClient: pgClients.client,
StreamingConnector: clickhouseStreamingConnector,
MeterRepository: meterRepository,
Logger: logger,
Publisher: publishers.eventPublisher.ForTopic(conf.Events.SystemEvents.Topic),
Publisher: eventPublisher,
})

// Initialize worker
Expand All @@ -259,8 +271,8 @@ func main() {
Subscriber: wmSubscriber,

TargetTopic: conf.Events.SystemEvents.Topic,
Publisher: publishers.watermillPublisher,
Marshaler: publishers.marshaler,
Publisher: eventPublisherDriver,
Marshaler: eventPublisher.Marshaler(),

Entitlement: entitlementConnectors,
Repo: entitlementpgadapter.NewPostgresEntitlementRepo(pgClients.client),
Expand Down Expand Up @@ -353,13 +365,7 @@ func initKafkaSubscriber(conf config.Configuration, logger *slog.Logger) (messag
return subscriber, nil
}

type eventPublishers struct {
watermillPublisher message.Publisher
marshaler publisher.CloudEventMarshaler
eventPublisher publisher.Publisher
}

func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Configuration, metricMeter metric.Meter) (*eventPublishers, error) {
func initEventPublisherDriver(ctx context.Context, logger *slog.Logger, conf config.Configuration, metricMeter metric.Meter) (message.Publisher, error) {
provisionTopics := []watermillkafka.AutoProvisionTopic{}
if conf.BalanceWorker.DLQ.AutoProvision.Enabled {
provisionTopics = append(provisionTopics, watermillkafka.AutoProvisionTopic{
Expand All @@ -368,31 +374,14 @@ func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Co
})
}

eventDriver, err := watermillkafka.NewPublisher(ctx, watermillkafka.PublisherOptions{
return watermillkafka.NewPublisher(ctx, watermillkafka.PublisherOptions{
KafkaConfig: conf.Ingest.Kafka.KafkaConfiguration,
ProvisionTopics: provisionTopics,
ClientID: otelName,
Logger: logger,
MetricMeter: metricMeter,
DebugLogging: conf.Telemetry.Log.Level == slog.LevelDebug,
})
if err != nil {
return nil, fmt.Errorf("failed to create event driver: %w", err)
}

eventPublisher, err := publisher.NewPublisher(publisher.PublisherOptions{
Publisher: eventDriver,
Transform: watermillkafka.AddPartitionKeyFromSubject,
})
if err != nil {
return nil, fmt.Errorf("failed to create event publisher: %w", err)
}

return &eventPublishers{
watermillPublisher: eventDriver,
marshaler: publisher.NewCloudEventMarshaler(watermillkafka.AddPartitionKeyFromSubject),
eventPublisher: eventPublisher,
}, nil
}

type pgClients struct {
Expand Down
50 changes: 20 additions & 30 deletions cmd/notification-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,14 @@ import (

"github.com/openmeterio/openmeter/config"
"github.com/openmeterio/openmeter/internal/ent/db"
"github.com/openmeterio/openmeter/internal/event/publisher"
"github.com/openmeterio/openmeter/internal/meter"
"github.com/openmeterio/openmeter/internal/notification/consumer"
"github.com/openmeterio/openmeter/internal/registry"
registrybuilder "github.com/openmeterio/openmeter/internal/registry/builder"
"github.com/openmeterio/openmeter/internal/streaming/clickhouse_connector"
"github.com/openmeterio/openmeter/internal/watermill/driver/kafka"
watermillkafka "github.com/openmeterio/openmeter/internal/watermill/driver/kafka"
"github.com/openmeterio/openmeter/internal/watermill/eventbus"
"github.com/openmeterio/openmeter/pkg/contextx"
"github.com/openmeterio/openmeter/pkg/framework/entutils"
"github.com/openmeterio/openmeter/pkg/framework/operation"
Expand Down Expand Up @@ -229,34 +230,46 @@ func main() {
}

// Create publisher
publishers, err := initEventPublisher(ctx, logger, conf, metricMeter)
eventPublisherDriver, err := initEventPublisherDriver(ctx, logger, conf, metricMeter)
if err != nil {
logger.Error("failed to initialize event publisher", slog.String("error", err.Error()))
os.Exit(1)
}

defer func() {
// We are using sync producer, so it is fine to close this as a last step
if err := publishers.watermillPublisher.Close(); err != nil {
if err := eventPublisherDriver.Close(); err != nil {
logger.Error("failed to close kafka producer", slog.String("error", err.Error()))
}
}()

eventPublisher, err := eventbus.New(eventbus.Options{
Publisher: eventPublisherDriver,
Config: conf.Events,
Logger: logger,
MarshalerTransformFunc: kafka.AddPartitionKeyFromSubject,
})
if err != nil {
logger.Error("failed to initialize event publisher", slog.String("error", err.Error()))
os.Exit(1)
}

// Dependencies: entitlement
entitlementConnectors := registrybuilder.GetEntitlementRegistry(registry.EntitlementOptions{
DatabaseClient: pgClients.client,
StreamingConnector: clickhouseStreamingConnector,
MeterRepository: meterRepository,
Logger: logger,
Publisher: publishers.eventPublisher.ForTopic(conf.Events.SystemEvents.Topic),
Publisher: eventPublisher,
})

// Initialize consumer
consumerOptions := consumer.Options{
SystemEventsTopic: conf.Events.SystemEvents.Topic,
Subscriber: wmSubscriber,

Publisher: publishers.watermillPublisher,
Publisher: eventPublisherDriver,
Marshaler: eventPublisher.Marshaler(),

Entitlement: entitlementConnectors,

Expand Down Expand Up @@ -348,13 +361,7 @@ func initKafkaSubscriber(conf config.Configuration, logger *slog.Logger) (messag
return subscriber, nil
}

type eventPublishers struct {
watermillPublisher message.Publisher
marshaler publisher.CloudEventMarshaler
eventPublisher publisher.Publisher
}

func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Configuration, metricMeter metric.Meter) (*eventPublishers, error) {
func initEventPublisherDriver(ctx context.Context, logger *slog.Logger, conf config.Configuration, metricMeter metric.Meter) (message.Publisher, error) {
provisionTopics := []watermillkafka.AutoProvisionTopic{}
if conf.NotificationService.Consumer.DLQ.AutoProvision.Enabled {
provisionTopics = append(provisionTopics, watermillkafka.AutoProvisionTopic{
Expand All @@ -363,31 +370,14 @@ func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Co
})
}

eventDriver, err := watermillkafka.NewPublisher(ctx, watermillkafka.PublisherOptions{
return watermillkafka.NewPublisher(ctx, watermillkafka.PublisherOptions{
KafkaConfig: conf.Ingest.Kafka.KafkaConfiguration,
ProvisionTopics: provisionTopics,
ClientID: otelName,
Logger: logger,
MetricMeter: metricMeter,
DebugLogging: conf.Telemetry.Log.Level == slog.LevelDebug,
})
if err != nil {
return nil, fmt.Errorf("failed to create event driver: %w", err)
}

eventPublisher, err := publisher.NewPublisher(publisher.PublisherOptions{
Publisher: eventDriver,
Transform: watermillkafka.AddPartitionKeyFromSubject,
})
if err != nil {
return nil, fmt.Errorf("failed to create event publisher: %w", err)
}

return &eventPublishers{
watermillPublisher: eventDriver,
marshaler: publisher.NewCloudEventMarshaler(watermillkafka.AddPartitionKeyFromSubject),
eventPublisher: eventPublisher,
}, nil
}

type pgClients struct {
Expand Down
53 changes: 18 additions & 35 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"github.com/openmeterio/openmeter/config"
"github.com/openmeterio/openmeter/internal/debug"
"github.com/openmeterio/openmeter/internal/ent/db"
"github.com/openmeterio/openmeter/internal/event/publisher"
"github.com/openmeterio/openmeter/internal/ingest"
"github.com/openmeterio/openmeter/internal/ingest/ingestdriver"
"github.com/openmeterio/openmeter/internal/ingest/kafkaingest"
Expand All @@ -54,6 +53,7 @@ import (
"github.com/openmeterio/openmeter/internal/streaming/clickhouse_connector"
watermillkafka "github.com/openmeterio/openmeter/internal/watermill/driver/kafka"
"github.com/openmeterio/openmeter/internal/watermill/driver/noop"
"github.com/openmeterio/openmeter/internal/watermill/eventbus"
"github.com/openmeterio/openmeter/pkg/contextx"
"github.com/openmeterio/openmeter/pkg/errorsx"
"github.com/openmeterio/openmeter/pkg/framework/entutils"
Expand Down Expand Up @@ -213,19 +213,30 @@ func main() {
os.Exit(1)
}

eventPublishers, err := initEventPublisher(ctx, logger, conf, metricMeter)
eventPublisherDriver, err := initEventPublisherDriver(ctx, logger, conf, metricMeter)
if err != nil {
logger.Error("failed to initialize event publisher", "error", err)
os.Exit(1)
}

defer func() {
logger.Info("closing event publisher")
if err = eventPublishers.driver.Close(); err != nil {
if err = eventPublisherDriver.Close(); err != nil {
logger.Error("failed to close event publisher", "error", err)
}
}()

eventPublisher, err := eventbus.New(eventbus.Options{
Publisher: eventPublisherDriver,
Config: conf.Events,
Logger: logger,
MarshalerTransformFunc: watermillkafka.AddPartitionKeyFromSubject,
})
if err != nil {
logger.Error("failed to initialize event bus", "error", err)
os.Exit(1)
}

// Initialize Kafka Ingest
ingestCollector, kafkaIngestNamespaceHandler, err := initKafkaIngest(
kafkaProducer,
Expand Down Expand Up @@ -322,7 +333,7 @@ func main() {
StreamingConnector: streamingConnector,
MeterRepository: meterRepository,
Logger: logger,
Publisher: eventPublishers.eventPublisher.ForTopic(conf.Events.SystemEvents.Topic),
Publisher: eventPublisher,
})
}

Expand Down Expand Up @@ -430,24 +441,9 @@ func main() {
}
}

type publishers struct {
eventPublisher publisher.Publisher
driver message.Publisher
}

func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Configuration, metricMeter metric.Meter) (*publishers, error) {
func initEventPublisherDriver(ctx context.Context, logger *slog.Logger, conf config.Configuration, metricMeter metric.Meter) (message.Publisher, error) {
if !conf.Events.Enabled {
publisher, err := publisher.NewPublisher(publisher.PublisherOptions{
Publisher: &noop.Publisher{},
})
if err != nil {
return nil, fmt.Errorf("failed to create event driver: %w", err)
}

return &publishers{
eventPublisher: publisher,
driver: &noop.Publisher{},
}, nil
return &noop.Publisher{}, nil
}

provisionTopics := []watermillkafka.AutoProvisionTopic{}
Expand All @@ -458,27 +454,14 @@ func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Co
})
}

eventDriver, err := watermillkafka.NewPublisher(ctx, watermillkafka.PublisherOptions{
return watermillkafka.NewPublisher(ctx, watermillkafka.PublisherOptions{
KafkaConfig: conf.Ingest.Kafka.KafkaConfiguration,
ProvisionTopics: provisionTopics,
ClientID: otelName,
Logger: logger,
MetricMeter: metricMeter,
DebugLogging: conf.Telemetry.Log.Level == slog.LevelDebug,
})
if err != nil {
return nil, fmt.Errorf("failed to create event driver: %w", err)
}

eventPublisher, err := publisher.NewPublisher(publisher.PublisherOptions{
Publisher: eventDriver,
Transform: watermillkafka.AddPartitionKeyFromSubject,
})

return &publishers{
eventPublisher: eventPublisher,
driver: eventDriver,
}, err
}

func initKafkaProducer(ctx context.Context, config config.Configuration, logger *slog.Logger, metricMeter metric.Meter, group *run.Group) (*kafka.Producer, error) {
Expand Down
14 changes: 7 additions & 7 deletions cmd/sink-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ import (

"github.com/openmeterio/openmeter/config"
"github.com/openmeterio/openmeter/internal/dedupe"
"github.com/openmeterio/openmeter/internal/event/publisher"
"github.com/openmeterio/openmeter/internal/meter"
"github.com/openmeterio/openmeter/internal/sink"
"github.com/openmeterio/openmeter/internal/sink/flushhandler"
"github.com/openmeterio/openmeter/internal/sink/flushhandler/ingestnotification"
watermillkafka "github.com/openmeterio/openmeter/internal/watermill/driver/kafka"
"github.com/openmeterio/openmeter/internal/watermill/eventbus"
"github.com/openmeterio/openmeter/pkg/gosundheit"
pkgkafka "github.com/openmeterio/openmeter/pkg/kafka"
"github.com/openmeterio/openmeter/pkg/models"
Expand Down Expand Up @@ -243,16 +243,16 @@ func initIngestEventPublisher(ctx context.Context, logger *slog.Logger, conf con
return nil, err
}

eventPublisher, err := publisher.NewPublisher(publisher.PublisherOptions{
Publisher: eventDriver,
Transform: watermillkafka.AddPartitionKeyFromSubject,
eventPublisher, err := eventbus.New(eventbus.Options{
Publisher: eventDriver,
Config: conf.Events,
Logger: logger,
MarshalerTransformFunc: watermillkafka.AddPartitionKeyFromSubject,
})
if err != nil {
return nil, err
}

targetTopic := eventPublisher.ForTopic(conf.Events.IngestEvents.Topic)

flushHandlerMux := flushhandler.NewFlushEventHandlers()
// We should only close the producer once the ingest events are fully processed
flushHandlerMux.OnDrainComplete(func() {
Expand All @@ -262,7 +262,7 @@ func initIngestEventPublisher(ctx context.Context, logger *slog.Logger, conf con
}
})

ingestNotificationHandler, err := ingestnotification.NewHandler(logger, metricMeter, targetTopic, ingestnotification.HandlerConfig{
ingestNotificationHandler, err := ingestnotification.NewHandler(logger, metricMeter, eventPublisher, ingestnotification.HandlerConfig{
MaxEventsInBatch: conf.Sink.IngestNotifications.MaxEventsInBatch,
})
if err != nil {
Expand Down
Loading

0 comments on commit 7f7da0b

Please sign in to comment.