mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
improve Clock as Cacher Config
This commit is contained in:
parent
a8e43038a4
commit
0f7de876a5
@ -97,6 +97,8 @@ type Config struct {
|
|||||||
NewListFunc func() runtime.Object
|
NewListFunc func() runtime.Object
|
||||||
|
|
||||||
Codec runtime.Codec
|
Codec runtime.Codec
|
||||||
|
|
||||||
|
Clock clock.Clock
|
||||||
}
|
}
|
||||||
|
|
||||||
type watchersMap map[int]*cacheWatcher
|
type watchersMap map[int]*cacheWatcher
|
||||||
@ -323,7 +325,9 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
clock := clock.RealClock{}
|
if config.Clock == nil {
|
||||||
|
config.Clock = clock.RealClock{}
|
||||||
|
}
|
||||||
objType := reflect.TypeOf(obj)
|
objType := reflect.TypeOf(obj)
|
||||||
cacher := &Cacher{
|
cacher := &Cacher{
|
||||||
ready: newReady(),
|
ready: newReady(),
|
||||||
@ -346,9 +350,9 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
|
|||||||
// and there are no guarantees on the order that they will stop.
|
// and there are no guarantees on the order that they will stop.
|
||||||
// So we will be simply closing the channel, and synchronizing on the WaitGroup.
|
// So we will be simply closing the channel, and synchronizing on the WaitGroup.
|
||||||
stopCh: stopCh,
|
stopCh: stopCh,
|
||||||
clock: clock,
|
clock: config.Clock,
|
||||||
timer: time.NewTimer(time.Duration(0)),
|
timer: time.NewTimer(time.Duration(0)),
|
||||||
bookmarkWatchers: newTimeBucketWatchers(clock, defaultBookmarkFrequency),
|
bookmarkWatchers: newTimeBucketWatchers(config.Clock, defaultBookmarkFrequency),
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ensure that timer is stopped.
|
// Ensure that timer is stopped.
|
||||||
@ -359,7 +363,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
watchCache := newWatchCache(
|
watchCache := newWatchCache(
|
||||||
config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, objType)
|
config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, objType)
|
||||||
listerWatcher := NewCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
|
listerWatcher := NewCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
|
||||||
reflectorName := "storage/cacher.go:" + config.ResourcePrefix
|
reflectorName := "storage/cacher.go:" + config.ResourcePrefix
|
||||||
|
|
||||||
|
@ -271,6 +271,7 @@ func newTestCacher(s storage.Interface, cap int) (*Cacher, storage.Versioner, er
|
|||||||
NewFunc: func() runtime.Object { return &example.Pod{} },
|
NewFunc: func() runtime.Object { return &example.Pod{} },
|
||||||
NewListFunc: func() runtime.Object { return &example.PodList{} },
|
NewListFunc: func() runtime.Object { return &example.PodList{} },
|
||||||
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
|
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
|
||||||
|
Clock: clock.RealClock{},
|
||||||
}
|
}
|
||||||
cacher, err := NewCacherFromConfig(config)
|
cacher, err := NewCacherFromConfig(config)
|
||||||
return cacher, testVersioner{}, err
|
return cacher, testVersioner{}, err
|
||||||
|
@ -198,6 +198,7 @@ func newWatchCache(
|
|||||||
getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error),
|
getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error),
|
||||||
versioner storage.Versioner,
|
versioner storage.Versioner,
|
||||||
indexers *cache.Indexers,
|
indexers *cache.Indexers,
|
||||||
|
clock clock.Clock,
|
||||||
objectType reflect.Type) *watchCache {
|
objectType reflect.Type) *watchCache {
|
||||||
wc := &watchCache{
|
wc := &watchCache{
|
||||||
capacity: defaultLowerBoundCapacity,
|
capacity: defaultLowerBoundCapacity,
|
||||||
@ -212,7 +213,7 @@ func newWatchCache(
|
|||||||
resourceVersion: 0,
|
resourceVersion: 0,
|
||||||
listResourceVersion: 0,
|
listResourceVersion: 0,
|
||||||
eventHandler: eventHandler,
|
eventHandler: eventHandler,
|
||||||
clock: clock.RealClock{},
|
clock: clock,
|
||||||
versioner: versioner,
|
versioner: versioner,
|
||||||
objectType: objectType,
|
objectType: objectType,
|
||||||
}
|
}
|
||||||
|
@ -81,7 +81,7 @@ func newTestWatchCache(capacity int, indexers *cache.Indexers) *watchCache {
|
|||||||
}
|
}
|
||||||
versioner := etcd3.APIObjectVersioner{}
|
versioner := etcd3.APIObjectVersioner{}
|
||||||
mockHandler := func(*watchCacheEvent) {}
|
mockHandler := func(*watchCacheEvent) {}
|
||||||
wc := newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, reflect.TypeOf(&example.Pod{}))
|
wc := newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, clock.NewFakeClock(time.Now()), reflect.TypeOf(&example.Pod{}))
|
||||||
// To preserve behavior of tests that assume a given capacity,
|
// To preserve behavior of tests that assume a given capacity,
|
||||||
// resize it to th expected size.
|
// resize it to th expected size.
|
||||||
wc.capacity = capacity
|
wc.capacity = capacity
|
||||||
@ -89,7 +89,6 @@ func newTestWatchCache(capacity int, indexers *cache.Indexers) *watchCache {
|
|||||||
wc.lowerBoundCapacity = min(capacity, defaultLowerBoundCapacity)
|
wc.lowerBoundCapacity = min(capacity, defaultLowerBoundCapacity)
|
||||||
wc.upperBoundCapacity = max(capacity, defaultUpperBoundCapacity)
|
wc.upperBoundCapacity = max(capacity, defaultUpperBoundCapacity)
|
||||||
|
|
||||||
wc.clock = clock.NewFakeClock(time.Now())
|
|
||||||
return wc
|
return wc
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -15,6 +15,7 @@ go_test(
|
|||||||
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||||
|
@ -34,6 +34,7 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/labels"
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/runtime/serializer"
|
"k8s.io/apimachinery/pkg/runtime/serializer"
|
||||||
|
"k8s.io/apimachinery/pkg/util/clock"
|
||||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
"k8s.io/apimachinery/pkg/util/sets"
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
@ -112,6 +113,7 @@ func newTestCacher(s storage.Interface, cap int) (*cacherstorage.Cacher, storage
|
|||||||
NewFunc: func() runtime.Object { return &example.Pod{} },
|
NewFunc: func() runtime.Object { return &example.Pod{} },
|
||||||
NewListFunc: func() runtime.Object { return &example.PodList{} },
|
NewListFunc: func() runtime.Object { return &example.PodList{} },
|
||||||
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
|
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
|
||||||
|
Clock: clock.RealClock{},
|
||||||
}
|
}
|
||||||
cacher, err := cacherstorage.NewCacherFromConfig(config)
|
cacher, err := cacherstorage.NewCacherFromConfig(config)
|
||||||
return cacher, v, err
|
return cacher, v, err
|
||||||
|
Loading…
Reference in New Issue
Block a user