Skip to content

Commit

Permalink
PMM-13057 Changes to generic endpoint and generic structures.
Browse files Browse the repository at this point in the history
  • Loading branch information
JiriCtvrtka committed Oct 9, 2024
1 parent a29235f commit 3865d07
Show file tree
Hide file tree
Showing 17 changed files with 119 additions and 100 deletions.
2 changes: 1 addition & 1 deletion managed/models/template_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ import (
"github.com/AlekSi/pointer"
"github.com/brianvoe/gofakeit/v6"
"github.com/google/uuid"
"github.com/percona/promconfig"
"github.com/percona/saas/pkg/alert"
"github.com/percona/saas/pkg/common"
"github.com/percona/promconfig"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/reform.v1"
Expand Down
8 changes: 4 additions & 4 deletions managed/services/telemetry/datasource_envvars.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"context"
"os"

pmmv1 "github.com/percona/saas/gen/telemetry/events/pmm"
reporter "github.com/percona/saas/gen/telemetry/generic"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -51,8 +51,8 @@ func (d *dsEnvvars) Init(_ context.Context) error {
return nil
}

func (d *dsEnvvars) FetchMetrics(_ context.Context, config Config) ([]*pmmv1.ServerMetric_Metric, error) {
var metrics []*pmmv1.ServerMetric_Metric
func (d *dsEnvvars) FetchMetrics(_ context.Context, config Config) ([]*reporter.GenericReport_Metric, error) {
var metrics []*reporter.GenericReport_Metric

check := make(map[string]bool, len(config.Data))

Expand All @@ -69,7 +69,7 @@ func (d *dsEnvvars) FetchMetrics(_ context.Context, config Config) ([]*pmmv1.Ser

check[col.MetricName] = true

metrics = append(metrics, &pmmv1.ServerMetric_Metric{
metrics = append(metrics, &reporter.GenericReport_Metric{
Key: col.MetricName,
Value: value,
})
Expand Down
5 changes: 2 additions & 3 deletions managed/services/telemetry/datasource_grafanadb_select.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ import (
"net/url"
"time"

// Events, errors and driver for grafana database.
pmmv1 "github.com/percona/saas/gen/telemetry/events/pmm"
reporter "github.com/percona/saas/gen/telemetry/generic"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -95,7 +94,7 @@ func openGrafanaDBConnection(config DSConfigGrafanaDB, l *logrus.Entry) (*sql.DB
return db, nil
}

func (d *dsGrafanaDBSelect) FetchMetrics(ctx context.Context, config Config) ([]*pmmv1.ServerMetric_Metric, error) {
func (d *dsGrafanaDBSelect) FetchMetrics(ctx context.Context, config Config) ([]*reporter.GenericReport_Metric, error) {
return fetchMetricsFromDB(ctx, d.l, d.config.Timeout, d.db, config)
}

Expand Down
4 changes: 2 additions & 2 deletions managed/services/telemetry/datasource_pmmdb_select.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"net/url"
"time"

pmmv1 "github.com/percona/saas/gen/telemetry/events/pmm"
reporter "github.com/percona/saas/gen/telemetry/generic"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -94,7 +94,7 @@ func openPMMDBConnection(config DSConfigPMMDB, l *logrus.Entry) (*sql.DB, error)
return db, nil
}

func (d *dsPmmDBSelect) FetchMetrics(ctx context.Context, config Config) ([]*pmmv1.ServerMetric_Metric, error) {
func (d *dsPmmDBSelect) FetchMetrics(ctx context.Context, config Config) ([]*reporter.GenericReport_Metric, error) {
return fetchMetricsFromDB(ctx, d.l, d.config.Timeout, d.db, config)
}

Expand Down
4 changes: 2 additions & 2 deletions managed/services/telemetry/datasource_qandb_select.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"context"
"database/sql"

pmmv1 "github.com/percona/saas/gen/telemetry/events/pmm"
reporter "github.com/percona/saas/gen/telemetry/generic"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -69,7 +69,7 @@ func openQANDBConnection(dsn string, enabled bool, l *logrus.Entry) (*sql.DB, er
return db, nil
}

func (d *dsQanDBSelect) FetchMetrics(ctx context.Context, config Config) ([]*pmmv1.ServerMetric_Metric, error) {
func (d *dsQanDBSelect) FetchMetrics(ctx context.Context, config Config) ([]*reporter.GenericReport_Metric, error) {
return fetchMetricsFromDB(ctx, d.l, d.config.Timeout, d.db, config)
}

Expand Down
10 changes: 5 additions & 5 deletions managed/services/telemetry/datasource_victoria_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"context"
"time"

pmmv1 "github.com/percona/saas/gen/telemetry/events/pmm"
reporter "github.com/percona/saas/gen/telemetry/generic"
"github.com/prometheus/client_golang/api"
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
Expand Down Expand Up @@ -66,7 +66,7 @@ func NewDataSourceVictoriaMetrics(config DSConfigVM, l *logrus.Entry) (DataSourc
}, nil
}

func (d *dataSourceVictoriaMetrics) FetchMetrics(ctx context.Context, config Config) ([]*pmmv1.ServerMetric_Metric, error) {
func (d *dataSourceVictoriaMetrics) FetchMetrics(ctx context.Context, config Config) ([]*reporter.GenericReport_Metric, error) {
localCtx, cancel := context.WithTimeout(ctx, d.config.Timeout)
defer cancel()

Expand All @@ -75,20 +75,20 @@ func (d *dataSourceVictoriaMetrics) FetchMetrics(ctx context.Context, config Con
return nil, err
}

var metrics []*pmmv1.ServerMetric_Metric
var metrics []*reporter.GenericReport_Metric

for _, v := range result.(model.Vector) { //nolint:forcetypeassert
for _, configItem := range config.Data {
if configItem.Label != "" {
value := v.Metric[model.LabelName(configItem.Label)]
metrics = append(metrics, &pmmv1.ServerMetric_Metric{
metrics = append(metrics, &reporter.GenericReport_Metric{
Key: configItem.MetricName,
Value: string(value),
})
}

if configItem.Value != "" {
metrics = append(metrics, &pmmv1.ServerMetric_Metric{
metrics = append(metrics, &reporter.GenericReport_Metric{
Key: configItem.MetricName,
Value: v.Value.String(),
})
Expand Down
8 changes: 4 additions & 4 deletions managed/services/telemetry/datasources.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"time"

"github.com/AlekSi/pointer"
pmmv1 "github.com/percona/saas/gen/telemetry/events/pmm"
reporter "github.com/percona/saas/gen/telemetry/generic"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -77,7 +77,7 @@ func (r *dataSourceRegistry) LocateTelemetryDataSource(name string) (DataSource,
return ds, nil
}

func fetchMetricsFromDB(ctx context.Context, l *logrus.Entry, timeout time.Duration, db *sql.DB, config Config) ([]*pmmv1.ServerMetric_Metric, error) {
func fetchMetricsFromDB(ctx context.Context, l *logrus.Entry, timeout time.Duration, db *sql.DB, config Config) ([]*reporter.GenericReport_Metric, error) {
localCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
tx, err := db.BeginTx(localCtx, &sql.TxOptions{})
Expand All @@ -104,7 +104,7 @@ func fetchMetricsFromDB(ctx context.Context, l *logrus.Entry, timeout time.Durat
}
cfgColumns := config.mapByColumn()

var metrics []*pmmv1.ServerMetric_Metric
var metrics []*reporter.GenericReport_Metric
for rows.Next() {
if err := rows.Scan(values...); err != nil {
l.Error(err)
Expand All @@ -116,7 +116,7 @@ func fetchMetricsFromDB(ctx context.Context, l *logrus.Entry, timeout time.Durat

if cols, ok := cfgColumns[column]; ok {
for _, col := range cols {
metrics = append(metrics, &pmmv1.ServerMetric_Metric{
metrics = append(metrics, &reporter.GenericReport_Metric{
Key: col.MetricName,
Value: value,
})
Expand Down
4 changes: 2 additions & 2 deletions managed/services/telemetry/deps.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"context"

pmmv1 "github.com/percona/saas/gen/telemetry/events/pmm"
reporter "github.com/percona/saas/gen/telemetry/reporter"
reporter "github.com/percona/saas/gen/telemetry/generic"

serverv1 "github.com/percona/pmm/api/server/v1"
)
Expand All @@ -42,7 +42,7 @@ type DataSourceLocator interface {
// DataSource telemetry data source.
type DataSource interface {
Init(ctx context.Context) error
FetchMetrics(ctx context.Context, config Config) ([]*pmmv1.ServerMetric_Metric, error)
FetchMetrics(ctx context.Context, config Config) ([]*reporter.GenericReport_Metric, error)
Dispose(ctx context.Context) error
Enabled() bool
}
4 changes: 2 additions & 2 deletions managed/services/telemetry/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ package telemetry
import (
"context"

pmmv1 "github.com/percona/saas/gen/telemetry/events/pmm"
reporter "github.com/percona/saas/gen/telemetry/generic"
)

// Extension provides dynamic extension point for Telemetry.
type Extension interface {
FetchMetrics(ctx context.Context, config Config) ([]*pmmv1.ServerMetric_Metric, error)
FetchMetrics(ctx context.Context, config Config) ([]*reporter.GenericReport_Metric, error)
}
12 changes: 6 additions & 6 deletions managed/services/telemetry/mock_data_source_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions managed/services/telemetry/mock_sender_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 24 additions & 17 deletions managed/services/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (

"github.com/google/uuid"
pmmv1 "github.com/percona/saas/gen/telemetry/events/pmm"
reporter "github.com/percona/saas/gen/telemetry/reporter"
reporter "github.com/percona/saas/gen/telemetry/generic"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/types/known/durationpb"
Expand Down Expand Up @@ -53,7 +53,7 @@ type Service struct {
os string
sDistributionMethod serverv1.DistributionMethod
tDistributionMethod pmmv1.DistributionMethod
sendCh chan *pmmv1.ServerMetric
sendCh chan *reporter.GenericReport
dataSourcesMap map[DataSourceName]DataSource

extensions map[ExtensionType]Extension
Expand Down Expand Up @@ -89,7 +89,7 @@ func NewService(db *reform.DB, portalClient *platform.Client, pmmVersion string,
config: config,
dsRegistry: registry,
dus: dus,
sendCh: make(chan *pmmv1.ServerMetric, sendChSize),
sendCh: make(chan *reporter.GenericReport, sendChSize),
extensions: extensions,
}

Expand Down Expand Up @@ -167,7 +167,7 @@ func (s *Service) DistributionMethod() serverv1.DistributionMethod {

func (s *Service) processSendCh(ctx context.Context) {
var reportsBufSync sync.Mutex
var reportsBuf []*pmmv1.ServerMetric
var reportsBuf []*reporter.GenericReport
var sendCtx context.Context //nolint:contextcheck
var cancel context.CancelFunc

Expand All @@ -182,14 +182,19 @@ func (s *Service) processSendCh(ctx context.Context) {
sendCtx, cancel = context.WithTimeout(ctx, s.config.Reporting.SendTimeout)

reportsBufSync.Lock()
reportsBuf = append(reportsBuf, report)
reportsBuf = append(reportsBuf, &reporter.GenericReport{
Id: string(report.GetId()),
CreateTime: report.GetCreateTime(),
InstanceId: string(report.GetInstanceId()),
ProductFamily: reporter.ProductFamily_PRODUCT_FAMILY_PMM,
Metrics: report.Metrics,
})
reportsToSend := reportsBuf
reportsBuf = []*pmmv1.ServerMetric{}
reportsBufSync.Unlock()

go func(ctx context.Context) {
err := s.send(ctx, &reporter.ReportRequest{
Metrics: reportsToSend,
Reports: []*reporter.GenericReport{},
})
if err != nil {
s.l.Debugf("Telemetry info not sent, due to error: %s.", err)
Expand All @@ -211,7 +216,7 @@ func (s *Service) processSendCh(ctx context.Context) {
}
}

func (s *Service) prepareReport(ctx context.Context) *pmmv1.ServerMetric {
func (s *Service) prepareReport(ctx context.Context) *reporter.GenericReport {
initializedDataSources := make(map[DataSourceName]DataSource)
telemetryMetric, _ := s.makeMetric(ctx)
var totalTime time.Duration
Expand Down Expand Up @@ -324,7 +329,7 @@ func (s *Service) locateDataSources(telemetryConfig []Config) map[DataSourceName
return dataSources
}

func (s *Service) makeMetric(ctx context.Context) (*pmmv1.ServerMetric, error) {
func (s *Service) makeMetric(ctx context.Context) (*reporter.GenericReport, error) {
var settings *models.Settings
useServerID := false
err := s.db.InTransaction(func(tx *reform.TX) error {
Expand Down Expand Up @@ -362,13 +367,15 @@ func (s *Service) makeMetric(ctx context.Context) (*pmmv1.ServerMetric, error) {
_, distMethod, _ := s.dus.GetDistributionMethodAndOS()

eventID := uuid.New()
return &pmmv1.ServerMetric{
Id: eventID[:],
Time: timestamppb.New(time.Now()),
PmmServerTelemetryId: serverID,
PmmServerVersion: s.pmmVersion,
UpDuration: durationpb.New(time.Since(s.start)),
DistributionMethod: distMethod,
return &reporter.GenericReport{
Id: string(eventID[:]),
CreateTime: timestamppb.New(time.Now()),
InstanceId: string(serverID),
Metrics: []*reporter.GenericReport_Metric{
{Key: "PMMServerVersion", Value: s.pmmVersion},
{Key: "UpDuration", Value: durationpb.New(time.Since(s.start)).String()},
{Key: "DistributionMethod", Value: distMethod.String()},
},
}, nil
}

Expand Down Expand Up @@ -412,7 +419,7 @@ func (s *Service) send(ctx context.Context, report *reporter.ReportRequest) erro
}

// Format returns the formatted representation of the provided server metric.
func (s *Service) Format(report *pmmv1.ServerMetric) string {
func (s *Service) Format(report *reporter.GenericReport) string {
var builder strings.Builder
for _, m := range report.Metrics {
builder.WriteString(m.Key)
Expand Down
Loading

0 comments on commit 3865d07

Please sign in to comment.