From eba25cdbbcc5d35e707516194f64d8ed363c2773 Mon Sep 17 00:00:00 2001 From: Steve Kuznetsov Date: Wed, 23 Mar 2022 12:44:49 -0800 Subject: [PATCH 1/2] pkg/storage/etcd3: correctly validate resourceVersions In a number of tests, the underlying storage backend interaction will return the revision (logical clock underpinning the MVCC implementation) at the call-time of the RPC. Previously, the tests validated that this returned revision was exactly equal to some previously seen revision. This assertion is only true in systems where no other events are advancing the logical clock. For instance, when using a single etcd cluster as a shared fixture for these tests, the assertion is not valid any longer. By checking that the returned revision is no older than the previously seen revision, the validation logic is correct in all cases. Signed-off-by: Steve Kuznetsov --- .../apiserver/pkg/storage/etcd3/store_test.go | 20 ++++++--- .../pkg/storage/etcd3/watcher_test.go | 41 +++++++++++++++++-- 2 files changed, 52 insertions(+), 9 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index fedbcce3e41..1b0a0e436a0 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -1213,6 +1213,7 @@ func TestList(t *testing.T) { expectError bool expectRVTooLarge bool expectRV string + expectRVFunc func(string) error }{ { name: "rejects invalid resource version", @@ -1385,7 +1386,7 @@ func TestList(t *testing.T) { expectContinue: true, expectedRemainingItemCount: utilpointer.Int64Ptr(1), rv: "0", - expectRV: list.ResourceVersion, + expectRVFunc: resourceVersionNotOlderThan(list.ResourceVersion), }, { name: "test List with limit at resource version 0 match=NotOlderThan", @@ -1400,7 +1401,7 @@ func TestList(t *testing.T) { expectedRemainingItemCount: utilpointer.Int64Ptr(1), rv: "0", rvMatch: metav1.ResourceVersionMatchNotOlderThan, - expectRV: list.ResourceVersion, + expectRVFunc: resourceVersionNotOlderThan(list.ResourceVersion), }, { name: "test List with limit at resource version before first write and match=Exact", @@ -1612,6 +1613,11 @@ func TestList(t *testing.T) { t.Errorf("resourceVersion in list response want=%s, got=%s", tt.expectRV, out.ResourceVersion) } } + if tt.expectRVFunc != nil { + if err := tt.expectRVFunc(out.ResourceVersion); err != nil { + t.Errorf("resourceVersion in list response invalid: %v", err) + } + } if len(tt.expectedOut) != len(out.Items) { t.Fatalf("length of list want=%d, got=%d", len(tt.expectedOut), len(out.Items)) } @@ -2106,11 +2112,13 @@ func TestListInconsistentContinuation(t *testing.T) { if len(out.Continue) == 0 { t.Fatalf("No continuation token set") } + validateResourceVersion := resourceVersionNotOlderThan(lastRVString) expectNoDiff(t, "incorrect second page", []example.Pod{*preset[1].storedObj}, out.Items) - if out.ResourceVersion != lastRVString { - t.Fatalf("Expected list resource version to be %s, got %s", lastRVString, out.ResourceVersion) + if err := validateResourceVersion(out.ResourceVersion); err != nil { + t.Fatal(err) } continueFromThirdItem := out.Continue + resolvedResourceVersionFromThirdItem := out.ResourceVersion out = &example.PodList{} options = storage.ListOptions{ ResourceVersion: "0", @@ -2124,8 +2132,8 @@ func TestListInconsistentContinuation(t *testing.T) { t.Fatalf("Unexpected continuation token set") } expectNoDiff(t, "incorrect third page", []example.Pod{*preset[2].storedObj}, out.Items) - if out.ResourceVersion != lastRVString { - t.Fatalf("Expected list resource version to be %s, got %s", lastRVString, out.ResourceVersion) + if out.ResourceVersion != resolvedResourceVersionFromThirdItem { + t.Fatalf("Expected list resource version to be %s, got %s", resolvedResourceVersionFromThirdItem, out.ResourceVersion) } } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go index af6d0ba02cf..27f281808e7 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go @@ -353,8 +353,13 @@ func TestProgressNotify(t *testing.T) { if err != nil { t.Fatalf("Watch failed: %v", err) } - result := &example.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: out.ResourceVersion}} - testCheckResult(t, watch.Bookmark, w, result) + testCheckResultFunc(t, watch.Bookmark, w, func(object runtime.Object) error { + pod, ok := object.(*example.Pod) + if !ok { + return fmt.Errorf("got %T, not *example.Pod", object) + } + return resourceVersionNotOlderThan(out.ResourceVersion)(pod.ResourceVersion) + }) } type testWatchStruct struct { @@ -382,14 +387,44 @@ func testCheckEventType(t *testing.T, expectEventType watch.EventType, w watch.I } } +// resourceVersionNotOlderThan returns a function to validate resource versions. Resource versions +// referring to points in logical time before the sentinel generate an error. All logical times as +// new as the sentinel or newer generate no error. +func resourceVersionNotOlderThan(sentinel string) func(string) error { + return func(resourceVersion string) error { + objectVersioner := APIObjectVersioner{} + actualRV, err := objectVersioner.ParseResourceVersion(resourceVersion) + if err != nil { + return err + } + expectedRV, err := objectVersioner.ParseResourceVersion(sentinel) + if err != nil { + return err + } + if actualRV < expectedRV { + return fmt.Errorf("expected a resourceVersion no smaller than than %d, but got %d", expectedRV, actualRV) + } + return nil + } +} + func testCheckResult(t *testing.T, expectEventType watch.EventType, w watch.Interface, expectObj *example.Pod) { + testCheckResultFunc(t, expectEventType, w, func(object runtime.Object) error { + expectNoDiff(t, "incorrect object", expectObj, object) + return nil + }) +} + +func testCheckResultFunc(t *testing.T, expectEventType watch.EventType, w watch.Interface, check func(object runtime.Object) error) { select { case res := <-w.ResultChan(): if res.Type != expectEventType { t.Errorf("event type want=%v, get=%v", expectEventType, res.Type) return } - expectNoDiff(t, "incorrect obj", expectObj, res.Object) + if err := check(res.Object); err != nil { + t.Error(err) + } case <-time.After(wait.ForeverTestTimeout): t.Errorf("time out after waiting %v on ResultChan", wait.ForeverTestTimeout) } From ed5fd905f2b42e9919d99c40a1cb25014f0a7f89 Mon Sep 17 00:00:00 2001 From: Steve Kuznetsov Date: Wed, 13 Apr 2022 10:57:02 -0700 Subject: [PATCH 2/2] pkg/storage/etcd3: validate revision invariant We must ensure that we notice if the etcd behavior on linearized reads changes. Signed-off-by: Steve Kuznetsov --- .../pkg/storage/etcd3/linearized_read_test.go | 64 +++++++++++++++++++ .../pkg/storage/etcd3/watcher_test.go | 18 +++++- 2 files changed, 81 insertions(+), 1 deletion(-) create mode 100644 staging/src/k8s.io/apiserver/pkg/storage/etcd3/linearized_read_test.go diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/linearized_read_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/linearized_read_test.go new file mode 100644 index 00000000000..bb1b9df7818 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/linearized_read_test.go @@ -0,0 +1,64 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package etcd3 + +import ( + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apiserver/pkg/apis/example" + "k8s.io/apiserver/pkg/storage" +) + +func TestLinearizedReadRevisionInvariant(t *testing.T) { + // The etcd documentation [1] states that "linearized requests must go through the Raft consensus process." + // A full round of Raft consensus adds a new item to the Raft log, some of which is surfaced by etcd as a + // higher store revision in the response header. Kubernetes exposes this header revision in e.g. List calls, + // so it is ultimately client-facing. By default, all the requests that our *etcd3.store{} issues are + // linearized. However, this also includes *read* requests, and we would not expect non-mutating requests + // against etcd to, by "go[ing] through the Raft consensus process," result in a higher resource version on + // List calls. Today, the mechanism etcd uses to increment the store revision ensures that linearized reads + // do *not* bump the key-value store revision. This test exists to ensure that we notice if this implementation + // detail ever changes. + // [1] https://etcd.io/docs/v3.5/learning/api_guarantees/#isolation-level-and-consistency-of-replicas + ctx, store, etcdClient := testSetup(t) + + key := "/testkey" + out := &example.Pod{} + obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", SelfLink: "testlink"}} + + if err := store.Create(ctx, key, obj, out, 0); err != nil { + t.Fatalf("Set failed: %v", err) + } + originalRevision := out.ResourceVersion + + for i := 0; i < 5; i++ { + if _, err := etcdClient.KV.Get(ctx, key); err != nil { // this is by default linearizable, the only option the client library exposes is WithSerializable() to make it *not* a linearized read + t.Fatalf("failed to get key: %v", err) + } + } + + list := &example.PodList{} + if err := store.GetList(ctx, "/", storage.ListOptions{Predicate: storage.Everything, Recursive: true}, list); err != nil { + t.Errorf("Unexpected List error: %v", err) + } + finalRevision := list.ResourceVersion + + if originalRevision != finalRevision { + t.Fatalf("original revision (%s) did not match final revision after linearized reads (%s)", originalRevision, finalRevision) + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go index 27f281808e7..4b5a0ff119b 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go @@ -343,6 +343,7 @@ func TestProgressNotify(t *testing.T) { if err := store.Create(ctx, key, input, out, 0); err != nil { t.Fatalf("Create failed: %v", err) } + validateResourceVersion := resourceVersionNotOlderThan(out.ResourceVersion) opts := storage.ListOptions{ ResourceVersion: out.ResourceVersion, @@ -353,12 +354,27 @@ func TestProgressNotify(t *testing.T) { if err != nil { t.Fatalf("Watch failed: %v", err) } + + // when we send a bookmark event, the client expects the event to contain an + // object of the correct type, but with no fields set other than the resourceVersion testCheckResultFunc(t, watch.Bookmark, w, func(object runtime.Object) error { + // first, check that we have the correct resource version + obj, ok := object.(metav1.Object) + if !ok { + return fmt.Errorf("got %T, not metav1.Object", object) + } + if err := validateResourceVersion(obj.GetResourceVersion()); err != nil { + return err + } + + // then, check that we have the right type and content pod, ok := object.(*example.Pod) if !ok { return fmt.Errorf("got %T, not *example.Pod", object) } - return resourceVersionNotOlderThan(out.ResourceVersion)(pod.ResourceVersion) + pod.ResourceVersion = "" + expectNoDiff(t, "bookmark event should contain an object with no fields set other than resourceVersion", newPod(), pod) + return nil }) }