From 746f35f883c78883549de082d0b2500ddd54f32c Mon Sep 17 00:00:00 2001 From: Alvaro Aleman Date: Sat, 12 Oct 2024 13:42:01 -0400 Subject: [PATCH] :bug: Fakeclient: Allow concurrent patching if RV is unset Currently, the fake client always errors if patches are done concurrently rather than only if the patch contains a ResourceVersion. This is because we have a number of checks including the one related to resourceVersion implemented in a `versionedTracker`. The `versionedTracker` receives the patched object and assumes that the patchedObject only contains a RV if the patch had one. That turns out not to be true, it almost always has one. The reason the object we receive in the `versionedTracker` almost always has a RV is that we use a client-go reactor to apply the patch to an object. The way this works is that the reactor takes the patch and a tracker, fetches the object from the tracker and applies the patch to it. This means that the resulting object always has a resourceVersion unless the patch explicitly set it to `null`. This `null` case apparently is special cased in the Kubernetes apiserver to be acceptable, so we do the same here. Fix the original issue by checking in the fakeclient if the patch modifies the RV and if not, retry conflicts. --- pkg/client/fake/client.go | 18 ++++++++++ pkg/client/fake/client_test.go | 63 ++++++++++++++++++++++++++++++++-- 2 files changed, 78 insertions(+), 3 deletions(-) diff --git a/pkg/client/fake/client.go b/pkg/client/fake/client.go index 87a8a8380b..39a6c78fe8 100644 --- a/pkg/client/fake/client.go +++ b/pkg/client/fake/client.go @@ -468,6 +468,11 @@ func (t versionedTracker) updateObject(gvr schema.GroupVersionResource, obj runt switch { case allowsUnconditionalUpdate(gvk): accessor.SetResourceVersion(oldAccessor.GetResourceVersion()) + // This is needed because if the patch explicitly sets the RV to null, the client-go reaction we use + // to apply it and whose output we process here will have it unset. It is not clear why the Kubernetes + // apiserver accepts such a patch, but it does so we just copy that behavior. + // Kubernetes apiserver behavior can be checked like this: + // `kubectl patch configmap foo --patch '{"metadata":{"annotations":{"foo":"bar"},"resourceVersion":null}}' -v=9` case bytes. Contains(debug.Stack(), []byte("sigs.k8s.io/controller-runtime/pkg/client/fake.(*fakeClient).Patch")): // We apply patches using a client-go reaction that ends up calling the trackers Update. As we can't change @@ -904,6 +909,16 @@ func (c *fakeClient) patch(obj client.Object, patch client.Patch, opts ...client return err } + // retryOnConflict unless the patch includes a resourceVersion. We have to + // compare the resource version of newObj with oldObject because newObj is + // oldObj + the patch. It is important to note that the RV in it might be + // different from the RV in obj, as oldObj is fetched from the tracker and + // a concurrent actor could have modified it. + retryOnConflict := true + if newObj.GetResourceVersion() != oldAccessor.GetResourceVersion() { + retryOnConflict = false + } + // Validate that deletionTimestamp has not been changed if !deletionTimestampEqual(newObj, oldAccessor) { return fmt.Errorf("rejected patch, metadata.deletionTimestamp immutable") @@ -912,6 +927,9 @@ func (c *fakeClient) patch(obj client.Object, patch client.Patch, opts ...client reaction := testing.ObjectReaction(c.tracker) handled, o, err := reaction(action) if err != nil { + if retryOnConflict && apierrors.IsConflict(err) { + return c.patch(obj, patch, opts...) + } return err } if !handled { diff --git a/pkg/client/fake/client_test.go b/pkg/client/fake/client_test.go index 0a7d17db47..dca05e20fe 100644 --- a/pkg/client/fake/client_test.go +++ b/pkg/client/fake/client_test.go @@ -21,6 +21,7 @@ import ( "encoding/json" "fmt" "strconv" + "sync" "time" "github.com/google/go-cmp/cmp" @@ -580,7 +581,7 @@ var _ = Describe("Fake client", func() { Expect(obj.ObjectMeta.ResourceVersion).To(Equal("1000")) }) - It("should allow patch with non-set ResourceVersion for a resource that doesn't allow unconditional updates", func() { + It("should allow patch when the patch sets RV to 'null'", func() { schemeBuilder := &scheme.Builder{GroupVersion: schema.GroupVersion{Group: "test", Version: "v1"}} schemeBuilder.Register(&WithPointerMeta{}, &WithPointerMetaList{}) @@ -605,6 +606,7 @@ var _ = Describe("Fake client", func() { "foo": "bar", }, }} + Expect(cl.Patch(context.Background(), newObj, client.MergeFrom(original))).To(Succeed()) patched := &WithPointerMeta{} @@ -2134,6 +2136,61 @@ var _ = Describe("Fake client", func() { Expect(apierrors.IsNotFound(err)).To(BeTrue()) }) + It("should allow concurrent patches to a configMap", func() { + scheme := runtime.NewScheme() + Expect(corev1.AddToScheme(scheme)).To(Succeed()) + + obj := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + ResourceVersion: "0", + }, + } + cl := NewClientBuilder().WithScheme(scheme).WithObjects(obj).Build() + wg := sync.WaitGroup{} + wg.Add(5) + + for i := range 5 { + go func() { + defer wg.Done() + defer GinkgoRecover() + + newObj := obj.DeepCopy() + newObj.Data = map[string]string{"foo": strconv.Itoa(i)} + Expect(cl.Patch(context.Background(), newObj, client.MergeFrom(obj))).To(Succeed()) + }() + } + wg.Wait() + }) + + It("should not allow concurrent patches to a configMap if the patch contains a ResourceVersion", func() { + scheme := runtime.NewScheme() + Expect(corev1.AddToScheme(scheme)).To(Succeed()) + + obj := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + ResourceVersion: "0", + }, + } + cl := NewClientBuilder().WithScheme(scheme).WithObjects(obj).Build() + wg := sync.WaitGroup{} + wg.Add(5) + + for i := range 5 { + go func() { + defer wg.Done() + defer GinkgoRecover() + + newObj := obj.DeepCopy() + newObj.ResourceVersion = "1" // include an invalid RV to cause a conflcit + newObj.Data = map[string]string{"foo": strconv.Itoa(i)} + Expect(apierrors.IsConflict(cl.Patch(context.Background(), newObj, client.MergeFrom(obj)))).To(BeTrue()) + }() + } + wg.Wait() + }) + It("disallows scale subresources on unsupported built-in types", func() { scheme := runtime.NewScheme() Expect(corev1.AddToScheme(scheme)).To(Succeed()) @@ -2288,8 +2345,8 @@ func (t *WithPointerMetaList) DeepCopyObject() runtime.Object { } type WithPointerMeta struct { - *metav1.TypeMeta - *metav1.ObjectMeta + *metav1.TypeMeta `json:",inline"` + *metav1.ObjectMeta `json:"metadata,omitempty"` } func (t *WithPointerMeta) DeepCopy() *WithPointerMeta {