From 15a7adaf0c38b0fdc9e4e05f5f2f0f5de4585737 Mon Sep 17 00:00:00 2001 From: Jacek Wysocki Date: Wed, 24 Jul 2024 10:52:36 +0200 Subject: [PATCH] feat: added runner id to the agent runner --- cmd/api-server/main.go | 1 + internal/config/config.go | 1 + internal/config/procontext.go | 1 + pkg/agent/agent.go | 23 +++++++++++++++-------- pkg/agent/events.go | 7 +------ pkg/agent/logs.go | 2 -- pkg/agent/testworkflows.go | 2 -- 7 files changed, 19 insertions(+), 18 deletions(-) diff --git a/cmd/api-server/main.go b/cmd/api-server/main.go index 97222a56e8f..ede2e618692 100644 --- a/cmd/api-server/main.go +++ b/cmd/api-server/main.go @@ -894,6 +894,7 @@ func newGRPCTransportCredentials(cfg *config.Config) (credentials.TransportCrede func newProContext(cfg *config.Config, grpcClient cloud.TestKubeCloudAPIClient) config.ProContext { proContext := config.ProContext{ APIKey: cfg.TestkubeProAPIKey, + RunnerId: cfg.TestkubeProRunnerId, URL: cfg.TestkubeProURL, TLSInsecure: cfg.TestkubeProTLSInsecure, WorkerCount: cfg.TestkubeProWorkerCount, diff --git a/internal/config/config.go b/internal/config/config.go index 7f48a517350..922dad71221 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -67,6 +67,7 @@ type Config struct { TestkubeOAuthProvider string `envconfig:"TESTKUBE_OAUTH_PROVIDER" default:""` TestkubeOAuthScopes string `envconfig:"TESTKUBE_OAUTH_SCOPES" default:""` TestkubeProAPIKey string `envconfig:"TESTKUBE_PRO_API_KEY" default:""` + TestkubeProRunnerId string `envconfig:"TESTKUBE_PRO_RUNNER_ID" default:""` TestkubeProURL string `envconfig:"TESTKUBE_PRO_URL" default:""` TestkubeProTLSInsecure bool `envconfig:"TESTKUBE_PRO_TLS_INSECURE" default:"false"` TestkubeProWorkerCount int `envconfig:"TESTKUBE_PRO_WORKER_COUNT" default:"50"` diff --git a/internal/config/procontext.go b/internal/config/procontext.go index aa3b090be81..7fdb1e41c46 100644 --- a/internal/config/procontext.go +++ b/internal/config/procontext.go @@ -13,4 +13,5 @@ type ProContext struct { Migrate string ConnectionTimeout int DashboardURI string + RunnerId string } diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 8d98b4ea2b5..bb9018ec17d 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -34,6 +34,7 @@ import ( const ( timeout = 10 * time.Second apiKeyMeta = "api-key" + runnerIdMeta = "runner-id" clusterIDMeta = "cluster-id" cloudMigrateMeta = "migrate" orgIdMeta = "environment-id" @@ -173,7 +174,6 @@ func NewAgent(logger *zap.SugaredLogger, return &Agent{ handler: handler, logger: logger.With("service", "Agent", "environmentId", proContext.EnvID), - apiKey: proContext.APIKey, client: client, events: make(chan testkube.Event), workerCount: proContext.WorkerCount, @@ -212,8 +212,22 @@ func (ag *Agent) Run(ctx context.Context) error { } } +// updateContextWithMetadata adds metadata to the context +func (ag *Agent) updateContextWithMetadata(ctx context.Context) context.Context { + ctx = AddAPIKeyMeta(ctx, ag.proContext.APIKey) + ctx = metadata.AppendToOutgoingContext(ctx, clusterIDMeta, ag.clusterID) + ctx = metadata.AppendToOutgoingContext(ctx, cloudMigrateMeta, ag.proContext.Migrate) + ctx = metadata.AppendToOutgoingContext(ctx, envIdMeta, ag.proContext.EnvID) + ctx = metadata.AppendToOutgoingContext(ctx, orgIdMeta, ag.proContext.OrgID) + ctx = metadata.AppendToOutgoingContext(ctx, runnerIdMeta, ag.proContext.RunnerId) + + return ctx +} func (ag *Agent) run(ctx context.Context) (err error) { + ctx = ag.updateContextWithMetadata(ctx) + g, groupCtx := errgroup.WithContext(ctx) + g.Go(func() error { return ag.runCommandLoop(groupCtx) }) @@ -308,13 +322,6 @@ func (ag *Agent) receiveCommand(ctx context.Context, stream cloud.TestKubeCloudA } func (ag *Agent) runCommandLoop(ctx context.Context) error { - ctx = AddAPIKeyMeta(ctx, ag.proContext.APIKey) - - ctx = metadata.AppendToOutgoingContext(ctx, clusterIDMeta, ag.clusterID) - ctx = metadata.AppendToOutgoingContext(ctx, cloudMigrateMeta, ag.proContext.Migrate) - ctx = metadata.AppendToOutgoingContext(ctx, envIdMeta, ag.proContext.EnvID) - ctx = metadata.AppendToOutgoingContext(ctx, orgIdMeta, ag.proContext.OrgID) - ag.logger.Infow("initiating streaming connection with control plane") // creates a new Stream from the client side. ctx is used for the lifetime of the stream. opts := []grpc.CallOption{grpc.UseCompressor(gzip.Name), grpc.MaxCallRecvMsgSize(math.MaxInt32)} diff --git a/pkg/agent/events.go b/pkg/agent/events.go index 42cd201d5c0..cb3215d1872 100644 --- a/pkg/agent/events.go +++ b/pkg/agent/events.go @@ -9,7 +9,6 @@ import ( "github.com/pkg/errors" "google.golang.org/grpc" "google.golang.org/grpc/encoding/gzip" - "google.golang.org/grpc/metadata" "github.com/kubeshop/testkube/pkg/api/v1/testkube" "github.com/kubeshop/testkube/pkg/cloud" @@ -62,11 +61,7 @@ func (ag *Agent) Notify(event testkube.Event) (result testkube.EventResult) { } func (ag *Agent) runEventLoop(ctx context.Context) error { - opts := []grpc.CallOption{grpc.UseCompressor(gzip.Name)} - md := metadata.Pairs(apiKeyMeta, ag.apiKey) - ctx = metadata.NewOutgoingContext(ctx, md) - - stream, err := ag.client.Send(ctx, opts...) + stream, err := ag.client.Send(ctx, grpc.UseCompressor(gzip.Name)) if err != nil { ag.logger.Errorf("failed to execute: %v", err) return errors.Wrap(err, "failed to setup stream") diff --git a/pkg/agent/logs.go b/pkg/agent/logs.go index 4040e6f6f68..32340b2543c 100644 --- a/pkg/agent/logs.go +++ b/pkg/agent/logs.go @@ -18,8 +18,6 @@ import ( const logStreamRetryCount = 10 func (ag *Agent) runLogStreamLoop(ctx context.Context) error { - ctx = AddAPIKeyMeta(ctx, ag.apiKey) - ag.logger.Infow("initiating log streaming connection with control plane") // creates a new Stream from the client side. ctx is used for the lifetime of the stream. opts := []grpc.CallOption{grpc.UseCompressor(gzip.Name), grpc.MaxCallRecvMsgSize(math.MaxInt32)} diff --git a/pkg/agent/testworkflows.go b/pkg/agent/testworkflows.go index 10f42021538..c1485f6d576 100644 --- a/pkg/agent/testworkflows.go +++ b/pkg/agent/testworkflows.go @@ -29,8 +29,6 @@ func getTestWorkflowNotificationType(n testkube.TestWorkflowExecutionNotificatio } func (ag *Agent) runTestWorkflowNotificationsLoop(ctx context.Context) error { - ctx = AddAPIKeyMeta(ctx, ag.apiKey) - ag.logger.Infow("initiating workflow notifications streaming connection with Cloud API") // creates a new Stream from the client side. ctx is used for the lifetime of the stream. opts := []grpc.CallOption{grpc.UseCompressor(gzip.Name), grpc.MaxCallRecvMsgSize(math.MaxInt32)}