From a53704911047b87d0960cabab7889ce92f48b79e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Fri, 24 Feb 2023 11:59:01 +0100 Subject: [PATCH] Split cacheWatcher into its own file --- .../pkg/storage/cacher/cache_watcher.go | 370 ++++++++++++++++ .../pkg/storage/cacher/cache_watcher_test.go | 414 ++++++++++++++++++ .../apiserver/pkg/storage/cacher/cacher.go | 340 +------------- .../storage/cacher/cacher_whitebox_test.go | 380 ---------------- 4 files changed, 785 insertions(+), 719 deletions(-) create mode 100644 staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher.go create mode 100644 staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher_test.go diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher.go new file mode 100644 index 00000000000..fbcc4d6404c --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher.go @@ -0,0 +1,370 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cacher + +import ( + "context" + "fmt" + "time" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/apiserver/pkg/storage" + "k8s.io/apiserver/pkg/storage/cacher/metrics" + utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" + + "k8s.io/klog/v2" +) + +// cacheWatcher implements watch.Interface +// this is not thread-safe +type cacheWatcher struct { + input chan *watchCacheEvent + result chan watch.Event + done chan struct{} + filter filterWithAttrsFunc + stopped bool + forget func(bool) + versioner storage.Versioner + // The watcher will be closed by server after the deadline, + // save it here to send bookmark events before that. + deadline time.Time + allowWatchBookmarks bool + groupResource schema.GroupResource + + // human readable identifier that helps assigning cacheWatcher + // instance with request + identifier string + + // drainInputBuffer indicates whether we should delay closing this watcher + // and send all event in the input buffer. + drainInputBuffer bool +} + +func newCacheWatcher( + chanSize int, + filter filterWithAttrsFunc, + forget func(bool), + versioner storage.Versioner, + deadline time.Time, + allowWatchBookmarks bool, + groupResource schema.GroupResource, + identifier string, +) *cacheWatcher { + return &cacheWatcher{ + input: make(chan *watchCacheEvent, chanSize), + result: make(chan watch.Event, chanSize), + done: make(chan struct{}), + filter: filter, + stopped: false, + forget: forget, + versioner: versioner, + deadline: deadline, + allowWatchBookmarks: allowWatchBookmarks, + groupResource: groupResource, + identifier: identifier, + } +} + +// Implements watch.Interface. +func (c *cacheWatcher) ResultChan() <-chan watch.Event { + return c.result +} + +// Implements watch.Interface. 
+func (c *cacheWatcher) Stop() {
+	c.forget(false)
+}
+
+// we rely on the fact that stopLocked is actually protected by Cacher.Lock()
+func (c *cacheWatcher) stopLocked() {
+	if !c.stopped {
+		c.stopped = true
+		// A stop without draining the input channel was requested.
+		if !c.drainInputBuffer {
+			close(c.done)
+		}
+		close(c.input)
+	}
+
+	// Even if the watcher was already stopped, if it previously was
+	// using draining mode and it's not using it now, we need to
+	// close the done channel now. Otherwise we could leak the
+	// processing goroutine: if it is still trying to put more objects
+	// into the result channel, that channel will be full and there will
+	// no longer be anyone processing the events on the receiving end.
+	if !c.drainInputBuffer && !c.isDoneChannelClosedLocked() {
+		close(c.done)
+	}
+}
+
+func (c *cacheWatcher) nonblockingAdd(event *watchCacheEvent) bool {
+	select {
+	case c.input <- event:
+		return true
+	default:
+		return false
+	}
+}
+
+// A nil timer means that add will not block (if it can't send the event immediately, it will break the watcher).
+func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool {
+	// Try to send the event immediately, without blocking.
+	if c.nonblockingAdd(event) {
+		return true
+	}
+
+	closeFunc := func() {
+		// This means that we couldn't send the event to that watcher.
+		// Since we don't want to block on it infinitely,
+		// we simply terminate it.
+		klog.V(1).Infof("Forcing %v watcher close due to unresponsiveness: %v. len(c.input) = %v, len(c.result) = %v", c.groupResource.String(), c.identifier, len(c.input), len(c.result))
+		metrics.TerminatedWatchersCounter.WithLabelValues(c.groupResource.String()).Inc()
+		c.forget(false)
+	}
+
+	if timer == nil {
+		closeFunc()
+		return false
+	}
+
+	// OK, block sending, but only until the timer fires.
+	select {
+	case c.input <- event:
+		return true
+	case <-timer.C:
+		closeFunc()
+		return false
+	}
+}
+
+func (c *cacheWatcher) nextBookmarkTime(now time.Time, bookmarkFrequency time.Duration) (time.Time, bool) {
+	// We try to send bookmarks:
+	//
+	// (a) right before the watcher timeout - for now we simply set it 2s before
+	//     the deadline
+	//
+	// (b) roughly every minute
+	//
+	// (b) gives us periodicity if the watch breaks due to unexpected
+	// conditions, (a) ensures that on timeout the watcher is as close to
+	// now as possible - this covers 99% of cases.
+
+	heartbeatTime := now.Add(bookmarkFrequency)
+	if c.deadline.IsZero() {
+		// The timeout is set by our client libraries (e.g. reflector) as well as defaulted by
+		// the apiserver if properly configured. So this shouldn't happen in practice.
+		return heartbeatTime, true
+	}
+	if pretimeoutTime := c.deadline.Add(-2 * time.Second); pretimeoutTime.Before(heartbeatTime) {
+		heartbeatTime = pretimeoutTime
+	}
+
+	if heartbeatTime.Before(now) {
+		return time.Time{}, false
+	}
+	return heartbeatTime, true
+}
+
+// setDrainInputBufferLocked, if called with true, indicates that we should delay closing this watcher
+// until all events residing in the input buffer have been sent.
+func (c *cacheWatcher) setDrainInputBufferLocked(drain bool) {
+	c.drainInputBuffer = drain
+}
+
+// isDoneChannelClosedLocked checks if the c.done channel is closed
+func (c *cacheWatcher) isDoneChannelClosedLocked() bool {
+	select {
+	case <-c.done:
+		return true
+	default:
+	}
+	return false
+}
+
+func getMutableObject(object runtime.Object) runtime.Object {
+	if _, ok := object.(*cachingObject); ok {
+		// It is safe to return without a deep copy, because the underlying
+		// object will lazily perform a deep copy on the first attempt to change
+		// any of its fields.
+		return object
+	}
+	return object.DeepCopyObject()
+}
+
+func updateResourceVersion(object runtime.Object, versioner storage.Versioner, resourceVersion uint64) {
+	if err := versioner.UpdateObject(object, resourceVersion); err != nil {
+		utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", resourceVersion, object, err))
+	}
+}
+
+func (c *cacheWatcher) convertToWatchEvent(event *watchCacheEvent) *watch.Event {
+	if event.Type == watch.Bookmark {
+		return &watch.Event{Type: watch.Bookmark, Object: event.Object.DeepCopyObject()}
+	}
+
+	curObjPasses := event.Type != watch.Deleted && c.filter(event.Key, event.ObjLabels, event.ObjFields)
+	oldObjPasses := false
+	if event.PrevObject != nil {
+		oldObjPasses = c.filter(event.Key, event.PrevObjLabels, event.PrevObjFields)
+	}
+	if !curObjPasses && !oldObjPasses {
+		// Watcher is not interested in that object.
+		return nil
+	}
+
+	switch {
+	case curObjPasses && !oldObjPasses:
+		return &watch.Event{Type: watch.Added, Object: getMutableObject(event.Object)}
+	case curObjPasses && oldObjPasses:
+		return &watch.Event{Type: watch.Modified, Object: getMutableObject(event.Object)}
+	case !curObjPasses && oldObjPasses:
+		// Return a delete event with the previous object content, but with the event's resource version.
+		oldObj := getMutableObject(event.PrevObject)
+		// We know that if oldObj is a cachingObject (which can only be set via
+		// setCachingObjects), its resourceVersion is already set correctly and
+		// we don't need to update it. However, since cachingObject efficiently
+		// handles noop updates, we avoid this microoptimization here.
+		updateResourceVersion(oldObj, c.versioner, event.ResourceVersion)
+		return &watch.Event{Type: watch.Deleted, Object: oldObj}
+	}
+
+	return nil
+}
+
+// NOTE: sendWatchCacheEvent is assumed to not modify the event !!!
+func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
+	watchEvent := c.convertToWatchEvent(event)
+	if watchEvent == nil {
+		// Watcher is not interested in that object.
+		return
+	}
+
+	// We need to ensure that if we put event X into c.result, all
+	// previous events have already been put into it, no matter whether
+	// c.done is closed or not.
+	// Thus we cannot simply select on c.done and c.result, as this
+	// would give us non-determinism.
+	// At the same time, we don't want to block infinitely on putting
+	// to c.result when c.done is already closed.
+	//
+	// This ensures that with c.done already closed, we go into the
+	// next select below at most once. With that, no matter which
+	// statement we choose there, we will deliver only consecutive
+	// events.
+	select {
+	case <-c.done:
+		return
+	default:
+	}
+
+	select {
+	case c.result <- *watchEvent:
+	case <-c.done:
+	}
+}
+
+func (c *cacheWatcher) processInterval(ctx context.Context, cacheInterval *watchCacheInterval, resourceVersion uint64) {
+	defer utilruntime.HandleCrash()
+	defer close(c.result)
+	defer c.Stop()
+
+	// Check how long we are processing initEvents.
+	// As long as these are not processed, we are not processing
+	// any incoming events, so if it takes long, we may actually
+	// block all watchers for some time.
+	// TODO: From the logs it seems that processing times of even
+	// up to 1s happen, which is very long. However, this doesn't
+	// depend that much on the number of initEvents. E.g. from a
+	// 2000-node Kubemark run we have logs like these:
+	// ... processing 13862 initEvents took 66.808689ms
+	// ... processing 14040 initEvents took 993.532539ms
+	// We should understand what is blocking us in those cases (e.g.
+	// is it lack of CPU, network, or something else) and potentially
+	// consider increasing the size of the result buffer in those cases.
+	const initProcessThreshold = 500 * time.Millisecond
+	startTime := time.Now()
+
+	initEventCount := 0
+	for {
+		event, err := cacheInterval.Next()
+		if err != nil {
+			// An error indicates that the cache interval
+			// has been invalidated and can no longer serve
+			// events.
+			//
+			// Initially we considered sending an "out-of-history"
+			// Error event in this case, but because historically
+			// such events weren't sent out of the watchCache, we
+			// decided not to. This is still ok, because on watch
+			// closure, the watcher will try to re-instantiate the
+			// watch and then will get an explicit "out-of-history"
+			// window. There is potential for optimization, but for
+			// now, in order to be on the safe side and not break
+			// custom clients, the cost of it is something that we
+			// are fully accepting.
+			klog.Warningf("couldn't retrieve watch event to serve: %#v", err)
+			return
+		}
+		if event == nil {
+			break
+		}
+		c.sendWatchCacheEvent(event)
+		// With some events already sent, update resourceVersion so that
+		// events that were buffered and not yet processed won't be delivered
+		// to this watcher a second time, which would cause it to go back in time.
+		resourceVersion = event.ResourceVersion
+		initEventCount++
+	}
+
+	if initEventCount > 0 {
+		metrics.InitCounter.WithLabelValues(c.groupResource.String()).Add(float64(initEventCount))
+	}
+	processingTime := time.Since(startTime)
+	if processingTime > initProcessThreshold {
+		klog.V(2).Infof("processing %d initEvents of %s (%s) took %v", initEventCount, c.groupResource, c.identifier, processingTime)
+	}
+
+	c.process(ctx, resourceVersion)
+}
+
+func (c *cacheWatcher) process(ctx context.Context, resourceVersion uint64) {
+	// At this point we already start processing incoming watch events.
+	// However, the init events can still be being processed, because their
+	// serialization and sending to the client happen asynchronously.
+	// TODO: As described in the KEP, we would like to estimate that by delaying
+	// the initialization signal proportionally to the number of events to
+	// process, but we're leaving this to the tuning phase.
+ utilflowcontrol.WatchInitialized(ctx) + + for { + select { + case event, ok := <-c.input: + if !ok { + return + } + // only send events newer than resourceVersion + if event.ResourceVersion > resourceVersion { + c.sendWatchCacheEvent(event) + } + case <-ctx.Done(): + return + } + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher_test.go new file mode 100644 index 00000000000..0cc59c39726 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher_test.go @@ -0,0 +1,414 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cacher + +import ( + "context" + "fmt" + "reflect" + "sync" + "testing" + "time" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/diff" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/apiserver/pkg/storage" + testingclock "k8s.io/utils/clock/testing" +) + +// verifies the cacheWatcher.process goroutine is properly cleaned up even if +// the writes to cacheWatcher.result channel is blocked. +func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) { + var lock sync.RWMutex + var w *cacheWatcher + count := 0 + filter := func(string, labels.Set, fields.Set) bool { return true } + forget := func(drainWatcher bool) { + lock.Lock() + defer lock.Unlock() + count++ + // forget() has to stop the watcher, as only stopping the watcher + // triggers stopping the process() goroutine which we are in the + // end waiting for in this test. + w.setDrainInputBufferLocked(drainWatcher) + w.stopLocked() + } + initEvents := []*watchCacheEvent{ + {Object: &v1.Pod{}}, + {Object: &v1.Pod{}}, + } + // set the size of the buffer of w.result to 0, so that the writes to + // w.result is blocked. 
+ w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "") + go w.processInterval(context.Background(), intervalFromEvents(initEvents), 0) + w.Stop() + if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) { + lock.RLock() + defer lock.RUnlock() + return count == 2, nil + }); err != nil { + t.Fatalf("expected forget() to be called twice, because sendWatchCacheEvent should not be blocked by the result channel: %v", err) + } +} + +func TestCacheWatcherHandlesFiltering(t *testing.T) { + filter := func(_ string, _ labels.Set, field fields.Set) bool { + return field["spec.nodeName"] == "host" + } + forget := func(bool) {} + + testCases := []struct { + events []*watchCacheEvent + expected []watch.Event + }{ + // properly handle starting with the filter, then being deleted, then re-added + { + events: []*watchCacheEvent{ + { + Type: watch.Added, + Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}, + ObjFields: fields.Set{"spec.nodeName": "host"}, + ResourceVersion: 1, + }, + { + Type: watch.Modified, + PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}, + PrevObjFields: fields.Set{"spec.nodeName": "host"}, + Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}, + ObjFields: fields.Set{"spec.nodeName": ""}, + ResourceVersion: 2, + }, + { + Type: watch.Modified, + PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}, + PrevObjFields: fields.Set{"spec.nodeName": ""}, + Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}, + ObjFields: fields.Set{"spec.nodeName": "host"}, + ResourceVersion: 3, + }, + }, + expected: []watch.Event{ + {Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}}, + {Type: watch.Deleted, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}}, + {Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}}, + }, + }, + // properly handle ignoring changes prior to the filter, then getting added, then deleted + { + events: []*watchCacheEvent{ + { + Type: watch.Added, + Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}, + ObjFields: fields.Set{"spec.nodeName": ""}, + ResourceVersion: 1, + }, + { + Type: watch.Modified, + PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}, + PrevObjFields: fields.Set{"spec.nodeName": ""}, + Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}, + ObjFields: fields.Set{"spec.nodeName": ""}, + ResourceVersion: 2, + }, + { + Type: watch.Modified, + PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}, + PrevObjFields: fields.Set{"spec.nodeName": ""}, + Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}, + ObjFields: fields.Set{"spec.nodeName": "host"}, + ResourceVersion: 3, + }, + { + Type: watch.Modified, + PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}, + PrevObjFields: fields.Set{"spec.nodeName": "host"}, + Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}}, + ObjFields: fields.Set{"spec.nodeName": "host"}, + ResourceVersion: 4, + }, + { + Type: watch.Modified, + PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}}, + PrevObjFields: fields.Set{"spec.nodeName": "host"}, + Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}}, + ObjFields: fields.Set{"spec.nodeName": ""}, + ResourceVersion: 5, + }, + { + Type: watch.Modified, + 
PrevObject:    &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}},
+					PrevObjFields: fields.Set{"spec.nodeName": ""},
+					Object:        &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "6"}},
+					ObjFields:     fields.Set{"spec.nodeName": ""},
+					ResourceVersion: 6,
+				},
+			},
+			expected: []watch.Event{
+				{Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}},
+				{Type: watch.Modified, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}}},
+				{Type: watch.Deleted, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}}},
+			},
+		},
+	}
+
+TestCase:
+	for i, testCase := range testCases {
+		// set the size of the buffer of w.result to 0, so that writes to
+		// w.result are blocked.
+		for j := range testCase.events {
+			testCase.events[j].ResourceVersion = uint64(j) + 1
+		}
+
+		w := newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "")
+		go w.processInterval(context.Background(), intervalFromEvents(testCase.events), 0)
+
+		ch := w.ResultChan()
+		for j, event := range testCase.expected {
+			e := <-ch
+			if !reflect.DeepEqual(event, e) {
+				t.Errorf("%d: unexpected event %d: %s", i, j, diff.ObjectReflectDiff(event, e))
+				break TestCase
+			}
+		}
+		select {
+		case obj, ok := <-ch:
+			t.Errorf("%d: unexpected excess event: %#v %t", i, obj, ok)
+			break TestCase
+		default:
+		}
+		w.setDrainInputBufferLocked(false)
+		w.stopLocked()
+	}
+}
+
+func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) {
+	var w *cacheWatcher
+	done := make(chan struct{})
+	filter := func(string, labels.Set, fields.Set) bool { return true }
+	forget := func(drainWatcher bool) {
+		w.setDrainInputBufferLocked(drainWatcher)
+		w.stopLocked()
+		done <- struct{}{}
+	}
+
+	maxRetriesToProduceTheRaceCondition := 1000
+	// Simulate the timer being fired and stopped concurrently by setting the
+	// timeout to zero and running the Stop goroutine concurrently.
+	// Make sure that the watch will not be blocked on Stop.
+	for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ {
+		w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "")
+		go w.Stop()
+		select {
+		case <-done:
+		case <-time.After(time.Second):
+			t.Fatal("stop is blocked when the timer is fired concurrently")
+		}
+	}
+
+	deadline := time.Now().Add(time.Hour)
+	// After that, verify that the cacheWatcher.process goroutine works correctly.
+	for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ {
+		w = newCacheWatcher(2, filter, emptyFunc, testVersioner{}, deadline, false, schema.GroupResource{Resource: "pods"}, "")
+		w.input <- &watchCacheEvent{Object: &v1.Pod{}, ResourceVersion: uint64(i + 1)}
+		ctx, cancel := context.WithDeadline(context.Background(), deadline)
+		defer cancel()
+		go w.processInterval(ctx, intervalFromEvents(nil), 0)
+		select {
+		case <-w.ResultChan():
+		case <-time.After(time.Second):
+			t.Fatal("expected to receive an event on ResultChan")
+		}
+		w.setDrainInputBufferLocked(false)
+		w.stopLocked()
+	}
+}
+
+func TestCacheWatcherStoppedOnDestroy(t *testing.T) {
+	backingStorage := &dummyStorage{}
+	cacher, _, err := newTestCacher(backingStorage)
+	if err != nil {
+		t.Fatalf("Couldn't create cacher: %v", err)
+	}
+	defer cacher.Stop()
+
+	// Wait until cacher is initialized.
+ if err := cacher.ready.wait(); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } + + w, err := cacher.Watch(context.Background(), "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) + if err != nil { + t.Fatalf("Failed to create watch: %v", err) + } + + watchClosed := make(chan struct{}) + go func() { + defer close(watchClosed) + for event := range w.ResultChan() { + switch event.Type { + case watch.Added, watch.Modified, watch.Deleted: + // ok + default: + t.Errorf("unexpected event %#v", event) + } + } + }() + + cacher.Stop() + + select { + case <-watchClosed: + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("timed out waiting for watch to close") + } + +} + +func TestTimeBucketWatchersBasic(t *testing.T) { + filter := func(_ string, _ labels.Set, _ fields.Set) bool { + return true + } + forget := func(bool) {} + + newWatcher := func(deadline time.Time) *cacheWatcher { + return newCacheWatcher(0, filter, forget, testVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, "") + } + + clock := testingclock.NewFakeClock(time.Now()) + watchers := newTimeBucketWatchers(clock, defaultBookmarkFrequency) + now := clock.Now() + watchers.addWatcher(newWatcher(now.Add(10 * time.Second))) + watchers.addWatcher(newWatcher(now.Add(20 * time.Second))) + watchers.addWatcher(newWatcher(now.Add(20 * time.Second))) + + if len(watchers.watchersBuckets) != 2 { + t.Errorf("unexpected bucket size: %#v", watchers.watchersBuckets) + } + watchers0 := watchers.popExpiredWatchers() + if len(watchers0) != 0 { + t.Errorf("unexpected bucket size: %#v", watchers0) + } + + clock.Step(10 * time.Second) + watchers1 := watchers.popExpiredWatchers() + if len(watchers1) != 1 || len(watchers1[0]) != 1 { + t.Errorf("unexpected bucket size: %v", watchers1) + } + watchers1 = watchers.popExpiredWatchers() + if len(watchers1) != 0 { + t.Errorf("unexpected bucket size: %#v", watchers1) + } + + clock.Step(12 * time.Second) + watchers2 := watchers.popExpiredWatchers() + if len(watchers2) != 1 || len(watchers2[0]) != 2 { + t.Errorf("unexpected bucket size: %#v", watchers2) + } +} + +func makeWatchCacheEvent(rv uint64) *watchCacheEvent { + return &watchCacheEvent{ + Type: watch.Added, + Object: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("pod-%d", rv), + ResourceVersion: fmt.Sprintf("%d", rv), + }, + }, + ResourceVersion: rv, + } +} + +// TestCacheWatcherDraining verifies the cacheWatcher.process goroutine is properly cleaned up when draining was requested +func TestCacheWatcherDraining(t *testing.T) { + var lock sync.RWMutex + var w *cacheWatcher + count := 0 + filter := func(string, labels.Set, fields.Set) bool { return true } + forget := func(drainWatcher bool) { + lock.Lock() + defer lock.Unlock() + count++ + w.setDrainInputBufferLocked(drainWatcher) + w.stopLocked() + } + initEvents := []*watchCacheEvent{ + makeWatchCacheEvent(5), + makeWatchCacheEvent(6), + } + w = newCacheWatcher(1, filter, forget, testVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "") + go w.processInterval(context.Background(), intervalFromEvents(initEvents), 1) + if !w.add(makeWatchCacheEvent(7), time.NewTimer(1*time.Second)) { + t.Fatal("failed adding an even to the watcher") + } + forget(true) // drain the watcher + + eventCount := 0 + for range w.ResultChan() { + eventCount++ + } + if eventCount != 3 { + t.Errorf("Unexpected number of objects received: %d, expected: 3", eventCount) + } + if err := 
wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) { + lock.RLock() + defer lock.RUnlock() + return count == 2, nil + }); err != nil { + t.Fatalf("expected forget() to be called twice, because processInterval should call Stop(): %v", err) + } +} + +// TestCacheWatcherDrainingRequestedButNotDrained verifies the cacheWatcher.process goroutine is properly cleaned up when draining was requested +// but the client never actually get any data +func TestCacheWatcherDrainingRequestedButNotDrained(t *testing.T) { + var lock sync.RWMutex + var w *cacheWatcher + count := 0 + filter := func(string, labels.Set, fields.Set) bool { return true } + forget := func(drainWatcher bool) { + lock.Lock() + defer lock.Unlock() + count++ + w.setDrainInputBufferLocked(drainWatcher) + w.stopLocked() + } + initEvents := []*watchCacheEvent{ + makeWatchCacheEvent(5), + makeWatchCacheEvent(6), + } + w = newCacheWatcher(1, filter, forget, testVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "") + go w.processInterval(context.Background(), intervalFromEvents(initEvents), 1) + if !w.add(makeWatchCacheEvent(7), time.NewTimer(1*time.Second)) { + t.Fatal("failed adding an even to the watcher") + } + forget(true) // drain the watcher + w.Stop() // client disconnected, timeout expired or ctx was actually closed + if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) { + lock.RLock() + defer lock.RUnlock() + return count == 3, nil + }); err != nil { + t.Fatalf("expected forget() to be called three times, because processInterval should call Stop(): %v", err) + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index d913154e000..8cf76f76567 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -34,7 +34,6 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/audit" @@ -42,9 +41,9 @@ import ( "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/cacher/metrics" utilfeature "k8s.io/apiserver/pkg/util/feature" - utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" "k8s.io/client-go/tools/cache" "k8s.io/component-base/tracing" + "k8s.io/klog/v2" "k8s.io/utils/clock" ) @@ -1198,340 +1197,3 @@ func (c *errWatcher) ResultChan() <-chan watch.Event { func (c *errWatcher) Stop() { // no-op } - -// cacheWatcher implements watch.Interface -// this is not thread-safe -type cacheWatcher struct { - input chan *watchCacheEvent - result chan watch.Event - done chan struct{} - filter filterWithAttrsFunc - stopped bool - forget func(bool) - versioner storage.Versioner - // The watcher will be closed by server after the deadline, - // save it here to send bookmark events before that. - deadline time.Time - allowWatchBookmarks bool - groupResource schema.GroupResource - - // human readable identifier that helps assigning cacheWatcher - // instance with request - identifier string - - // drainInputBuffer indicates whether we should delay closing this watcher - // and send all event in the input buffer. 
- drainInputBuffer bool -} - -func newCacheWatcher( - chanSize int, - filter filterWithAttrsFunc, - forget func(bool), - versioner storage.Versioner, - deadline time.Time, - allowWatchBookmarks bool, - groupResource schema.GroupResource, - identifier string, -) *cacheWatcher { - return &cacheWatcher{ - input: make(chan *watchCacheEvent, chanSize), - result: make(chan watch.Event, chanSize), - done: make(chan struct{}), - filter: filter, - stopped: false, - forget: forget, - versioner: versioner, - deadline: deadline, - allowWatchBookmarks: allowWatchBookmarks, - groupResource: groupResource, - identifier: identifier, - } -} - -// Implements watch.Interface. -func (c *cacheWatcher) ResultChan() <-chan watch.Event { - return c.result -} - -// Implements watch.Interface. -func (c *cacheWatcher) Stop() { - c.forget(false) -} - -// we rely on the fact that stopLocked is actually protected by Cacher.Lock() -func (c *cacheWatcher) stopLocked() { - if !c.stopped { - c.stopped = true - // stop without draining the input channel was requested. - if !c.drainInputBuffer { - close(c.done) - } - close(c.input) - } - - // Even if the watcher was already stopped, if it previously was - // using draining mode and it's not using it now we need to - // close the done channel now. Otherwise we could leak the - // processing goroutine if it will be trying to put more objects - // into result channel, the channel will be full and there will - // already be noone on the processing the events on the receiving end. - if !c.drainInputBuffer && !c.isDoneChannelClosedLocked() { - close(c.done) - } -} - -func (c *cacheWatcher) nonblockingAdd(event *watchCacheEvent) bool { - select { - case c.input <- event: - return true - default: - return false - } -} - -// Nil timer means that add will not block (if it can't send event immediately, it will break the watcher) -func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool { - // Try to send the event immediately, without blocking. - if c.nonblockingAdd(event) { - return true - } - - closeFunc := func() { - // This means that we couldn't send event to that watcher. - // Since we don't want to block on it infinitely, - // we simply terminate it. - klog.V(1).Infof("Forcing %v watcher close due to unresponsiveness: %v. len(c.input) = %v, len(c.result) = %v", c.groupResource.String(), c.identifier, len(c.input), len(c.result)) - metrics.TerminatedWatchersCounter.WithLabelValues(c.groupResource.String()).Inc() - c.forget(false) - } - - if timer == nil { - closeFunc() - return false - } - - // OK, block sending, but only until timer fires. - select { - case c.input <- event: - return true - case <-timer.C: - closeFunc() - return false - } -} - -func (c *cacheWatcher) nextBookmarkTime(now time.Time, bookmarkFrequency time.Duration) (time.Time, bool) { - // We try to send bookmarks: - // - // (a) right before the watcher timeout - for now we simply set it 2s before - // the deadline - // - // (b) roughly every minute - // - // (b) gives us periodicity if the watch breaks due to unexpected - // conditions, (a) ensures that on timeout the watcher is as close to - // now as possible - this covers 99% of cases. - - heartbeatTime := now.Add(bookmarkFrequency) - if c.deadline.IsZero() { - // Timeout is set by our client libraries (e.g. reflector) as well as defaulted by - // apiserver if properly configured. So this shoudln't happen in practice. 
- return heartbeatTime, true - } - if pretimeoutTime := c.deadline.Add(-2 * time.Second); pretimeoutTime.Before(heartbeatTime) { - heartbeatTime = pretimeoutTime - } - - if heartbeatTime.Before(now) { - return time.Time{}, false - } - return heartbeatTime, true -} - -// setDrainInputBufferLocked if set to true indicates that we should delay closing this watcher -// until we send all events residing in the input buffer. -func (c *cacheWatcher) setDrainInputBufferLocked(drain bool) { - c.drainInputBuffer = drain -} - -// isDoneChannelClosed checks if c.done channel is closed -func (c *cacheWatcher) isDoneChannelClosedLocked() bool { - select { - case <-c.done: - return true - default: - } - return false -} - -func getMutableObject(object runtime.Object) runtime.Object { - if _, ok := object.(*cachingObject); ok { - // It is safe to return without deep-copy, because the underlying - // object will lazily perform deep-copy on the first try to change - // any of its fields. - return object - } - return object.DeepCopyObject() -} - -func updateResourceVersion(object runtime.Object, versioner storage.Versioner, resourceVersion uint64) { - if err := versioner.UpdateObject(object, resourceVersion); err != nil { - utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", resourceVersion, object, err)) - } -} - -func (c *cacheWatcher) convertToWatchEvent(event *watchCacheEvent) *watch.Event { - if event.Type == watch.Bookmark { - return &watch.Event{Type: watch.Bookmark, Object: event.Object.DeepCopyObject()} - } - - curObjPasses := event.Type != watch.Deleted && c.filter(event.Key, event.ObjLabels, event.ObjFields) - oldObjPasses := false - if event.PrevObject != nil { - oldObjPasses = c.filter(event.Key, event.PrevObjLabels, event.PrevObjFields) - } - if !curObjPasses && !oldObjPasses { - // Watcher is not interested in that object. - return nil - } - - switch { - case curObjPasses && !oldObjPasses: - return &watch.Event{Type: watch.Added, Object: getMutableObject(event.Object)} - case curObjPasses && oldObjPasses: - return &watch.Event{Type: watch.Modified, Object: getMutableObject(event.Object)} - case !curObjPasses && oldObjPasses: - // return a delete event with the previous object content, but with the event's resource version - oldObj := getMutableObject(event.PrevObject) - // We know that if oldObj is cachingObject (which can only be set via - // setCachingObjects), its resourceVersion is already set correctly and - // we don't need to update it. However, since cachingObject efficiently - // handles noop updates, we avoid this microoptimization here. - updateResourceVersion(oldObj, c.versioner, event.ResourceVersion) - return &watch.Event{Type: watch.Deleted, Object: oldObj} - } - - return nil -} - -// NOTE: sendWatchCacheEvent is assumed to not modify !!! -func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) { - watchEvent := c.convertToWatchEvent(event) - if watchEvent == nil { - // Watcher is not interested in that object. - return - } - - // We need to ensure that if we put event X to the c.result, all - // previous events were already put into it before, no matter whether - // c.done is close or not. - // Thus we cannot simply select from c.done and c.result and this - // would give us non-determinism. - // At the same time, we don't want to block infinitely on putting - // to c.result, when c.done is already closed. - // - // This ensures that with c.done already close, we at most once go - // into the next select after this. 
With that, no matter which - // statement we choose there, we will deliver only consecutive - // events. - select { - case <-c.done: - return - default: - } - - select { - case c.result <- *watchEvent: - case <-c.done: - } -} - -func (c *cacheWatcher) processInterval(ctx context.Context, cacheInterval *watchCacheInterval, resourceVersion uint64) { - defer utilruntime.HandleCrash() - defer close(c.result) - defer c.Stop() - - // Check how long we are processing initEvents. - // As long as these are not processed, we are not processing - // any incoming events, so if it takes long, we may actually - // block all watchers for some time. - // TODO: From the logs it seems that there happens processing - // times even up to 1s which is very long. However, this doesn't - // depend that much on the number of initEvents. E.g. from the - // 2000-node Kubemark run we have logs like this, e.g.: - // ... processing 13862 initEvents took 66.808689ms - // ... processing 14040 initEvents took 993.532539ms - // We should understand what is blocking us in those cases (e.g. - // is it lack of CPU, network, or sth else) and potentially - // consider increase size of result buffer in those cases. - const initProcessThreshold = 500 * time.Millisecond - startTime := time.Now() - - initEventCount := 0 - for { - event, err := cacheInterval.Next() - if err != nil { - // An error indicates that the cache interval - // has been invalidated and can no longer serve - // events. - // - // Initially we considered sending an "out-of-history" - // Error event in this case, but because historically - // such events weren't sent out of the watchCache, we - // decided not to. This is still ok, because on watch - // closure, the watcher will try to re-instantiate the - // watch and then will get an explicit "out-of-history" - // window. There is potential for optimization, but for - // now, in order to be on the safe side and not break - // custom clients, the cost of it is something that we - // are fully accepting. - klog.Warningf("couldn't retrieve watch event to serve: %#v", err) - return - } - if event == nil { - break - } - c.sendWatchCacheEvent(event) - // With some events already sent, update resourceVersion so that - // events that were buffered and not yet processed won't be delivered - // to this watcher second time causing going back in time. - resourceVersion = event.ResourceVersion - initEventCount++ - } - - if initEventCount > 0 { - metrics.InitCounter.WithLabelValues(c.groupResource.String()).Add(float64(initEventCount)) - } - processingTime := time.Since(startTime) - if processingTime > initProcessThreshold { - klog.V(2).Infof("processing %d initEvents of %s (%s) took %v", initEventCount, c.groupResource, c.identifier, processingTime) - } - - c.process(ctx, resourceVersion) -} - -func (c *cacheWatcher) process(ctx context.Context, resourceVersion uint64) { - // At this point we already start processing incoming watch events. - // However, the init event can still be processed because their serialization - // and sending to the client happens asynchrnously. - // TODO: As describe in the KEP, we would like to estimate that by delaying - // the initialization signal proportionally to the number of events to - // process, but we're leaving this to the tuning phase. 
- utilflowcontrol.WatchInitialized(ctx) - - for { - select { - case event, ok := <-c.input: - if !ok { - return - } - // only send events newer than resourceVersion - if event.ResourceVersion > resourceVersion { - c.sendWatchCacheEvent(event) - } - case <-ctx.Done(): - return - } - } -} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index a06ab757e6e..b95cf10513d 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -27,7 +27,6 @@ import ( "testing" "time" - v1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -36,7 +35,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" - "k8s.io/apimachinery/pkg/util/diff" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" @@ -45,174 +43,8 @@ import ( "k8s.io/apiserver/pkg/storage" utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" "k8s.io/utils/clock" - testingclock "k8s.io/utils/clock/testing" ) -// verifies the cacheWatcher.process goroutine is properly cleaned up even if -// the writes to cacheWatcher.result channel is blocked. -func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) { - var lock sync.RWMutex - var w *cacheWatcher - count := 0 - filter := func(string, labels.Set, fields.Set) bool { return true } - forget := func(drainWatcher bool) { - lock.Lock() - defer lock.Unlock() - count++ - // forget() has to stop the watcher, as only stopping the watcher - // triggers stopping the process() goroutine which we are in the - // end waiting for in this test. - w.setDrainInputBufferLocked(drainWatcher) - w.stopLocked() - } - initEvents := []*watchCacheEvent{ - {Object: &v1.Pod{}}, - {Object: &v1.Pod{}}, - } - // set the size of the buffer of w.result to 0, so that the writes to - // w.result is blocked. 
- w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "") - go w.processInterval(context.Background(), intervalFromEvents(initEvents), 0) - w.Stop() - if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) { - lock.RLock() - defer lock.RUnlock() - return count == 2, nil - }); err != nil { - t.Fatalf("expected forget() to be called twice, because sendWatchCacheEvent should not be blocked by the result channel: %v", err) - } -} - -func TestCacheWatcherHandlesFiltering(t *testing.T) { - filter := func(_ string, _ labels.Set, field fields.Set) bool { - return field["spec.nodeName"] == "host" - } - forget := func(bool) {} - - testCases := []struct { - events []*watchCacheEvent - expected []watch.Event - }{ - // properly handle starting with the filter, then being deleted, then re-added - { - events: []*watchCacheEvent{ - { - Type: watch.Added, - Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}, - ObjFields: fields.Set{"spec.nodeName": "host"}, - ResourceVersion: 1, - }, - { - Type: watch.Modified, - PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}, - PrevObjFields: fields.Set{"spec.nodeName": "host"}, - Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}, - ObjFields: fields.Set{"spec.nodeName": ""}, - ResourceVersion: 2, - }, - { - Type: watch.Modified, - PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}, - PrevObjFields: fields.Set{"spec.nodeName": ""}, - Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}, - ObjFields: fields.Set{"spec.nodeName": "host"}, - ResourceVersion: 3, - }, - }, - expected: []watch.Event{ - {Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}}, - {Type: watch.Deleted, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}}, - {Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}}, - }, - }, - // properly handle ignoring changes prior to the filter, then getting added, then deleted - { - events: []*watchCacheEvent{ - { - Type: watch.Added, - Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}, - ObjFields: fields.Set{"spec.nodeName": ""}, - ResourceVersion: 1, - }, - { - Type: watch.Modified, - PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}, - PrevObjFields: fields.Set{"spec.nodeName": ""}, - Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}, - ObjFields: fields.Set{"spec.nodeName": ""}, - ResourceVersion: 2, - }, - { - Type: watch.Modified, - PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}, - PrevObjFields: fields.Set{"spec.nodeName": ""}, - Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}, - ObjFields: fields.Set{"spec.nodeName": "host"}, - ResourceVersion: 3, - }, - { - Type: watch.Modified, - PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}, - PrevObjFields: fields.Set{"spec.nodeName": "host"}, - Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}}, - ObjFields: fields.Set{"spec.nodeName": "host"}, - ResourceVersion: 4, - }, - { - Type: watch.Modified, - PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}}, - PrevObjFields: fields.Set{"spec.nodeName": "host"}, - Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}}, - ObjFields: fields.Set{"spec.nodeName": ""}, - ResourceVersion: 5, - }, - { - Type: watch.Modified, - 
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}}, - PrevObjFields: fields.Set{"spec.nodeName": ""}, - Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "6"}}, - ObjFields: fields.Set{"spec.nodeName": ""}, - ResourceVersion: 6, - }, - }, - expected: []watch.Event{ - {Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}}, - {Type: watch.Modified, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}}}, - {Type: watch.Deleted, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}}}, - }, - }, - } - -TestCase: - for i, testCase := range testCases { - // set the size of the buffer of w.result to 0, so that the writes to - // w.result is blocked. - for j := range testCase.events { - testCase.events[j].ResourceVersion = uint64(j) + 1 - } - - w := newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "") - go w.processInterval(context.Background(), intervalFromEvents(testCase.events), 0) - - ch := w.ResultChan() - for j, event := range testCase.expected { - e := <-ch - if !reflect.DeepEqual(event, e) { - t.Errorf("%d: unexpected event %d: %s", i, j, diff.ObjectReflectDiff(event, e)) - break TestCase - } - } - select { - case obj, ok := <-ch: - t.Errorf("%d: unexpected excess event: %#v %t", i, obj, ok) - break TestCase - default: - } - w.setDrainInputBufferLocked(false) - w.stopLocked() - } -} - type testVersioner struct{} func (testVersioner) UpdateObject(obj runtime.Object, resourceVersion uint64) error { @@ -567,89 +399,6 @@ func TestWatcherNotGoingBackInTime(t *testing.T) { } } -func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) { - var w *cacheWatcher - done := make(chan struct{}) - filter := func(string, labels.Set, fields.Set) bool { return true } - forget := func(drainWatcher bool) { - w.setDrainInputBufferLocked(drainWatcher) - w.stopLocked() - done <- struct{}{} - } - - maxRetriesToProduceTheRaceCondition := 1000 - // Simulating the timer is fired and stopped concurrently by set time - // timeout to zero and run the Stop goroutine concurrently. - // May sure that the watch will not be blocked on Stop. - for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ { - w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "") - go w.Stop() - select { - case <-done: - case <-time.After(time.Second): - t.Fatal("stop is blocked when the timer is fired concurrently") - } - } - - deadline := time.Now().Add(time.Hour) - // After that, verifies the cacheWatcher.process goroutine works correctly. - for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ { - w = newCacheWatcher(2, filter, emptyFunc, testVersioner{}, deadline, false, schema.GroupResource{Resource: "pods"}, "") - w.input <- &watchCacheEvent{Object: &v1.Pod{}, ResourceVersion: uint64(i + 1)} - ctx, cancel := context.WithDeadline(context.Background(), deadline) - defer cancel() - go w.processInterval(ctx, intervalFromEvents(nil), 0) - select { - case <-w.ResultChan(): - case <-time.After(time.Second): - t.Fatal("expected received a event on ResultChan") - } - w.setDrainInputBufferLocked(false) - w.stopLocked() - } -} - -func TestCacheWatcherStoppedOnDestroy(t *testing.T) { - backingStorage := &dummyStorage{} - cacher, _, err := newTestCacher(backingStorage) - if err != nil { - t.Fatalf("Couldn't create cacher: %v", err) - } - defer cacher.Stop() - - // Wait until cacher is initialized. 
- if err := cacher.ready.wait(); err != nil { - t.Fatalf("unexpected error waiting for the cache to be ready") - } - - w, err := cacher.Watch(context.Background(), "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) - if err != nil { - t.Fatalf("Failed to create watch: %v", err) - } - - watchClosed := make(chan struct{}) - go func() { - defer close(watchClosed) - for event := range w.ResultChan() { - switch event.Type { - case watch.Added, watch.Modified, watch.Deleted: - // ok - default: - t.Errorf("unexpected event %#v", event) - } - } - }() - - cacher.Stop() - - select { - case <-watchClosed: - case <-time.After(wait.ForeverTestTimeout): - t.Errorf("timed out waiting for watch to close") - } - -} - func TestCacheDontAcceptRequestsStopped(t *testing.T) { backingStorage := &dummyStorage{} cacher, _, err := newTestCacher(backingStorage) @@ -712,48 +461,6 @@ func TestCacheDontAcceptRequestsStopped(t *testing.T) { } } -func TestTimeBucketWatchersBasic(t *testing.T) { - filter := func(_ string, _ labels.Set, _ fields.Set) bool { - return true - } - forget := func(bool) {} - - newWatcher := func(deadline time.Time) *cacheWatcher { - return newCacheWatcher(0, filter, forget, testVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, "") - } - - clock := testingclock.NewFakeClock(time.Now()) - watchers := newTimeBucketWatchers(clock, defaultBookmarkFrequency) - now := clock.Now() - watchers.addWatcher(newWatcher(now.Add(10 * time.Second))) - watchers.addWatcher(newWatcher(now.Add(20 * time.Second))) - watchers.addWatcher(newWatcher(now.Add(20 * time.Second))) - - if len(watchers.watchersBuckets) != 2 { - t.Errorf("unexpected bucket size: %#v", watchers.watchersBuckets) - } - watchers0 := watchers.popExpiredWatchers() - if len(watchers0) != 0 { - t.Errorf("unexpected bucket size: %#v", watchers0) - } - - clock.Step(10 * time.Second) - watchers1 := watchers.popExpiredWatchers() - if len(watchers1) != 1 || len(watchers1[0]) != 1 { - t.Errorf("unexpected bucket size: %v", watchers1) - } - watchers1 = watchers.popExpiredWatchers() - if len(watchers1) != 0 { - t.Errorf("unexpected bucket size: %#v", watchers1) - } - - clock.Step(12 * time.Second) - watchers2 := watchers.popExpiredWatchers() - if len(watchers2) != 1 || len(watchers2[0]) != 2 { - t.Errorf("unexpected bucket size: %#v", watchers2) - } -} - func TestCacherNoLeakWithMultipleWatchers(t *testing.T) { backingStorage := &dummyStorage{} cacher, _, err := newTestCacher(backingStorage) @@ -1630,90 +1337,3 @@ func TestCacheIntervalInvalidationStopsWatch(t *testing.T) { t.Errorf("unexpected number of events received, expected: %d, got: %d", bufferSize+1, received) } } - -func makeWatchCacheEvent(rv uint64) *watchCacheEvent { - return &watchCacheEvent{ - Type: watch.Added, - Object: &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("pod-%d", rv), - ResourceVersion: fmt.Sprintf("%d", rv), - }, - }, - ResourceVersion: rv, - } -} - -// TestCacheWatcherDraining verifies the cacheWatcher.process goroutine is properly cleaned up when draining was requested -func TestCacheWatcherDraining(t *testing.T) { - var lock sync.RWMutex - var w *cacheWatcher - count := 0 - filter := func(string, labels.Set, fields.Set) bool { return true } - forget := func(drainWatcher bool) { - lock.Lock() - defer lock.Unlock() - count++ - w.setDrainInputBufferLocked(drainWatcher) - w.stopLocked() - } - initEvents := []*watchCacheEvent{ - makeWatchCacheEvent(5), - makeWatchCacheEvent(6), - } - w = newCacheWatcher(1, filter, 
forget, testVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "") - go w.processInterval(context.Background(), intervalFromEvents(initEvents), 1) - if !w.add(makeWatchCacheEvent(7), time.NewTimer(1*time.Second)) { - t.Fatal("failed adding an even to the watcher") - } - forget(true) // drain the watcher - - eventCount := 0 - for range w.ResultChan() { - eventCount++ - } - if eventCount != 3 { - t.Errorf("Unexpected number of objects received: %d, expected: 3", eventCount) - } - if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) { - lock.RLock() - defer lock.RUnlock() - return count == 2, nil - }); err != nil { - t.Fatalf("expected forget() to be called twice, because processInterval should call Stop(): %v", err) - } -} - -// TestCacheWatcherDrainingRequestedButNotDrained verifies the cacheWatcher.process goroutine is properly cleaned up when draining was requested -// but the client never actually get any data -func TestCacheWatcherDrainingRequestedButNotDrained(t *testing.T) { - var lock sync.RWMutex - var w *cacheWatcher - count := 0 - filter := func(string, labels.Set, fields.Set) bool { return true } - forget := func(drainWatcher bool) { - lock.Lock() - defer lock.Unlock() - count++ - w.setDrainInputBufferLocked(drainWatcher) - w.stopLocked() - } - initEvents := []*watchCacheEvent{ - makeWatchCacheEvent(5), - makeWatchCacheEvent(6), - } - w = newCacheWatcher(1, filter, forget, testVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "") - go w.processInterval(context.Background(), intervalFromEvents(initEvents), 1) - if !w.add(makeWatchCacheEvent(7), time.NewTimer(1*time.Second)) { - t.Fatal("failed adding an even to the watcher") - } - forget(true) // drain the watcher - w.Stop() // client disconnected, timeout expired or ctx was actually closed - if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) { - lock.RLock() - defer lock.RUnlock() - return count == 3, nil - }); err != nil { - t.Fatalf("expected forget() to be called three times, because processInterval should call Stop(): %v", err) - } -}
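
The one behavioral subtlety worth calling out in the moved code is cacheWatcher.add: it first attempts a non-blocking send to the input channel and, when the caller supplies a timer, falls back to blocking until that timer fires before giving up and terminating the watcher. The standalone Go sketch below illustrates just that send pattern; it is not part of the patch, and the channel, value type, and function name are invented for the example.

	package main

	import (
		"fmt"
		"time"
	)

	// trySend mirrors the nonblockingAdd/add pattern used by cacheWatcher:
	// attempt a non-blocking send first, then optionally block until the
	// timer fires, and report failure so the caller can drop the receiver.
	func trySend(ch chan<- string, v string, timer *time.Timer) bool {
		select {
		case ch <- v: // fast path: the buffer has room
			return true
		default:
		}
		if timer == nil {
			// A nil timer means "never block": fail immediately.
			return false
		}
		select {
		case ch <- v:
			return true
		case <-timer.C:
			// The receiver is too slow; give up.
			return false
		}
	}

	func main() {
		ch := make(chan string, 1)
		fmt.Println(trySend(ch, "a", nil))                                // true: fits in the buffer
		fmt.Println(trySend(ch, "b", time.NewTimer(50*time.Millisecond))) // false: nothing drains ch
	}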