diff --git a/cmd/balance-worker/main.go b/cmd/balance-worker/main.go index 094dc6de8..5ac55386c 100644 --- a/cmd/balance-worker/main.go +++ b/cmd/balance-worker/main.go @@ -266,8 +266,8 @@ func initTopicProvisioner(conf config.Configuration, logger *slog.Logger, meter AdminClient: adminClient, Logger: logger, Meter: meter, - CacheSize: conf.Ingest.CacheSize, - CacheTTL: conf.Ingest.CacheTTL, + CacheSize: conf.Ingest.Kafka.CacheSize, + CacheTTL: conf.Ingest.Kafka.CacheTTL, }) if err != nil { return nil, fmt.Errorf("failed to initialize topic provisioner: %w", err) diff --git a/cmd/notification-service/main.go b/cmd/notification-service/main.go index bb29c75c2..98670b7c9 100644 --- a/cmd/notification-service/main.go +++ b/cmd/notification-service/main.go @@ -295,8 +295,8 @@ func initTopicProvisioner(conf config.Configuration, logger *slog.Logger, meter AdminClient: adminClient, Logger: logger, Meter: meter, - CacheSize: conf.Ingest.CacheSize, - CacheTTL: conf.Ingest.CacheTTL, + CacheSize: conf.Ingest.Kafka.CacheSize, + CacheTTL: conf.Ingest.Kafka.CacheTTL, }) if err != nil { return nil, fmt.Errorf("failed to initialize topic provisioner: %w", err) diff --git a/cmd/sink-worker/main.go b/cmd/sink-worker/main.go index 3a91845d8..8e1bc61ab 100644 --- a/cmd/sink-worker/main.go +++ b/cmd/sink-worker/main.go @@ -310,8 +310,8 @@ func initTopicProvisioner(conf config.Configuration, logger *slog.Logger, meter AdminClient: adminClient, Logger: logger, Meter: meter, - CacheSize: conf.Ingest.CacheSize, - CacheTTL: conf.Ingest.CacheTTL, + CacheSize: conf.Ingest.Kafka.CacheSize, + CacheTTL: conf.Ingest.Kafka.CacheTTL, }) if err != nil { return nil, fmt.Errorf("failed to initialize topic provisioner: %w", err) diff --git a/config/config_test.go b/config/config_test.go index be9455754..233eb26a4 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -101,9 +101,11 @@ func TestComplete(t *testing.T) { }, Partitions: 1, EventsTopicTemplate: "om_%s_events", + TopicProvisionerConfig: TopicProvisionerConfig{ + CacheSize: 200, + CacheTTL: 15 * time.Minute, + }, }, - CacheSize: 200, - CacheTTL: 15 * time.Minute, }, Aggregation: AggregationConfiguration{ ClickHouse: ClickHouseAggregationConfiguration{ diff --git a/config/ingest.go b/config/ingest.go index 90fd79896..3ed543741 100644 --- a/config/ingest.go +++ b/config/ingest.go @@ -14,13 +14,6 @@ import ( type IngestConfiguration struct { Kafka KafkaIngestConfiguration - - // The maximum number of entries stored in topic cache at a time which after the least recently used is evicted. - // Setting size to 0 makes it unlimited - CacheSize int - - // The maximum time an entries is kept in cache before being evicted - CacheTTL time.Duration } // Validate validates the configuration. @@ -37,6 +30,8 @@ func (c IngestConfiguration) Validate() error { type KafkaIngestConfiguration struct { KafkaConfiguration `mapstructure:",squash"` + TopicProvisionerConfig `mapstructure:",squash"` + Partitions int EventsTopicTemplate string } @@ -53,6 +48,10 @@ func (c KafkaIngestConfiguration) Validate() error { errs = append(errs, err) } + if err := c.TopicProvisionerConfig.Validate(); err != nil { + errs = append(errs, err) + } + return errors.Join(errs...) } @@ -167,6 +166,6 @@ func ConfigureIngest(v *viper.Viper) { v.SetDefault("ingest.kafka.saslPassword", "") v.SetDefault("ingest.kafka.partitions", 1) v.SetDefault("ingest.kafka.eventsTopicTemplate", "om_%s_events") - v.SetDefault("ingest.cacheSize", 250) - v.SetDefault("ingest.cacheTTL", "5m") + + ConfigureTopicProvisioner(v, "ingest", "kafka") } diff --git a/config/testdata/complete.yaml b/config/testdata/complete.yaml index 59422c6f2..6ca211e84 100644 --- a/config/testdata/complete.yaml +++ b/config/testdata/complete.yaml @@ -43,8 +43,8 @@ ingest: - broker - topic - consumer - cacheSize: 200 - cacheTTL: 15m + cacheSize: 200 + cacheTTL: 15m aggregation: clickhouse: diff --git a/config/topicprovisioner.go b/config/topicprovisioner.go new file mode 100644 index 000000000..784cf2992 --- /dev/null +++ b/config/topicprovisioner.go @@ -0,0 +1,41 @@ +package config + +import ( + "errors" + "fmt" + "time" + + "github.com/spf13/viper" +) + +// TopicProvisionerConfig stores the configuration for TopicProvisioner +type TopicProvisionerConfig struct { + // The maximum number of entries stored in topic cache at a time which after the least recently used is evicted. + // Setting size to 0 makes it unlimited + CacheSize int + + // The maximum time an entries is kept in cache before being evicted + CacheTTL time.Duration +} + +func (c TopicProvisionerConfig) Validate() error { + var errs []error + + if c.CacheSize < 0 { + errs = append(errs, fmt.Errorf("invalid cache size: %d", c.CacheSize)) + } + + if c.CacheTTL < 0 { + errs = append(errs, fmt.Errorf("invalid cache ttl: %d", c.CacheTTL)) + } + + return errors.Join(errs...) +} + +// ConfigureTopicProvisioner configures some defaults in the Viper instance. +func ConfigureTopicProvisioner(v *viper.Viper, prefixes ...string) { + prefixer := NewViperKeyPrefixer(prefixes...) + + v.SetDefault(prefixer("cacheSize"), 250) + v.SetDefault(prefixer("cacheTTL"), "5m") +} diff --git a/config/viper.go b/config/viper.go index 179d965d0..e8c4f9f60 100644 --- a/config/viper.go +++ b/config/viper.go @@ -13,3 +13,36 @@ func DecodeHook() mapstructure.DecodeHookFunc { mapstructure.StringToSliceHookFunc(","), ) } + +// ViperKeyPrefixer is a helper to prepend prefix to a key name. +type ViperKeyPrefixer func(s string) string + +const delimiter = "." + +// NewViperKeyPrefixer returns a new ViperKeyPrefixer which prepends a dot delimited prefix calculated by concatenating provided +// prefixes in the order they appear in prefixes list. +// +// prefixer := NewViperKeyPrefixer("a", "b") +// s := prefixer("c") +// fmt.Println(s) // -> "a.b.c" +func NewViperKeyPrefixer(prefixes ...string) ViperKeyPrefixer { + var prefix string + + for _, p := range prefixes { + if p == "" { + continue + } + + if prefix == "" { + prefix = p + } else { + prefix += delimiter + p + } + } + + if prefix == "" { + return func(s string) string { return s } + } else { + return func(s string) string { return prefix + delimiter + s } + } +} diff --git a/openmeter/app/kafka.go b/openmeter/app/kafka.go index 273a7fa47..2681fdfd9 100644 --- a/openmeter/app/kafka.go +++ b/openmeter/app/kafka.go @@ -58,8 +58,8 @@ func NewKafkaTopicProvisioner(conf config.IngestConfiguration, logger *slog.Logg AdminClient: adminClient, Logger: logger, Meter: meter, - CacheSize: conf.CacheSize, - CacheTTL: conf.CacheTTL, + CacheSize: conf.Kafka.CacheSize, + CacheTTL: conf.Kafka.CacheTTL, }) if err != nil { return nil, fmt.Errorf("failed to initialize topic provisioner: %w", err)