Skip to content

Commit

Permalink
[Upgrade Watcher][Crash Checker] Consider Agent process as crashed if…
Browse files Browse the repository at this point in the history
… its PID remains 0 (#3166)

* Refactoring: extract helper method

* Add check for PID remaining 0

* Update + add tests

* Fix typo

* Add CHANGELOG fragment

* Better error messages

* Bump up Agent version + cause error on start

* Better logging for debugging

* More logging for debugging

* Trying secondary restart via service manager

* Add FIXME comments for testing-only changes

* Fix compile errors

* Update testing version

* Implement restart for upstart service manager

* Include service provider name in error

* Implement restart for sysv and darwin

* Implement Restart for Windows

* Remove all Restart() implementations

* Removing extraneous logging statements

* Undo vestigial changes

* Rename all canc -> cancel

* Use assert instead of require

* Remove testing changes

* Use assert instead of require

(cherry picked from commit 2ce32f8)
  • Loading branch information
ycombinator committed Sep 12, 2023
1 parent 0728828 commit 33d5605
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 27 deletions.
32 changes: 32 additions & 0 deletions changelog/fragments/1690917883-crash-checker-pid-zero.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: bug-fix

# Change summary; a 80ish characters long description of the change.
summary: Rollback Elastic Agent upgrade if upgraded Agent process crashes immediately.

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
#description:

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: elastic-agent

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: https://github.com/elastic/elastic-agent/pull/3166

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: https://github.com/elastic/elastic-agent/issues/3124
73 changes: 62 additions & 11 deletions internal/pkg/agent/application/upgrade/crash_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type serviceHandler interface {
// CrashChecker checks agent for crash pattern in Elastic Agent lifecycle.
type CrashChecker struct {
notifyChan chan error
q *disctintQueue
q *distinctQueue
log *logger.Logger
sc serviceHandler
checkInterval time.Duration
Expand Down Expand Up @@ -77,33 +77,72 @@ func (ch *CrashChecker) Run(ctx context.Context) {
ch.log.Error(err)
}

ch.log.Debugf("retrieved service PID [%d]", pid)
ch.q.Push(pid)
restarts := ch.q.Distinct()
ch.log.Debugf("retrieved service PID [%d] changed %d times within %d", pid, restarts, evaluatedPeriods)
if restarts > crashesAllowed {
ch.notifyChan <- errors.New(fmt.Sprintf("service restarted '%d' times within '%v' seconds", restarts, ch.checkInterval.Seconds()))
}

// We decide if the Agent process has crashed in either of
// these two ways.
ch.checkNotRunning()
ch.checkRestarted()
}
}
}

