Skip to content

Commit

Permalink
[Feature] Start/stop monitoring server based on monitoring config (#3584
Browse files Browse the repository at this point in the history
)
  • Loading branch information
michalpristas authored Oct 30, 2023
1 parent 318a4bc commit 66e0f95
Show file tree
Hide file tree
Showing 7 changed files with 458 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: enhancement

# Change summary; a 80ish characters long description of the change.
summary: Start/stop monitoring server based on monitoring config

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
#description:

# Affected component; a word indicating the component this changeset affects.
component: elastic-agent

# PR number; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: 3492

# Issue number; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: 2735
16 changes: 16 additions & 0 deletions internal/pkg/agent/application/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ type ComponentsModifier func(comps []component.Component, cfg map[string]interfa
// CoordinatorShutdownTimeout is how long the coordinator will wait during shutdown to receive a "clean" shutdown from other components
var CoordinatorShutdownTimeout = time.Second * 5

type configReloader interface {
Reload(*config.Config) error
}

// Coordinator manages the entire state of the Elastic Agent.
//
// All configuration changes, update variables, and upgrade actions are managed and controlled by the coordinator.
Expand All @@ -175,6 +179,8 @@ type Coordinator struct {
upgradeMgr UpgradeManager
monitorMgr MonitorManager

monitoringServerReloader configReloader

runtimeMgr RuntimeManager
configMgr ConfigManager
varsMgr VarsManager
Expand Down Expand Up @@ -376,6 +382,10 @@ func (c *Coordinator) State() State {
return c.stateBroadcaster.Get()
}

func (c *Coordinator) RegisterMonitoringServer(s configReloader) {
c.monitoringServerReloader = s
}

// StateSubscribe returns a channel that reports changes in Coordinator state.
//
// bufferLen specifies how many state changes should be queued in addition to
Expand Down Expand Up @@ -1038,6 +1048,12 @@ func (c *Coordinator) generateAST(cfg *config.Config) (err error) {
}
}

if c.monitoringServerReloader != nil {
if err := c.monitoringServerReloader.Reload(cfg); err != nil {
return fmt.Errorf("failed to reload monitor manager configuration: %w", err)
}
}

c.ast = rawAst
return nil
}
Expand Down
154 changes: 154 additions & 0 deletions internal/pkg/agent/application/coordinator/coordinator_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ import (
"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/reload"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details"
"github.com/elastic/elastic-agent/internal/pkg/agent/transpiler"
"github.com/elastic/elastic-agent/internal/pkg/config"
monitoringCfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config"
"github.com/elastic/elastic-agent/pkg/component"
"github.com/elastic/elastic-agent/pkg/component/runtime"
agentclient "github.com/elastic/elastic-agent/pkg/control/v2/client"
Expand Down Expand Up @@ -521,6 +523,136 @@ inputs:
}
}

func TestCoordinatorPolicyChangeUpdatesMonitorReloader(t *testing.T) {
// Send a test policy to the Coordinator as a Config Manager update,
// verify it generates the right component model and sends it to the
// runtime manager, then send an empty policy and verify it calls
// another runtime manager update with an empty component model.

// Set a one-second timeout -- nothing here should block, but if it
// does let's report a failure instead of timing out the test runner.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
logger := logp.NewLogger("testing")

configChan := make(chan ConfigChange, 1)

// Create a mocked runtime manager that will report the update call
runtimeManager := &fakeRuntimeManager{
updateCallback: func(comp []component.Component) error {
return nil
},
}

monitoringServer := &fakeMonitoringServer{}
newServerFn := func() (reload.ServerController, error) {
return monitoringServer, nil
}
monitoringReloader := reload.NewServerReloader(newServerFn, logger, monitoringCfg.DefaultConfig())

coord := &Coordinator{
logger: logger,
agentInfo: &info.AgentInfo{},
stateBroadcaster: broadcaster.New(State{}, 0, 0),
managerChans: managerChans{
configManagerUpdate: configChan,
},
runtimeMgr: runtimeManager,
vars: emptyVars(t),
}
coord.RegisterMonitoringServer(monitoringReloader)

// Create a policy with one input and one output
cfg := config.MustNewConfigFrom(`
outputs:
default:
type: elasticsearch
inputs:
- id: test-input
type: filestream
use_output: default
`)

// Send the policy change and make sure it was acknowledged.
cfgChange := &configChange{cfg: cfg}
configChan <- cfgChange
coord.runLoopIteration(ctx)
assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change")

// server is started by default
assert.True(t, monitoringServer.startTriggered)
assert.True(t, monitoringServer.isRunning)

// disable monitoring
cfgDisableMonitoring := config.MustNewConfigFrom(`
agent.monitoring.enabled: false
outputs:
default:
type: elasticsearch
inputs:
- id: test-input
type: filestream
use_output: default
`)

// Send the policy change and make sure it was acknowledged.
monitoringServer.Reset()
cfgChange = &configChange{cfg: cfgDisableMonitoring}
configChan <- cfgChange
coord.runLoopIteration(ctx)
assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change")

// server is stopped: monitoring is disabled
assert.True(t, monitoringServer.stopTriggered)
assert.False(t, monitoringServer.isRunning)

// enable monitoring
cfgEnabledMonitoring := config.MustNewConfigFrom(`
agent.monitoring.enabled: true
outputs:
default:
type: elasticsearch
inputs:
- id: test-input
type: filestream
use_output: default
`)

// Send the policy change and make sure it was acknowledged.
monitoringServer.Reset()
cfgChange = &configChange{cfg: cfgEnabledMonitoring}
configChan <- cfgChange
coord.runLoopIteration(ctx)
assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change")

// server is started again
assert.True(t, monitoringServer.startTriggered)
assert.True(t, monitoringServer.isRunning)

// enable monitoring and disable metrics
cfgEnabledMonitoringNoMetrics := config.MustNewConfigFrom(`
agent.monitoring.enabled: true
agent.monitoring.metrics: false
outputs:
default:
type: elasticsearch
inputs:
- id: test-input
type: filestream
use_output: default
`)

// Send the policy change and make sure it was acknowledged.
monitoringServer.Reset()
cfgChange = &configChange{cfg: cfgEnabledMonitoringNoMetrics}
configChan <- cfgChange
coord.runLoopIteration(ctx)
assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change")

// server is running: monitoring.metrics is disabled does not have an effect
assert.True(t, monitoringServer.isRunning)
}

