Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "[Service Runtime] Add ability for service operations to be retried (#2889)" #3092

Merged
merged 2 commits into from
Jul 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 0 additions & 32 deletions changelog/fragments/1687301871-service-command-retries.yaml

This file was deleted.

2 changes: 0 additions & 2 deletions docs/component-specs.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,6 @@ The path to this service's logs directory.
- `args` (identical to `command.args`): the command-line arguments to pass for this operation
- `env` (identical to `command.env`): the environment variables to set for this operation
- `timeout`: the timeout duration for this operation.
- `retry`: (optional) configuration for retrying the operation
- `init_interval`: interval before the first retry. The interval between subsequent retries will increase exponentially (with some random jitter).

For example:

Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/agent/install/uninstall.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func uninstallServiceComponent(ctx context.Context, log *logp.Logger, comp compo
// Do not use infinite retries when uninstalling from the command line. If the uninstall needs to be
// retried the entire uninstall command can be retried. Retries may complete asynchronously with the
// execution of the uninstall command, leading to bugs like https://github.com/elastic/elastic-agent/issues/3060.
return comprt.UninstallService(ctx, log, comp, false)
return comprt.UninstallService(ctx, log, comp)
}

func serviceComponentsFromConfig(specs component.RuntimeSpecs, cfg *config.Config) ([]component.Component, error) {
Expand Down
18 changes: 8 additions & 10 deletions pkg/component/runtime/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ var (
ErrInvalidServiceSpec = errors.New("invalid service spec")
)

// executeServiceCommandFunc executes the given binary according to configuration in spec. If shouldRetry == true,
// the command will be retried indefinitely; otherwise, it will not be retried.
type executeServiceCommandFunc func(ctx context.Context, log *logger.Logger, binaryPath string, spec *component.ServiceOperationsCommandSpec, shouldRetry bool) error
type executeServiceCommandFunc func(ctx context.Context, log *logger.Logger, binaryPath string, spec *component.ServiceOperationsCommandSpec) error

// serviceRuntime provides the command runtime for running a component as a service.
type serviceRuntime struct {
Expand Down Expand Up @@ -435,7 +433,7 @@ func (s *serviceRuntime) check(ctx context.Context) error {
return ErrOperationSpecUndefined
}
s.log.Debugf("check if the %s is installed", s.comp.InputSpec.BinaryName)
return s.executeServiceCommandImpl(ctx, s.log, s.comp.InputSpec.BinaryPath, s.comp.InputSpec.Spec.Service.Operations.Check, false)
return s.executeServiceCommandImpl(ctx, s.log, s.comp.InputSpec.BinaryPath, s.comp.InputSpec.Spec.Service.Operations.Check)
}

// install executes the service install command
Expand All @@ -445,26 +443,26 @@ func (s *serviceRuntime) install(ctx context.Context) error {
return ErrOperationSpecUndefined
}
s.log.Debugf("install %s service", s.comp.InputSpec.BinaryName)
return s.executeServiceCommandImpl(ctx, s.log, s.comp.InputSpec.BinaryPath, s.comp.InputSpec.Spec.Service.Operations.Install, true)
return s.executeServiceCommandImpl(ctx, s.log, s.comp.InputSpec.BinaryPath, s.comp.InputSpec.Spec.Service.Operations.Install)
}

// uninstall executes the service uninstall command
func (s *serviceRuntime) uninstall(ctx context.Context) error {
// Always retry for internal attempts to uninstall, because they are an attempt to converge the agent's current state
// with its desired state based on the agent policy.
return uninstallService(ctx, s.log, s.comp, s.executeServiceCommandImpl, true)
return uninstallService(ctx, s.log, s.comp, s.executeServiceCommandImpl)
}

