From bbcf5e38776f2b18026539a0fbcf3aa505386c1f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Wed, 26 Oct 2022 21:37:19 +0200 Subject: [PATCH] Fix setting resource version on deletion --- .../apiserver/pkg/storage/etcd3/store.go | 10 +++- .../pkg/storage/etcd3/watcher_test.go | 60 +++++-------------- .../pkg/storage/testing/store_tests.go | 24 ++++++-- 3 files changed, 43 insertions(+), 51 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index f4b66858908..a3f9d154741 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -330,7 +330,15 @@ func (s *store) conditionalDelete( origStateIsCurrent = true continue } - return decode(s.codec, s.versioner, origState.data, out, origState.rev) + + if len(txnResp.Responses) == 0 || txnResp.Responses[0].GetResponseDeleteRange() == nil { + return errors.New(fmt.Sprintf("invalid DeleteRange response: %v", txnResp.Responses)) + } + deleteResp := txnResp.Responses[0].GetResponseDeleteRange() + if deleteResp.Header == nil { + return errors.New("invalid DeleteRange response - nil header") + } + return decode(s.codec, s.versioner, origState.data, out, deleteResp.Header.Revision) } } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go index c00321ce7ef..2487d407503 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go @@ -23,8 +23,6 @@ import ( "testing" "time" - "go.etcd.io/etcd/api/v3/mvccpb" - clientv3 "go.etcd.io/etcd/client/v3" storagetesting "k8s.io/apiserver/pkg/storage/testing" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -72,7 +70,7 @@ func TestWatchErrResultNotBlockAfterCancel(t *testing.T) { origCtx, store, _ := testSetup(t) ctx, cancel := context.WithCancel(origCtx) w := store.watcher.createWatchChan(ctx, "/abc", 0, false, false, newTestTransformer(), storage.Everything) - // make resutlChan and errChan blocking to ensure ordering. + // make resultChan and errChan blocking to ensure ordering. w.resultChan = make(chan watch.Event) w.errChan = make(chan error) // The event flow goes like: @@ -91,57 +89,31 @@ func TestWatchErrResultNotBlockAfterCancel(t *testing.T) { } func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) { - ctx, store, client := testSetup(t) + ctx, store, _ := testSetup(t) + key, storedObj := storagetesting.TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) - w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything}) + watchCtx, _ := context.WithTimeout(ctx, wait.ForeverTestTimeout) + w, err := store.Watch(watchCtx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything}) if err != nil { t.Fatalf("Watch failed: %v", err) } - rv, err := storage.APIObjectVersioner{}.ObjectResourceVersion(storedObj) - if err != nil { - t.Fatalf("failed to parse resourceVersion on stored object: %v", err) - } - etcdW := client.Watch(ctx, key, clientv3.WithRev(int64(rv))) - if err := store.Delete(ctx, key, &example.Pod{}, &storage.Preconditions{}, storage.ValidateAllObjectFunc, nil); err != nil { + deletedObj := &example.Pod{} + if err := store.Delete(ctx, key, deletedObj, &storage.Preconditions{}, storage.ValidateAllObjectFunc, nil); err != nil { t.Fatalf("Delete failed: %v", err) } - var e watch.Event - watchCtx, _ := context.WithTimeout(ctx, wait.ForeverTestTimeout) + // Verify that ResourceVersion has changed on deletion. + if storedObj.ResourceVersion == deletedObj.ResourceVersion { + t.Fatalf("ResourceVersion didn't changed on deletion: %s", deletedObj.ResourceVersion) + } + select { - case e = <-w.ResultChan(): - case <-watchCtx.Done(): - t.Fatalf("timed out waiting for watch event") - } - deletedRV, err := deletedRevision(watchCtx, etcdW) - if err != nil { - t.Fatalf("did not see delete event in raw watch: %v", err) - } - watchedDeleteObj := e.Object.(*example.Pod) - - watchedDeleteRev, err := store.versioner.ParseResourceVersion(watchedDeleteObj.ResourceVersion) - if err != nil { - t.Fatalf("ParseWatchResourceVersion failed: %v", err) - } - if int64(watchedDeleteRev) != deletedRV { - t.Errorf("Object from delete event have version: %v, should be the same as etcd delete's mod rev: %d", - watchedDeleteRev, deletedRV) - } -} - -func deletedRevision(ctx context.Context, watch <-chan clientv3.WatchResponse) (int64, error) { - for { - select { - case <-ctx.Done(): - return 0, ctx.Err() - case wres := <-watch: - for _, evt := range wres.Events { - if evt.Type == mvccpb.DELETE && evt.Kv != nil { - return evt.Kv.ModRevision, nil - } - } + case event := <-w.ResultChan(): + watchedDeleteObj := event.Object.(*example.Pod) + if e, a := deletedObj.ResourceVersion, watchedDeleteObj.ResourceVersion; e != a { + t.Errorf("Unexpected resource version: %v, expected %v", a, e) } } } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go index e6d98a0ecc8..92e37263037 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go @@ -225,14 +225,20 @@ func RunTestUnconditionalDelete(ctx context.Context, t *testing.T, store storage err := store.Delete(ctx, tt.key, out, nil, storage.ValidateAllObjectFunc, nil) if tt.expectNotFoundErr { if err == nil || !storage.IsNotFound(err) { - t.Errorf("%s: expecting not found error, but get: %s", tt.name, err) + t.Errorf("expecting not found error, but get: %s", err) } return } if err != nil { - t.Fatalf("%s: Delete failed: %v", tt.name, err) + t.Fatalf("Delete failed: %v", err) } - ExpectNoDiff(t, fmt.Sprintf("%s: incorrect pod:", tt.name), tt.expectedObj, out) + // We expect the resource version of the returned object to be + // updated compared to the last existing object. + if storedObj.ResourceVersion == out.ResourceVersion { + t.Errorf("expecting resource version to be updated, but get: %s", out.ResourceVersion) + } + out.ResourceVersion = storedObj.ResourceVersion + ExpectNoDiff(t, "incorrect pod:", tt.expectedObj, out) }) } } @@ -260,14 +266,20 @@ func RunTestConditionalDelete(ctx context.Context, t *testing.T, store storage.I err := store.Delete(ctx, key, out, tt.precondition, storage.ValidateAllObjectFunc, nil) if tt.expectInvalidObjErr { if err == nil || !storage.IsInvalidObj(err) { - t.Errorf("%s: expecting invalid UID error, but get: %s", tt.name, err) + t.Errorf("expecting invalid UID error, but get: %s", err) } return } if err != nil { - t.Fatalf("%s: Delete failed: %v", tt.name, err) + t.Fatalf("Delete failed: %v", err) } - ExpectNoDiff(t, fmt.Sprintf("%s: incorrect pod", tt.name), storedObj, out) + // We expect the resource version of the returned object to be + // updated compared to the last existing object. + if storedObj.ResourceVersion == out.ResourceVersion { + t.Errorf("expecting resource version to be updated, but get: %s", out.ResourceVersion) + } + out.ResourceVersion = storedObj.ResourceVersion + ExpectNoDiff(t, "incorrect pod:", storedObj, out) key, storedObj = TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}}) }) }