diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go index a81c71a24a7..d82252c8c6a 100644 --- a/pkg/storage/cacher.go +++ b/pkg/storage/cacher.go @@ -557,12 +557,12 @@ func (c *Cacher) triggerValues(event *watchCacheEvent) ([]string, bool) { return result, len(result) > 0 } -func (c *Cacher) processEvent(event watchCacheEvent) { +func (c *Cacher) processEvent(event *watchCacheEvent) { if curLen := int64(len(c.incoming)); c.incomingHWM.Update(curLen) { // Monitor if this gets backed up, and how much. glog.V(1).Infof("cacher (%v): %v objects queued in incoming channel.", c.objectType.String(), curLen) } - c.incoming <- event + c.incoming <- *event } func (c *Cacher) dispatchEvents() { @@ -756,7 +756,7 @@ type cacheWatcher struct { forget func(bool) } -func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []watchCacheEvent, filter watchFilterFunc, forget func(bool)) *cacheWatcher { +func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []*watchCacheEvent, filter watchFilterFunc, forget func(bool)) *cacheWatcher { watcher := &cacheWatcher{ input: make(chan watchCacheEvent, chanSize), result: make(chan watch.Event, chanSize), @@ -866,7 +866,7 @@ func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) { } } -func (c *cacheWatcher) process(initEvents []watchCacheEvent, resourceVersion uint64) { +func (c *cacheWatcher) process(initEvents []*watchCacheEvent, resourceVersion uint64) { defer utilruntime.HandleCrash() // Check how long we are processing initEvents. @@ -885,7 +885,7 @@ func (c *cacheWatcher) process(initEvents []watchCacheEvent, resourceVersion uin const initProcessThreshold = 500 * time.Millisecond startTime := time.Now() for _, event := range initEvents { - c.sendWatchCacheEvent(&event) + c.sendWatchCacheEvent(event) } processingTime := time.Since(startTime) if processingTime > initProcessThreshold { diff --git a/pkg/storage/cacher_whitebox_test.go b/pkg/storage/cacher_whitebox_test.go index a40e2dfe882..d9c50a6a9f5 100644 --- a/pkg/storage/cacher_whitebox_test.go +++ b/pkg/storage/cacher_whitebox_test.go @@ -38,7 +38,7 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) { defer lock.Unlock() count++ } - initEvents := []watchCacheEvent{ + initEvents := []*watchCacheEvent{ {Object: &api.Pod{}}, {Object: &api.Pod{}}, } diff --git a/pkg/storage/watch_cache.go b/pkg/storage/watch_cache.go index 66769c7d753..27b7f97827f 100644 --- a/pkg/storage/watch_cache.go +++ b/pkg/storage/watch_cache.go @@ -78,7 +78,7 @@ func storeElementKey(obj interface{}) (string, error) { // itself. type watchCacheElement struct { resourceVersion uint64 - watchCacheEvent watchCacheEvent + watchCacheEvent *watchCacheEvent } // watchCache implements a Store interface. @@ -125,7 +125,7 @@ type watchCache struct { // This handler is run at the end of every Add/Update/Delete method // and additionally gets the previous value of the object. - onEvent func(watchCacheEvent) + onEvent func(*watchCacheEvent) // for testing timeouts. clock clock.Clock @@ -240,7 +240,7 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd return err } } - watchCacheEvent := watchCacheEvent{ + watchCacheEvent := &watchCacheEvent{ Type: event.Type, Object: event.Object, ObjLabels: objLabels, @@ -254,7 +254,7 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd if w.onEvent != nil { w.onEvent(watchCacheEvent) } - w.updateCache(resourceVersion, &watchCacheEvent) + w.updateCache(resourceVersion, watchCacheEvent) w.resourceVersion = resourceVersion w.cond.Broadcast() return updateFunc(elem) @@ -266,7 +266,7 @@ func (w *watchCache) updateCache(resourceVersion uint64, event *watchCacheEvent) // Cache is full - remove the oldest element. w.startIndex++ } - w.cache[w.endIndex%w.capacity] = watchCacheElement{resourceVersion, *event} + w.cache[w.endIndex%w.capacity] = watchCacheElement{resourceVersion, event} w.endIndex++ } @@ -395,13 +395,13 @@ func (w *watchCache) SetOnReplace(onReplace func()) { w.onReplace = onReplace } -func (w *watchCache) SetOnEvent(onEvent func(watchCacheEvent)) { +func (w *watchCache) SetOnEvent(onEvent func(*watchCacheEvent)) { w.Lock() defer w.Unlock() w.onEvent = onEvent } -func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]watchCacheEvent, error) { +func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*watchCacheEvent, error) { size := w.endIndex - w.startIndex oldest := w.resourceVersion if size > 0 { @@ -415,7 +415,7 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]wa // // TODO: In v2 api, we should stop returning the current state - #13969. allItems := w.store.List() - result := make([]watchCacheEvent, len(allItems)) + result := make([]*watchCacheEvent, len(allItems)) for i, item := range allItems { elem, ok := item.(*storeElement) if !ok { @@ -425,7 +425,7 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]wa if err != nil { return nil, err } - result[i] = watchCacheEvent{ + result[i] = &watchCacheEvent{ Type: watch.Added, Object: elem.Object, ObjLabels: objLabels, @@ -445,14 +445,14 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]wa return w.cache[(w.startIndex+i)%w.capacity].resourceVersion > resourceVersion } first := sort.Search(size, f) - result := make([]watchCacheEvent, size-first) + result := make([]*watchCacheEvent, size-first) for i := 0; i < size-first; i++ { result[i] = w.cache[(w.startIndex+first+i)%w.capacity].watchCacheEvent } return result, nil } -func (w *watchCache) GetAllEventsSince(resourceVersion uint64) ([]watchCacheEvent, error) { +func (w *watchCache) GetAllEventsSince(resourceVersion uint64) ([]*watchCacheEvent, error) { w.RLock() defer w.RUnlock() return w.GetAllEventsSinceThreadUnsafe(resourceVersion)