// UninstallService uninstalls the service. When shouldRetry is true the uninstall command will be retried until it succeeds.
func UninstallService(ctx context.Context, log *logger.Logger, comp component.Component, shouldRetry bool) error {
return uninstallService(ctx, log, comp, executeServiceCommand, shouldRetry)
func UninstallService(ctx context.Context, log *logger.Logger, comp component.Component) error {
return uninstallService(ctx, log, comp, executeServiceCommand)
}

func uninstallService(ctx context.Context, log *logger.Logger, comp component.Component, executeServiceCommandImpl executeServiceCommandFunc, shouldRetry bool) error {
func uninstallService(ctx context.Context, log *logger.Logger, comp component.Component, executeServiceCommandImpl executeServiceCommandFunc) error {
if comp.InputSpec.Spec.Service.Operations.Uninstall == nil {
log.Errorf("missing uninstall spec for %s service", comp.InputSpec.BinaryName)
return ErrOperationSpecUndefined
}
log.Debugf("uninstall %s service", comp.InputSpec.BinaryName)
return executeServiceCommandImpl(ctx, log, comp.InputSpec.BinaryPath, comp.InputSpec.Spec.Service.Operations.Uninstall, shouldRetry)
return executeServiceCommandImpl(ctx, log, comp.InputSpec.BinaryPath, comp.InputSpec.Spec.Service.Operations.Uninstall)
}
230 changes: 18 additions & 212 deletions pkg/component/runtime/service_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,20 @@
import (
"bufio"
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"os/exec"
"path/filepath"
"sort"
"strings"
"sync"
"time"

"github.com/cenkalti/backoff/v4"

"github.com/dolmen-go/contextio"

"github.com/elastic/elastic-agent/pkg/component"
"github.com/elastic/elastic-agent/pkg/core/logger"
"github.com/elastic/elastic-agent/pkg/core/process"
)

// serviceCmdRetrier is the package-level retrier that
// executeServiceCommandWithRetries starts commands on. The zero value is
// usable as-is: the tracking map is created lazily by track.
var serviceCmdRetrier = cmdRetrier{}

func executeCommand(ctx context.Context, log *logger.Logger, binaryPath string, args []string, env []string, timeout time.Duration) error {
log = log.With("context", "command output")
// Create context with timeout if the timeout is greater than 0
Expand Down Expand Up @@ -62,23 +54,25 @@

// channel for the last error message from the stderr output
errch := make(chan string, 1)
ctxStderr := contextio.NewReader(ctx, proc.Stderr)

Check failure on line 57 in pkg/component/runtime/service_command.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

SA4023(related information): the lhs of the comparison is the 1st return value of this function call (staticcheck)
go func() {
var errText string
scanner := bufio.NewScanner(ctxStderr)
for scanner.Scan() {
line := scanner.Text()
if len(line) > 0 {
txt := strings.TrimSpace(line)
if len(txt) > 0 {
errText = txt
// Log error output line
log.Error(errText)
if ctxStderr != nil {

Check failure on line 58 in pkg/component/runtime/service_command.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

SA4023: this comparison is always true (staticcheck)
go func() {
var errText string
scanner := bufio.NewScanner(ctxStderr)
for scanner.Scan() {
line := scanner.Text()
if len(line) > 0 {
txt := strings.TrimSpace(line)
if len(txt) > 0 {
errText = txt
// Log error output line
log.Error(errText)
}
}
}
}
errch <- errText
}()
errch <- errText
}()
}

procState := <-proc.Wait()
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
Expand All @@ -98,200 +92,12 @@
return err
}

func executeServiceCommand(ctx context.Context, log *logger.Logger, binaryPath string, spec *component.ServiceOperationsCommandSpec, shouldRetry bool) error {
func executeServiceCommand(ctx context.Context, log *logger.Logger, binaryPath string, spec *component.ServiceOperationsCommandSpec) error {
if spec == nil {
log.Warnf("spec is nil, nothing to execute, binaryPath: %s", binaryPath)
return nil
}

if !shouldRetry {
return executeCommand(ctx, log, binaryPath, spec.Args, envSpecToEnv(spec.Env), spec.Timeout)
}

executeServiceCommandWithRetries(
ctx, log, binaryPath, spec,
context.Background(), 20*time.Second, 15*time.Minute,
)
return nil
}

// executeServiceCommandWithRetries hands the command described by spec to the
// package-level serviceCmdRetrier.
//
// The first retry interval comes from spec.Retry.InitInterval; when that is
// zero, defaultRetryInitInterval is used instead. retryMaxInterval is passed
// through unchanged as the upper bound for the interval between retries.
// cmdCtx governs the command execution itself, retryCtx governs the retry loop.
// Nothing is returned: the outcome of the command is not reported back to the
// caller.
func executeServiceCommandWithRetries(
	cmdCtx context.Context, log *logger.Logger, binaryPath string, spec *component.ServiceOperationsCommandSpec,
	retryCtx context.Context, defaultRetryInitInterval time.Duration, retryMaxInterval time.Duration,
) {
	// Start from the default and override it only when the spec sets a value.
	initInterval := defaultRetryInitInterval
	if configured := spec.Retry.InitInterval; configured != 0 {
		initInterval = configured
	}

	env := envSpecToEnv(spec.Env)
	serviceCmdRetrier.Start(
		cmdCtx, log, binaryPath, spec.Args, env, spec.Timeout,
		retryCtx, initInterval, retryMaxInterval,
	)
}

// cmdRetryInfo is the bookkeeping kept for one tracked command so that the
// command and its retry loop can be stopped later.
type cmdRetryInfo struct {
	// retryCancelFn cancels the context driving the retry loop;
	// cmdCancelFn cancels the context the command itself runs under.
	retryCancelFn, cmdCancelFn context.CancelFunc

	// cmdDone receives a value each time an execution attempt of the
	// command has returned.
	cmdDone <-chan struct{}
}

// cmdRetrier runs commands with retries and keeps track of the ones that are
// still in flight so that they can be stopped. The zero value is ready to use;
// because it contains a mutex it must not be copied after first use.
type cmdRetrier struct {
// mu guards cmds.
mu sync.RWMutex
// cmds maps a command key (as produced by cmdKey) to the cancel functions
// and done channel of the command currently tracked under that key. It is
// nil until the first call to track.
cmds map[string]cmdRetryInfo
}

// Start executes the given command in a background goroutine, retrying it with
// exponential backoff for as long as executeCommand returns a non-nil error.
//
// Any command already tracked under the same key (same binaryPath, args and
// env) is stopped first via Stop. cmdCtx bounds each execution of the command,
// retryCtx bounds the retry loop; both are wrapped in cancelable contexts that
// are recorded so Stop can cancel them later. retrySleepInitDuration and
// retrySleepMaxDuration configure the initial and maximum backoff intervals.
//
// Start does not report the command's result: the error returned by
// backoff.RetryNotify is discarded inside the goroutine.
//
// NOTE(review): the goroutine calls untrack(cmdKey) unconditionally when it
// finishes. If a later Start for the same key has already re-tracked a new
// command by then, this looks like it would remove the newer entry — confirm.
func (cr *cmdRetrier) Start(
cmdCtx context.Context, log *logger.Logger,
binaryPath string, args []string, env []string, timeout time.Duration,
retryCtx context.Context, retrySleepInitDuration time.Duration, retrySleepMaxDuration time.Duration,
) {
cmdKey := cr.cmdKey(binaryPath, args, env)

// Due to infinite retries, we may still be trying to (re)execute
// a command from a previous call to Start(). We should first stop
// these retries as well as the command process.
cr.Stop(cmdKey, log)

// Track the command so we can cancel it and its retries later.
cmdCtx, cmdCancelFn := context.WithCancel(cmdCtx)
retryCtx, retryCancelFn := context.WithCancel(retryCtx)
// Buffered with capacity 1 so the send after an execution attempt does not
// block when nobody is receiving yet.
cmdDone := make(chan struct{}, 1)
cr.track(cmdKey, cmdCancelFn, retryCancelFn, cmdDone)

// Execute command with retries and exponential backoff between attempts
expBackoff := backoff.NewExponentialBackOff()
expBackoff.InitialInterval = retrySleepInitDuration
expBackoff.MaxInterval = retrySleepMaxDuration

backoffCtx := backoff.WithContext(expBackoff, retryCtx)

// Since we will be executing the command with infinite retries, we don't
// want to block. So we execute the command with retries in its own
// goroutine.
go func() {
retryAttempt := 0

// Here we indefinitely retry the executeCommand call, as long as it
// returns a non-nil error. We will block here until executeCommand
// returns a nil error, indicating that the command being executed has
// successfully completed execution.
//nolint: errcheck // No point checking the error inside the goroutine.
backoff.RetryNotify(
func() error {
err := executeCommand(cmdCtx, log, binaryPath, args, env, timeout)
// Signal that this execution attempt has returned.
cmdDone <- struct{}{}
return err
},
backoffCtx,
func(err error, retryAfter time.Duration) {
retryAttempt++
log.Warnf(
"service command execution failed with error [%s], retrying (will be retry [%d]) after [%s]",
err.Error(),
retryAttempt,
retryAfter,
)
// Drain the signal sent by the failed attempt so the next attempt's
// send does not block on the full buffer.
// NOTE(review): after this drain, while the backoff is sleeping,
// cmdDone is empty; a concurrent Stop waiting on cmdDone presumably
// never gets a value in that window — confirm against Stop.
<-cmdDone
},
)

cr.untrack(cmdKey)
}()
}

// Stop cancels the retry loop and the running command tracked under cmdKey,
// waits for a signal on the command's done channel, and then removes the
// entry from the tracking map. If nothing is tracked under cmdKey it only logs
// a debug message and returns.
//
// The mutex is held (via defer) for the entire call, including the blocking
// receive on cmdDone.
//
// NOTE(review): because the lock is held while waiting on cmdDone, and the
// goroutine launched by Start needs the same lock in untrack, this looks like
// it can deadlock when no execution attempt is in flight at the time of the
// call (nothing will send on cmdDone) — confirm and consider releasing the
// lock before waiting.
func (cr *cmdRetrier) Stop(cmdKey string, log *logger.Logger) {
cr.mu.Lock()
defer cr.mu.Unlock()
info, exists := cr.cmds[cmdKey]
if !exists {
log.Debugf("no retries for command key [%s] are pending; nothing to do", cmdKey)
return
}

// Cancel the previous retries
info.retryCancelFn()

// Cancel the previous command
info.cmdCancelFn()

// Ensure that the previous command actually stopped running
<-info.cmdDone

// Stop tracking
delete(cr.cmds, cmdKey)
log.Debugf("retries and command process for command key [%s] stopped", cmdKey)
}

// track records the cancel functions and done channel of a command under
// cmdKey, replacing any entry already stored for that key. The tracking map is
// allocated on first use, so a zero-value cmdRetrier works without setup.
func (cr *cmdRetrier) track(cmdKey string, cmdCancelFn context.CancelFunc, retryCanceFn context.CancelFunc, cmdDone <-chan struct{}) {
	// Build the entry before taking the lock; it only depends on the arguments.
	entry := cmdRetryInfo{
		cmdDone:       cmdDone,
		cmdCancelFn:   cmdCancelFn,
		retryCancelFn: retryCanceFn,
	}

	cr.mu.Lock()
	defer cr.mu.Unlock()

	if cr.cmds == nil {
		cr.cmds = make(map[string]cmdRetryInfo)
	}
	cr.cmds[cmdKey] = entry
}

// untrack forgets whatever command is stored under cmdKey. It does not cancel
// anything; it only removes the map entry, and is a no-op when the key (or the
// map itself) does not exist.
func (cr *cmdRetrier) untrack(cmdKey string) {
	cr.mu.Lock()
	delete(cr.cmds, cmdKey)
	cr.mu.Unlock()
}

// cmdKey returns a unique, deterministic string (the hex-encoded SHA-256 digest) for the
// combination of the given binaryPath, args, and env. This key can be used to determine
// if the same command is being executed again or not.
func (cr *cmdRetrier) cmdKey(binaryPath string, args []string, env []string) string {
var sb strings.Builder

sb.WriteString(binaryPath)
sb.WriteString("|")

var posArgs, flagArgs []string
for i := 0; i < len(args); i++ {
arg := args[i]
if strings.HasPrefix(arg, "-") {
flagArgs = append(flagArgs, arg+" "+args[i+1])
i++
} else {
posArgs = append(posArgs, arg)
}
}

sort.Strings(posArgs)
sort.Strings(flagArgs)

for _, arg := range posArgs {
sb.WriteString(arg)
sb.WriteString("|")
}

for _, arg := range flagArgs {
sb.WriteString(arg)
sb.WriteString("|")
}

sort.Strings(env)

for _, kv := range env {
sb.WriteString(kv)
sb.WriteString("|")
}

digest := sha256.New()
digest.Write([]byte(sb.String()))

return hex.EncodeToString(digest.Sum(nil))
return executeCommand(ctx, log, binaryPath, spec.Args, envSpecToEnv(spec.Env), spec.Timeout)
}

func envSpecToEnv(envSpecs []component.CommandEnvSpec) []string {
Expand Down
Loading
Loading