diff --git a/openmeter/dedupe/dedupe.go b/openmeter/dedupe/dedupe.go new file mode 100644 index 000000000..c5f867921 --- /dev/null +++ b/openmeter/dedupe/dedupe.go @@ -0,0 +1,13 @@ +// Package dedupe implements in-process event deduplication. +package dedupe + +import ( + "github.com/cloudevents/sdk-go/v2/event" + + "github.com/openmeterio/openmeter/internal/dedupe" +) + +// GetEventKey creates a unique key from an event. +func GetEventKey(namespace string, ev event.Event) string { + return dedupe.GetEventKey(namespace, ev) +} diff --git a/openmeter/dedupe/memorydedupe/memorydedupe.go b/openmeter/dedupe/memorydedupe/memorydedupe.go new file mode 100644 index 000000000..4afd642c8 --- /dev/null +++ b/openmeter/dedupe/memorydedupe/memorydedupe.go @@ -0,0 +1,14 @@ +// Package memorydedupe implements in-memory event deduplication. +package memorydedupe + +import ( + "github.com/openmeterio/openmeter/internal/dedupe/memorydedupe" +) + +// Deduplicator implements in-memory event deduplication. +type Deduplicator = memorydedupe.Deduplicator + +// NewDeduplicator returns a new {Deduplicator}. +func NewDeduplicator(size int) (*Deduplicator, error) { + return memorydedupe.NewDeduplicator(size) +} diff --git a/openmeter/dedupe/redisdedupe/redisdedupe.go b/openmeter/dedupe/redisdedupe/redisdedupe.go new file mode 100644 index 000000000..e45fd331e --- /dev/null +++ b/openmeter/dedupe/redisdedupe/redisdedupe.go @@ -0,0 +1,9 @@ +// Package redisdedupe implements event deduplication using Redis. +package redisdedupe + +import ( + "github.com/openmeterio/openmeter/internal/dedupe/redisdedupe" +) + +// Deduplicator implements event deduplication using Redis. +type Deduplicator = redisdedupe.Deduplicator diff --git a/openmeter/ingest/dedupe.go b/openmeter/ingest/dedupe.go new file mode 100644 index 000000000..6886acedf --- /dev/null +++ b/openmeter/ingest/dedupe.go @@ -0,0 +1,11 @@ +package ingest + +import ( + "github.com/openmeterio/openmeter/internal/ingest" +) + +// Deduplicator checks if an event is unique. +type Deduplicator = ingest.Deduplicator + +// DeduplicatingCollector implements event deduplication at event ingestion. +type DeduplicatingCollector = ingest.DeduplicatingCollector diff --git a/openmeter/ingest/httpingest/httpingest.go b/openmeter/ingest/httpingest/httpingest.go new file mode 100644 index 000000000..d1cfaafb6 --- /dev/null +++ b/openmeter/ingest/httpingest/httpingest.go @@ -0,0 +1,14 @@ +package httpingest + +import ( + "github.com/openmeterio/openmeter/internal/ingest/httpingest" +) + +// Handler receives an event in CloudEvents format and forwards it to a {Collector}. +type Handler = httpingest.Handler + +type HandlerConfig = httpingest.HandlerConfig + +func NewHandler(config HandlerConfig) (*Handler, error) { + return httpingest.NewHandler(config) +} diff --git a/openmeter/ingest/ingest.go b/openmeter/ingest/ingest.go new file mode 100644 index 000000000..506f43d3e --- /dev/null +++ b/openmeter/ingest/ingest.go @@ -0,0 +1,9 @@ +// Package ingest implements event ingestion. +package ingest + +import ( + "github.com/openmeterio/openmeter/internal/ingest" +) + +// Collector is a receiver of events that handles sending those events to some downstream broker. +type Collector = ingest.Collector diff --git a/openmeter/ingest/inmemory.go b/openmeter/ingest/inmemory.go new file mode 100644 index 000000000..534dfe709 --- /dev/null +++ b/openmeter/ingest/inmemory.go @@ -0,0 +1,8 @@ +package ingest + +import ( + "github.com/openmeterio/openmeter/internal/ingest" +) + +// InMemoryCollector is a {Collector} backed by in-memory storage. +type InMemoryCollector = ingest.InMemoryCollector diff --git a/openmeter/ingest/kafkaingest/collector.go b/openmeter/ingest/kafkaingest/collector.go new file mode 100644 index 000000000..ff8f522fd --- /dev/null +++ b/openmeter/ingest/kafkaingest/collector.go @@ -0,0 +1,17 @@ +package kafkaingest + +import ( + "context" + "log/slog" + + "github.com/confluentinc/confluent-kafka-go/v2/kafka" + + "github.com/openmeterio/openmeter/internal/ingest/kafkaingest" +) + +// Collector is a receiver of events that handles sending those events to a downstream Kafka broker. +type Collector = kafkaingest.Collector + +func KafkaProducerGroup(ctx context.Context, producer *kafka.Producer, logger *slog.Logger) (execute func() error, interrupt func(error)) { + return kafkaingest.KafkaProducerGroup(ctx, producer, logger) +} diff --git a/openmeter/ingest/kafkaingest/namespace.go b/openmeter/ingest/kafkaingest/namespace.go new file mode 100644 index 000000000..c887c0970 --- /dev/null +++ b/openmeter/ingest/kafkaingest/namespace.go @@ -0,0 +1,8 @@ +package kafkaingest + +import ( + "github.com/openmeterio/openmeter/internal/ingest/kafkaingest" +) + +// NamespaceHandler is a namespace handler for Kafka ingest topics. +type NamespaceHandler = kafkaingest.NamespaceHandler diff --git a/openmeter/ingest/kafkaingest/serializer/json.go b/openmeter/ingest/kafkaingest/serializer/json.go new file mode 100644 index 000000000..8ef9d51d3 --- /dev/null +++ b/openmeter/ingest/kafkaingest/serializer/json.go @@ -0,0 +1,13 @@ +package serializer + +import ( + _ "embed" + + "github.com/openmeterio/openmeter/internal/ingest/kafkaingest/serializer" +) + +type JSONSerializer = serializer.JSONSerializer + +func NewJSONSerializer() JSONSerializer { + return serializer.NewJSONSerializer() +} diff --git a/openmeter/ingest/kafkaingest/serializer/serializer.go b/openmeter/ingest/kafkaingest/serializer/serializer.go new file mode 100644 index 000000000..32bde47bf --- /dev/null +++ b/openmeter/ingest/kafkaingest/serializer/serializer.go @@ -0,0 +1,11 @@ +package serializer + +import ( + _ "embed" + + "github.com/openmeterio/openmeter/internal/ingest/kafkaingest/serializer" +) + +type Serializer = serializer.Serializer + +type CloudEventsKafkaPayload = serializer.CloudEventsKafkaPayload diff --git a/openmeter/meter/inmemory.go b/openmeter/meter/inmemory.go new file mode 100644 index 000000000..850d666a3 --- /dev/null +++ b/openmeter/meter/inmemory.go @@ -0,0 +1,14 @@ +package meter + +import ( + "github.com/openmeterio/openmeter/internal/meter" + "github.com/openmeterio/openmeter/pkg/models" +) + +// InMemoryRepository is an in-memory meter repository. +type InMemoryRepository = meter.InMemoryRepository + +// NewInMemoryRepository returns a in-memory meter repository. +func NewInMemoryRepository(meters []models.Meter) *InMemoryRepository { + return meter.NewInMemoryRepository(meters) +} diff --git a/openmeter/meter/meter.go b/openmeter/meter/meter.go new file mode 100644 index 000000000..3f7b866ce --- /dev/null +++ b/openmeter/meter/meter.go @@ -0,0 +1,8 @@ +package meter + +import ( + "github.com/openmeterio/openmeter/internal/meter" +) + +// Repository is an interface to the meter store. +type Repository = meter.Repository diff --git a/openmeter/namespace/namespace.go b/openmeter/namespace/namespace.go new file mode 100644 index 000000000..12bc9ac2e --- /dev/null +++ b/openmeter/namespace/namespace.go @@ -0,0 +1,23 @@ +// Package namespace adds a concept of tenancy to OpenMeter allowing to segment clients. +package namespace + +import ( + "github.com/openmeterio/openmeter/internal/namespace" +) + +// Manager is responsible for managing namespaces in different components. +type Manager = namespace.Manager + +type ManagerConfig = namespace.ManagerConfig + +func NewManager(config ManagerConfig) (*Manager, error) { + return namespace.NewManager(config) +} + +// Handler is responsible for creating a namespace in a given component. +// +// An empty name means a default namespace is supposed to be created. +// The concept of a default namespace is implementation specific. +// +// The behavior for trying to create a namespace that already exists is unspecified at the moment. +type Handler = namespace.Handler diff --git a/openmeter/openmeter.go b/openmeter/openmeter.go new file mode 100644 index 000000000..3c495d2b9 --- /dev/null +++ b/openmeter/openmeter.go @@ -0,0 +1,2 @@ +// Package openmeter is an experimental package to expose the core of OpenMeter as a library. +package openmeter diff --git a/openmeter/server/router/router.go b/openmeter/server/router/router.go new file mode 100644 index 000000000..fc9fc0dfc --- /dev/null +++ b/openmeter/server/router/router.go @@ -0,0 +1,15 @@ +package router + +import ( + "github.com/openmeterio/openmeter/internal/server/router" +) + +type IngestHandler = router.IngestHandler + +type Config = router.Config + +type Router = router.Router + +func NewRouter(config Config) (*Router, error) { + return router.NewRouter(config) +} diff --git a/openmeter/server/server.go b/openmeter/server/server.go new file mode 100644 index 000000000..2e08c25be --- /dev/null +++ b/openmeter/server/server.go @@ -0,0 +1,13 @@ +package server + +import ( + "github.com/openmeterio/openmeter/internal/server" +) + +type Server = server.Server + +type Config = server.Config + +func NewServer(config *Config) (*Server, error) { + return server.NewServer(config) +} diff --git a/openmeter/streaming/clickhouse_connector/connector.go b/openmeter/streaming/clickhouse_connector/connector.go new file mode 100644 index 000000000..00ef963dd --- /dev/null +++ b/openmeter/streaming/clickhouse_connector/connector.go @@ -0,0 +1,14 @@ +package clickhouse_connector + +import ( + "github.com/openmeterio/openmeter/internal/streaming/clickhouse_connector" +) + +// ClickhouseConnector implements `ingest.Connector“ and `namespace.Handler interfaces. +type ClickhouseConnector = clickhouse_connector.ClickhouseConnector + +type ClickhouseConnectorConfig = clickhouse_connector.ClickhouseConnectorConfig + +func NewClickhouseConnector(config ClickhouseConnectorConfig) (*ClickhouseConnector, error) { + return clickhouse_connector.NewClickhouseConnector(config) +} diff --git a/openmeter/streaming/clickhouse_connector/model.go b/openmeter/streaming/clickhouse_connector/model.go new file mode 100644 index 000000000..b09d6045e --- /dev/null +++ b/openmeter/streaming/clickhouse_connector/model.go @@ -0,0 +1,7 @@ +package clickhouse_connector + +import ( + "github.com/openmeterio/openmeter/internal/streaming/clickhouse_connector" +) + +type MeterView = clickhouse_connector.MeterView diff --git a/openmeter/streaming/connector.go b/openmeter/streaming/connector.go new file mode 100644 index 000000000..317939d1c --- /dev/null +++ b/openmeter/streaming/connector.go @@ -0,0 +1,13 @@ +package streaming + +import ( + "github.com/openmeterio/openmeter/internal/streaming" +) + +type ListEventsParams = streaming.ListEventsParams + +type QueryParams = streaming.QueryParams + +type QueryResult = streaming.QueryResult + +type Connector = streaming.Connector