-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Upgrade Decision engine incorporated into reconciliation logic #270
Merged
Merged
Changes from 71 commits
Commits
Show all changes
72 commits
Select commit
Hold shift + click to select a range
a5e8c04
remove needsPausing() function
juliev0 fc9bd6f
Merge remote-tracking branch 'origin/main' into decision-engine
juliev0 fae0aed
usde
juliev0 3f27424
save state
juliev0 49f148d
decision engine
juliev0 78635dd
decision engine
juliev0 5c60a19
decision engine
juliev0 9ee9a02
decision engine
juliev0 8ba1f03
decision engine
juliev0 529a687
renaming
juliev0 8bb849a
fix inProgressStrategy unsetting to be idempotent
juliev0 d236aef
decision engine
juliev0 7c64207
Merge remote-tracking branch 'origin/main' into decision-engine
juliev0 2f245b3
fix: empty commit
juliev0 f130c1d
fix: empty commit
juliev0 81f1702
refactoring to enable this to work if somebody changes desiredPhase i…
juliev0 7aca76d
decision engine
juliev0 f83fdc5
bump timeout for isbservice
juliev0 d816e37
Merge remote-tracking branch 'origin/main' into decision-engine
juliev0 6a2c3d8
fix: empty commit
juliev0 4f0cff7
decision engine
juliev0 4fdad36
decision engine
juliev0 758611d
fix spelling
juliev0 753c2cf
usde unit test
juliev0 9352548
fix compilation error
juliev0 c7a1747
decision engine
juliev0 a417b21
merge main
juliev0 53c1d8b
fix unit test
juliev0 d14b92b
adding e2e test for manual pausing and unpausing
juliev0 4b76f7f
e2e test
juliev0 f7e0029
fix: empty commit
juliev0 752c161
fix: empty commit
juliev0 f6fda53
decision engine
juliev0 4bb6fe4
add 2 second sleeps between certain tests
juliev0 6335804
fix: empty commit
juliev0 84bd2ca
fix: empty commit
juliev0 0d52387
fix: empty commit
juliev0 015a82a
unit test
juliev0 82b9baa
use latest numaflow in test
juliev0 18b963a
put back previous numaflow controller definitions
juliev0 4f420c0
unit test
juliev0 f19c9f0
unit test
juliev0 a58549b
unit test
juliev0 f5649c7
unit test
juliev0 f22ecd1
unit test
juliev0 c5ce306
unit test
juliev0 7f87d88
unit test
juliev0 5c24c03
unit test
juliev0 d5f1ca1
merge main
juliev0 88e5002
tests
juliev0 08a3628
e2e
juliev0 8368cc1
fix e2e pause condition test
juliev0 d693eee
e2e and unit test
juliev0 3011da9
decision engine
juliev0 7102ef6
fix application of desiredPhase field
juliev0 06b94ed
consolidate desiredphase functionality
juliev0 4483d05
decision engine
juliev0 a458c68
decision engine - resolve some comments
juliev0 fc1866d
move defaultUpgradeStrategy field from global config to USDE config
juliev0 2cb63ae
explictly set user strategy for unit test
juliev0 946964e
fix unit test
juliev0 dcf37e5
Merge branch 'main' into decision-engine
juliev0 e58d343
remove feature flag from numaplane main config
juliev0 46929f2
Merge branch 'decision-engine' of github.com:numaproj/numaplane into …
juliev0 3cdd599
remove unneeded file
juliev0 c01d119
typo
juliev0 a9eed6a
remove defaultUpgradeStrategy from numaplane config
juliev0 1483dd7
check inProgressStrategy
juliev0 2dde94a
fix: empty commit
juliev0 fb97193
renaming
juliev0 eea2ad0
merge main
juliev0 fe52380
comment
juliev0 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
package config | ||
|
||
import ( | ||
"encoding/json" | ||
"fmt" | ||
) | ||
|
||
type USDEUserStrategy string | ||
|
||
const ( | ||
ProgressiveStrategyID USDEUserStrategy = "progressive" | ||
PPNDStrategyID USDEUserStrategy = "pause-and-drain" | ||
NoStrategyID USDEUserStrategy = "" | ||
) | ||
|
||
func (s *USDEUserStrategy) UnmarshalJSON(data []byte) (err error) { | ||
var usdeUserStrategyStr string | ||
if err := json.Unmarshal(data, &usdeUserStrategyStr); err != nil { | ||
return err | ||
} | ||
|
||
allowedValues := map[USDEUserStrategy]struct{}{ | ||
ProgressiveStrategyID: {}, | ||
PPNDStrategyID: {}, | ||
NoStrategyID: {}} // TODO: this will no longer be allowed in the future | ||
|
||
// Make sure the string is one of the possible strategy values | ||
_, found := allowedValues[USDEUserStrategy(usdeUserStrategyStr)] | ||
if !found { | ||
return fmt.Errorf("invalid strategy '%s' (allowed values: %+v)", usdeUserStrategyStr, allowedValues) | ||
} | ||
|
||
*s = USDEUserStrategy(usdeUserStrategyStr) | ||
|
||
return nil | ||
} | ||
|
||
func (s USDEUserStrategy) IsValid() bool { | ||
switch s { | ||
case ProgressiveStrategyID: | ||
return true | ||
case PPNDStrategyID: | ||
return true | ||
default: | ||
return false | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
package controller | ||
|
||
import ( | ||
"context" | ||
"sync" | ||
|
||
k8stypes "k8s.io/apimachinery/pkg/types" | ||
"sigs.k8s.io/controller-runtime/pkg/client" | ||
|
||
apiv1 "github.com/numaproj/numaplane/pkg/apis/numaplane/v1alpha1" | ||
) | ||
|
||
// inProgressStrategyMgr is responsible to maintain a Rollout's inProgressStrategy | ||
// state is maintained both in memory as well as in the Rollout's Status | ||
// in memory always gives us the latest state in case the Informer cache is out of date | ||
// the Rollout's Status is useful as a backup mechanism in case Numaplane has just restarted | ||
type inProgressStrategyMgr struct { | ||
getRolloutStrategy func(context.Context, client.Object) *apiv1.UpgradeStrategy | ||
setRolloutStrategy func(context.Context, client.Object, apiv1.UpgradeStrategy) | ||
store *inProgressStrategyStore | ||
} | ||
|
||
// in memory storage of UpgradeStrategy in progress for a given Rollout | ||
type inProgressStrategyStore struct { | ||
inProgressUpgradeStrategies map[string]apiv1.UpgradeStrategy | ||
mutex sync.RWMutex | ||
} | ||
|
||
func newInProgressStrategyMgr( | ||
getRolloutStrategy func(context.Context, client.Object) *apiv1.UpgradeStrategy, | ||
setRolloutStrategy func(context.Context, client.Object, apiv1.UpgradeStrategy)) *inProgressStrategyMgr { | ||
|
||
return &inProgressStrategyMgr{ | ||
getRolloutStrategy: getRolloutStrategy, | ||
setRolloutStrategy: setRolloutStrategy, | ||
store: newInProgressStrategyStore(), | ||
} | ||
} | ||
|
||
func newInProgressStrategyStore() *inProgressStrategyStore { | ||
return &inProgressStrategyStore{ | ||
inProgressUpgradeStrategies: map[string]apiv1.UpgradeStrategy{}, | ||
} | ||
} | ||
|
||
func (mgr *inProgressStrategyMgr) getStrategy(ctx context.Context, rollout client.Object) apiv1.UpgradeStrategy { | ||
return mgr.synchronize(ctx, rollout) | ||
} | ||
|
||
// make sure in-memory value and Rollout Status value are synchronized | ||
// if in-memory value is set, make sure Rollout Status value gets set the same | ||
juliev0 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// if in-memory value isn't set, make sure in-memory value gets set to Rollout Status value | ||
func (mgr *inProgressStrategyMgr) synchronize(ctx context.Context, rollout client.Object) apiv1.UpgradeStrategy { | ||
namespacedName := k8stypes.NamespacedName{Namespace: rollout.GetNamespace(), Name: rollout.GetName()} | ||
|
||
// first look for value in memory | ||
foundInMemory, inMemoryStrategy := mgr.store.getStrategy(namespacedName) | ||
|
||
// now look for the value in the Resource | ||
crDefinedStrategy := mgr.getRolloutStrategy(ctx, rollout) | ||
|
||
// if in-memory value is set, make sure Rollout Status value gets set the same | ||
if foundInMemory { | ||
mgr.setRolloutStrategy(ctx, rollout, inMemoryStrategy) | ||
return inMemoryStrategy | ||
} else { | ||
// make sure in-memory value gets set to Rollout Status value | ||
if crDefinedStrategy != nil { | ||
mgr.store.setStrategy(namespacedName, *crDefinedStrategy) | ||
return *crDefinedStrategy | ||
} else { | ||
return apiv1.UpgradeStrategyNoOp | ||
} | ||
} | ||
} | ||
|
||
// store in both memory and the Resource itself | ||
func (mgr *inProgressStrategyMgr) setStrategy(ctx context.Context, rollout client.Object, upgradeStrategy apiv1.UpgradeStrategy) { | ||
namespacedName := k8stypes.NamespacedName{Namespace: rollout.GetNamespace(), Name: rollout.GetName()} | ||
|
||
mgr.store.setStrategy(namespacedName, upgradeStrategy) | ||
mgr.setRolloutStrategy(ctx, rollout, upgradeStrategy) | ||
} | ||
|
||
func (mgr *inProgressStrategyMgr) unsetStrategy(ctx context.Context, rollout client.Object) { | ||
mgr.setStrategy(ctx, rollout, apiv1.UpgradeStrategyNoOp) | ||
} | ||
|
||
// return whether found, and if so, the value | ||
func (store *inProgressStrategyStore) getStrategy(namespacedName k8stypes.NamespacedName) (bool, apiv1.UpgradeStrategy) { | ||
key := namespacedNameToKey(namespacedName) | ||
store.mutex.RLock() | ||
strategy, found := store.inProgressUpgradeStrategies[key] | ||
store.mutex.RUnlock() | ||
return found, strategy | ||
} | ||
|
||
func (store *inProgressStrategyStore) setStrategy(namespacedName k8stypes.NamespacedName, upgradeStrategy apiv1.UpgradeStrategy) { | ||
key := namespacedNameToKey(namespacedName) | ||
store.mutex.Lock() | ||
store.inProgressUpgradeStrategies[key] = upgradeStrategy | ||
store.mutex.Unlock() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
package controller | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
|
||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
k8stypes "k8s.io/apimachinery/pkg/types" | ||
"sigs.k8s.io/controller-runtime/pkg/client" | ||
|
||
apiv1 "github.com/numaproj/numaplane/pkg/apis/numaplane/v1alpha1" | ||
) | ||
|
||
func Test_inProgressStrategyMgr_getStrategy(t *testing.T) { | ||
|
||
progressiveStrategy := apiv1.UpgradeStrategyProgressive | ||
ppndStrategy := apiv1.UpgradeStrategyPPND | ||
noStrategy := apiv1.UpgradeStrategyNoOp | ||
|
||
testCases := []struct { | ||
name string | ||
inMemoryStrategy *apiv1.UpgradeStrategy | ||
rolloutStatusStrategy *apiv1.UpgradeStrategy | ||
resultStrategy apiv1.UpgradeStrategy | ||
}{ | ||
{ | ||
name: "in memory and in Rollout Status (progressive result)", | ||
inMemoryStrategy: &progressiveStrategy, | ||
rolloutStatusStrategy: &noStrategy, | ||
resultStrategy: progressiveStrategy, | ||
}, | ||
{ | ||
name: "in memory and in Rollout Status (no op result)", | ||
inMemoryStrategy: &noStrategy, | ||
rolloutStatusStrategy: &ppndStrategy, | ||
resultStrategy: noStrategy, | ||
}, | ||
{ | ||
name: "in memory and not in Rollout Status", | ||
inMemoryStrategy: &progressiveStrategy, | ||
rolloutStatusStrategy: nil, | ||
resultStrategy: progressiveStrategy, | ||
}, | ||
{ | ||
name: "in Rollout Status and not in memory", | ||
inMemoryStrategy: nil, | ||
rolloutStatusStrategy: &ppndStrategy, | ||
resultStrategy: ppndStrategy, | ||
}, | ||
{ | ||
name: "neither in Rollout Status nor in memory", | ||
inMemoryStrategy: nil, | ||
rolloutStatusStrategy: nil, | ||
resultStrategy: noStrategy, | ||
}, | ||
} | ||
|
||
for _, tc := range testCases { | ||
t.Run(tc.name, func(t *testing.T) { | ||
inProgressStrategyMgr := newInProgressStrategyMgr( | ||
// getRolloutStrategy function: | ||
func(ctx context.Context, rollout client.Object) *apiv1.UpgradeStrategy { | ||
return tc.rolloutStatusStrategy | ||
}, | ||
// setRolloutStrategy function: | ||
func(ctx context.Context, rollout client.Object, strategy apiv1.UpgradeStrategy) {}, | ||
) | ||
pipelineRollout := &apiv1.PipelineRollout{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "my-pipeline"}} | ||
namespacedName := k8stypes.NamespacedName{Namespace: pipelineRollout.GetNamespace(), Name: pipelineRollout.GetName()} | ||
if tc.inMemoryStrategy != nil { | ||
inProgressStrategyMgr.store.setStrategy(namespacedName, *tc.inMemoryStrategy) | ||
} | ||
upgradeStrategyResult := inProgressStrategyMgr.getStrategy(context.Background(), pipelineRollout) | ||
assert.Equal(t, tc.resultStrategy, upgradeStrategyResult) | ||
}) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
moved the stuff from this file that pertains to the user config to its own file (
user_config.go
)