From 91c0a9476c8843d3da4b8475efb0cbfc7ceb02f0 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Mon, 22 Jan 2024 14:06:23 -0500 Subject: [PATCH] Fix flakiness in TestManager_NoDeadlock (#4104) --- .../runtime/manager_fake_input_test.go | 171 +++++++----------- 1 file changed, 70 insertions(+), 101 deletions(-) diff --git a/pkg/component/runtime/manager_fake_input_test.go b/pkg/component/runtime/manager_fake_input_test.go index 06fb41860b2..5a61fcde1ac 100644 --- a/pkg/component/runtime/manager_fake_input_test.go +++ b/pkg/component/runtime/manager_fake_input_test.go @@ -1307,35 +1307,11 @@ LOOP: require.Equal(t, context.Canceled, err, "Run should return with context canceled, got %v", err.Error()) } -func (suite *FakeInputSuite) TestManager_NoDeadlock() { - t := suite.T() - // NOTE: This is a long-running test that spams the runtime managers `Update` function to try and - // trigger a deadlock. This test takes 2 minutes to run trying to re-produce issue: - // https://github.com/elastic/elastic-agent/issues/2691 - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - ai, _ := info.NewAgentInfo(ctx, true) - m, err := NewManager(newDebugLogger(t), newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig()) - require.NoError(t, err) - errCh := make(chan error) - go func() { - err := m.Run(ctx) - if errors.Is(err, context.Canceled) { - err = nil - } - errCh <- err - }() - - waitCtx, waitCancel := context.WithTimeout(ctx, 1*time.Second) - defer waitCancel() - if err := waitForReady(waitCtx, m); err != nil { - require.NoError(t, err) - } - +// A component that can be fed to Manager.Update, with an index to allow +// looping with distinct configurations at each step. +func noDeadlockTestComponent(t *testing.T, index int) component.Component { binaryPath := testBinary(t, "component") - comp := component.Component{ + return component.Component{ ID: "fake-default", InputSpec: &component.InputRuntimeSpec{ InputType: "fake", @@ -1351,99 +1327,92 @@ func (suite *FakeInputSuite) TestManager_NoDeadlock() { Config: component.MustExpectedConfig(map[string]interface{}{ "type": "fake", "state": int(client.UnitStateHealthy), - "message": "Fake Healthy", + "message": fmt.Sprintf("Fake Healthy %d", index), }), }, }, } +} - updatedCh := make(chan time.Time) - updatedErr := make(chan error) - updatedCtx, updatedCancel := context.WithCancel(context.Background()) - defer updatedCancel() +func (suite *FakeInputSuite) TestManager_NoDeadlock() { + t := suite.T() + // NOTE: This is a long-running test that spams the runtime managers `Update` function to try and + // trigger a deadlock. This test takes 2 minutes to run trying to re-produce issue: + // https://github.com/elastic/elastic-agent/issues/2691 + + // How long to run the test + testDuration := 2 * time.Minute + + // How long without an update before we report it as a deadlock + maxUpdateInterval := 15 * time.Second + + // Create the runtime manager + ai, _ := info.NewAgentInfo(context.Background(), true) + m, err := NewManager(newDebugLogger(t), newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig()) + require.NoError(t, err) + + // Start the runtime manager in a goroutine, passing its termination state + // to managerResultChan. + managerCtx, managerCancel := context.WithCancel(context.Background()) + defer managerCancel() + managerResultChan := make(chan error) go func() { - // spam update on component trying to cause a deadlock - comp := comp - i := 0 - for { - if updatedCtx.Err() != nil { - return - } - updatedComp := comp - updatedComp.Units = make([]component.Unit, 1) - updatedComp.Units[0] = component.Unit{ - ID: "fake-input", - Type: client.UnitTypeInput, - LogLevel: client.UnitLogLevelError, // test log will get spammed with the constant updates (error to prevent spam) - Config: component.MustExpectedConfig(map[string]interface{}{ - "type": "fake", - "state": int(client.UnitStateHealthy), - "message": fmt.Sprintf("Fake Healthy %d", i), - }), - } - i += 1 - comp = updatedComp - m.Update(component.Model{Components: []component.Component{updatedComp}}) - err := <-m.errCh - if err != nil { - updatedErr <- err - return - } - updatedCh <- time.Now() - } + managerResultChan <- m.Run(managerCtx) }() - deadlockErr := make(chan error) + // Wait for the manager to become active + timedWaitForReady(t, m, 1*time.Second) + + // Start a goroutine to spam the manager update trying to cause + // a deadlock. When the test context finishes, the update loop + // closes updateResultChan to signal that it is done. + updateResultChan := make(chan error) + updateLoopCtx, updateLoopCancel := context.WithTimeout(context.Background(), testDuration) + defer updateLoopCancel() go func() { - t := time.NewTimer(15 * time.Second) - defer t.Stop() - for { - select { - case <-updatedCtx.Done(): - return - case <-updatedCh: - // update did occur - t.Reset(15 * time.Second) - case <-t.C: - // timeout hit waiting for another update to work - deadlockErr <- errors.New("hit deadlock") + defer close(updateResultChan) + for i := 0; updateLoopCtx.Err() == nil; i++ { + comp := noDeadlockTestComponent(t, i) + m.Update(component.Model{Components: []component.Component{comp}}) + err := <-m.errCh + updateResultChan <- err + if err != nil { + // If the update gave an error, end the test return } } }() - defer drainErrChan(errCh) - defer drainErrChan(updatedErr) - defer drainErrChan(deadlockErr) - - // wait 2 minutes for a deadlock to occur - endTimer := time.NewTimer(2 * time.Minute) - defer endTimer.Stop() + // The component state is being changed constantly. If updateTimeout + // triggers without any updates, report it as a deadlock. + updateTimeout := time.NewTicker(maxUpdateInterval) + defer updateTimeout.Stop() LOOP: for { select { - case <-endTimer.C: - // no deadlock after timeout (all good stop the component) - updatedCancel() - m.Update(component.Model{Components: []component.Component{}}) - <-m.errCh // Don't care about the result of Update, just that it runs - break LOOP - case err := <-errCh: - require.NoError(t, err) - case err := <-updatedErr: - require.NoError(t, err) - break LOOP - case err := <-deadlockErr: - require.NoError(t, err) - break LOOP + case err, ok := <-updateResultChan: + if ok { + // update did occur + require.NoError(t, err) + updateTimeout.Reset(15 * time.Second) + } else { + // Update goroutine is terminating, test is over + break LOOP + } + case <-managerResultChan: + require.Fail(t, "Runtime manager terminated before test was over") + case <-updateTimeout.C: + require.Fail(t, "Timed out waiting for Manager.Update result") } } - - updatedCancel() - cancel() - - err = <-errCh + // Finished without a deadlock. Stop the component and shut down the manager. + m.Update(component.Model{Components: []component.Component{}}) + err = <-m.errCh require.NoError(t, err) + + managerCancel() + err = <-managerResultChan + require.Equal(t, err, context.Canceled) } func (suite *FakeInputSuite) TestManager_Configure() {