From 539a5f2956e423558b2d48fdb8bcdcf257348d51 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Tue, 1 Aug 2023 10:42:54 -0400 Subject: [PATCH] Clean up runtime manager test initialization (#3150) The runtime manager uses a mysterious lookup table, `waitReady`, that is injected into the TLS authentication callbacks to allow a special list of unregistered components to connect. However this turns out to only ever be used by the unit tests for a single connection, which itself is only used to probe whether the RPC server is listening yet. This can be done more simply by just setting an atomic flag when the server loop starts, so this PR does that and removes the extra synchronization baggage. Has no functional effect except when using the unit test helper `waitForReady`, and should effectively still be a no-op there (just with fewer redundant network connections). ## Checklist - [x] My code follows the style guidelines of this project - [x] I have commented my code, particularly in hard-to-understand areas - [ ] ~~I have made corresponding changes to the documentation~~ - [ ] ~~I have made corresponding change to the default configuration files~~ - [ ] ~~I have added tests that prove my fix is effective or that my feature works~~ - [ ] ~~I have added an entry in `./changelog/fragments` using the [changelog tool](https://github.com/elastic/elastic-agent#changelog)~~ - [ ] ~~I have added an integration test or an E2E test~~ --- pkg/component/runtime/manager.go | 92 ++------------------------- pkg/component/runtime/manager_test.go | 48 ++++++++------ 2 files changed, 34 insertions(+), 106 deletions(-) diff --git a/pkg/component/runtime/manager.go b/pkg/component/runtime/manager.go index 3beb67f6482..a799e899670 100644 --- a/pkg/component/runtime/manager.go +++ b/pkg/component/runtime/manager.go @@ -103,9 +103,8 @@ type Manager struct { listener net.Listener server *grpc.Server - // waitMx synchronizes the access to waitReady only - waitMx sync.RWMutex - waitReady map[string]waitForReady + // Set when the RPC server is ready to receive requests, for use by tests. + serverReady *atomic.Bool // updateMx protects the call to update to ensure that // only one call to update occurs at a time @@ -151,13 +150,13 @@ func NewManager( listenAddr: listenAddr, agentInfo: agentInfo, tracer: tracer, - waitReady: make(map[string]waitForReady), current: make(map[string]*componentRuntimeState), shipperConns: make(map[string]*shipperConn), subscriptions: make(map[string][]*Subscription), errCh: make(chan error), monitor: monitor, grpcConfig: grpcConfig, + serverReady: atomic.NewBool(false), } return m, nil } @@ -216,6 +215,7 @@ func (m *Manager) Run(ctx context.Context) error { wg.Add(1) go func() { defer wg.Done() + m.serverReady.Store(true) for { err := server.Serve(lis) if err != nil { @@ -242,73 +242,6 @@ func (m *Manager) Run(ctx context.Context) error { return ctx.Err() } -// waitForReady waits until the manager is ready to be used. -// Used for testing. -// -// This verifies that the GRPC server is up and running. -func (m *Manager) waitForReady(ctx context.Context) error { - tk, err := uuid.NewV4() - if err != nil { - return err - } - token := tk.String() - name, err := genServerName() - if err != nil { - return err - } - pair, err := m.ca.GeneratePairWithName(name) - if err != nil { - return err - } - cert, err := tls.X509KeyPair(pair.Crt, pair.Key) - if err != nil { - return err - } - caCertPool := x509.NewCertPool() - caCertPool.AppendCertsFromPEM(m.ca.Crt()) - trans := credentials.NewTLS(&tls.Config{ - ServerName: name, - Certificates: []tls.Certificate{cert}, - RootCAs: caCertPool, - MinVersion: tls.VersionTLS12, - }) - - m.waitMx.Lock() - m.waitReady[token] = waitForReady{ - name: name, - cert: pair, - } - m.waitMx.Unlock() - - defer func() { - m.waitMx.Lock() - delete(m.waitReady, token) - m.waitMx.Unlock() - }() - - for { - m.netMx.RLock() - lis := m.listener - srv := m.server - m.netMx.RUnlock() - if lis != nil && srv != nil { - addr := m.getListenAddr() - c, err := grpc.Dial(addr, grpc.WithTransportCredentials(trans)) - if err == nil { - _ = c.Close() - return nil - } - } - - t := time.NewTimer(100 * time.Millisecond) - select { - case <-ctx.Done(): - return ctx.Err() - case <-t.C: - } - } -} - // Errors returns channel that errors are reported on. func (m *Manager) Errors() <-chan error { return m.errCh @@ -917,18 +850,6 @@ func (m *Manager) getCertificate(chi *tls.ClientHelloInfo) (*tls.Certificate, er return cert, nil } - m.waitMx.RLock() - for _, waiter := range m.waitReady { - if waiter.name == chi.ServerName { - cert = waiter.cert.Certificate - break - } - } - m.waitMx.RUnlock() - if cert != nil { - return cert, nil - } - return nil, errors.New("no supported TLS certificate") } @@ -1057,8 +978,3 @@ func (m *Manager) performDiagAction(ctx context.Context, comp component.Componen } return res.Diagnostic, nil } - -type waitForReady struct { - name string - cert *authority.Pair -} diff --git a/pkg/component/runtime/manager_test.go b/pkg/component/runtime/manager_test.go index f503be69b61..8cb70e9c0ed 100644 --- a/pkg/component/runtime/manager_test.go +++ b/pkg/component/runtime/manager_test.go @@ -91,7 +91,7 @@ func TestManager_SimpleComponentErr(t *testing.T) { waitCtx, waitCancel := context.WithTimeout(ctx, 1*time.Second) defer waitCancel() - if err := m.waitForReady(waitCtx); err != nil { + if err := waitForReady(waitCtx, m); err != nil { require.NoError(t, err) } @@ -186,7 +186,7 @@ func TestManager_FakeInput_StartStop(t *testing.T) { waitCtx, waitCancel := context.WithTimeout(ctx, 1*time.Second) defer waitCancel() - if err := m.waitForReady(waitCtx); err != nil { + if err := waitForReady(waitCtx, m); err != nil { require.NoError(t, err) } @@ -313,7 +313,7 @@ func TestManager_FakeInput_Features(t *testing.T) { waitCtx, waitCancel := context.WithTimeout(ctx, 1*time.Second) defer waitCancel() - if err := m.waitForReady(waitCtx); err != nil { + if err := waitForReady(waitCtx, m); err != nil { require.NoError(t, err) } @@ -502,7 +502,7 @@ func TestManager_FakeInput_BadUnitToGood(t *testing.T) { waitCtx, waitCancel := context.WithTimeout(ctx, 1*time.Second) defer waitCancel() - if err := m.waitForReady(waitCtx); err != nil { + if err := waitForReady(waitCtx, m); err != nil { require.NoError(t, err) } @@ -668,7 +668,7 @@ func TestManager_FakeInput_GoodUnitToBad(t *testing.T) { waitCtx, waitCancel := context.WithTimeout(ctx, 1*time.Second) defer waitCancel() - if err := m.waitForReady(waitCtx); err != nil { + if err := waitForReady(waitCtx, m); err != nil { require.NoError(t, err) } @@ -824,7 +824,7 @@ func TestManager_FakeInput_NoDeadlock(t *testing.T) { waitCtx, waitCancel := context.WithTimeout(ctx, 1*time.Second) defer waitCancel() - if err := m.waitForReady(waitCtx); err != nil { + if err := waitForReady(waitCtx, m); err != nil { require.NoError(t, err) } @@ -958,7 +958,7 @@ func TestManager_FakeInput_Configure(t *testing.T) { waitCtx, waitCancel := context.WithTimeout(ctx, 1*time.Second) defer waitCancel() - if err := m.waitForReady(waitCtx); err != nil { + if err := waitForReady(waitCtx, m); err != nil { require.NoError(t, err) } @@ -1078,7 +1078,7 @@ func TestManager_FakeInput_RemoveUnit(t *testing.T) { waitCtx, waitCancel := context.WithTimeout(ctx, 1*time.Second) defer waitCancel() - if err := m.waitForReady(waitCtx); err != nil { + if err := waitForReady(waitCtx, m); err != nil { require.NoError(t, err) } @@ -1231,7 +1231,7 @@ func TestManager_FakeInput_ActionState(t *testing.T) { waitCtx, waitCancel := context.WithTimeout(ctx, 1*time.Second) defer waitCancel() - if err := m.waitForReady(waitCtx); err != nil { + if err := waitForReady(waitCtx, m); err != nil { require.NoError(t, err) } @@ -1355,7 +1355,7 @@ func TestManager_FakeInput_Restarts(t *testing.T) { waitCtx, waitCancel := context.WithTimeout(ctx, 1*time.Second) defer waitCancel() - if err := m.waitForReady(waitCtx); err != nil { + if err := waitForReady(waitCtx, m); err != nil { require.NoError(t, err) } @@ -1490,7 +1490,7 @@ func TestManager_FakeInput_Restarts_ConfigKill(t *testing.T) { waitCtx, waitCancel := context.WithTimeout(ctx, 1*time.Second) defer waitCancel() - if err := m.waitForReady(waitCtx); err != nil { + if err := waitForReady(waitCtx, m); err != nil { require.NoError(t, err) } @@ -1632,7 +1632,7 @@ func TestManager_FakeInput_KeepsRestarting(t *testing.T) { waitCtx, waitCancel := context.WithTimeout(ctx, 1*time.Second) defer waitCancel() - if err := m.waitForReady(waitCtx); err != nil { + if err := waitForReady(waitCtx, m); err != nil { require.NoError(t, err) } @@ -1774,7 +1774,7 @@ func TestManager_FakeInput_RestartsOnMissedCheckins(t *testing.T) { waitCtx, waitCancel := context.WithTimeout(ctx, 1*time.Second) defer waitCancel() - if err := m.waitForReady(waitCtx); err != nil { + if err := waitForReady(waitCtx, m); err != nil { require.NoError(t, err) } @@ -1889,7 +1889,7 @@ func TestManager_FakeInput_InvalidAction(t *testing.T) { waitCtx, waitCancel := context.WithTimeout(ctx, 1*time.Second) defer waitCancel() - if err := m.waitForReady(waitCtx); err != nil { + if err := waitForReady(waitCtx, m); err != nil { require.NoError(t, err) } @@ -2014,7 +2014,7 @@ func TestManager_FakeInput_MultiComponent(t *testing.T) { waitCtx, waitCancel := context.WithTimeout(ctx, 1*time.Second) defer waitCancel() - if err := m.waitForReady(waitCtx); err != nil { + if err := waitForReady(waitCtx, m); err != nil { require.NoError(t, err) } @@ -2227,7 +2227,7 @@ func TestManager_FakeInput_LogLevel(t *testing.T) { waitCtx, waitCancel := context.WithTimeout(ctx, 1*time.Second) defer waitCancel() - if err := m.waitForReady(waitCtx); err != nil { + if err := waitForReady(waitCtx, m); err != nil { require.NoError(t, err) } @@ -2371,7 +2371,7 @@ func TestManager_FakeShipper(t *testing.T) { waitCtx, waitCancel := context.WithTimeout(ctx, 1*time.Second) defer waitCancel() - if err := m.waitForReady(waitCtx); err != nil { + if err := waitForReady(waitCtx, m); err != nil { require.NoError(t, err) } @@ -2672,7 +2672,7 @@ func TestManager_FakeInput_OutputChange(t *testing.T) { waitCtx, waitCancel := context.WithTimeout(ctx, 1*time.Second) defer waitCancel() - if err := m.waitForReady(waitCtx); err != nil { + if err := waitForReady(waitCtx, m); err != nil { require.NoError(t, err) } @@ -2998,3 +2998,15 @@ func newTestMonitoringMgr() *testMonitoringManager { return &testMonitoringManag func (*testMonitoringManager) EnrichArgs(_ string, _ string, args []string) []string { return args } func (*testMonitoringManager) Prepare(_ string) error { return nil } func (*testMonitoringManager) Cleanup(string) error { return nil } + +// waitForReady waits until the RPC server is ready to be used. +func waitForReady(ctx context.Context, m *Manager) error { + for !m.serverReady.Load() { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(100 * time.Millisecond): + } + } + return nil +}