diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go index f9729aa44f5..8b30538dd80 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go @@ -77,14 +77,6 @@ func storeElementKey(obj interface{}) (string, error) { return elem.Key, nil } -// watchCacheElement is a single "watch event" stored in a cache. -// It contains the resource version of the object and the object -// itself. -type watchCacheElement struct { - resourceVersion uint64 - watchCacheEvent *watchCacheEvent -} - // watchCache implements a Store interface. // However, it depends on the elements implementing runtime.Object interface. // @@ -111,7 +103,7 @@ type watchCache struct { // by endIndex (if cache is full it will be startIndex + capacity). // Both startIndex and endIndex can be greater than buffer capacity - // you should always apply modulo capacity to get an index in cache array. - cache []watchCacheElement + cache []*watchCacheEvent startIndex int endIndex int @@ -150,7 +142,7 @@ func newWatchCache( capacity: capacity, keyFunc: keyFunc, getAttrsFunc: getAttrsFunc, - cache: make([]watchCacheElement, capacity), + cache: make([]*watchCacheEvent, capacity), startIndex: 0, endIndex: 0, store: cache.NewStore(storeElementKey), @@ -247,23 +239,23 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd watchCacheEvent.PrevObjLabels = previousElem.Labels watchCacheEvent.PrevObjFields = previousElem.Fields } + w.updateCache(watchCacheEvent) + w.resourceVersion = resourceVersion if w.onEvent != nil { w.onEvent(watchCacheEvent) } - w.updateCache(resourceVersion, watchCacheEvent) - w.resourceVersion = resourceVersion w.cond.Broadcast() return updateFunc(elem) } // Assumes that lock is already held for write. -func (w *watchCache) updateCache(resourceVersion uint64, event *watchCacheEvent) { +func (w *watchCache) updateCache(event *watchCacheEvent) { if w.endIndex == w.startIndex+w.capacity { // Cache is full - remove the oldest element. w.startIndex++ } - w.cache[w.endIndex%w.capacity] = watchCacheElement{resourceVersion, event} + w.cache[w.endIndex%w.capacity] = event w.endIndex++ } @@ -416,7 +408,7 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*w case size >= w.capacity: // Once the watch event buffer is full, the oldest watch event we can deliver // is the first one in the buffer. - oldest = w.cache[w.startIndex%w.capacity].resourceVersion + oldest = w.cache[w.startIndex%w.capacity].ResourceVersion case w.listResourceVersion > 0: // If the watch event buffer isn't full, the oldest watch event we can deliver // is one greater than the resource version of the last full list. @@ -426,7 +418,7 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*w // in the buffer. // This should only happen in unit tests that populate the buffer without // performing list/replace operations. - oldest = w.cache[w.startIndex%w.capacity].resourceVersion + oldest = w.cache[w.startIndex%w.capacity].ResourceVersion default: return nil, fmt.Errorf("watch cache isn't correctly initialized") } @@ -466,12 +458,12 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*w // Binary search the smallest index at which resourceVersion is greater than the given one. f := func(i int) bool { - return w.cache[(w.startIndex+i)%w.capacity].resourceVersion > resourceVersion + return w.cache[(w.startIndex+i)%w.capacity].ResourceVersion > resourceVersion } first := sort.Search(size, f) result := make([]*watchCacheEvent, size-first) for i := 0; i < size-first; i++ { - result[i] = w.cache[(w.startIndex+first+i)%w.capacity].watchCacheEvent + result[i] = w.cache[(w.startIndex+first+i)%w.capacity] } return result, nil }