Skip to content

Commit

Permalink
fix: avoid to update resource status with explictly status check
Browse files Browse the repository at this point in the history
Signed-off-by: bjwswang <[email protected]>
  • Loading branch information
bjwswang committed Jan 19, 2024
1 parent 360066b commit 349c5c6
Show file tree
Hide file tree
Showing 9 changed files with 183 additions and 109 deletions.
42 changes: 42 additions & 0 deletions api/base/v1alpha1/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ limitations under the License.

package v1alpha1

import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
LabelDatasourceType = Group + "/datasource-type"
)
Expand All @@ -41,3 +46,40 @@ func (ds DatasourceSpec) Type() DatasourceType {
return DatasourceTypeUnknown
}
}

func (datasource Datasource) ReadyCondition() Condition {
currCon := datasource.Status.GetCondition(TypeReady)
// return current condition if condition not changed
if currCon.Status == corev1.ConditionTrue && currCon.Reason == ReasonAvailable {
return currCon
}
return Condition{
Type: TypeReady,
Status: corev1.ConditionTrue,
Reason: ReasonAvailable,
Message: "Check Success",
LastTransitionTime: metav1.Now(),
LastSuccessfulTime: metav1.Now(),
}
}

func (datasource Datasource) ErrorCondition(msg string) Condition {
currCon := datasource.Status.GetCondition(TypeReady)
// return current condition if condition not changed
if currCon.Status == corev1.ConditionFalse && currCon.Reason == ReasonUnavailable && currCon.Message == msg {
return currCon
}
// keep original LastSuccessfulTime if have
lastSuccessfulTime := metav1.Now()
if currCon.LastSuccessfulTime.IsZero() {
lastSuccessfulTime = currCon.LastSuccessfulTime
}
return Condition{
Type: TypeReady,
Status: corev1.ConditionFalse,
Reason: ReasonUnavailable,
Message: msg,
LastSuccessfulTime: lastSuccessfulTime,
LastTransitionTime: metav1.Now(),
}
}
39 changes: 39 additions & 0 deletions api/base/v1alpha1/embedder.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package v1alpha1
import (
"context"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/dynamic"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand Down Expand Up @@ -75,3 +77,40 @@ func (e Embedder) Get3rdPartyModels() []string {

return []string{}
}

func (e Embedder) ReadyCondition(msg string) Condition {
currCon := e.Status.GetCondition(TypeReady)
// return current condition if condition not changed
if currCon.Status == corev1.ConditionTrue && currCon.Reason == ReasonAvailable && currCon.Message == msg {
return currCon
}
return Condition{
Type: TypeReady,
Status: corev1.ConditionTrue,
Reason: ReasonAvailable,
Message: msg,
LastTransitionTime: metav1.Now(),
LastSuccessfulTime: metav1.Now(),
}
}

func (e Embedder) ErrorCondition(msg string) Condition {
currCon := e.Status.GetCondition(TypeReady)
// return current condition if condition not changed
if currCon.Status == corev1.ConditionFalse && currCon.Reason == ReasonUnavailable && currCon.Message == msg {
return currCon
}
// keep original LastSuccessfulTime if have
lastSuccessfulTime := metav1.Now()
if currCon.LastSuccessfulTime.IsZero() {
lastSuccessfulTime = currCon.LastSuccessfulTime
}
return Condition{
Type: TypeReady,
Status: corev1.ConditionFalse,
Reason: ReasonUnavailable,
Message: msg,
LastSuccessfulTime: lastSuccessfulTime,
LastTransitionTime: metav1.Now(),
}
}
39 changes: 39 additions & 0 deletions api/base/v1alpha1/llm.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package v1alpha1
import (
"context"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/dynamic"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand Down Expand Up @@ -84,3 +86,40 @@ func (llm LLM) Get3rdPartyModels() []string {
}
return []string{}
}

func (llm LLM) ReadyCondition(msg string) Condition {
currCon := llm.Status.GetCondition(TypeReady)
// return current condition if condition not changed
if currCon.Status == corev1.ConditionTrue && currCon.Reason == ReasonAvailable && currCon.Message == msg {
return currCon
}
return Condition{
Type: TypeReady,
Status: corev1.ConditionTrue,
Reason: ReasonAvailable,
Message: msg,
LastTransitionTime: metav1.Now(),
LastSuccessfulTime: metav1.Now(),
}
}

