Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

poc: send data to segment #1466

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,12 @@ require (
sigs.k8s.io/yaml v1.3.0
)

require gopkg.in/yaml.v3 v3.0.1

require (
github.com/PuerkitoBio/purell v1.1.1 // indirect
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
github.com/aymerick/douceur v0.2.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
github.com/cenkalti/backoff/v4 v4.1.3 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/cucumber/gherkin-go/v19 v19.0.3 // indirect
Expand Down Expand Up @@ -120,6 +119,8 @@ require (
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/segmentio/analytics-go/v3 v3.2.1
github.com/segmentio/backo-go v1.0.0 // indirect
github.com/segmentio/ksuid v1.0.4 // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f // indirect
Expand All @@ -133,6 +134,7 @@ require (
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v3 v3.0.1
k8s.io/klog/v2 v2.60.1 // indirect
k8s.io/kube-openapi v0.0.0-20220328201542-3ee0da9b0b42 // indirect
k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kB
github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84=
github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM=
github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
github.com/bxcodec/faker/v3 v3.8.0 h1:F59Qqnsh0BOtZRC+c4cXoB/VNYDMS3R5mlSpxIap1oU=
github.com/bxcodec/faker/v3 v3.8.0/go.mod h1:gF31YgnMSMKgkvl+fyEo1xuSMbEuieyqfeslGYFjneM=
github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8UtC4=
Expand Down Expand Up @@ -634,6 +636,10 @@ github.com/santhosh-tekuri/jsonschema/v3 v3.1.0 h1:levPcBfnazlA1CyCMC3asL/QLZkq9
github.com/santhosh-tekuri/jsonschema/v3 v3.1.0/go.mod h1:8kzK2TC0k0YjOForaAHdNEa7ik0fokNa2k30BKJ/W7Y=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/segmentio/analytics-go/v3 v3.2.1 h1:G+f90zxtc1p9G+WigVyTR0xNfOghOGs/PYAlljLOyeg=
github.com/segmentio/analytics-go/v3 v3.2.1/go.mod h1:p8owAF8X+5o27jmvUognuXxdtqvSGtD0ZrfY2kcS9bE=
github.com/segmentio/backo-go v1.0.0 h1:kbOAtGJY2DqOR0jfRkYEorx/b18RgtepGtY3+Cpe6qA=
github.com/segmentio/backo-go v1.0.0/go.mod h1:kJ9mm9YmoWSkk+oQ+5Cj8DEoRCX2JT6As4kEtIIOp1M=
github.com/segmentio/ksuid v1.0.4 h1:sBo2BdShXjmcugAMwjugoGUdUV0pcxY5mW4xKRn3v4c=
github.com/segmentio/ksuid v1.0.4/go.mod h1:/XUiZBD3kVx5SmUOl55voK5yeAbBNNIed+2O73XgrPE=
github.com/selvatico/go-mocket v1.0.7 h1:jbVa7RkoOCzBanQYiYF+VWgySHZogg25fOIKkM38q5k=
Expand Down
24 changes: 21 additions & 3 deletions internal/kafka/internal/services/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/api"
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/auth"
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/client/aws"
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/client/segment"
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/db"
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/errors"
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/metrics"
Expand Down Expand Up @@ -137,9 +138,13 @@ type kafkaService struct {
dataplaneClusterConfig *config.DataplaneClusterConfig
providerConfig *config.ProviderConfig
clusterPlacementStrategy ClusterPlacementStrategy
segmentClient *segment.SegmentClientFactory
}

func NewKafkaService(connectionFactory *db.ConnectionFactory, clusterService ClusterService, keycloakService sso.KafkaKeycloakService, kafkaConfig *config.KafkaConfig, dataplaneClusterConfig *config.DataplaneClusterConfig, awsConfig *config.AWSConfig, quotaServiceFactory QuotaServiceFactory, awsClientFactory aws.ClientFactory, authorizationService authorization.Authorization, providerConfig *config.ProviderConfig, clusterPlacementStrategy ClusterPlacementStrategy) *kafkaService {
func NewKafkaService(connectionFactory *db.ConnectionFactory, clusterService ClusterService, keycloakService sso.KafkaKeycloakService, kafkaConfig *config.KafkaConfig,
dataplaneClusterConfig *config.DataplaneClusterConfig, awsConfig *config.AWSConfig, quotaServiceFactory QuotaServiceFactory, awsClientFactory aws.ClientFactory,
authorizationService authorization.Authorization, providerConfig *config.ProviderConfig, clusterPlacementStrategy ClusterPlacementStrategy,
segmentClient *segment.SegmentClientFactory) *kafkaService {
return &kafkaService{
connectionFactory: connectionFactory,
clusterService: clusterService,
Expand All @@ -152,6 +157,7 @@ func NewKafkaService(connectionFactory *db.ConnectionFactory, clusterService Clu
dataplaneClusterConfig: dataplaneClusterConfig,
providerConfig: providerConfig,
clusterPlacementStrategy: clusterPlacementStrategy,
segmentClient: segmentClient,
}
}

Expand Down Expand Up @@ -374,11 +380,13 @@ func (k *kafkaService) RegisterKafkaJob(kafkaRequest *dbapi.KafkaRequest) *error
err = errors.NewWithCause(errors.ErrorGeneral, err, "unable to validate your request, please try again")
logger.Logger.Errorf(err.Reason)
}
k.segmentClient.Track("Kafka Instance Create", kafkaRequest.Owner, "failed")
return err
}
if !hasCapacity {
errorMsg := fmt.Sprintf("capacity exhausted in '%s' region for '%s' instance type", kafkaRequest.Region, kafkaRequest.InstanceType)
logger.Logger.Warningf(errorMsg)
k.segmentClient.Track("Kafka Instance Create", kafkaRequest.Owner, "failed")
return errors.TooManyKafkaInstancesReached(fmt.Sprintf("region %s cannot accept instance type: %s at this moment", kafkaRequest.Region, kafkaRequest.InstanceType))
}

Expand All @@ -392,6 +400,7 @@ func (k *kafkaService) RegisterKafkaJob(kafkaRequest *dbapi.KafkaRequest) *error
logger.Logger.Infof(msg)
}

k.segmentClient.Track("Kafka Instance Create", kafkaRequest.Owner, "failed")
return errors.TooManyKafkaInstancesReached(fmt.Sprintf("region %s cannot accept instance type: %s at this moment", kafkaRequest.Region, kafkaRequest.InstanceType))
}

Expand All @@ -401,6 +410,7 @@ func (k *kafkaService) RegisterKafkaJob(kafkaRequest *dbapi.KafkaRequest) *error
subscriptionId, err := k.reserveQuota(kafkaRequest)

if err != nil {
k.segmentClient.Track("Kafka Instance Create", kafkaRequest.Owner, "failed")
return err
}

Expand All @@ -411,11 +421,13 @@ func (k *kafkaService) RegisterKafkaJob(kafkaRequest *dbapi.KafkaRequest) *error
// when creating new kafka - default storage size is assigned
instanceType, instanceTypeErr := k.kafkaConfig.SupportedInstanceTypes.Configuration.GetKafkaInstanceTypeByID(kafkaRequest.InstanceType)
if instanceTypeErr != nil {
k.segmentClient.Track("Kafka Instance Create", kafkaRequest.Owner, "failed")
return errors.InstanceTypeNotSupported(instanceTypeErr.Error())
}

size, sizeErr := instanceType.GetKafkaInstanceSizeByID(kafkaRequest.SizeId)
if sizeErr != nil {
k.segmentClient.Track("Kafka Instance Create", kafkaRequest.Owner, "failed")
return errors.InstancePlanNotSupported(sizeErr.Error())
}

Expand All @@ -427,11 +439,12 @@ func (k *kafkaService) RegisterKafkaJob(kafkaRequest *dbapi.KafkaRequest) *error
// we want to use the correct quota to perform the deletion.
kafkaRequest.QuotaType = k.kafkaConfig.Quota.Type
if err := dbConn.Create(kafkaRequest).Error; err != nil {
k.segmentClient.Track("Kafka Instance Create", kafkaRequest.Owner, "failed")
return errors.NewWithCause(errors.ErrorGeneral, err, "failed to create kafka request") //hide the db error to http caller
}

metrics.UpdateKafkaRequestsStatusSinceCreatedMetric(constants.KafkaRequestStatusAccepted, kafkaRequest.ID, kafkaRequest.ClusterID, time.Since(kafkaRequest.CreatedAt))

k.segmentClient.Track("Kafka Instance Create", kafkaRequest.Owner, "success")
return nil
}

