Merge pull request #76330 from wojtek-t/simplify_watchcache

Slightly simplify watchcache
This commit is contained in:
Kubernetes Prow Robot 2019-04-09 16:42:14 -07:00 committed by GitHub
commit c4f97fa2a3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -77,14 +77,6 @@ func storeElementKey(obj interface{}) (string, error) {
return elem.Key, nil
}
// watchCacheElement is a single "watch event" stored in a cache.
// It contains the resource version of the object and the object
// itself.
type watchCacheElement struct {
resourceVersion uint64
watchCacheEvent *watchCacheEvent
}
// watchCache implements a Store interface.
// However, it depends on the elements implementing runtime.Object interface.
//
@ -111,7 +103,7 @@ type watchCache struct {
// by endIndex (if cache is full it will be startIndex + capacity).
// Both startIndex and endIndex can be greater than buffer capacity -
// you should always apply modulo capacity to get an index in cache array.
cache []watchCacheElement
cache []*watchCacheEvent
startIndex int
endIndex int
@ -150,7 +142,7 @@ func newWatchCache(
capacity: capacity,
keyFunc: keyFunc,
getAttrsFunc: getAttrsFunc,
cache: make([]watchCacheElement, capacity),
cache: make([]*watchCacheEvent, capacity),
startIndex: 0,
endIndex: 0,
store: cache.NewStore(storeElementKey),
@ -247,23 +239,23 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
watchCacheEvent.PrevObjLabels = previousElem.Labels
watchCacheEvent.PrevObjFields = previousElem.Fields
}
w.updateCache(watchCacheEvent)
w.resourceVersion = resourceVersion
if w.onEvent != nil {
w.onEvent(watchCacheEvent)
}
w.updateCache(resourceVersion, watchCacheEvent)
w.resourceVersion = resourceVersion
w.cond.Broadcast()
return updateFunc(elem)
}
// Assumes that lock is already held for write.
func (w *watchCache) updateCache(resourceVersion uint64, event *watchCacheEvent) {
func (w *watchCache) updateCache(event *watchCacheEvent) {
if w.endIndex == w.startIndex+w.capacity {
// Cache is full - remove the oldest element.
w.startIndex++
}
w.cache[w.endIndex%w.capacity] = watchCacheElement{resourceVersion, event}
w.cache[w.endIndex%w.capacity] = event
w.endIndex++
}
@ -416,7 +408,7 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*w
case size >= w.capacity:
// Once the watch event buffer is full, the oldest watch event we can deliver
// is the first one in the buffer.
oldest = w.cache[w.startIndex%w.capacity].resourceVersion
oldest = w.cache[w.startIndex%w.capacity].ResourceVersion
case w.listResourceVersion > 0:
// If the watch event buffer isn't full, the oldest watch event we can deliver
// is one greater than the resource version of the last full list.
@ -426,7 +418,7 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*w
// in the buffer.
// This should only happen in unit tests that populate the buffer without
// performing list/replace operations.
oldest = w.cache[w.startIndex%w.capacity].resourceVersion
oldest = w.cache[w.startIndex%w.capacity].ResourceVersion
default:
return nil, fmt.Errorf("watch cache isn't correctly initialized")
}
@ -466,12 +458,12 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*w
// Binary search the smallest index at which resourceVersion is greater than the given one.
f := func(i int) bool {
return w.cache[(w.startIndex+i)%w.capacity].resourceVersion > resourceVersion
return w.cache[(w.startIndex+i)%w.capacity].ResourceVersion > resourceVersion
}
first := sort.Search(size, f)
result := make([]*watchCacheEvent, size-first)
for i := 0; i < size-first; i++ {
result[i] = w.cache[(w.startIndex+first+i)%w.capacity].watchCacheEvent
result[i] = w.cache[(w.startIndex+first+i)%w.capacity]
}
return result, nil
}