Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Upgrade Decision engine incorporated into reconciliation logic #270

Merged
merged 72 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from 71 commits
Commits
Show all changes
72 commits
Select commit Hold shift + click to select a range
a5e8c04
remove needsPausing() function
juliev0 Sep 11, 2024
fc9bd6f
Merge remote-tracking branch 'origin/main' into decision-engine
juliev0 Sep 13, 2024
fae0aed
usde
juliev0 Sep 14, 2024
3f27424
save state
juliev0 Sep 15, 2024
49f148d
decision engine
juliev0 Sep 15, 2024
78635dd
decision engine
juliev0 Sep 15, 2024
5c60a19
decision engine
juliev0 Sep 15, 2024
9ee9a02
decision engine
juliev0 Sep 15, 2024
8ba1f03
decision engine
juliev0 Sep 15, 2024
529a687
renaming
juliev0 Sep 15, 2024
8bb849a
fix inProgressStrategy unsetting to be idempotent
juliev0 Sep 16, 2024
d236aef
decision engine
juliev0 Sep 16, 2024
7c64207
Merge remote-tracking branch 'origin/main' into decision-engine
juliev0 Sep 16, 2024
2f245b3
fix: empty commit
juliev0 Sep 16, 2024
f130c1d
fix: empty commit
juliev0 Sep 16, 2024
81f1702
refactoring to enable this to work if somebody changes desiredPhase i…
juliev0 Sep 17, 2024
7aca76d
decision engine
juliev0 Sep 17, 2024
f83fdc5
bump timeout for isbservice
juliev0 Sep 17, 2024
d816e37
Merge remote-tracking branch 'origin/main' into decision-engine
juliev0 Sep 17, 2024
6a2c3d8
fix: empty commit
juliev0 Sep 17, 2024
4f0cff7
decision engine
juliev0 Sep 17, 2024
4fdad36
decision engine
juliev0 Sep 17, 2024
758611d
fix spelling
juliev0 Sep 17, 2024
753c2cf
usde unit test
juliev0 Sep 17, 2024
9352548
fix compilation error
juliev0 Sep 17, 2024
c7a1747
decision engine
juliev0 Sep 17, 2024
a417b21
merge main
juliev0 Sep 17, 2024
53c1d8b
fix unit test
juliev0 Sep 17, 2024
d14b92b
adding e2e test for manual pausing and unpausing
juliev0 Sep 18, 2024
4b76f7f
e2e test
juliev0 Sep 18, 2024
f7e0029
fix: empty commit
juliev0 Sep 18, 2024
752c161
fix: empty commit
juliev0 Sep 18, 2024
f6fda53
decision engine
juliev0 Sep 18, 2024
4bb6fe4
add 2 second sleeps between certain tests
juliev0 Sep 18, 2024
6335804
fix: empty commit
juliev0 Sep 18, 2024
84bd2ca
fix: empty commit
juliev0 Sep 18, 2024
0d52387
fix: empty commit
juliev0 Sep 18, 2024
015a82a
unit test
juliev0 Sep 18, 2024
82b9baa
use latest numaflow in test
juliev0 Sep 18, 2024
18b963a
put back previous numaflow controller definitions
juliev0 Sep 18, 2024
4f420c0
unit test
juliev0 Sep 18, 2024
f19c9f0
unit test
juliev0 Sep 19, 2024
a58549b
unit test
juliev0 Sep 19, 2024
f5649c7
unit test
juliev0 Sep 19, 2024
f22ecd1
unit test
juliev0 Sep 19, 2024
c5ce306
unit test
juliev0 Sep 19, 2024
7f87d88
unit test
juliev0 Sep 19, 2024
5c24c03
unit test
juliev0 Sep 19, 2024
d5f1ca1
merge main
juliev0 Sep 19, 2024
88e5002
tests
juliev0 Sep 19, 2024
08a3628
e2e
juliev0 Sep 19, 2024
8368cc1
fix e2e pause condition test
juliev0 Sep 19, 2024
d693eee
e2e and unit test
juliev0 Sep 19, 2024
3011da9
decision engine
juliev0 Sep 19, 2024
7102ef6
fix application of desiredPhase field
juliev0 Sep 19, 2024
06b94ed
consolidate desiredphase functionality
juliev0 Sep 19, 2024
4483d05
decision engine
juliev0 Sep 19, 2024
a458c68
decision engine - resolve some comments
juliev0 Sep 20, 2024
fc1866d
move defaultUpgradeStrategy field from global config to USDE config
juliev0 Sep 21, 2024
2cb63ae
explictly set user strategy for unit test
juliev0 Sep 21, 2024
946964e
fix unit test
juliev0 Sep 21, 2024
dcf37e5
Merge branch 'main' into decision-engine
juliev0 Sep 21, 2024
e58d343
remove feature flag from numaplane main config
juliev0 Sep 21, 2024
46929f2
Merge branch 'decision-engine' of github.com:numaproj/numaplane into …
juliev0 Sep 21, 2024
3cdd599
remove unneeded file
juliev0 Sep 21, 2024
c01d119
typo
juliev0 Sep 21, 2024
a9eed6a
remove defaultUpgradeStrategy from numaplane config
juliev0 Sep 21, 2024
1483dd7
check inProgressStrategy
juliev0 Sep 23, 2024
2dde94a
fix: empty commit
juliev0 Sep 23, 2024
fb97193
renaming
juliev0 Sep 23, 2024
eea2ad0
merge main
juliev0 Sep 23, 2024
fe52380
comment
juliev0 Sep 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/webhook"

