diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/linearized_read_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/linearized_read_test.go new file mode 100644 index 00000000000..bb1b9df7818 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/linearized_read_test.go @@ -0,0 +1,64 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package etcd3 + +import ( + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apiserver/pkg/apis/example" + "k8s.io/apiserver/pkg/storage" +) + +func TestLinearizedReadRevisionInvariant(t *testing.T) { + // The etcd documentation [1] states that "linearized requests must go through the Raft consensus process." + // A full round of Raft consensus adds a new item to the Raft log, some of which is surfaced by etcd as a + // higher store revision in the response header. Kubernetes exposes this header revision in e.g. List calls, + // so it is ultimately client-facing. By default, all the requests that our *etcd3.store{} issues are + // linearized. However, this also includes *read* requests, and we would not expect non-mutating requests + // against etcd to, by "go[ing] through the Raft consensus process," result in a higher resource version on + // List calls. Today, the mechanism etcd uses to increment the store revision ensures that linearized reads + // do *not* bump the key-value store revision. This test exists to ensure that we notice if this implementation + // detail ever changes. + // [1] https://etcd.io/docs/v3.5/learning/api_guarantees/#isolation-level-and-consistency-of-replicas + ctx, store, etcdClient := testSetup(t) + + key := "/testkey" + out := &example.Pod{} + obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", SelfLink: "testlink"}} + + if err := store.Create(ctx, key, obj, out, 0); err != nil { + t.Fatalf("Set failed: %v", err) + } + originalRevision := out.ResourceVersion + + for i := 0; i < 5; i++ { + if _, err := etcdClient.KV.Get(ctx, key); err != nil { // this is by default linearizable, the only option the client library exposes is WithSerializable() to make it *not* a linearized read + t.Fatalf("failed to get key: %v", err) + } + } + + list := &example.PodList{} + if err := store.GetList(ctx, "/", storage.ListOptions{Predicate: storage.Everything, Recursive: true}, list); err != nil { + t.Errorf("Unexpected List error: %v", err) + } + finalRevision := list.ResourceVersion + + if originalRevision != finalRevision { + t.Fatalf("original revision (%s) did not match final revision after linearized reads (%s)", originalRevision, finalRevision) + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index fedbcce3e41..1b0a0e436a0 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -1213,6 +1213,7 @@ func TestList(t *testing.T) { expectError bool expectRVTooLarge bool expectRV string + expectRVFunc func(string) error }{ { name: "rejects invalid resource version", @@ -1385,7 +1386,7 @@ func TestList(t *testing.T) { expectContinue: true, expectedRemainingItemCount: utilpointer.Int64Ptr(1), rv: "0", - expectRV: list.ResourceVersion, + expectRVFunc: resourceVersionNotOlderThan(list.ResourceVersion), }, { name: "test List with limit at resource version 0 match=NotOlderThan", @@ -1400,7 +1401,7 @@ func TestList(t *testing.T) { expectedRemainingItemCount: utilpointer.Int64Ptr(1), rv: "0", rvMatch: metav1.ResourceVersionMatchNotOlderThan, - expectRV: list.ResourceVersion, + expectRVFunc: resourceVersionNotOlderThan(list.ResourceVersion), }, { name: "test List with limit at resource version before first write and match=Exact", @@ -1612,6 +1613,11 @@ func TestList(t *testing.T) { t.Errorf("resourceVersion in list response want=%s, got=%s", tt.expectRV, out.ResourceVersion) } } + if tt.expectRVFunc != nil { + if err := tt.expectRVFunc(out.ResourceVersion); err != nil { + t.Errorf("resourceVersion in list response invalid: %v", err) + } + } if len(tt.expectedOut) != len(out.Items) { t.Fatalf("length of list want=%d, got=%d", len(tt.expectedOut), len(out.Items)) } @@ -2106,11 +2112,13 @@ func TestListInconsistentContinuation(t *testing.T) { if len(out.Continue) == 0 { t.Fatalf("No continuation token set") } + validateResourceVersion := resourceVersionNotOlderThan(lastRVString) expectNoDiff(t, "incorrect second page", []example.Pod{*preset[1].storedObj}, out.Items) - if out.ResourceVersion != lastRVString { - t.Fatalf("Expected list resource version to be %s, got %s", lastRVString, out.ResourceVersion) + if err := validateResourceVersion(out.ResourceVersion); err != nil { + t.Fatal(err) } continueFromThirdItem := out.Continue + resolvedResourceVersionFromThirdItem := out.ResourceVersion out = &example.PodList{} options = storage.ListOptions{ ResourceVersion: "0", @@ -2124,8 +2132,8 @@ func TestListInconsistentContinuation(t *testing.T) { t.Fatalf("Unexpected continuation token set") } expectNoDiff(t, "incorrect third page", []example.Pod{*preset[2].storedObj}, out.Items) - if out.ResourceVersion != lastRVString { - t.Fatalf("Expected list resource version to be %s, got %s", lastRVString, out.ResourceVersion) + if out.ResourceVersion != resolvedResourceVersionFromThirdItem { + t.Fatalf("Expected list resource version to be %s, got %s", resolvedResourceVersionFromThirdItem, out.ResourceVersion) } } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go index eb3d76fc379..e656f2447c0 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go @@ -372,6 +372,7 @@ func TestProgressNotify(t *testing.T) { if err := store.Create(ctx, key, input, out, 0); err != nil { t.Fatalf("Create failed: %v", err) } + validateResourceVersion := resourceVersionNotOlderThan(out.ResourceVersion) opts := storage.ListOptions{ ResourceVersion: out.ResourceVersion, @@ -382,8 +383,28 @@ func TestProgressNotify(t *testing.T) { if err != nil { t.Fatalf("Watch failed: %v", err) } - result := &example.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: out.ResourceVersion}} - testCheckResult(t, watch.Bookmark, w, result) + + // when we send a bookmark event, the client expects the event to contain an + // object of the correct type, but with no fields set other than the resourceVersion + testCheckResultFunc(t, watch.Bookmark, w, func(object runtime.Object) error { + // first, check that we have the correct resource version + obj, ok := object.(metav1.Object) + if !ok { + return fmt.Errorf("got %T, not metav1.Object", object) + } + if err := validateResourceVersion(obj.GetResourceVersion()); err != nil { + return err + } + + // then, check that we have the right type and content + pod, ok := object.(*example.Pod) + if !ok { + return fmt.Errorf("got %T, not *example.Pod", object) + } + pod.ResourceVersion = "" + expectNoDiff(t, "bookmark event should contain an object with no fields set other than resourceVersion", newPod(), pod) + return nil + }) } type testWatchStruct struct { @@ -411,14 +432,44 @@ func testCheckEventType(t *testing.T, expectEventType watch.EventType, w watch.I } } +// resourceVersionNotOlderThan returns a function to validate resource versions. Resource versions +// referring to points in logical time before the sentinel generate an error. All logical times as +// new as the sentinel or newer generate no error. +func resourceVersionNotOlderThan(sentinel string) func(string) error { + return func(resourceVersion string) error { + objectVersioner := APIObjectVersioner{} + actualRV, err := objectVersioner.ParseResourceVersion(resourceVersion) + if err != nil { + return err + } + expectedRV, err := objectVersioner.ParseResourceVersion(sentinel) + if err != nil { + return err + } + if actualRV < expectedRV { + return fmt.Errorf("expected a resourceVersion no smaller than than %d, but got %d", expectedRV, actualRV) + } + return nil + } +} + func testCheckResult(t *testing.T, expectEventType watch.EventType, w watch.Interface, expectObj *example.Pod) { + testCheckResultFunc(t, expectEventType, w, func(object runtime.Object) error { + expectNoDiff(t, "incorrect object", expectObj, object) + return nil + }) +} + +func testCheckResultFunc(t *testing.T, expectEventType watch.EventType, w watch.Interface, check func(object runtime.Object) error) { select { case res := <-w.ResultChan(): if res.Type != expectEventType { t.Errorf("event type want=%v, get=%v", expectEventType, res.Type) return } - expectNoDiff(t, "incorrect obj", expectObj, res.Object) + if err := check(res.Object); err != nil { + t.Error(err) + } case <-time.After(wait.ForeverTestTimeout): t.Errorf("time out after waiting %v on ResultChan", wait.ForeverTestTimeout) }