From bb8decb85162c1936980a440a60d55b931817aa6 Mon Sep 17 00:00:00 2001
From: Shaunak Kashyap
Date: Fri, 8 Sep 2023 08:56:35 -0700
Subject: [PATCH] [Upgrade Watcher][Crash Checker] Consider Agent process as crashed if its PID remains 0 (#3166)

* Refactoring: extract helper method
* Add check for PID remaining 0
* Update + add tests
* Fix typo
* Add CHANGELOG fragment
* Better error messages
* Bump up Agent version + cause error on start
* Better logging for debugging
* More logging for debugging
* Trying secondary restart via service manager
* Add FIXME comments for testing-only changes
* Fix compile errors
* Update testing version
* Implement restart for upstart service manager
* Include service provider name in error
* Implement restart for sysv and darwin
* Implement Restart for Windows
* Remove all Restart() implementations
* Removing extraneous logging statements
* Undo vestigial changes
* Rename all canc -> cancel
* Use assert instead of require
* Remove testing changes
* Use assert instead of require

(cherry picked from commit 2ce32f8f0bf51b4606c22309b88c66a2e2a5709d)
---
 .../1690917883-crash-checker-pid-zero.yaml    | 32 ++++++++
 .../application/upgrade/crash_checker.go      | 73 ++++++++++++++++---
 .../application/upgrade/crash_checker_test.go | 59 +++++++++++----
 internal/pkg/agent/cmd/watch.go               |  4 +-
 4 files changed, 141 insertions(+), 27 deletions(-)
 create mode 100644 changelog/fragments/1690917883-crash-checker-pid-zero.yaml

diff --git a/changelog/fragments/1690917883-crash-checker-pid-zero.yaml b/changelog/fragments/1690917883-crash-checker-pid-zero.yaml
new file mode 100644
index 00000000000..2540afe4b94
--- /dev/null
+++ b/changelog/fragments/1690917883-crash-checker-pid-zero.yaml
@@ -0,0 +1,32 @@
+# Kind can be one of:
+# - breaking-change: a change to previously-documented behavior
+# - deprecation: functionality that is being removed in a later release
+# - bug-fix: fixes a problem in a previous version
+# - enhancement: extends functionality but does not break or fix existing behavior
+# - feature: new functionality
+# - known-issue: problems that we are aware of in a given version
+# - security: impacts on the security of a product or a user’s deployment.
+# - upgrade: important information for someone upgrading from a prior version
+# - other: does not fit into any of the other categories
+kind: bug-fix
+
+# Change summary; a 80ish characters long description of the change.
+summary: Roll back Elastic Agent upgrade if the upgraded Agent process crashes immediately.
+
+# Long description; in case the summary is not enough to describe the change
+# this field accommodate a description without length limits.
+# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
+#description:
+
+# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
+component: elastic-agent
+
+# PR URL; optional; the PR number that added the changeset.
+# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
+# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
+# Please provide it if you are adding a fragment for a different PR.
+pr: https://github.com/elastic/elastic-agent/pull/3166
+
+# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
+# If not present is automatically filled by the tooling with the issue linked to the PR number.
+issue: https://github.com/elastic/elastic-agent/issues/3124
diff --git a/internal/pkg/agent/application/upgrade/crash_checker.go b/internal/pkg/agent/application/upgrade/crash_checker.go
index e9c40bb0e12..fd6016208e0 100644
--- a/internal/pkg/agent/application/upgrade/crash_checker.go
+++ b/internal/pkg/agent/application/upgrade/crash_checker.go
@@ -29,7 +29,7 @@ type serviceHandler interface {
 // CrashChecker checks agent for crash pattern in Elastic Agent lifecycle.
 type CrashChecker struct {
 	notifyChan    chan error
-	q             *disctintQueue
+	q             *distinctQueue
 	log           *logger.Logger
 	sc            serviceHandler
 	checkInterval time.Duration
@@ -77,33 +77,72 @@ func (ch *CrashChecker) Run(ctx context.Context) {
 				ch.log.Error(err)
 			}
 
+			ch.log.Debugf("retrieved service PID [%d]", pid)
 			ch.q.Push(pid)
-			restarts := ch.q.Distinct()
-			ch.log.Debugf("retrieved service PID [%d] changed %d times within %d", pid, restarts, evaluatedPeriods)
-			if restarts > crashesAllowed {
-				ch.notifyChan <- errors.New(fmt.Sprintf("service restarted '%d' times within '%v' seconds", restarts, ch.checkInterval.Seconds()))
-			}
+
+			// We decide if the Agent process has crashed in either of
+			// these two ways.
+			ch.checkNotRunning()
+			ch.checkRestarted()
 		}
 	}
 }
 
-type disctintQueue struct {
+// checkNotRunning checks if the PID reported for the Agent process has
+// remained 0 for the most recent crashesAllowed checks. If so, it decides
+// that the service has crashed.
+func (ch *CrashChecker) checkNotRunning() {
+	// If the PID has remained 0 for the most recent crashesAllowed number of checks,
+	// we consider the Agent as having crashed.
+	if ch.q.Len() < crashesAllowed {
+		// Not enough history of PIDs yet
+		return
+	}
+
+	recentPIDs := ch.q.Peek(crashesAllowed)
+	ch.log.Debugf("most recent %d service PIDs within %d evaluations: %v", crashesAllowed, evaluatedPeriods, recentPIDs)
+
+	allZeroPIDs := true
+	for _, recentPID := range recentPIDs {
+		allZeroPIDs = allZeroPIDs && (recentPID == 0)
+	}
+
+	if allZeroPIDs {
+		msg := fmt.Sprintf("service remained crashed (PID = 0) within '%v' seconds", ch.checkInterval.Seconds())
+		ch.notifyChan <- errors.New(msg)
+	}
+}
+
+// checkRestarted checks if the PID reported for the Agent process has
+// changed more than crashesAllowed times. If so, it decides that the service
+// has crashed.
+func (ch *CrashChecker) checkRestarted() {
+	restarts := ch.q.Distinct()
+	ch.log.Debugf("service PID changed %d times within %d evaluations", restarts, evaluatedPeriods)
+
+	if restarts > crashesAllowed {
+		msg := fmt.Sprintf("service restarted '%d' times within '%v' seconds", restarts, ch.checkInterval.Seconds())
+		ch.notifyChan <- errors.New(msg)
+	}
+}
+
+type distinctQueue struct {
 	q    []int
 	size int
 	lock sync.Mutex
 }
 
-func newDistinctQueue(size int) (*disctintQueue, error) {
+func newDistinctQueue(size int) (*distinctQueue, error) {
 	if size < 1 {
 		return nil, errors.New("invalid size", errors.TypeUnexpected)
 	}
-	return &disctintQueue{
+	return &distinctQueue{
 		q:    make([]int, 0, size),
 		size: size,
 	}, nil
 }
 
-func (dq *disctintQueue) Push(id int) {
+func (dq *distinctQueue) Push(id int) {
 	dq.lock.Lock()
 	defer dq.lock.Unlock()
 
@@ -114,7 +153,7 @@ func (dq *disctintQueue) Push(id int) {
 	dq.q = append([]int{id}, dq.q[:cutIdx]...)
 }
 
-func (dq *disctintQueue) Distinct() int {
+func (dq *distinctQueue) Distinct() int {
 	dq.lock.Lock()
 	defer dq.lock.Unlock()
 
@@ -126,3 +165,15 @@
 
 	return len(dm)
 }
+
+func (dq *distinctQueue) Len() int {
+	return len(dq.q)
+}
+
+func (dq *distinctQueue) Peek(size int) []int {
+	if size > len(dq.q) {
+		size = len(dq.q)
+	}
+
+	return dq.q[:size]
+}
diff --git a/internal/pkg/agent/application/upgrade/crash_checker_test.go b/internal/pkg/agent/application/upgrade/crash_checker_test.go
index e5741e1e649..0b62f3b99a8 100644
--- a/internal/pkg/agent/application/upgrade/crash_checker_test.go
+++ b/internal/pkg/agent/application/upgrade/crash_checker_test.go
@@ -10,6 +10,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
 	"github.com/elastic/elastic-agent/pkg/core/logger"
@@ -21,9 +22,9 @@ var (
 
 func TestChecker(t *testing.T) {
 	t.Run("no failure when no change", func(t *testing.T) {
-		pider := &testPider{}
+		pider := &testPider{pid: 111}
 		ch, errChan := testableChecker(t, pider)
-		ctx, canc := context.WithCancel(context.Background())
+		ctx, cancel := context.WithCancel(context.Background())
 
 		var wg sync.WaitGroup
 		wg.Add(1)
@@ -41,14 +42,15 @@ func TestChecker(t *testing.T) {
 		default:
 		}
 
-		canc()
+		cancel()
 		require.NoError(t, err)
 	})
 
 	t.Run("no failure when unfrequent change", func(t *testing.T) {
-		pider := &testPider{}
+		const startingPID = 222
+		pider := &testPider{pid: startingPID}
 		ch, errChan := testableChecker(t, pider)
-		ctx, canc := context.WithCancel(context.Background())
+		ctx, cancel := context.WithCancel(context.Background())
 
 		var wg sync.WaitGroup
 		wg.Add(1)
@@ -60,7 +62,7 @@ func TestChecker(t *testing.T) {
 		wg.Wait()
 		for i := 0; i < 2; i++ {
 			<-time.After(3 * testCheckPeriod)
-			pider.Change(i)
+			pider.Change(startingPID + i)
 		}
 		var err error
 		select {
@@ -68,14 +70,15 @@ func TestChecker(t *testing.T) {
 		default:
 		}
 
-		canc()
+		cancel()
 		require.NoError(t, err)
 	})
 
 	t.Run("no failure when change lower than limit", func(t *testing.T) {
-		pider := &testPider{}
+		const startingPID = 333
+		pider := &testPider{pid: startingPID}
 		ch, errChan := testableChecker(t, pider)
-		ctx, canc := context.WithCancel(context.Background())
+		ctx, cancel := context.WithCancel(context.Background())
 
 		var wg sync.WaitGroup
 		wg.Add(1)
@@ -87,7 +90,7 @@ func TestChecker(t *testing.T) {
 		wg.Wait()
 		for i := 0; i < 3; i++ {
 			<-time.After(7 * testCheckPeriod)
-			pider.Change(i)
+			pider.Change(startingPID + i)
 		}
 		var err error
 		select {
@@ -95,14 +98,14 @@ func TestChecker(t *testing.T) {
 		default:
 		}
 
-		canc()
+		cancel()
 		require.NoError(t, err)
 	})
 
 	t.Run("fails when pid changes frequently", func(t *testing.T) {
 		pider := &testPider{}
 		ch, errChan := testableChecker(t, pider)
-		ctx, canc := context.WithCancel(context.Background())
+		ctx, cancel := context.WithCancel(context.Background())
 
 		var wg sync.WaitGroup
 		wg.Add(1)
@@ -122,8 +125,36 @@ func TestChecker(t *testing.T) {
 		default:
 		}
 
-		canc()
-		require.Error(t, err)
+		cancel()
+		assert.ErrorContains(t, err, "service restarted '3' times within '0.1' seconds")
+	})
+
+	t.Run("fails when pid remains 0", func(t *testing.T) {
+		const startingPID = 0
+		pider := &testPider{pid: startingPID}
+		ch, errChan := testableChecker(t, pider)
+		ctx, cancel := context.WithCancel(context.Background())
+
+		var wg sync.WaitGroup
+		wg.Add(1)
+		go func() {
+			wg.Done()
+			ch.Run(ctx)
+		}()
+
+		wg.Wait()
+		for i := 0; i < 3; i++ {
+			<-time.After(testCheckPeriod * 3)
+			pider.Change(startingPID) // don't change PID
+		}
+		var err error
+		select {
+		case err = <-errChan:
+		default:
+		}
+
+		cancel()
+		assert.ErrorContains(t, err, "service remained crashed (PID = 0) within '0.1' seconds")
 	})
 }
 
diff --git a/internal/pkg/agent/cmd/watch.go b/internal/pkg/agent/cmd/watch.go
index 83e31d38366..23009a3366b 100644
--- a/internal/pkg/agent/cmd/watch.go
+++ b/internal/pkg/agent/cmd/watch.go
@@ -174,11 +174,11 @@ WATCHLOOP:
 			break WATCHLOOP
 		// Agent in degraded state.
 		case err := <-errChan:
-			log.Error("Agent Error detected", err)
+			log.Errorf("Agent Error detected: %s", err.Error())
 			return err
 		// Agent keeps crashing unexpectedly
 		case err := <-crashChan:
-			log.Error("Agent crash detected", err)
+			log.Errorf("Agent crash detected: %s", err.Error())
 			return err
 		}
 	}
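
Note on the detection logic: checkNotRunning above treats the Agent as crashed only when every one of the most recent crashesAllowed PID samples is zero, while checkRestarted keeps the existing "PID changed too often" rule. The standalone Go sketch below illustrates the zero-PID idea on its own; the pidQueue type, the history length of 7, and the threshold of 3 are illustrative assumptions for this sketch, not the actual types or constant values defined in crash_checker.go.

package main

import "fmt"

// pidQueue keeps the most recent PID observations, newest first
// (assumed helper for this sketch; not the patch's distinctQueue).
type pidQueue struct {
	pids []int
	size int
}

// push records a newly observed PID and drops the oldest one once the
// queue is at capacity.
func (q *pidQueue) push(pid int) {
	q.pids = append([]int{pid}, q.pids...)
	if len(q.pids) > q.size {
		q.pids = q.pids[:q.size]
	}
}

// allRecentZero reports whether at least n PIDs have been observed and the
// newest n of them are all zero (the same guard-then-scan shape as
// checkNotRunning's Len() check followed by Peek()).
func (q *pidQueue) allRecentZero(n int) bool {
	if len(q.pids) < n {
		return false // not enough history yet
	}
	for _, pid := range q.pids[:n] {
		if pid != 0 {
			return false
		}
	}
	return true
}

func main() {
	const zeroChecksAllowed = 3 // assumed threshold; the patch uses the crashesAllowed constant
	q := &pidQueue{size: 7}     // assumed history length for this sketch

	for _, observedPID := range []int{0, 0, 0} {
		q.push(observedPID)
		if q.allRecentZero(zeroChecksAllowed) {
			fmt.Println("service remained crashed (PID = 0); notify the watcher")
			return
		}
	}
	fmt.Println("not enough evidence of a crash yet")
}

The newest-first push and head-of-slice peek in the sketch mirror the ordering the patch uses: Push prepends the latest PID and Peek returns the front of the queue.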