diff --git a/pkg/controller/framework/shared_informer.go b/pkg/controller/framework/shared_informer.go index 43b749f9c2d..87dcac6d22b 100644 --- a/pkg/controller/framework/shared_informer.go +++ b/pkg/controller/framework/shared_informer.go @@ -92,6 +92,9 @@ type sharedIndexInformer struct { // blockDeltas gives a way to stop all event distribution so that a late event handler // can safely join the shared informer. blockDeltas sync.Mutex + // stopCh is the channel used to stop the main Run process. We have to track it so that + // late joiners can have a proper stop + stopCh <-chan struct{} } // dummyController hides the fact that a SharedInformer is different from a dedicated one @@ -146,6 +149,7 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) { s.started = true }() + s.stopCh = stopCh s.processor.run(stopCh) s.controller.Run(stopCh) } @@ -220,6 +224,9 @@ func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) erro listener := newProcessListener(handler) s.processor.listeners = append(s.processor.listeners, listener) + go listener.run(s.stopCh) + go listener.pop(s.stopCh) + items := s.indexer.List() for i := range items { listener.add(addNotification{newObj: items[i]}) diff --git a/plugin/pkg/admission/resourcequota/admission.go b/plugin/pkg/admission/resourcequota/admission.go index 41eaae64b09..244d338eefc 100644 --- a/plugin/pkg/admission/resourcequota/admission.go +++ b/plugin/pkg/admission/resourcequota/admission.go @@ -59,7 +59,7 @@ func NewResourceQuota(client clientset.Interface, registry quota.Registry, numEv } go quotaAccessor.Run(stopCh) - evaluator := NewQuotaEvaluator(quotaAccessor, registry, numEvaluators, stopCh) + evaluator := NewQuotaEvaluator(quotaAccessor, registry, nil, numEvaluators, stopCh) return "aAdmission{ Handler: admission.NewHandler(admission.Create, admission.Update), diff --git a/plugin/pkg/admission/resourcequota/admission_test.go b/plugin/pkg/admission/resourcequota/admission_test.go index 95242bdd98b..87c244c0d2b 100644 --- a/plugin/pkg/admission/resourcequota/admission_test.go +++ b/plugin/pkg/admission/resourcequota/admission_test.go @@ -150,7 +150,7 @@ func TestAdmissionIgnoresSubresources(t *testing.T) { quotaAccessor, _ := newQuotaAccessor(kubeClient) quotaAccessor.indexer = indexer go quotaAccessor.Run(stopCh) - evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh) + evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), nil, 5, stopCh) defer utilruntime.HandleCrash() handler := "aAdmission{ @@ -194,7 +194,7 @@ func TestAdmitBelowQuotaLimit(t *testing.T) { quotaAccessor, _ := newQuotaAccessor(kubeClient) quotaAccessor.indexer = indexer go quotaAccessor.Run(stopCh) - evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh) + evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), nil, 5, stopCh) defer utilruntime.HandleCrash() handler := "aAdmission{ @@ -277,7 +277,7 @@ func TestAdmitHandlesOldObjects(t *testing.T) { quotaAccessor, _ := newQuotaAccessor(kubeClient) quotaAccessor.indexer = indexer go quotaAccessor.Run(stopCh) - evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh) + evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), nil, 5, stopCh) handler := "aAdmission{ Handler: admission.NewHandler(admission.Create, admission.Update), @@ -367,7 +367,7 @@ func TestAdmitExceedQuotaLimit(t *testing.T) { quotaAccessor, _ := newQuotaAccessor(kubeClient) quotaAccessor.indexer = indexer go quotaAccessor.Run(stopCh) - evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh) + evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), nil, 5, stopCh) defer utilruntime.HandleCrash() handler := "aAdmission{ @@ -411,7 +411,7 @@ func TestAdmitEnforceQuotaConstraints(t *testing.T) { quotaAccessor, _ := newQuotaAccessor(kubeClient) quotaAccessor.indexer = indexer go quotaAccessor.Run(stopCh) - evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh) + evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), nil, 5, stopCh) defer utilruntime.HandleCrash() handler := "aAdmission{ @@ -465,7 +465,7 @@ func TestAdmitPodInNamespaceWithoutQuota(t *testing.T) { quotaAccessor.indexer = indexer quotaAccessor.liveLookupCache = liveLookupCache go quotaAccessor.Run(stopCh) - evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh) + evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), nil, 5, stopCh) defer utilruntime.HandleCrash() handler := "aAdmission{ @@ -531,7 +531,7 @@ func TestAdmitBelowTerminatingQuotaLimit(t *testing.T) { quotaAccessor, _ := newQuotaAccessor(kubeClient) quotaAccessor.indexer = indexer go quotaAccessor.Run(stopCh) - evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh) + evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), nil, 5, stopCh) defer utilruntime.HandleCrash() handler := "aAdmission{ @@ -636,7 +636,7 @@ func TestAdmitBelowBestEffortQuotaLimit(t *testing.T) { quotaAccessor, _ := newQuotaAccessor(kubeClient) quotaAccessor.indexer = indexer go quotaAccessor.Run(stopCh) - evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh) + evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), nil, 5, stopCh) defer utilruntime.HandleCrash() handler := "aAdmission{ @@ -728,7 +728,7 @@ func TestAdmitBestEffortQuotaLimitIgnoresBurstable(t *testing.T) { quotaAccessor, _ := newQuotaAccessor(kubeClient) quotaAccessor.indexer = indexer go quotaAccessor.Run(stopCh) - evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh) + evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), nil, 5, stopCh) defer utilruntime.HandleCrash() handler := "aAdmission{ @@ -846,7 +846,7 @@ func TestAdmissionSetsMissingNamespace(t *testing.T) { quotaAccessor, _ := newQuotaAccessor(kubeClient) quotaAccessor.indexer = indexer go quotaAccessor.Run(stopCh) - evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh) + evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), nil, 5, stopCh) evaluator.(*quotaEvaluator).registry = registry defer utilruntime.HandleCrash() diff --git a/plugin/pkg/admission/resourcequota/controller.go b/plugin/pkg/admission/resourcequota/controller.go index 9d3aa18d273..82f84374cc2 100644 --- a/plugin/pkg/admission/resourcequota/controller.go +++ b/plugin/pkg/admission/resourcequota/controller.go @@ -44,6 +44,8 @@ type Evaluator interface { type quotaEvaluator struct { quotaAccessor QuotaAccessor + // lockAquisitionFunc acquires any required locks and returns a cleanup method to defer + lockAquisitionFunc func([]api.ResourceQuota) func() // registry that knows how to measure usage for objects registry quota.Registry @@ -96,9 +98,10 @@ func newAdmissionWaiter(a admission.Attributes) *admissionWaiter { // NewQuotaEvaluator configures an admission controller that can enforce quota constraints // using the provided registry. The registry must have the capability to handle group/kinds that // are persisted by the server this admission controller is intercepting -func NewQuotaEvaluator(quotaAccessor QuotaAccessor, registry quota.Registry, workers int, stopCh <-chan struct{}) Evaluator { +func NewQuotaEvaluator(quotaAccessor QuotaAccessor, registry quota.Registry, lockAquisitionFunc func([]api.ResourceQuota) func(), workers int, stopCh <-chan struct{}) Evaluator { return "aEvaluator{ - quotaAccessor: quotaAccessor, + quotaAccessor: quotaAccessor, + lockAquisitionFunc: lockAquisitionFunc, registry: registry, @@ -169,6 +172,11 @@ func (e *quotaEvaluator) checkAttributes(ns string, admissionAttributes []*admis return } + if e.lockAquisitionFunc != nil { + releaseLocks := e.lockAquisitionFunc(quotas) + defer releaseLocks() + } + e.checkQuotas(quotas, admissionAttributes, 3) }