Merge pull request #38961 from wojtek-t/avoid_allocations_in_cacher

Automatic merge from submit-queue (batch tested with PRs 34353, 33837, 38878, 38961)

Reduce amount of allocations in cacher

Currently we are unnecessary copying stuff around in cacher.

This results in __tens of megabytes__ of allocations per __second__ in large clusters. This in turn results in expensive GC. Should help with metrics.

@gmarek
This commit is contained in:
Kubernetes Submit Queue 2016-12-19 06:43:43 -08:00 committed by GitHub
commit 75dfb21018
3 changed files with 17 additions and 17 deletions

View File

@ -557,12 +557,12 @@ func (c *Cacher) triggerValues(event *watchCacheEvent) ([]string, bool) {
return result, len(result) > 0 return result, len(result) > 0
} }
func (c *Cacher) processEvent(event watchCacheEvent) { func (c *Cacher) processEvent(event *watchCacheEvent) {
if curLen := int64(len(c.incoming)); c.incomingHWM.Update(curLen) { if curLen := int64(len(c.incoming)); c.incomingHWM.Update(curLen) {
// Monitor if this gets backed up, and how much. // Monitor if this gets backed up, and how much.
glog.V(1).Infof("cacher (%v): %v objects queued in incoming channel.", c.objectType.String(), curLen) glog.V(1).Infof("cacher (%v): %v objects queued in incoming channel.", c.objectType.String(), curLen)
} }
c.incoming <- event c.incoming <- *event
} }
func (c *Cacher) dispatchEvents() { func (c *Cacher) dispatchEvents() {
@ -756,7 +756,7 @@ type cacheWatcher struct {
forget func(bool) forget func(bool)
} }
func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []watchCacheEvent, filter watchFilterFunc, forget func(bool)) *cacheWatcher { func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []*watchCacheEvent, filter watchFilterFunc, forget func(bool)) *cacheWatcher {
watcher := &cacheWatcher{ watcher := &cacheWatcher{
input: make(chan watchCacheEvent, chanSize), input: make(chan watchCacheEvent, chanSize),
result: make(chan watch.Event, chanSize), result: make(chan watch.Event, chanSize),
@ -866,7 +866,7 @@ func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
} }
} }
func (c *cacheWatcher) process(initEvents []watchCacheEvent, resourceVersion uint64) { func (c *cacheWatcher) process(initEvents []*watchCacheEvent, resourceVersion uint64) {
defer utilruntime.HandleCrash() defer utilruntime.HandleCrash()
// Check how long we are processing initEvents. // Check how long we are processing initEvents.
@ -885,7 +885,7 @@ func (c *cacheWatcher) process(initEvents []watchCacheEvent, resourceVersion uin
const initProcessThreshold = 500 * time.Millisecond const initProcessThreshold = 500 * time.Millisecond
startTime := time.Now() startTime := time.Now()
for _, event := range initEvents { for _, event := range initEvents {
c.sendWatchCacheEvent(&event) c.sendWatchCacheEvent(event)
} }
processingTime := time.Since(startTime) processingTime := time.Since(startTime)
if processingTime > initProcessThreshold { if processingTime > initProcessThreshold {

View File

@ -38,7 +38,7 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
defer lock.Unlock() defer lock.Unlock()
count++ count++
} }
initEvents := []watchCacheEvent{ initEvents := []*watchCacheEvent{
{Object: &api.Pod{}}, {Object: &api.Pod{}},
{Object: &api.Pod{}}, {Object: &api.Pod{}},
} }

View File

@ -78,7 +78,7 @@ func storeElementKey(obj interface{}) (string, error) {
// itself. // itself.
type watchCacheElement struct { type watchCacheElement struct {
resourceVersion uint64 resourceVersion uint64
watchCacheEvent watchCacheEvent watchCacheEvent *watchCacheEvent
} }
// watchCache implements a Store interface. // watchCache implements a Store interface.
@ -125,7 +125,7 @@ type watchCache struct {
// This handler is run at the end of every Add/Update/Delete method // This handler is run at the end of every Add/Update/Delete method
// and additionally gets the previous value of the object. // and additionally gets the previous value of the object.
onEvent func(watchCacheEvent) onEvent func(*watchCacheEvent)
// for testing timeouts. // for testing timeouts.
clock clock.Clock clock clock.Clock
@ -240,7 +240,7 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
return err return err
} }
} }
watchCacheEvent := watchCacheEvent{ watchCacheEvent := &watchCacheEvent{
Type: event.Type, Type: event.Type,
Object: event.Object, Object: event.Object,
ObjLabels: objLabels, ObjLabels: objLabels,
@ -254,7 +254,7 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
if w.onEvent != nil { if w.onEvent != nil {
w.onEvent(watchCacheEvent) w.onEvent(watchCacheEvent)
} }
w.updateCache(resourceVersion, &watchCacheEvent) w.updateCache(resourceVersion, watchCacheEvent)
w.resourceVersion = resourceVersion w.resourceVersion = resourceVersion
w.cond.Broadcast() w.cond.Broadcast()
return updateFunc(elem) return updateFunc(elem)
@ -266,7 +266,7 @@ func (w *watchCache) updateCache(resourceVersion uint64, event *watchCacheEvent)
// Cache is full - remove the oldest element. // Cache is full - remove the oldest element.
w.startIndex++ w.startIndex++
} }
w.cache[w.endIndex%w.capacity] = watchCacheElement{resourceVersion, *event} w.cache[w.endIndex%w.capacity] = watchCacheElement{resourceVersion, event}
w.endIndex++ w.endIndex++
} }
@ -395,13 +395,13 @@ func (w *watchCache) SetOnReplace(onReplace func()) {
w.onReplace = onReplace w.onReplace = onReplace
} }
func (w *watchCache) SetOnEvent(onEvent func(watchCacheEvent)) { func (w *watchCache) SetOnEvent(onEvent func(*watchCacheEvent)) {
w.Lock() w.Lock()
defer w.Unlock() defer w.Unlock()
w.onEvent = onEvent w.onEvent = onEvent
} }
func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]watchCacheEvent, error) { func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*watchCacheEvent, error) {
size := w.endIndex - w.startIndex size := w.endIndex - w.startIndex
oldest := w.resourceVersion oldest := w.resourceVersion
if size > 0 { if size > 0 {
@ -415,7 +415,7 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]wa
// //
// TODO: In v2 api, we should stop returning the current state - #13969. // TODO: In v2 api, we should stop returning the current state - #13969.
allItems := w.store.List() allItems := w.store.List()
result := make([]watchCacheEvent, len(allItems)) result := make([]*watchCacheEvent, len(allItems))
for i, item := range allItems { for i, item := range allItems {
elem, ok := item.(*storeElement) elem, ok := item.(*storeElement)
if !ok { if !ok {
@ -425,7 +425,7 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]wa
if err != nil { if err != nil {
return nil, err return nil, err
} }
result[i] = watchCacheEvent{ result[i] = &watchCacheEvent{
Type: watch.Added, Type: watch.Added,
Object: elem.Object, Object: elem.Object,
ObjLabels: objLabels, ObjLabels: objLabels,
@ -445,14 +445,14 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]wa
return w.cache[(w.startIndex+i)%w.capacity].resourceVersion > resourceVersion return w.cache[(w.startIndex+i)%w.capacity].resourceVersion > resourceVersion
} }
first := sort.Search(size, f) first := sort.Search(size, f)
result := make([]watchCacheEvent, size-first) result := make([]*watchCacheEvent, size-first)
for i := 0; i < size-first; i++ { for i := 0; i < size-first; i++ {
result[i] = w.cache[(w.startIndex+first+i)%w.capacity].watchCacheEvent result[i] = w.cache[(w.startIndex+first+i)%w.capacity].watchCacheEvent
} }
return result, nil return result, nil
} }
func (w *watchCache) GetAllEventsSince(resourceVersion uint64) ([]watchCacheEvent, error) { func (w *watchCache) GetAllEventsSince(resourceVersion uint64) ([]*watchCacheEvent, error) {
w.RLock() w.RLock()
defer w.RUnlock() defer w.RUnlock()
return w.GetAllEventsSinceThreadUnsafe(resourceVersion) return w.GetAllEventsSinceThreadUnsafe(resourceVersion)