Skip to content

Commit

Permalink
Merge pull request #257 from openmeterio/aggregation
Browse files Browse the repository at this point in the history
feat: rename processing to aggregation
  • Loading branch information
sagikazarmark authored Sep 12, 2023
2 parents f15593f + e9683e1 commit 49b043f
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 67 deletions.
46 changes: 46 additions & 0 deletions config/aggregation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package config

import (
"errors"
"fmt"

"github.com/spf13/viper"
)

type AggregationConfiguration struct {
ClickHouse ClickHouseAggregationConfiguration
}

// Validate validates the configuration.
func (c AggregationConfiguration) Validate() error {
if err := c.ClickHouse.Validate(); err != nil {
return fmt.Errorf("clickhouse: %w", err)
}

return nil
}

type ClickHouseAggregationConfiguration struct {
Address string
TLS bool
Username string
Password string
Database string
}

func (c ClickHouseAggregationConfiguration) Validate() error {
if c.Address == "" {
return errors.New("address is required")
}

return nil
}

// ConfigureAggregation configures some defaults in the Viper instance.
func ConfigureAggregation(v *viper.Viper) {
v.SetDefault("aggregation.clickhouse.address", "127.0.0.1:9000")
v.SetDefault("aggregation.clickhouse.tls", false)
v.SetDefault("aggregation.clickhouse.database", "openmeter")
v.SetDefault("aggregation.clickhouse.username", "default")
v.SetDefault("aggregation.clickhouse.password", "default")
}
16 changes: 8 additions & 8 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ type Configuration struct {

Telemetry TelemetryConfig

Namespace NamespaceConfiguration
Ingest IngestConfiguration
Processor ProcessorConfiguration
Sink SinkConfiguration
Dedupe DedupeConfiguration
Namespace NamespaceConfiguration
Ingest IngestConfiguration
Aggregation AggregationConfiguration
Sink SinkConfiguration
Dedupe DedupeConfiguration

Meters []*models.Meter
}
Expand All @@ -47,8 +47,8 @@ func (c Configuration) Validate() error {
return fmt.Errorf("ingest: %w", err)
}

if err := c.Processor.Validate(); err != nil {
return fmt.Errorf("processor: %w", err)
if err := c.Aggregation.Validate(); err != nil {
return fmt.Errorf("aggregation: %w", err)
}

if err := c.Sink.Validate(); err != nil {
Expand Down Expand Up @@ -95,7 +95,7 @@ func Configure(v *viper.Viper, flags *pflag.FlagSet) {

ConfigureNamespace(v)
ConfigureIngest(v)
ConfigureProcessor(v)
ConfigureAggregation(v)
configureSink(v)
ConfigureDedupe(v)
}
4 changes: 2 additions & 2 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ func TestComplete(t *testing.T) {
EventsTopicTemplate: "om_%s_events",
},
},
Processor: ProcessorConfiguration{
ClickHouse: ClickHouseProcessorConfiguration{
Aggregation: AggregationConfiguration{
ClickHouse: ClickHouseAggregationConfiguration{
Address: "127.0.0.1:9440",
TLS: true,
Username: "default",
Expand Down
46 changes: 0 additions & 46 deletions config/processor.go

This file was deleted.

2 changes: 1 addition & 1 deletion config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (c *ConnectorKafkaConnectSinkConfiguration) DecodeMap(v map[string]any) err
return nil
}

// This may feel repetative but clikhouse sink and processor configs can be different,
// This may feel repetative but clikhouse sink and aggregation configs can be different,
// for example Kafka Connect ClickHouse plugin uses 8123 HTTP port while client uses native protocol's 9000 port.
// Hostname can be also different, as Kafka Connect and ClickHouse communicates inside the docker compose network.
type ClickHouseConnectorTypeKafkaConnectSinkConfiguration struct {
Expand Down
2 changes: 1 addition & 1 deletion config/testdata/complete.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ ingest:
saslPassword: pass
partitions: 1

processor:
aggregation:
clickhouse:
address: 127.0.0.1:9440
tls: true
Expand Down
16 changes: 8 additions & 8 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,10 +202,10 @@ func main() {
}
}

// Initialize ClickHouse Streaming Processor
// Initialize ClickHouse Aggregation
clickhouseStreamingConnector, err := initClickHouseStreaming(conf, meterRepository, logger)
if err != nil {
logger.Error("failed to initialize clickhouse streaming processor", "error", err)
logger.Error("failed to initialize clickhouse aggregation", "error", err)
os.Exit(1)
}

Expand Down Expand Up @@ -376,11 +376,11 @@ func initKafkaIngest(ctx context.Context, config config.Configuration, logger *s

func initClickHouseStreaming(config config.Configuration, meterRepository meter.Repository, logger *slog.Logger) (*clickhouse_connector.ClickhouseConnector, error) {
options := &clickhouse.Options{
Addr: []string{config.Processor.ClickHouse.Address},
Addr: []string{config.Aggregation.ClickHouse.Address},
Auth: clickhouse.Auth{
Database: config.Processor.ClickHouse.Database,
Username: config.Processor.ClickHouse.Username,
Password: config.Processor.ClickHouse.Password,
Database: config.Aggregation.ClickHouse.Database,
Username: config.Aggregation.ClickHouse.Username,
Password: config.Aggregation.ClickHouse.Password,
},
DialTimeout: time.Duration(10) * time.Second,
MaxOpenConns: 5,
Expand All @@ -391,7 +391,7 @@ func initClickHouseStreaming(config config.Configuration, meterRepository meter.
}
// This minimal TLS.Config is normally sufficient to connect to the secure native port (normally 9440) on a ClickHouse server.
// See: https://clickhouse.com/docs/en/integrations/go#using-tls
if config.Processor.ClickHouse.TLS {
if config.Aggregation.ClickHouse.TLS {
options.TLS = &tls.Config{}
}

Expand All @@ -404,7 +404,7 @@ func initClickHouseStreaming(config config.Configuration, meterRepository meter.
streamingConnector, err := clickhouse_connector.NewClickhouseConnector(clickhouse_connector.ClickhouseConnectorConfig{
Logger: logger,
ClickHouse: clickHouseClient,
Database: config.Processor.ClickHouse.Database,
Database: config.Aggregation.ClickHouse.Database,
Meters: meterRepository,
})
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion quickstart/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ ingest:
kafka:
broker: kafka:29092

processor:
aggregation:
clickhouse:
address: clickhouse:9000

Expand Down

0 comments on commit 49b043f

Please sign in to comment.