processor listener: fix locking in pop()

This commit is contained in:
Hongchao Deng 2016-06-08 09:20:06 -07:00
parent 5b7e617abf
commit 308201acb0

View File

@ -279,21 +279,30 @@ func (p *processorListener) add(notification interface{}) {
func (p *processorListener) pop(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
p.lock.Lock()
defer p.lock.Unlock()
for {
for len(p.pendingNotifications) == 0 {
// check if we're shutdown
select {
case <-stopCh:
return
default:
blockingGet := func() (interface{}, bool) {
p.lock.Lock()
defer p.lock.Unlock()
for len(p.pendingNotifications) == 0 {
// check if we're shutdown
select {
case <-stopCh:
return nil, true
default:
}
p.cond.Wait()
}
p.cond.Wait()
nt := p.pendingNotifications[0]
p.pendingNotifications = p.pendingNotifications[1:]
return nt, false
}
notification, stopped := blockingGet()
if stopped {
return
}
notification := p.pendingNotifications[0]
p.pendingNotifications = p.pendingNotifications[1:]
select {
case <-stopCh: