Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cursor pagination to internal streaming API #1534

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions openmeter/server/router/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,18 +69,20 @@ func (a *Router) ListEvents(w http.ResponseWriter, r *http.Request, params api.L
}

queryParams := streaming.ListEventsParams{
From: &from,
To: params.To,
IngestedAtFrom: params.IngestedAtFrom,
IngestedAtTo: params.IngestedAtTo,
ID: params.Id,
Subject: params.Subject,
HasError: params.HasError,
Limit: limit,
Filters: streaming.EventsTableFilters{
From: &from,
To: params.To,
IngestedAtFrom: params.IngestedAtFrom,
IngestedAtTo: params.IngestedAtTo,
ID: params.Id,
Subject: params.Subject,
HasError: params.HasError,
},
Limit: limit,
}

// Query events
events, err := a.config.StreamingConnector.ListEvents(ctx, namespace, queryParams)
events, _, err := a.config.StreamingConnector.ListEvents(ctx, namespace, queryParams)
if err != nil {
err := fmt.Errorf("query events: %w", err)

Expand Down
10 changes: 8 additions & 2 deletions openmeter/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,23 @@ var (

type MockStreamingConnector struct{}

var _ streaming.Connector = &MockStreamingConnector{}

func (c *MockStreamingConnector) CountEvents(ctx context.Context, namespace string, params streaming.CountEventsParams) ([]streaming.CountEventRow, error) {
return []streaming.CountEventRow{}, nil
}

func (c *MockStreamingConnector) ListEvents(ctx context.Context, namespace string, params streaming.ListEventsParams) ([]api.IngestedEvent, error) {
func (c *MockStreamingConnector) ListEvents(ctx context.Context, namespace string, params streaming.ListEventsParams) ([]api.IngestedEvent, *streaming.EventsCursor, error) {
events := []api.IngestedEvent{
{
Event: mockEvent,
},
}
return events, nil
return events, nil, nil
}

func (c *MockStreamingConnector) PaginateEvents(ctx context.Context, namespace string, params streaming.PaginateEventsParams) ([]api.IngestedEvent, *streaming.EventsCursor, error) {
return []api.IngestedEvent{}, nil, nil
}

func (c *MockStreamingConnector) CreateMeter(ctx context.Context, namespace string, meter *models.Meter) error {
Expand Down
192 changes: 139 additions & 53 deletions openmeter/streaming/clickhouse_connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@ package clickhouse_connector

import (
"context"
"encoding/json"
"fmt"
"log/slog"
"strings"
"time"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/cloudevents/sdk-go/v2/event"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"

"github.com/openmeterio/openmeter/api"
"github.com/openmeterio/openmeter/openmeter/meter"
Expand All @@ -27,6 +26,8 @@ type ClickhouseConnector struct {
config ClickhouseConnectorConfig
}

var _ streaming.Connector = &ClickhouseConnector{}

type ClickhouseConnectorConfig struct {
Logger *slog.Logger
ClickHouse clickhouse.Conn
Expand All @@ -44,21 +45,81 @@ func NewClickhouseConnector(config ClickhouseConnectorConfig) (*ClickhouseConnec
return connector, nil
}

func (c *ClickhouseConnector) ListEvents(ctx context.Context, namespace string, params streaming.ListEventsParams) ([]api.IngestedEvent, error) {
func (c *ClickhouseConnector) ListEvents(ctx context.Context, namespace string, params streaming.ListEventsParams) ([]api.IngestedEvent, *streaming.EventsCursor, error) {
return c.queryEvents(
ctx,
namespace,
params.Filters,
func() ([]ScannedEventRow, error) {
return c.queryEventsTable(ctx, namespace, params)
},
)
}

func (c *ClickhouseConnector) PaginateEvents(ctx context.Context, namespace string, params streaming.PaginateEventsParams) ([]api.IngestedEvent, *streaming.EventsCursor, error) {
if err := params.Cursor.Cursor.Validate(); err != nil {
return nil, nil, fmt.Errorf("cursor validation: %w", err)
}

return c.queryEvents(
ctx,
namespace,
params.Cursor.Filters,
func() ([]ScannedEventRow, error) {
return c.paginateEventsTable(ctx, namespace, params)
},
)
}

func (c *ClickhouseConnector) queryEvents(
_ context.Context,
namespace string,
filters streaming.EventsTableFilters,
querier func() ([]ScannedEventRow, error),
) ([]api.IngestedEvent, *streaming.EventsCursor, error) {
if namespace == "" {
return nil, fmt.Errorf("namespace is required")
return nil, nil, fmt.Errorf("namespace is required")
}

events, err := c.queryEventsTable(ctx, namespace, params)
scannedRows, err := querier()
if err != nil {
if _, ok := err.(*models.NamespaceNotFoundError); ok {
return nil, err
return nil, nil, err
}

return nil, nil, fmt.Errorf("query events: %w", err)
}

var events []api.IngestedEvent

for _, row := range scannedRows {
event, err := parseEventRow(row)
if err != nil {
return nil, nil, fmt.Errorf("query events: %w", err)
}

return nil, fmt.Errorf("query events: %w", err)
events = append(events, event)
}

return events, nil
var cursor *streaming.EventsCursor

if len(scannedRows) > 0 {
lastRow := scannedRows[len(scannedRows)-1]

cursor = &streaming.EventsCursor{
Filters: filters,
Cursor: streaming.EventsTableCursor{
Namespace: namespace,
Time: lastRow.eventTime,
Type: lastRow.eventType,
Subject: lastRow.subject,
ID: lastRow.id,
IsGreater: false, // We use SORT DESC when querying TODO: maybe tidy this up, its a bit arbitrary
},
}
}

return events, cursor, nil
}

func (c *ClickhouseConnector) CreateMeter(ctx context.Context, namespace string, meter *models.Meter) error {
Expand Down Expand Up @@ -216,21 +277,22 @@ func (c *ClickhouseConnector) createEventsTable(ctx context.Context) error {
return nil
}

func (c *ClickhouseConnector) queryEventsTable(ctx context.Context, namespace string, params streaming.ListEventsParams) ([]api.IngestedEvent, error) {
func (c *ClickhouseConnector) queryEventsTable(ctx context.Context, namespace string, params streaming.ListEventsParams) ([]ScannedEventRow, error) {
table := queryEventsTable{
Database: c.config.Database,
Namespace: namespace,
From: params.From,
To: params.To,
IngestedAtFrom: params.IngestedAtFrom,
IngestedAtTo: params.IngestedAtTo,
ID: params.ID,
Subject: params.Subject,
HasError: params.HasError,
Limit: params.Limit,
Database: c.config.Database,
Namespace: namespace,
Limit: params.Limit,
}

sql, args := table.toSQL()
sql, args := table.toSQLWithWhere(queryEventsFilters{
From: params.Filters.From,
To: params.Filters.To,
IngestedAtFrom: params.Filters.IngestedAtFrom,
IngestedAtTo: params.Filters.IngestedAtTo,
ID: params.Filters.ID,
Subject: params.Filters.Subject,
HasError: params.Filters.HasError,
})

rows, err := c.config.ClickHouse.Query(ctx, sql, args...)
if err != nil {
Expand All @@ -241,7 +303,50 @@ func (c *ClickhouseConnector) queryEventsTable(ctx context.Context, namespace st
return nil, fmt.Errorf("query events table query: %w", err)
}

events := []api.IngestedEvent{}
return c.scanEventRows(rows)
}

func (c *ClickhouseConnector) paginateEventsTable(ctx context.Context, namespace string, params streaming.PaginateEventsParams) ([]ScannedEventRow, error) {
table := queryEventsTable{
Database: c.config.Database,
Namespace: namespace,
Limit: params.Limit,
}

sql, args := table.toSQLWithWhere(queryEventsCursor{
cursor: eventsTableCursor{
Namespace: params.Cursor.Cursor.Namespace,
Time: params.Cursor.Cursor.Time,
Type: params.Cursor.Cursor.Type,
Subject: params.Cursor.Cursor.Subject,
ID: params.Cursor.Cursor.ID,
IsGreater: params.Cursor.Cursor.IsGreater,
},
filters: queryEventsFilters{
From: params.Cursor.Filters.From,
To: params.Cursor.Filters.To,
IngestedAtFrom: params.Cursor.Filters.IngestedAtFrom,
IngestedAtTo: params.Cursor.Filters.IngestedAtTo,
ID: params.Cursor.Filters.ID,
Subject: params.Cursor.Filters.Subject,
HasError: params.Cursor.Filters.HasError,
},
})

rows, err := c.config.ClickHouse.Query(ctx, sql, args...)
if err != nil {
if strings.Contains(err.Error(), "code: 60") {
return nil, &models.NamespaceNotFoundError{Namespace: namespace}
}

return nil, fmt.Errorf("query events table query: %w", err)
}

return c.scanEventRows(rows)
}

func (c *ClickhouseConnector) scanEventRows(rows driver.Rows) ([]ScannedEventRow, error) {
scannedRows := []ScannedEventRow{}

for rows.Next() {
var id string
Expand All @@ -254,43 +359,24 @@ func (c *ClickhouseConnector) queryEventsTable(ctx context.Context, namespace st
var ingestedAt time.Time
var storedAt time.Time

if err = rows.Scan(&id, &eventType, &subject, &source, &eventTime, &dataStr, &validationError, &ingestedAt, &storedAt); err != nil {
if err := rows.Scan(&id, &eventType, &subject, &source, &eventTime, &dataStr, &validationError, &ingestedAt, &storedAt); err != nil {
return nil, err
}

// Parse data
var data interface{}
err := json.Unmarshal([]byte(dataStr), &data)
if err != nil {
return nil, fmt.Errorf("query events parse data: %w", err)
}

event := event.New()
event.SetID(id)
event.SetType(eventType)
event.SetSubject(subject)
event.SetSource(source)
event.SetTime(eventTime)
err = event.SetData("application/json", data)
if err != nil {
return nil, fmt.Errorf("query events set data: %w", err)
}

ingestedEvent := api.IngestedEvent{
Event: event,
}

if validationError != "" {
ingestedEvent.ValidationError = &validationError
}

ingestedEvent.IngestedAt = ingestedAt
ingestedEvent.StoredAt = storedAt

events = append(events, ingestedEvent)
scannedRows = append(scannedRows, ScannedEventRow{
id: id,
eventType: eventType,
subject: subject,
source: source,
eventTime: eventTime,
dataStr: dataStr,
validationError: validationError,
ingestedAt: ingestedAt,
storedAt: storedAt,
})
}

return events, nil
return scannedRows, nil
}

func (c *ClickhouseConnector) queryCountEvents(ctx context.Context, namespace string, params streaming.CountEventsParams) ([]streaming.CountEventRow, error) {
Expand Down
55 changes: 54 additions & 1 deletion openmeter/streaming/clickhouse_connector/model.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,62 @@
package clickhouse_connector

import "github.com/openmeterio/openmeter/pkg/models"
import (
"encoding/json"
"fmt"
"time"

"github.com/cloudevents/sdk-go/v2/event"

"github.com/openmeterio/openmeter/api"
"github.com/openmeterio/openmeter/pkg/models"
)

type MeterView struct {
Slug string
Aggregation models.MeterAggregation
GroupBy []string
}

type ScannedEventRow struct {
id string
eventType string
subject string
source string
eventTime time.Time
dataStr string
validationError string
ingestedAt time.Time
storedAt time.Time
}

func parseEventRow(row ScannedEventRow) (api.IngestedEvent, error) {
var data interface{}
err := json.Unmarshal([]byte(row.dataStr), &data)
if err != nil {
return api.IngestedEvent{}, fmt.Errorf("query events parse data: %w", err)
}

event := event.New()
event.SetID(row.id)
event.SetType(row.eventType)
event.SetSubject(row.subject)
event.SetSource(row.source)
event.SetTime(row.eventTime)
err = event.SetData("application/json", data)
if err != nil {
return api.IngestedEvent{}, fmt.Errorf("query events set data: %w", err)
}

ingestedEvent := api.IngestedEvent{
Event: event,
}

if row.validationError != "" {
ingestedEvent.ValidationError = &row.validationError
}

ingestedEvent.IngestedAt = row.ingestedAt
ingestedEvent.StoredAt = row.storedAt

return ingestedEvent, nil
}
Loading
Loading