From aab7cd3d8a66f425022ca5b2a2bd0d3019efe526 Mon Sep 17 00:00:00 2001 From: Madhav Jivrajani Date: Fri, 29 Oct 2021 14:41:10 +0530 Subject: [PATCH] cacher: refactor process() function Split process() function into processEvents() and process(). This is done in anticipation of GetAllEventsSinceThreadUnsafe() returning an entity using which events can be constructed and not the events itself. Subsequently, this commit also moves updating resource version for initEvents from Watch() to the processEvents() func. Signed-off-by: Madhav Jivrajani --- .../apiserver/pkg/storage/cacher/cacher.go | 21 +++++++++++-------- .../storage/cacher/cacher_whitebox_test.go | 6 +++--- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 9540a529fc4..217fa72588c 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -516,13 +516,6 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions return newErrWatcher(err), nil } - // With some events already sent, update resourceVersion so that - // events that were buffered and not yet processed won't be delivered - // to this watcher second time causing going back in time. - if len(initEvents) > 0 { - watchRV = initEvents[len(initEvents)-1].ResourceVersion - } - func() { c.Lock() defer c.Unlock() @@ -537,7 +530,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions c.watcherIdx++ }() - go watcher.process(ctx, initEvents, watchRV) + go watcher.processEvents(ctx, initEvents, watchRV) return watcher, nil } @@ -1392,7 +1385,7 @@ func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) { } } -func (c *cacheWatcher) process(ctx context.Context, initEvents []*watchCacheEvent, resourceVersion uint64) { +func (c *cacheWatcher) processEvents(ctx context.Context, initEvents []*watchCacheEvent, resourceVersion uint64) { defer utilruntime.HandleCrash() // Check how long we are processing initEvents. @@ -1413,15 +1406,25 @@ func (c *cacheWatcher) process(ctx context.Context, initEvents []*watchCacheEven for _, event := range initEvents { c.sendWatchCacheEvent(event) } + objType := c.objectType.String() if len(initEvents) > 0 { initCounter.WithLabelValues(objType).Add(float64(len(initEvents))) + // With some events already sent, update resourceVersion + // so that events that were buffered and not yet processed + // won't be delivered to this watcher second time causing + // going back in time. + resourceVersion = initEvents[len(initEvents)-1].ResourceVersion } processingTime := time.Since(startTime) if processingTime > initProcessThreshold { klog.V(2).Infof("processing %d initEvents of %s (%s) took %v", len(initEvents), objType, c.identifier, processingTime) } + c.process(ctx, resourceVersion) +} + +func (c *cacheWatcher) process(ctx context.Context, resourceVersion uint64) { // At this point we already start processing incoming watch events. // However, the init event can still be processed because their serialization // and sending to the client happens asynchrnously. diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index ca65b9f6352..88819285f5a 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -74,7 +74,7 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) { // set the size of the buffer of w.result to 0, so that the writes to // w.result is blocked. w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, objectType, "") - go w.process(context.Background(), initEvents, 0) + go w.processEvents(context.Background(), initEvents, 0) w.Stop() if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) { lock.RLock() @@ -194,7 +194,7 @@ TestCase: } w := newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, objectType, "") - go w.process(context.Background(), testCase.events, 0) + go w.processEvents(context.Background(), testCase.events, 0) ch := w.ResultChan() for j, event := range testCase.expected { @@ -545,7 +545,7 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) { w = newCacheWatcher(2, filter, emptyFunc, testVersioner{}, deadline, false, objectType, "") w.input <- &watchCacheEvent{Object: &v1.Pod{}, ResourceVersion: uint64(i + 1)} ctx, _ := context.WithDeadline(context.Background(), deadline) - go w.process(ctx, nil, 0) + go w.processEvents(ctx, nil, 0) select { case <-w.ResultChan(): case <-time.After(time.Second):