Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support thresholds and the end-of-test summary in distributed execution #3213

Open
wants to merge 1 commit into
base: basic-distributed-execution
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions cmd/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package cmd

import (
"bytes"
"context"
"encoding/json"
"time"

"github.com/sirupsen/logrus"
"github.com/spf13/afero"
"github.com/spf13/cobra"
"go.k6.io/k6/cmd/state"
Expand All @@ -13,11 +16,76 @@ import (
"go.k6.io/k6/lib"
"go.k6.io/k6/loader"
"go.k6.io/k6/metrics"
"go.k6.io/k6/metrics/engine"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"gopkg.in/guregu/null.v3"
)

// TODO: something cleaner
func getMetricsHook(
ctx context.Context, instanceID uint32,
client distributed.DistributedTestClient, logger logrus.FieldLogger,
) func(*engine.MetricsEngine) func() {
logger = logger.WithField("component", "metric-engine-hook")
return func(me *engine.MetricsEngine) func() {
stop := make(chan struct{})
done := make(chan struct{})

dumpMetrics := func() {
logger.Debug("Starting metric dump...")
me.MetricsLock.Lock()
defer me.MetricsLock.Unlock()

metrics := make([]*distributed.MetricDump, 0, len(me.ObservedMetrics))
for _, om := range me.ObservedMetrics {
data, err := om.Sink.Drain()
if err != nil {
logger.Errorf("There was a problem draining the sink for metric %s: %s", om.Name, err)
}
metrics = append(metrics, &distributed.MetricDump{
Name: om.Name,
Data: data,
})
}

data := &distributed.MetricsDump{
InstanceID: instanceID,
Metrics: metrics,
}
_, err := client.SendMetrics(ctx, data)
if err != nil {
logger.Errorf("There was a problem dumping metrics: %s", err)
}
}

go func() {
defer close(done)
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()

for {
select {
case <-ticker.C:
dumpMetrics()
case <-stop:
dumpMetrics()
return
}
}
}()

finalize := func() {
logger.Debug("Final metric dump...")
close(stop)
<-done
logger.Debug("Done!")
}

return finalize
}
}

// TODO: a whole lot of cleanup, refactoring, error handling and hardening
func getCmdAgent(gs *state.GlobalState) *cobra.Command { //nolint: funlen
c := &cmdsRunAndAgent{gs: gs}
Expand All @@ -42,6 +110,8 @@ func getCmdAgent(gs *state.GlobalState) *cobra.Command { //nolint: funlen
return nil, nil, err
}

c.metricsEngineHook = getMetricsHook(gs.Ctx, resp.InstanceID, client, gs.Logger)

controller, err := distributed.NewAgentController(gs.Ctx, resp.InstanceID, client, gs.Logger)
if err != nil {
return nil, nil, err
Expand Down
72 changes: 70 additions & 2 deletions cmd/coordinator.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
package cmd

import (
"fmt"
"net"
"strings"

"github.com/spf13/cobra"
"github.com/spf13/pflag"
"go.k6.io/k6/cmd/state"
"go.k6.io/k6/errext"
"go.k6.io/k6/errext/exitcodes"
"go.k6.io/k6/execution"
"go.k6.io/k6/execution/distributed"
"go.k6.io/k6/lib"
"go.k6.io/k6/metrics/engine"
"google.golang.org/grpc"
)

Expand All @@ -17,19 +24,80 @@ type cmdCoordinator struct {
instanceCount int
}

func (c *cmdCoordinator) run(cmd *cobra.Command, args []string) (err error) {
// TODO: split apart
func (c *cmdCoordinator) run(cmd *cobra.Command, args []string) (err error) { //nolint: funlen
ctx, runAbort := execution.NewTestRunContext(c.gs.Ctx, c.gs.Logger)

test, err := loadAndConfigureLocalTest(c.gs, cmd, args, getPartialConfig)
if err != nil {
return err
}

// Only consolidated options, not derived
testRunState, err := test.buildTestRunState(test.consolidatedConfig.Options)
if err != nil {
return err
}

metricsEngine, err := engine.NewMetricsEngine(testRunState.Registry, c.gs.Logger)
if err != nil {
return err
}

coordinator, err := distributed.NewCoordinatorServer(
c.instanceCount, test.initRunner.MakeArchive(), c.gs.Logger,
c.instanceCount, test.initRunner.MakeArchive(), metricsEngine, c.gs.Logger,
)
if err != nil {
return err
}

if !testRunState.RuntimeOptions.NoSummary.Bool {
defer func() {
c.gs.Logger.Debug("Generating the end-of-test summary...")
summaryResult, serr := test.initRunner.HandleSummary(ctx, &lib.Summary{
Metrics: metricsEngine.ObservedMetrics,
RootGroup: test.initRunner.GetDefaultGroup(),
TestRunDuration: coordinator.GetCurrentTestRunDuration(),
NoColor: c.gs.Flags.NoColor,
UIState: lib.UIState{
IsStdOutTTY: c.gs.Stdout.IsTTY,
IsStdErrTTY: c.gs.Stderr.IsTTY,
},
})
if serr == nil {
serr = handleSummaryResult(c.gs.FS, c.gs.Stdout, c.gs.Stderr, summaryResult)
}
if serr != nil {
c.gs.Logger.WithError(serr).Error("Failed to handle the end-of-test summary")
}
}()
}

if !testRunState.RuntimeOptions.NoThresholds.Bool {
getCurrentTestDuration := coordinator.GetCurrentTestRunDuration
finalizeThresholds := metricsEngine.StartThresholdCalculations(nil, runAbort, getCurrentTestDuration)

defer func() {
// This gets called after all of the outputs have stopped, so we are
// sure there won't be any more metrics being sent.
c.gs.Logger.Debug("Finalizing thresholds...")
breachedThresholds := finalizeThresholds()
if len(breachedThresholds) > 0 {
tErr := errext.WithAbortReasonIfNone(
errext.WithExitCodeIfNone(
fmt.Errorf("thresholds on metrics '%s' have been breached", strings.Join(breachedThresholds, ", ")),
exitcodes.ThresholdsHaveFailed,
), errext.AbortedByThresholdsAfterTestEnd)

if err == nil {
err = tErr
} else {
c.gs.Logger.WithError(tErr).Debug("Breached thresholds, but test already exited with another error")
}
}
}()
}

c.gs.Logger.Infof("Starting gRPC server on %s", c.gRPCAddress)
listener, err := net.Listen("tcp", c.gRPCAddress)
if err != nil {
Expand Down
10 changes: 8 additions & 2 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type cmdsRunAndAgent struct {

// TODO: figure out something more elegant?
loadConfiguredTest func(cmd *cobra.Command, args []string) (*loadedAndConfiguredTest, execution.Controller, error)
metricsEngineHook func(*engine.MetricsEngine) func()
testEndHook func(err error)
}

Expand Down Expand Up @@ -179,9 +180,9 @@ func (c *cmdsRunAndAgent) run(cmd *cobra.Command, args []string) (err error) {
}

// We'll need to pipe metrics to the MetricsEngine and process them if any
// of these are enabled: thresholds, end-of-test summary
// of these are enabled: thresholds, end-of-test summary, engine hook
shouldProcessMetrics := (!testRunState.RuntimeOptions.NoSummary.Bool ||
!testRunState.RuntimeOptions.NoThresholds.Bool)
!testRunState.RuntimeOptions.NoThresholds.Bool || c.metricsEngineHook != nil)
var metricsIngester *engine.OutputIngester
if shouldProcessMetrics {
err = metricsEngine.InitSubMetricsAndThresholds(conf.Options, testRunState.RuntimeOptions.NoThresholds.Bool)
Expand Down Expand Up @@ -244,6 +245,11 @@ func (c *cmdsRunAndAgent) run(cmd *cobra.Command, args []string) (err error) {
stopOutputs(err)
}()

if c.metricsEngineHook != nil {
hookFinalize := c.metricsEngineHook(metricsEngine)
defer hookFinalize()
}

if !testRunState.RuntimeOptions.NoThresholds.Bool {
finalizeThresholds := metricsEngine.StartThresholdCalculations(
metricsIngester, runAbort, executionState.GetCurrentTestRunDuration,
Expand Down
17 changes: 16 additions & 1 deletion execution/distributed/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/sirupsen/logrus"
"go.k6.io/k6/lib"
"go.k6.io/k6/metrics/engine"
)

// CoordinatorServer coordinates multiple k6 agents.
Expand All @@ -21,6 +22,7 @@ type CoordinatorServer struct {
instanceCount int
test *lib.Archive
logger logrus.FieldLogger
metricsEngine *engine.MetricsEngine

testStartTimeLock sync.Mutex
testStartTime *time.Time
Expand All @@ -34,7 +36,7 @@ type CoordinatorServer struct {

// NewCoordinatorServer initializes and returns a new CoordinatorServer.
func NewCoordinatorServer(
instanceCount int, test *lib.Archive, logger logrus.FieldLogger,
instanceCount int, test *lib.Archive, metricsEngine *engine.MetricsEngine, logger logrus.FieldLogger,
) (*CoordinatorServer, error) {
segments, err := test.Options.ExecutionSegment.Split(int64(instanceCount))
if err != nil {
Expand All @@ -58,6 +60,7 @@ func NewCoordinatorServer(
cs := &CoordinatorServer{
instanceCount: instanceCount,
test: test,
metricsEngine: metricsEngine,
logger: logger,
ess: ess,
cc: newCoordinatorController(instanceCount, logger),
Expand Down Expand Up @@ -144,6 +147,18 @@ func (cs *CoordinatorServer) CommandAndControl(stream DistributedTest_CommandAnd
return cs.cc.handleInstanceStream(initInstMsg.InitInstanceID, stream)
}

// SendMetrics accepts and imports the given metrics in the coordinator's MetricsEngine.
func (cs *CoordinatorServer) SendMetrics(_ context.Context, dumpMsg *MetricsDump) (*MetricsDumpResponse, error) {
// TODO: something nicer?
for _, md := range dumpMsg.Metrics {
if err := cs.metricsEngine.ImportMetric(md.Name, md.Data); err != nil {
cs.logger.Errorf("Error merging sink for metric %s: %w", md.Name, err)
// return nil, err
}
}
return &MetricsDumpResponse{}, nil
}

// Wait blocks until all instances have disconnected.
func (cs *CoordinatorServer) Wait() {
cs.wg.Wait()
Expand Down
Loading