diff --git a/pkg/controller/resourcequota/resource_quota_controller.go b/pkg/controller/resourcequota/resource_quota_controller.go index 91f55be0656..86860177310 100644 --- a/pkg/controller/resourcequota/resource_quota_controller.go +++ b/pkg/controller/resourcequota/resource_quota_controller.go @@ -63,6 +63,8 @@ type ResourceQuotaController struct { rqController *framework.Controller // ResourceQuota objects that need to be synchronized queue workqueue.RateLimitingInterface + // missingUsageQueue holds objects that are missing the initial usage informatino + missingUsageQueue workqueue.RateLimitingInterface // To allow injection of syncUsage for testing. syncHandler func(key string) error // function that controls full recalculation of quota usage @@ -78,6 +80,7 @@ func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *Resour rq := &ResourceQuotaController{ kubeClient: options.KubeClient, queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + missingUsageQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), resyncPeriod: options.ResyncPeriod, registry: options.Registry, replenishmentControllers: []framework.ControllerInterface{}, @@ -101,7 +104,7 @@ func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *Resour &api.ResourceQuota{}, rq.resyncPeriod(), framework.ResourceEventHandlerFuncs{ - AddFunc: rq.enqueueResourceQuota, + AddFunc: rq.addQuota, UpdateFunc: func(old, cur interface{}) { // We are only interested in observing updates to quota.spec to drive updates to quota.status. // We ignore all updates to quota.Status because they are all driven by this controller. @@ -116,7 +119,7 @@ func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *Resour if quota.Equals(curResourceQuota.Spec.Hard, oldResourceQuota.Spec.Hard) { return } - rq.enqueueResourceQuota(curResourceQuota) + rq.addQuota(curResourceQuota) }, // This will enter the sync loop and no-op, because the controller has been deleted from the store. // Note that deleting a controller immediately after scaling it to 0 will not work. The recommended @@ -160,28 +163,63 @@ func (rq *ResourceQuotaController) enqueueResourceQuota(obj interface{}) { rq.queue.Add(key) } +func (rq *ResourceQuotaController) addQuota(obj interface{}) { + key, err := controller.KeyFunc(obj) + if err != nil { + glog.Errorf("Couldn't get key for object %+v: %v", obj, err) + return + } + + resourceQuota := obj.(*api.ResourceQuota) + + // if we declared an intent that is not yet captured in status (prioritize it) + if !api.Semantic.DeepEqual(resourceQuota.Spec.Hard, resourceQuota.Status.Hard) { + rq.missingUsageQueue.Add(key) + return + } + + // if we declared a constraint that has no usage (which this controller can calculate, prioritize it) + for constraint := range resourceQuota.Status.Hard { + if _, usageFound := resourceQuota.Status.Used[constraint]; !usageFound { + matchedResources := []api.ResourceName{constraint} + + for _, evaluator := range rq.registry.Evaluators() { + if intersection := quota.Intersection(evaluator.MatchesResources(), matchedResources); len(intersection) != 0 { + rq.missingUsageQueue.Add(key) + return + } + } + } + } + + // no special priority, go in normal recalc queue + rq.queue.Add(key) +} + // worker runs a worker thread that just dequeues items, processes them, and marks them done. -// It enforces that the syncHandler is never invoked concurrently with the same key. -func (rq *ResourceQuotaController) worker() { +func (rq *ResourceQuotaController) worker(queue workqueue.RateLimitingInterface) func() { workFunc := func() bool { - key, quit := rq.queue.Get() + key, quit := queue.Get() if quit { return true } - defer rq.queue.Done(key) + defer queue.Done(key) err := rq.syncHandler(key.(string)) if err == nil { - rq.queue.Forget(key) + queue.Forget(key) return false } utilruntime.HandleError(err) - rq.queue.AddRateLimited(key) + queue.AddRateLimited(key) return false } - for { - if quit := workFunc(); quit { - glog.Infof("resource quota controller worker shutting down") - return + + return func() { + for { + if quit := workFunc(); quit { + glog.Infof("resource quota controller worker shutting down") + return + } } } } @@ -196,7 +234,8 @@ func (rq *ResourceQuotaController) Run(workers int, stopCh <-chan struct{}) { } // the workers that chug through the quota calculation backlog for i := 0; i < workers; i++ { - go wait.Until(rq.worker, time.Second, stopCh) + go wait.Until(rq.worker(rq.queue), time.Second, stopCh) + go wait.Until(rq.worker(rq.missingUsageQueue), time.Second, stopCh) } // the timer for how often we do a full recalculation across all quotas go wait.Until(func() { rq.enqueueAll() }, rq.resyncPeriod(), stopCh) diff --git a/pkg/controller/resourcequota/resource_quota_controller_test.go b/pkg/controller/resourcequota/resource_quota_controller_test.go index afee22196c8..d7600730b0e 100644 --- a/pkg/controller/resourcequota/resource_quota_controller_test.go +++ b/pkg/controller/resourcequota/resource_quota_controller_test.go @@ -26,6 +26,7 @@ import ( "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" "k8s.io/kubernetes/pkg/client/testing/core" "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/quota/generic" "k8s.io/kubernetes/pkg/quota/install" "k8s.io/kubernetes/pkg/util/sets" ) @@ -308,3 +309,159 @@ func TestSyncResourceQuotaNoChange(t *testing.T) { t.Errorf("Expected actions:\n%v\n but got:\n%v\nDifference:\n%v", expectedActionSet, actionSet, expectedActionSet.Difference(actionSet)) } } + +func TestAddQuota(t *testing.T) { + kubeClient := fake.NewSimpleClientset() + resourceQuotaControllerOptions := &ResourceQuotaControllerOptions{ + KubeClient: kubeClient, + ResyncPeriod: controller.NoResyncPeriodFunc, + Registry: install.NewRegistry(kubeClient), + GroupKindsToReplenish: []unversioned.GroupKind{ + api.Kind("Pod"), + api.Kind("ReplicationController"), + api.Kind("PersistentVolumeClaim"), + }, + ControllerFactory: NewReplenishmentControllerFactoryFromClient(kubeClient), + ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc, + } + quotaController := NewResourceQuotaController(resourceQuotaControllerOptions) + + delete(quotaController.registry.(*generic.GenericRegistry).InternalEvaluators, api.Kind("Service")) + + testCases := []struct { + name string + + quota *api.ResourceQuota + expectedPriority bool + }{ + { + name: "no status", + expectedPriority: true, + quota: &api.ResourceQuota{ + ObjectMeta: api.ObjectMeta{ + Namespace: "default", + Name: "rq", + }, + Spec: api.ResourceQuotaSpec{ + Hard: api.ResourceList{ + api.ResourceCPU: resource.MustParse("4"), + }, + }, + }, + }, + { + name: "status, no usage", + expectedPriority: true, + quota: &api.ResourceQuota{ + ObjectMeta: api.ObjectMeta{ + Namespace: "default", + Name: "rq", + }, + Spec: api.ResourceQuotaSpec{ + Hard: api.ResourceList{ + api.ResourceCPU: resource.MustParse("4"), + }, + }, + Status: api.ResourceQuotaStatus{ + Hard: api.ResourceList{ + api.ResourceCPU: resource.MustParse("4"), + }, + }, + }, + }, + { + name: "status, mismatch", + expectedPriority: true, + quota: &api.ResourceQuota{ + ObjectMeta: api.ObjectMeta{ + Namespace: "default", + Name: "rq", + }, + Spec: api.ResourceQuotaSpec{ + Hard: api.ResourceList{ + api.ResourceCPU: resource.MustParse("4"), + }, + }, + Status: api.ResourceQuotaStatus{ + Hard: api.ResourceList{ + api.ResourceCPU: resource.MustParse("6"), + }, + Used: api.ResourceList{ + api.ResourceCPU: resource.MustParse("0"), + }, + }, + }, + }, + { + name: "status, missing usage, but don't care", + expectedPriority: false, + quota: &api.ResourceQuota{ + ObjectMeta: api.ObjectMeta{ + Namespace: "default", + Name: "rq", + }, + Spec: api.ResourceQuotaSpec{ + Hard: api.ResourceList{ + api.ResourceServices: resource.MustParse("4"), + }, + }, + Status: api.ResourceQuotaStatus{ + Hard: api.ResourceList{ + api.ResourceServices: resource.MustParse("4"), + }, + }, + }, + }, + { + name: "ready", + expectedPriority: false, + quota: &api.ResourceQuota{ + ObjectMeta: api.ObjectMeta{ + Namespace: "default", + Name: "rq", + }, + Spec: api.ResourceQuotaSpec{ + Hard: api.ResourceList{ + api.ResourceCPU: resource.MustParse("4"), + }, + }, + Status: api.ResourceQuotaStatus{ + Hard: api.ResourceList{ + api.ResourceCPU: resource.MustParse("4"), + }, + Used: api.ResourceList{ + api.ResourceCPU: resource.MustParse("0"), + }, + }, + }, + }, + } + + for _, tc := range testCases { + quotaController.addQuota(tc.quota) + if tc.expectedPriority { + if e, a := 1, quotaController.missingUsageQueue.Len(); e != a { + t.Errorf("%s: expected %v, got %v", tc.name, e, a) + } + if e, a := 0, quotaController.queue.Len(); e != a { + t.Errorf("%s: expected %v, got %v", tc.name, e, a) + } + } else { + if e, a := 0, quotaController.missingUsageQueue.Len(); e != a { + t.Errorf("%s: expected %v, got %v", tc.name, e, a) + } + if e, a := 1, quotaController.queue.Len(); e != a { + t.Errorf("%s: expected %v, got %v", tc.name, e, a) + } + } + + for quotaController.missingUsageQueue.Len() > 0 { + key, _ := quotaController.missingUsageQueue.Get() + quotaController.missingUsageQueue.Done(key) + } + for quotaController.queue.Len() > 0 { + key, _ := quotaController.queue.Get() + quotaController.queue.Done(key) + } + } +}