Add / report persistent policy error metadata in Coordinator #3076
Changes from all commits: bf3e634, c98ac6e, 6252de3, eb9a295, 2cb093d, 06900f2, 0b1303d, ed1c3a0
@@ -214,11 +214,26 @@ type Coordinator struct {
    // into the reported state before broadcasting -- State() will report
    // agentclient.Failed if one of these is set, even if the underlying
    // coordinator state is agentclient.Healthy.
-   runtimeMgrErr error
+   runtimeMgrErr error // Currently unused
    configMgrErr  error
    actionsErr    error
    varsMgrErr    error

+   // Errors resulting from different possible failure modes when setting a
+   // new policy. Right now there are three different stages where a policy
+   // update can fail:
+   // - in generateAST, converting the policy to an AST
+   // - in process, converting the AST and vars into a full component model
+   // - while sending the final component model to the runtime manager
+   //
+   // The plan is to improve our preprocessing so we can always detect
+   // failures immediately https://github.com/elastic/elastic-agent/issues/2887.
+   // For now, we track three distinct errors for those three failure types,
+   // and merge them into a readable error in generateReportableState.
+   configErr        error
+   componentGenErr  error
+   runtimeUpdateErr error
+
    // The raw policy before spec lookup or variable substitution
    ast *transpiler.AST
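The struct comment above refers to generateReportableState, which is outside this diff. As a hedged sketch of the merging it describes -- the helper name, priority order, and error wording below are assumptions for illustration, not the actual implementation -- the three stage-specific errors could collapse into one readable policy error like this:

package main

import (
    "errors"
    "fmt"
)

// mergePolicyErrors is a hypothetical helper, not the real
// generateReportableState: it shows how three stage-specific errors could be
// collapsed into one readable error, with earlier pipeline stages winning.
func mergePolicyErrors(configErr, componentGenErr, runtimeUpdateErr error) error {
    switch {
    case configErr != nil:
        return fmt.Errorf("invalid policy: %w", configErr)
    case componentGenErr != nil:
        return fmt.Errorf("invalid component model: %w", componentGenErr)
    case runtimeUpdateErr != nil:
        return fmt.Errorf("runtime update failed: %w", runtimeUpdateErr)
    }
    return nil
}

func main() {
    err := mergePolicyErrors(nil, errors.New("input 'foo' has no spec"), nil)
    fmt.Println(err) // invalid component model: input 'foo' has no spec
}

Ordering the checks by stage means the earliest failure takes priority, which matches the stated goal of reporting one readable error instead of three separate ones.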
@@ -567,28 +582,28 @@ func (c *Coordinator) Run(ctx context.Context) error {
    go c.watchRuntimeComponents(watchCtx)

    for {
-       c.setState(agentclient.Starting, "Waiting for initial configuration and composable variables")
+       c.setCoordinatorState(agentclient.Starting, "Waiting for initial configuration and composable variables")
        // The usual state refresh happens in the main run loop in Coordinator.runner,
        // so before/after the runner call we need to trigger state change broadcasts
        // manually.
        c.refreshState()
        err := c.runner(ctx)
        if err != nil {
            if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
-               c.setState(agentclient.Stopped, "Requested to be stopped")
+               c.setCoordinatorState(agentclient.Stopped, "Requested to be stopped")
                c.setFleetState(agentclient.Stopped, "Requested to be stopped")
                c.refreshState()
                // do not restart
                return err
            }
            if errors.Is(err, ErrFatalCoordinator) {
-               c.setState(agentclient.Failed, "Fatal coordinator error")
+               c.setCoordinatorState(agentclient.Failed, "Fatal coordinator error")
                c.setFleetState(agentclient.Stopped, "Fatal coordinator error")
                c.refreshState()
                return err
            }
        }
-       c.setState(agentclient.Failed, fmt.Sprintf("Coordinator failed and will be restarted: %s", err))
+       c.setCoordinatorState(agentclient.Failed, fmt.Sprintf("Coordinator failed and will be restarted: %s", err))
        c.refreshState()
        c.logger.Errorf("coordinator failed and will be restarted: %s", err)
    }
@@ -857,31 +872,36 @@ func (c *Coordinator) runLoopIteration(ctx context.Context) {

    case change := <-c.managerChans.configManagerUpdate:
        if err := c.processConfig(ctx, change.Config()); err != nil {
-           c.setState(agentclient.Failed, err.Error())
-           c.logger.Errorf("%s", err)
+           c.logger.Errorf("applying new policy: %s", err.Error())
            change.Fail(err)
        } else {
            if err := change.Ack(); err != nil {
                err = fmt.Errorf("failed to ack configuration change: %w", err)
-               c.setState(agentclient.Failed, err.Error())
-               c.logger.Errorf("%s", err)
+               // Workaround: setConfigManagerError is usually used by the config
+               // manager to report failed ACKs / etc when communicating with Fleet.
+               // We need to report a failed ACK here, but the policy change has
+               // already been successfully applied so we don't want to report it as
+               // a general Coordinator or policy failure.
+               // This arises uniquely here because this is the only case where an
+               // action is responsible for reporting the failure of its own ACK
+               // call. The "correct" fix is to make this Ack() call unfailable
+               // and handle ACK retries and reporting in the config manager like
+               // with other action types -- this error would then end up invoking
+               // setConfigManagerError "organically" via the config manager's
+               // reporting channel. In the meantime, we do it manually.
+               c.setConfigManagerError(err)
+               c.logger.Errorf("%s", err.Error())
            }
        }

    case vars := <-c.managerChans.varsManagerUpdate:
        if ctx.Err() == nil {
-           if err := c.processVars(ctx, vars); err != nil {
-               c.setState(agentclient.Failed, err.Error())
-               c.logger.Errorf("%s", err)
-           }
+           c.processVars(ctx, vars)
        }

    case ll := <-c.logLevelCh:
        if ctx.Err() == nil {
-           if err := c.processLogLevel(ctx, ll); err != nil {
-               c.setState(agentclient.Failed, err.Error())
-               c.logger.Errorf("%s", err)
-           }
+           c.processLogLevel(ctx, ll)
        }
    }
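setConfigManagerError itself is not shown in this diff. The workaround above only relies on it being a plain setter for the configMgrErr override that feeds the reported state, so a toy sketch of that shape looks roughly like the following -- the coordinatorLike type and its behavior are assumptions for illustration, not the real Coordinator:

package main

import (
    "errors"
    "fmt"
)

// coordinatorLike is a toy stand-in, not the real Coordinator. It only shows
// the shape of the workaround: a dedicated setter records the failed ACK so
// reported state reflects it without marking the policy update itself failed.
type coordinatorLike struct {
    configMgrErr error // merged into reported state, like configMgrErr above
}

func (c *coordinatorLike) setConfigManagerError(err error) {
    c.configMgrErr = err
    // The real Coordinator would also flag that its public state needs to be
    // rebroadcast after changing an error override.
}

func main() {
    c := &coordinatorLike{}
    ackErr := fmt.Errorf("failed to ack configuration change: %w", errors.New("fleet unreachable"))
    c.setConfigManagerError(ackErr)
    fmt.Println(c.configMgrErr)
}

Routing the failed ACK through this setter keeps the policy itself reported as applied, while the Fleet-communication error surface is the part that goes unhealthy.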
@@ -900,7 +920,34 @@ func (c *Coordinator) processConfig(ctx context.Context, cfg *config.Config) (er
        span.End()
    }()

-   if err := info.InjectAgentConfig(cfg); err != nil {
+   err = c.generateAST(cfg)
+   c.setConfigError(err)
+   if err != nil {
+       return err
+   }
+
+   // Disabled for 8.8.0 release in order to limit the surface
+   // https://github.com/elastic/security-team/issues/6501
+
+   // c.setProtection(protectionConfig)
+
+   if c.vars != nil {
+       return c.refreshComponentModel(ctx)
+   }
+   return nil
+}
+
+// Generate the AST for a new incoming configuration and, if successful,
+// assign it to the Coordinator's ast field.
+func (c *Coordinator) generateAST(cfg *config.Config) (err error) {
+   defer func() {
+       // Update configErr, which stores the results of the most recent policy
+       // update and is merged into the Coordinator state in
+       // generateReportableState.
+       c.setConfigError(err)
+   }()
+
+   if err = info.InjectAgentConfig(cfg); err != nil {
        return err
    }
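generateAST (and generateComponentModel further down) combine a named return value with a deferred setter, so the stored error is updated on every exit path, early returns included, and cleared again on success. A self-contained illustration of just that mechanic -- recordResult and doUpdate are invented stand-ins, not Coordinator code:

package main

import (
    "errors"
    "fmt"
)

var lastErr error

// recordResult stands in for setConfigError / setComponentGenError: it stores
// the outcome of the most recent attempt, whether success (nil) or failure.
func recordResult(err error) { lastErr = err }

// doUpdate mirrors the pattern in generateAST: a named return plus a deferred
// setter means every return path, including early returns, records the error
// exactly once.
func doUpdate(input string) (err error) {
    defer func() { recordResult(err) }()

    if input == "" {
        return errors.New("empty input") // early return is still recorded
    }
    // ... main work would go here ...
    return nil
}

func main() {
    _ = doUpdate("")
    fmt.Println("after failure:", lastErr)
    _ = doUpdate("ok")
    fmt.Println("after success:", lastErr) // nil clears the previous error
}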
@@ -943,63 +990,47 @@ func (c *Coordinator) processConfig(ctx context.Context, cfg *config.Config) (er
    }

    c.ast = rawAst

-   // Disabled for 8.8.0 release in order to limit the surface
-   // https://github.com/elastic/security-team/issues/6501
-
-   // c.setProtection(protectionConfig)
-
-   if c.vars != nil {
-       return c.process(ctx)
-   }
    return nil
 }

    // processVars updates the transpiler vars in the Coordinator.
    // Called on the main Coordinator goroutine.
-func (c *Coordinator) processVars(ctx context.Context, vars []*transpiler.Vars) (err error) {
-   span, ctx := apm.StartSpan(ctx, "vars", "app.internal")
-   defer func() {
-       apm.CaptureError(ctx, err).Send()
-       span.End()
-   }()
-
+func (c *Coordinator) processVars(ctx context.Context, vars []*transpiler.Vars) {
    c.vars = vars

-   if c.ast != nil {
-       return c.process(ctx)
+   err := c.refreshComponentModel(ctx)
+   if err != nil {
+       c.logger.Errorf("updating Coordinator variables: %s", err.Error())
    }
-   return nil
 }

    // Called on the main Coordinator goroutine.
-func (c *Coordinator) processLogLevel(ctx context.Context, ll logp.Level) (err error) {
-   span, ctx := apm.StartSpan(ctx, "log_level", "app.internal")
-   defer func() {
-       apm.CaptureError(ctx, err).Send()
-       span.End()
-   }()
-
+func (c *Coordinator) processLogLevel(ctx context.Context, ll logp.Level) {
    c.setLogLevel(ll)

-   if c.ast != nil && c.vars != nil {
-       return c.process(ctx)
+   err := c.refreshComponentModel(ctx)
+   if err != nil {
+       c.logger.Errorf("updating log level: %s", err.Error())
    }
-   return nil
 }

    // Regenerate the component model based on the current vars and AST, then
    // forward the result to the runtime manager.
    // Always called on the main Coordinator goroutine.
-func (c *Coordinator) process(ctx context.Context) (err error) {
-   span, ctx := apm.StartSpan(ctx, "process", "app.internal")
+func (c *Coordinator) refreshComponentModel(ctx context.Context) (err error) {
+   if c.ast == nil || c.vars == nil {
+       // Nothing to process yet
+       return nil
+   }
+
+   span, ctx := apm.StartSpan(ctx, "refreshComponentModel", "app.internal")

    (review comment on this line) can this be considered breaking? renaming span name
    (reply) being that APM tracing inside of the Elastic Agent itself is for development or debugging only, I would think not

    defer func() {
        apm.CaptureError(ctx, err).Send()
        span.End()
    }()

    // regenerate the component model
-   err = c.recomputeConfigAndComponents()
+   err = c.generateComponentModel()
    if err != nil {
-       return err
+       return fmt.Errorf("generating component model: %w", err)
    }

    signed, err := component.SignedFromPolicy(c.derivedConfig)
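After this refactor, config changes, vars updates, and log-level changes all converge on refreshComponentModel, which is safe to call at any time because it returns early until both the AST and the vars have arrived. A minimal standalone sketch of that convergence pattern -- modelRefresher and its string/slice fields are invented for illustration and stand in for the real AST and vars types:

package main

import "fmt"

// modelRefresher mimics the shape of the Coordinator's refresh logic: several
// independent inputs call refresh, but nothing is generated until every
// prerequisite has arrived at least once.
type modelRefresher struct {
    ast  *string
    vars []string
}

func (m *modelRefresher) setAST(ast string) { m.ast = &ast; m.refresh() }

func (m *modelRefresher) setVars(vars []string) { m.vars = vars; m.refresh() }

func (m *modelRefresher) refresh() {
    if m.ast == nil || m.vars == nil {
        // Nothing to process yet, mirroring the guard in refreshComponentModel.
        return
    }
    fmt.Printf("regenerating model from %q with vars %v\n", *m.ast, m.vars)
}

func main() {
    m := &modelRefresher{}
    m.setAST("policy-ast") // no output yet: vars are still missing
    m.setVars([]string{"host.name"})
    m.setVars([]string{"host.name", "host.ip"}) // later updates refresh again
}

Putting the guard inside the shared refresh function, rather than at each call site, is what lets processVars and processLogLevel shed their own ast/vars checks in the diff above.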
@@ -1021,18 +1052,24 @@ func (c *Coordinator) process(ctx context.Context) (err error) {
    c.logger.Info("Updating running component model")
    c.logger.With("components", model.Components).Debug("Updating running component model")
    err = c.runtimeMgr.Update(model)
+   c.setRuntimeUpdateError(err)
    if err != nil {
-       return err
+       return fmt.Errorf("updating runtime: %w", err)
    }
-   c.setState(agentclient.Healthy, "Running")
+   c.setCoordinatorState(agentclient.Healthy, "Running")
    return nil
 }

-// recomputeConfigAndComponents regenerates the configuration tree and
+// generateComponentModel regenerates the configuration tree and
    // components from the current AST and vars and returns the result.
    // Called from both the main Coordinator goroutine and from external
    // goroutines via diagnostics hooks.
-func (c *Coordinator) recomputeConfigAndComponents() error {
+func (c *Coordinator) generateComponentModel() (err error) {
+   defer func() {
+       // Update componentGenErr with the results.
+       c.setComponentGenError(err)
+   }()
+
    ast := c.ast.Clone()
    inputs, ok := transpiler.Lookup(ast, "inputs")
    if ok {
(review comment, apparently on the runtimeMgrErr field marked "Currently unused") why do we keep it then?

(reply) Fair question -- I asked @blakerouse a while ago and he said it was so the runtime manager satisfies the Runner interface. A more immediate answer is that it will probably be used soon: I have a PR in progress to centralize more of the runtime manager logic in its own run loop. For that, it's useful to decouple runtime manager Update calls from the error result they produce, to avoid accessing runtime internals from the Coordinator goroutine. In that PR I've started using the previously-idle error channel to report component model update failures, and those end up being stored in this field.
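The reply above sketches a future direction: runtime manager Update failures reported asynchronously over its error channel and then stored in runtimeMgrErr, rather than returned to the Coordinator goroutine. The snippet below is only a guess at what that decoupling could look like -- fakeRuntimeManager, its errCh field, and the Update signature are all invented for illustration and are not the elastic-agent API:

package main

import (
    "errors"
    "fmt"
)

// fakeRuntimeManager is an invented stand-in for the runtime manager: Update
// no longer returns its result directly; failures are delivered on an error
// channel that the Coordinator's run loop would drain.
type fakeRuntimeManager struct {
    errCh chan error
}

func (m *fakeRuntimeManager) Update(model string) {
    // Pretend the update failed and report it asynchronously.
    m.errCh <- errors.New("component model rejected: " + model)
}

func main() {
    mgr := &fakeRuntimeManager{errCh: make(chan error, 1)}
    mgr.Update("demo-model")

    var runtimeMgrErr error // analogous to the Coordinator field the reply mentions
    select {
    case err := <-mgr.errCh:
        runtimeMgrErr = err // stored, then merged into reported state
    default:
    }
    fmt.Println(runtimeMgrErr)
}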