numaflowv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaplane/internal/common"
"github.com/numaproj/numaplane/internal/controller"
"github.com/numaproj/numaplane/internal/controller/config"
"github.com/numaproj/numaplane/internal/util/kubernetes"
Expand Down Expand Up @@ -239,7 +238,4 @@ func loadConfigs() {
logger.SetBaseLogger(numaLogger)
clog.SetLogger(*numaLogger.LogrLogger)

// feature flag
common.DataLossPrevention = config.DataLossPrevention

}
1 change: 1 addition & 0 deletions config/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1408,6 +1408,7 @@ metadata:
---
apiVersion: v1
data:
defaultUpgradeStrategy: ""
pipelineSpecExcludedPaths: |
- "lifecycle"
- "limits"
Expand Down
1 change: 1 addition & 0 deletions config/manager/usde-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ metadata:
labels:
numaplane.numaproj.io/config: usde-config
data:
defaultUpgradeStrategy: ""
pipelineSpecExcludedPaths: |
- "lifecycle"
- "limits"
Expand Down
3 changes: 0 additions & 3 deletions internal/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,4 @@ var (

// default requeue time used by Reconcilers
DefaultDelayedRequeue = ctrl.Result{RequeueAfter: 20 * time.Second}

// DataLossPrevention is a feature flag used to turn on/off the automatic pause feature for pipelines based on how it's set in the Config
DataLossPrevention bool
)
2 changes: 0 additions & 2 deletions internal/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ func (*ConfigManager) GetControllerDefinitionsMgr() *NumaflowControllerDefinitio
type GlobalConfig struct {
LogLevel int `json:"logLevel" mapstructure:"logLevel"`
IncludedResources string `json:"includedResources" mapstructure:"includedResources"`
// Feature flag - if enabled causes pipeline(s) to be paused when pipeline, numaflow controller, or ISB Service gets updated
DataLossPrevention bool `json:"dataLossPrevention" mapstructure:"dataLossPrevention"`
// List of Numaflow Controller image names to look for
NumaflowControllerImageNames []string `json:"numaflowControllerImageNames" mapstructure:"numaflowControllerImageNames"`
}
Expand Down
1 change: 0 additions & 1 deletion internal/controller/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ func TestLoadConfigMatchValues(t *testing.T) {

assert.Nil(t, err, "Failed to load configuration")
assert.Equal(t, 3, config.LogLevel, "Log Level does not match")
assert.Equal(t, true, config.DataLossPrevention, "DataLossPrevention does not match")
assert.Contains(t, config.NumaflowControllerImageNames, "numaflow")
assert.Contains(t, config.NumaflowControllerImageNames, "numaflow-rc")
// now verify that if we modify the file, it will still be okay
Expand Down
38 changes: 4 additions & 34 deletions internal/controller/config/usde_config.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,10 @@
package config

import (
"encoding/json"
"fmt"
)

type USDEUserStrategy string
Copy link
Collaborator Author

@juliev0 juliev0 Sep 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved the stuff from this file that pertains to the user config to its own file (user_config.go)


const (
// ProgressiveStrategyID USDEUserStrategy = "progressive" // TODO-PROGRESSIVE: enable this
PPNDStrategyID USDEUserStrategy = "pause-and-drain"
)

type USDEConfig struct {
PipelineSpecExcludedPaths []string `json:"pipelineSpecExcludedPaths,omitempty" yaml:"pipelineSpecExcludedPaths,omitempty"`
ISBServiceSpecExcludedPaths []string `json:"isbServiceSpecExcludedPaths,omitempty" yaml:"isbServiceSpecExcludedPaths,omitempty"`
// If user's config doesn't exist or doesn't specify strategy, this is the default
DefaultUpgradeStrategy USDEUserStrategy `json:"defaultUpgradeStrategy" mapstructure:"defaultUpgradeStrategy"`
PipelineSpecExcludedPaths []string `json:"pipelineSpecExcludedPaths,omitempty" yaml:"pipelineSpecExcludedPaths,omitempty"`
ISBServiceSpecExcludedPaths []string `json:"isbServiceSpecExcludedPaths,omitempty" yaml:"isbServiceSpecExcludedPaths,omitempty"`
}

func (cm *ConfigManager) UpdateUSDEConfig(config USDEConfig) {
Expand All @@ -37,23 +27,3 @@ func (cm *ConfigManager) GetUSDEConfig() USDEConfig {

return cm.usdeConfig
}

func (s *USDEUserStrategy) UnmarshalJSON(data []byte) (err error) {
var usdeUserStrategyStr string
if err := json.Unmarshal(data, &usdeUserStrategyStr); err != nil {
return err
}

// Make sure the string is one of the possible strategy values
if usdeUserStrategyStr != string(PPNDStrategyID) {
return fmt.Errorf("invalid strategy '%s' (allowed value is: %s)", usdeUserStrategyStr, PPNDStrategyID)
}
// TODO-PROGRESSIVE: replace if-statement above for if-statement below
// if usdeUserStrategyStr != string(ProgressiveStrategyID) && usdeUserStrategyStr != string(PPNDStrategyID) {
// return fmt.Errorf("invalid strategy '%s' (allowed values are: %s or %s)", usdeUserStrategyStr, ProgressiveStrategyID, PPNDStrategyID)
// }

*s = USDEUserStrategy(usdeUserStrategyStr)

return nil
}
47 changes: 47 additions & 0 deletions internal/controller/config/user_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package config

import (
"encoding/json"
"fmt"
)

type USDEUserStrategy string

const (
ProgressiveStrategyID USDEUserStrategy = "progressive"
PPNDStrategyID USDEUserStrategy = "pause-and-drain"
NoStrategyID USDEUserStrategy = ""
)

func (s *USDEUserStrategy) UnmarshalJSON(data []byte) (err error) {
var usdeUserStrategyStr string
if err := json.Unmarshal(data, &usdeUserStrategyStr); err != nil {
return err
}

allowedValues := map[USDEUserStrategy]struct{}{
ProgressiveStrategyID: {},
PPNDStrategyID: {},
NoStrategyID: {}} // TODO: this will no longer be allowed in the future

// Make sure the string is one of the possible strategy values
_, found := allowedValues[USDEUserStrategy(usdeUserStrategyStr)]
if !found {
return fmt.Errorf("invalid strategy '%s' (allowed values: %+v)", usdeUserStrategyStr, allowedValues)
}

*s = USDEUserStrategy(usdeUserStrategyStr)

return nil
}

func (s USDEUserStrategy) IsValid() bool {
switch s {
case ProgressiveStrategyID:
return true
case PPNDStrategyID:
return true
default:
return false
}
}
103 changes: 103 additions & 0 deletions internal/controller/in_progress_strategy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package controller

import (
"context"
"sync"

k8stypes "k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

apiv1 "github.com/numaproj/numaplane/pkg/apis/numaplane/v1alpha1"
)

// inProgressStrategyMgr is responsible to maintain a Rollout's inProgressStrategy
// state is maintained both in memory as well as in the Rollout's Status
// in memory always gives us the latest state in case the Informer cache is out of date
// the Rollout's Status is useful as a backup mechanism in case Numaplane has just restarted
type inProgressStrategyMgr struct {
getRolloutStrategy func(context.Context, client.Object) *apiv1.UpgradeStrategy
setRolloutStrategy func(context.Context, client.Object, apiv1.UpgradeStrategy)
store *inProgressStrategyStore
}

// in memory storage of UpgradeStrategy in progress for a given Rollout
type inProgressStrategyStore struct {
inProgressUpgradeStrategies map[string]apiv1.UpgradeStrategy
mutex sync.RWMutex
}

func newInProgressStrategyMgr(
getRolloutStrategy func(context.Context, client.Object) *apiv1.UpgradeStrategy,
setRolloutStrategy func(context.Context, client.Object, apiv1.UpgradeStrategy)) *inProgressStrategyMgr {

return &inProgressStrategyMgr{
getRolloutStrategy: getRolloutStrategy,
setRolloutStrategy: setRolloutStrategy,
store: newInProgressStrategyStore(),
}
}

func newInProgressStrategyStore() *inProgressStrategyStore {
return &inProgressStrategyStore{
inProgressUpgradeStrategies: map[string]apiv1.UpgradeStrategy{},
}
}

func (mgr *inProgressStrategyMgr) getStrategy(ctx context.Context, rollout client.Object) apiv1.UpgradeStrategy {
return mgr.synchronize(ctx, rollout)
}

// make sure in-memory value and Rollout Status value are synchronized
// if in-memory value is set, make sure Rollout Status value gets set the same
juliev0 marked this conversation as resolved.
Show resolved Hide resolved
// if in-memory value isn't set, make sure in-memory value gets set to Rollout Status value
func (mgr *inProgressStrategyMgr) synchronize(ctx context.Context, rollout client.Object) apiv1.UpgradeStrategy {
namespacedName := k8stypes.NamespacedName{Namespace: rollout.GetNamespace(), Name: rollout.GetName()}

// first look for value in memory
foundInMemory, inMemoryStrategy := mgr.store.getStrategy(namespacedName)

// now look for the value in the Resource
crDefinedStrategy := mgr.getRolloutStrategy(ctx, rollout)

// if in-memory value is set, make sure Rollout Status value gets set the same
if foundInMemory {
mgr.setRolloutStrategy(ctx, rollout, inMemoryStrategy)
return inMemoryStrategy
} else {
// make sure in-memory value gets set to Rollout Status value
if crDefinedStrategy != nil {
mgr.store.setStrategy(namespacedName, *crDefinedStrategy)
return *crDefinedStrategy
} else {
return apiv1.UpgradeStrategyNoOp
}
}
}

// store in both memory and the Resource itself
func (mgr *inProgressStrategyMgr) setStrategy(ctx context.Context, rollout client.Object, upgradeStrategy apiv1.UpgradeStrategy) {
namespacedName := k8stypes.NamespacedName{Namespace: rollout.GetNamespace(), Name: rollout.GetName()}

mgr.store.setStrategy(namespacedName, upgradeStrategy)
mgr.setRolloutStrategy(ctx, rollout, upgradeStrategy)
}

func (mgr *inProgressStrategyMgr) unsetStrategy(ctx context.Context, rollout client.Object) {
mgr.setStrategy(ctx, rollout, apiv1.UpgradeStrategyNoOp)
}

// return whether found, and if so, the value
func (store *inProgressStrategyStore) getStrategy(namespacedName k8stypes.NamespacedName) (bool, apiv1.UpgradeStrategy) {
key := namespacedNameToKey(namespacedName)
store.mutex.RLock()
strategy, found := store.inProgressUpgradeStrategies[key]
store.mutex.RUnlock()
return found, strategy
}

func (store *inProgressStrategyStore) setStrategy(namespacedName k8stypes.NamespacedName, upgradeStrategy apiv1.UpgradeStrategy) {
key := namespacedNameToKey(namespacedName)
store.mutex.Lock()
store.inProgressUpgradeStrategies[key] = upgradeStrategy
store.mutex.Unlock()
}
79 changes: 79 additions & 0 deletions internal/controller/in_progress_strategy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package controller

import (
"context"
"testing"

"github.com/stretchr/testify/assert"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8stypes "k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

apiv1 "github.com/numaproj/numaplane/pkg/apis/numaplane/v1alpha1"
)

func Test_inProgressStrategyMgr_getStrategy(t *testing.T) {

progressiveStrategy := apiv1.UpgradeStrategyProgressive
ppndStrategy := apiv1.UpgradeStrategyPPND
noStrategy := apiv1.UpgradeStrategyNoOp

testCases := []struct {
name string
inMemoryStrategy *apiv1.UpgradeStrategy
rolloutStatusStrategy *apiv1.UpgradeStrategy
resultStrategy apiv1.UpgradeStrategy
}{
{
name: "in memory and in Rollout Status (progressive result)",
inMemoryStrategy: &progressiveStrategy,
rolloutStatusStrategy: &noStrategy,
resultStrategy: progressiveStrategy,
},
{
name: "in memory and in Rollout Status (no op result)",
inMemoryStrategy: &noStrategy,
rolloutStatusStrategy: &ppndStrategy,
resultStrategy: noStrategy,
},
{
name: "in memory and not in Rollout Status",
inMemoryStrategy: &progressiveStrategy,
rolloutStatusStrategy: nil,
resultStrategy: progressiveStrategy,
},
{
name: "in Rollout Status and not in memory",
inMemoryStrategy: nil,
rolloutStatusStrategy: &ppndStrategy,
resultStrategy: ppndStrategy,
},
{
name: "neither in Rollout Status nor in memory",
inMemoryStrategy: nil,
rolloutStatusStrategy: nil,
resultStrategy: noStrategy,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
inProgressStrategyMgr := newInProgressStrategyMgr(
// getRolloutStrategy function:
func(ctx context.Context, rollout client.Object) *apiv1.UpgradeStrategy {
return tc.rolloutStatusStrategy
},
// setRolloutStrategy function:
func(ctx context.Context, rollout client.Object, strategy apiv1.UpgradeStrategy) {},
)
pipelineRollout := &apiv1.PipelineRollout{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "my-pipeline"}}
namespacedName := k8stypes.NamespacedName{Namespace: pipelineRollout.GetNamespace(), Name: pipelineRollout.GetName()}
if tc.inMemoryStrategy != nil {
inProgressStrategyMgr.store.setStrategy(namespacedName, *tc.inMemoryStrategy)
}
upgradeStrategyResult := inProgressStrategyMgr.getStrategy(context.Background(), pipelineRollout)
assert.Equal(t, tc.resultStrategy, upgradeStrategyResult)
})
}
}
32 changes: 24 additions & 8 deletions internal/controller/isbservicerollout_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package controller
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"

