Merge pull request #92174 from gongguan/event-gone

restore cacher event Gone tests
This commit is contained in:
Kubernetes Prow Robot 2020-06-16 19:24:21 -07:00 committed by GitHub
commit 1df459c2d0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 45 additions and 17 deletions

View File

@ -97,6 +97,8 @@ type Config struct {
NewListFunc func() runtime.Object NewListFunc func() runtime.Object
Codec runtime.Codec Codec runtime.Codec
Clock clock.Clock
} }
type watchersMap map[int]*cacheWatcher type watchersMap map[int]*cacheWatcher
@ -323,7 +325,9 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
} }
} }
clock := clock.RealClock{} if config.Clock == nil {
config.Clock = clock.RealClock{}
}
objType := reflect.TypeOf(obj) objType := reflect.TypeOf(obj)
cacher := &Cacher{ cacher := &Cacher{
ready: newReady(), ready: newReady(),
@ -346,9 +350,9 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
// and there are no guarantees on the order that they will stop. // and there are no guarantees on the order that they will stop.
// So we will be simply closing the channel, and synchronizing on the WaitGroup. // So we will be simply closing the channel, and synchronizing on the WaitGroup.
stopCh: stopCh, stopCh: stopCh,
clock: clock, clock: config.Clock,
timer: time.NewTimer(time.Duration(0)), timer: time.NewTimer(time.Duration(0)),
bookmarkWatchers: newTimeBucketWatchers(clock, defaultBookmarkFrequency), bookmarkWatchers: newTimeBucketWatchers(config.Clock, defaultBookmarkFrequency),
} }
// Ensure that timer is stopped. // Ensure that timer is stopped.
@ -359,7 +363,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
} }
watchCache := newWatchCache( watchCache := newWatchCache(
config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, objType) config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, objType)
listerWatcher := NewCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc) listerWatcher := NewCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
reflectorName := "storage/cacher.go:" + config.ResourcePrefix reflectorName := "storage/cacher.go:" + config.ResourcePrefix

View File

@ -271,6 +271,7 @@ func newTestCacher(s storage.Interface, cap int) (*Cacher, storage.Versioner, er
NewFunc: func() runtime.Object { return &example.Pod{} }, NewFunc: func() runtime.Object { return &example.Pod{} },
NewListFunc: func() runtime.Object { return &example.PodList{} }, NewListFunc: func() runtime.Object { return &example.PodList{} },
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion), Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
Clock: clock.RealClock{},
} }
cacher, err := NewCacherFromConfig(config) cacher, err := NewCacherFromConfig(config)
return cacher, testVersioner{}, err return cacher, testVersioner{}, err

View File

@ -198,6 +198,7 @@ func newWatchCache(
getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error), getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error),
versioner storage.Versioner, versioner storage.Versioner,
indexers *cache.Indexers, indexers *cache.Indexers,
clock clock.Clock,
objectType reflect.Type) *watchCache { objectType reflect.Type) *watchCache {
wc := &watchCache{ wc := &watchCache{
capacity: defaultLowerBoundCapacity, capacity: defaultLowerBoundCapacity,
@ -212,7 +213,7 @@ func newWatchCache(
resourceVersion: 0, resourceVersion: 0,
listResourceVersion: 0, listResourceVersion: 0,
eventHandler: eventHandler, eventHandler: eventHandler,
clock: clock.RealClock{}, clock: clock,
versioner: versioner, versioner: versioner,
objectType: objectType, objectType: objectType,
} }

View File

@ -81,7 +81,7 @@ func newTestWatchCache(capacity int, indexers *cache.Indexers) *watchCache {
} }
versioner := etcd3.APIObjectVersioner{} versioner := etcd3.APIObjectVersioner{}
mockHandler := func(*watchCacheEvent) {} mockHandler := func(*watchCacheEvent) {}
wc := newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, reflect.TypeOf(&example.Pod{})) wc := newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, clock.NewFakeClock(time.Now()), reflect.TypeOf(&example.Pod{}))
// To preserve behavior of tests that assume a given capacity, // To preserve behavior of tests that assume a given capacity,
// resize it to th expected size. // resize it to th expected size.
wc.capacity = capacity wc.capacity = capacity
@ -89,7 +89,6 @@ func newTestWatchCache(capacity int, indexers *cache.Indexers) *watchCache {
wc.lowerBoundCapacity = min(capacity, defaultLowerBoundCapacity) wc.lowerBoundCapacity = min(capacity, defaultLowerBoundCapacity)
wc.upperBoundCapacity = max(capacity, defaultUpperBoundCapacity) wc.upperBoundCapacity = max(capacity, defaultUpperBoundCapacity)
wc.clock = clock.NewFakeClock(time.Now())
return wc return wc
} }

View File

@ -15,6 +15,7 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",

View File

