Skip to content

Commit

Permalink
Revert "[Service Runtime] Add ability for service operations to be re…
Browse files Browse the repository at this point in the history
…tried (#2889)" (#3092) (#3095)

* Revert "[Service Runtime] Add ability for service operations to be retried (#2889)"

This reverts commit a3403e5.

* Remove extra parameter

(cherry picked from commit 7e6b4f1)

Co-authored-by: Shaunak Kashyap <[email protected]>
  • Loading branch information
mergify[bot] and ycombinator authored Jul 18, 2023
1 parent bbbe77e commit cddbd51
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 566 deletions.
32 changes: 0 additions & 32 deletions changelog/fragments/1687301871-service-command-retries.yaml

This file was deleted.

2 changes: 0 additions & 2 deletions docs/component-specs.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,6 @@ The path to this service's logs directory.
- `args` (identical to `command.args`): the command-line arguments to pass for this operation
- `env` (identical to `command.env`): the environment variables to set for this operation
- `timeout`: the timeout duration for this operation.
- `retry`: (optional) configuration for retrying the operation
- `init_interval`: interval before the first retry. The interval between subsequent retries will increase exponentially (with some random jitter).

For example:

Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/agent/install/uninstall.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func uninstallServiceComponent(ctx context.Context, log *logp.Logger, comp compo
// Do not use infinite retries when uninstalling from the command line. If the uninstall needs to be
// retried the entire uninstall command can be retried. Retries may complete asynchronously with the
// execution of the uninstall command, leading to bugs like https://github.com/elastic/elastic-agent/issues/3060.
return comprt.UninstallService(ctx, log, comp, false)
return comprt.UninstallService(ctx, log, comp)
}

func serviceComponentsFromConfig(specs component.RuntimeSpecs, cfg *config.Config) ([]component.Component, error) {
Expand Down
18 changes: 8 additions & 10 deletions pkg/component/runtime/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ var (
ErrInvalidServiceSpec = errors.New("invalid service spec")
)

// executeServiceCommandFunc executes the given binary according to configuration in spec. If shouldRetry == true,
// the command will be retried indefinitely; otherwise, it will not be retried.
type executeServiceCommandFunc func(ctx context.Context, log *logger.Logger, binaryPath string, spec *component.ServiceOperationsCommandSpec, shouldRetry bool) error
type executeServiceCommandFunc func(ctx context.Context, log *logger.Logger, binaryPath string, spec *component.ServiceOperationsCommandSpec) error

// serviceRuntime provides the command runtime for running a component as a service.
type serviceRuntime struct {
Expand Down Expand Up @@ -435,7 +433,7 @@ func (s *serviceRuntime) check(ctx context.Context) error {
return ErrOperationSpecUndefined
}
s.log.Debugf("check if the %s is installed", s.comp.InputSpec.BinaryName)
return s.executeServiceCommandImpl(ctx, s.log, s.comp.InputSpec.BinaryPath, s.comp.InputSpec.Spec.Service.Operations.Check, false)
return s.executeServiceCommandImpl(ctx, s.log, s.comp.InputSpec.BinaryPath, s.comp.InputSpec.Spec.Service.Operations.Check)
}

// install executes the service install command
Expand All @@ -445,26 +443,26 @@ func (s *serviceRuntime) install(ctx context.Context) error {
return ErrOperationSpecUndefined
}
s.log.Debugf("install %s service", s.comp.InputSpec.BinaryName)
return s.executeServiceCommandImpl(ctx, s.log, s.comp.InputSpec.BinaryPath, s.comp.InputSpec.Spec.Service.Operations.Install, true)
return s.executeServiceCommandImpl(ctx, s.log, s.comp.InputSpec.BinaryPath, s.comp.InputSpec.Spec.Service.Operations.Install)
}

// uninstall executes the service uninstall command
func (s *serviceRuntime) uninstall(ctx context.Context) error {
// Always retry for internal attempts to uninstall, because they are an attempt to converge the agent's current state
// with its desired state based on the agent policy.
return uninstallService(ctx, s.log, s.comp, s.executeServiceCommandImpl, true)
return uninstallService(ctx, s.log, s.comp, s.executeServiceCommandImpl)
}

// UninstallService uninstalls the service. When shouldRetry is true the uninstall command will be retried until it succeeds.
func UninstallService(ctx context.Context, log *logger.Logger, comp component.Component, shouldRetry bool) error {
return uninstallService(ctx, log, comp, executeServiceCommand, shouldRetry)
func UninstallService(ctx context.Context, log *logger.Logger, comp component.Component) error {
return uninstallService(ctx, log, comp, executeServiceCommand)
}

func uninstallService(ctx context.Context, log *logger.Logger, comp component.Component, executeServiceCommandImpl executeServiceCommandFunc, shouldRetry bool) error {
func uninstallService(ctx context.Context, log *logger.Logger, comp component.Component, executeServiceCommandImpl executeServiceCommandFunc) error {
if comp.InputSpec.Spec.Service.Operations.Uninstall == nil {
log.Errorf("missing uninstall spec for %s service", comp.InputSpec.BinaryName)
return ErrOperationSpecUndefined
}
log.Debugf("uninstall %s service", comp.InputSpec.BinaryName)
return executeServiceCommandImpl(ctx, log, comp.InputSpec.BinaryPath, comp.InputSpec.Spec.Service.Operations.Uninstall, shouldRetry)
return executeServiceCommandImpl(ctx, log, comp.InputSpec.BinaryPath, comp.InputSpec.Spec.Service.Operations.Uninstall)
}
230 changes: 18 additions & 212 deletions pkg/component/runtime/service_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,20 @@ package runtime
import (
"bufio"
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"os/exec"
"path/filepath"
"sort"
"strings"
"sync"
"time"

"github.com/cenkalti/backoff/v4"

"github.com/dolmen-go/contextio"

"github.com/elastic/elastic-agent/pkg/component"
"github.com/elastic/elastic-agent/pkg/core/logger"
"github.com/elastic/elastic-agent/pkg/core/process"
)

var serviceCmdRetrier = cmdRetrier{}

func executeCommand(ctx context.Context, log *logger.Logger, binaryPath string, args []string, env []string, timeout time.Duration) error {
log = log.With("context", "command output")
// Create context with timeout if the timeout is greater than 0
Expand Down Expand Up @@ -63,22 +55,24 @@ func executeCommand(ctx context.Context, log *logger.Logger, binaryPath string,
// channel for the last error message from the stderr output
errch := make(chan string, 1)
ctxStderr := contextio.NewReader(ctx, proc.Stderr)
go func() {
var errText string
scanner := bufio.NewScanner(ctxStderr)
for scanner.Scan() {
line := scanner.Text()
if len(line) > 0 {
txt := strings.TrimSpace(line)
if len(txt) > 0 {
errText = txt
// Log error output line
log.Error(errText)
if ctxStderr != nil {
go func() {
var errText string
scanner := bufio.NewScanner(ctxStderr)
for scanner.Scan() {
line := scanner.Text()
if len(line) > 0 {
txt := strings.TrimSpace(line)
if len(txt) > 0 {
errText = txt
// Log error output line
log.Error(errText)
}
}
}
}
errch <- errText
}()
errch <- errText
}()
}

procState := <-proc.Wait()
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
Expand All @@ -98,200 +92,12 @@ func executeCommand(ctx context.Context, log *logger.Logger, binaryPath string,
return err
}

func executeServiceCommand(ctx context.Context, log *logger.Logger, binaryPath string, spec *component.ServiceOperationsCommandSpec, shouldRetry bool) error {
func executeServiceCommand(ctx context.Context, log *logger.Logger, binaryPath string, spec *component.ServiceOperationsCommandSpec) error {
if spec == nil {
log.Warnf("spec is nil, nothing to execute, binaryPath: %s", binaryPath)
return nil
}

if !shouldRetry {
return executeCommand(ctx, log, binaryPath, spec.Args, envSpecToEnv(spec.Env), spec.Timeout)
}

executeServiceCommandWithRetries(
ctx, log, binaryPath, spec,
context.Background(), 20*time.Second, 15*time.Minute,
)
return nil
}

// executeServiceCommandWithRetries hands the command described by spec to the
// package-level serviceCmdRetrier, which runs it with retries.
//
// The interval before the first retry is taken from spec.Retry.InitInterval;
// when that value is zero, defaultRetryInitInterval is used instead.
// retryMaxInterval is forwarded unchanged as the upper bound on the interval
// between retries. cmdCtx is the context for the command itself, retryCtx the
// context for the retry loop. Nothing is returned to the caller.
func executeServiceCommandWithRetries(
	cmdCtx context.Context, log *logger.Logger, binaryPath string, spec *component.ServiceOperationsCommandSpec,
	retryCtx context.Context, defaultRetryInitInterval time.Duration, retryMaxInterval time.Duration,
) {
	// Start from the default and override it only when the spec configures
	// a non-zero initial interval.
	initInterval := defaultRetryInitInterval
	if configured := spec.Retry.InitInterval; configured != 0 {
		initInterval = configured
	}

	cmdArgs := spec.Args
	cmdEnv := envSpecToEnv(spec.Env)

	serviceCmdRetrier.Start(
		cmdCtx, log,
		binaryPath, cmdArgs, cmdEnv, spec.Timeout,
		retryCtx, initInterval, retryMaxInterval,
	)
}

// cmdRetryInfo holds the handles needed to stop one tracked command and its
// retry loop.
type cmdRetryInfo struct {
// retryCancelFn cancels the context that drives the retry/backoff loop.
retryCancelFn context.CancelFunc
// cmdCancelFn cancels the context the command itself is executed with.
cmdCancelFn context.CancelFunc
// cmdDone receives a value each time an execution attempt of the command returns.
cmdDone <-chan struct{}
}

// cmdRetrier keeps track of commands that are being executed with retries so
// that a command and its pending retries can be stopped later.
type cmdRetrier struct {
// mu guards cmds.
mu sync.RWMutex
// cmds maps a command key (see cmdKey) to the handles for that command.
// It is initialized lazily by track.
cmds map[string]cmdRetryInfo
}

// Start executes the given command in a background goroutine, retrying it with
// exponential backoff for as long as it returns an error. It does not wait for
// the command to complete and does not report the command's result.
//
// Before starting, any command previously started with the same binaryPath,
// args and env (as determined by cmdKey) is stopped via Stop.
//
// cmdCtx is the parent context for each execution of the command; retryCtx is
// the parent context for the retry loop. retrySleepInitDuration and
// retrySleepMaxDuration are used as the backoff's initial and maximum
// intervals. timeout is passed through to executeCommand.
//
// NOTE(review): only InitialInterval and MaxInterval are set on the backoff;
// MaxElapsedTime is left at the library default — confirm whether that default
// bounds the "infinite" retries described below.
func (cr *cmdRetrier) Start(
cmdCtx context.Context, log *logger.Logger,
binaryPath string, args []string, env []string, timeout time.Duration,
retryCtx context.Context, retrySleepInitDuration time.Duration, retrySleepMaxDuration time.Duration,
) {
cmdKey := cr.cmdKey(binaryPath, args, env)

// Due to infinite retries, we may still be trying to (re)execute
// a command from a previous call to Start(). We should first stop
// these retries as well as the command process.
cr.Stop(cmdKey, log)

// Track the command so we can cancel it and its retries later.
cmdCtx, cmdCancelFn := context.WithCancel(cmdCtx)
retryCtx, retryCancelFn := context.WithCancel(retryCtx)
// Buffered with capacity 1 so that an execution attempt can signal its
// completion without waiting for a receiver.
cmdDone := make(chan struct{}, 1)
cr.track(cmdKey, cmdCancelFn, retryCancelFn, cmdDone)

// Execute command with retries and exponential backoff between attempts
expBackoff := backoff.NewExponentialBackOff()
expBackoff.InitialInterval = retrySleepInitDuration
expBackoff.MaxInterval = retrySleepMaxDuration

backoffCtx := backoff.WithContext(expBackoff, retryCtx)

// Since we will be executing the command with infinite retries, we don't
// want to block. So we execute the command with retries in its own
// goroutine.
go func() {
retryAttempt := 0

// Here we indefinitely retry the executeCommand call, as long as it
// returns a non-nil error. We will block here until executeCommand
// returns a nil error, indicating that the command being executed has
// successfully completed execution.
//nolint: errcheck // No point checking the error inside the goroutine.
backoff.RetryNotify(
func() error {
err := executeCommand(cmdCtx, log, binaryPath, args, env, timeout)
// Signal that this execution attempt has finished.
cmdDone <- struct{}{}
return err
},
backoffCtx,
func(err error, retryAfter time.Duration) {
retryAttempt++
log.Warnf(
"service command execution failed with error [%s], retrying (will be retry [%d]) after [%s]",
err.Error(),
retryAttempt,
retryAfter,
)
// Drain the completion signal of the failed attempt so the next
// attempt can send its own signal on the 1-slot channel.
// NOTE(review): Stop also receives from this channel — confirm the
// two receivers cannot race for the same value.
<-cmdDone
},
)

// The retry loop has ended; the command no longer needs to be tracked.
cr.untrack(cmdKey)
}()
}

// Stop cancels the retries and the command tracked under cmdKey, waits for a
// completion signal from the command, and then removes it from tracking. If no
// command is tracked under cmdKey, it only logs a debug message and returns.
//
// NOTE(review): the mutex is held while blocking on info.cmdDone, and untrack
// (called from the retry goroutine) needs the same mutex — confirm that a
// completion signal is always available here so this cannot block forever.
func (cr *cmdRetrier) Stop(cmdKey string, log *logger.Logger) {
cr.mu.Lock()
defer cr.mu.Unlock()
info, exists := cr.cmds[cmdKey]
if !exists {
log.Debugf("no retries for command key [%s] are pending; nothing to do", cmdKey)
return
}

// Cancel the previous retries
info.retryCancelFn()

// Cancel the previous command
info.cmdCancelFn()

// Ensure that the previous command actually stopped running
<-info.cmdDone

// Stop tracking
delete(cr.cmds, cmdKey)
log.Debugf("retries and command process for command key [%s] stopped", cmdKey)
}

// track records the cancel functions and completion channel of a command under
// cmdKey, replacing any entry already stored for that key. The underlying map
// is created on first use.
func (cr *cmdRetrier) track(cmdKey string, cmdCancelFn context.CancelFunc, retryCanceFn context.CancelFunc, cmdDone <-chan struct{}) {
	// Assemble the entry up front; it does not depend on shared state.
	entry := cmdRetryInfo{
		retryCancelFn: retryCanceFn,
		cmdCancelFn:   cmdCancelFn,
		cmdDone:       cmdDone,
	}

	cr.mu.Lock()
	defer cr.mu.Unlock()

	// Writing to a nil map panics, so allocate it lazily.
	if cr.cmds == nil {
		cr.cmds = make(map[string]cmdRetryInfo)
	}
	cr.cmds[cmdKey] = entry
}

// untrack removes the entry stored under cmdKey, if any. It does not cancel
// the command or its retries.
func (cr *cmdRetrier) untrack(cmdKey string) {
	cr.mu.Lock()
	// delete is a no-op for a missing key or a nil map, so no checks are needed.
	delete(cr.cmds, cmdKey)
	cr.mu.Unlock()
}

// cmdKey returns a deterministic string key (the hex-encoded SHA-256 digest) for the combination of the given
// binaryPath, args, and env. This key can be used to determine if the same command
// is being executed again or not.
func (cr *cmdRetrier) cmdKey(binaryPath string, args []string, env []string) string {
var sb strings.Builder

sb.WriteString(binaryPath)
sb.WriteString("|")

var posArgs, flagArgs []string
for i := 0; i < len(args); i++ {
arg := args[i]
if strings.HasPrefix(arg, "-") {
flagArgs = append(flagArgs, arg+" "+args[i+1])
i++
} else {
posArgs = append(posArgs, arg)
}
}

sort.Strings(posArgs)
sort.Strings(flagArgs)

for _, arg := range posArgs {
sb.WriteString(arg)
sb.WriteString("|")
}

for _, arg := range flagArgs {
sb.WriteString(arg)
sb.WriteString("|")
}

sort.Strings(env)

for _, kv := range env {
sb.WriteString(kv)
sb.WriteString("|")
}

digest := sha256.New()
digest.Write([]byte(sb.String()))

return hex.EncodeToString(digest.Sum(nil))
return executeCommand(ctx, log, binaryPath, spec.Args, envSpecToEnv(spec.Env), spec.Timeout)
}

func envSpecToEnv(envSpecs []component.CommandEnvSpec) []string {
Expand Down
Loading

0 comments on commit cddbd51

Please sign in to comment.