Expand All @@ -43,6 +44,8 @@ import (

numaflowv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaplane/internal/common"
"github.com/numaproj/numaplane/internal/controller/config"
"github.com/numaproj/numaplane/internal/usde"
"github.com/numaproj/numaplane/internal/util"
"github.com/numaproj/numaplane/internal/util/kubernetes"
"github.com/numaproj/numaplane/internal/util/logger"
Expand Down Expand Up @@ -290,8 +293,15 @@ func (r *ISBServiceRolloutReconciler) processExistingISBService(ctx context.Cont
isbServiceRollout.Status.MarkDeployed(isbServiceRollout.Generation)
}

if common.DataLossPrevention {
return processChildObjectWithoutDataLoss(ctx, isbServiceRollout, r, isbServiceNeedsUpdating, isbServiceIsUpdating, func() error {
// determine the Upgrade Strategy user prefers
upgradeStrategy, err := usde.GetUserStrategy(ctx, isbServiceRollout.Namespace)
if err != nil {
return false, err
}

switch upgradeStrategy {
case config.PPNDStrategyID:
return processChildObjectWithPPND(ctx, isbServiceRollout, r, isbServiceNeedsUpdating, isbServiceIsUpdating, func() error {
r.recorder.Eventf(isbServiceRollout, corev1.EventTypeNormal, "PipelinesPaused", "All Pipelines have paused for ISBService update")
err = r.updateISBService(ctx, isbServiceRollout, newISBServiceDef)
if err != nil {
Expand All @@ -300,13 +310,19 @@ func (r *ISBServiceRolloutReconciler) processExistingISBService(ctx context.Cont
r.customMetrics.ReconciliationDuration.WithLabelValues(ControllerISBSVCRollout, "update").Observe(time.Since(syncStartTime).Seconds())
return nil
})
} else if isbServiceNeedsUpdating {
// update ISBService
err = r.updateISBService(ctx, isbServiceRollout, newISBServiceDef)
if err != nil {
return false, err
case config.NoStrategyID:
xdevxy marked this conversation as resolved.
Show resolved Hide resolved
if isbServiceNeedsUpdating {
// update ISBService
err = r.updateISBService(ctx, isbServiceRollout, newISBServiceDef)
if err != nil {
return false, err
}
r.customMetrics.ReconciliationDuration.WithLabelValues(ControllerISBSVCRollout, "update").Observe(time.Since(syncStartTime).Seconds())
}
r.customMetrics.ReconciliationDuration.WithLabelValues(ControllerISBSVCRollout, "update").Observe(time.Since(syncStartTime).Seconds())
case config.ProgressiveStrategyID:
return false, errors.New("Progressive Strategy not supported yet")
default:
return false, fmt.Errorf("%v strategy not recognized", upgradeStrategy)
}

return false, nil
Expand Down
Loading
Loading