type disctintQueue struct {
// checkNotRunning checks if the PID reported for the Agent process has
// remained 0 for most recent crashesAllowed times the PID was checked.
// If so, it decides that the service has crashed.
func (ch *CrashChecker) checkNotRunning() {
// If PID has remained 0 for the most recent crashesAllowed number of checks,
// we consider the Agent as having crashed.
if ch.q.Len() < crashesAllowed {
// Not enough history of PIDs yet
return
}

recentPIDs := ch.q.Peek(crashesAllowed)
ch.log.Debugf("most recent %d service PIDs within %d evaulations: %v", crashesAllowed, evaluatedPeriods, recentPIDs)

allZeroPIDs := true
for _, recentPID := range recentPIDs {
allZeroPIDs = allZeroPIDs && (recentPID == 0)
}

if allZeroPIDs {
msg := fmt.Sprintf("service remained crashed (PID = 0) within '%v' seconds", ch.checkInterval.Seconds())
ch.notifyChan <- errors.New(msg)
}
}

// checkRestarted checks if the PID reported for the Agent process has
// changed more than crashesAllowed times. If so, it decides that the service
// has crashed.
func (ch *CrashChecker) checkRestarted() {
restarts := ch.q.Distinct()
ch.log.Debugf("service PID changed %d times within %d evaluations", restarts, evaluatedPeriods)

if restarts > crashesAllowed {
msg := fmt.Sprintf("service restarted '%d' times within '%v' seconds", restarts, ch.checkInterval.Seconds())
ch.notifyChan <- errors.New(msg)
}
}

type distinctQueue struct {
q []int
size int
lock sync.Mutex
}

func newDistinctQueue(size int) (*disctintQueue, error) {
func newDistinctQueue(size int) (*distinctQueue, error) {
if size < 1 {
return nil, errors.New("invalid size", errors.TypeUnexpected)
}
return &disctintQueue{
return &distinctQueue{
q: make([]int, 0, size),
size: size,
}, nil
}

func (dq *disctintQueue) Push(id int) {
func (dq *distinctQueue) Push(id int) {
dq.lock.Lock()
defer dq.lock.Unlock()

Expand All @@ -114,7 +153,7 @@ func (dq *disctintQueue) Push(id int) {
dq.q = append([]int{id}, dq.q[:cutIdx]...)
}

func (dq *disctintQueue) Distinct() int {
func (dq *distinctQueue) Distinct() int {
dq.lock.Lock()
defer dq.lock.Unlock()

Expand All @@ -126,3 +165,15 @@ func (dq *disctintQueue) Distinct() int {

return len(dm)
}

func (dq *distinctQueue) Len() int {
return len(dq.q)
}

func (dq *distinctQueue) Peek(size int) []int {
if size > len(dq.q) {
size = len(dq.q)
}

return dq.q[:size]
}
59 changes: 45 additions & 14 deletions internal/pkg/agent/application/upgrade/crash_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/elastic-agent/pkg/core/logger"
Expand All @@ -21,9 +22,9 @@ var (

func TestChecker(t *testing.T) {
t.Run("no failure when no change", func(t *testing.T) {
pider := &testPider{}
pider := &testPider{pid: 111}
ch, errChan := testableChecker(t, pider)
ctx, canc := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(context.Background())

var wg sync.WaitGroup
wg.Add(1)
Expand All @@ -41,14 +42,15 @@ func TestChecker(t *testing.T) {
default:
}

canc()
cancel()
require.NoError(t, err)
})

t.Run("no failure when unfrequent change", func(t *testing.T) {
pider := &testPider{}
const startingPID = 222
pider := &testPider{pid: startingPID}
ch, errChan := testableChecker(t, pider)
ctx, canc := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(context.Background())

var wg sync.WaitGroup
wg.Add(1)
Expand All @@ -60,22 +62,23 @@ func TestChecker(t *testing.T) {
wg.Wait()
for i := 0; i < 2; i++ {
<-time.After(3 * testCheckPeriod)
pider.Change(i)
pider.Change(startingPID + i)
}
var err error
select {
case err = <-errChan:
default:
}

canc()
cancel()
require.NoError(t, err)
})

t.Run("no failure when change lower than limit", func(t *testing.T) {
pider := &testPider{}
const startingPID = 333
pider := &testPider{pid: startingPID}
ch, errChan := testableChecker(t, pider)
ctx, canc := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(context.Background())

var wg sync.WaitGroup
wg.Add(1)
Expand All @@ -87,22 +90,22 @@ func TestChecker(t *testing.T) {
wg.Wait()
for i := 0; i < 3; i++ {
<-time.After(7 * testCheckPeriod)
pider.Change(i)
pider.Change(startingPID + i)
}
var err error
select {
case err = <-errChan:
default:
}

canc()
cancel()
require.NoError(t, err)
})

t.Run("fails when pid changes frequently", func(t *testing.T) {
pider := &testPider{}
ch, errChan := testableChecker(t, pider)
ctx, canc := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(context.Background())

var wg sync.WaitGroup
wg.Add(1)
Expand All @@ -122,8 +125,36 @@ func TestChecker(t *testing.T) {
default:
}

canc()
require.Error(t, err)
cancel()
assert.ErrorContains(t, err, "service restarted '3' times within '0.1' seconds")
})

t.Run("fails when pid remains 0", func(t *testing.T) {
const startingPID = 0
pider := &testPider{pid: startingPID}
ch, errChan := testableChecker(t, pider)
ctx, cancel := context.WithCancel(context.Background())

var wg sync.WaitGroup
wg.Add(1)
go func() {
wg.Done()
ch.Run(ctx)
}()

wg.Wait()
for i := 0; i < 3; i++ {
<-time.After(testCheckPeriod * 3)
pider.Change(startingPID) // don't change PID
}
var err error
select {
case err = <-errChan:
default:
}

cancel()
assert.ErrorContains(t, err, "service remained crashed (PID = 0) within '0.1' seconds")
})
}

Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/agent/cmd/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,11 @@ WATCHLOOP:
break WATCHLOOP
// Agent in degraded state.
case err := <-errChan:
log.Error("Agent Error detected", err)
log.Errorf("Agent Error detected: %s", err.Error())
return err
// Agent keeps crashing unexpectedly
case err := <-crashChan:
log.Error("Agent crash detected", err)
log.Errorf("Agent crash detected: %s", err.Error())
return err
}
}
Expand Down

0 comments on commit 33d5605

Please sign in to comment.