From 34baf0f285320db052f8ddb23a8a0598d3518d84 Mon Sep 17 00:00:00 2001 From: frodesundby Date: Tue, 16 Apr 2024 15:07:20 +0200 Subject: [PATCH] Add materialized view for resource utilization and refresh it Co-authored-by: Thomas Krampl --- internal/database/gensql/mock_querier.go | 46 +++++++++++++++++++ internal/database/gensql/querier.go | 2 + internal/database/gensql/resourceusage.sql.go | 23 +++++----- .../0004_mat_view_resource_utilization.sql | 15 ++++++ internal/database/mock_database.go | 46 +++++++++++++++++++ internal/database/queries/resourceusage.sql | 17 ++----- internal/database/resource_utilization.go | 6 +++ internal/resourceusage/updater.go | 31 +++++++++---- 8 files changed, 154 insertions(+), 32 deletions(-) create mode 100644 internal/database/migrations/0004_mat_view_resource_utilization.sql diff --git a/internal/database/gensql/mock_querier.go b/internal/database/gensql/mock_querier.go index 1640fd209..019f31004 100644 --- a/internal/database/gensql/mock_querier.go +++ b/internal/database/gensql/mock_querier.go @@ -5167,6 +5167,52 @@ func (_c *MockQuerier_MonthlyCostForTeam_Call) RunAndReturn(run func(context.Con return _c } +// RefreshResourceTeamRange provides a mock function with given fields: ctx +func (_m *MockQuerier) RefreshResourceTeamRange(ctx context.Context) error { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for RefreshResourceTeamRange") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockQuerier_RefreshResourceTeamRange_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RefreshResourceTeamRange' +type MockQuerier_RefreshResourceTeamRange_Call struct { + *mock.Call +} + +// RefreshResourceTeamRange is a helper method to define mock.On call +// - ctx context.Context +func (_e *MockQuerier_Expecter) RefreshResourceTeamRange(ctx interface{}) *MockQuerier_RefreshResourceTeamRange_Call { + return &MockQuerier_RefreshResourceTeamRange_Call{Call: _e.mock.On("RefreshResourceTeamRange", ctx)} +} + +func (_c *MockQuerier_RefreshResourceTeamRange_Call) Run(run func(ctx context.Context)) *MockQuerier_RefreshResourceTeamRange_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *MockQuerier_RefreshResourceTeamRange_Call) Return(_a0 error) *MockQuerier_RefreshResourceTeamRange_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockQuerier_RefreshResourceTeamRange_Call) RunAndReturn(run func(context.Context) error) *MockQuerier_RefreshResourceTeamRange_Call { + _c.Call.Return(run) + return _c +} + // RemoveAllServiceAccountRoles provides a mock function with given fields: ctx, serviceAccountID func (_m *MockQuerier) RemoveAllServiceAccountRoles(ctx context.Context, serviceAccountID uuid.UUID) error { ret := _m.Called(ctx, serviceAccountID) diff --git a/internal/database/gensql/querier.go b/internal/database/gensql/querier.go index 80cfe4e14..6ea3dc400 100644 --- a/internal/database/gensql/querier.go +++ b/internal/database/gensql/querier.go @@ -115,6 +115,8 @@ type Querier interface { MaxResourceUtilizationDate(ctx context.Context) (pgtype.Timestamptz, error) MonthlyCostForApp(ctx context.Context, arg MonthlyCostForAppParams) ([]*MonthlyCostForAppRow, error) MonthlyCostForTeam(ctx context.Context, teamSlug slug.Slug) ([]*MonthlyCostForTeamRow, error) + // Refresh materialized view resource_team_range + RefreshResourceTeamRange(ctx context.Context) error RemoveAllServiceAccountRoles(ctx context.Context, serviceAccountID uuid.UUID) error RemoveApiKeysFromServiceAccount(ctx context.Context, serviceAccountID uuid.UUID) error RemoveReconcilerOptOut(ctx context.Context, arg RemoveReconcilerOptOutParams) error diff --git a/internal/database/gensql/resourceusage.sql.go b/internal/database/gensql/resourceusage.sql.go index ecb6b791c..24d9e14f4 100644 --- a/internal/database/gensql/resourceusage.sql.go +++ b/internal/database/gensql/resourceusage.sql.go @@ -55,6 +55,16 @@ func (q *Queries) MaxResourceUtilizationDate(ctx context.Context) (pgtype.Timest return column_1, err } +const refreshResourceTeamRange = `-- name: RefreshResourceTeamRange :exec +REFRESH MATERIALIZED VIEW CONCURRENTLY resource_team_range +` + +// Refresh materialized view resource_team_range +func (q *Queries) RefreshResourceTeamRange(ctx context.Context) error { + _, err := q.db.Exec(ctx, refreshResourceTeamRange) + return err +} + const resourceUtilizationForApp = `-- name: ResourceUtilizationForApp :many SELECT id, timestamp, environment, team_slug, app, resource_type, usage, request @@ -269,18 +279,7 @@ func (q *Queries) ResourceUtilizationRangeForApp(ctx context.Context, arg Resour } const resourceUtilizationRangeForTeam = `-- name: ResourceUtilizationRangeForTeam :one -WITH team_range AS ( - SELECT timestamp - FROM - resource_utilization_metrics - WHERE - team_slug = $1 -) -SELECT - MIN(timestamp)::timestamptz AS "from", - MAX(timestamp)::timestamptz AS "to" -FROM - team_range +SELECT "from", "to" FROM resource_team_range WHERE team_slug = $1 ` type ResourceUtilizationRangeForTeamRow struct { diff --git a/internal/database/migrations/0004_mat_view_resource_utilization.sql b/internal/database/migrations/0004_mat_view_resource_utilization.sql new file mode 100644 index 000000000..c35f35d0b --- /dev/null +++ b/internal/database/migrations/0004_mat_view_resource_utilization.sql @@ -0,0 +1,15 @@ +-- +goose Up + +-- Team range for resource utilization +DROP MATERIALIZED VIEW IF EXISTS resource_team_range; +CREATE MATERIALIZED VIEW resource_team_range AS +SELECT + team_slug, + MIN(timestamp)::timestamptz AS "from", + MAX(timestamp)::timestamptz AS "to" +FROM + resource_utilization_metrics +GROUP BY + team_slug; + +CREATE UNIQUE INDEX ON resource_team_range (team_slug); \ No newline at end of file diff --git a/internal/database/mock_database.go b/internal/database/mock_database.go index e28b0e0af..fe24b2285 100644 --- a/internal/database/mock_database.go +++ b/internal/database/mock_database.go @@ -5400,6 +5400,52 @@ func (_c *MockDatabase_ResourceUtilizationRangeForTeam_Call) RunAndReturn(run fu return _c } +// ResourceUtilizationRefresh provides a mock function with given fields: ctx +func (_m *MockDatabase) ResourceUtilizationRefresh(ctx context.Context) error { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for ResourceUtilizationRefresh") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockDatabase_ResourceUtilizationRefresh_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ResourceUtilizationRefresh' +type MockDatabase_ResourceUtilizationRefresh_Call struct { + *mock.Call +} + +// ResourceUtilizationRefresh is a helper method to define mock.On call +// - ctx context.Context +func (_e *MockDatabase_Expecter) ResourceUtilizationRefresh(ctx interface{}) *MockDatabase_ResourceUtilizationRefresh_Call { + return &MockDatabase_ResourceUtilizationRefresh_Call{Call: _e.mock.On("ResourceUtilizationRefresh", ctx)} +} + +func (_c *MockDatabase_ResourceUtilizationRefresh_Call) Run(run func(ctx context.Context)) *MockDatabase_ResourceUtilizationRefresh_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *MockDatabase_ResourceUtilizationRefresh_Call) Return(_a0 error) *MockDatabase_ResourceUtilizationRefresh_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockDatabase_ResourceUtilizationRefresh_Call) RunAndReturn(run func(context.Context) error) *MockDatabase_ResourceUtilizationRefresh_Call { + _c.Call.Return(run) + return _c +} + // ResourceUtilizationUpsert provides a mock function with given fields: ctx, arg func (_m *MockDatabase) ResourceUtilizationUpsert(ctx context.Context, arg []gensql.ResourceUtilizationUpsertParams) *gensql.ResourceUtilizationUpsertBatchResults { ret := _m.Called(ctx, arg) diff --git a/internal/database/queries/resourceusage.sql b/internal/database/queries/resourceusage.sql index 1f7a1bb2c..a7bce519d 100644 --- a/internal/database/queries/resourceusage.sql +++ b/internal/database/queries/resourceusage.sql @@ -1,17 +1,6 @@ -- ResourceUtilizationRangeForTeam will return the min and max timestamps for a specific team. -- name: ResourceUtilizationRangeForTeam :one -WITH team_range AS ( - SELECT timestamp - FROM - resource_utilization_metrics - WHERE - team_slug = @team_slug -) -SELECT - MIN(timestamp)::timestamptz AS "from", - MAX(timestamp)::timestamptz AS "to" -FROM - team_range; +SELECT "from", "to" FROM resource_team_range WHERE team_slug = @team_slug; -- ResourceUtilizationRangeForApp will return the min and max timestamps for a specific app. -- name: ResourceUtilizationRangeForApp :one @@ -134,3 +123,7 @@ WHERE AND timestamp >= sqlc.arg(timestamp)::timestamptz - INTERVAL '1 week' AND timestamp < sqlc.arg(timestamp)::timestamptz AND request > usage; + +-- Refresh materialized view resource_team_range +-- name: RefreshResourceTeamRange :exec +REFRESH MATERIALIZED VIEW CONCURRENTLY resource_team_range; \ No newline at end of file diff --git a/internal/database/resource_utilization.go b/internal/database/resource_utilization.go index 39ad14ea1..33e722b78 100644 --- a/internal/database/resource_utilization.go +++ b/internal/database/resource_utilization.go @@ -16,6 +16,7 @@ type ResourceUtilizationRepo interface { ResourceUtilizationOverageForTeam(ctx context.Context, teamSlug slug.Slug, timestamp pgtype.Timestamptz, resourceType gensql.ResourceType) ([]*gensql.ResourceUtilizationOverageForTeamRow, error) ResourceUtilizationRangeForApp(ctx context.Context, environment string, teamSlug slug.Slug, app string) (*gensql.ResourceUtilizationRangeForAppRow, error) ResourceUtilizationRangeForTeam(ctx context.Context, teamSlug slug.Slug) (*gensql.ResourceUtilizationRangeForTeamRow, error) + ResourceUtilizationRefresh(ctx context.Context) error ResourceUtilizationUpsert(ctx context.Context, arg []gensql.ResourceUtilizationUpsertParams) *gensql.ResourceUtilizationUpsertBatchResults SpecificResourceUtilizationForApp(ctx context.Context, environment string, teamSlug slug.Slug, app string, resourceType gensql.ResourceType, timestamp pgtype.Timestamptz) (*gensql.SpecificResourceUtilizationForAppRow, error) SpecificResourceUtilizationForTeam(ctx context.Context, teamSlug slug.Slug, resourceType gensql.ResourceType, timestamp pgtype.Timestamptz) (*gensql.SpecificResourceUtilizationForTeamRow, error) @@ -27,6 +28,11 @@ func (d *database) ResourceUtilizationUpsert(ctx context.Context, arg []gensql.R return d.querier.ResourceUtilizationUpsert(ctx, arg) } +func (d *database) ResourceUtilizationRefresh(ctx context.Context) error { + return d.querier.RefreshResourceTeamRange(ctx) +} + + func (d *database) AverageResourceUtilizationForTeam(ctx context.Context, teamSlug slug.Slug, resourceType gensql.ResourceType, timestamp pgtype.Timestamptz) (*gensql.AverageResourceUtilizationForTeamRow, error) { return d.querier.AverageResourceUtilizationForTeam(ctx, gensql.AverageResourceUtilizationForTeamParams{ TeamSlug: teamSlug, diff --git a/internal/resourceusage/updater.go b/internal/resourceusage/updater.go index 8526cf191..bb46fcd0b 100644 --- a/internal/resourceusage/updater.go +++ b/internal/resourceusage/updater.go @@ -15,6 +15,8 @@ import ( promv1 "github.com/prometheus/client_golang/api/prometheus/v1" prom "github.com/prometheus/common/model" "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel" + metricapi "go.opentelemetry.io/otel/metric" ) type ( @@ -23,10 +25,11 @@ type ( ) type Updater struct { - k8sClient *k8s.Client - db database.Database - promClients map[string]promv1.API - log logrus.FieldLogger + k8sClient *k8s.Client + db database.Database + promClients map[string]promv1.API + log logrus.FieldLogger + errorCounter metricapi.Int64Counter } const ( @@ -60,11 +63,18 @@ var ( // NewUpdater creates a new resourceusage updater func NewUpdater(k8sClient *k8s.Client, promClients map[string]promv1.API, db database.Database, log logrus.FieldLogger) *Updater { + meter := otel.Meter("resourceusage") + counter, err := meter.Int64Counter("mat_view_update_error", metricapi.WithDescription("Counter for errors when updating materialized view")) + if err != nil { + log.Fatalf("failed to create counter: %v", err) + } + counter.Add(context.Background(), 1) return &Updater{ - k8sClient: k8sClient, - db: db, - promClients: promClients, - log: log, + k8sClient: k8sClient, + db: db, + promClients: promClients, + log: log, + errorCounter: counter, } } @@ -108,6 +118,11 @@ func (u *Updater) UpdateResourceUsage(ctx context.Context) (rowsUpserted int, er } } + if err := u.db.ResourceUtilizationRefresh(ctx); err != nil { + u.log.WithError(err).Errorf("unable to refresh resource team range") + u.errorCounter.Add(ctx, 1) + } + return rowsUpserted, nil }