diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 1c5f47a64a9..6cccf80c088 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -97,6 +97,8 @@ type Config struct { NewListFunc func() runtime.Object Codec runtime.Codec + + Clock clock.Clock } type watchersMap map[int]*cacheWatcher @@ -323,7 +325,9 @@ func NewCacherFromConfig(config Config) (*Cacher, error) { } } - clock := clock.RealClock{} + if config.Clock == nil { + config.Clock = clock.RealClock{} + } objType := reflect.TypeOf(obj) cacher := &Cacher{ ready: newReady(), @@ -346,9 +350,9 @@ func NewCacherFromConfig(config Config) (*Cacher, error) { // and there are no guarantees on the order that they will stop. // So we will be simply closing the channel, and synchronizing on the WaitGroup. stopCh: stopCh, - clock: clock, + clock: config.Clock, timer: time.NewTimer(time.Duration(0)), - bookmarkWatchers: newTimeBucketWatchers(clock, defaultBookmarkFrequency), + bookmarkWatchers: newTimeBucketWatchers(config.Clock, defaultBookmarkFrequency), } // Ensure that timer is stopped. @@ -359,7 +363,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) { } watchCache := newWatchCache( - config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, objType) + config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, objType) listerWatcher := NewCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc) reflectorName := "storage/cacher.go:" + config.ResourcePrefix diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index 3aafa66f33a..41199668085 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -271,6 +271,7 @@ func newTestCacher(s storage.Interface, cap int) (*Cacher, storage.Versioner, er NewFunc: func() runtime.Object { return &example.Pod{} }, NewListFunc: func() runtime.Object { return &example.PodList{} }, Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion), + Clock: clock.RealClock{}, } cacher, err := NewCacherFromConfig(config) return cacher, testVersioner{}, err diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go index 4235f061627..7ba7d475b3e 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go @@ -198,6 +198,7 @@ func newWatchCache( getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error), versioner storage.Versioner, indexers *cache.Indexers, + clock clock.Clock, objectType reflect.Type) *watchCache { wc := &watchCache{ capacity: defaultLowerBoundCapacity, @@ -212,7 +213,7 @@ func newWatchCache( resourceVersion: 0, listResourceVersion: 0, eventHandler: eventHandler, - clock: clock.RealClock{}, + clock: clock, versioner: versioner, objectType: objectType, } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go index dde7d3b3c5b..7c0c61c8048 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go @@ -81,7 +81,7 @@ func newTestWatchCache(capacity int, indexers *cache.Indexers) *watchCache { } versioner := etcd3.APIObjectVersioner{} mockHandler := func(*watchCacheEvent) {} - wc := newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, reflect.TypeOf(&example.Pod{})) + wc := newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, clock.NewFakeClock(time.Now()), reflect.TypeOf(&example.Pod{})) // To preserve behavior of tests that assume a given capacity, // resize it to th expected size. wc.capacity = capacity @@ -89,7 +89,6 @@ func newTestWatchCache(capacity int, indexers *cache.Indexers) *watchCache { wc.lowerBoundCapacity = min(capacity, defaultLowerBoundCapacity) wc.upperBoundCapacity = max(capacity, defaultUpperBoundCapacity) - wc.clock = clock.NewFakeClock(time.Now()) return wc } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/tests/BUILD b/staging/src/k8s.io/apiserver/pkg/storage/tests/BUILD index 2074efe2abc..fdd648ff6a5 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/tests/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/storage/tests/BUILD @@ -15,6 +15,7 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go index 1f1b273c766..a46b35d3e27 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go @@ -34,6 +34,7 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/util/clock" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" @@ -112,6 +113,7 @@ func newTestCacher(s storage.Interface, cap int) (*cacherstorage.Cacher, storage NewFunc: func() runtime.Object { return &example.Pod{} }, NewListFunc: func() runtime.Object { return &example.PodList{} }, Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion), + Clock: clock.RealClock{}, } cacher, err := cacherstorage.NewCacherFromConfig(config) return cacher, v, err