From 56bab7efe2d8e12076319825214f72408505ccd6 Mon Sep 17 00:00:00 2001 From: skudasov Date: Fri, 20 Sep 2024 16:21:21 +0200 Subject: [PATCH] fix off-by-one bug that can be caught on sub-rpc profiles --- wasp/examples/scenario/vu.go | 4 +-- wasp/wasp.go | 28 +++++++++++-------- wasp/wasp_bench_test.go | 2 +- wasp/wasp_test.go | 52 ++++++++++++++++++++++++++++-------- 4 files changed, 61 insertions(+), 25 deletions(-) diff --git a/wasp/examples/scenario/vu.go b/wasp/examples/scenario/vu.go index 410efcde5..5bfd79fdf 100644 --- a/wasp/examples/scenario/vu.go +++ b/wasp/examples/scenario/vu.go @@ -24,7 +24,7 @@ func NewExampleScenario(target string) *VirtualUser { return &VirtualUser{ VUControl: wasp.NewVUControl(), target: target, - rl: ratelimit.New(10), + rl: ratelimit.New(10, ratelimit.WithoutSlack), client: resty.New().SetBaseURL(target), Data: make([]string, 0), } @@ -34,7 +34,7 @@ func (m *VirtualUser) Clone(_ *wasp.Generator) wasp.VirtualUser { return &VirtualUser{ VUControl: wasp.NewVUControl(), target: m.target, - rl: ratelimit.New(10), + rl: ratelimit.New(10, ratelimit.WithoutSlack), client: resty.New().SetBaseURL(m.target), Data: make([]string, 0), } diff --git a/wasp/wasp.go b/wasp/wasp.go index 0e9bef2c8..b6b3e2cdd 100644 --- a/wasp/wasp.go +++ b/wasp/wasp.go @@ -195,6 +195,7 @@ type Stats struct { CurrentSegment atomic.Int64 `json:"current_schedule_segment"` SamplesRecorded atomic.Int64 `json:"samples_recorded"` SamplesSkipped atomic.Int64 `json:"samples_skipped"` + RunStarted atomic.Bool `json:"runStarted"` RunPaused atomic.Bool `json:"runPaused"` RunStopped atomic.Bool `json:"runStopped"` RunFailed atomic.Bool `json:"runFailed"` @@ -323,16 +324,13 @@ func NewGenerator(cfg *Config) (*Generator, error) { return g, nil } -// setupSchedule set up initial data for both RPS and VirtualUser load types -func (g *Generator) setupSchedule() { +// runExecuteLoop set up initial data for both RPS and VirtualUser load types +func (g *Generator) runExecuteLoop() { g.currentSegment = g.scheduleSegments[0] g.stats.LastSegment.Store(int64(len(g.scheduleSegments))) switch g.Cfg.LoadType { case RPS: g.ResponsesWaitGroup.Add(1) - g.stats.CurrentRPS.Store(g.currentSegment.From) - newRateLimit := ratelimit.New(int(g.currentSegment.From), ratelimit.Per(g.Cfg.RateLimitUnitDuration)) - g.rl.Store(&newRateLimit) // we run pacedCall controlled by stats.CurrentRPS go func() { for { @@ -449,6 +447,7 @@ func (g *Generator) runVU(vu VirtualUser) { // changing both internal and Stats values to report func (g *Generator) processSegment() bool { defer func() { + g.stats.RunStarted.Store(true) g.Log.Info(). Int64("Segment", g.stats.CurrentSegment.Load()). Int64("VUs", g.stats.CurrentVUs.Load()). @@ -462,7 +461,7 @@ func (g *Generator) processSegment() bool { g.stats.CurrentSegment.Add(1) switch g.Cfg.LoadType { case RPS: - newRateLimit := ratelimit.New(int(g.currentSegment.From), ratelimit.Per(g.Cfg.RateLimitUnitDuration)) + newRateLimit := ratelimit.New(int(g.currentSegment.From), ratelimit.Per(g.Cfg.RateLimitUnitDuration), ratelimit.WithoutSlack) g.rl.Store(&newRateLimit) g.stats.CurrentRPS.Store(g.currentSegment.From) case VU: @@ -491,9 +490,9 @@ func (g *Generator) processSegment() bool { return false } -// runSchedule runs scheduling loop +// runScheduleLoop runs scheduling loop // processing segments inside the whole schedule -func (g *Generator) runSchedule() { +func (g *Generator) runScheduleLoop() { go func() { for { select { @@ -583,11 +582,17 @@ func (g *Generator) collectVUResults() { // pacedCall calls a gun according to a scheduleSegments or plain RPS func (g *Generator) pacedCall() { - if g.stats.RunPaused.Load() || g.stats.RunStopped.Load() { + if !g.Stats().RunStarted.Load() { return } l := *g.rl.Load() l.Take() + if g.stats.RunPaused.Load() { + return + } + if g.stats.RunStopped.Load() { + return + } result := make(chan *Response) requestCtx, cancel := context.WithTimeout(context.Background(), g.Cfg.CallTimeout) callStartTS := time.Now() @@ -621,9 +626,9 @@ func (g *Generator) Run(wait bool) (interface{}, bool) { g.sendResponsesToLoki() g.sendStatsToLoki() } - g.setupSchedule() + g.runScheduleLoop() + g.runExecuteLoop() g.collectVUResults() - g.runSchedule() if wait { return g.Wait() } @@ -648,6 +653,7 @@ func (g *Generator) Stop() (interface{}, bool) { if g.stats.RunStopped.Load() { return nil, true } + g.stats.RunStarted.Store(false) g.stats.RunStopped.Store(true) g.stats.RunFailed.Store(true) g.Log.Warn().Msg("Graceful stop") diff --git a/wasp/wasp_bench_test.go b/wasp/wasp_bench_test.go index d5a7a6f73..bdbbfc55c 100644 --- a/wasp/wasp_bench_test.go +++ b/wasp/wasp_bench_test.go @@ -21,7 +21,7 @@ func BenchmarkPacedCall(b *testing.B) { Gun: NewMockGun(&MockGunConfig{}), }) require.NoError(b, err) - gen.setupSchedule() + gen.runExecuteLoop() b.ResetTimer() for i := 0; i < b.N; i++ { gen.pacedCall() diff --git a/wasp/wasp_test.go b/wasp/wasp_test.go index 202cf2735..a411faa92 100644 --- a/wasp/wasp_test.go +++ b/wasp/wasp_test.go @@ -164,7 +164,7 @@ func TestSmokeFailedOneRequest(t *testing.T) { _, failed := gen.Stop() require.Equal(t, true, failed) stats := gen.Stats() - require.GreaterOrEqual(t, stats.Failed.Load(), int64(2)) + require.GreaterOrEqual(t, stats.Failed.Load(), int64(1)) require.Equal(t, stats.RunFailed.Load(), true) require.Equal(t, stats.CurrentRPS.Load(), int64(1)) require.Equal(t, stats.Duration, gen.Cfg.duration.Nanoseconds()) @@ -172,15 +172,11 @@ func TestSmokeFailedOneRequest(t *testing.T) { okData, _, failResponses := convertResponsesData(gen) require.Empty(t, okData) require.GreaterOrEqual(t, failResponses[0].Duration, 50*time.Millisecond) - require.GreaterOrEqual(t, failResponses[1].Duration, 50*time.Millisecond) require.Equal(t, failResponses[0].Data.(string), "failedCallData") require.Equal(t, failResponses[0].Error, "error") - require.Equal(t, failResponses[1].Data.(string), "failedCallData") - require.Equal(t, failResponses[1].Error, "error") errs := gen.Errors() require.Equal(t, errs[0], "error") - require.Equal(t, errs[1], "error") - require.GreaterOrEqual(t, len(errs), 2) + require.GreaterOrEqual(t, len(errs), 1) } func TestSmokeGenCallTimeout(t *testing.T) { @@ -201,7 +197,7 @@ func TestSmokeGenCallTimeout(t *testing.T) { require.Equal(t, true, failed) stats := gen.Stats() require.GreaterOrEqual(t, stats.Success.Load(), int64(0)) - require.GreaterOrEqual(t, stats.CallTimeout.Load(), int64(2)) + require.GreaterOrEqual(t, stats.CallTimeout.Load(), int64(1)) require.Equal(t, stats.CurrentRPS.Load(), int64(1)) okData, _, failResponses := convertResponsesData(gen) @@ -209,7 +205,6 @@ func TestSmokeGenCallTimeout(t *testing.T) { require.Equal(t, failResponses[0].Data, nil) require.Equal(t, failResponses[0].Error, ErrCallTimeout.Error()) require.Equal(t, gen.Errors()[0], ErrCallTimeout.Error()) - require.Equal(t, gen.Errors()[1], ErrCallTimeout.Error()) } func TestSmokeVUCallTimeout(t *testing.T) { @@ -414,12 +409,11 @@ func TestSmokeCancelledBeforeDeadline(t *testing.T) { require.Greater(t, elapsed, 1050*time.Millisecond) require.Equal(t, true, failed) stats := gen.Stats() - require.GreaterOrEqual(t, stats.Success.Load(), int64(2)) + require.GreaterOrEqual(t, stats.Success.Load(), int64(1)) require.Equal(t, stats.CurrentRPS.Load(), int64(1)) okData, _, failResponses := convertResponsesData(gen) require.Equal(t, okData[0], "successCallData") - require.Equal(t, okData[1], "successCallData") require.Empty(t, failResponses) require.Empty(t, gen.Errors()) } @@ -1006,7 +1000,7 @@ func TestSmokePauseResumeGenerator(t *testing.T) { stats := gen.Stats() _, okResponses, failResponses := convertResponsesData(gen) require.Equal(t, int64(10), stats.CurrentRPS.Load()) - require.GreaterOrEqual(t, len(okResponses), 70) + require.GreaterOrEqual(t, len(okResponses), 60) require.Empty(t, failResponses) require.Empty(t, gen.Errors()) }) @@ -1039,3 +1033,39 @@ func TestSmokePauseResumeGenerator(t *testing.T) { require.Empty(t, gen.Errors()) }) } + +// regression + +func TestSmokeNoDuplicateRequestsOnceOnStart(t *testing.T) { + t.Parallel() + gen, err := NewGenerator(&Config{ + T: t, + LoadType: RPS, + StatsPollInterval: 100 * time.Second, + Schedule: Plain(1, 1*time.Second), + Gun: NewMockGun(&MockGunConfig{ + CallSleep: 50 * time.Millisecond, + }), + }) + require.NoError(t, err) + _, failed := gen.Run(false) + require.Equal(t, false, failed) + time.Sleep(950 * time.Millisecond) + _, _ = gen.Stop() + stats := gen.Stats() + require.Equal(t, stats.CurrentRPS.Load(), int64(1)) + require.Equal(t, stats.CurrentVUs.Load(), int64(0)) + require.GreaterOrEqual(t, stats.Success.Load(), int64(1)) + require.Equal(t, stats.CallTimeout.Load(), int64(0)) + require.Equal(t, stats.Failed.Load(), int64(0)) + require.Equal(t, stats.Duration, gen.Cfg.duration.Nanoseconds()) + + okData, okResponses, failResponses := convertResponsesData(gen) + require.GreaterOrEqual(t, len(okResponses), 1) + require.GreaterOrEqual(t, len(okData), 1) + require.Equal(t, okData[0], "successCallData") + require.GreaterOrEqual(t, okResponses[0].Duration, 50*time.Millisecond) + require.Equal(t, okResponses[0].Data.(string), "successCallData") + require.Empty(t, failResponses) + require.Empty(t, gen.Errors()) +}