func (llm LLM) ErrorCondition(msg string) Condition {
currCon := llm.Status.GetCondition(TypeReady)
// return current condition if condition not changed
if currCon.Status == corev1.ConditionFalse && currCon.Reason == ReasonUnavailable && currCon.Message == msg {
return currCon
}
// keep original LastSuccessfulTime if have
lastSuccessfulTime := metav1.Now()
if currCon.LastSuccessfulTime.IsZero() {
lastSuccessfulTime = currCon.LastSuccessfulTime
}
return Condition{
Type: TypeReady,
Status: corev1.ConditionFalse,
Reason: ReasonUnavailable,
Message: msg,
LastSuccessfulTime: lastSuccessfulTime,
LastTransitionTime: metav1.Now(),
}
}
40 changes: 40 additions & 0 deletions api/base/v1alpha1/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ package v1alpha1
import (
"fmt"
"strings"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
Expand Down Expand Up @@ -58,3 +61,40 @@ func (model Model) FullPath() string {
func (model Model) ObjectPath() string {
return fmt.Sprintf("model/%s/", model.Name)
}

func (model Model) ReadyCondition() Condition {
currCon := model.Status.GetCondition(TypeReady)
// return current condition if condition not changed
if currCon.Status == corev1.ConditionTrue && currCon.Reason == ReasonAvailable {
return currCon
}
return Condition{
Type: TypeReady,
Status: corev1.ConditionTrue,
Reason: ReasonAvailable,
Message: "Check Success",
LastTransitionTime: metav1.Now(),
LastSuccessfulTime: metav1.Now(),
}
}

func (model Model) ErrorCondition(msg string) Condition {
currCon := model.Status.GetCondition(TypeReady)
// return current condition if condition not changed
if currCon.Status == corev1.ConditionFalse && currCon.Reason == ReasonUnavailable && currCon.Message == msg {
return currCon
}
// keep original LastSuccessfulTime if have
lastSuccessfulTime := metav1.Now()
if currCon.LastSuccessfulTime.IsZero() {
lastSuccessfulTime = currCon.LastSuccessfulTime
}
return Condition{
Type: TypeReady,
Status: corev1.ConditionFalse,
Reason: ReasonUnavailable,
Message: msg,
LastSuccessfulTime: lastSuccessfulTime,
LastTransitionTime: metav1.Now(),
}
}
19 changes: 2 additions & 17 deletions controllers/base/datasource_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import (
"reflect"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
Expand Down Expand Up @@ -198,23 +196,10 @@ func (r *DatasourceReconciler) UpdateStatus(ctx context.Context, instance *arcad
var newCondition arcadiav1alpha1.Condition
if err != nil {
// set condition to False
newCondition = arcadiav1alpha1.Condition{
Type: arcadiav1alpha1.TypeReady,
Status: corev1.ConditionFalse,
Reason: arcadiav1alpha1.ReasonUnavailable,
Message: err.Error(),
LastTransitionTime: metav1.Now(),
}
newCondition = instance.ErrorCondition(err.Error())
} else {
// set condition to True
newCondition = arcadiav1alpha1.Condition{
Type: arcadiav1alpha1.TypeReady,
Status: corev1.ConditionTrue,
Reason: arcadiav1alpha1.ReasonAvailable,
Message: "Check Success",
LastTransitionTime: metav1.Now(),
LastSuccessfulTime: metav1.Now(),
}
newCondition = instance.ReadyCondition()
}
instanceCopy.Status.SetConditions(newCondition)
return r.Client.Status().Update(ctx, instanceCopy)
Expand Down
33 changes: 4 additions & 29 deletions controllers/base/embedder_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ import (
"github.com/go-logr/logr"
langchainembeddings "github.com/tmc/langchaingo/embeddings"
langchainopenai "github.com/tmc/langchaingo/llms/openai"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -127,16 +125,6 @@ func (r *EmbedderReconciler) SetupWithManager(mgr ctrl.Manager) error {
newEmbedder := ue.ObjectNew.(*arcadiav1alpha1.Embedder)
return !reflect.DeepEqual(oldEmbedder.Spec, newEmbedder.Spec) || newEmbedder.DeletionTimestamp != nil
},
// for other event handler, we must add the function explicitly.
CreateFunc: func(event.CreateEvent) bool {
return true
},
DeleteFunc: func(event.DeleteEvent) bool {
return true
},
GenericFunc: func(event.GenericEvent) bool {
return true
},
})).
Watches(&source.Kind{Type: &arcadiav1alpha1.Worker{}},
handler.EnqueueRequestsFromMapFunc(func(o client.Object) []reconcile.Request {
Expand Down Expand Up @@ -257,28 +245,15 @@ func (r *EmbedderReconciler) UpdateStatus(ctx context.Context, instance *arcadia
instanceCopy := instance.DeepCopy()
var newCondition arcadiav1alpha1.Condition
if err != nil {
// Set status to unavailable
newCondition = arcadiav1alpha1.Condition{
Type: arcadiav1alpha1.TypeReady,
Status: corev1.ConditionFalse,
Reason: arcadiav1alpha1.ReasonUnavailable,
Message: err.Error(),
LastTransitionTime: metav1.Now(),
}
// set condition to False
newCondition = instance.ErrorCondition(err.Error())
} else {
msg, ok := t.(string)
if !ok {
msg = _StatusNilResponse
}
// Set status to available
newCondition = arcadiav1alpha1.Condition{
Type: arcadiav1alpha1.TypeReady,
Status: corev1.ConditionTrue,
Reason: arcadiav1alpha1.ReasonAvailable,
Message: msg,
LastTransitionTime: metav1.Now(),
LastSuccessfulTime: metav1.Now(),
}
// set condition to True
newCondition = instance.ReadyCondition(msg)
}
instanceCopy.Status.SetConditions(newCondition)
return r.Client.Status().Update(ctx, instanceCopy)
Expand Down
58 changes: 14 additions & 44 deletions controllers/base/llm_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ import (

"github.com/go-logr/logr"
langchainllms "github.com/tmc/langchaingo/llms"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -122,7 +120,14 @@ func (r *LLMReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
// SetupWithManager sets up the controller with the Manager.
func (r *LLMReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&arcadiav1alpha1.LLM{}, builder.WithPredicates(LLMPredicates{})).
For(&arcadiav1alpha1.LLM{}, builder.WithPredicates(predicate.Funcs{
UpdateFunc: func(ue event.UpdateEvent) bool {
// Avoid to handle the event that it's not spec update or delete
oldLLM := ue.ObjectOld.(*arcadiav1alpha1.LLM)
newLLM := ue.ObjectNew.(*arcadiav1alpha1.LLM)
return !reflect.DeepEqual(oldLLM.Spec, newLLM.Spec) || oldLLM.DeletionTimestamp != nil
},
})).
Watches(&source.Kind{Type: &arcadiav1alpha1.Worker{}},
handler.EnqueueRequestsFromMapFunc(func(o client.Object) []reconcile.Request {
worker := o.(*arcadiav1alpha1.Worker)
Expand Down Expand Up @@ -228,53 +233,18 @@ func (r *LLMReconciler) checkWorkerLLM(ctx context.Context, logger logr.Logger,

func (r *LLMReconciler) UpdateStatus(ctx context.Context, instance *arcadiav1alpha1.LLM, t interface{}, err error) error {
instanceCopy := instance.DeepCopy()
var newCondition arcadiav1alpha1.Condition
if err != nil {
// Set status to unavailable
instanceCopy.Status.SetConditions(arcadiav1alpha1.Condition{
Type: arcadiav1alpha1.TypeReady,
Status: corev1.ConditionFalse,
Reason: arcadiav1alpha1.ReasonUnavailable,
Message: err.Error(),
LastTransitionTime: metav1.Now(),
})
// set condition to False
newCondition = instance.ErrorCondition(err.Error())
} else {
msg, ok := t.(string)
if !ok {
msg = _StatusNilResponse
}
// Set status to available
instanceCopy.Status.SetConditions(arcadiav1alpha1.Condition{
Type: arcadiav1alpha1.TypeReady,
Status: corev1.ConditionTrue,
Reason: arcadiav1alpha1.ReasonAvailable,
Message: msg,
LastTransitionTime: metav1.Now(),
LastSuccessfulTime: metav1.Now(),
})
// set condition to True
newCondition = instance.ReadyCondition(msg)
}
instanceCopy.Status.SetConditions(newCondition)
return r.Client.Status().Update(ctx, instanceCopy)
}

type LLMPredicates struct {
predicate.Funcs
}

func (llm LLMPredicates) Create(ce event.CreateEvent) bool {
prompt := ce.Object.(*arcadiav1alpha1.LLM)
return len(prompt.Status.ConditionedStatus.Conditions) == 0
}

func (llm LLMPredicates) Update(ue event.UpdateEvent) bool {
oldLLM := ue.ObjectOld.(*arcadiav1alpha1.LLM)
newLLM := ue.ObjectNew.(*arcadiav1alpha1.LLM)

return !reflect.DeepEqual(oldLLM.Spec, newLLM.Spec) || newLLM.DeletionTimestamp != nil
}

func (llm LLMPredicates) Delete(de event.DeleteEvent) bool {
return true
}

func (llm LLMPredicates) Generic(ge event.GenericEvent) bool {
return true
}
Loading

0 comments on commit 349c5c6

Please sign in to comment.