mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 03:41:45 +00:00
Merge pull request #108938 from stevekuznetsov/skuznets/more-correct-rv
pkg/storage/etcd3: correctly validate resourceVersions
This commit is contained in:
commit
2845122e14
@ -0,0 +1,64 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2022 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package etcd3
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apiserver/pkg/apis/example"
|
||||||
|
"k8s.io/apiserver/pkg/storage"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestLinearizedReadRevisionInvariant(t *testing.T) {
|
||||||
|
// The etcd documentation [1] states that "linearized requests must go through the Raft consensus process."
|
||||||
|
// A full round of Raft consensus adds a new item to the Raft log, some of which is surfaced by etcd as a
|
||||||
|
// higher store revision in the response header. Kubernetes exposes this header revision in e.g. List calls,
|
||||||
|
// so it is ultimately client-facing. By default, all the requests that our *etcd3.store{} issues are
|
||||||
|
// linearized. However, this also includes *read* requests, and we would not expect non-mutating requests
|
||||||
|
// against etcd to, by "go[ing] through the Raft consensus process," result in a higher resource version on
|
||||||
|
// List calls. Today, the mechanism etcd uses to increment the store revision ensures that linearized reads
|
||||||
|
// do *not* bump the key-value store revision. This test exists to ensure that we notice if this implementation
|
||||||
|
// detail ever changes.
|
||||||
|
// [1] https://etcd.io/docs/v3.5/learning/api_guarantees/#isolation-level-and-consistency-of-replicas
|
||||||
|
ctx, store, etcdClient := testSetup(t)
|
||||||
|
|
||||||
|
key := "/testkey"
|
||||||
|
out := &example.Pod{}
|
||||||
|
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", SelfLink: "testlink"}}
|
||||||
|
|
||||||
|
if err := store.Create(ctx, key, obj, out, 0); err != nil {
|
||||||
|
t.Fatalf("Set failed: %v", err)
|
||||||
|
}
|
||||||
|
originalRevision := out.ResourceVersion
|
||||||
|
|
||||||
|
for i := 0; i < 5; i++ {
|
||||||
|
if _, err := etcdClient.KV.Get(ctx, key); err != nil { // this is by default linearizable, the only option the client library exposes is WithSerializable() to make it *not* a linearized read
|
||||||
|
t.Fatalf("failed to get key: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
list := &example.PodList{}
|
||||||
|
if err := store.GetList(ctx, "/", storage.ListOptions{Predicate: storage.Everything, Recursive: true}, list); err != nil {
|
||||||
|
t.Errorf("Unexpected List error: %v", err)
|
||||||
|
}
|
||||||
|
finalRevision := list.ResourceVersion
|
||||||
|
|
||||||
|
if originalRevision != finalRevision {
|
||||||
|
t.Fatalf("original revision (%s) did not match final revision after linearized reads (%s)", originalRevision, finalRevision)
|
||||||
|
}
|
||||||
|
}
|
@ -1213,6 +1213,7 @@ func TestList(t *testing.T) {
|
|||||||
expectError bool
|
expectError bool
|
||||||
expectRVTooLarge bool
|
expectRVTooLarge bool
|
||||||
expectRV string
|
expectRV string
|
||||||
|
expectRVFunc func(string) error
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "rejects invalid resource version",
|
name: "rejects invalid resource version",
|
||||||
@ -1385,7 +1386,7 @@ func TestList(t *testing.T) {
|
|||||||
expectContinue: true,
|
expectContinue: true,
|
||||||
expectedRemainingItemCount: utilpointer.Int64Ptr(1),
|
expectedRemainingItemCount: utilpointer.Int64Ptr(1),
|
||||||
rv: "0",
|
rv: "0",
|
||||||
expectRV: list.ResourceVersion,
|
expectRVFunc: resourceVersionNotOlderThan(list.ResourceVersion),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "test List with limit at resource version 0 match=NotOlderThan",
|
name: "test List with limit at resource version 0 match=NotOlderThan",
|
||||||
@ -1400,7 +1401,7 @@ func TestList(t *testing.T) {
|
|||||||
expectedRemainingItemCount: utilpointer.Int64Ptr(1),
|
expectedRemainingItemCount: utilpointer.Int64Ptr(1),
|
||||||
rv: "0",
|
rv: "0",
|
||||||
rvMatch: metav1.ResourceVersionMatchNotOlderThan,
|
rvMatch: metav1.ResourceVersionMatchNotOlderThan,
|
||||||
expectRV: list.ResourceVersion,
|
expectRVFunc: resourceVersionNotOlderThan(list.ResourceVersion),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "test List with limit at resource version before first write and match=Exact",
|
name: "test List with limit at resource version before first write and match=Exact",
|
||||||
@ -1612,6 +1613,11 @@ func TestList(t *testing.T) {
|
|||||||
t.Errorf("resourceVersion in list response want=%s, got=%s", tt.expectRV, out.ResourceVersion)
|
t.Errorf("resourceVersion in list response want=%s, got=%s", tt.expectRV, out.ResourceVersion)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if tt.expectRVFunc != nil {
|
||||||
|
if err := tt.expectRVFunc(out.ResourceVersion); err != nil {
|
||||||
|
t.Errorf("resourceVersion in list response invalid: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
if len(tt.expectedOut) != len(out.Items) {
|
if len(tt.expectedOut) != len(out.Items) {
|
||||||
t.Fatalf("length of list want=%d, got=%d", len(tt.expectedOut), len(out.Items))
|
t.Fatalf("length of list want=%d, got=%d", len(tt.expectedOut), len(out.Items))
|
||||||
}
|
}
|
||||||
@ -2106,11 +2112,13 @@ func TestListInconsistentContinuation(t *testing.T) {
|
|||||||
if len(out.Continue) == 0 {
|
if len(out.Continue) == 0 {
|
||||||
t.Fatalf("No continuation token set")
|
t.Fatalf("No continuation token set")
|
||||||
}
|
}
|
||||||
|
validateResourceVersion := resourceVersionNotOlderThan(lastRVString)
|
||||||
expectNoDiff(t, "incorrect second page", []example.Pod{*preset[1].storedObj}, out.Items)
|
expectNoDiff(t, "incorrect second page", []example.Pod{*preset[1].storedObj}, out.Items)
|
||||||
if out.ResourceVersion != lastRVString {
|
if err := validateResourceVersion(out.ResourceVersion); err != nil {
|
||||||
t.Fatalf("Expected list resource version to be %s, got %s", lastRVString, out.ResourceVersion)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
continueFromThirdItem := out.Continue
|
continueFromThirdItem := out.Continue
|
||||||
|
resolvedResourceVersionFromThirdItem := out.ResourceVersion
|
||||||
out = &example.PodList{}
|
out = &example.PodList{}
|
||||||
options = storage.ListOptions{
|
options = storage.ListOptions{
|
||||||
ResourceVersion: "0",
|
ResourceVersion: "0",
|
||||||
@ -2124,8 +2132,8 @@ func TestListInconsistentContinuation(t *testing.T) {
|
|||||||
t.Fatalf("Unexpected continuation token set")
|
t.Fatalf("Unexpected continuation token set")
|
||||||
}
|
}
|
||||||
expectNoDiff(t, "incorrect third page", []example.Pod{*preset[2].storedObj}, out.Items)
|
expectNoDiff(t, "incorrect third page", []example.Pod{*preset[2].storedObj}, out.Items)
|
||||||
if out.ResourceVersion != lastRVString {
|
if out.ResourceVersion != resolvedResourceVersionFromThirdItem {
|
||||||
t.Fatalf("Expected list resource version to be %s, got %s", lastRVString, out.ResourceVersion)
|
t.Fatalf("Expected list resource version to be %s, got %s", resolvedResourceVersionFromThirdItem, out.ResourceVersion)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -372,6 +372,7 @@ func TestProgressNotify(t *testing.T) {
|
|||||||
if err := store.Create(ctx, key, input, out, 0); err != nil {
|
if err := store.Create(ctx, key, input, out, 0); err != nil {
|
||||||
t.Fatalf("Create failed: %v", err)
|
t.Fatalf("Create failed: %v", err)
|
||||||
}
|
}
|
||||||
|
validateResourceVersion := resourceVersionNotOlderThan(out.ResourceVersion)
|
||||||
|
|
||||||
opts := storage.ListOptions{
|
opts := storage.ListOptions{
|
||||||
ResourceVersion: out.ResourceVersion,
|
ResourceVersion: out.ResourceVersion,
|
||||||
@ -382,8 +383,28 @@ func TestProgressNotify(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Watch failed: %v", err)
|
t.Fatalf("Watch failed: %v", err)
|
||||||
}
|
}
|
||||||
result := &example.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: out.ResourceVersion}}
|
|
||||||
testCheckResult(t, watch.Bookmark, w, result)
|
// when we send a bookmark event, the client expects the event to contain an
|
||||||
|
// object of the correct type, but with no fields set other than the resourceVersion
|
||||||
|
testCheckResultFunc(t, watch.Bookmark, w, func(object runtime.Object) error {
|
||||||
|
// first, check that we have the correct resource version
|
||||||
|
obj, ok := object.(metav1.Object)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("got %T, not metav1.Object", object)
|
||||||
|
}
|
||||||
|
if err := validateResourceVersion(obj.GetResourceVersion()); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// then, check that we have the right type and content
|
||||||
|
pod, ok := object.(*example.Pod)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("got %T, not *example.Pod", object)
|
||||||
|
}
|
||||||
|
pod.ResourceVersion = ""
|
||||||
|
expectNoDiff(t, "bookmark event should contain an object with no fields set other than resourceVersion", newPod(), pod)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
type testWatchStruct struct {
|
type testWatchStruct struct {
|
||||||
@ -411,14 +432,44 @@ func testCheckEventType(t *testing.T, expectEventType watch.EventType, w watch.I
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// resourceVersionNotOlderThan returns a function to validate resource versions. Resource versions
|
||||||
|
// referring to points in logical time before the sentinel generate an error. All logical times as
|
||||||
|
// new as the sentinel or newer generate no error.
|
||||||
|
func resourceVersionNotOlderThan(sentinel string) func(string) error {
|
||||||
|
return func(resourceVersion string) error {
|
||||||
|
objectVersioner := APIObjectVersioner{}
|
||||||
|
actualRV, err := objectVersioner.ParseResourceVersion(resourceVersion)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
expectedRV, err := objectVersioner.ParseResourceVersion(sentinel)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if actualRV < expectedRV {
|
||||||
|
return fmt.Errorf("expected a resourceVersion no smaller than than %d, but got %d", expectedRV, actualRV)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func testCheckResult(t *testing.T, expectEventType watch.EventType, w watch.Interface, expectObj *example.Pod) {
|
func testCheckResult(t *testing.T, expectEventType watch.EventType, w watch.Interface, expectObj *example.Pod) {
|
||||||
|
testCheckResultFunc(t, expectEventType, w, func(object runtime.Object) error {
|
||||||
|
expectNoDiff(t, "incorrect object", expectObj, object)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func testCheckResultFunc(t *testing.T, expectEventType watch.EventType, w watch.Interface, check func(object runtime.Object) error) {
|
||||||
select {
|
select {
|
||||||
case res := <-w.ResultChan():
|
case res := <-w.ResultChan():
|
||||||
if res.Type != expectEventType {
|
if res.Type != expectEventType {
|
||||||
t.Errorf("event type want=%v, get=%v", expectEventType, res.Type)
|
t.Errorf("event type want=%v, get=%v", expectEventType, res.Type)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
expectNoDiff(t, "incorrect obj", expectObj, res.Object)
|
if err := check(res.Object); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
case <-time.After(wait.ForeverTestTimeout):
|
case <-time.After(wait.ForeverTestTimeout):
|
||||||
t.Errorf("time out after waiting %v on ResultChan", wait.ForeverTestTimeout)
|
t.Errorf("time out after waiting %v on ResultChan", wait.ForeverTestTimeout)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user