Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor BackendManager / BackendStorage. #610

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
238 changes: 70 additions & 168 deletions pkg/server/backend_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,13 @@ import (
"context"
"fmt"
"io"
"math/rand"
"slices"
"strings"
"sync"
"time"

"google.golang.org/grpc/metadata"
"k8s.io/klog/v2"

commonmetrics "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/common/metrics"
client "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client"
"sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client"
"sigs.k8s.io/apiserver-network-proxy/pkg/server/metrics"
"sigs.k8s.io/apiserver-network-proxy/proto/agent"
"sigs.k8s.io/apiserver-network-proxy/proto/header"
Expand Down Expand Up @@ -196,176 +192,105 @@ func NewBackend(conn agent.AgentService_ConnectServer) (*Backend, error) {
return &Backend{conn: conn, id: agentID, idents: agentIdentifiers}, nil
}

// BackendStorage is an interface to manage the storage of the backend
// connections, i.e., add, remove and count them.
type BackendStorage interface {
// addBackend adds a backend under the given identifier; idType states
// which kind of identifier is being used.
addBackend(identifier string, idType header.IdentifierType, backend *Backend)
// removeBackend removes a backend that was registered under the given
// identifier; idType states which kind of identifier is being used.
removeBackend(identifier string, idType header.IdentifierType, backend *Backend)
// NumBackends returns the number of backends.
NumBackends() int
}

// BackendManager is an interface to manage backend connections, i.e.,
// connection to the proxy agents.
type BackendManager interface {
// Backend returns a single backend.
// WARNING: the context passed to the function should be a session-scoped
// context instead of a request-scoped context, as the backend manager will
// pick a backend for every tunnel session and each tunnel session may
// contain multiple requests.
Backend(ctx context.Context) (*Backend, error)
// Backend returns a backend connection according to proxy strategies.
Backend(addr string) (*Backend, error)
// AddBackend adds a backend.
AddBackend(backend *Backend)
// RemoveBackend removes a backend.
RemoveBackend(backend *Backend)
BackendStorage
// NumBackends returns the number of backends.
NumBackends() int
ReadinessManager
}

// Compile-time check that *DefaultBackendManager satisfies BackendManager.
var _ BackendManager = &DefaultBackendManager{}

// DefaultBackendManager is the default backend manager. It embeds a
// DefaultBackendStorage and uses it to hold the backend connections.
type DefaultBackendManager struct {
// Embedded storage; its methods are promoted onto DefaultBackendManager.
*DefaultBackendStorage
}

// Backend returns a backend chosen by the embedded DefaultBackendStorage's
// GetRandomBackend. The context argument is ignored.
func (dbm *DefaultBackendManager) Backend(_ context.Context) (*Backend, error) {
	klog.V(5).InfoS("Get a random backend through the DefaultBackendManager")
	storage := dbm.DefaultBackendStorage
	return storage.GetRandomBackend()
}

// AddBackend registers backend in the embedded storage, keyed by the
// backend's agent ID and tagged with the header.UID identifier type.
func (dbm *DefaultBackendManager) AddBackend(backend *Backend) {
	id := backend.GetAgentID()
	klog.V(5).InfoS("Add the agent to DefaultBackendManager", "agentID", id)
	dbm.DefaultBackendStorage.addBackend(id, header.UID, backend)
}

// RemoveBackend unregisters backend from the embedded storage, looking it up
// by the backend's agent ID with the header.UID identifier type.
func (dbm *DefaultBackendManager) RemoveBackend(backend *Backend) {
	id := backend.GetAgentID()
	klog.V(5).InfoS("Remove the agent from the DefaultBackendManager", "agentID", id)
	dbm.DefaultBackendStorage.removeBackend(id, header.UID, backend)
}

// DefaultBackendStorage is the default backend storage.
type DefaultBackendStorage struct {
mu sync.RWMutex //protects the following
// A map between agentID and its grpc connections.
// For a given agent, ProxyServer prefers backends[agentID][0] to send
// traffic, because backends[agentID][1:] are more likely to be closed
// by the agent to deduplicate connections to the same server.
//
// TODO: fix documentation. This is not always agentID, e.g. in
// the case of DestHostBackendManager.
backends map[string][]*Backend
// agentID is tracked in this slice to enable randomly picking an
// agentID in the Backend() method. There is no reliable way to
// randomly pick a key from a map (in this case, the backends) in
// Golang.
agentIDs []string
random *rand.Rand
// idTypes contains the valid identifier types for this
// DefaultBackendStorage. The DefaultBackendStorage may only tolerate certain
// types of identifiers when associating to a specific BackendManager,
// e.g., when associating to the DestHostBackendManager, it can only use the
// identifiers of types, IPv4, IPv6 and Host.
idTypes []header.IdentifierType
proxyStrategies []ProxyStrategy

// All backends by agentID.
all BackendStorage
// All backends by host identifier(s). Only used with ProxyStrategyDestHost.
byHost BackendStorage
// All default-route backends, by agentID. Only used with ProxyStrategyDefaultRoute.
byDefaultRoute BackendStorage
}

// NewDefaultBackendManager returns a DefaultBackendManager.
func NewDefaultBackendManager() *DefaultBackendManager {
func NewDefaultBackendManager(proxyStrategies []ProxyStrategy) *DefaultBackendManager {
metrics.Metrics.SetBackendCount(0)
return &DefaultBackendManager{
DefaultBackendStorage: NewDefaultBackendStorage(
[]header.IdentifierType{header.UID})}
proxyStrategies: proxyStrategies,
all: NewDefaultBackendStorage(),
byHost: NewDefaultBackendStorage(),
byDefaultRoute: NewDefaultBackendStorage(),
}
}

// NewDefaultBackendStorage returns a DefaultBackendStorage
func NewDefaultBackendStorage(idTypes []header.IdentifierType) *DefaultBackendStorage {
// Set an explicit value, so that the metric is emitted even when
// no agent ever successfully connects.
metrics.Metrics.SetBackendCount(0)
return &DefaultBackendStorage{
backends: make(map[string][]*Backend),
random: rand.New(rand.NewSource(time.Now().UnixNano())),
idTypes: idTypes,
} /* #nosec G404 */
func (s *DefaultBackendManager) Backend(addr string) (*Backend, error) {
for _, strategy := range s.proxyStrategies {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we want to consolidate all possible proxy strategies into a single Manager?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we want to consolidate all possible proxy strategies into a single Manager?

At HEAD we have something of a mess: there is a 1:1 mapping from ProxyStrategy to BackendManager / BackendStorage, but the logic is spread between pkg/server/.*backend_manager.go and pkg/server/server.go. This change improves separation of concerns. For example, the main loop (pkg/server/server.go) does not need to be aware of proxy strategies.

After this consolidation, I can make the (single) BackendManager aware of agent drained state more safely.

var b *Backend
var e error
e = &ErrNotFound{}
switch strategy {
case ProxyStrategyDefault:
b, e = s.all.RandomBackend()
case ProxyStrategyDestHost:
b, e = s.byHost.GetBackend(addr)
case ProxyStrategyDefaultRoute:
b, e = s.byDefaultRoute.RandomBackend()
}
if e == nil {
return b, nil
}
}
return nil, &ErrNotFound{}
}

func containIDType(idTypes []header.IdentifierType, idType header.IdentifierType) bool {
return slices.Contains(idTypes, idType)
func hostIdentifiers(backend *Backend) []string {
hosts := []string{}
hosts = append(hosts, backend.GetAgentIdentifiers().IPv4...)
hosts = append(hosts, backend.GetAgentIdentifiers().IPv6...)
hosts = append(hosts, backend.GetAgentIdentifiers().Host...)
return hosts
}

// addBackend adds a backend.
func (s *DefaultBackendStorage) addBackend(identifier string, idType header.IdentifierType, backend *Backend) {
if !containIDType(s.idTypes, idType) {
klog.V(4).InfoS("fail to add backend", "backend", identifier, "error", &ErrWrongIDType{idType, s.idTypes})
return
func (s *DefaultBackendManager) AddBackend(backend *Backend) {
agentID := backend.GetAgentID()
count := s.all.AddBackend([]string{agentID}, backend)
if slices.Contains(s.proxyStrategies, ProxyStrategyDestHost) {
idents := hostIdentifiers(backend)
s.byHost.AddBackend(idents, backend)
}
klog.V(5).InfoS("Register backend for agent", "agentID", identifier)
s.mu.Lock()
defer s.mu.Unlock()
_, ok := s.backends[identifier]
if ok {
for _, b := range s.backends[identifier] {
if b == backend {
klog.V(1).InfoS("This should not happen. Adding existing backend for agent", "agentID", identifier)
return
}
if slices.Contains(s.proxyStrategies, ProxyStrategyDefaultRoute) {
if backend.GetAgentIdentifiers().DefaultRoute {
s.byDefaultRoute.AddBackend([]string{agentID}, backend)
}
s.backends[identifier] = append(s.backends[identifier], backend)
return
}
s.backends[identifier] = []*Backend{backend}
metrics.Metrics.SetBackendCount(len(s.backends))
s.agentIDs = append(s.agentIDs, identifier)
metrics.Metrics.SetBackendCount(count)
}

// removeBackend removes a backend.
func (s *DefaultBackendStorage) removeBackend(identifier string, idType header.IdentifierType, backend *Backend) {
if !containIDType(s.idTypes, idType) {
klog.ErrorS(&ErrWrongIDType{idType, s.idTypes}, "fail to remove backend")
return
}
klog.V(5).InfoS("Remove connection for agent", "agentID", identifier)
s.mu.Lock()
defer s.mu.Unlock()
backends, ok := s.backends[identifier]
if !ok {
klog.V(1).InfoS("Cannot find agent in backends", "identifier", identifier)
return
}
var found bool
for i, b := range backends {
if b == backend {
s.backends[identifier] = append(s.backends[identifier][:i], s.backends[identifier][i+1:]...)
if i == 0 && len(s.backends[identifier]) != 0 {
klog.V(1).InfoS("This should not happen. Removed connection that is not the first connection", "agentID", identifier)
}
found = true
}
func (s *DefaultBackendManager) RemoveBackend(backend *Backend) {
agentID := backend.GetAgentID()
count := s.all.RemoveBackend([]string{agentID}, backend)
if slices.Contains(s.proxyStrategies, ProxyStrategyDestHost) {
idents := hostIdentifiers(backend)
s.byHost.RemoveBackend(idents, backend)
}
if len(s.backends[identifier]) == 0 {
delete(s.backends, identifier)
for i := range s.agentIDs {
if s.agentIDs[i] == identifier {
s.agentIDs[i] = s.agentIDs[len(s.agentIDs)-1]
s.agentIDs = s.agentIDs[:len(s.agentIDs)-1]
break
}
if slices.Contains(s.proxyStrategies, ProxyStrategyDefaultRoute) {
if backend.GetAgentIdentifiers().DefaultRoute {
s.byDefaultRoute.RemoveBackend([]string{agentID}, backend)
}
}
if !found {
klog.V(1).InfoS("Could not find connection matching identifier to remove", "agentID", identifier, "idType", idType)
}
metrics.Metrics.SetBackendCount(len(s.backends))
metrics.Metrics.SetBackendCount(count)
}

// NumBackends returns the number of available backends
func (s *DefaultBackendStorage) NumBackends() int {
s.mu.RLock()
defer s.mu.RUnlock()
return len(s.backends)
func (s *DefaultBackendManager) NumBackends() int {
return s.all.NumKeys()
}

// ErrNotFound indicates that no backend can be found.
Expand All @@ -376,32 +301,9 @@ func (e *ErrNotFound) Error() string {
return "No agent available"
}

// ErrWrongIDType is an error describing an identifier type that does not
// match the expected identifier types.
type ErrWrongIDType struct {
// got is the identifier type that was supplied.
got header.IdentifierType
// expect lists the identifier types that were expected.
expect []header.IdentifierType
}

// Error implements the error interface, reporting the identifier type that
// was received together with the identifier types that were expected.
func (e *ErrWrongIDType) Error() string {
	const format = "incorrect id type: got %s, expect %s"
	return fmt.Sprintf(format, e.got, e.expect)
}

// ignoreNotFound returns nil when err's dynamic type is *ErrNotFound and
// returns err unchanged otherwise (including when err is nil). Only the
// error's own type is inspected; wrapped errors are not unwrapped.
func ignoreNotFound(err error) error {
	switch err.(type) {
	case *ErrNotFound:
		return nil
	default:
		return err
	}
}

// GetRandomBackend returns a random backend connection from all connected agents.
func (s *DefaultBackendStorage) GetRandomBackend() (*Backend, error) {
s.mu.Lock()
defer s.mu.Unlock()
if len(s.backends) == 0 {
return nil, &ErrNotFound{}
func (s *DefaultBackendManager) Ready() (bool, string) {
if s.NumBackends() == 0 {
return false, "no connection to any proxy agent"
}
agentID := s.agentIDs[s.random.Intn(len(s.agentIDs))]
klog.V(5).InfoS("Pick agent as backend", "agentID", agentID)
// always return the first connection to an agent, because the agent
// will close later connections if there are multiple.
return s.backends[agentID][0], nil
return true, ""
}
Loading