mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-09-02 01:39:02 +00:00
Merge pull request #130922 from serathius/watchcache-delegate-state
Extend shouldDelegateList testing incorportating state of cacher
This commit is contained in:
@@ -42,6 +42,7 @@ import (
|
|||||||
"k8s.io/apiserver/pkg/endpoints/request"
|
"k8s.io/apiserver/pkg/endpoints/request"
|
||||||
"k8s.io/apiserver/pkg/features"
|
"k8s.io/apiserver/pkg/features"
|
||||||
"k8s.io/apiserver/pkg/storage"
|
"k8s.io/apiserver/pkg/storage"
|
||||||
|
"k8s.io/apiserver/pkg/storage/cacher/delegator"
|
||||||
"k8s.io/apiserver/pkg/storage/cacher/metrics"
|
"k8s.io/apiserver/pkg/storage/cacher/metrics"
|
||||||
"k8s.io/apiserver/pkg/storage/cacher/progress"
|
"k8s.io/apiserver/pkg/storage/cacher/progress"
|
||||||
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
|
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
|
||||||
@@ -1326,6 +1327,18 @@ func newErrWatcher(err error) *errWatcher {
|
|||||||
return watcher
|
return watcher
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Cacher) ShouldDelegateExactRV(resourceVersion string, recursive bool) (delegator.Result, error) {
|
||||||
|
return delegator.CacheWithoutSnapshots{}.ShouldDelegateExactRV(resourceVersion, recursive)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Cacher) ShouldDelegateContinue(continueToken string, recursive bool) (delegator.Result, error) {
|
||||||
|
return delegator.CacheWithoutSnapshots{}.ShouldDelegateContinue(continueToken, recursive)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Cacher) ShouldDelegateConsistentRead() (delegator.Result, error) {
|
||||||
|
return delegator.CacheWithoutSnapshots{}.ShouldDelegateConsistentRead()
|
||||||
|
}
|
||||||
|
|
||||||
// Implements watch.Interface.
|
// Implements watch.Interface.
|
||||||
func (c *errWatcher) ResultChan() <-chan watch.Event {
|
func (c *errWatcher) ResultChan() <-chan watch.Event {
|
||||||
return c.result
|
return c.result
|
||||||
|
@@ -48,7 +48,6 @@ import (
|
|||||||
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
|
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
|
||||||
"k8s.io/apiserver/pkg/features"
|
"k8s.io/apiserver/pkg/features"
|
||||||
"k8s.io/apiserver/pkg/storage"
|
"k8s.io/apiserver/pkg/storage"
|
||||||
"k8s.io/apiserver/pkg/storage/cacher/delegator"
|
|
||||||
"k8s.io/apiserver/pkg/storage/cacher/metrics"
|
"k8s.io/apiserver/pkg/storage/cacher/metrics"
|
||||||
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
|
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
|
||||||
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
|
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
|
||||||
@@ -245,12 +244,25 @@ func TestShouldDelegateList(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
oldRV := "80"
|
||||||
|
cacheRV := "100"
|
||||||
|
etcdRV := "120"
|
||||||
|
|
||||||
keyPrefix := "/pods/"
|
keyPrefix := "/pods/"
|
||||||
continueOnRev1, err := storage.EncodeContinue(keyPrefix+"foo", keyPrefix, 1)
|
continueOnOldRV, err := storage.EncodeContinue(keyPrefix+"foo", keyPrefix, int64(mustAtoi(oldRV)))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
continueOnCacheRV, err := storage.EncodeContinue(keyPrefix+"foo", keyPrefix, int64(mustAtoi(cacheRV)))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
// Continue from different apiserver that is forward in RVs
|
||||||
|
continueOnEtcdRV, err := storage.EncodeContinue(keyPrefix+"foo", keyPrefix, int64(mustAtoi(etcdRV)))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
continueOnNegativeRV, err := storage.EncodeContinue(keyPrefix+"foo", keyPrefix, -1)
|
continueOnNegativeRV, err := storage.EncodeContinue(keyPrefix+"foo", keyPrefix, -1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
@@ -258,24 +270,26 @@ func TestShouldDelegateList(t *testing.T) {
|
|||||||
testCases := map[opts]bool{}
|
testCases := map[opts]bool{}
|
||||||
testCases[opts{}] = true
|
testCases[opts{}] = true
|
||||||
testCases[opts{Limit: 100}] = true
|
testCases[opts{Limit: 100}] = true
|
||||||
testCases[opts{Continue: continueOnRev1}] = true
|
|
||||||
testCases[opts{Limit: 100, Continue: continueOnRev1}] = true
|
|
||||||
testCases[opts{Continue: continueOnNegativeRV}] = true
|
|
||||||
testCases[opts{Limit: 100, Continue: continueOnNegativeRV}] = true
|
|
||||||
testCases[opts{ResourceVersion: "0"}] = false
|
testCases[opts{ResourceVersion: "0"}] = false
|
||||||
testCases[opts{ResourceVersion: "0", Limit: 100}] = false
|
testCases[opts{ResourceVersion: "0", Limit: 100}] = false
|
||||||
testCases[opts{ResourceVersion: "0", Continue: continueOnRev1}] = true
|
|
||||||
testCases[opts{ResourceVersion: "0", Limit: 100, Continue: continueOnRev1}] = true
|
|
||||||
testCases[opts{ResourceVersion: "0", Continue: continueOnNegativeRV}] = true
|
|
||||||
testCases[opts{ResourceVersion: "0", Limit: 100, Continue: continueOnNegativeRV}] = true
|
|
||||||
testCases[opts{ResourceVersion: "0", ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan}] = false
|
testCases[opts{ResourceVersion: "0", ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan}] = false
|
||||||
testCases[opts{ResourceVersion: "0", ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, Limit: 100}] = false
|
testCases[opts{ResourceVersion: "0", ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, Limit: 100}] = false
|
||||||
testCases[opts{ResourceVersion: "1"}] = false
|
// Continue
|
||||||
testCases[opts{ResourceVersion: "1", Limit: 100}] = true
|
for _, continueToken := range []string{continueOnOldRV, continueOnCacheRV, continueOnEtcdRV, continueOnNegativeRV} {
|
||||||
testCases[opts{ResourceVersion: "1", ResourceVersionMatch: metav1.ResourceVersionMatchExact}] = true
|
testCases[opts{Continue: continueToken}] = true
|
||||||
testCases[opts{ResourceVersion: "1", ResourceVersionMatch: metav1.ResourceVersionMatchExact, Limit: 100}] = true
|
testCases[opts{Limit: 100, Continue: continueToken}] = true
|
||||||
testCases[opts{ResourceVersion: "1", ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan}] = false
|
testCases[opts{ResourceVersion: "0", Continue: continueToken}] = true
|
||||||
testCases[opts{ResourceVersion: "1", ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, Limit: 100}] = false
|
testCases[opts{ResourceVersion: "0", Limit: 100, Continue: continueToken}] = true
|
||||||
|
}
|
||||||
|
// With RV
|
||||||
|
for _, rv := range []string{oldRV, cacheRV, etcdRV} {
|
||||||
|
testCases[opts{ResourceVersion: rv}] = false
|
||||||
|
testCases[opts{ResourceVersion: rv, Limit: 100}] = true
|
||||||
|
testCases[opts{ResourceVersion: rv, ResourceVersionMatch: metav1.ResourceVersionMatchExact}] = true
|
||||||
|
testCases[opts{ResourceVersion: rv, ResourceVersionMatch: metav1.ResourceVersionMatchExact, Limit: 100}] = true
|
||||||
|
testCases[opts{ResourceVersion: rv, ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan}] = false
|
||||||
|
testCases[opts{ResourceVersion: rv, ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, Limit: 100}] = false
|
||||||
|
}
|
||||||
|
|
||||||
// Bypass for most requests doesn't depend on Recursive
|
// Bypass for most requests doesn't depend on Recursive
|
||||||
for opts, expectBypass := range testCases {
|
for opts, expectBypass := range testCases {
|
||||||
@@ -283,14 +297,16 @@ func TestShouldDelegateList(t *testing.T) {
|
|||||||
testCases[opts] = expectBypass
|
testCases[opts] = expectBypass
|
||||||
}
|
}
|
||||||
// Continue is ignored on non recursive LIST
|
// Continue is ignored on non recursive LIST
|
||||||
testCases[opts{ResourceVersion: "1", Continue: continueOnRev1}] = true
|
for _, rv := range []string{oldRV, etcdRV} {
|
||||||
testCases[opts{ResourceVersion: "1", Continue: continueOnRev1, Limit: 100}] = true
|
for _, continueToken := range []string{continueOnOldRV, continueOnCacheRV, continueOnEtcdRV, continueOnNegativeRV} {
|
||||||
testCases[opts{ResourceVersion: "1", Continue: continueOnNegativeRV}] = true
|
testCases[opts{ResourceVersion: rv, Continue: continueToken}] = true
|
||||||
testCases[opts{ResourceVersion: "1", Continue: continueOnNegativeRV, Limit: 100}] = true
|
testCases[opts{ResourceVersion: rv, Continue: continueToken, Limit: 100}] = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
for _, rv := range []string{"", "0", "1"} {
|
for _, rv := range []string{"", "0", oldRV, etcdRV} {
|
||||||
for _, match := range []metav1.ResourceVersionMatch{"", metav1.ResourceVersionMatchExact, metav1.ResourceVersionMatchNotOlderThan} {
|
for _, match := range []metav1.ResourceVersionMatch{"", metav1.ResourceVersionMatchExact, metav1.ResourceVersionMatchNotOlderThan} {
|
||||||
for _, continueKey := range []string{"", continueOnRev1, continueOnNegativeRV} {
|
for _, continueKey := range []string{"", continueOnOldRV, continueOnCacheRV, continueOnEtcdRV, continueOnNegativeRV} {
|
||||||
for _, limit := range []int64{0, 100} {
|
for _, limit := range []int64{0, 100} {
|
||||||
for _, recursive := range []bool{true, false} {
|
for _, recursive := range []bool{true, false} {
|
||||||
opt := opts{
|
opt := opts{
|
||||||
@@ -335,7 +351,18 @@ func TestShouldDelegateList(t *testing.T) {
|
|||||||
expectBypass = bypass
|
expectBypass = bypass
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
result, err := shouldDelegateList(toStorageOpts(opt), delegator.CacheWithoutSnapshots{})
|
backingStorage := &dummyStorage{}
|
||||||
|
backingStorage.getListFn = func(_ context.Context, _ string, _ storage.ListOptions, listObj runtime.Object) error {
|
||||||
|
podList := listObj.(*example.PodList)
|
||||||
|
podList.ListMeta = metav1.ListMeta{ResourceVersion: cacheRV}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
cacher, _, err := newTestCacher(backingStorage)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Couldn't create cacher: %v", err)
|
||||||
|
}
|
||||||
|
defer cacher.Stop()
|
||||||
|
result, err := shouldDelegateList(toStorageOpts(opt), cacher)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@@ -368,6 +395,14 @@ func TestShouldDelegateList(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func mustAtoi(s string) int {
|
||||||
|
value, err := strconv.Atoi(s)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
return value
|
||||||
|
}
|
||||||
|
|
||||||
func TestConsistentReadFallback(t *testing.T) {
|
func TestConsistentReadFallback(t *testing.T) {
|
||||||
tcs := []struct {
|
tcs := []struct {
|
||||||
name string
|
name string
|
||||||
|
@@ -180,7 +180,7 @@ func (c *CacheDelegator) GetList(ctx context.Context, key string, opts storage.L
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
result, err := shouldDelegateList(opts, delegator.CacheWithoutSnapshots{})
|
result, err := shouldDelegateList(opts, c.cacher)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user