diff --git a/plugin/pkg/admission/resourcequota/admission.go b/plugin/pkg/admission/resourcequota/admission.go index a775ef411ea..5ddd6db3dc4 100644 --- a/plugin/pkg/admission/resourcequota/admission.go +++ b/plugin/pkg/admission/resourcequota/admission.go @@ -28,13 +28,15 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/quota" "k8s.io/kubernetes/pkg/quota/install" + utilruntime "k8s.io/kubernetes/pkg/util/runtime" ) func init() { admission.RegisterPlugin("ResourceQuota", func(client clientset.Interface, config io.Reader) (admission.Interface, error) { registry := install.NewRegistry(client) - return NewResourceQuota(client, registry, 5) + // TODO: expose a stop channel in admission factory + return NewResourceQuota(client, registry, 5, make(chan struct{})) }) } @@ -53,12 +55,14 @@ type liveLookupEntry struct { // NewResourceQuota configures an admission controller that can enforce quota constraints // using the provided registry. The registry must have the capability to handle group/kinds that // are persisted by the server this admission controller is intercepting -func NewResourceQuota(client clientset.Interface, registry quota.Registry, numEvaluators int) (admission.Interface, error) { +func NewResourceQuota(client clientset.Interface, registry quota.Registry, numEvaluators int, stopCh <-chan struct{}) (admission.Interface, error) { evaluator, err := newQuotaEvaluator(client, registry) if err != nil { return nil, err } - evaluator.Run(numEvaluators) + + defer utilruntime.HandleCrash() + go evaluator.Run(numEvaluators, stopCh) return "aAdmission{ Handler: admission.NewHandler(admission.Create, admission.Update), diff --git a/plugin/pkg/admission/resourcequota/admission_test.go b/plugin/pkg/admission/resourcequota/admission_test.go index 28359f483fc..4c619805c67 100644 --- a/plugin/pkg/admission/resourcequota/admission_test.go +++ b/plugin/pkg/admission/resourcequota/admission_test.go @@ -22,7 +22,7 @@ import ( "testing" "time" - "github.com/hashicorp/golang-lru" + lru "github.com/hashicorp/golang-lru" "k8s.io/kubernetes/pkg/admission" "k8s.io/kubernetes/pkg/api" @@ -36,6 +36,7 @@ import ( "k8s.io/kubernetes/pkg/quota/generic" "k8s.io/kubernetes/pkg/quota/install" "k8s.io/kubernetes/pkg/runtime" + utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/sets" ) @@ -115,7 +116,9 @@ func TestPrettyPrint(t *testing.T) { // TestAdmissionIgnoresDelete verifies that the admission controller ignores delete operations func TestAdmissionIgnoresDelete(t *testing.T) { kubeClient := fake.NewSimpleClientset() - handler, err := NewResourceQuota(kubeClient, install.NewRegistry(kubeClient), 5) + stopCh := make(chan struct{}) + defer close(stopCh) + handler, err := NewResourceQuota(kubeClient, install.NewRegistry(kubeClient), 5, stopCh) if err != nil { t.Errorf("Unexpected error %v", err) } @@ -143,7 +146,10 @@ func TestAdmissionIgnoresSubresources(t *testing.T) { indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}) evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient)) evaluator.indexer = indexer - evaluator.Run(5) + stopCh := make(chan struct{}) + defer close(stopCh) + defer utilruntime.HandleCrash() + go evaluator.Run(5, stopCh) handler := "aAdmission{ Handler: admission.NewHandler(admission.Create, admission.Update), evaluator: evaluator, @@ -181,7 +187,10 @@ func TestAdmitBelowQuotaLimit(t *testing.T) { indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}) evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient)) evaluator.indexer = indexer - evaluator.Run(5) + stopCh := make(chan struct{}) + defer close(stopCh) + defer utilruntime.HandleCrash() + go evaluator.Run(5, stopCh) handler := "aAdmission{ Handler: admission.NewHandler(admission.Create, admission.Update), evaluator: evaluator, @@ -255,7 +264,10 @@ func TestAdmitExceedQuotaLimit(t *testing.T) { indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}) evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient)) evaluator.indexer = indexer - evaluator.Run(5) + stopCh := make(chan struct{}) + defer close(stopCh) + defer utilruntime.HandleCrash() + go evaluator.Run(5, stopCh) handler := "aAdmission{ Handler: admission.NewHandler(admission.Create, admission.Update), evaluator: evaluator, @@ -293,7 +305,10 @@ func TestAdmitEnforceQuotaConstraints(t *testing.T) { indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}) evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient)) evaluator.indexer = indexer - evaluator.Run(5) + stopCh := make(chan struct{}) + defer close(stopCh) + defer utilruntime.HandleCrash() + go evaluator.Run(5, stopCh) handler := "aAdmission{ Handler: admission.NewHandler(admission.Create, admission.Update), evaluator: evaluator, @@ -334,7 +349,10 @@ func TestAdmitPodInNamespaceWithoutQuota(t *testing.T) { evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient)) evaluator.indexer = indexer evaluator.liveLookupCache = liveLookupCache - evaluator.Run(5) + stopCh := make(chan struct{}) + defer close(stopCh) + defer utilruntime.HandleCrash() + go evaluator.Run(5, stopCh) handler := "aAdmission{ Handler: admission.NewHandler(admission.Create, admission.Update), evaluator: evaluator, @@ -394,7 +412,10 @@ func TestAdmitBelowTerminatingQuotaLimit(t *testing.T) { indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}) evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient)) evaluator.indexer = indexer - evaluator.Run(5) + stopCh := make(chan struct{}) + defer close(stopCh) + defer utilruntime.HandleCrash() + go evaluator.Run(5, stopCh) handler := "aAdmission{ Handler: admission.NewHandler(admission.Create, admission.Update), evaluator: evaluator, @@ -493,7 +514,10 @@ func TestAdmitBelowBestEffortQuotaLimit(t *testing.T) { indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}) evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient)) evaluator.indexer = indexer - evaluator.Run(5) + stopCh := make(chan struct{}) + defer close(stopCh) + defer utilruntime.HandleCrash() + go evaluator.Run(5, stopCh) handler := "aAdmission{ Handler: admission.NewHandler(admission.Create, admission.Update), evaluator: evaluator, @@ -579,7 +603,10 @@ func TestAdmitBestEffortQuotaLimitIgnoresBurstable(t *testing.T) { indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}) evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient)) evaluator.indexer = indexer - evaluator.Run(5) + stopCh := make(chan struct{}) + defer close(stopCh) + defer utilruntime.HandleCrash() + go evaluator.Run(5, stopCh) handler := "aAdmission{ Handler: admission.NewHandler(admission.Create, admission.Update), evaluator: evaluator, @@ -692,7 +719,10 @@ func TestAdmissionSetsMissingNamespace(t *testing.T) { evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient)) evaluator.indexer = indexer evaluator.registry = registry - evaluator.Run(5) + stopCh := make(chan struct{}) + defer close(stopCh) + defer utilruntime.HandleCrash() + go evaluator.Run(5, stopCh) handler := "aAdmission{ Handler: admission.NewHandler(admission.Create, admission.Update), evaluator: evaluator, diff --git a/plugin/pkg/admission/resourcequota/controller.go b/plugin/pkg/admission/resourcequota/controller.go index f31c72d7117..02c304795ac 100644 --- a/plugin/pkg/admission/resourcequota/controller.go +++ b/plugin/pkg/admission/resourcequota/controller.go @@ -21,7 +21,8 @@ import ( "sync" "time" - "github.com/hashicorp/golang-lru" + "github.com/golang/glog" + lru "github.com/hashicorp/golang-lru" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" @@ -126,25 +127,35 @@ func newQuotaEvaluator(client clientset.Interface, registry quota.Registry) (*qu } // Run begins watching and syncing. -func (e *quotaEvaluator) Run(workers int) { +func (e *quotaEvaluator) Run(workers int, stopCh <-chan struct{}) { defer utilruntime.HandleCrash() for i := 0; i < workers; i++ { - go wait.Until(e.doWork, time.Second, make(chan struct{})) + go wait.Until(e.doWork, time.Second, stopCh) } + <-stopCh + glog.Infof("Shutting down quota evaluator") + e.queue.ShutDown() } func (e *quotaEvaluator) doWork() { + workFunc := func() bool { + ns, admissionAttributes, quit := e.getWork() + if quit { + return true + } + defer e.completeWork(ns) + if len(admissionAttributes) == 0 { + return false + } + e.checkAttributes(ns, admissionAttributes) + return false + } for { - func() { - ns, admissionAttributes := e.getWork() - defer e.completeWork(ns) - if len(admissionAttributes) == 0 { - return - } - - e.checkAttributes(ns, admissionAttributes) - }() + if quit := workFunc(); quit { + glog.Infof("quota evaluator worker shutdown") + return + } } } @@ -434,8 +445,11 @@ func (e *quotaEvaluator) completeWork(ns string) { e.inProgress.Delete(ns) } -func (e *quotaEvaluator) getWork() (string, []*admissionWaiter) { - uncastNS, _ := e.queue.Get() +func (e *quotaEvaluator) getWork() (string, []*admissionWaiter, bool) { + uncastNS, shutdown := e.queue.Get() + if shutdown { + return "", []*admissionWaiter{}, shutdown + } ns := uncastNS.(string) e.workLock.Lock() @@ -450,12 +464,12 @@ func (e *quotaEvaluator) getWork() (string, []*admissionWaiter) { if len(work) != 0 { e.inProgress.Insert(ns) - return ns, work + return ns, work, false } e.queue.Done(ns) e.inProgress.Delete(ns) - return ns, []*admissionWaiter{} + return ns, []*admissionWaiter{}, false } func (e *quotaEvaluator) getQuotas(namespace string) ([]api.ResourceQuota, error) { diff --git a/test/integration/quota_test.go b/test/integration/quota_test.go index 01831c62bb4..a1ae1cb895b 100644 --- a/test/integration/quota_test.go +++ b/test/integration/quota_test.go @@ -43,6 +43,10 @@ import ( "k8s.io/kubernetes/test/integration/framework" ) +func init() { + requireEtcd() +} + // 1.2 code gets: // quota_test.go:95: Took 4.218619579s to scale up without quota // quota_test.go:199: unexpected error: timed out waiting for the condition, ended with 342 pods (1 minute) @@ -59,13 +63,15 @@ func TestQuota(t *testing.T) { <-initializationCh m.Handler.ServeHTTP(w, req) })) - // TODO: Uncomment when fix #19254 - // defer s.Close() + // TODO: https://github.com/kubernetes/kubernetes/issues/25412 + //defer s.Close() + admissionCh := make(chan struct{}) clientset := clientset.NewForConfigOrDie(&restclient.Config{QPS: -1, Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) - admission, err := resourcequota.NewResourceQuota(clientset, quotainstall.NewRegistry(clientset), 5) + admission, err := resourcequota.NewResourceQuota(clientset, quotainstall.NewRegistry(clientset), 5, admissionCh) if err != nil { t.Fatalf("unexpected error: %v", err) } + defer close(admissionCh) masterConfig := framework.NewIntegrationTestMasterConfig() masterConfig.AdmissionControl = admission @@ -78,16 +84,12 @@ func TestQuota(t *testing.T) { controllerCh := make(chan struct{}) defer close(controllerCh) - go replicationcontroller.NewReplicationManagerFromClient(clientset, controller.NoResyncPeriodFunc, replicationcontroller.BurstReplicas, 4096). + go replicationcontroller.NewReplicationManagerFromClientForIntegration(clientset, controller.NoResyncPeriodFunc, replicationcontroller.BurstReplicas, 4096). Run(3, controllerCh) resourceQuotaRegistry := quotainstall.NewRegistry(clientset) groupKindsToReplenish := []unversioned.GroupKind{ api.Kind("Pod"), - api.Kind("Service"), - api.Kind("ReplicationController"), - api.Kind("PersistentVolumeClaim"), - api.Kind("Secret"), } resourceQuotaControllerOptions := &resourcequotacontroller.ResourceQuotaControllerOptions{ KubeClient: clientset, @@ -118,8 +120,8 @@ func TestQuota(t *testing.T) { scale(t, "quotaed", clientset) endTime = time.Now() t.Logf("Took %v to scale up with quota", endTime.Sub(startTime)) - } + func waitForQuota(t *testing.T, quota *api.ResourceQuota, clientset *clientset.Clientset) { w, err := clientset.Core().ResourceQuotas(quota.Namespace).Watch(api.SingleObject(api.ObjectMeta{Name: quota.Name})) if err != nil { @@ -152,7 +154,7 @@ func waitForQuota(t *testing.T, quota *api.ResourceQuota, clientset *clientset.C } func scale(t *testing.T, namespace string, clientset *clientset.Clientset) { - target := 1000 + target := 100 rc := &api.ReplicationController{ ObjectMeta: api.ObjectMeta{ Name: "foo",