Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.11](backport #3696) Fix TestManager_FakeInput_GoodUnitToBad #3698

Merged
merged 1 commit into from
Nov 3, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 95 additions & 98 deletions pkg/component/runtime/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1308,23 +1308,13 @@ func TestManager_FakeInput_GoodUnitToBad(t *testing.T) {
ai, _ := info.NewAgentInfo(ctx, true)
m, err := NewManager(newDebugLogger(t), newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig())
require.NoError(t, err)
errCh := make(chan error)
runResultChan := make(chan error, 1)
go func() {
err := m.Run(ctx)
if errors.Is(err, context.Canceled) {
err = nil
}
errCh <- err
runResultChan <- m.Run(ctx)
}()

waitCtx, waitCancel := context.WithTimeout(ctx, 1*time.Second)
defer waitCancel()
if err := waitForReady(waitCtx, m); err != nil {
require.NoError(t, err)
}

binaryPath := testBinary(t, "component")
comp := component.Component{
healthyComp := component.Component{
ID: "fake-default",
InputSpec: &component.InputRuntimeSpec{
InputType: "fake",
Expand Down Expand Up @@ -1355,105 +1345,103 @@ func TestManager_FakeInput_GoodUnitToBad(t *testing.T) {
},
},
}
// unhealthyComp is a copy of healthyComp with an error inserted in the
// second unit
unhealthyComp := healthyComp
unhealthyComp.Units = make([]component.Unit, len(healthyComp.Units))
copy(unhealthyComp.Units, healthyComp.Units)
unhealthyComp.Units[1] = component.Unit{
ID: "good-input",
Type: client.UnitTypeInput,
Err: errors.New("hard-error for config"),
}
goodUnitKey := ComponentUnitKey{UnitType: client.UnitTypeInput, UnitID: "good-input"}

subCtx, subCancel := context.WithCancel(context.Background())
defer subCancel()
subErrCh := make(chan error)
go func() {
unitGood := true

sub := m.Subscribe(subCtx, "fake-default")
for {
select {
case <-subCtx.Done():
return
case state := <-sub.Ch():
t.Logf("component state changed: %+v", state)
if state.State == client.UnitStateFailed {
subErrCh <- fmt.Errorf("component failed: %s", state.Message)
return
}

unit, ok := state.Units[ComponentUnitKey{UnitType: client.UnitTypeInput, UnitID: "good-input"}]
if !ok {
subErrCh <- errors.New("unit missing: good-input")
return
}
if unitGood {
switch unit.State {
case client.UnitStateFailed:
subErrCh <- fmt.Errorf("unit failed: %s", unit.Message)
return
case client.UnitStateHealthy:
// good unit it; now make it bad
t.Logf("marking good-input as having a hard-error for config")
updatedComp := comp
updatedComp.Units = make([]component.Unit, len(comp.Units))
copy(updatedComp.Units, comp.Units)
updatedComp.Units[1] = component.Unit{
ID: "good-input",
Type: client.UnitTypeInput,
Err: errors.New("hard-error for config"),
}
unitGood = false
err := m.Update(component.Model{Components: []component.Component{updatedComp}})
if err != nil {
subErrCh <- err
}
case client.UnitStateStarting:
// acceptable
default:
// unknown state that should not have occurred
subErrCh <- fmt.Errorf("unit reported unexpected state: %v", unit.State)
}
} else {
switch unit.State {
case client.UnitStateFailed:
// went to failed; stop whole component
err := m.Update(component.Model{Components: []component.Component{}})
if err != nil {
subErrCh <- err
}
case client.UnitStateStopped:
// unit was stopped
subErrCh <- nil
default:
subErrCh <- fmt.Errorf(
"good-input unit should be either failed or stopped, but it's '%s'",
unit.State)
}
}
// Wait for Manager to start up
timedWaitForReady(t, m, 1*time.Second)

}
}
}()
sub := m.Subscribe(ctx, "fake-default")

defer drainErrChan(errCh)
defer drainErrChan(subErrCh)
endTimer := time.NewTimer(30 * time.Second)
defer endTimer.Stop()

err = m.Update(component.Model{Components: []component.Component{comp}})
err = m.Update(component.Model{Components: []component.Component{healthyComp}})
require.NoError(t, err)

endTimer := time.NewTimer(30 * time.Second)
defer endTimer.Stop()
// nextState tracks the stage of the test. We expect the sequence
// Starting -> Healthy -> Failed -> Stopped.
nextState := client.UnitStateHealthy

LOOP:
for {
var state ComponentState
select {
case <-endTimer.C:
t.Fatalf("timed out after 30 seconds")
case err := <-errCh:
require.NoError(t, err)
case err := <-subErrCh:
require.NoError(t, err)
break LOOP
require.Fail(t, "timed out waiting for component state update")
case state = <-sub.Ch():
t.Logf("component state changed: %+v", state)
}

require.NotEqual(t, client.UnitStateFailed, state.State, "component should not fail")
unit, ok := state.Units[goodUnitKey]
require.True(t, ok, "unit good-input must be present")

if nextState == client.UnitStateHealthy {
// Waiting for unit to become healthy, if it's still starting skip
// to the next update
if unit.State == client.UnitStateStarting {
continue LOOP
}
if unit.State == client.UnitStateHealthy {
// good unit is healthy; now make it bad
t.Logf("marking good-input as having a hard-error for config")
err := m.Update(component.Model{Components: []component.Component{unhealthyComp}})
require.NoError(t, err, "Component model update should succeed")

// We next expect to transition to Failed
nextState = client.UnitStateFailed
} else {
// Unit should only be starting or healthy in this stage,
// anything else is an error.
require.FailNowf(t, "Incorrect state", "Expected STARTING or HEALTHY, got %v", unit.State)
}
} else if nextState == client.UnitStateFailed {
// Waiting for unit to fail, if it's still healthy skip to the next
// update
if unit.State == client.UnitStateHealthy {
continue LOOP
}
if unit.State == client.UnitStateFailed {
// Reached the expected state, now send an empty component model
// to stop everything.
err := m.Update(component.Model{Components: []component.Component{}})
require.NoError(t, err, "Component model update should succeed")
nextState = client.UnitStateStopped
} else {
// Unit should only be healthy or failed in this stage, anything
// else is an error.
require.FailNow(t, "Incorrect state", "Expected HEALTHY or FAILED, got %v", unit.State)
}
} else if nextState == client.UnitStateStopped {
// Waiting for component to stop, if it's still Failed skip to
// the next update
if unit.State == client.UnitStateFailed {
continue LOOP
}
if unit.State == client.UnitStateStopped {
// Success, we've finished the whole sequence
break LOOP
} else {
// Unit should only be failed or stopped in this stage, anything
// else is an error.
require.FailNowf(t, "Incorrect state", "Expected FAILED or STOPPED, got %v", unit.State)
}
}
}

subCancel()
cancel()

err = <-errCh
require.NoError(t, err)
err = <-runResultChan
require.Equal(t, context.Canceled, err, "Run should return with context canceled, got %v", err.Error())
}

func TestManager_FakeInput_NoDeadlock(t *testing.T) {
Expand Down Expand Up @@ -3671,3 +3659,12 @@ func waitForReady(ctx context.Context, m *Manager) error {
}
return nil
}

func timedWaitForReady(t *testing.T, m *Manager, timeout time.Duration) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
err := waitForReady(ctx, m)
if err != nil {
require.FailNow(t, "timed out waiting for Manager to start")
}
}
Loading