Improve shared informer notification dispatching

Kubernetes-commit: 35e849bff2e4e2e1b08ee183992d620d4e409f54
Author: Mikhail Mazurskiy, 2017-08-02 18:57:33 +10:00 (committed by Kubernetes Publisher)
parent b4b413abb1
commit 0648979117
2 changed files with 110 additions and 109 deletions
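The heart of the change: processorListener previously buffered notifications in a pendingNotifications slice guarded by a lock and condition variable. This commit makes that buffer a local variable inside pop() and replaces the locking with two unbuffered channels, addCh and nextCh, so add() never blocks behind a slow handler and shutdown becomes a close cascade. What follows is a minimal, self-contained sketch of that dispatch pattern, with simplified types; it mirrors the shape of the diff below but is not the client-go code itself.

// Sketch of the channel-based dispatch pattern this commit introduces.
// add() feeds addCh, pop() owns the unbounded buffer, run() drains nextCh.
package main

import "fmt"

type listener struct {
	addCh  chan interface{}
	nextCh chan interface{}
}

func (l *listener) pop() {
	defer close(l.nextCh) // tell run() to stop
	var pending []interface{}
	var nextCh chan interface{} // nil until we hold something to send
	var notification interface{}
	for {
		select {
		case nextCh <- notification: // disabled while nextCh is nil
			if len(pending) == 0 {
				nextCh = nil // nothing buffered: disable this case
				notification = nil
			} else {
				notification, pending = pending[0], pending[1:]
			}
		case n, ok := <-l.addCh:
			if !ok {
				return // addCh closed: shut down
			}
			if notification == nil {
				notification, nextCh = n, l.nextCh // fast path, skip the buffer
			} else {
				pending = append(pending, n)
			}
		}
	}
}

func (l *listener) run() {
	for n := range l.nextCh { // ends when pop() closes nextCh
		fmt.Println("handled:", n)
	}
}

func main() {
	l := &listener{addCh: make(chan interface{}), nextCh: make(chan interface{})}
	done := make(chan struct{})
	go l.pop()
	go func() { l.run(); close(done) }()
	for i := 0; i < 5; i++ {
		l.addCh <- i // never blocks behind a slow handler
	}
	close(l.addCh) // cascade: pop() returns, nextCh closes, run() exits
	<-done
}

Like the real pop(), the sketch drops whatever is still buffered once addCh closes; shutdown is a close cascade rather than a condition-variable broadcast.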

tools/cache/shared_informer_test.go

@@ -17,32 +17,41 @@ limitations under the License.
 package cache
 
 import (
+	"sync"
 	"testing"
 	"time"
 
 	"k8s.io/apimachinery/pkg/util/wait"
 )
 
-// TestPopReleaseLock tests that when processor listener blocks on chan,
-// it should release the lock for pendingNotifications.
-func TestPopReleaseLock(t *testing.T) {
-	pl := newProcessListener(nil, 0, 0, time.Now())
-	stopCh := make(chan struct{})
-	defer close(stopCh)
-	// make pop() block on nextCh: waiting for receiver to get notification.
-	pl.add(1)
-	go pl.pop(stopCh)
-
-	resultCh := make(chan struct{})
-	go func() {
-		pl.lock.Lock()
-		close(resultCh)
-	}()
-	select {
-	case <-resultCh:
-	case <-time.After(wait.ForeverTestTimeout):
-		t.Errorf("Timeout after %v", wait.ForeverTestTimeout)
-	}
-	pl.lock.Unlock()
+const (
+	concurrencyLevel = 5
+)
+
+func BenchmarkListener(b *testing.B) {
+	var notification addNotification
+
+	var swg sync.WaitGroup
+	swg.Add(b.N)
+	b.SetParallelism(concurrencyLevel)
+	pl := newProcessListener(&ResourceEventHandlerFuncs{
+		AddFunc: func(obj interface{}) {
+			swg.Done()
+		},
+	}, 0, 0, time.Now())
+	var wg wait.Group
+	defer wg.Wait()       // Wait for .run and .pop to stop
+	defer close(pl.addCh) // Tell .run and .pop to stop
+	wg.Start(pl.run)
+	wg.Start(pl.pop)
+
+	b.ReportAllocs()
+	b.ResetTimer()
+	b.RunParallel(func(pb *testing.PB) {
+		for pb.Next() {
+			pl.add(notification)
+		}
+	})
+	swg.Wait() // Block until all notifications have been received
+	b.StopTimer()
 }
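The benchmark above leans on a shutdown idiom that appears throughout this commit: defers run last-in-first-out, so "defer wg.Wait()" is written before "defer close(pl.addCh)", meaning the channel is closed first at return time and only then do we wait for the goroutines that the close told to exit. A dependency-free sketch of the same ordering (plain sync.WaitGroup stands in for apimachinery's wait.Group):

package main

import (
	"fmt"
	"sync"
)

func main() {
	ch := make(chan int)
	var wg sync.WaitGroup
	defer fmt.Println("all workers finished")
	defer wg.Wait() // runs second: wait for the worker to drain and exit
	defer close(ch) // runs first: tells the worker to stop

	wg.Add(1)
	go func() {
		defer wg.Done()
		for v := range ch { // loop ends when ch is closed
			fmt.Println("got", v)
		}
	}()

	for i := 0; i < 3; i++ {
		ch <- i
	}
}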

tools/cache/shared_informer.go

@@ -138,16 +138,12 @@ type sharedIndexInformer struct {
 	// clock allows for testability
 	clock clock.Clock
 
-	started bool
-	startedLock sync.Mutex
+	started, stopped bool
+	startedLock      sync.Mutex
 
 	// blockDeltas gives a way to stop all event distribution so that a late event handler
 	// can safely join the shared informer.
 	blockDeltas sync.Mutex
-
-	// stopCh is the channel used to stop the main Run process. We have to track it so that
-	// late joiners can have a proper stop
-	stopCh <-chan struct{}
-
-	wg wait.Group
 }
 
 // dummyController hides the fact that a SharedInformer is different from a dedicated one
@@ -205,23 +201,25 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
 		s.controller = New(cfg)
 		s.controller.(*controller).clock = s.clock
-		s.stopCh = stopCh
 		s.started = true
 	}()
 
-	defer s.wg.Wait()
-
-	s.wg.StartWithChannel(stopCh, s.cacheMutationDetector.Run)
-	s.wg.StartWithChannel(stopCh, s.processor.run)
+	// Separate stop channel because Processor should be stopped strictly after controller
+	processorStopCh := make(chan struct{})
+	var wg wait.Group
+	defer wg.Wait()              // Wait for Processor to stop
+	defer close(processorStopCh) // Tell Processor to stop
+	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
+	wg.StartWithChannel(processorStopCh, s.processor.run)
 
+	defer func() {
+		s.startedLock.Lock()
+		defer s.startedLock.Unlock()
+		s.stopped = true // Don't want any new listeners
+	}()
 	s.controller.Run(stopCh)
 }
 
-func (s *sharedIndexInformer) isStarted() bool {
-	s.startedLock.Lock()
-	defer s.startedLock.Unlock()
-	return s.started
-}
-
 func (s *sharedIndexInformer) HasSynced() bool {
 	s.startedLock.Lock()
 	defer s.startedLock.Unlock()
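The "separate stop channel" comment in the hunk above is the key design point: if the processor shared stopCh with the controller, it could stop while the controller was still delivering events. Giving it its own channel, closed by a defer after controller.Run returns, guarantees strict ordering. A sketch of that shape, with runController/runProcessor as illustrative stand-ins rather than the real types:

package main

import (
	"fmt"
	"time"
)

func runController(stopCh <-chan struct{}) {
	<-stopCh
	fmt.Println("controller: stopped")
}

func runProcessor(stopCh <-chan struct{}) {
	<-stopCh
	fmt.Println("processor: stopped (strictly after controller)")
}

func run(stopCh <-chan struct{}) {
	processorStopCh := make(chan struct{})
	processorDone := make(chan struct{})
	defer func() { <-processorDone }() // (2) then wait for it to finish
	defer close(processorStopCh)       // (1) on return, first tell the processor to stop
	go func() {
		runProcessor(processorStopCh)
		close(processorDone)
	}()

	runController(stopCh) // blocks until the caller closes stopCh
}

func main() {
	stopCh := make(chan struct{})
	go func() { time.Sleep(10 * time.Millisecond); close(stopCh) }()
	run(stopCh)
	fmt.Println("run: returned")
}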
@@ -290,6 +288,11 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
 	s.startedLock.Lock()
 	defer s.startedLock.Unlock()
 
+	if s.stopped {
+		glog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler)
+		return
+	}
+
 	if resyncPeriod > 0 {
 		if resyncPeriod < minimumResyncPeriod {
 			glog.Warningf("resyncPeriod %d is too small. Changing it to the minimum allowed value of %d", resyncPeriod, minimumResyncPeriod)
@@ -325,14 +328,9 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
 	s.blockDeltas.Lock()
 	defer s.blockDeltas.Unlock()
 
-	s.processor.addListener(listener)
-
-	s.wg.StartWithChannel(s.stopCh, listener.run)
-	s.wg.StartWithChannel(s.stopCh, listener.pop)
-
-	items := s.indexer.List()
-	for i := range items {
-		listener.add(addNotification{newObj: items[i]})
+	s.processor.addAndStartListener(listener)
+	for _, item := range s.indexer.List() {
+		listener.add(addNotification{newObj: item})
 	}
 }
@@ -372,12 +370,26 @@ type sharedProcessor struct {
 	listeners        []*processorListener
 	syncingListeners []*processorListener
 	clock            clock.Clock
+	wg               wait.Group
 }
 
+func (p *sharedProcessor) addAndStartListener(listener *processorListener) {
+	p.listenersLock.Lock()
+	defer p.listenersLock.Unlock()
+
+	p.addListenerLocked(listener)
+	p.wg.Start(listener.run)
+	p.wg.Start(listener.pop)
+}
+
 func (p *sharedProcessor) addListener(listener *processorListener) {
 	p.listenersLock.Lock()
 	defer p.listenersLock.Unlock()
 
+	p.addListenerLocked(listener)
+}
+
+func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
 	p.listeners = append(p.listeners, listener)
 	p.syncingListeners = append(p.syncingListeners, listener)
 }
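The addListener/addListenerLocked split above is a common Go idiom: the public-path method acquires the mutex, while a *Locked variant assumes the caller already holds it, so a compound operation like "add and start" runs under one critical section. Sketched here with a made-up registry type, not the client-go types:

package main

import (
	"fmt"
	"sync"
)

type registry struct {
	mu    sync.Mutex
	items []string
}

func (r *registry) add(item string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.addLocked(item)
}

func (r *registry) addAndAnnounce(item string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.addLocked(item)               // reuse, without re-locking
	fmt.Println("announced:", item) // still inside the critical section
}

// addLocked must only be called with r.mu held.
func (r *registry) addLocked(item string) {
	r.items = append(r.items, item)
}

func main() {
	r := &registry{}
	r.add("a")
	r.addAndAnnounce("b")
	fmt.Println(r.items)
}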
@@ -398,16 +410,21 @@ func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
 }
 
 func (p *sharedProcessor) run(stopCh <-chan struct{}) {
-	var wg wait.Group
 	func() {
 		p.listenersLock.RLock()
 		defer p.listenersLock.RUnlock()
 		for _, listener := range p.listeners {
-			wg.StartWithChannel(stopCh, listener.run)
-			wg.StartWithChannel(stopCh, listener.pop)
+			p.wg.Start(listener.run)
+			p.wg.Start(listener.pop)
 		}
 	}()
-	wg.Wait()
+	<-stopCh
+	p.listenersLock.RLock()
+	defer p.listenersLock.RUnlock()
+	for _, listener := range p.listeners {
+		close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
+	}
+	p.wg.Wait() // Wait for all .pop() and .run() to stop
 }
 
 // shouldResync queries every listener to determine if any of them need a resync, based on each
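sharedProcessor.run now shuts down by fanning a close signal out to every listener and waiting for all of them, instead of having each goroutine poll a shared stop channel. A minimal sketch of that fan-out (workers and inputs are illustrative names):

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	inputs := make([]chan int, 3)
	for i := range inputs {
		inputs[i] = make(chan int)
		wg.Add(1)
		go func(id int, in <-chan int) {
			defer wg.Done()
			for v := range in { // exits when its channel is closed
				fmt.Println("worker", id, "got", v)
			}
		}(i, inputs[i])
	}

	stopCh := make(chan struct{})
	go func() {
		inputs[0] <- 1
		inputs[2] <- 2
		close(stopCh)
	}()

	<-stopCh
	for _, in := range inputs {
		close(in) // tell each worker to stop
	}
	wg.Wait() // wait for all workers to exit
	fmt.Println("all stopped")
}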
@@ -443,18 +460,8 @@ func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Durati
 }
 
 type processorListener struct {
-	// lock/cond protects access to 'pendingNotifications'.
-	lock sync.RWMutex
-	cond sync.Cond
-
-	// pendingNotifications is an unbounded slice that holds all notifications not yet distributed
-	// there is one per listener, but a failing/stalled listener will have infinite pendingNotifications
-	// added until we OOM.
-	// TODO This is no worse that before, since reflectors were backed by unbounded DeltaFIFOs, but
-	// we should try to do something better
-	pendingNotifications []interface{}
-
 	nextCh chan interface{}
+	addCh  chan interface{}
 
 	handler ResourceEventHandler
@@ -472,80 +479,65 @@ type processorListener struct {
 func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time) *processorListener {
 	ret := &processorListener{
-		pendingNotifications:  []interface{}{},
 		nextCh:                make(chan interface{}),
+		addCh:                 make(chan interface{}),
 		handler:               handler,
 		requestedResyncPeriod: requestedResyncPeriod,
 		resyncPeriod:          resyncPeriod,
 	}
 
-	ret.cond.L = &ret.lock
-
 	ret.determineNextResync(now)
 
 	return ret
 }
 
 func (p *processorListener) add(notification interface{}) {
-	p.lock.Lock()
-	defer p.lock.Unlock()
-
-	p.pendingNotifications = append(p.pendingNotifications, notification)
-	p.cond.Broadcast()
+	p.addCh <- notification
 }
 
-func (p *processorListener) pop(stopCh <-chan struct{}) {
+func (p *processorListener) pop() {
 	defer utilruntime.HandleCrash()
+	defer close(p.nextCh) // Tell .run() to stop
 
+	// pendingNotifications is an unbounded slice that holds all notifications not yet distributed
+	// there is one per listener, but a failing/stalled listener will have infinite pendingNotifications
+	// added until we OOM.
+	// TODO This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
+	// we should try to do something better
+	var pendingNotifications []interface{}
+	var nextCh chan<- interface{}
+	var notification interface{}
 	for {
-		blockingGet := func() (interface{}, bool) {
-			p.lock.Lock()
-			defer p.lock.Unlock()
-
-			for len(p.pendingNotifications) == 0 {
-				// check if we're shutdown
-				select {
-				case <-stopCh:
-					return nil, true
-				default:
-				}
-				p.cond.Wait()
-			}
-
-			nt := p.pendingNotifications[0]
-			p.pendingNotifications = p.pendingNotifications[1:]
-			return nt, false
-		}
-
-		notification, stopped := blockingGet()
-		if stopped {
-			return
-		}
-
 		select {
-		case <-stopCh:
-			return
-		case p.nextCh <- notification:
+		case nextCh <- notification:
+			// Notification dispatched
+			if len(pendingNotifications) == 0 { // Nothing to pop
+				nextCh = nil // Disable this select case
+				notification = nil
+			} else {
+				notification = pendingNotifications[0]
+				pendingNotifications[0] = nil
+				pendingNotifications = pendingNotifications[1:]
+			}
+		case notificationToAdd, ok := <-p.addCh:
+			if !ok {
+				return
+			}
+			if notification == nil { // No notification to pop (and pendingNotifications is empty)
+				// Optimize the case - skip adding to pendingNotifications
+				notification = notificationToAdd
+				nextCh = p.nextCh
+			} else { // There is already a notification waiting to be dispatched
+				pendingNotifications = append(pendingNotifications, notificationToAdd)
+			}
 		}
 	}
 }
 
-func (p *processorListener) run(stopCh <-chan struct{}) {
+func (p *processorListener) run() {
 	defer utilruntime.HandleCrash()
 
-	for {
-		var next interface{}
-		select {
-		case <-stopCh:
-			func() {
-				p.lock.Lock()
-				defer p.lock.Unlock()
-				p.cond.Broadcast()
-			}()
-			return
-		case next = <-p.nextCh:
-		}
-
+	for next := range p.nextCh {
 		switch notification := next.(type) {
 		case updateNotification:
 			p.handler.OnUpdate(notification.oldObj, notification.newObj)
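The new pop() loop relies on a Go detail worth calling out: a send or receive on a nil channel blocks forever, so a select case whose channel variable is nil is effectively disabled. pop() sets nextCh to nil when it has nothing to deliver and points it back at p.nextCh once it does. A minimal demonstration of the trick:

package main

import "fmt"

func main() {
	in := make(chan string)
	out := make(chan string)

	go func() {
		var sendCh chan string // nil until we hold a value: send case disabled
		var held string
		for {
			select {
			case sendCh <- held:
				sendCh = nil // delivered: disable the send case again
			case v, ok := <-in:
				if !ok {
					close(out)
					return
				}
				held = v
				sendCh = out // enable the send case
			}
		}
	}()

	in <- "hello"
	fmt.Println(<-out) // "hello"
	close(in)
	if _, ok := <-out; !ok {
		fmt.Println("out closed")
	}
}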