cacher: Use watchCacheInterval to reduce lock contention

- Modify GetAllEventsSinceThreadUnsafe to return a watchCacheInterval
- Modify Watch() to compute a watchCacheInterval rather than a slice
  of all "initEvents" and pass this interval to processInterval()
- Use interval::Next() to obtain events to process rather than
  obtaining them all at once
- Modify tests accordingly to use interval
- On invalidation, stop processing and stop the watch.
- Make indexValidator injectable for testing
- Add a unit test verifying the behaviour of stopping the watch.

Signed-off-by: Madhav Jivrajani <madhav.jiv@gmail.com>
Madhav Jivrajani 2021-10-05 15:52:09 +05:30
parent c225bdd552
commit 7f2aa7ad3a
4 changed files with 163 additions and 46 deletions
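To make the flow described in the commit message concrete, below is a minimal, self-contained Go sketch of the consumption pattern: the watcher pulls events from an interval via Next() and stops as soon as the interval reports invalidation. The types and names here (interval, event, consume) are illustrative stand-ins, not the upstream watchCacheInterval implementation.

package main

import (
	"errors"
	"fmt"
)

// event stands in for the cacher's watchCacheEvent; illustrative only.
type event struct {
	ResourceVersion uint64
}

// errInvalidated simulates the interval detecting that the window it was
// created over is no longer served by the underlying cache.
var errInvalidated = errors.New("cache interval invalidated")

// interval is a toy stand-in for watchCacheInterval: it yields events one at
// a time instead of materializing them all up front.
type interval struct {
	events []*event
	pos    int
	valid  func() bool // injectable validity check, in the spirit of indexValidator
}

// Next returns the next event, nil when the interval is exhausted, or an
// error once the interval has been invalidated.
func (i *interval) Next() (*event, error) {
	if !i.valid() {
		return nil, errInvalidated
	}
	if i.pos >= len(i.events) {
		return nil, nil
	}
	ev := i.events[i.pos]
	i.pos++
	return ev, nil
}

// consume mirrors the shape of the new processInterval loop: pull events
// until exhaustion, and stop processing on invalidation.
func consume(ci *interval) (int, uint64, error) {
	sent := 0
	var lastRV uint64
	for {
		ev, err := ci.Next()
		if err != nil {
			return sent, lastRV, err // invalidated: stop and close the watch
		}
		if ev == nil {
			return sent, lastRV, nil // interval exhausted
		}
		// Track the resourceVersion so buffered events are not delivered twice.
		lastRV = ev.ResourceVersion
		sent++
	}
}

func main() {
	ci := &interval{
		events: []*event{{ResourceVersion: 10}, {ResourceVersion: 11}},
		valid:  func() bool { return true },
	}
	fmt.Println(consume(ci)) // 2 11 <nil>
}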


@@ -509,7 +509,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
// underlying watchCache is calling processEvent under its lock.
c.watchCache.RLock()
defer c.watchCache.RUnlock()
initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV)
cacheInterval, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV)
if err != nil {
// To match the uncached watch implementation, once we have passed authn/authz/admission,
// and successfully parsed a resource version, other errors must fail with a watch event of type ERROR,
@@ -531,7 +531,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
c.watcherIdx++
}()
go watcher.processEvents(ctx, initEvents, watchRV)
go watcher.processInterval(ctx, cacheInterval, watchRV)
return watcher, nil
}
@@ -1333,7 +1333,7 @@ func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
// would give us non-determinism.
// At the same time, we don't want to block infinitely on putting
// to c.result, when c.done is already closed.
//
// This ensures that with c.done already closed, we at most once go
// into the next select after this. With that, no matter which
// statement we choose there, we will deliver only consecutive
@@ -1350,8 +1350,10 @@ func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
}
}
func (c *cacheWatcher) processEvents(ctx context.Context, initEvents []*watchCacheEvent, resourceVersion uint64) {
func (c *cacheWatcher) processInterval(ctx context.Context, cacheInterval *watchCacheInterval, resourceVersion uint64) {
defer utilruntime.HandleCrash()
defer close(c.result)
defer c.Stop()
// Check how long we are processing initEvents.
// As long as these are not processed, we are not processing
@@ -1368,22 +1370,46 @@ func (c *cacheWatcher) processEvents(ctx context.Context, initEvents []*watchCac
// consider increasing the size of the result buffer in those cases.
const initProcessThreshold = 500 * time.Millisecond
startTime := time.Now()
for _, event := range initEvents {
initEventCount := 0
for {
event, err := cacheInterval.Next()
if err != nil {
// An error indicates that the cache interval
// has been invalidated and can no longer serve
// events.
//
// Initially we considered sending an "out-of-history"
// Error event in this case, but because historically
// such events weren't sent out of the watchCache, we
// decided not to. This is still ok, because on watch
// closure, the watcher will try to re-instantiate the
// watch and then will get an explicit "out-of-history"
// window. There is potential for optimization, but for
// now, in order to be on the safe side and not break
// custom clients, the cost of it is something that we
// are fully accepting.
klog.Warningf("couldn't retrieve watch event to serve: %#v", err)
return
}
if event == nil {
break
}
c.sendWatchCacheEvent(event)
// With some events already sent, update resourceVersion so that
// events that were buffered and not yet processed won't be delivered
// to this watcher second time causing going back in time.
resourceVersion = event.ResourceVersion
initEventCount++
}
objType := c.objectType.String()
if len(initEvents) > 0 {
initCounter.WithLabelValues(objType).Add(float64(len(initEvents)))
// With some events already sent, update resourceVersion
// so that events that were buffered and not yet processed
// won't be delivered to this watcher second time causing
// going back in time.
resourceVersion = initEvents[len(initEvents)-1].ResourceVersion
if initEventCount > 0 {
initCounter.WithLabelValues(objType).Add(float64(initEventCount))
}
processingTime := time.Since(startTime)
if processingTime > initProcessThreshold {
klog.V(2).Infof("processing %d initEvents of %s (%s) took %v", len(initEvents), objType, c.identifier, processingTime)
klog.V(2).Infof("processing %d initEvents of %s (%s) took %v", initEventCount, objType, c.identifier, processingTime)
}
c.process(ctx, resourceVersion)
@@ -1398,8 +1424,6 @@ func (c *cacheWatcher) process(ctx context.Context, resourceVersion uint64) {
// process, but we're leaving this to the tuning phase.
utilflowcontrol.WatchInitialized(ctx)
defer close(c.result)
defer c.Stop()
for {
select {
case event, ok := <-c.input:
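A note on the hunks above: defer close(c.result) and defer c.Stop() move from process() into processInterval(), because processInterval() can now return early when the interval reports invalidation, without ever reaching process(). The sketch below is a minimal illustration of why the cleanup has to live at the outer level; the names are illustrative, not the upstream types.

package main

import "fmt"

// watcher is a toy stand-in; only the cleanup ownership matters here.
type watcher struct {
	result  chan string
	stopped bool
}

func (w *watcher) stop() { w.stopped = true }

// processInterval owns the cleanup, mirroring the commit: even when the
// interval is invalidated and we return before calling process(), the
// result channel is still closed and the watcher still stopped.
func (w *watcher) processInterval(invalidated bool) {
	defer close(w.result)
	defer w.stop()

	if invalidated {
		return // early exit still runs the deferred cleanup
	}
	w.process()
}

func (w *watcher) process() {
	w.result <- "event"
}

func main() {
	w := &watcher{result: make(chan string, 1)}
	go w.processInterval(true)
	for ev := range w.result { // terminates because the channel gets closed
		fmt.Println(ev)
	}
	fmt.Println("watcher stopped:", w.stopped)
}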


@@ -74,7 +74,7 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
// set the size of the buffer of w.result to 0, so that writes to
// w.result are blocked.
w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, objectType, "")
go w.processEvents(context.Background(), initEvents, 0)
go w.processInterval(context.Background(), intervalFromEvents(initEvents), 0)
w.Stop()
if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
lock.RLock()
@@ -194,7 +194,7 @@ TestCase:
}
w := newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, objectType, "")
go w.processEvents(context.Background(), testCase.events, 0)
go w.processInterval(context.Background(), intervalFromEvents(testCase.events), 0)
ch := w.ResultChan()
for j, event := range testCase.expected {
@@ -542,7 +542,7 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) {
w = newCacheWatcher(2, filter, emptyFunc, testVersioner{}, deadline, false, objectType, "")
w.input <- &watchCacheEvent{Object: &v1.Pod{}, ResourceVersion: uint64(i + 1)}
ctx, _ := context.WithDeadline(context.Background(), deadline)
go w.processEvents(ctx, nil, 0)
go w.processInterval(ctx, intervalFromEvents(nil), 0)
select {
case <-w.ResultChan():
case <-time.After(time.Second):
@@ -1412,3 +1412,85 @@ func TestCachingObjects(t *testing.T) {
t.Run("single watcher", func(t *testing.T) { testCachingObjects(t, 1) })
t.Run("many watcher", func(t *testing.T) { testCachingObjects(t, 3) })
}
func TestCacheIntervalInvalidationStopsWatch(t *testing.T) {
backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
// Wait until cacher is initialized.
cacher.ready.wait()
// Ensure there is enough budget for slow processing since
// the entire watch cache is going to be served through the
// interval and events won't be popped from the cacheWatcher's
// input channel until much later.
cacher.dispatchTimeoutBudget.returnUnused(100 * time.Millisecond)
// We define a custom index validator such that the interval is
// able to serve the first bufferSize elements successfully, but
// on trying to fill its buffer again, the indexValidator simulates
// an invalidation leading to the watch being closed and the number
// of events we actually process to be bufferSize, each event of
// type watch.Added.
valid := true
invalidateCacheInterval := func() {
valid = false
}
once := sync.Once{}
indexValidator := func(index int) bool {
isValid := valid && (index >= cacher.watchCache.startIndex)
once.Do(invalidateCacheInterval)
return isValid
}
cacher.watchCache.indexValidator = indexValidator
makePod := func(i int) *examplev1.Pod {
return &examplev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("pod-%d", 1000+i),
Namespace: "ns",
ResourceVersion: fmt.Sprintf("%d", 1000+i),
},
}
}
// 250 is arbitrary, point is to have enough elements such that
// it generates more than bufferSize number of events allowing
// us to simulate the invalidation of the cache interval.
totalPods := 250
for i := 0; i < totalPods; i++ {
err := cacher.watchCache.Add(makePod(i))
if err != nil {
t.Errorf("error: %v", err)
}
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
w, err := cacher.Watch(ctx, "pods/ns", storage.ListOptions{
ResourceVersion: "999",
Predicate: storage.Everything,
})
if err != nil {
t.Fatalf("Failed to create watch: %v", err)
}
defer w.Stop()
received := 0
resChan := w.ResultChan()
for event := range resChan {
received++
t.Logf("event type: %v, events received so far: %d", event.Type, received)
if event.Type != watch.Added {
t.Errorf("unexpected event type, expected: %s, got: %s, event: %v", watch.Added, event.Type, event)
}
}
// Since the watch is stopped after the interval is invalidated,
// we should have processed exactly bufferSize number of elements.
if received != bufferSize {
t.Errorf("unexpected number of events received, expected: %d, got: %d", bufferSize, received)
}
}
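The expectation of exactly bufferSize events follows from how the interval buffers its output: it serves events in chunks of up to bufferSize and re-checks validity before each refill, so a validator that turns false after the first fill lets exactly one full chunk through. Below is a self-contained sketch of that mechanism; the names and the bufferSize value are assumed here purely for demonstration, not taken from the cacher package.

package main

import (
	"errors"
	"fmt"
)

// bufferSize is assumed here purely for demonstration; the real constant
// lives in the cacher package.
const bufferSize = 100

// chunkedInterval is a toy interval that refills an internal buffer in
// chunks of bufferSize events and re-validates its window before each refill.
type chunkedInterval struct {
	next, end int              // logical window [next, end) still to serve
	fetch     func(i int) int  // produces the i-th event (toy payload)
	validate  func(i int) bool // injectable validity check, like indexValidator
	buf       []int
	pos       int
}

// fill loads up to bufferSize events, giving up if the window is invalid.
func (c *chunkedInterval) fill() error {
	if !c.validate(c.next) {
		return errors.New("cache interval invalidated")
	}
	c.buf = c.buf[:0]
	for i := 0; i < bufferSize && c.next < c.end; i++ {
		c.buf = append(c.buf, c.fetch(c.next))
		c.next++
	}
	c.pos = 0
	return nil
}

// Next returns (event, ok, err); ok is false once the interval is exhausted.
func (c *chunkedInterval) Next() (int, bool, error) {
	if c.pos >= len(c.buf) {
		if c.next >= c.end {
			return 0, false, nil
		}
		if err := c.fill(); err != nil {
			return 0, false, err
		}
	}
	v := c.buf[c.pos]
	c.pos++
	return v, true, nil
}

func main() {
	fills := 0
	ci := &chunkedInterval{
		end:   250, // more events than one buffer holds, as in the test above
		fetch: func(i int) int { return i },
		// Valid for the first refill only, like the test's once-invalidated validator.
		validate: func(int) bool { fills++; return fills == 1 },
	}
	served := 0
	for {
		if _, ok, err := ci.Next(); err != nil || !ok {
			break
		}
		served++
	}
	fmt.Println(served == bufferSize) // true: exactly one chunk was delivered
}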


@@ -189,6 +189,9 @@ type watchCache struct {
// cacher's objectType.
objectType reflect.Type
// For testing cache interval invalidation.
indexValidator indexValidator
}
func newWatchCache(
@@ -219,6 +222,8 @@ func newWatchCache(
objType := objectType.String()
watchCacheCapacity.WithLabelValues(objType).Set(float64(wc.capacity))
wc.cond = sync.NewCond(wc.RLocker())
wc.indexValidator = wc.isIndexValidLocked
return wc
}
@@ -568,7 +573,7 @@ func (w *watchCache) SetOnReplace(onReplace func()) {
w.onReplace = onReplace
}
func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*watchCacheEvent, error) {
func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) (*watchCacheInterval, error) {
size := w.endIndex - w.startIndex
var oldest uint64
switch {
@@ -594,27 +599,11 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*w
// current state and only then start watching from that point.
//
// TODO: In v2 api, we should stop returning the current state - #13969.
allItems := w.store.List()
result := make([]*watchCacheEvent, len(allItems))
for i, item := range allItems {
elem, ok := item.(*storeElement)
if !ok {
return nil, fmt.Errorf("not a storeElement: %v", elem)
}
objLabels, objFields, err := w.getAttrsFunc(elem.Object)
if err != nil {
return nil, err
}
result[i] = &watchCacheEvent{
Type: watch.Added,
Object: elem.Object,
ObjLabels: objLabels,
ObjFields: objFields,
Key: elem.Key,
ResourceVersion: w.resourceVersion,
}
ci, err := newCacheIntervalFromStore(w.resourceVersion, w.store, w.getAttrsFunc)
if err != nil {
return nil, err
}
return result, nil
return ci, nil
}
if resourceVersion < oldest-1 {
return nil, errors.NewResourceExpired(fmt.Sprintf("too old resource version: %d (%d)", resourceVersion, oldest-1))
@@ -625,11 +614,11 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*w
return w.cache[(w.startIndex+i)%w.capacity].ResourceVersion > resourceVersion
}
first := sort.Search(size, f)
result := make([]*watchCacheEvent, size-first)
for i := 0; i < size-first; i++ {
result[i] = w.cache[(w.startIndex+first+i)%w.capacity]
indexerFunc := func(i int) *watchCacheEvent {
return w.cache[i%w.capacity]
}
return result, nil
ci := newCacheInterval(w.startIndex+first, w.endIndex, indexerFunc, w.indexValidator, &w.RWMutex)
return ci, nil
}
func (w *watchCache) Resync() error {
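The interval construction in GetAllEventsSinceThreadUnsafe above combines two small pieces: an indexerFunc that maps a monotonically growing logical index onto the circular buffer via i % capacity, and an injectable indexValidator that treats an index as stale once the buffer has wrapped past it (the test earlier checks index >= startIndex). Below is a minimal, self-contained sketch of that idea; the names are illustrative rather than the upstream ones.

package main

import "fmt"

// ring is a toy circular event log in the spirit of the watchCache buffer:
// logical indices grow without bound, the physical slot is index % capacity,
// and a logical index stays readable only while it has not been overwritten.
type ring struct {
	capacity   int
	buf        []string
	startIndex int // oldest logical index still present
	endIndex   int // next logical index to be written
}

func newRing(capacity int) *ring {
	return &ring{capacity: capacity, buf: make([]string, capacity)}
}

func (r *ring) add(ev string) {
	if r.endIndex-r.startIndex == r.capacity {
		r.startIndex++ // the oldest entry is about to be overwritten
	}
	r.buf[r.endIndex%r.capacity] = ev
	r.endIndex++
}

// indexer mirrors the role of the indexerFunc handed to newCacheInterval.
func (r *ring) indexer(i int) string { return r.buf[i%r.capacity] }

// isIndexValid mirrors the role of the injectable indexValidator: an interval
// holding a logical index below startIndex has been overwritten and must give up.
func (r *ring) isIndexValid(i int) bool { return i >= r.startIndex }

func main() {
	r := newRing(3)
	for i := 0; i < 5; i++ {
		r.add(fmt.Sprintf("event-%d", i))
	}
	// Events 0 and 1 have been overwritten; only logical indices 2..4 survive.
	for i := 0; i < 5; i++ {
		if r.isIndexValid(i) {
			fmt.Println(i, r.indexer(i))
		} else {
			fmt.Println(i, "invalidated")
		}
	}
}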


@@ -72,9 +72,31 @@ type testWatchCache struct {
}
func (w *testWatchCache) getAllEventsSince(resourceVersion uint64) ([]*watchCacheEvent, error) {
w.watchCache.RLock()
defer w.watchCache.RUnlock()
return w.watchCache.GetAllEventsSinceThreadUnsafe(resourceVersion)
cacheInterval, err := w.getCacheIntervalForEvents(resourceVersion)
if err != nil {
return nil, err
}
result := []*watchCacheEvent{}
for {
event, err := cacheInterval.Next()
if err != nil {
return nil, err
}
if event == nil {
break
}
result = append(result, event)
}
return result, nil
}
func (w *testWatchCache) getCacheIntervalForEvents(resourceVersion uint64) (*watchCacheInterval, error) {
w.RLock()
defer w.RUnlock()
return w.GetAllEventsSinceThreadUnsafe(resourceVersion)
}
// newTestWatchCache just adds a fake clock.