diff --git a/tools/cache/processor_listener_test.go b/tools/cache/processor_listener_test.go
index dbffcc53..90a775b3 100644
--- a/tools/cache/processor_listener_test.go
+++ b/tools/cache/processor_listener_test.go
@@ -17,32 +17,41 @@ limitations under the License.
 package cache
 
 import (
+	"sync"
 	"testing"
 	"time"
 
 	"k8s.io/apimachinery/pkg/util/wait"
 )
 
-// TestPopReleaseLock tests that when processor listener blocks on chan,
-// it should release the lock for pendingNotifications.
-func TestPopReleaseLock(t *testing.T) {
-	pl := newProcessListener(nil, 0, 0, time.Now())
-	stopCh := make(chan struct{})
-	defer close(stopCh)
-	// make pop() block on nextCh: waiting for receiver to get notification.
-	pl.add(1)
-	go pl.pop(stopCh)
+const (
+	concurrencyLevel = 5
+)
 
-	resultCh := make(chan struct{})
-	go func() {
-		pl.lock.Lock()
-		close(resultCh)
-	}()
+func BenchmarkListener(b *testing.B) {
+	var notification addNotification
 
-	select {
-	case <-resultCh:
-	case <-time.After(wait.ForeverTestTimeout):
-		t.Errorf("Timeout after %v", wait.ForeverTestTimeout)
-	}
-	pl.lock.Unlock()
+	var swg sync.WaitGroup
+	swg.Add(b.N)
+	b.SetParallelism(concurrencyLevel)
+	pl := newProcessListener(&ResourceEventHandlerFuncs{
+		AddFunc: func(obj interface{}) {
+			swg.Done()
+		},
+	}, 0, 0, time.Now())
+	var wg wait.Group
+	defer wg.Wait()       // Wait for .run and .pop to stop
+	defer close(pl.addCh) // Tell .run and .pop to stop
+	wg.Start(pl.run)
+	wg.Start(pl.pop)
+
+	b.ReportAllocs()
+	b.ResetTimer()
+	b.RunParallel(func(pb *testing.PB) {
+		for pb.Next() {
+			pl.add(notification)
+		}
+	})
+	swg.Wait() // Block until all notifications have been received
+	b.StopTimer()
 }
diff --git a/tools/cache/shared_informer.go b/tools/cache/shared_informer.go
index 62009646..451ef88a 100644
--- a/tools/cache/shared_informer.go
+++ b/tools/cache/shared_informer.go
@@ -138,16 +138,12 @@ type sharedIndexInformer struct {
 	// clock allows for testability
 	clock clock.Clock
 
-	started     bool
-	startedLock sync.Mutex
+	started, stopped bool
+	startedLock      sync.Mutex
 
 	// blockDeltas gives a way to stop all event distribution so that a late event handler
 	// can safely join the shared informer.
 	blockDeltas sync.Mutex
-	// stopCh is the channel used to stop the main Run process. We have to track it so that
-	// late joiners can have a proper stop
-	stopCh <-chan struct{}
-	wg     wait.Group
 }
 
 // dummyController hides the fact that a SharedInformer is different from a dedicated one
@@ -205,23 +201,25 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
 		s.controller = New(cfg)
 		s.controller.(*controller).clock = s.clock
-		s.stopCh = stopCh
 		s.started = true
 	}()
 
-	defer s.wg.Wait()
+	// Separate stop channel because Processor should be stopped strictly after controller
+	processorStopCh := make(chan struct{})
+	var wg wait.Group
+	defer wg.Wait()              // Wait for Processor to stop
+	defer close(processorStopCh) // Tell Processor to stop
+	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
+	wg.StartWithChannel(processorStopCh, s.processor.run)
 
-	s.wg.StartWithChannel(stopCh, s.cacheMutationDetector.Run)
-	s.wg.StartWithChannel(stopCh, s.processor.run)
+	defer func() {
+		s.startedLock.Lock()
+		defer s.startedLock.Unlock()
+		s.stopped = true // Don't want any new listeners
+	}()
 	s.controller.Run(stopCh)
 }
 
-func (s *sharedIndexInformer) isStarted() bool {
-	s.startedLock.Lock()
-	defer s.startedLock.Unlock()
-	return s.started
-}
-
 func (s *sharedIndexInformer) HasSynced() bool {
 	s.startedLock.Lock()
 	defer s.startedLock.Unlock()
@@ -290,6 +288,11 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
 	s.startedLock.Lock()
 	defer s.startedLock.Unlock()
 
+	if s.stopped {
+		glog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler)
+		return
+	}
+
 	if resyncPeriod > 0 {
 		if resyncPeriod < minimumResyncPeriod {
 			glog.Warningf("resyncPeriod %d is too small. Changing it to the minimum allowed value of %d", resyncPeriod, minimumResyncPeriod)
@@ -325,14 +328,9 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
 	s.blockDeltas.Lock()
 	defer s.blockDeltas.Unlock()
 
-	s.processor.addListener(listener)
-
-	s.wg.StartWithChannel(s.stopCh, listener.run)
-	s.wg.StartWithChannel(s.stopCh, listener.pop)
-
-	items := s.indexer.List()
-	for i := range items {
-		listener.add(addNotification{newObj: items[i]})
+	s.processor.addAndStartListener(listener)
+	for _, item := range s.indexer.List() {
+		listener.add(addNotification{newObj: item})
 	}
 }
 
@@ -372,12 +370,26 @@ type sharedProcessor struct {
 	listeners        []*processorListener
 	syncingListeners []*processorListener
 	clock            clock.Clock
+	wg               wait.Group
+}
+
+func (p *sharedProcessor) addAndStartListener(listener *processorListener) {
+	p.listenersLock.Lock()
+	defer p.listenersLock.Unlock()
+
+	p.addListenerLocked(listener)
+	p.wg.Start(listener.run)
+	p.wg.Start(listener.pop)
 }
 
 func (p *sharedProcessor) addListener(listener *processorListener) {
 	p.listenersLock.Lock()
 	defer p.listenersLock.Unlock()
 
+	p.addListenerLocked(listener)
+}
+
+func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
 	p.listeners = append(p.listeners, listener)
 	p.syncingListeners = append(p.syncingListeners, listener)
 }
@@ -398,16 +410,21 @@ func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
 }
 
 func (p *sharedProcessor) run(stopCh <-chan struct{}) {
-	var wg wait.Group
 	func() {
 		p.listenersLock.RLock()
 		defer p.listenersLock.RUnlock()
 		for _, listener := range p.listeners {
-			wg.StartWithChannel(stopCh, listener.run)
-			wg.StartWithChannel(stopCh, listener.pop)
+			p.wg.Start(listener.run)
+			p.wg.Start(listener.pop)
 		}
 	}()
-	wg.Wait()
+	<-stopCh
+	p.listenersLock.RLock()
+	defer p.listenersLock.RUnlock()
+	for _, listener := range p.listeners {
+		close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
+	}
+	p.wg.Wait() // Wait for all .pop() and .run() to stop
 }
 
 // shouldResync queries every listener to determine if any of them need a resync, based on each
@@ -443,18 +460,8 @@ func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Durati
 }
 
 type processorListener struct {
-	// lock/cond protects access to 'pendingNotifications'.
-	lock sync.RWMutex
-	cond sync.Cond
-
-	// pendingNotifications is an unbounded slice that holds all notifications not yet distributed
-	// there is one per listener, but a failing/stalled listener will have infinite pendingNotifications
-	// added until we OOM.
-	// TODO This is no worse that before, since reflectors were backed by unbounded DeltaFIFOs, but
-	// we should try to do something better
-	pendingNotifications []interface{}
-
 	nextCh chan interface{}
+	addCh  chan interface{}
 
 	handler ResourceEventHandler
 
@@ -472,80 +479,65 @@ type processorListener struct {
 func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time) *processorListener {
 	ret := &processorListener{
-		pendingNotifications: []interface{}{},
 		nextCh:                make(chan interface{}),
+		addCh:                 make(chan interface{}),
 		handler:               handler,
 		requestedResyncPeriod: requestedResyncPeriod,
 		resyncPeriod:          resyncPeriod,
 	}
 
-	ret.cond.L = &ret.lock
-
 	ret.determineNextResync(now)
 
 	return ret
 }
 
 func (p *processorListener) add(notification interface{}) {
-	p.lock.Lock()
-	defer p.lock.Unlock()
-
-	p.pendingNotifications = append(p.pendingNotifications, notification)
-	p.cond.Broadcast()
+	p.addCh <- notification
 }
 
-func (p *processorListener) pop(stopCh <-chan struct{}) {
+func (p *processorListener) pop() {
 	defer utilruntime.HandleCrash()
+	defer close(p.nextCh) // Tell .run() to stop
 
+	// pendingNotifications is an unbounded slice that holds all notifications not yet distributed
+	// there is one per listener, but a failing/stalled listener will have infinite pendingNotifications
+	// added until we OOM.
+	// TODO This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
+	// we should try to do something better
+	var pendingNotifications []interface{}
+	var nextCh chan<- interface{}
+	var notification interface{}
 	for {
-		blockingGet := func() (interface{}, bool) {
-			p.lock.Lock()
-			defer p.lock.Unlock()
-
-			for len(p.pendingNotifications) == 0 {
-				// check if we're shutdown
-				select {
-				case <-stopCh:
-					return nil, true
-				default:
-				}
-				p.cond.Wait()
-			}
-
-			nt := p.pendingNotifications[0]
-			p.pendingNotifications = p.pendingNotifications[1:]
-			return nt, false
-		}
-
-		notification, stopped := blockingGet()
-		if stopped {
-			return
-		}
-
 		select {
-		case <-stopCh:
-			return
-		case p.nextCh <- notification:
+		case nextCh <- notification:
+			// Notification dispatched
+			if len(pendingNotifications) == 0 { // Nothing to pop
+				nextCh = nil // Disable this select case
+				notification = nil
+			} else {
+				notification = pendingNotifications[0]
+				pendingNotifications[0] = nil
+				pendingNotifications = pendingNotifications[1:]
+			}
+		case notificationToAdd, ok := <-p.addCh:
+			if !ok {
+				return
+			}
+			if notification == nil { // No notification to pop (and pendingNotifications is empty)
+				// Optimize the case - skip adding to pendingNotifications
+				notification = notificationToAdd
+				nextCh = p.nextCh
+			} else { // There is already a notification waiting to be dispatched
+				pendingNotifications = append(pendingNotifications, notificationToAdd)
+			}
 		}
 	}
 }
 
-func (p *processorListener) run(stopCh <-chan struct{}) {
+func (p *processorListener) run() {
 	defer utilruntime.HandleCrash()
 
-	for {
-		var next interface{}
-		select {
-		case <-stopCh:
-			func() {
-				p.lock.Lock()
-				defer p.lock.Unlock()
-				p.cond.Broadcast()
-			}()
-			return
-		case next = <-p.nextCh:
-		}
-
+	for next := range p.nextCh {
 		switch notification := next.(type) {
 		case updateNotification:
 			p.handler.OnUpdate(notification.oldObj, notification.newObj)
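
Note (illustration only, not part of the patch): the heart of the new pop() is the Go idiom of disabling a select case by setting its channel variable to nil, which is what lets a single goroutine buffer notifications without bound while add() stays a plain channel send. The standalone sketch below shows the same pattern with hypothetical names (pump, in, out, pending); like pop(), it stops as soon as its input channel is closed, dropping anything still buffered.

package main

import "fmt"

// pump forwards values from in to out. A slow receiver on out never blocks a
// sender on in: surplus values pile up in an unbounded local slice, just as
// pendingNotifications does inside the new pop(). When there is nothing to
// deliver, the send case is disabled by setting its channel to nil.
func pump(in <-chan int, out chan<- int) {
	defer close(out) // tell the consumer to stop, as pop() does with nextCh

	var pending []int   // overflow buffer (unbounded)
	var send chan<- int // nil while there is nothing to deliver
	var next int        // value currently offered on send
	for {
		select {
		case send <- next:
			if len(pending) == 0 {
				send = nil // nothing left to deliver; disable this case
			} else {
				next, pending = pending[0], pending[1:]
			}
		case v, ok := <-in:
			if !ok {
				return // producer closed in; stop, as pop() does when addCh closes
			}
			if send == nil {
				// Fast path: nothing buffered, offer the value directly.
				next, send = v, out
			} else {
				pending = append(pending, v)
			}
		}
	}
}

func main() {
	in := make(chan int)
	out := make(chan int)
	go pump(in, out)

	// The producer can burst ahead without blocking on the consumer.
	for i := 0; i < 5; i++ {
		in <- i
	}
	// Values come out in FIFO order: 0 1 2 3 4.
	for i := 0; i < 5; i++ {
		fmt.Println(<-out)
	}
	close(in) // shuts the pump down; it closes out on the way out
}

Because the send case is only armed while a value is waiting, the select never spins, and the producer-facing side stays a single unbuffered channel send, which is the path BenchmarkListener above exercises.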