@ -34,6 +34,7 @@ import (
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/clock"
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
@ -56,6 +57,11 @@ var (
codecs = serializer.NewCodecFactory(scheme) codecs = serializer.NewCodecFactory(scheme)
) )
const (
// watchCacheDefaultCapacity syncs watch cache defaultLowerBoundCapacity.
watchCacheDefaultCapacity = 100
)
func init() { func init() {
metav1.AddToGroupVersion(scheme, metav1.SchemeGroupVersion) metav1.AddToGroupVersion(scheme, metav1.SchemeGroupVersion)
utilruntime.Must(example.AddToScheme(scheme)) utilruntime.Must(example.AddToScheme(scheme))
@ -100,6 +106,10 @@ func newEtcdTestStorage(t *testing.T, prefix string) (*etcd3testing.EtcdTestServ
} }
func newTestCacher(s storage.Interface, cap int) (*cacherstorage.Cacher, storage.Versioner, error) { func newTestCacher(s storage.Interface, cap int) (*cacherstorage.Cacher, storage.Versioner, error) {
return newTestCacherWithClock(s, cap, clock.RealClock{})
}
func newTestCacherWithClock(s storage.Interface, cap int, clock clock.Clock) (*cacherstorage.Cacher, storage.Versioner, error) {
prefix := "pods" prefix := "pods"
v := etcd3.APIObjectVersioner{} v := etcd3.APIObjectVersioner{}
config := cacherstorage.Config{ config := cacherstorage.Config{
@ -112,6 +122,7 @@ func newTestCacher(s storage.Interface, cap int) (*cacherstorage.Cacher, storage
NewFunc: func() runtime.Object { return &example.Pod{} }, NewFunc: func() runtime.Object { return &example.Pod{} },
NewListFunc: func() runtime.Object { return &example.PodList{} }, NewListFunc: func() runtime.Object { return &example.PodList{} },
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion), Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
Clock: clock,
} }
cacher, err := cacherstorage.NewCacherFromConfig(config) cacher, err := cacherstorage.NewCacherFromConfig(config)
return cacher, v, err return cacher, v, err
@ -394,7 +405,8 @@ func TestWatch(t *testing.T) {
// Inject one list error to make sure we test the relist case. // Inject one list error to make sure we test the relist case.
etcdStorage = &injectListError{errors: 1, Interface: etcdStorage} etcdStorage = &injectListError{errors: 1, Interface: etcdStorage}
defer server.Terminate(t) defer server.Terminate(t)
cacher, _, err := newTestCacher(etcdStorage, 3) // small capacity to trigger "too old version" error fakeClock := clock.NewFakeClock(time.Now())
cacher, _, err := newTestCacherWithClock(etcdStorage, watchCacheDefaultCapacity, fakeClock)
if err != nil { if err != nil {
t.Fatalf("Couldn't create cacher: %v", err) t.Fatalf("Couldn't create cacher: %v", err)
} }
@ -437,15 +449,6 @@ func TestWatch(t *testing.T) {
verifyWatchEvent(t, watcher, watch.Added, podFoo) verifyWatchEvent(t, watcher, watch.Added, podFoo)
verifyWatchEvent(t, watcher, watch.Modified, podFooPrime) verifyWatchEvent(t, watcher, watch.Modified, podFooPrime)
// Check whether we get too-old error via the watch channel
tooOldWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: "1", Predicate: storage.Everything})
if err != nil {
t.Fatalf("Expected no direct error, got %v", err)
}
defer tooOldWatcher.Stop()
// Events happens in eventFreshDuration, cache expand without event "Gone".
verifyWatchEvent(t, tooOldWatcher, watch.Added, podFoo)
initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: fooCreated.ResourceVersion, Predicate: storage.Everything}) initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: fooCreated.ResourceVersion, Predicate: storage.Everything})
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
@ -466,6 +469,25 @@ func TestWatch(t *testing.T) {
_ = updatePod(t, etcdStorage, podFooBis, fooUpdated) _ = updatePod(t, etcdStorage, podFooBis, fooUpdated)
verifyWatchEvent(t, nowWatcher, watch.Modified, podFooBis) verifyWatchEvent(t, nowWatcher, watch.Modified, podFooBis)
// Add watchCacheDefaultCapacity events to make current watch cache full.
// Make start and last event duration exceed eventFreshDuration(current 75s) to ensure watch cache won't expand.
for i := 0; i < watchCacheDefaultCapacity; i++ {
fakeClock.SetTime(time.Now().Add(time.Duration(i) * time.Minute))
podFoo := makeTestPod(fmt.Sprintf("foo-%d", i))
updatePod(t, etcdStorage, podFoo, nil)
}
// Check whether we get too-old error via the watch channel
tooOldWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: "1", Predicate: storage.Everything})
if err != nil {
t.Fatalf("Expected no direct error, got %v", err)
}
defer tooOldWatcher.Stop()
// Ensure we get a "Gone" error.
expectedResourceExpiredError := errors.NewResourceExpired("").ErrStatus
verifyWatchEvent(t, tooOldWatcher, watch.Error, &expectedResourceExpiredError)
} }
func TestWatcherTimeout(t *testing.T) { func TestWatcherTimeout(t *testing.T) {