Skip to content

Commit

Permalink
feat: generate and push fallback config on update failure (#6071)
Browse files Browse the repository at this point in the history
Makes KongClient try to recover from regular config update failures by generating 
a fallback configuration, excluding affected objects reported back by gateways.
  • Loading branch information
czeslavo authored May 23, 2024
1 parent 585b4f2 commit 9b86378
Show file tree
Hide file tree
Showing 8 changed files with 435 additions and 116 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/_integration_tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ jobs:
# https://github.com/Kong/kubernetes-ingress-controller/issues/5127 is resolved.
router-flavor: 'traditional'
go_test_flags: -run=TestIngressRecoverFromInvalidPath
- name: postgres-fallback-config
test: postgres
feature_gates: "GatewayAlpha=true,FallbackConfiguration=true"
- name: dbless-fallback-config
test: dbless
feature_gates: "GatewayAlpha=true,FallbackConfiguration=true"
# Experimental tests, in the future all integration tests will be migrated to them.
# Set enterprise to 'true' to enable isolated integration test cases requiring enterprise features.
- name: isolated-dbless
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ Adding a new version? You'll need three changes:
[#5993](https://github.com/Kong/kubernetes-ingress-controller/pull/5993)
[#6010](https://github.com/Kong/kubernetes-ingress-controller/pull/6010)
[#6047](https://github.com/Kong/kubernetes-ingress-controller/pull/6047)
[#6071](https://github.com/Kong/kubernetes-ingress-controller/pull/6071)

- Add support for Kubernetes Gateway API v1.1:
- add a flag `--enable-controller-gwapi-grpcroute` to control whether enable or disable GRPCRoute controller.
Expand Down
2 changes: 1 addition & 1 deletion config/variants/multi-gw/debug/manager_debug.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ spec:
- /manager-debug
- --
args:
- --feature-gates=GatewayAlpha=true
- --feature-gates=GatewayAlpha=true,FallbackConfiguration=true
- --anonymous-reports=false
env:
- name: CONTROLLER_LOG_LEVEL
Expand Down
12 changes: 11 additions & 1 deletion internal/dataplane/fallback/fallback.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ package fallback
import (
"fmt"

"github.com/go-logr/logr"

"github.com/kong/kubernetes-ingress-controller/v3/internal/store"
"github.com/kong/kubernetes-ingress-controller/v3/internal/util"
)

type CacheGraphProvider interface {
Expand All @@ -14,11 +17,13 @@ type CacheGraphProvider interface {
// Generator is responsible for generating fallback cache snapshots.
type Generator struct {
cacheGraphProvider CacheGraphProvider
logger logr.Logger
}

func NewGenerator(cacheGraphProvider CacheGraphProvider) *Generator {
func NewGenerator(cacheGraphProvider CacheGraphProvider, logger logr.Logger) *Generator {
return &Generator{
cacheGraphProvider: cacheGraphProvider,
logger: logger.WithName("fallback-generator"),
}
}

Expand Down Expand Up @@ -46,6 +51,11 @@ func (g *Generator) GenerateExcludingAffected(
if err := fallbackCache.Delete(obj); err != nil {
return store.CacheStores{}, fmt.Errorf("failed to delete %s from the cache: %w", GetObjectHash(obj), err)
}
g.logger.V(util.DebugLevel).Info("Excluded object from fallback cache",
"object_kind", obj.GetObjectKind(),
"object_name", obj.GetName(),
"object_namespace", obj.GetNamespace(),
)
}
}

Expand Down
3 changes: 2 additions & 1 deletion internal/dataplane/fallback/fallback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package fallback_test
import (
"testing"

"github.com/go-logr/logr"
"github.com/stretchr/testify/require"

"github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/fallback"
Expand Down Expand Up @@ -53,7 +54,7 @@ func TestGenerator_GenerateExcludingAffected(t *testing.T) {
require.NoError(t, err)

graphProvider := &mockGraphProvider{graph: graph}
g := fallback.NewGenerator(graphProvider)
g := fallback.NewGenerator(graphProvider, logr.Discard())

t.Run("ingressClass is broken", func(t *testing.T) {
fallbackCache, err := g.GenerateExcludingAffected(inputCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(ingressClass)})
Expand Down
154 changes: 129 additions & 25 deletions internal/dataplane/kong_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/deckerrors"
"github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/deckgen"
"github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/failures"
"github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/fallback"
"github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/kongstate"
"github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/sendconfig"
"github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/translator"
Expand Down Expand Up @@ -59,6 +60,11 @@ type KongConfigBuilder interface {
UpdateCache(store.CacheStores)
}

// FallbackConfigGenerator generates a fallback configuration based on a cache snapshot and a set of broken objects.
type FallbackConfigGenerator interface {
GenerateExcludingAffected(store.CacheStores, []fallback.ObjectHash) (store.CacheStores, error)
}

// KongClient is a threadsafe high level API client for the Kong data-plane(s)
// which parses Kubernetes object caches into Kong Admin configurations and
// sends them as updates to the data-plane(s) (Kong Admin API).
Expand Down Expand Up @@ -142,6 +148,9 @@ type KongClient struct {

// currentConfigStatus is the current status of the configuration synchronisation.
currentConfigStatus clients.ConfigStatus

// fallbackConfigGenerator is used to generate a fallback configuration in case of sync failures.
fallbackConfigGenerator FallbackConfigGenerator
}

// NewKongClient provides a new KongClient object after connecting to the
Expand All @@ -159,22 +168,24 @@ func NewKongClient(
kongConfigFetcher configfetcher.LastValidConfigFetcher,
kongConfigBuilder KongConfigBuilder,
cacheStores store.CacheStores,
fallbackConfigGenerator FallbackConfigGenerator,
) (*KongClient, error) {
c := &KongClient{
logger: logger,
requestTimeout: timeout,
diagnostic: diagnostic,
prometheusMetrics: metrics.NewCtrlFuncMetrics(),
cache: &cacheStores,
kongConfig: kongConfig,
eventRecorder: eventRecorder,
dbmode: dbMode,
clientsProvider: clientsProvider,
configStatusNotifier: clients.NoOpConfigStatusNotifier{},
updateStrategyResolver: updateStrategyResolver,
configChangeDetector: configChangeDetector,
kongConfigBuilder: kongConfigBuilder,
kongConfigFetcher: kongConfigFetcher,
logger: logger,
requestTimeout: timeout,
diagnostic: diagnostic,
prometheusMetrics: metrics.NewCtrlFuncMetrics(),
cache: &cacheStores,
kongConfig: kongConfig,
eventRecorder: eventRecorder,
dbmode: dbMode,
clientsProvider: clientsProvider,
configStatusNotifier: clients.NoOpConfigStatusNotifier{},
updateStrategyResolver: updateStrategyResolver,
configChangeDetector: configChangeDetector,
kongConfigBuilder: kongConfigBuilder,
kongConfigFetcher: kongConfigFetcher,
fallbackConfigGenerator: fallbackConfigGenerator,
}
c.initializeControllerPodReference()

Expand Down Expand Up @@ -400,8 +411,12 @@ func (c *KongClient) Update(ctx context.Context) error {
// If FallbackConfiguration is enabled, we take a snapshot of the cache so that we operate on a consistent
// set of resources in case of failures being returned from Kong. As we're going to generate a fallback config
// based on the cache contents, we need to ensure it is not modified during the process.
var cacheSnapshot store.CacheStores
if c.kongConfig.FallbackConfiguration {
cacheSnapshot, err := c.cache.TakeSnapshot()
// TODO: https://github.com/Kong/kubernetes-ingress-controller/issues/6080
// Use TakeSnapshotIfChanged to avoid taking a snapshot if the cache hasn't changed.
var err error
cacheSnapshot, err = c.cache.TakeSnapshot()
if err != nil {
return fmt.Errorf("failed to take snapshot of cache: %w", err)
}
Expand Down Expand Up @@ -436,16 +451,11 @@ func (c *KongClient) Update(ctx context.Context) error {

// In case of a failure in syncing configuration with Gateways, propagate the error.
if gatewaysSyncErr != nil {
// TODO: https://github.com/Kong/kubernetes-ingress-controller/issues/5931
// In case of a failure in syncing configuration with Gateways, if FallbackConfiguration is enabled,
// we should generate a fallback configuration and push it to the gateways first.
if state, found := c.kongConfigFetcher.LastValidConfig(); found {
_, fallbackSyncErr := c.sendOutToGatewayClients(ctx, state, c.kongConfig)
if fallbackSyncErr != nil {
return errors.Join(gatewaysSyncErr, fallbackSyncErr)
}
c.logger.V(util.DebugLevel).Info("Due to errors in the current config, the last valid config has been pushed to Gateways")
if recoveringErr := c.tryRecoveringFromGatewaysSyncError(ctx, cacheSnapshot, gatewaysSyncErr); recoveringErr != nil {
return fmt.Errorf("failed to recover from gateways sync error: %w", recoveringErr)
}
// Update result is positive only if gateways were successfully synced with the current config, so we still
// need to return the error here even if we succeeded recovering.
return gatewaysSyncErr
}

Expand All @@ -464,6 +474,100 @@ func (c *KongClient) Update(ctx context.Context) error {
return nil
}

// tryRecoveringFromGatewaysSyncError tries to recover from a configuration rejection by:
// 1. Generating a fallback configuration and pushing it to the gateways if FallbackConfiguration feature is enabled.
// 2. Applying the last valid configuration to the gateways if FallbackConfiguration is disabled or fallback
// configuration generation fails.
func (c *KongClient) tryRecoveringFromGatewaysSyncError(
ctx context.Context,
cacheSnapshot store.CacheStores,
gatewaysSyncErr error,
) error {
// If configuration was rejected by the gateways and FallbackConfiguration is enabled,
// we should generate a fallback configuration and push it to the gateways.
if c.kongConfig.FallbackConfiguration {
recoveringErr := c.tryRecoveringWithFallbackConfiguration(ctx, cacheSnapshot, gatewaysSyncErr)
if recoveringErr == nil {
c.logger.Info("Successfully recovered from configuration rejection with fallback configuration")
return nil
}
// If we failed to recover using the fallback configuration, we should log the error and carry on.
c.logger.Error(recoveringErr, "Failed to recover from configuration rejection with fallback configuration")
}

// If FallbackConfiguration is disabled, or we failed to recover using the fallback configuration, we should
// apply the last valid configuration to the gateways.
if state, found := c.kongConfigFetcher.LastValidConfig(); found {
if _, fallbackSyncErr := c.sendOutToGatewayClients(ctx, state, c.kongConfig); fallbackSyncErr != nil {
return errors.Join(gatewaysSyncErr, fallbackSyncErr)
}
c.logger.V(util.DebugLevel).Info("Due to errors in the current config, the last valid config has been pushed to Gateways")
}
return nil
}

// tryRecoveringWithFallbackConfiguration tries to recover from a configuration rejection by generating a fallback
// configuration excluding affected objects from the cache.
func (c *KongClient) tryRecoveringWithFallbackConfiguration(
ctx context.Context,
cacheSnapshot store.CacheStores,
gatewaysSyncErr error,
) error {
// Extract the broken objects from the update error and generate a fallback configuration excluding them.
brokenObjects, err := extractBrokenObjectsFromUpdateError(gatewaysSyncErr)
if err != nil {
return fmt.Errorf("failed to extract broken objects from update error: %w", err)
}
fallbackCache, err := c.fallbackConfigGenerator.GenerateExcludingAffected(
cacheSnapshot,
brokenObjects,
)
if err != nil {
return fmt.Errorf("failed to generate fallback configuration: %w", err)
}

// Update the KongConfigBuilder with the fallback configuration and build the KongConfig.
c.kongConfigBuilder.UpdateCache(fallbackCache)
fallbackParsingResult := c.kongConfigBuilder.BuildKongConfig()

// TODO: https://github.com/Kong/kubernetes-ingress-controller/issues/6081
// Emit Kubernetes events depending on fallback configuration parsing result.

// TODO: https://github.com/Kong/kubernetes-ingress-controller/issues/6082
// Expose Prometheus metrics for fallback configuration parsing result.

_, gatewaysSyncErr = c.sendOutToGatewayClients(ctx, fallbackParsingResult.KongState, c.kongConfig)
if gatewaysSyncErr != nil {
return fmt.Errorf("failed to sync fallback configuration with gateways: %w", gatewaysSyncErr)
}
konnectSyncErr := c.maybeSendOutToKonnectClient(ctx, fallbackParsingResult.KongState, c.kongConfig)
if konnectSyncErr != nil {
// If Konnect sync fails, we should log the error and carry on as it's not a critical error.
c.logger.Error(konnectSyncErr, "Failed to sync fallback configuration with Konnect")
}
return nil
}

// extractBrokenObjectsFromUpdateError.
func extractBrokenObjectsFromUpdateError(err error) ([]fallback.ObjectHash, error) {
var brokenObjects []client.Object

var updateErr sendconfig.UpdateError
if ok := errors.As(err, &updateErr); !ok {
return nil, fmt.Errorf("expected UpdateError, cannot extract broken objects from %T", err) //nolint:errorlint
}
for _, resourceFailure := range updateErr.ResourceFailures() {
brokenObjects = append(brokenObjects, resourceFailure.CausingObjects()...)
}
if len(brokenObjects) == 0 {
return nil, fmt.Errorf("no broken objects found in UpdateError")
}

return lo.Map(brokenObjects, func(obj client.Object, _ int) fallback.ObjectHash {
return fallback.GetObjectHash(obj)
}), nil
}

// sendOutToGatewayClients will generate deck content (config) from the provided kong state
// and send it out to each of the configured gateway clients.
func (c *KongClient) sendOutToGatewayClients(
Expand Down Expand Up @@ -614,7 +718,7 @@ func (c *KongClient) sendToClient(
if err := ctx.Err(); err != nil {
logger.Error(err, "Exceeded Kong API timeout, consider increasing --proxy-timeout-seconds")
}
return "", fmt.Errorf("performing update for %s failed: %w", client.BaseRootURL(), updateErr)
return "", fmt.Errorf("performing update for %s failed: %w", client.BaseRootURL(), err)
}
sendDiagnostic(false, nil) // No error occurred.
// update the lastConfigSHA with the new updated checksum
Expand Down
Loading

0 comments on commit 9b86378

Please sign in to comment.