Merge pull request #106005 from MadhavJivrajani/refactor-cacher-process

cacher: refactor process() function
This commit is contained in:
Kubernetes Prow Robot 2021-10-29 04:40:52 -07:00 committed by GitHub
commit 5cf54fe762
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 15 additions and 12 deletions

View File

@ -516,13 +516,6 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
return newErrWatcher(err), nil return newErrWatcher(err), nil
} }
// With some events already sent, update resourceVersion so that
// events that were buffered and not yet processed won't be delivered
// to this watcher second time causing going back in time.
if len(initEvents) > 0 {
watchRV = initEvents[len(initEvents)-1].ResourceVersion
}
func() { func() {
c.Lock() c.Lock()
defer c.Unlock() defer c.Unlock()
@ -537,7 +530,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
c.watcherIdx++ c.watcherIdx++
}() }()
go watcher.process(ctx, initEvents, watchRV) go watcher.processEvents(ctx, initEvents, watchRV)
return watcher, nil return watcher, nil
} }
@ -1392,7 +1385,7 @@ func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
} }
} }
func (c *cacheWatcher) process(ctx context.Context, initEvents []*watchCacheEvent, resourceVersion uint64) { func (c *cacheWatcher) processEvents(ctx context.Context, initEvents []*watchCacheEvent, resourceVersion uint64) {
defer utilruntime.HandleCrash() defer utilruntime.HandleCrash()
// Check how long we are processing initEvents. // Check how long we are processing initEvents.
@ -1413,15 +1406,25 @@ func (c *cacheWatcher) process(ctx context.Context, initEvents []*watchCacheEven
for _, event := range initEvents { for _, event := range initEvents {
c.sendWatchCacheEvent(event) c.sendWatchCacheEvent(event)
} }
objType := c.objectType.String() objType := c.objectType.String()
if len(initEvents) > 0 { if len(initEvents) > 0 {
initCounter.WithLabelValues(objType).Add(float64(len(initEvents))) initCounter.WithLabelValues(objType).Add(float64(len(initEvents)))
// With some events already sent, update resourceVersion
// so that events that were buffered and not yet processed
// won't be delivered to this watcher second time causing
// going back in time.
resourceVersion = initEvents[len(initEvents)-1].ResourceVersion
} }
processingTime := time.Since(startTime) processingTime := time.Since(startTime)
if processingTime > initProcessThreshold { if processingTime > initProcessThreshold {
klog.V(2).Infof("processing %d initEvents of %s (%s) took %v", len(initEvents), objType, c.identifier, processingTime) klog.V(2).Infof("processing %d initEvents of %s (%s) took %v", len(initEvents), objType, c.identifier, processingTime)
} }
c.process(ctx, resourceVersion)
}
func (c *cacheWatcher) process(ctx context.Context, resourceVersion uint64) {
// At this point we already start processing incoming watch events. // At this point we already start processing incoming watch events.
// However, the init event can still be processed because their serialization // However, the init event can still be processed because their serialization
// and sending to the client happens asynchrnously. // and sending to the client happens asynchrnously.

View File

@ -74,7 +74,7 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
// set the size of the buffer of w.result to 0, so that the writes to // set the size of the buffer of w.result to 0, so that the writes to
// w.result is blocked. // w.result is blocked.
w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, objectType, "") w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, objectType, "")
go w.process(context.Background(), initEvents, 0) go w.processEvents(context.Background(), initEvents, 0)
w.Stop() w.Stop()
if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) { if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
lock.RLock() lock.RLock()
@ -194,7 +194,7 @@ TestCase:
} }
w := newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, objectType, "") w := newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, objectType, "")
go w.process(context.Background(), testCase.events, 0) go w.processEvents(context.Background(), testCase.events, 0)
ch := w.ResultChan() ch := w.ResultChan()
for j, event := range testCase.expected { for j, event := range testCase.expected {
@ -545,7 +545,7 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) {
w = newCacheWatcher(2, filter, emptyFunc, testVersioner{}, deadline, false, objectType, "") w = newCacheWatcher(2, filter, emptyFunc, testVersioner{}, deadline, false, objectType, "")
w.input <- &watchCacheEvent{Object: &v1.Pod{}, ResourceVersion: uint64(i + 1)} w.input <- &watchCacheEvent{Object: &v1.Pod{}, ResourceVersion: uint64(i + 1)}
ctx, _ := context.WithDeadline(context.Background(), deadline) ctx, _ := context.WithDeadline(context.Background(), deadline)
go w.process(ctx, nil, 0) go w.processEvents(ctx, nil, 0)
select { select {
case <-w.ResultChan(): case <-w.ResultChan():
case <-time.After(time.Second): case <-time.After(time.Second):