cacher: refactor process() function

Split process() function into processEvents() and process().
This is done in anticipation of GetAllEventsSinceThreadUnsafe()
returning an entity using which events can be constructed and
not the events itself.

Subsequently, this commit also moves updating resource version
for initEvents from Watch() to the processEvents() func.

Signed-off-by: Madhav Jivrajani <madhav.jiv@gmail.com>
This commit is contained in:
Madhav Jivrajani 2021-10-29 14:41:10 +05:30
parent bb24c265ce
commit aab7cd3d8a
2 changed files with 15 additions and 12 deletions

View File

@ -516,13 +516,6 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
return newErrWatcher(err), nil
}
// With some events already sent, update resourceVersion so that
// events that were buffered and not yet processed won't be delivered
// to this watcher second time causing going back in time.
if len(initEvents) > 0 {
watchRV = initEvents[len(initEvents)-1].ResourceVersion
}
func() {
c.Lock()
defer c.Unlock()
@ -537,7 +530,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
c.watcherIdx++
}()
go watcher.process(ctx, initEvents, watchRV)
go watcher.processEvents(ctx, initEvents, watchRV)
return watcher, nil
}
@ -1392,7 +1385,7 @@ func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
}
}
func (c *cacheWatcher) process(ctx context.Context, initEvents []*watchCacheEvent, resourceVersion uint64) {
func (c *cacheWatcher) processEvents(ctx context.Context, initEvents []*watchCacheEvent, resourceVersion uint64) {
defer utilruntime.HandleCrash()
// Check how long we are processing initEvents.
@ -1413,15 +1406,25 @@ func (c *cacheWatcher) process(ctx context.Context, initEvents []*watchCacheEven
for _, event := range initEvents {
c.sendWatchCacheEvent(event)
}
objType := c.objectType.String()
if len(initEvents) > 0 {
initCounter.WithLabelValues(objType).Add(float64(len(initEvents)))
// With some events already sent, update resourceVersion
// so that events that were buffered and not yet processed
// won't be delivered to this watcher second time causing
// going back in time.
resourceVersion = initEvents[len(initEvents)-1].ResourceVersion
}
processingTime := time.Since(startTime)
if processingTime > initProcessThreshold {
klog.V(2).Infof("processing %d initEvents of %s (%s) took %v", len(initEvents), objType, c.identifier, processingTime)
}
c.process(ctx, resourceVersion)
}
func (c *cacheWatcher) process(ctx context.Context, resourceVersion uint64) {
// At this point we already start processing incoming watch events.
// However, the init event can still be processed because their serialization
// and sending to the client happens asynchrnously.

View File

@ -74,7 +74,7 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
// set the size of the buffer of w.result to 0, so that the writes to
// w.result is blocked.
w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, objectType, "")
go w.process(context.Background(), initEvents, 0)
go w.processEvents(context.Background(), initEvents, 0)
w.Stop()
if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
lock.RLock()
@ -194,7 +194,7 @@ TestCase:
}
w := newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, objectType, "")
go w.process(context.Background(), testCase.events, 0)
go w.processEvents(context.Background(), testCase.events, 0)
ch := w.ResultChan()
for j, event := range testCase.expected {
@ -545,7 +545,7 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) {
w = newCacheWatcher(2, filter, emptyFunc, testVersioner{}, deadline, false, objectType, "")
w.input <- &watchCacheEvent{Object: &v1.Pod{}, ResourceVersion: uint64(i + 1)}
ctx, _ := context.WithDeadline(context.Background(), deadline)
go w.process(ctx, nil, 0)
go w.processEvents(ctx, nil, 0)
select {
case <-w.ResultChan():
case <-time.After(time.Second):