diff --git a/pkg/controller/framework/processor_listener_test.go b/pkg/controller/framework/processor_listener_test.go new file mode 100644 index 00000000000..ffd72d8fae2 --- /dev/null +++ b/pkg/controller/framework/processor_listener_test.go @@ -0,0 +1,48 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package framework + +import ( + "testing" + "time" + + "k8s.io/kubernetes/pkg/util/wait" +) + +// TestPopReleaseLock tests that when processor listener blocks on chan, +// it should release the lock for pendingNotifications. +func TestPopReleaseLock(t *testing.T) { + pl := newProcessListener(nil) + stopCh := make(chan struct{}) + defer close(stopCh) + // make pop() block on nextCh: waiting for receiver to get notification. + pl.add(1) + go pl.pop(stopCh) + + resultCh := make(chan struct{}) + go func() { + pl.lock.Lock() + close(resultCh) + }() + + select { + case <-resultCh: + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("Timeout after %v", wait.ForeverTestTimeout) + } + pl.lock.Unlock() +} diff --git a/pkg/controller/framework/shared_informer.go b/pkg/controller/framework/shared_informer.go index ce9ddf2c714..c557bf97548 100644 --- a/pkg/controller/framework/shared_informer.go +++ b/pkg/controller/framework/shared_informer.go @@ -279,21 +279,30 @@ func (p *processorListener) add(notification interface{}) { func (p *processorListener) pop(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() - p.lock.Lock() - defer p.lock.Unlock() for { - for len(p.pendingNotifications) == 0 { - // check if we're shutdown - select { - case <-stopCh: - return - default: + blockingGet := func() (interface{}, bool) { + p.lock.Lock() + defer p.lock.Unlock() + + for len(p.pendingNotifications) == 0 { + // check if we're shutdown + select { + case <-stopCh: + return nil, true + default: + } + p.cond.Wait() } - p.cond.Wait() + nt := p.pendingNotifications[0] + p.pendingNotifications = p.pendingNotifications[1:] + return nt, false + } + + notification, stopped := blockingGet() + if stopped { + return } - notification := p.pendingNotifications[0] - p.pendingNotifications = p.pendingNotifications[1:] select { case <-stopCh: