From 7f2aa7ad3a61a52d0a780f904b291d063399c28a Mon Sep 17 00:00:00 2001 From: Madhav Jivrajani Date: Tue, 5 Oct 2021 15:52:09 +0530 Subject: [PATCH] cacher: Use watchCacheInterval to reduce lock contention - Modify GetAllEventsSinceThreadUnsafe to return a watchCacheInterval - Modify Watch() to compute a watchCacheInterval rather than a slice of all "initEvents" and pass this interval to process() - Use interval::Next() to obtain events to process rather than obtain them all at once - Modify tests accordingly to use interval - On invalidation, stop processing and stop the watch. - Make indexValidator injectable for testing - Add unit test for verifying the behaviour of stopping the watch. Signed-off-by: Madhav Jivrajani --- .../apiserver/pkg/storage/cacher/cacher.go | 54 ++++++++---- .../storage/cacher/cacher_whitebox_test.go | 88 ++++++++++++++++++- .../pkg/storage/cacher/watch_cache.go | 39 +++----- .../pkg/storage/cacher/watch_cache_test.go | 28 +++++- 4 files changed, 163 insertions(+), 46 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index e89614ad325..930939cb763 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -509,7 +509,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions // underlying watchCache is calling processEvent under its lock. c.watchCache.RLock() defer c.watchCache.RUnlock() - initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV) + cacheInterval, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV) if err != nil { // To match the uncached watch implementation, once we have passed authn/authz/admission, // and successfully parsed a resource version, other errors must fail with a watch event of type ERROR, @@ -531,7 +531,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions c.watcherIdx++ }() - go watcher.processEvents(ctx, initEvents, watchRV) + go watcher.processInterval(ctx, cacheInterval, watchRV) return watcher, nil } @@ -1333,7 +1333,7 @@ func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) { // would give us non-determinism. // At the same time, we don't want to block infinitely on putting // to c.result, when c.done is already closed. - + // // This ensures that with c.done already close, we at most once go // into the next select after this. With that, no matter which // statement we choose there, we will deliver only consecutive @@ -1350,8 +1350,10 @@ func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) { } } -func (c *cacheWatcher) processEvents(ctx context.Context, initEvents []*watchCacheEvent, resourceVersion uint64) { +func (c *cacheWatcher) processInterval(ctx context.Context, cacheInterval *watchCacheInterval, resourceVersion uint64) { defer utilruntime.HandleCrash() + defer close(c.result) + defer c.Stop() // Check how long we are processing initEvents. // As long as these are not processed, we are not processing @@ -1368,22 +1370,46 @@ func (c *cacheWatcher) processEvents(ctx context.Context, initEvents []*watchCac // consider increase size of result buffer in those cases. 
const initProcessThreshold = 500 * time.Millisecond startTime := time.Now() - for _, event := range initEvents { + + initEventCount := 0 + for { + event, err := cacheInterval.Next() + if err != nil { + // An error indicates that the cache interval + // has been invalidated and can no longer serve + // events. + // + // Initially we considered sending an "out-of-history" + // Error event in this case, but because historically + // such events weren't sent out of the watchCache, we + // decided not to. This is still ok, because on watch + // closure, the watcher will try to re-instantiate the + // watch and then will get an explicit "out-of-history" + // window. There is potential for optimization, but for + // now, in order to be on the safe side and not break + // custom clients, the cost of it is something that we + // are fully accepting. + klog.Warningf("couldn't retrieve watch event to serve: %#v", err) + return + } + if event == nil { + break + } c.sendWatchCacheEvent(event) + // With some events already sent, update resourceVersion so that + // events that were buffered and not yet processed won't be delivered + // to this watcher second time causing going back in time. + resourceVersion = event.ResourceVersion + initEventCount++ } objType := c.objectType.String() - if len(initEvents) > 0 { - initCounter.WithLabelValues(objType).Add(float64(len(initEvents))) - // With some events already sent, update resourceVersion - // so that events that were buffered and not yet processed - // won't be delivered to this watcher second time causing - // going back in time. - resourceVersion = initEvents[len(initEvents)-1].ResourceVersion + if initEventCount > 0 { + initCounter.WithLabelValues(objType).Add(float64(initEventCount)) } processingTime := time.Since(startTime) if processingTime > initProcessThreshold { - klog.V(2).Infof("processing %d initEvents of %s (%s) took %v", len(initEvents), objType, c.identifier, processingTime) + klog.V(2).Infof("processing %d initEvents of %s (%s) took %v", initEventCount, objType, c.identifier, processingTime) } c.process(ctx, resourceVersion) @@ -1398,8 +1424,6 @@ func (c *cacheWatcher) process(ctx context.Context, resourceVersion uint64) { // process, but we're leaving this to the tuning phase. utilflowcontrol.WatchInitialized(ctx) - defer close(c.result) - defer c.Stop() for { select { case event, ok := <-c.input: diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index bae4b4ca1fd..924272f91b8 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -74,7 +74,7 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) { // set the size of the buffer of w.result to 0, so that the writes to // w.result is blocked. 
 	w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, objectType, "")
-	go w.processEvents(context.Background(), initEvents, 0)
+	go w.processInterval(context.Background(), intervalFromEvents(initEvents), 0)
 	w.Stop()
 	if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
 		lock.RLock()
@@ -194,7 +194,7 @@ TestCase:
 		}
 
 		w := newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, objectType, "")
-		go w.processEvents(context.Background(), testCase.events, 0)
+		go w.processInterval(context.Background(), intervalFromEvents(testCase.events), 0)
 
 		ch := w.ResultChan()
 		for j, event := range testCase.expected {
@@ -542,7 +542,7 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) {
 		w = newCacheWatcher(2, filter, emptyFunc, testVersioner{}, deadline, false, objectType, "")
 		w.input <- &watchCacheEvent{Object: &v1.Pod{}, ResourceVersion: uint64(i + 1)}
 		ctx, _ := context.WithDeadline(context.Background(), deadline)
-		go w.processEvents(ctx, nil, 0)
+		go w.processInterval(ctx, intervalFromEvents(nil), 0)
 		select {
 		case <-w.ResultChan():
 		case <-time.After(time.Second):
@@ -1412,3 +1412,85 @@ func TestCachingObjects(t *testing.T) {
 	t.Run("single watcher", func(t *testing.T) { testCachingObjects(t, 1) })
 	t.Run("many watcher", func(t *testing.T) { testCachingObjects(t, 3) })
 }
+
+func TestCacheIntervalInvalidationStopsWatch(t *testing.T) {
+	backingStorage := &dummyStorage{}
+	cacher, _, err := newTestCacher(backingStorage)
+	if err != nil {
+		t.Fatalf("Couldn't create cacher: %v", err)
+	}
+	defer cacher.Stop()
+
+	// Wait until cacher is initialized.
+	cacher.ready.wait()
+	// Ensure there is enough budget for slow processing since
+	// the entire watch cache is going to be served through the
+	// interval and events won't be popped from the cacheWatcher's
+	// input channel until much later.
+	cacher.dispatchTimeoutBudget.returnUnused(100 * time.Millisecond)
+
+	// We define a custom index validator such that the interval is
+	// able to serve the first bufferSize elements successfully, but
+	// on trying to fill its buffer again, the indexValidator simulates
+	// an invalidation leading to the watch being closed and the number
+	// of events we actually process to be bufferSize, each event of
+	// type watch.Added.
+	valid := true
+	invalidateCacheInterval := func() {
+		valid = false
+	}
+	once := sync.Once{}
+	indexValidator := func(index int) bool {
+		isValid := valid && (index >= cacher.watchCache.startIndex)
+		once.Do(invalidateCacheInterval)
+		return isValid
+	}
+	cacher.watchCache.indexValidator = indexValidator
+
+	makePod := func(i int) *examplev1.Pod {
+		return &examplev1.Pod{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:            fmt.Sprintf("pod-%d", 1000+i),
+				Namespace:       "ns",
+				ResourceVersion: fmt.Sprintf("%d", 1000+i),
+			},
+		}
+	}
+
+	// 250 is arbitrary, point is to have enough elements such that
+	// it generates more than bufferSize number of events allowing
+	// us to simulate the invalidation of the cache interval.
+	totalPods := 250
+	for i := 0; i < totalPods; i++ {
+		err := cacher.watchCache.Add(makePod(i))
+		if err != nil {
+			t.Errorf("error: %v", err)
+		}
+	}
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	w, err := cacher.Watch(ctx, "pods/ns", storage.ListOptions{
+		ResourceVersion: "999",
+		Predicate:       storage.Everything,
+	})
+	if err != nil {
+		t.Fatalf("Failed to create watch: %v", err)
+	}
+	defer w.Stop()
+
+	received := 0
+	resChan := w.ResultChan()
+	for event := range resChan {
+		received++
+		t.Logf("event type: %v, events received so far: %d", event.Type, received)
+		if event.Type != watch.Added {
+			t.Errorf("unexpected event type, expected: %s, got: %s, event: %v", watch.Added, event.Type, event)
+		}
+	}
+	// Since the watch is stopped after the interval is invalidated,
+	// we should have processed exactly bufferSize number of elements.
+	if received != bufferSize {
+		t.Errorf("unexpected number of events received, expected: %d, got: %d", bufferSize, received)
+	}
+}
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go
index 42d84653324..b9dba258d3d 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go
@@ -189,6 +189,9 @@ type watchCache struct {
 	// cacher's objectType.
 	objectType reflect.Type
+
+	// For testing cache interval invalidation.
+	indexValidator indexValidator
 }
 
 func newWatchCache(
@@ -219,6 +222,8 @@ func newWatchCache(
 	objType := objectType.String()
 	watchCacheCapacity.WithLabelValues(objType).Set(float64(wc.capacity))
 	wc.cond = sync.NewCond(wc.RLocker())
+	wc.indexValidator = wc.isIndexValidLocked
+
 	return wc
 }
 
@@ -568,7 +573,7 @@ func (w *watchCache) SetOnReplace(onReplace func()) {
 	w.onReplace = onReplace
 }
 
-func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*watchCacheEvent, error) {
+func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) (*watchCacheInterval, error) {
 	size := w.endIndex - w.startIndex
 	var oldest uint64
 	switch {
@@ -594,27 +599,11 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*w
 		// current state and only then start watching from that point.
 		//
 		// TODO: In v2 api, we should stop returning the current state - #13969.
- allItems := w.store.List() - result := make([]*watchCacheEvent, len(allItems)) - for i, item := range allItems { - elem, ok := item.(*storeElement) - if !ok { - return nil, fmt.Errorf("not a storeElement: %v", elem) - } - objLabels, objFields, err := w.getAttrsFunc(elem.Object) - if err != nil { - return nil, err - } - result[i] = &watchCacheEvent{ - Type: watch.Added, - Object: elem.Object, - ObjLabels: objLabels, - ObjFields: objFields, - Key: elem.Key, - ResourceVersion: w.resourceVersion, - } + ci, err := newCacheIntervalFromStore(w.resourceVersion, w.store, w.getAttrsFunc) + if err != nil { + return nil, err } - return result, nil + return ci, nil } if resourceVersion < oldest-1 { return nil, errors.NewResourceExpired(fmt.Sprintf("too old resource version: %d (%d)", resourceVersion, oldest-1)) @@ -625,11 +614,11 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*w return w.cache[(w.startIndex+i)%w.capacity].ResourceVersion > resourceVersion } first := sort.Search(size, f) - result := make([]*watchCacheEvent, size-first) - for i := 0; i < size-first; i++ { - result[i] = w.cache[(w.startIndex+first+i)%w.capacity] + indexerFunc := func(i int) *watchCacheEvent { + return w.cache[i%w.capacity] } - return result, nil + ci := newCacheInterval(w.startIndex+first, w.endIndex, indexerFunc, w.indexValidator, &w.RWMutex) + return ci, nil } func (w *watchCache) Resync() error { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go index 58ddfddab6a..6a4a62dcc8f 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go @@ -72,9 +72,31 @@ type testWatchCache struct { } func (w *testWatchCache) getAllEventsSince(resourceVersion uint64) ([]*watchCacheEvent, error) { - w.watchCache.RLock() - defer w.watchCache.RUnlock() - return w.watchCache.GetAllEventsSinceThreadUnsafe(resourceVersion) + cacheInterval, err := w.getCacheIntervalForEvents(resourceVersion) + if err != nil { + return nil, err + } + + result := []*watchCacheEvent{} + for { + event, err := cacheInterval.Next() + if err != nil { + return nil, err + } + if event == nil { + break + } + result = append(result, event) + } + + return result, nil +} + +func (w *testWatchCache) getCacheIntervalForEvents(resourceVersion uint64) (*watchCacheInterval, error) { + w.RLock() + defer w.RUnlock() + + return w.GetAllEventsSinceThreadUnsafe(resourceVersion) } // newTestWatchCache just adds a fake clock.
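
The consumer contract that processInterval relies on in this patch - keep calling Next() until it returns either an error (the interval was invalidated, so stop the watch) or a nil event (the interval is exhausted, so fall through to live events) - can be sketched in isolation. Below is a minimal, self-contained Go illustration under assumed names; cacheEvent, cacheInterval and errIntervalInvalidated are hypothetical stand-ins for this sketch, not the apiserver's actual types.

package main

import (
	"errors"
	"fmt"
)

type cacheEvent struct {
	resourceVersion uint64
}

var errIntervalInvalidated = errors.New("cache interval invalidated")

// cacheInterval lazily yields buffered events. Next returns (nil, nil) once
// the interval is exhausted, and an error if the interval was invalidated
// (for example, because the buffer no longer covers the requested window).
type cacheInterval struct {
	events      []*cacheEvent
	pos         int
	invalidated bool
}

func (ci *cacheInterval) Next() (*cacheEvent, error) {
	if ci.invalidated {
		return nil, errIntervalInvalidated
	}
	if ci.pos >= len(ci.events) {
		return nil, nil
	}
	ev := ci.events[ci.pos]
	ci.pos++
	return ev, nil
}

func main() {
	ci := &cacheInterval{events: []*cacheEvent{{resourceVersion: 101}, {resourceVersion: 102}}}

	resourceVersion := uint64(100)
	for {
		ev, err := ci.Next()
		if err != nil {
			// Mirrors processInterval: on invalidation, stop serving and
			// close the watch instead of emitting an error event; the
			// client is expected to re-establish the watch.
			fmt.Println("stopping watch:", err)
			return
		}
		if ev == nil {
			break // all buffered "init" events have been delivered
		}
		resourceVersion = ev.resourceVersion
		fmt.Println("delivered buffered event at RV", ev.resourceVersion)
	}
	fmt.Println("switching to live events, starting after RV", resourceVersion)
}

Running the sketch prints the buffered events and then the switch to live events; setting invalidated to true before the loop exercises the early-stop path analogous to what TestCacheIntervalInvalidationStopsWatch verifies.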