diff --git a/pkg/controller/resourcequota/resource_quota_controller.go b/pkg/controller/resourcequota/resource_quota_controller.go index a89a16a58b3..8441a3c4032 100644 --- a/pkg/controller/resourcequota/resource_quota_controller.go +++ b/pkg/controller/resourcequota/resource_quota_controller.go @@ -269,6 +269,7 @@ func (rq *Controller) worker(ctx context.Context, queue workqueue.RateLimitingIn func (rq *Controller) Run(ctx context.Context, workers int) { defer utilruntime.HandleCrash() defer rq.queue.ShutDown() + defer rq.missingUsageQueue.ShutDown() klog.Infof("Starting resource quota controller") defer klog.Infof("Shutting down resource quota controller") diff --git a/pkg/controller/resourcequota/resource_quota_monitor.go b/pkg/controller/resourcequota/resource_quota_monitor.go index d37a1a0f60f..e2defa9d009 100644 --- a/pkg/controller/resourcequota/resource_quota_monitor.go +++ b/pkg/controller/resourcequota/resource_quota_monitor.go @@ -305,6 +305,8 @@ func (qm *QuotaMonitor) IsSynced() bool { // Run sets the stop channel and starts monitor execution until stopCh is // closed. Any running monitors will be stopped before Run returns. func (qm *QuotaMonitor) Run(stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() + klog.Infof("QuotaMonitor running") defer klog.Infof("QuotaMonitor stopping") @@ -317,6 +319,15 @@ func (qm *QuotaMonitor) Run(stopCh <-chan struct{}) { // Start monitors and begin change processing until the stop channel is // closed. qm.StartMonitors() + + // The following workers are hanging forever until the queue is + // shutted down, so we need to shut it down in a separate goroutine. + go func() { + defer utilruntime.HandleCrash() + defer qm.resourceChanges.ShutDown() + + <-stopCh + }() wait.Until(qm.runProcessResourceChanges, 1*time.Second, stopCh) // Stop any running monitors. diff --git a/test/integration/quota/quota_test.go b/test/integration/quota/quota_test.go index 382e3d2c643..f8885d84da3 100644 --- a/test/integration/quota/quota_test.go +++ b/test/integration/quota/quota_test.go @@ -20,8 +20,7 @@ import ( "context" "errors" "fmt" - "net/http" - "net/http/httptest" + "os" "testing" "time" @@ -31,23 +30,17 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" - "k8s.io/apiserver/pkg/admission" - genericadmissioninitializer "k8s.io/apiserver/pkg/admission/initializer" - "k8s.io/apiserver/pkg/admission/plugin/resourcequota" - resourcequotaapi "k8s.io/apiserver/pkg/admission/plugin/resourcequota/apis/resourcequota" "k8s.io/apiserver/pkg/quota/v1/generic" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" - restclient "k8s.io/client-go/rest" watchtools "k8s.io/client-go/tools/watch" + "k8s.io/kubernetes/cmd/kube-apiserver/app/options" "k8s.io/kubernetes/pkg/controller" replicationcontroller "k8s.io/kubernetes/pkg/controller/replication" resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota" - kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission" quotainstall "k8s.io/kubernetes/pkg/quota/v1/install" "k8s.io/kubernetes/test/integration/framework" ) @@ -64,41 +57,20 @@ const ( // quota_test.go:115: Took 12.021640372s to scale up with quota func TestQuota(t *testing.T) { // Set up a API server - h := &framework.APIServerHolder{Initialized: make(chan struct{})} - s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - <-h.Initialized - h.M.GenericAPIServer.Handler.ServeHTTP(w, req) - })) + _, kubeConfig, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{ + ModifyServerRunOptions: func(opts *options.ServerRunOptions) { + // Disable ServiceAccount admission plugin as we don't have serviceaccount controller running. + opts.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount"} + }, + }) + defer tearDownFn() - admissionCh := make(chan struct{}) - defer close(admissionCh) - clientset := clientset.NewForConfigOrDie(&restclient.Config{QPS: -1, Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) - config := &resourcequotaapi.Configuration{} - admissionControl, err := resourcequota.NewResourceQuota(config, 5) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - internalInformers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc()) - qca := quotainstall.NewQuotaConfigurationForAdmission() + clientset := clientset.NewForConfigOrDie(kubeConfig) - initializers := admission.PluginInitializers{ - genericadmissioninitializer.New(clientset, internalInformers, nil, nil, admissionCh), - kubeapiserveradmission.NewPluginInitializer(nil, nil, qca), - } - initializers.Initialize(admissionControl) - if err := admission.ValidateInitialization(admissionControl); err != nil { - t.Fatalf("couldn't initialize resource quota: %v", err) - } - - controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig() - controlPlaneConfig.GenericConfig.AdmissionControl = admissionControl - _, _, closeFn := framework.RunAnAPIServerUsingServer(controlPlaneConfig, s, h) - defer closeFn() - - ns := framework.CreateTestingNamespace("quotaed", t) - defer framework.DeleteTestingNamespace(ns, t) - ns2 := framework.CreateTestingNamespace("non-quotaed", t) - defer framework.DeleteTestingNamespace(ns2, t) + ns := framework.CreateNamespaceOrDie(clientset, "quotaed", t) + defer framework.DeleteNamespaceOrDie(clientset, ns, t) + ns2 := framework.CreateNamespaceOrDie(clientset, "non-quotaed", t) + defer framework.DeleteNamespaceOrDie(clientset, ns2, t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -136,7 +108,6 @@ func TestQuota(t *testing.T) { // Periodically the quota controller to detect new resource types go resourceQuotaController.Sync(discoveryFunc, 30*time.Second, ctx.Done()) - internalInformers.Start(ctx.Done()) informers.Start(ctx.Done()) close(informersStarted) @@ -292,49 +263,43 @@ func scale(t *testing.T, namespace string, clientset *clientset.Clientset) { } func TestQuotaLimitedResourceDenial(t *testing.T) { - // Set up an API server - h := &framework.APIServerHolder{Initialized: make(chan struct{})} - s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - <-h.Initialized - h.M.GenericAPIServer.Handler.ServeHTTP(w, req) - })) - - admissionCh := make(chan struct{}) - defer close(admissionCh) - clientset := clientset.NewForConfigOrDie(&restclient.Config{QPS: -1, Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) - - // stop creation of a pod resource unless there is a quota - config := &resourcequotaapi.Configuration{ - LimitedResources: []resourcequotaapi.LimitedResource{ - { - Resource: "pods", - MatchContains: []string{"pods"}, - }, - }, - } - qca := quotainstall.NewQuotaConfigurationForAdmission() - admissionControl, err := resourcequota.NewResourceQuota(config, 5) + // Create admission configuration with ResourceQuota configuration. + admissionConfigFile, err := os.CreateTemp("", "admission-config.yaml") if err != nil { - t.Fatalf("unexpected error: %v", err) + t.Fatal(err) } - externalInformers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc()) - - initializers := admission.PluginInitializers{ - genericadmissioninitializer.New(clientset, externalInformers, nil, nil, admissionCh), - kubeapiserveradmission.NewPluginInitializer(nil, nil, qca), - } - initializers.Initialize(admissionControl) - if err := admission.ValidateInitialization(admissionControl); err != nil { - t.Fatalf("couldn't initialize resource quota: %v", err) + defer os.Remove(admissionConfigFile.Name()) + if err := os.WriteFile(admissionConfigFile.Name(), []byte(` +apiVersion: apiserver.k8s.io/v1alpha1 +kind: AdmissionConfiguration +plugins: +- name: ResourceQuota + configuration: + apiVersion: apiserver.config.k8s.io/v1 + kind: ResourceQuotaConfiguration + limitedResources: + - resource: pods + matchContains: + - pods +`), os.FileMode(0644)); err != nil { + t.Fatal(err) } - controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig() - controlPlaneConfig.GenericConfig.AdmissionControl = admissionControl - _, _, closeFn := framework.RunAnAPIServerUsingServer(controlPlaneConfig, s, h) - defer closeFn() + // Set up an API server + _, kubeConfig, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{ + ModifyServerRunOptions: func(opts *options.ServerRunOptions) { + // Disable ServiceAccount admission plugin as we don't have serviceaccount controller running. + opts.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount"} + opts.Admission.GenericAdmission.ConfigFile = admissionConfigFile.Name() - ns := framework.CreateTestingNamespace("quota", t) - defer framework.DeleteTestingNamespace(ns, t) + }, + }) + defer tearDownFn() + + clientset := clientset.NewForConfigOrDie(kubeConfig) + + ns := framework.CreateNamespaceOrDie(clientset, "quota", t) + defer framework.DeleteNamespaceOrDie(clientset, ns, t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -372,7 +337,6 @@ func TestQuotaLimitedResourceDenial(t *testing.T) { // Periodically the quota controller to detect new resource types go resourceQuotaController.Sync(discoveryFunc, 30*time.Second, ctx.Done()) - externalInformers.Start(ctx.Done()) informers.Start(ctx.Done()) close(informersStarted) @@ -425,50 +389,43 @@ func TestQuotaLimitedResourceDenial(t *testing.T) { } func TestQuotaLimitService(t *testing.T) { + // Create admission configuration with ResourceQuota configuration. + admissionConfigFile, err := os.CreateTemp("", "admission-config.yaml") + if err != nil { + t.Fatal(err) + } + defer os.Remove(admissionConfigFile.Name()) + if err := os.WriteFile(admissionConfigFile.Name(), []byte(` +apiVersion: apiserver.k8s.io/v1alpha1 +kind: AdmissionConfiguration +plugins: +- name: ResourceQuota + configuration: + apiVersion: apiserver.config.k8s.io/v1 + kind: ResourceQuotaConfiguration + limitedResources: + - resource: pods + matchContains: + - pods +`), os.FileMode(0644)); err != nil { + t.Fatal(err) + } // Set up an API server - h := &framework.APIServerHolder{Initialized: make(chan struct{})} - s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - <-h.Initialized - h.M.GenericAPIServer.Handler.ServeHTTP(w, req) - })) + _, kubeConfig, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{ + ModifyServerRunOptions: func(opts *options.ServerRunOptions) { + // Disable ServiceAccount admission plugin as we don't have serviceaccount controller running. + opts.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount"} + opts.Admission.GenericAdmission.ConfigFile = admissionConfigFile.Name() - admissionCh := make(chan struct{}) - defer close(admissionCh) - clientset := clientset.NewForConfigOrDie(&restclient.Config{QPS: -1, Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) - - // stop creation of a pod resource unless there is a quota - config := &resourcequotaapi.Configuration{ - LimitedResources: []resourcequotaapi.LimitedResource{ - { - Resource: "pods", - MatchContains: []string{"pods"}, - }, }, - } - qca := quotainstall.NewQuotaConfigurationForAdmission() - admissionControl, err := resourcequota.NewResourceQuota(config, 5) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - externalInformers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc()) + }) + defer tearDownFn() - initializers := admission.PluginInitializers{ - genericadmissioninitializer.New(clientset, externalInformers, nil, nil, admissionCh), - kubeapiserveradmission.NewPluginInitializer(nil, nil, qca), - } - initializers.Initialize(admissionControl) - if err := admission.ValidateInitialization(admissionControl); err != nil { - t.Fatalf("couldn't initialize resource quota: %v", err) - } + clientset := clientset.NewForConfigOrDie(kubeConfig) - controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig() - controlPlaneConfig.GenericConfig.AdmissionControl = admissionControl - _, _, closeFn := framework.RunAnAPIServerUsingServer(controlPlaneConfig, s, h) - defer closeFn() - - ns := framework.CreateTestingNamespace("quota", t) - defer framework.DeleteTestingNamespace(ns, t) + ns := framework.CreateNamespaceOrDie(clientset, "quota", t) + defer framework.DeleteNamespaceOrDie(clientset, ns, t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -506,7 +463,6 @@ func TestQuotaLimitService(t *testing.T) { // Periodically the quota controller to detect new resource types go resourceQuotaController.Sync(discoveryFunc, 30*time.Second, ctx.Done()) - externalInformers.Start(ctx.Done()) informers.Start(ctx.Done()) close(informersStarted)