diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 430f4c01bd5..ef7f58fec98 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -42,6 +42,7 @@ import ( "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/storage" + "k8s.io/apiserver/pkg/storage/cacher/delegator" "k8s.io/apiserver/pkg/storage/cacher/metrics" "k8s.io/apiserver/pkg/storage/cacher/progress" etcdfeature "k8s.io/apiserver/pkg/storage/feature" @@ -1326,6 +1327,18 @@ func newErrWatcher(err error) *errWatcher { return watcher } +func (c *Cacher) ShouldDelegateExactRV(resourceVersion string, recursive bool) (delegator.Result, error) { + return delegator.CacheWithoutSnapshots{}.ShouldDelegateExactRV(resourceVersion, recursive) +} + +func (c *Cacher) ShouldDelegateContinue(continueToken string, recursive bool) (delegator.Result, error) { + return delegator.CacheWithoutSnapshots{}.ShouldDelegateContinue(continueToken, recursive) +} + +func (c *Cacher) ShouldDelegateConsistentRead() (delegator.Result, error) { + return delegator.CacheWithoutSnapshots{}.ShouldDelegateConsistentRead() +} + // Implements watch.Interface. func (c *errWatcher) ResultChan() <-chan watch.Event { return c.result diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index 1d58dce8b4b..ca53fde6a05 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -48,7 +48,6 @@ import ( examplev1 "k8s.io/apiserver/pkg/apis/example/v1" "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/storage" - "k8s.io/apiserver/pkg/storage/cacher/delegator" "k8s.io/apiserver/pkg/storage/cacher/metrics" etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing" etcdfeature "k8s.io/apiserver/pkg/storage/feature" @@ -245,12 +244,25 @@ func TestShouldDelegateList(t *testing.T) { }, } } + oldRV := "80" + cacheRV := "100" + etcdRV := "120" keyPrefix := "/pods/" - continueOnRev1, err := storage.EncodeContinue(keyPrefix+"foo", keyPrefix, 1) + continueOnOldRV, err := storage.EncodeContinue(keyPrefix+"foo", keyPrefix, int64(mustAtoi(oldRV))) if err != nil { t.Fatalf("Unexpected error: %v", err) } + continueOnCacheRV, err := storage.EncodeContinue(keyPrefix+"foo", keyPrefix, int64(mustAtoi(cacheRV))) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + // Continue from different apiserver that is forward in RVs + continueOnEtcdRV, err := storage.EncodeContinue(keyPrefix+"foo", keyPrefix, int64(mustAtoi(etcdRV))) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + continueOnNegativeRV, err := storage.EncodeContinue(keyPrefix+"foo", keyPrefix, -1) if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -258,24 +270,26 @@ func TestShouldDelegateList(t *testing.T) { testCases := map[opts]bool{} testCases[opts{}] = true testCases[opts{Limit: 100}] = true - testCases[opts{Continue: continueOnRev1}] = true - testCases[opts{Limit: 100, Continue: continueOnRev1}] = true - testCases[opts{Continue: continueOnNegativeRV}] = true - testCases[opts{Limit: 100, Continue: continueOnNegativeRV}] = true testCases[opts{ResourceVersion: "0"}] = false testCases[opts{ResourceVersion: "0", Limit: 100}] = false - testCases[opts{ResourceVersion: "0", Continue: continueOnRev1}] = true - testCases[opts{ResourceVersion: "0", Limit: 100, Continue: continueOnRev1}] = true - testCases[opts{ResourceVersion: "0", Continue: continueOnNegativeRV}] = true - testCases[opts{ResourceVersion: "0", Limit: 100, Continue: continueOnNegativeRV}] = true testCases[opts{ResourceVersion: "0", ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan}] = false testCases[opts{ResourceVersion: "0", ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, Limit: 100}] = false - testCases[opts{ResourceVersion: "1"}] = false - testCases[opts{ResourceVersion: "1", Limit: 100}] = true - testCases[opts{ResourceVersion: "1", ResourceVersionMatch: metav1.ResourceVersionMatchExact}] = true - testCases[opts{ResourceVersion: "1", ResourceVersionMatch: metav1.ResourceVersionMatchExact, Limit: 100}] = true - testCases[opts{ResourceVersion: "1", ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan}] = false - testCases[opts{ResourceVersion: "1", ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, Limit: 100}] = false + // Continue + for _, continueToken := range []string{continueOnOldRV, continueOnCacheRV, continueOnEtcdRV, continueOnNegativeRV} { + testCases[opts{Continue: continueToken}] = true + testCases[opts{Limit: 100, Continue: continueToken}] = true + testCases[opts{ResourceVersion: "0", Continue: continueToken}] = true + testCases[opts{ResourceVersion: "0", Limit: 100, Continue: continueToken}] = true + } + // With RV + for _, rv := range []string{oldRV, cacheRV, etcdRV} { + testCases[opts{ResourceVersion: rv}] = false + testCases[opts{ResourceVersion: rv, Limit: 100}] = true + testCases[opts{ResourceVersion: rv, ResourceVersionMatch: metav1.ResourceVersionMatchExact}] = true + testCases[opts{ResourceVersion: rv, ResourceVersionMatch: metav1.ResourceVersionMatchExact, Limit: 100}] = true + testCases[opts{ResourceVersion: rv, ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan}] = false + testCases[opts{ResourceVersion: rv, ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, Limit: 100}] = false + } // Bypass for most requests doesn't depend on Recursive for opts, expectBypass := range testCases { @@ -283,14 +297,16 @@ func TestShouldDelegateList(t *testing.T) { testCases[opts] = expectBypass } // Continue is ignored on non recursive LIST - testCases[opts{ResourceVersion: "1", Continue: continueOnRev1}] = true - testCases[opts{ResourceVersion: "1", Continue: continueOnRev1, Limit: 100}] = true - testCases[opts{ResourceVersion: "1", Continue: continueOnNegativeRV}] = true - testCases[opts{ResourceVersion: "1", Continue: continueOnNegativeRV, Limit: 100}] = true + for _, rv := range []string{oldRV, etcdRV} { + for _, continueToken := range []string{continueOnOldRV, continueOnCacheRV, continueOnEtcdRV, continueOnNegativeRV} { + testCases[opts{ResourceVersion: rv, Continue: continueToken}] = true + testCases[opts{ResourceVersion: rv, Continue: continueToken, Limit: 100}] = true + } + } - for _, rv := range []string{"", "0", "1"} { + for _, rv := range []string{"", "0", oldRV, etcdRV} { for _, match := range []metav1.ResourceVersionMatch{"", metav1.ResourceVersionMatchExact, metav1.ResourceVersionMatchNotOlderThan} { - for _, continueKey := range []string{"", continueOnRev1, continueOnNegativeRV} { + for _, continueKey := range []string{"", continueOnOldRV, continueOnCacheRV, continueOnEtcdRV, continueOnNegativeRV} { for _, limit := range []int64{0, 100} { for _, recursive := range []bool{true, false} { opt := opts{ @@ -335,7 +351,18 @@ func TestShouldDelegateList(t *testing.T) { expectBypass = bypass } } - result, err := shouldDelegateList(toStorageOpts(opt), delegator.CacheWithoutSnapshots{}) + backingStorage := &dummyStorage{} + backingStorage.getListFn = func(_ context.Context, _ string, _ storage.ListOptions, listObj runtime.Object) error { + podList := listObj.(*example.PodList) + podList.ListMeta = metav1.ListMeta{ResourceVersion: cacheRV} + return nil + } + cacher, _, err := newTestCacher(backingStorage) + if err != nil { + t.Fatalf("Couldn't create cacher: %v", err) + } + defer cacher.Stop() + result, err := shouldDelegateList(toStorageOpts(opt), cacher) if err != nil { t.Fatal(err) } @@ -368,6 +395,14 @@ func TestShouldDelegateList(t *testing.T) { }) } +func mustAtoi(s string) int { + value, err := strconv.Atoi(s) + if err != nil { + panic(err) + } + return value +} + func TestConsistentReadFallback(t *testing.T) { tcs := []struct { name string diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go index 6c0287c735b..478ae7ac871 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go @@ -180,7 +180,7 @@ func (c *CacheDelegator) GetList(ctx context.Context, key string, opts storage.L if err != nil { return err } - result, err := shouldDelegateList(opts, delegator.CacheWithoutSnapshots{}) + result, err := shouldDelegateList(opts, c.cacher) if err != nil { return err }