Skip to content

Commit

Permalink
Merge branch 'main' into shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
yhl25 authored Nov 5, 2024
2 parents 78302cf + 9c1d3ce commit 3013927
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3174,6 +3174,7 @@ spec:
- Pending
- Running
- Failed
- Deleting
type: string
type:
type: string
Expand Down
1 change: 1 addition & 0 deletions config/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3173,6 +3173,7 @@ spec:
- Pending
- Running
- Failed
- Deleting
type: string
type:
type: string
Expand Down
1 change: 1 addition & 0 deletions config/namespace-install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3173,6 +3173,7 @@ spec:
- Pending
- Running
- Failed
- Deleting
type: string
type:
type: string
Expand Down
14 changes: 8 additions & 6 deletions pkg/apis/numaflow/v1alpha1/isbsvc_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// +kubebuilder:validation:Enum="";Pending;Running;Failed
// +kubebuilder:validation:Enum="";Pending;Running;Failed;Deleting
type ISBSvcPhase string

const (
ISBSvcPhaseUnknown ISBSvcPhase = ""
ISBSvcPhasePending ISBSvcPhase = "Pending"
ISBSvcPhaseRunning ISBSvcPhase = "Running"
ISBSvcPhaseFailed ISBSvcPhase = "Failed"
ISBSvcPhaseUnknown ISBSvcPhase = ""
ISBSvcPhasePending ISBSvcPhase = "Pending"
ISBSvcPhaseRunning ISBSvcPhase = "Running"
ISBSvcPhaseFailed ISBSvcPhase = "Failed"
ISBSvcPhaseDeleting ISBSvcPhase = "Deleting"

// ISBSvcConditionConfigured has the status True when the InterStepBufferService
// has valid configuration.
Expand Down Expand Up @@ -150,7 +151,8 @@ func (iss *InterStepBufferServiceStatus) SetObservedGeneration(value int64) {

// IsHealthy indicates whether the InterStepBufferService is healthy or not
func (iss *InterStepBufferServiceStatus) IsHealthy() bool {
if iss.Phase != ISBSvcPhaseRunning {
// Deleting is a special case, we don't want to mark it as unhealthy as Pipeline reconciliation relies on it
if iss.Phase != ISBSvcPhaseRunning && iss.Phase != ISBSvcPhaseDeleting {
return false
}
return iss.IsReady()
Expand Down
1 change: 1 addition & 0 deletions pkg/reconciler/isbsvc/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func (r *interStepBufferServiceReconciler) reconcile(ctx context.Context, isbSvc
// Finalizer logic should be added here.
if err := installer.Uninstall(ctx, isbSvc, r.client, r.kubeClient, r.config, log, r.recorder); err != nil {
log.Errorw("Failed to uninstall", zap.Error(err))
isbSvc.Status.SetPhase(dfv1.ISBSvcPhaseDeleting, err.Error())
return err
}
controllerutil.RemoveFinalizer(isbSvc, finalizerName)
Expand Down
27 changes: 27 additions & 0 deletions pkg/reconciler/isbsvc/installer/installer.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,37 @@ func getInstaller(isbSvc *dfv1.InterStepBufferService, client client.Client, kub
//
// It could also be used to check if the ISB Service object can be safely deleted.
func Uninstall(ctx context.Context, isbSvc *dfv1.InterStepBufferService, client client.Client, kubeClient kubernetes.Interface, config *reconciler.GlobalConfig, logger *zap.SugaredLogger, recorder record.EventRecorder) error {
pls, err := referencedPipelines(ctx, client, isbSvc)
if err != nil {
return fmt.Errorf("failed to check if there is any pipeline using this InterStepBufferService, %w", err)
}
if pls > 0 {
return fmt.Errorf("can not delete InterStepBufferService %q which has %d pipelines connected", isbSvc.Name, pls)
}
installer, err := getInstaller(isbSvc, client, kubeClient, config, logger, recorder)
if err != nil {
logger.Errorw("Failed to get an installer", zap.Error(err))
return err
}
return installer.Uninstall(ctx)
}

func referencedPipelines(ctx context.Context, c client.Client, isbSvc *dfv1.InterStepBufferService) (int, error) {
pipelines := &dfv1.PipelineList{}
if err := c.List(ctx, pipelines, &client.ListOptions{
Namespace: isbSvc.Namespace,
}); err != nil {
return 0, err
}
result := 0
for _, pl := range pipelines.Items {
isbSvcName := pl.Spec.InterStepBufferServiceName
if isbSvcName == "" {
isbSvcName = dfv1.DefaultISBSvcName
}
if isbSvcName == isbSvc.Name {
result++
}
}
return result, nil
}
67 changes: 67 additions & 0 deletions pkg/reconciler/isbsvc/installer/installer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,4 +250,71 @@ func TestUnInstall(t *testing.T) {
err := Uninstall(ctx, testObj, cl, kubeClient, fakeConfig, zaptest.NewLogger(t).Sugar(), record.NewFakeRecorder(64))
assert.NoError(t, err)
})

t.Run("test has pl connected", func(t *testing.T) {
testPipeline := &dfv1.Pipeline{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pl",
Namespace: testNamespace,
},
Spec: dfv1.PipelineSpec{
InterStepBufferServiceName: testISBSName,
},
}
err := cl.Create(ctx, testPipeline)
assert.NoError(t, err)
testObj := testJetStreamIsbSvc.DeepCopy()
err = Uninstall(ctx, testObj, cl, kubeClient, fakeConfig, zaptest.NewLogger(t).Sugar(), record.NewFakeRecorder(64))
assert.Error(t, err)
assert.Contains(t, err.Error(), "connected")
})
}

func Test_referencedPipelines(t *testing.T) {
cl := fake.NewClientBuilder().Build()
ctx := context.TODO()

t.Run("test no referenced pls", func(t *testing.T) {
testObj := testJetStreamIsbSvc.DeepCopy()
pls, err := referencedPipelines(ctx, cl, testObj)
assert.NoError(t, err)
assert.Equal(t, 0, pls)
})

t.Run("test having referenced pls - non default isbsvc", func(t *testing.T) {
testPipeline := &dfv1.Pipeline{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pl",
Namespace: testNamespace,
},
Spec: dfv1.PipelineSpec{
InterStepBufferServiceName: testISBSName,
},
}
err := cl.Create(ctx, testPipeline)
assert.NoError(t, err)
testObj := testJetStreamIsbSvc.DeepCopy()
pls, err := referencedPipelines(ctx, cl, testObj)
assert.NoError(t, err)
assert.Equal(t, 1, pls)
})

t.Run("test having referenced pls - default isbsvc", func(t *testing.T) {
testPipeline := &dfv1.Pipeline{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pl-1",
Namespace: testNamespace,
},
Spec: dfv1.PipelineSpec{
InterStepBufferServiceName: "",
},
}
err := cl.Create(ctx, testPipeline)
assert.NoError(t, err)
testObj := testJetStreamIsbSvc.DeepCopy()
testObj.Name = "default"
pls, err := referencedPipelines(ctx, cl, testObj)
assert.NoError(t, err)
assert.Equal(t, 1, pls)
})
}

0 comments on commit 3013927

Please sign in to comment.