From 7bab6a9c6ed7d4e54da3e010a7bf0a30786d28f9 Mon Sep 17 00:00:00 2001 From: wojtekt Date: Tue, 3 Nov 2020 17:30:26 +0100 Subject: [PATCH] Use current state from watchcache to avoid etcd get for deletions --- .../apiserver/pkg/storage/cacher/cacher.go | 15 +- .../apiserver/pkg/storage/etcd3/store.go | 72 ++++++-- .../apiserver/pkg/storage/etcd3/store_test.go | 166 ++++++++++++++++++ 3 files changed, 235 insertions(+), 18 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 8609faef028..9170a63a46e 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -433,8 +433,19 @@ func (c *Cacher) Create(ctx context.Context, key string, obj, out runtime.Object // Delete implements storage.Interface. func (c *Cacher) Delete( ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, - validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object) error { - return c.storage.Delete(ctx, key, out, preconditions, validateDeletion, cachedExistingObject) + validateDeletion storage.ValidateObjectFunc, _ runtime.Object) error { + // Ignore the suggestion and try to pass down the current version of the object + // read from cache. + if elem, exists, err := c.watchCache.GetByKey(key); err != nil { + klog.Errorf("GetByKey returned error: %v", err) + } else if exists { + // DeepCopy the object since we modify resource version when serializing the + // current object. + currObj := elem.(*storeElement).Object.DeepCopyObject() + return c.storage.Delete(ctx, key, out, preconditions, validateDeletion, currObj) + } + // If we couldn't get the object, fallback to no-suggestion. + return c.storage.Delete(ctx, key, out, preconditions, validateDeletion, nil) } // Watch implements storage.Interface. diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index 34b1b5c21d0..c185b456e43 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -199,25 +199,63 @@ func (s *store) Delete( func (s *store) conditionalDelete( ctx context.Context, key string, out runtime.Object, v reflect.Value, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object) error { - startTime := time.Now() - getResp, err := s.client.KV.Get(ctx, key) - metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime) + getCurrentState := func() (*objState, error) { + startTime := time.Now() + getResp, err := s.client.KV.Get(ctx, key) + metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime) + if err != nil { + return nil, err + } + return s.getState(getResp, key, v, false) + } + + var origState *objState + var err error + var origStateIsCurrent bool + if cachedExistingObject != nil { + origState, err = s.getStateFromObject(cachedExistingObject) + } else { + origState, err = getCurrentState() + origStateIsCurrent = true + } if err != nil { return err } + for { - origState, err := s.getState(getResp, key, v, false) - if err != nil { - return err - } if preconditions != nil { if err := preconditions.Check(key, origState.obj); err != nil { - return err + if origStateIsCurrent { + return err + } + + // It's possible we're working with stale data. + // Actually fetch + origState, err = getCurrentState() + if err != nil { + return err + } + origStateIsCurrent = true + // Retry + continue } } if err := validateDeletion(ctx, origState.obj); err != nil { - return err + if origStateIsCurrent { + return err + } + + // It's possible we're working with stale data. + // Actually fetch + origState, err = getCurrentState() + if err != nil { + return err + } + origStateIsCurrent = true + // Retry + continue } + startTime := time.Now() txnResp, err := s.client.KV.Txn(ctx).If( clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev), @@ -231,8 +269,13 @@ func (s *store) conditionalDelete( return err } if !txnResp.Succeeded { - getResp = (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange()) + getResp := (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange()) klog.V(4).Infof("deletion of %s failed because of a conflict, going to retry", key) + origState, err = s.getState(getResp, key, v, false) + if err != nil { + return err + } + origStateIsCurrent = true continue } return decode(s.codec, s.versioner, origState.data, out, origState.rev) @@ -266,15 +309,12 @@ func (s *store) GuaranteedUpdate( var mustCheckData bool if suggestion != nil { origState, err = s.getStateFromObject(suggestion) - if err != nil { - return err - } mustCheckData = true } else { origState, err = getCurrentState() - if err != nil { - return err - } + } + if err != nil { + return err } trace.Step("initial value restored") diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index 00bf218e008..7ca1c3e2e30 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -374,6 +374,172 @@ func TestConditionalDelete(t *testing.T) { } } +// The following set of Delete tests are testing the logic of adding `suggestion` +// as a parameter with probably value of the current state. +// Introducing it for GuaranteedUpdate cause a number of issues, so we're addressing +// all of those upfront by adding appropriate tests: +// - https://github.com/kubernetes/kubernetes/pull/35415 +// [DONE] Lack of tests originally - added TestDeleteWithSuggestion. +// - https://github.com/kubernetes/kubernetes/pull/40664 +// [DONE] Irrelevant for delete, as Delete doesn't write data (nor compare it). +// - https://github.com/kubernetes/kubernetes/pull/47703 +// [DONE] Irrelevant for delete, because Delete doesn't persist data. +// - https://github.com/kubernetes/kubernetes/pull/48394/ +// [DONE] Irrelevant for delete, because Delete doesn't compare data. +// - https://github.com/kubernetes/kubernetes/pull/43152 +// [DONE] Added TestDeleteWithSuggestionAndConflict +// - https://github.com/kubernetes/kubernetes/pull/54780 +// [DONE] Irrelevant for delete, because Delete doesn't compare data. +// - https://github.com/kubernetes/kubernetes/pull/58375 +// [DONE] Irrelevant for delete, because Delete doesn't compare data. +// - https://github.com/kubernetes/kubernetes/pull/77619 +// [DONE] Added TestValidateDeletionWithSuggestion for corresponding delete checks. +// - https://github.com/kubernetes/kubernetes/pull/78713 +// [DONE] Bug was in getState function which is shared with the new code. +// - https://github.com/kubernetes/kubernetes/pull/78713 +// [DONE] Added TestPreconditionalDeleteWithSuggestion + +func TestDeleteWithSuggestion(t *testing.T) { + ctx, store, cluster := testSetup(t) + defer cluster.Terminate(t) + + key, originalPod := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}}) + + out := &example.Pod{} + if err := store.Delete(ctx, key, out, nil, storage.ValidateAllObjectFunc, originalPod); err != nil { + t.Errorf("Unexpected failure during deletion: %v", err) + } + + if err := store.Get(ctx, key, storage.GetOptions{}, &example.Pod{}); !storage.IsNotFound(err) { + t.Errorf("Unexpected error on reading object: %v", err) + } +} + +func TestDeleteWithSuggestionAndConflict(t *testing.T) { + ctx, store, cluster := testSetup(t) + defer cluster.Terminate(t) + + key, originalPod := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}}) + + // First update, so originalPod is outdated. + updatedPod := &example.Pod{} + if err := store.GuaranteedUpdate(ctx, key, updatedPod, false, nil, + storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) { + pod := obj.(*example.Pod) + pod.ObjectMeta.Labels = map[string]string{"foo": "bar"} + return pod, nil + }), nil); err != nil { + t.Errorf("Unexpected failure during updated: %v", err) + } + + out := &example.Pod{} + if err := store.Delete(ctx, key, out, nil, storage.ValidateAllObjectFunc, originalPod); err != nil { + t.Errorf("Unexpected failure during deletion: %v", err) + } + + if err := store.Get(ctx, key, storage.GetOptions{}, &example.Pod{}); !storage.IsNotFound(err) { + t.Errorf("Unexpected error on reading object: %v", err) + } +} + +func TestDeleteWithSuggestionOfDeletedObject(t *testing.T) { + ctx, store, cluster := testSetup(t) + defer cluster.Terminate(t) + + key, originalPod := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}}) + + // First delete, so originalPod is outdated. + deletedPod := &example.Pod{} + if err := store.Delete(ctx, key, deletedPod, nil, storage.ValidateAllObjectFunc, originalPod); err != nil { + t.Errorf("Unexpected failure during deletion: %v", err) + } + + // Now try deleting with stale object. + out := &example.Pod{} + if err := store.Delete(ctx, key, out, nil, storage.ValidateAllObjectFunc, originalPod); !storage.IsNotFound(err) { + t.Errorf("Unexpected error during deletion: %v, expected not-found", err) + } +} + +func TestValidateDeletionWithSuggestion(t *testing.T) { + ctx, store, cluster := testSetup(t) + defer cluster.Terminate(t) + + key, originalPod := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}}) + + // Check that validaing fresh object fails. + validationError := fmt.Errorf("validation error") + validateNothing := func(_ context.Context, _ runtime.Object) error { + return validationError + } + out := &example.Pod{} + if err := store.Delete(ctx, key, out, nil, validateNothing, originalPod); err != validationError { + t.Errorf("Unexpected failure during deletion: %v", err) + } + + // First update, so originalPod is outdated. + updatedPod := &example.Pod{} + if err := store.GuaranteedUpdate(ctx, key, updatedPod, false, nil, + storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) { + pod := obj.(*example.Pod) + pod.ObjectMeta.Labels = map[string]string{"foo": "bar"} + return pod, nil + }), nil); err != nil { + t.Errorf("Unexpected failure during updated: %v", err) + } + + calls := 0 + validateFresh := func(_ context.Context, obj runtime.Object) error { + calls++ + pod := obj.(*example.Pod) + if pod.ObjectMeta.Labels == nil || pod.ObjectMeta.Labels["foo"] != "bar" { + return fmt.Errorf("stale object") + } + return nil + } + + if err := store.Delete(ctx, key, out, nil, validateFresh, originalPod); err != nil { + t.Errorf("Unexpected failure during deletion: %v", err) + } + + if calls != 2 { + t.Errorf("validate function should have been called twice, called %d", calls) + } + + if err := store.Get(ctx, key, storage.GetOptions{}, &example.Pod{}); !storage.IsNotFound(err) { + t.Errorf("Unexpected error on reading object: %v", err) + } +} + +func TestPreconditionalDeleteWithSuggestion(t *testing.T) { + ctx, store, cluster := testSetup(t) + defer cluster.Terminate(t) + + key, originalPod := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}}) + + // First update, so originalPod is outdated. + updatedPod := &example.Pod{} + if err := store.GuaranteedUpdate(ctx, key, updatedPod, false, nil, + storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) { + pod := obj.(*example.Pod) + pod.ObjectMeta.UID = "myUID" + return pod, nil + }), nil); err != nil { + t.Errorf("Unexpected failure during updated: %v", err) + } + + prec := storage.NewUIDPreconditions("myUID") + + out := &example.Pod{} + if err := store.Delete(ctx, key, out, prec, storage.ValidateAllObjectFunc, originalPod); err != nil { + t.Errorf("Unexpected failure during deletion: %v", err) + } + + if err := store.Get(ctx, key, storage.GetOptions{}, &example.Pod{}); !storage.IsNotFound(err) { + t.Errorf("Unexpected error on reading object: %v", err) + } +} + func TestGetToList(t *testing.T) { ctx, store, cluster := testSetup(t) defer cluster.Terminate(t)