Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Harden linux/windows vault implementation for concurrent access. #3240

Merged
merged 17 commits into from
Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions .buildkite/scripts/steps/integration_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,13 @@ TESTS_EXIT_STATUS=$?
set -e

# HTML report
go install github.com/alexec/junit2html@latest
junit2html < build/TEST-go-integration.xml > build/TEST-report.html
outputXML="build/TEST-go-integration.xml"

if [ -f "$outputXML" ]; then
go install github.com/alexec/junit2html@latest
junit2html < "$outputXML" > build/TEST-report.html
else
echo "Cannot generate HTML test report: $outputXML not found"
fi

exit $TESTS_EXIT_STATUS
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@ import (
func TestPolicyChange(t *testing.T) {
log, _ := logger.New("", false)
ack := noopacker.New()
agentInfo, _ := info.NewAgentInfo(true)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

agentInfo, _ := info.NewAgentInfo(ctx, true)
nullStore := &storage.NullStore{}

t.Run("Receive a config change and successfully emits a raw configuration", func(t *testing.T) {
Expand All @@ -59,7 +63,10 @@ func TestPolicyChange(t *testing.T) {

func TestPolicyAcked(t *testing.T) {
log, _ := logger.New("", false)
agentInfo, _ := info.NewAgentInfo(true)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

agentInfo, _ := info.NewAgentInfo(ctx, true)
nullStore := &storage.NullStore{}

t.Run("Config change should ACK", func(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (h *Settings) Handle(ctx context.Context, a fleetapi.Action, acker acker.Ac
return fmt.Errorf("failed to unpack log level: %w", err)
}

if err := h.agentInfo.SetLogLevel(action.LogLevel); err != nil {
if err := h.agentInfo.SetLogLevel(ctx, action.LogLevel); err != nil {
return fmt.Errorf("failed to update log level: %w", err)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestUpgradeHandler(t *testing.T) {
defer cancel()

log, _ := logger.New("", false)
agentInfo, _ := info.NewAgentInfo(true)
agentInfo, _ := info.NewAgentInfo(ctx, true)
msgChan := make(chan string)

// Create and start the coordinator
Expand Down Expand Up @@ -89,7 +89,7 @@ func TestUpgradeHandlerSameVersion(t *testing.T) {
defer cancel()

log, _ := logger.New("", false)
agentInfo, _ := info.NewAgentInfo(true)
agentInfo, _ := info.NewAgentInfo(ctx, true)
msgChan := make(chan string)

// Create and start the Coordinator
Expand Down Expand Up @@ -123,7 +123,7 @@ func TestUpgradeHandlerNewVersion(t *testing.T) {
defer cancel()

log, _ := logger.New("", false)
agentInfo, _ := info.NewAgentInfo(true)
agentInfo, _ := info.NewAgentInfo(ctx, true)
msgChan := make(chan string)

// Create and start the Coordinator
Expand Down
10 changes: 6 additions & 4 deletions internal/pkg/agent/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package application

import (
"context"
"fmt"
"time"

Expand Down Expand Up @@ -34,6 +35,7 @@ import (

// New creates a new Agent and bootstrap the required subsystem.
func New(
ctx context.Context,
log *logger.Logger,
baseLogger *logger.Logger,
logLevel logp.Level,
Expand Down Expand Up @@ -139,7 +141,7 @@ func New(
} else {
isManaged = true
var store storage.Store
store, cfg, err = mergeFleetConfig(rawConfig)
store, cfg, err = mergeFleetConfig(ctx, rawConfig)
if err != nil {
return nil, nil, nil, err
}
Expand All @@ -158,7 +160,7 @@ func New(
EndpointSignedComponentModifier(),
)

managed, err = newManagedConfigManager(log, agentInfo, cfg, store, runtime, fleetInitTimeout)
managed, err = newManagedConfigManager(ctx, log, agentInfo, cfg, store, runtime, fleetInitTimeout)
if err != nil {
return nil, nil, nil, err
}
Expand Down Expand Up @@ -196,9 +198,9 @@ func New(
return coord, configMgr, composable, nil
}

func mergeFleetConfig(rawConfig *config.Config) (storage.Store, *configuration.Configuration, error) {
func mergeFleetConfig(ctx context.Context, rawConfig *config.Config) (storage.Store, *configuration.Configuration, error) {
path := paths.AgentConfigFile()
store := storage.NewEncryptedDiskStore(path)
store := storage.NewEncryptedDiskStore(ctx, path)

reader, err := store.Load()
if err != nil {
Expand Down
7 changes: 6 additions & 1 deletion internal/pkg/agent/application/application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package application

import (
"context"
"fmt"
"testing"
"time"
Expand Down Expand Up @@ -37,7 +38,7 @@ func TestMergeFleetConfig(t *testing.T) {
}

rawConfig := config.MustNewConfigFrom(cfg)
storage, conf, err := mergeFleetConfig(rawConfig)
storage, conf, err := mergeFleetConfig(context.Background(), rawConfig)
require.NoError(t, err)
assert.NotNil(t, storage)
assert.NotNil(t, conf)
Expand All @@ -48,7 +49,11 @@ func TestMergeFleetConfig(t *testing.T) {

func TestLimitsLog(t *testing.T) {
log, obs := logger.NewTesting("TestLimitsLog")
ctx, cn := context.WithCancel(context.Background())
defer cn()

_, _, _, err := New(
ctx,
log,
log,
logp.DebugLevel,
Expand Down
16 changes: 8 additions & 8 deletions internal/pkg/agent/application/coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func TestCoordinator_State_Starting(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

coord, cfgMgr, varsMgr := createCoordinator(t)
coord, cfgMgr, varsMgr := createCoordinator(t, ctx)
stateChan := coord.StateSubscribe(ctx, 32)
go func() {
err := coord.Run(ctx)
Expand Down Expand Up @@ -147,7 +147,7 @@ func TestCoordinator_State_ConfigError_NotManaged(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

coord, cfgMgr, varsMgr := createCoordinator(t)
coord, cfgMgr, varsMgr := createCoordinator(t, ctx)
go func() {
err := coord.Run(ctx)
if errors.Is(err, context.Canceled) {
Expand Down Expand Up @@ -190,7 +190,7 @@ func TestCoordinator_State_ConfigError_Managed(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

coord, cfgMgr, varsMgr := createCoordinator(t, ManagedCoordinator(true))
coord, cfgMgr, varsMgr := createCoordinator(t, ctx, ManagedCoordinator(true))
go func() {
err := coord.Run(ctx)
if errors.Is(err, context.Canceled) {
Expand Down Expand Up @@ -232,7 +232,7 @@ func TestCoordinator_StateSubscribe(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

coord, cfgMgr, varsMgr := createCoordinator(t)
coord, cfgMgr, varsMgr := createCoordinator(t, ctx)
go func() {
err := coord.Run(ctx)
if errors.Is(err, context.Canceled) {
Expand Down Expand Up @@ -392,7 +392,7 @@ func TestCoordinator_ReExec(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

coord, cfgMgr, varsMgr := createCoordinator(t)
coord, cfgMgr, varsMgr := createCoordinator(t, ctx)
go func() {
err := coord.Run(ctx)
if errors.Is(err, context.Canceled) {
Expand Down Expand Up @@ -431,7 +431,7 @@ func TestCoordinator_Upgrade(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

coord, cfgMgr, varsMgr := createCoordinator(t)
coord, cfgMgr, varsMgr := createCoordinator(t, ctx)
go func() {
err := coord.Run(ctx)
if errors.Is(err, context.Canceled) {
Expand Down Expand Up @@ -472,7 +472,7 @@ func ManagedCoordinator(managed bool) CoordinatorOpt {
// createCoordinator creates a coordinator that using a fake config manager and a fake vars manager.
//
// The runtime specifications is set up to use both the fake component and fake shipper.
func createCoordinator(t *testing.T, opts ...CoordinatorOpt) (*Coordinator, *fakeConfigManager, *fakeVarsManager) {
func createCoordinator(t *testing.T, ctx context.Context, opts ...CoordinatorOpt) (*Coordinator, *fakeConfigManager, *fakeVarsManager) {
t.Helper()

o := &createCoordinatorOpts{}
Expand All @@ -482,7 +482,7 @@ func createCoordinator(t *testing.T, opts ...CoordinatorOpt) (*Coordinator, *fak

l := newErrorLogger(t)

ai, err := info.NewAgentInfo(false)
ai, err := info.NewAgentInfo(ctx, false)
require.NoError(t, err)

componentSpec := component.InputRuntimeSpec{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func (f *fleetGateway) convertToCheckinComponents(components []runtime.Component
}

func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse, time.Duration, error) {
ecsMeta, err := info.Metadata(f.log)
ecsMeta, err := info.Metadata(ctx, f.log)
if err != nil {
f.log.Error(errors.New("failed to load metadata", err))
}
Expand Down
15 changes: 8 additions & 7 deletions internal/pkg/agent/application/info/agent_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package info

import (
"bytes"
"context"
"fmt"
"io"
"time"
Expand Down Expand Up @@ -40,8 +41,8 @@ type ioStore interface {
}

// updateLogLevel updates log level and persists it to disk.
func updateLogLevel(level string) error {
ai, err := loadAgentInfoWithBackoff(false, defaultLogLevel, false)
func updateLogLevel(ctx context.Context, level string) error {
ai, err := loadAgentInfoWithBackoff(ctx, false, defaultLogLevel, false)
if err != nil {
return err
}
Expand All @@ -52,7 +53,7 @@ func updateLogLevel(level string) error {
}

agentConfigFile := paths.AgentConfigFile()
diskStore := storage.NewEncryptedDiskStore(agentConfigFile)
diskStore := storage.NewEncryptedDiskStore(ctx, agentConfigFile)

ai.LogLevel = level
return updateAgentInfo(diskStore, ai)
Expand Down Expand Up @@ -173,7 +174,7 @@ func yamlToReader(in interface{}) (io.Reader, error) {
return bytes.NewReader(data), nil
}

func loadAgentInfoWithBackoff(forceUpdate bool, logLevel string, createAgentID bool) (*persistentAgentInfo, error) {
func loadAgentInfoWithBackoff(ctx context.Context, forceUpdate bool, logLevel string, createAgentID bool) (*persistentAgentInfo, error) {
var err error
var ai *persistentAgentInfo

Expand All @@ -182,7 +183,7 @@ func loadAgentInfoWithBackoff(forceUpdate bool, logLevel string, createAgentID b

for i := 0; i <= maxRetriesloadAgentInfo; i++ {
backExp.Wait()
ai, err = loadAgentInfo(forceUpdate, logLevel, createAgentID)
ai, err = loadAgentInfo(ctx, forceUpdate, logLevel, createAgentID)
if !errors.Is(err, filelock.ErrAppAlreadyRunning) {
break
}
Expand All @@ -192,7 +193,7 @@ func loadAgentInfoWithBackoff(forceUpdate bool, logLevel string, createAgentID b
return ai, err
}

func loadAgentInfo(forceUpdate bool, logLevel string, createAgentID bool) (*persistentAgentInfo, error) {
func loadAgentInfo(ctx context.Context, forceUpdate bool, logLevel string, createAgentID bool) (*persistentAgentInfo, error) {
idLock := paths.AgentConfigFileLock()
if err := idLock.TryLock(); err != nil {
return nil, err
Expand All @@ -201,7 +202,7 @@ func loadAgentInfo(forceUpdate bool, logLevel string, createAgentID bool) (*pers
defer idLock.Unlock()

agentConfigFile := paths.AgentConfigFile()
diskStore := storage.NewEncryptedDiskStore(agentConfigFile)
diskStore := storage.NewEncryptedDiskStore(ctx, agentConfigFile)

agentInfo, err := getInfoFromStore(diskStore, logLevel)
if err != nil {
Expand Down
18 changes: 10 additions & 8 deletions internal/pkg/agent/application/info/agent_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package info

import (
"context"

"github.com/elastic/elastic-agent/internal/pkg/release"
"github.com/elastic/elastic-agent/pkg/core/logger"
)
Expand All @@ -25,8 +27,8 @@ type AgentInfo struct {
// new unique identifier for agent.
// If agent config file does not exist it gets created.
// Initiates log level to predefined value.
func NewAgentInfoWithLog(level string, createAgentID bool) (*AgentInfo, error) {
agentInfo, err := loadAgentInfoWithBackoff(false, level, createAgentID)
func NewAgentInfoWithLog(ctx context.Context, level string, createAgentID bool) (*AgentInfo, error) {
agentInfo, err := loadAgentInfoWithBackoff(ctx, false, level, createAgentID)
if err != nil {
return nil, err
}
Expand All @@ -43,8 +45,8 @@ func NewAgentInfoWithLog(level string, createAgentID bool) (*AgentInfo, error) {
// this created ID otherwise it generates
// new unique identifier for agent.
// If agent config file does not exist it gets created.
func NewAgentInfo(createAgentID bool) (*AgentInfo, error) {
return NewAgentInfoWithLog(defaultLogLevel, createAgentID)
func NewAgentInfo(ctx context.Context, createAgentID bool) (*AgentInfo, error) {
return NewAgentInfoWithLog(ctx, defaultLogLevel, createAgentID)
}

// LogLevel retrieves a log level.
Expand All @@ -56,8 +58,8 @@ func (i *AgentInfo) LogLevel() string {
}

// SetLogLevel updates log level of agent.
func (i *AgentInfo) SetLogLevel(level string) error {
if err := updateLogLevel(level); err != nil {
func (i *AgentInfo) SetLogLevel(ctx context.Context, level string) error {
if err := updateLogLevel(ctx, level); err != nil {
return err
}

Expand All @@ -66,8 +68,8 @@ func (i *AgentInfo) SetLogLevel(level string) error {
}

// ReloadID reloads agent info ID from configuration file.
func (i *AgentInfo) ReloadID() error {
newInfo, err := NewAgentInfoWithLog(i.logLevel, false)
func (i *AgentInfo) ReloadID(ctx context.Context) error {
newInfo, err := NewAgentInfoWithLog(ctx, i.logLevel, false)
if err != nil {
return err
}
Expand Down
5 changes: 3 additions & 2 deletions internal/pkg/agent/application/info/agent_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package info

import (
"context"
"fmt"
"runtime"
"strings"
Expand Down Expand Up @@ -125,8 +126,8 @@ const (
)

// Metadata loads metadata from disk.
func Metadata(l *logger.Logger) (*ECSMeta, error) {
agentInfo, err := NewAgentInfo(false)
func Metadata(ctx context.Context, l *logger.Logger) (*ECSMeta, error) {
agentInfo, err := NewAgentInfo(ctx, false)
if err != nil {
return nil, fmt.Errorf("failed to create new agent info: %w", err)
}
Expand Down
5 changes: 3 additions & 2 deletions internal/pkg/agent/application/managed_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type managedConfigManager struct {
}

func newManagedConfigManager(
ctx context.Context,
log *logger.Logger,
agentInfo *info.AgentInfo,
cfg *configuration.Configuration,
Expand All @@ -72,7 +73,7 @@ func newManagedConfigManager(
}

// Create the state store that will persist the last good policy change on disk.
stateStore, err := store.NewStateStoreWithMigration(log, paths.AgentActionStoreFile(), paths.AgentStateStoreFile())
stateStore, err := store.NewStateStoreWithMigration(ctx, log, paths.AgentActionStoreFile(), paths.AgentStateStoreFile())
if err != nil {
return nil, errors.New(err, fmt.Sprintf("fail to read action store '%s'", paths.AgentActionStoreFile()))
}
Expand Down Expand Up @@ -116,7 +117,7 @@ func (m *managedConfigManager) Run(ctx context.Context) error {
}

// Reload ID because of win7 sync issue
if err := m.agentInfo.ReloadID(); err != nil {
if err := m.agentInfo.ReloadID(ctx); err != nil {
return err
}

Expand Down
Loading
Loading