Merge pull request #41470 from kubernetes/revert-41018-revert-40735-avoid_copy_in_cacher

Automatic merge from submit-queue (batch tested with PRs 41332, 41069, 41470, 41474)

"Avoid unnecessary copies in cacher""

This is resend of #40735 (which I reverted when I suspected it to cause issues). But the issue was a completely different. So it's safe to resubmit.
This commit is contained in:
Kubernetes Submit Queue
2017-02-15 10:10:10 -08:00
committed by GitHub

View File

@@ -752,7 +752,7 @@ func (c *errWatcher) Stop() {
type cacheWatcher struct { type cacheWatcher struct {
sync.Mutex sync.Mutex
copier runtime.ObjectCopier copier runtime.ObjectCopier
input chan watchCacheEvent input chan *watchCacheEvent
result chan watch.Event result chan watch.Event
done chan struct{} done chan struct{}
filter watchFilterFunc filter watchFilterFunc
@@ -763,7 +763,7 @@ type cacheWatcher struct {
func newCacheWatcher(copier runtime.ObjectCopier, resourceVersion uint64, chanSize int, initEvents []*watchCacheEvent, filter watchFilterFunc, forget func(bool)) *cacheWatcher { func newCacheWatcher(copier runtime.ObjectCopier, resourceVersion uint64, chanSize int, initEvents []*watchCacheEvent, filter watchFilterFunc, forget func(bool)) *cacheWatcher {
watcher := &cacheWatcher{ watcher := &cacheWatcher{
copier: copier, copier: copier,
input: make(chan watchCacheEvent, chanSize), input: make(chan *watchCacheEvent, chanSize),
result: make(chan watch.Event, chanSize), result: make(chan watch.Event, chanSize),
done: make(chan struct{}), done: make(chan struct{}),
filter: filter, filter: filter,
@@ -800,7 +800,7 @@ var timerPool sync.Pool
func (c *cacheWatcher) add(event *watchCacheEvent, budget *timeBudget) { func (c *cacheWatcher) add(event *watchCacheEvent, budget *timeBudget) {
// Try to send the event immediately, without blocking. // Try to send the event immediately, without blocking.
select { select {
case c.input <- *event: case c.input <- event:
return return
default: default:
} }
@@ -820,7 +820,7 @@ func (c *cacheWatcher) add(event *watchCacheEvent, budget *timeBudget) {
defer timerPool.Put(t) defer timerPool.Put(t)
select { select {
case c.input <- *event: case c.input <- event:
stopped := t.Stop() stopped := t.Stop()
if !stopped { if !stopped {
// Consume triggered (but not yet received) timer event // Consume triggered (but not yet received) timer event
@@ -928,7 +928,7 @@ func (c *cacheWatcher) process(initEvents []*watchCacheEvent, resourceVersion ui
} }
// only send events newer than resourceVersion // only send events newer than resourceVersion
if event.ResourceVersion > resourceVersion { if event.ResourceVersion > resourceVersion {
c.sendWatchCacheEvent(&event) c.sendWatchCacheEvent(event)
} }
} }
} }