func TestCoordinatorPolicyChangeUpdatesRuntimeManager(t *testing.T) {
// Send a test policy to the Coordinator as a Config Manager update,
// verify it generates the right component model and sends it to the
Expand Down Expand Up @@ -867,3 +999,25 @@ func emptyAST(t *testing.T) *transpiler.AST {
require.NoError(t, err, "AST creation must succeed")
return ast
}

type fakeMonitoringServer struct {
startTriggered bool
stopTriggered bool
isRunning bool
}

func (fs *fakeMonitoringServer) Start() {
fs.startTriggered = true
fs.isRunning = true
}

func (fs *fakeMonitoringServer) Stop() error {
fs.stopTriggered = true
fs.isRunning = false
return nil
}

func (fs *fakeMonitoringServer) Reset() {
fs.stopTriggered = false
fs.startTriggered = false
}
102 changes: 102 additions & 0 deletions internal/pkg/agent/application/monitoring/reload/reload.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package reload

import (
"sync/atomic"

"github.com/elastic/elastic-agent/internal/pkg/agent/configuration"
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
aConfig "github.com/elastic/elastic-agent/internal/pkg/config"
monitoringCfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config"
"github.com/elastic/elastic-agent/pkg/core/logger"
)

// ServerController controls the server runtime
type ServerController interface {
Start()
Stop() error
}
type serverConstructor func() (ServerController, error)

type ServerReloader struct {
s ServerController
log *logger.Logger
newServerFn serverConstructor

config *monitoringCfg.MonitoringConfig
isServerRunning atomic.Bool
}

func NewServerReloader(newServerFn serverConstructor, log *logger.Logger, mcfg *monitoringCfg.MonitoringConfig) *ServerReloader {
sr := &ServerReloader{
log: log,
config: mcfg,
newServerFn: newServerFn,
}

return sr
}

func (sr *ServerReloader) Start() {
if sr.s != nil && sr.isServerRunning.Load() {
// server is already running
return
}

sr.log.Info("Starting server")
var err error
sr.s, err = sr.newServerFn()
if err != nil {
sr.log.Errorf("Failed creating a server: %v", err)
return
}

sr.s.Start()
sr.log.Debugf("Server started")
sr.isServerRunning.Store(true)
}

func (sr *ServerReloader) Stop() error {
if sr.s == nil {
// stopping not started server
sr.isServerRunning.Store(false)
return nil
}
sr.log.Info("Stopping server")

sr.isServerRunning.Store(false)
if err := sr.s.Stop(); err != nil {
return err
}

sr.log.Debugf("Server stopped")
sr.s = nil
return nil
}

func (sr *ServerReloader) Reload(rawConfig *aConfig.Config) error {
newConfig := configuration.DefaultConfiguration()
if err := rawConfig.Unpack(&newConfig); err != nil {
return errors.New(err, "failed to unpack monitoring config during reload")
}

sr.config = newConfig.Settings.MonitoringConfig

shouldRunMetrics := sr.config.Enabled
if shouldRunMetrics && !sr.isServerRunning.Load() {
sr.Start()

sr.isServerRunning.Store(true)
return nil
}

if !shouldRunMetrics && sr.isServerRunning.Load() {
sr.isServerRunning.Store(false)
return sr.Stop()
}

return nil
}
Loading

0 comments on commit 66e0f95

Please sign in to comment.