From 0f7de876a5e931a57ba6347b81e8e28d360c788a Mon Sep 17 00:00:00 2001 From: gongguan Date: Mon, 15 Jun 2020 23:57:11 +0800 Subject: [PATCH 1/2] improve Clock as Cacher Config --- .../k8s.io/apiserver/pkg/storage/cacher/cacher.go | 12 ++++++++---- .../pkg/storage/cacher/cacher_whitebox_test.go | 1 + .../apiserver/pkg/storage/cacher/watch_cache.go | 3 ++- .../apiserver/pkg/storage/cacher/watch_cache_test.go | 3 +-- staging/src/k8s.io/apiserver/pkg/storage/tests/BUILD | 1 + .../apiserver/pkg/storage/tests/cacher_test.go | 2 ++ 6 files changed, 15 insertions(+), 7 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 1c5f47a64a9..6cccf80c088 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -97,6 +97,8 @@ type Config struct { NewListFunc func() runtime.Object Codec runtime.Codec + + Clock clock.Clock } type watchersMap map[int]*cacheWatcher @@ -323,7 +325,9 @@ func NewCacherFromConfig(config Config) (*Cacher, error) { } } - clock := clock.RealClock{} + if config.Clock == nil { + config.Clock = clock.RealClock{} + } objType := reflect.TypeOf(obj) cacher := &Cacher{ ready: newReady(), @@ -346,9 +350,9 @@ func NewCacherFromConfig(config Config) (*Cacher, error) { // and there are no guarantees on the order that they will stop. // So we will be simply closing the channel, and synchronizing on the WaitGroup. stopCh: stopCh, - clock: clock, + clock: config.Clock, timer: time.NewTimer(time.Duration(0)), - bookmarkWatchers: newTimeBucketWatchers(clock, defaultBookmarkFrequency), + bookmarkWatchers: newTimeBucketWatchers(config.Clock, defaultBookmarkFrequency), } // Ensure that timer is stopped. @@ -359,7 +363,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) { } watchCache := newWatchCache( - config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, objType) + config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, objType) listerWatcher := NewCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc) reflectorName := "storage/cacher.go:" + config.ResourcePrefix diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index 3aafa66f33a..41199668085 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -271,6 +271,7 @@ func newTestCacher(s storage.Interface, cap int) (*Cacher, storage.Versioner, er NewFunc: func() runtime.Object { return &example.Pod{} }, NewListFunc: func() runtime.Object { return &example.PodList{} }, Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion), + Clock: clock.RealClock{}, } cacher, err := NewCacherFromConfig(config) return cacher, testVersioner{}, err diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go index 4235f061627..7ba7d475b3e 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go @@ -198,6 +198,7 @@ func newWatchCache( getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error), versioner storage.Versioner, indexers *cache.Indexers, + clock clock.Clock, objectType reflect.Type) *watchCache { wc := &watchCache{ capacity: defaultLowerBoundCapacity, @@ -212,7 +213,7 @@ func newWatchCache( resourceVersion: 0, listResourceVersion: 0, eventHandler: eventHandler, - clock: clock.RealClock{}, + clock: clock, versioner: versioner, objectType: objectType, } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go index dde7d3b3c5b..7c0c61c8048 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go @@ -81,7 +81,7 @@ func newTestWatchCache(capacity int, indexers *cache.Indexers) *watchCache { } versioner := etcd3.APIObjectVersioner{} mockHandler := func(*watchCacheEvent) {} - wc := newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, reflect.TypeOf(&example.Pod{})) + wc := newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, clock.NewFakeClock(time.Now()), reflect.TypeOf(&example.Pod{})) // To preserve behavior of tests that assume a given capacity, // resize it to th expected size. wc.capacity = capacity @@ -89,7 +89,6 @@ func newTestWatchCache(capacity int, indexers *cache.Indexers) *watchCache { wc.lowerBoundCapacity = min(capacity, defaultLowerBoundCapacity) wc.upperBoundCapacity = max(capacity, defaultUpperBoundCapacity) - wc.clock = clock.NewFakeClock(time.Now()) return wc } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/tests/BUILD b/staging/src/k8s.io/apiserver/pkg/storage/tests/BUILD index 2074efe2abc..fdd648ff6a5 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/tests/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/storage/tests/BUILD @@ -15,6 +15,7 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go index 1f1b273c766..a46b35d3e27 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go @@ -34,6 +34,7 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/util/clock" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" @@ -112,6 +113,7 @@ func newTestCacher(s storage.Interface, cap int) (*cacherstorage.Cacher, storage NewFunc: func() runtime.Object { return &example.Pod{} }, NewListFunc: func() runtime.Object { return &example.PodList{} }, Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion), + Clock: clock.RealClock{}, } cacher, err := cacherstorage.NewCacherFromConfig(config) return cacher, v, err From f3cbfc3f7e4bf38978b118933d4e626125a9d80f Mon Sep 17 00:00:00 2001 From: gongguan Date: Tue, 16 Jun 2020 11:07:07 +0800 Subject: [PATCH 2/2] restore event Gone test --- .../pkg/storage/tests/cacher_test.go | 42 ++++++++++++++----- 1 file changed, 31 insertions(+), 11 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go index a46b35d3e27..18a7f674157 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go @@ -57,6 +57,11 @@ var ( codecs = serializer.NewCodecFactory(scheme) ) +const ( + // watchCacheDefaultCapacity syncs watch cache defaultLowerBoundCapacity. + watchCacheDefaultCapacity = 100 +) + func init() { metav1.AddToGroupVersion(scheme, metav1.SchemeGroupVersion) utilruntime.Must(example.AddToScheme(scheme)) @@ -101,6 +106,10 @@ func newEtcdTestStorage(t *testing.T, prefix string) (*etcd3testing.EtcdTestServ } func newTestCacher(s storage.Interface, cap int) (*cacherstorage.Cacher, storage.Versioner, error) { + return newTestCacherWithClock(s, cap, clock.RealClock{}) +} + +func newTestCacherWithClock(s storage.Interface, cap int, clock clock.Clock) (*cacherstorage.Cacher, storage.Versioner, error) { prefix := "pods" v := etcd3.APIObjectVersioner{} config := cacherstorage.Config{ @@ -113,7 +122,7 @@ func newTestCacher(s storage.Interface, cap int) (*cacherstorage.Cacher, storage NewFunc: func() runtime.Object { return &example.Pod{} }, NewListFunc: func() runtime.Object { return &example.PodList{} }, Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion), - Clock: clock.RealClock{}, + Clock: clock, } cacher, err := cacherstorage.NewCacherFromConfig(config) return cacher, v, err @@ -396,7 +405,8 @@ func TestWatch(t *testing.T) { // Inject one list error to make sure we test the relist case. etcdStorage = &injectListError{errors: 1, Interface: etcdStorage} defer server.Terminate(t) - cacher, _, err := newTestCacher(etcdStorage, 3) // small capacity to trigger "too old version" error + fakeClock := clock.NewFakeClock(time.Now()) + cacher, _, err := newTestCacherWithClock(etcdStorage, watchCacheDefaultCapacity, fakeClock) if err != nil { t.Fatalf("Couldn't create cacher: %v", err) } @@ -439,15 +449,6 @@ func TestWatch(t *testing.T) { verifyWatchEvent(t, watcher, watch.Added, podFoo) verifyWatchEvent(t, watcher, watch.Modified, podFooPrime) - // Check whether we get too-old error via the watch channel - tooOldWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: "1", Predicate: storage.Everything}) - if err != nil { - t.Fatalf("Expected no direct error, got %v", err) - } - defer tooOldWatcher.Stop() - // Events happens in eventFreshDuration, cache expand without event "Gone". - verifyWatchEvent(t, tooOldWatcher, watch.Added, podFoo) - initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: fooCreated.ResourceVersion, Predicate: storage.Everything}) if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -468,6 +469,25 @@ func TestWatch(t *testing.T) { _ = updatePod(t, etcdStorage, podFooBis, fooUpdated) verifyWatchEvent(t, nowWatcher, watch.Modified, podFooBis) + + // Add watchCacheDefaultCapacity events to make current watch cache full. + // Make start and last event duration exceed eventFreshDuration(current 75s) to ensure watch cache won't expand. + for i := 0; i < watchCacheDefaultCapacity; i++ { + fakeClock.SetTime(time.Now().Add(time.Duration(i) * time.Minute)) + podFoo := makeTestPod(fmt.Sprintf("foo-%d", i)) + updatePod(t, etcdStorage, podFoo, nil) + } + + // Check whether we get too-old error via the watch channel + tooOldWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: "1", Predicate: storage.Everything}) + if err != nil { + t.Fatalf("Expected no direct error, got %v", err) + } + defer tooOldWatcher.Stop() + + // Ensure we get a "Gone" error. + expectedResourceExpiredError := errors.NewResourceExpired("").ErrStatus + verifyWatchEvent(t, tooOldWatcher, watch.Error, &expectedResourceExpiredError) } func TestWatcherTimeout(t *testing.T) {