diff --git a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go index a46b35d3e27..18a7f674157 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go @@ -57,6 +57,11 @@ var ( codecs = serializer.NewCodecFactory(scheme) ) +const ( + // watchCacheDefaultCapacity syncs watch cache defaultLowerBoundCapacity. + watchCacheDefaultCapacity = 100 +) + func init() { metav1.AddToGroupVersion(scheme, metav1.SchemeGroupVersion) utilruntime.Must(example.AddToScheme(scheme)) @@ -101,6 +106,10 @@ func newEtcdTestStorage(t *testing.T, prefix string) (*etcd3testing.EtcdTestServ } func newTestCacher(s storage.Interface, cap int) (*cacherstorage.Cacher, storage.Versioner, error) { + return newTestCacherWithClock(s, cap, clock.RealClock{}) +} + +func newTestCacherWithClock(s storage.Interface, cap int, clock clock.Clock) (*cacherstorage.Cacher, storage.Versioner, error) { prefix := "pods" v := etcd3.APIObjectVersioner{} config := cacherstorage.Config{ @@ -113,7 +122,7 @@ func newTestCacher(s storage.Interface, cap int) (*cacherstorage.Cacher, storage NewFunc: func() runtime.Object { return &example.Pod{} }, NewListFunc: func() runtime.Object { return &example.PodList{} }, Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion), - Clock: clock.RealClock{}, + Clock: clock, } cacher, err := cacherstorage.NewCacherFromConfig(config) return cacher, v, err @@ -396,7 +405,8 @@ func TestWatch(t *testing.T) { // Inject one list error to make sure we test the relist case. etcdStorage = &injectListError{errors: 1, Interface: etcdStorage} defer server.Terminate(t) - cacher, _, err := newTestCacher(etcdStorage, 3) // small capacity to trigger "too old version" error + fakeClock := clock.NewFakeClock(time.Now()) + cacher, _, err := newTestCacherWithClock(etcdStorage, watchCacheDefaultCapacity, fakeClock) if err != nil { t.Fatalf("Couldn't create cacher: %v", err) } @@ -439,15 +449,6 @@ func TestWatch(t *testing.T) { verifyWatchEvent(t, watcher, watch.Added, podFoo) verifyWatchEvent(t, watcher, watch.Modified, podFooPrime) - // Check whether we get too-old error via the watch channel - tooOldWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: "1", Predicate: storage.Everything}) - if err != nil { - t.Fatalf("Expected no direct error, got %v", err) - } - defer tooOldWatcher.Stop() - // Events happens in eventFreshDuration, cache expand without event "Gone". - verifyWatchEvent(t, tooOldWatcher, watch.Added, podFoo) - initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: fooCreated.ResourceVersion, Predicate: storage.Everything}) if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -468,6 +469,25 @@ func TestWatch(t *testing.T) { _ = updatePod(t, etcdStorage, podFooBis, fooUpdated) verifyWatchEvent(t, nowWatcher, watch.Modified, podFooBis) + + // Add watchCacheDefaultCapacity events to make current watch cache full. + // Make start and last event duration exceed eventFreshDuration(current 75s) to ensure watch cache won't expand. + for i := 0; i < watchCacheDefaultCapacity; i++ { + fakeClock.SetTime(time.Now().Add(time.Duration(i) * time.Minute)) + podFoo := makeTestPod(fmt.Sprintf("foo-%d", i)) + updatePod(t, etcdStorage, podFoo, nil) + } + + // Check whether we get too-old error via the watch channel + tooOldWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: "1", Predicate: storage.Everything}) + if err != nil { + t.Fatalf("Expected no direct error, got %v", err) + } + defer tooOldWatcher.Stop() + + // Ensure we get a "Gone" error. + expectedResourceExpiredError := errors.NewResourceExpired("").ErrStatus + verifyWatchEvent(t, tooOldWatcher, watch.Error, &expectedResourceExpiredError) } func TestWatcherTimeout(t *testing.T) {