fix off-by-one bug that can be caught on sub-rpc profiles
skudasov committed Sep 20, 2024 · 1 parent a197ef9 · commit 56bab7e
Showing 4 changed files with 61 additions and 25 deletions.
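
The fix targets two sources of extra requests, both visible in the diffs below. First, the go.uber.org/ratelimit limiters were built with their default slack allowance: idle time (a pause, or the long gaps of a low-rate profile) accrues credit that is later spent as a back-to-back burst of Take() calls. Second, the startup path created the first segment's rate limiter twice (once in setupSchedule, once in processSegment) and let pacedCall fire before the first segment was processed, which could duplicate the first request. A minimal standalone sketch of the slack behavior; this is not code from the commit, only the go.uber.org/ratelimit API (New, WithoutSlack, Take) that the diff itself uses:

package main

import (
    "fmt"
    "time"

    "go.uber.org/ratelimit"
)

func main() {
    // Default limiter: idle time accrues slack credit, so after a pause
    // several Take() calls can return back-to-back (a "catch-up" burst).
    rl := ratelimit.New(1) // 1 request per second
    rl.Take()
    time.Sleep(3 * time.Second) // idle period accrues slack
    t1 := rl.Take()
    t2 := rl.Take()
    fmt.Println("with slack, gap:", t2.Sub(t1)) // ~0s: an extra call slips in

    // WithoutSlack: strict pacing, one request per interval, no catch-up.
    strict := ratelimit.New(1, ratelimit.WithoutSlack)
    strict.Take()
    time.Sleep(3 * time.Second)
    t3 := strict.Take()
    t4 := strict.Take()
    fmt.Println("without slack, gap:", t4.Sub(t3)) // ~1s: never early
}

At 1 RPS a single slack token is already an off-by-one; the lower the rate, the larger the relative error.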
4 changes: 2 additions & 2 deletions wasp/examples/scenario/vu.go

@@ -24,7 +24,7 @@ func NewExampleScenario(target string) *VirtualUser {
     return &VirtualUser{
         VUControl: wasp.NewVUControl(),
         target:    target,
-        rl:        ratelimit.New(10),
+        rl:        ratelimit.New(10, ratelimit.WithoutSlack),
         client:    resty.New().SetBaseURL(target),
         Data:      make([]string, 0),
     }
@@ -34,7 +34,7 @@ func (m *VirtualUser) Clone(_ *wasp.Generator) wasp.VirtualUser {
     return &VirtualUser{
         VUControl: wasp.NewVUControl(),
         target:    m.target,
-        rl:        ratelimit.New(10),
+        rl:        ratelimit.New(10, ratelimit.WithoutSlack),
         client:    resty.New().SetBaseURL(m.target),
         Data:      make([]string, 0),
     }
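
The generator itself (wasp/wasp.go below) builds its segment limiters with ratelimit.Per(g.Cfg.RateLimitUnitDuration), which is how profiles slower than one request per second are expressed; the commit title's "sub-rpc profiles" reads as such sub-1-RPS profiles. At those rates one stray token is a whole duplicate request, so strict pacing matters most there. A short sketch of such a profile, again plain go.uber.org/ratelimit usage rather than wasp code:

package main

import (
    "fmt"
    "time"

    "go.uber.org/ratelimit"
)

func main() {
    // A sub-1-RPS profile: 1 request per 10 seconds. With default slack,
    // any idle stretch could let an extra request through early;
    // WithoutSlack guarantees a full 10s gap between Take() calls.
    rl := ratelimit.New(1, ratelimit.Per(10*time.Second), ratelimit.WithoutSlack)
    start := rl.Take()           // immediate
    next := rl.Take()            // blocks ~10s
    fmt.Println(next.Sub(start)) // ~10s, never early
}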
28 changes: 17 additions & 11 deletions wasp/wasp.go

@@ -195,6 +195,7 @@ type Stats struct {
     CurrentSegment  atomic.Int64 `json:"current_schedule_segment"`
     SamplesRecorded atomic.Int64 `json:"samples_recorded"`
     SamplesSkipped  atomic.Int64 `json:"samples_skipped"`
+    RunStarted      atomic.Bool  `json:"runStarted"`
     RunPaused       atomic.Bool  `json:"runPaused"`
     RunStopped      atomic.Bool  `json:"runStopped"`
     RunFailed       atomic.Bool  `json:"runFailed"`
@@ -323,16 +324,13 @@ func NewGenerator(cfg *Config) (*Generator, error) {
     return g, nil
 }

-// setupSchedule set up initial data for both RPS and VirtualUser load types
-func (g *Generator) setupSchedule() {
+// runExecuteLoop set up initial data for both RPS and VirtualUser load types
+func (g *Generator) runExecuteLoop() {
     g.currentSegment = g.scheduleSegments[0]
     g.stats.LastSegment.Store(int64(len(g.scheduleSegments)))
     switch g.Cfg.LoadType {
     case RPS:
         g.ResponsesWaitGroup.Add(1)
-        g.stats.CurrentRPS.Store(g.currentSegment.From)
-        newRateLimit := ratelimit.New(int(g.currentSegment.From), ratelimit.Per(g.Cfg.RateLimitUnitDuration))
-        g.rl.Store(&newRateLimit)
         // we run pacedCall controlled by stats.CurrentRPS
         go func() {
             for {
@@ -449,6 +447,7 @@ func (g *Generator) runVU(vu VirtualUser) {
 // changing both internal and Stats values to report
 func (g *Generator) processSegment() bool {
     defer func() {
+        g.stats.RunStarted.Store(true)
         g.Log.Info().
             Int64("Segment", g.stats.CurrentSegment.Load()).
             Int64("VUs", g.stats.CurrentVUs.Load()).
@@ -462,7 +461,7 @@
     g.stats.CurrentSegment.Add(1)
     switch g.Cfg.LoadType {
     case RPS:
-        newRateLimit := ratelimit.New(int(g.currentSegment.From), ratelimit.Per(g.Cfg.RateLimitUnitDuration))
+        newRateLimit := ratelimit.New(int(g.currentSegment.From), ratelimit.Per(g.Cfg.RateLimitUnitDuration), ratelimit.WithoutSlack)
         g.rl.Store(&newRateLimit)
         g.stats.CurrentRPS.Store(g.currentSegment.From)
     case VU:
@@ -491,9 +490,9 @@ func (g *Generator) processSegment() bool {
     return false
 }

-// runSchedule runs scheduling loop
+// runScheduleLoop runs scheduling loop
 // processing segments inside the whole schedule
-func (g *Generator) runSchedule() {
+func (g *Generator) runScheduleLoop() {
     go func() {
         for {
             select {
@@ -583,11 +582,17 @@ func (g *Generator) collectVUResults() {

 // pacedCall calls a gun according to a scheduleSegments or plain RPS
 func (g *Generator) pacedCall() {
-    if g.stats.RunPaused.Load() || g.stats.RunStopped.Load() {
+    if !g.Stats().RunStarted.Load() {
         return
     }
     l := *g.rl.Load()
     l.Take()
+    if g.stats.RunPaused.Load() {
+        return
+    }
+    if g.stats.RunStopped.Load() {
+        return
+    }
     result := make(chan *Response)
     requestCtx, cancel := context.WithTimeout(context.Background(), g.Cfg.CallTimeout)
     callStartTS := time.Now()
@@ -621,9 +626,9 @@ func (g *Generator) Run(wait bool) (interface{}, bool) {
         g.sendResponsesToLoki()
         g.sendStatsToLoki()
     }
-    g.setupSchedule()
+    g.runScheduleLoop()
+    g.runExecuteLoop()
     g.collectVUResults()
-    g.runSchedule()
     if wait {
         return g.Wait()
     }
@@ -648,6 +653,7 @@ func (g *Generator) Stop() (interface{}, bool) {
     if g.stats.RunStopped.Load() {
         return nil, true
     }
+    g.stats.RunStarted.Store(false)
     g.stats.RunStopped.Store(true)
     g.stats.RunFailed.Store(true)
     g.Log.Warn().Msg("Graceful stop")
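
The orchestration change is the other half of the fix. Run now starts the schedule loop before the execute loop, processSegment flips the new RunStarted flag once a segment has actually been applied, pacedCall is a no-op until then, and Stop clears the flag again before marking the run stopped. Reduced to a standalone sketch of the gating pattern (simplified names, not the actual wasp types):

package main

import (
    "fmt"
    "sync/atomic"
    "time"
)

// gen is a stripped-down stand-in for wasp's Generator: the pacing loop
// starts immediately, but the gate holds calls until the schedule loop
// has applied the first segment.
type gen struct {
    runStarted atomic.Bool
    runStopped atomic.Bool
    calls      atomic.Int64
}

func (g *gen) pacedCall() {
    if !g.runStarted.Load() { // first segment not applied yet: no-op
        return
    }
    g.calls.Add(1)
}

func (g *gen) processSegment() {
    // ...store the new rate limiter, RPS, VU count for the segment...
    g.runStarted.Store(true) // only now may pacedCall do work
}

func main() {
    g := &gen{}
    go func() { // execute loop, running from the start
        for !g.runStopped.Load() {
            g.pacedCall()
            time.Sleep(10 * time.Millisecond)
        }
    }()
    time.Sleep(50 * time.Millisecond)
    fmt.Println("before first segment:", g.calls.Load()) // 0: gate holds
    g.processSegment()
    time.Sleep(50 * time.Millisecond)
    fmt.Println("after first segment:", g.calls.Load()) // > 0
    g.runStopped.Store(true)
}

Note also that the reworked pacedCall takes from the limiter before checking RunPaused and RunStopped, so paused iterations still consume their pacing slot instead of queueing a burst for resume.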
2 changes: 1 addition & 1 deletion wasp/wasp_bench_test.go

@@ -21,7 +21,7 @@ func BenchmarkPacedCall(b *testing.B) {
         Gun: NewMockGun(&MockGunConfig{}),
     })
     require.NoError(b, err)
-    gen.setupSchedule()
+    gen.runExecuteLoop()
     b.ResetTimer()
     for i := 0; i < b.N; i++ {
         gen.pacedCall()
52 changes: 41 additions & 11 deletions wasp/wasp_test.go

@@ -164,23 +164,19 @@ func TestSmokeFailedOneRequest(t *testing.T) {
     _, failed := gen.Stop()
     require.Equal(t, true, failed)
     stats := gen.Stats()
-    require.GreaterOrEqual(t, stats.Failed.Load(), int64(2))
+    require.GreaterOrEqual(t, stats.Failed.Load(), int64(1))
     require.Equal(t, stats.RunFailed.Load(), true)
     require.Equal(t, stats.CurrentRPS.Load(), int64(1))
     require.Equal(t, stats.Duration, gen.Cfg.duration.Nanoseconds())

     okData, _, failResponses := convertResponsesData(gen)
     require.Empty(t, okData)
     require.GreaterOrEqual(t, failResponses[0].Duration, 50*time.Millisecond)
-    require.GreaterOrEqual(t, failResponses[1].Duration, 50*time.Millisecond)
     require.Equal(t, failResponses[0].Data.(string), "failedCallData")
     require.Equal(t, failResponses[0].Error, "error")
-    require.Equal(t, failResponses[1].Data.(string), "failedCallData")
-    require.Equal(t, failResponses[1].Error, "error")
     errs := gen.Errors()
     require.Equal(t, errs[0], "error")
-    require.Equal(t, errs[1], "error")
-    require.GreaterOrEqual(t, len(errs), 2)
+    require.GreaterOrEqual(t, len(errs), 1)
 }

 func TestSmokeGenCallTimeout(t *testing.T) {
@@ -201,15 +197,14 @@ func TestSmokeGenCallTimeout(t *testing.T) {
     require.Equal(t, true, failed)
     stats := gen.Stats()
     require.GreaterOrEqual(t, stats.Success.Load(), int64(0))
-    require.GreaterOrEqual(t, stats.CallTimeout.Load(), int64(2))
+    require.GreaterOrEqual(t, stats.CallTimeout.Load(), int64(1))
     require.Equal(t, stats.CurrentRPS.Load(), int64(1))

     okData, _, failResponses := convertResponsesData(gen)
     require.Empty(t, okData)
     require.Equal(t, failResponses[0].Data, nil)
     require.Equal(t, failResponses[0].Error, ErrCallTimeout.Error())
     require.Equal(t, gen.Errors()[0], ErrCallTimeout.Error())
-    require.Equal(t, gen.Errors()[1], ErrCallTimeout.Error())
 }

 func TestSmokeVUCallTimeout(t *testing.T) {
@@ -414,12 +409,11 @@ func TestSmokeCancelledBeforeDeadline(t *testing.T) {
     require.Greater(t, elapsed, 1050*time.Millisecond)
     require.Equal(t, true, failed)
     stats := gen.Stats()
-    require.GreaterOrEqual(t, stats.Success.Load(), int64(2))
+    require.GreaterOrEqual(t, stats.Success.Load(), int64(1))
     require.Equal(t, stats.CurrentRPS.Load(), int64(1))

     okData, _, failResponses := convertResponsesData(gen)
     require.Equal(t, okData[0], "successCallData")
-    require.Equal(t, okData[1], "successCallData")
     require.Empty(t, failResponses)
     require.Empty(t, gen.Errors())
 }
@@ -1006,7 +1000,7 @@ func TestSmokePauseResumeGenerator(t *testing.T) {
         stats := gen.Stats()
         _, okResponses, failResponses := convertResponsesData(gen)
         require.Equal(t, int64(10), stats.CurrentRPS.Load())
-        require.GreaterOrEqual(t, len(okResponses), 70)
+        require.GreaterOrEqual(t, len(okResponses), 60)
         require.Empty(t, failResponses)
         require.Empty(t, gen.Errors())
     })
@@ -1039,3 +1033,39 @@ func TestSmokePauseResumeGenerator(t *testing.T) {
         require.Empty(t, gen.Errors())
     })
 }
+
+// regression
+
+func TestSmokeNoDuplicateRequestsOnceOnStart(t *testing.T) {
+    t.Parallel()
+    gen, err := NewGenerator(&Config{
+        T:                 t,
+        LoadType:          RPS,
+        StatsPollInterval: 100 * time.Second,
+        Schedule:          Plain(1, 1*time.Second),
+        Gun: NewMockGun(&MockGunConfig{
+            CallSleep: 50 * time.Millisecond,
+        }),
+    })
+    require.NoError(t, err)
+    _, failed := gen.Run(false)
+    require.Equal(t, false, failed)
+    time.Sleep(950 * time.Millisecond)
+    _, _ = gen.Stop()
+    stats := gen.Stats()
+    require.Equal(t, stats.CurrentRPS.Load(), int64(1))
+    require.Equal(t, stats.CurrentVUs.Load(), int64(0))
+    require.GreaterOrEqual(t, stats.Success.Load(), int64(1))
+    require.Equal(t, stats.CallTimeout.Load(), int64(0))
+    require.Equal(t, stats.Failed.Load(), int64(0))
+    require.Equal(t, stats.Duration, gen.Cfg.duration.Nanoseconds())
+
+    okData, okResponses, failResponses := convertResponsesData(gen)
+    require.GreaterOrEqual(t, len(okResponses), 1)
+    require.GreaterOrEqual(t, len(okData), 1)
+    require.Equal(t, okData[0], "successCallData")
+    require.GreaterOrEqual(t, okResponses[0].Duration, 50*time.Millisecond)
+    require.Equal(t, okResponses[0].Data.(string), "successCallData")
+    require.Empty(t, failResponses)
+    require.Empty(t, gen.Errors())
+}
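
The new regression test at the end of this file pins the startup behavior down: with Plain(1, 1*time.Second), exactly one request should land in the window where the old startup path could produce two. A back-of-the-envelope check of that expectation (plain go.uber.org/ratelimit; the 950ms window mirrors the test's sleep):

package main

import (
    "fmt"
    "time"

    "go.uber.org/ratelimit"
)

func main() {
    // Strictly paced Take() returns at t ≈ 0s, 1s, 2s, ...: a 950ms
    // window therefore contains exactly one call.
    rl := ratelimit.New(1, ratelimit.WithoutSlack)
    start := time.Now()
    calls := 0
    for {
        t := rl.Take()
        if t.Sub(start) > 950*time.Millisecond {
            break
        }
        calls++
    }
    fmt.Println("calls in the window:", calls) // 1
}

The duplicate second request the test guards against came from the old startup path, where the first segment's limiter was created twice and pacedCall could fire before processSegment ran.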
