Add / report persistent policy error metadata in Coordinator #3076

Merged · 8 commits · Jul 24, 2023

Changes from all commits
16 changes: 10 additions & 6 deletions control_v2.proto
```diff
@@ -164,19 +164,23 @@ message StateAgentInfo {
 }
 
 // StateResponse is the current state of Elastic Agent.
+// Next unused id: 7
 message StateResponse {
   // Overall information of Elastic Agent.
   StateAgentInfo info = 1;
-  // Overall state of Elastic Agent.
+
+  // Overall state + message of Elastic Agent, aggregating errors in other
+  // states and components
   State state = 2;
-  // Overall state message of Elastic Agent.
   string message = 3;
-  // State of each component in Elastic Agent.
-  repeated ComponentState components = 4;
-  // Fleet connectivity state of Elastic Agent.
+
+  // Fleet state: healthy / "Connected" if the last RPC call to Fleet
+  // succeeded, otherwise failed with the associated error string.
   State fleetState = 5;
-  // Fleet connectivity state message of Elastic Agent.
   string fleetMessage = 6;
+
+  // State of each component in Elastic Agent.
+  repeated ComponentState components = 4;
 }
 
 // DiagnosticFileResult is a file result from a diagnostic result.
```
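
A note on consuming the new fields: with fleetState/fleetMessage split out from the aggregated state, a client that wants "fully healthy" has to check both. Below is a hypothetical helper over Go bindings generated from this file -- the import path and the State_HEALTHY/State_FAILED enum value names assume standard protoc-gen-go output for the State enum defined earlier in this file, and are not part of this PR:

```go
package main

import (
	"fmt"

	// Assumed import path for the generated bindings; adjust to wherever the
	// protoc-gen-go output for control_v2.proto actually lives.
	cproto "github.com/elastic/elastic-agent/pkg/control/v2/cproto"
)

// fullyHealthy reports whether both the aggregated agent state and the
// separately tracked Fleet connectivity state are healthy.
func fullyHealthy(resp *cproto.StateResponse) bool {
	return resp.State == cproto.State_HEALTHY &&
		resp.FleetState == cproto.State_HEALTHY
}

func main() {
	resp := &cproto.StateResponse{
		State:        cproto.State_HEALTHY,
		FleetState:   cproto.State_FAILED,
		FleetMessage: "last RPC to Fleet failed",
	}
	// The overall state is healthy, but Fleet connectivity is not.
	fmt.Println(fullyHealthy(resp)) // false
}
```
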
151 changes: 94 additions & 57 deletions internal/pkg/agent/application/coordinator/coordinator.go
```diff
@@ -214,11 +214,26 @@ type Coordinator struct {
     // into the reported state before broadcasting -- State() will report
     // agentclient.Failed if one of these is set, even if the underlying
     // coordinator state is agentclient.Healthy.
-    runtimeMgrErr error
+    runtimeMgrErr error // Currently unused
```

Contributor:

Why do we keep it, then?

Contributor (author):

Fair question -- I asked @blakerouse a while ago, and he said it was so the runtime manager satisfies the Runner interface. A more immediate answer is that it will probably be used soon: I have a PR in progress that centralizes more of the runtime manager logic in its own run loop. For that, it's useful to decouple runtime manager Update calls from the error result they produce, to avoid accessing runtime internals from the Coordinator goroutine. In that PR I've started using the previously idle error channel to report component-model update failures, and those end up being stored in this field.

```diff
     configMgrErr error
     actionsErr   error
     varsMgrErr   error
+
+    // Errors resulting from different possible failure modes when setting a
+    // new policy. Right now there are three different stages where a policy
+    // update can fail:
+    //   - in generateAST, converting the policy to an AST
+    //   - in process, converting the AST and vars into a full component model
+    //   - while sending the final component model to the runtime manager
+    //
+    // The plan is to improve our preprocessing so we can always detect
+    // failures immediately https://github.com/elastic/elastic-agent/issues/2887.
+    // For now, we track three distinct errors for those three failure types,
+    // and merge them into a readable error in generateReportableState.
+    configErr        error
+    componentGenErr  error
+    runtimeUpdateErr error
 
     // The raw policy before spec lookup or variable substitution
     ast *transpiler.AST
```
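
generateReportableState itself is not shown in this diff. A self-contained sketch of the merge order its comment describes -- the first failing stage wins -- with a hypothetical firstNonNil helper (the real method also folds in the manager errors and per-component states, and runs under the Coordinator's state mutex):

```go
package main

import (
	"errors"
	"fmt"
)

// firstNonNil returns the first non-nil error, in pipeline order.
func firstNonNil(errs ...error) error {
	for _, err := range errs {
		if err != nil {
			return err
		}
	}
	return nil
}

func main() {
	var (
		configErr        error                                            // stage 1: policy -> AST
		componentGenErr  = errors.New("could not render component model") // stage 2: AST + vars -> components
		runtimeUpdateErr error                                            // stage 3: components -> runtime manager
	)
	// Stage 2 failed, so that is what the Coordinator would report.
	fmt.Println(firstNonNil(configErr, componentGenErr, runtimeUpdateErr))
}
```
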
```diff
@@ -567,28 +582,28 @@ func (c *Coordinator) Run(ctx context.Context) error {
     go c.watchRuntimeComponents(watchCtx)
 
     for {
-        c.setState(agentclient.Starting, "Waiting for initial configuration and composable variables")
+        c.setCoordinatorState(agentclient.Starting, "Waiting for initial configuration and composable variables")
         // The usual state refresh happens in the main run loop in Coordinator.runner,
         // so before/after the runner call we need to trigger state change broadcasts
         // manually.
         c.refreshState()
         err := c.runner(ctx)
         if err != nil {
             if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
-                c.setState(agentclient.Stopped, "Requested to be stopped")
+                c.setCoordinatorState(agentclient.Stopped, "Requested to be stopped")
                 c.setFleetState(agentclient.Stopped, "Requested to be stopped")
                 c.refreshState()
                 // do not restart
                 return err
             }
             if errors.Is(err, ErrFatalCoordinator) {
-                c.setState(agentclient.Failed, "Fatal coordinator error")
+                c.setCoordinatorState(agentclient.Failed, "Fatal coordinator error")
                 c.setFleetState(agentclient.Stopped, "Fatal coordinator error")
                 c.refreshState()
                 return err
             }
         }
-        c.setState(agentclient.Failed, fmt.Sprintf("Coordinator failed and will be restarted: %s", err))
+        c.setCoordinatorState(agentclient.Failed, fmt.Sprintf("Coordinator failed and will be restarted: %s", err))
         c.refreshState()
         c.logger.Errorf("coordinator failed and will be restarted: %s", err)
     }
```
```diff
@@ -857,31 +872,36 @@ func (c *Coordinator) runLoopIteration(ctx context.Context) {
 
     case change := <-c.managerChans.configManagerUpdate:
         if err := c.processConfig(ctx, change.Config()); err != nil {
-            c.setState(agentclient.Failed, err.Error())
-            c.logger.Errorf("%s", err)
+            c.logger.Errorf("applying new policy: %s", err.Error())
             change.Fail(err)
         } else {
             if err := change.Ack(); err != nil {
                 err = fmt.Errorf("failed to ack configuration change: %w", err)
-                c.setState(agentclient.Failed, err.Error())
-                c.logger.Errorf("%s", err)
+                // Workaround: setConfigManagerError is usually used by the config
+                // manager to report failed ACKs / etc when communicating with Fleet.
+                // We need to report a failed ACK here, but the policy change has
+                // already been successfully applied so we don't want to report it as
+                // a general Coordinator or policy failure.
+                // This arises uniquely here because this is the only case where an
+                // action is responsible for reporting the failure of its own ACK
+                // call. The "correct" fix is to make this Ack() call unfailable
+                // and handle ACK retries and reporting in the config manager like
+                // with other action types -- this error would then end up invoking
+                // setConfigManagerError "organically" via the config manager's
+                // reporting channel. In the meantime, we do it manually.
+                c.setConfigManagerError(err)
+                c.logger.Errorf("%s", err.Error())
             }
         }
 
     case vars := <-c.managerChans.varsManagerUpdate:
         if ctx.Err() == nil {
-            if err := c.processVars(ctx, vars); err != nil {
-                c.setState(agentclient.Failed, err.Error())
-                c.logger.Errorf("%s", err)
-            }
+            c.processVars(ctx, vars)
         }
 
     case ll := <-c.logLevelCh:
         if ctx.Err() == nil {
-            if err := c.processLogLevel(ctx, ll); err != nil {
-                c.setState(agentclient.Failed, err.Error())
-                c.logger.Errorf("%s", err)
-            }
+            c.processLogLevel(ctx, ll)
         }
     }
```

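The workaround comment above describes the "organic" path this error would normally take: the config manager reports failures over a channel, and the Coordinator stores whatever arrives via setConfigManagerError. A self-contained sketch of that flow, with all names hypothetical:

```go
package main

import (
	"errors"
	"fmt"
)

func main() {
	configMgrErrCh := make(chan error, 1)

	// Config manager side: report a failed ACK through the channel.
	configMgrErrCh <- fmt.Errorf("failed to ack configuration change: %w",
		errors.New("fleet unreachable"))

	// Coordinator side: the run loop drains the channel into its stored
	// manager error, which generateReportableState later folds into the
	// overall reported state.
	var configMgrErr error
	select {
	case err := <-configMgrErrCh:
		configMgrErr = err
	default:
		// no new report; keep the previous value
	}

	fmt.Println(configMgrErr)
}
```
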
```diff
@@ -900,7 +920,34 @@ func (c *Coordinator) processConfig(ctx context.Context, cfg *config.Config) (err error) {
         span.End()
     }()
 
-    if err := info.InjectAgentConfig(cfg); err != nil {
+    err = c.generateAST(cfg)
+    c.setConfigError(err)
+    if err != nil {
+        return err
+    }
+
+    // Disabled for 8.8.0 release in order to limit the surface
+    // https://github.com/elastic/security-team/issues/6501
+
+    // c.setProtection(protectionConfig)
+
+    if c.vars != nil {
+        return c.refreshComponentModel(ctx)
+    }
+    return nil
+}
+
+// Generate the AST for a new incoming configuration and, if successful,
+// assign it to the Coordinator's ast field.
+func (c *Coordinator) generateAST(cfg *config.Config) (err error) {
+    defer func() {
+        // Update configErr, which stores the results of the most recent policy
+        // update and is merged into the Coordinator state in
+        // generateReportableState.
+        c.setConfigError(err)
+    }()
+
+    if err = info.InjectAgentConfig(cfg); err != nil {
         return err
     }
```

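generateAST records its own result through a deferred call over the named return value err, which guarantees the stored error is updated -- or cleared -- on every exit path. A standalone illustration of the idiom, with all names hypothetical:

```go
package main

import (
	"errors"
	"fmt"
)

// lastConfigErr is a hypothetical stand-in for the Coordinator's configErr
// field; recordResult plays the role of setConfigError.
var lastConfigErr error

func recordResult(err error) { lastConfigErr = err }

// doWork mirrors the generateAST pattern above: because err is a named
// return value, the deferred recordResult observes the final error no
// matter which return statement fires -- including the success path, where
// it stores nil and thereby clears any error left over from a previous run.
func doWork(fail bool) (err error) {
	defer func() { recordResult(err) }()
	if fail {
		return errors.New("converting policy to AST failed") // still recorded
	}
	return nil
}

func main() {
	_ = doWork(true)
	fmt.Println(lastConfigErr) // converting policy to AST failed
	_ = doWork(false)
	fmt.Println(lastConfigErr) // <nil> -- success resets the stored error
}
```
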
```diff
@@ -943,63 +990,47 @@
     }
 
     c.ast = rawAst
-
-    // Disabled for 8.8.0 release in order to limit the surface
-    // https://github.com/elastic/security-team/issues/6501
-
-    // c.setProtection(protectionConfig)
-
-    if c.vars != nil {
-        return c.process(ctx)
-    }
     return nil
 }
 
 // processVars updates the transpiler vars in the Coordinator.
 // Called on the main Coordinator goroutine.
-func (c *Coordinator) processVars(ctx context.Context, vars []*transpiler.Vars) (err error) {
-    span, ctx := apm.StartSpan(ctx, "vars", "app.internal")
-    defer func() {
-        apm.CaptureError(ctx, err).Send()
-        span.End()
-    }()
-
+func (c *Coordinator) processVars(ctx context.Context, vars []*transpiler.Vars) {
     c.vars = vars
 
-    if c.ast != nil {
-        return c.process(ctx)
+    err := c.refreshComponentModel(ctx)
+    if err != nil {
+        c.logger.Errorf("updating Coordinator variables: %s", err.Error())
     }
-    return nil
 }
 
 // Called on the main Coordinator goroutine.
-func (c *Coordinator) processLogLevel(ctx context.Context, ll logp.Level) (err error) {
-    span, ctx := apm.StartSpan(ctx, "log_level", "app.internal")
-    defer func() {
-        apm.CaptureError(ctx, err).Send()
-        span.End()
-    }()
-
+func (c *Coordinator) processLogLevel(ctx context.Context, ll logp.Level) {
     c.setLogLevel(ll)
 
-    if c.ast != nil && c.vars != nil {
-        return c.process(ctx)
+    err := c.refreshComponentModel(ctx)
+    if err != nil {
+        c.logger.Errorf("updating log level: %s", err.Error())
     }
-    return nil
 }
 
 // Regenerate the component model based on the current vars and AST, then
 // forward the result to the runtime manager.
 // Always called on the main Coordinator goroutine.
-func (c *Coordinator) process(ctx context.Context) (err error) {
-    span, ctx := apm.StartSpan(ctx, "process", "app.internal")
+func (c *Coordinator) refreshComponentModel(ctx context.Context) (err error) {
+    if c.ast == nil || c.vars == nil {
+        // Nothing to process yet
+        return nil
+    }
+
+    span, ctx := apm.StartSpan(ctx, "refreshComponentModel", "app.internal")
```

Contributor:

Can this be considered breaking? (renaming the span name)

Contributor:

Being that APM tracing inside the Elastic Agent itself is for development or debugging only, I would think not.

```diff
     defer func() {
         apm.CaptureError(ctx, err).Send()
         span.End()
     }()
 
     // regenerate the component model
-    err = c.recomputeConfigAndComponents()
+    err = c.generateComponentModel()
     if err != nil {
-        return err
+        return fmt.Errorf("generating component model: %w", err)
     }
 
     signed, err := component.SignedFromPolicy(c.derivedConfig)
```
```diff
@@ -1021,18 +1052,24 @@ func (c *Coordinator) process(ctx context.Context) (err error) {
     c.logger.Info("Updating running component model")
     c.logger.With("components", model.Components).Debug("Updating running component model")
     err = c.runtimeMgr.Update(model)
+    c.setRuntimeUpdateError(err)
     if err != nil {
-        return err
+        return fmt.Errorf("updating runtime: %w", err)
     }
-    c.setState(agentclient.Healthy, "Running")
+    c.setCoordinatorState(agentclient.Healthy, "Running")
     return nil
 }
```

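The new returns wrap the underlying failure with %w rather than returning it bare, so the added context does not hide the root cause from errors.Is / errors.As. For example:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

func main() {
	// Wrapping with %w keeps the cause matchable through the wrap.
	err := fmt.Errorf("updating runtime: %w", context.Canceled)
	fmt.Println(errors.Is(err, context.Canceled)) // true
}
```
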
```diff
-// recomputeConfigAndComponents regenerates the configuration tree and
+// generateComponentModel regenerates the configuration tree and
 // components from the current AST and vars and returns the result.
 // Called from both the main Coordinator goroutine and from external
 // goroutines via diagnostics hooks.
-func (c *Coordinator) recomputeConfigAndComponents() error {
+func (c *Coordinator) generateComponentModel() (err error) {
+    defer func() {
+        // Update componentGenErr with the results.
+        c.setComponentGenError(err)
+    }()
+
     ast := c.ast.Clone()
     inputs, ok := transpiler.Lookup(ast, "inputs")
     if ok {
```