Expand Down Expand Up @@ -561,9 +574,12 @@ func (k *kafkaService) RegisterKafkaDeprovisionJob(ctx context.Context, id strin
// filter kafka request by owner to only retrieve request of the current authenticated user
claims, err := auth.GetClaimsFromContext(ctx)
if err != nil {
k.segmentClient.Track("Kafka Instance Delete", "", "failed")
return errors.NewWithCause(errors.ErrorUnauthenticated, err, "user not authenticated")
}

user, _ := claims.GetUsername()

dbConn := k.connectionFactory.New()

if auth.GetIsAdminFromContext(ctx) {
Expand All @@ -572,12 +588,12 @@ func (k *kafkaService) RegisterKafkaDeprovisionJob(ctx context.Context, id strin
orgId, _ := claims.GetOrgId()
dbConn = dbConn.Where("id = ?", id).Where("organisation_id = ?", orgId)
} else {
user, _ := claims.GetUsername()
dbConn = dbConn.Where("id = ?", id).Where("owner = ? ", user)
}

var kafkaRequest dbapi.KafkaRequest
if err := dbConn.First(&kafkaRequest).Error; err != nil {
k.segmentClient.Track("Kafka Instance Delete", user, "failed")
return services.HandleGetError("KafkaResource", "id", id, err)
}
metrics.IncreaseKafkaTotalOperationsCountMetric(constants.KafkaOperationDeprovision)
Expand All @@ -586,10 +602,12 @@ func (k *kafkaService) RegisterKafkaDeprovisionJob(ctx context.Context, id strin

if executed, err := k.UpdateStatus(id, deprovisionStatus); executed {
if err != nil {
k.segmentClient.Track("Kafka Instance Delete", user, "failed")
return services.HandleGetError("KafkaResource", "id", id, err)
}
metrics.IncreaseKafkaSuccessOperationsCountMetric(constants.KafkaOperationDeprovision)
metrics.UpdateKafkaRequestsStatusSinceCreatedMetric(deprovisionStatus, kafkaRequest.ID, kafkaRequest.ClusterID, time.Since(kafkaRequest.CreatedAt))
k.segmentClient.Track("Kafka Instance Delete", user, "success")
}

return nil
Expand Down
6 changes: 5 additions & 1 deletion internal/kafka/internal/services/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/auth"
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/client/aws"
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/client/keycloak"
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/client/segment"
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/db"
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/errors"
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/services"
Expand Down Expand Up @@ -3970,6 +3971,7 @@ func Test_NewKafkaService(t *testing.T) {
authorizationService authorization.Authorization
providerConfig *config.ProviderConfig
clusterPlacementStrategy ClusterPlacementStrategy
segmentClient *segment.SegmentClientFactory
}
tests := []struct {
name string
Expand All @@ -3989,6 +3991,7 @@ func Test_NewKafkaService(t *testing.T) {
awsClientFactory: &aws.MockClientFactory{},
providerConfig: &config.ProviderConfig{},
clusterPlacementStrategy: &ClusterPlacementStrategyMock{},
segmentClient: &segment.SegmentClientFactory{},
},
want: &kafkaService{
connectionFactory: &db.ConnectionFactory{},
Expand All @@ -4001,14 +4004,15 @@ func Test_NewKafkaService(t *testing.T) {
awsClientFactory: &aws.MockClientFactory{},
providerConfig: &config.ProviderConfig{},
clusterPlacementStrategy: &ClusterPlacementStrategyMock{},
segmentClient: &segment.SegmentClientFactory{},
},
},
}

for _, testcase := range tests {
g := gomega.NewWithT(t)
tt := testcase
g.Expect(NewKafkaService(tt.args.connectionFactory, tt.args.clusterService, tt.args.keycloakService, tt.args.kafkaConfig, tt.args.dataplaneClusterConfig, tt.args.awsConfig, tt.args.quotaServiceFactory, tt.args.awsClientFactory, tt.args.authorizationService, tt.args.providerConfig, tt.args.clusterPlacementStrategy)).To(gomega.Equal(tt.want))
g.Expect(NewKafkaService(tt.args.connectionFactory, tt.args.clusterService, tt.args.keycloakService, tt.args.kafkaConfig, tt.args.dataplaneClusterConfig, tt.args.awsConfig, tt.args.quotaServiceFactory, tt.args.awsClientFactory, tt.args.authorizationService, tt.args.providerConfig, tt.args.clusterPlacementStrategy, tt.args.segmentClient)).To(gomega.Equal(tt.want))
}
}

Expand Down
98 changes: 98 additions & 0 deletions pkg/client/segment/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package segment

import (
"time"

"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/internal/buildinformation"
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/api"
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/environments"
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/logger"

analytics "github.com/segmentio/analytics-go/v3"
)

// SegmentClient is the interface for sending analytics events to Segment.
type SegmentClient interface {
	// Track adds a message to the queue.
	// Messages are sent asynchronously to Segment when batch upload conditions are met,
	// see https://segment.com/docs/connections/sources/catalog/libraries/server/go/#batching for further information.
	//
	// Track has no error result: if the message cannot be added to the queue —
	// which may happen if the client was closed at the time of the method call
	// or if the message is malformed — the failure is logged by the
	// implementation rather than returned to the caller.
	Track(event, userId, result string)
}

// SegmentClientFactory bundles the Segment analytics client with KAS Fleet
// Manager build metadata that is attached to every event it emits.
type SegmentClientFactory struct {
	// Client used for interacting with Segment
	client analytics.Client
	// KAS Fleet Manager build info.
	// This can be used for setting information about KFM in the event context.
	kfmBuildInfo *buildinformation.BuildInfo
}

// Compile-time check that SegmentClientFactory implements SegmentClient.
var _ SegmentClient = &SegmentClientFactory{}

// NewClientWithConfig creates a SegmentClientFactory backed by a Segment
// analytics client built from the given write key (apiKey) and config.
// https://pkg.go.dev/gopkg.in/segmentio/analytics-go.v3#Client
//
// It returns an error if the analytics client cannot be constructed or if
// the KFM build information cannot be retrieved. Errors are returned to the
// caller without being logged here, so that each error is handled (logged or
// acted on) at exactly one layer.
func NewClientWithConfig(apiKey string, config analytics.Config) (*SegmentClientFactory, error) {
	client, err := analytics.NewWithConfig(apiKey, config)
	if err != nil {
		return nil, err
	}

	kfmBuildInfo, err := buildinformation.GetBuildInfo()
	if err != nil {
		return nil, err
	}

	return &SegmentClientFactory{
		client:       client,
		kfmBuildInfo: kfmBuildInfo,
	}, nil
}

// Track builds an analytics.Track message for the given event and enqueues it
// on the Segment client. The message structure follows the Segment Track spec:
// https://segment.com/docs/connections/spec/common/#structure
//
// Event naming convention: must be Upper Case, e.g. "Kafka Instance Creation"
// (based on https://github.com/bf2fc6cc711aee1a0c2a/kafka-ui/pull/1004#discussion_r1048680467).
// Enqueue failures are logged rather than returned, since Track has no error
// result.
func (sc *SegmentClientFactory) Track(event, userId, result string) {
	// Context carries additional information not directly related to the
	// event, such as app metadata about KFM itself.
	// https://segment.com/docs/connections/spec/common/#context
	eventContext := &analytics.Context{
		App: analytics.AppInfo{
			Name:      "KAS Fleet Manager",
			Version:   sc.kfmBuildInfo.GetCommitSHA(),
			Namespace: environments.GetEnvironmentStrFromEnv(),
		},
	}

	// Any key-value data related to the event goes into properties.
	// https://segment.com/docs/connections/spec/track/#properties
	eventProperties := analytics.NewProperties().Set("result", result)

	msg := analytics.Track{
		// Required fields: the event name/descriptor and a unique user
		// identifier.
		Event:  event,
		UserId: userId,

		// Optional fields: when the event occurred, its properties, and
		// its context.
		// https://segment.com/docs/connections/spec/common/#timestamps
		Timestamp:  time.Now(),
		Properties: eventProperties,
		Context:    eventContext,
	}

	// Segment requires 'anonymousId' whenever no userId can be set.
	// Note that the segment client uses google/uuid for generating ids
	// https://github.com/segmentio/analytics-go/blob/v3.0/config.go#L172
	if userId == "" {
		msg.AnonymousId = api.NewID()
	}

	// Enqueue adds the message to the queue; the client handles the upload to
	// Segment when it meets the upload requirements based on how the client
	// is configured. Enqueue returns an error if the message can't be added
	// to the queue.
	if err := sc.client.Enqueue(msg); err != nil {
		logger.Logger.Error(err)
	}
}
8 changes: 8 additions & 0 deletions pkg/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ func (l LogEvent) ToString() string {
type UHCLogger interface {
V(level int32) UHCLogger
Infof(format string, args ...interface{})
// Logf is required in order to use it as a logger for the Segment client
Logf(format string, args ...interface{})
Warningf(format string, args ...interface{})
Errorf(format string, args ...interface{})
Error(err error)
Expand Down Expand Up @@ -194,6 +196,12 @@ func (l *logger) Infof(format string, args ...interface{}) {
glog.V(glog.Level(l.level)).Infof(prefixed)
}

// Logf logs an informational message. It is required in order to use this
// logger as a logger for the Segment client.
func (l *logger) Logf(format string, args ...interface{}) {
	prefixed := l.prepareLogPrefix(format, args...)
	// prefixed is already fully formatted; pass it through "%s" so that any
	// '%' characters in the rendered message are not re-interpreted as
	// formatting verbs (this would be flagged by go vet's printf check).
	glog.V(glog.Level(l.level)).Infof("%s", prefixed)
}

func (l *logger) Warningf(format string, args ...interface{}) {
prefixed := l.prepareLogPrefix(format, args...)
glog.Warningln(prefixed)
Expand Down
24 changes: 24 additions & 0 deletions pkg/providers/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/client/keycloak"
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/client/observatorium"
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/client/ocm"
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/client/segment"
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/cmd/migrate"
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/cmd/serve"
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/db"
Expand All @@ -21,6 +22,8 @@ import (
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/services/sso"
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/workers"
"github.com/goava/di"

"github.com/segmentio/analytics-go/v3"
)

func CoreConfigProviders() di.Option {
Expand Down Expand Up @@ -96,6 +99,27 @@ func ServiceProviders() di.Option {
Build()
}),

di.Provide(func() *segment.SegmentClientFactory {
// The API Key should be read from a secret file
// For more information on analytics.Config fields, see https://pkg.go.dev/gopkg.in/segmentio/analytics-go.v3#Config
//
// Note on API errors: Segment API returns a 200 response for all API requests apart from certain errors which return 400
// see https://segment.com/docs/connections/sources/catalog/libraries/server/http-api/#errors for more details.
// e.g. The API will return a 200 even if the API Key is invalid.
segmentClient, err := segment.NewClientWithConfig("API_KEY", analytics.Config{
// Determines when the client will upload all messages in the queue.
// By default this is set to 20. The client will only send messages to Segment if the queue reaches this number or if the
// Interval was reached first (by default this is every 5 seconds)
BatchSize: 1,
Logger: logger.Logger,
Verbose: true,
})
if err != nil {
logger.Logger.Error(err)
}
return segmentClient
}),

// Types registered as a BootService are started when the env is started
di.Provide(server.NewAPIServer, di.As(new(environments.BootService))),
di.Provide(server.NewMetricsServer, di.As(new(environments.BootService))),
Expand Down