Skip to content

Commit

Permalink
Merge pull request #1645 from openmeterio/refactor-topic-provisioner-…
Browse files Browse the repository at this point in the history
…config

refactor: topic provisioner config
  • Loading branch information
chrisgacsal authored Oct 10, 2024
2 parents 2b65279 + 1a2c0a8 commit 36e6435
Show file tree
Hide file tree
Showing 9 changed files with 96 additions and 21 deletions.
4 changes: 2 additions & 2 deletions cmd/balance-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,8 +266,8 @@ func initTopicProvisioner(conf config.Configuration, logger *slog.Logger, meter
AdminClient: adminClient,
Logger: logger,
Meter: meter,
CacheSize: conf.Ingest.CacheSize,
CacheTTL: conf.Ingest.CacheTTL,
CacheSize: conf.Ingest.Kafka.CacheSize,
CacheTTL: conf.Ingest.Kafka.CacheTTL,
})
if err != nil {
return nil, fmt.Errorf("failed to initialize topic provisioner: %w", err)
Expand Down
4 changes: 2 additions & 2 deletions cmd/notification-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,8 @@ func initTopicProvisioner(conf config.Configuration, logger *slog.Logger, meter
AdminClient: adminClient,
Logger: logger,
Meter: meter,
CacheSize: conf.Ingest.CacheSize,
CacheTTL: conf.Ingest.CacheTTL,
CacheSize: conf.Ingest.Kafka.CacheSize,
CacheTTL: conf.Ingest.Kafka.CacheTTL,
})
if err != nil {
return nil, fmt.Errorf("failed to initialize topic provisioner: %w", err)
Expand Down
4 changes: 2 additions & 2 deletions cmd/sink-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,8 @@ func initTopicProvisioner(conf config.Configuration, logger *slog.Logger, meter
AdminClient: adminClient,
Logger: logger,
Meter: meter,
CacheSize: conf.Ingest.CacheSize,
CacheTTL: conf.Ingest.CacheTTL,
CacheSize: conf.Ingest.Kafka.CacheSize,
CacheTTL: conf.Ingest.Kafka.CacheTTL,
})
if err != nil {
return nil, fmt.Errorf("failed to initialize topic provisioner: %w", err)
Expand Down
6 changes: 4 additions & 2 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,11 @@ func TestComplete(t *testing.T) {
},
Partitions: 1,
EventsTopicTemplate: "om_%s_events",
TopicProvisionerConfig: TopicProvisionerConfig{
CacheSize: 200,
CacheTTL: 15 * time.Minute,
},
},
CacheSize: 200,
CacheTTL: 15 * time.Minute,
},
Aggregation: AggregationConfiguration{
ClickHouse: ClickHouseAggregationConfiguration{
Expand Down
17 changes: 8 additions & 9 deletions config/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,6 @@ import (

type IngestConfiguration struct {
Kafka KafkaIngestConfiguration

// The maximum number of entries stored in topic cache at a time which after the least recently used is evicted.
// Setting size to 0 makes it unlimited
CacheSize int

// The maximum time an entries is kept in cache before being evicted
CacheTTL time.Duration
}

// Validate validates the configuration.
Expand All @@ -37,6 +30,8 @@ func (c IngestConfiguration) Validate() error {
type KafkaIngestConfiguration struct {
KafkaConfiguration `mapstructure:",squash"`

TopicProvisionerConfig `mapstructure:",squash"`

Partitions int
EventsTopicTemplate string
}
Expand All @@ -53,6 +48,10 @@ func (c KafkaIngestConfiguration) Validate() error {
errs = append(errs, err)
}

if err := c.TopicProvisionerConfig.Validate(); err != nil {
errs = append(errs, err)
}

return errors.Join(errs...)
}

Expand Down Expand Up @@ -167,6 +166,6 @@ func ConfigureIngest(v *viper.Viper) {
v.SetDefault("ingest.kafka.saslPassword", "")
v.SetDefault("ingest.kafka.partitions", 1)
v.SetDefault("ingest.kafka.eventsTopicTemplate", "om_%s_events")
v.SetDefault("ingest.cacheSize", 250)
v.SetDefault("ingest.cacheTTL", "5m")

ConfigureTopicProvisioner(v, "ingest", "kafka")
}
4 changes: 2 additions & 2 deletions config/testdata/complete.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ ingest:
- broker
- topic
- consumer
cacheSize: 200
cacheTTL: 15m
cacheSize: 200
cacheTTL: 15m

aggregation:
clickhouse:
Expand Down
41 changes: 41 additions & 0 deletions config/topicprovisioner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package config

import (
"errors"
"fmt"
"time"

"github.com/spf13/viper"
)

// TopicProvisionerConfig stores the configuration for TopicProvisioner
type TopicProvisionerConfig struct {
// The maximum number of entries stored in topic cache at a time which after the least recently used is evicted.
// Setting size to 0 makes it unlimited
CacheSize int

// The maximum time an entries is kept in cache before being evicted
CacheTTL time.Duration
}

func (c TopicProvisionerConfig) Validate() error {
var errs []error

if c.CacheSize < 0 {
errs = append(errs, fmt.Errorf("invalid cache size: %d", c.CacheSize))
}

if c.CacheTTL < 0 {
errs = append(errs, fmt.Errorf("invalid cache ttl: %d", c.CacheTTL))
}

return errors.Join(errs...)
}

// ConfigureTopicProvisioner configures some defaults in the Viper instance.
func ConfigureTopicProvisioner(v *viper.Viper, prefixes ...string) {
prefixer := NewViperKeyPrefixer(prefixes...)

v.SetDefault(prefixer("cacheSize"), 250)
v.SetDefault(prefixer("cacheTTL"), "5m")
}
33 changes: 33 additions & 0 deletions config/viper.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,36 @@ func DecodeHook() mapstructure.DecodeHookFunc {
mapstructure.StringToSliceHookFunc(","),
)
}

// ViperKeyPrefixer is a helper to prepend prefix to a key name.
type ViperKeyPrefixer func(s string) string

const delimiter = "."

// NewViperKeyPrefixer returns a new ViperKeyPrefixer which prepends a dot delimited prefix calculated by concatenating provided
// prefixes in the order they appear in prefixes list.
//
// prefixer := NewViperKeyPrefixer("a", "b")
// s := prefixer("c")
// fmt.Println(s) // -> "a.b.c"
func NewViperKeyPrefixer(prefixes ...string) ViperKeyPrefixer {
var prefix string

for _, p := range prefixes {
if p == "" {
continue
}

if prefix == "" {
prefix = p
} else {
prefix += delimiter + p
}
}

if prefix == "" {
return func(s string) string { return s }
} else {
return func(s string) string { return prefix + delimiter + s }
}
}
4 changes: 2 additions & 2 deletions openmeter/app/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ func NewKafkaTopicProvisioner(conf config.IngestConfiguration, logger *slog.Logg
AdminClient: adminClient,
Logger: logger,
Meter: meter,
CacheSize: conf.CacheSize,
CacheTTL: conf.CacheTTL,
CacheSize: conf.Kafka.CacheSize,
CacheTTL: conf.Kafka.CacheTTL,
})
if err != nil {
return nil, fmt.Errorf("failed to initialize topic provisioner: %w", err)
Expand Down

0 comments on commit 36e6435

Please sign in to comment.