use a separate queue for initial quota calculation

This commit is contained in:
deads2k 2016-07-18 15:48:37 -04:00
parent 9344f2ca89
commit 2ea342289e
2 changed files with 209 additions and 13 deletions

View File

@ -63,6 +63,8 @@ type ResourceQuotaController struct {
rqController *framework.Controller rqController *framework.Controller
// ResourceQuota objects that need to be synchronized // ResourceQuota objects that need to be synchronized
queue workqueue.RateLimitingInterface queue workqueue.RateLimitingInterface
// missingUsageQueue holds objects that are missing the initial usage informatino
missingUsageQueue workqueue.RateLimitingInterface
// To allow injection of syncUsage for testing. // To allow injection of syncUsage for testing.
syncHandler func(key string) error syncHandler func(key string) error
// function that controls full recalculation of quota usage // function that controls full recalculation of quota usage
@ -78,6 +80,7 @@ func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *Resour
rq := &ResourceQuotaController{ rq := &ResourceQuotaController{
kubeClient: options.KubeClient, kubeClient: options.KubeClient,
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
missingUsageQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
resyncPeriod: options.ResyncPeriod, resyncPeriod: options.ResyncPeriod,
registry: options.Registry, registry: options.Registry,
replenishmentControllers: []framework.ControllerInterface{}, replenishmentControllers: []framework.ControllerInterface{},
@ -101,7 +104,7 @@ func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *Resour
&api.ResourceQuota{}, &api.ResourceQuota{},
rq.resyncPeriod(), rq.resyncPeriod(),
framework.ResourceEventHandlerFuncs{ framework.ResourceEventHandlerFuncs{
AddFunc: rq.enqueueResourceQuota, AddFunc: rq.addQuota,
UpdateFunc: func(old, cur interface{}) { UpdateFunc: func(old, cur interface{}) {
// We are only interested in observing updates to quota.spec to drive updates to quota.status. // We are only interested in observing updates to quota.spec to drive updates to quota.status.
// We ignore all updates to quota.Status because they are all driven by this controller. // We ignore all updates to quota.Status because they are all driven by this controller.
@ -116,7 +119,7 @@ func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *Resour
if quota.Equals(curResourceQuota.Spec.Hard, oldResourceQuota.Spec.Hard) { if quota.Equals(curResourceQuota.Spec.Hard, oldResourceQuota.Spec.Hard) {
return return
} }
rq.enqueueResourceQuota(curResourceQuota) rq.addQuota(curResourceQuota)
}, },
// This will enter the sync loop and no-op, because the controller has been deleted from the store. // This will enter the sync loop and no-op, because the controller has been deleted from the store.
// Note that deleting a controller immediately after scaling it to 0 will not work. The recommended // Note that deleting a controller immediately after scaling it to 0 will not work. The recommended
@ -160,30 +163,65 @@ func (rq *ResourceQuotaController) enqueueResourceQuota(obj interface{}) {
rq.queue.Add(key) rq.queue.Add(key)
} }
func (rq *ResourceQuotaController) addQuota(obj interface{}) {
key, err := controller.KeyFunc(obj)
if err != nil {
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
return
}
resourceQuota := obj.(*api.ResourceQuota)
// if we declared an intent that is not yet captured in status (prioritize it)
if !api.Semantic.DeepEqual(resourceQuota.Spec.Hard, resourceQuota.Status.Hard) {
rq.missingUsageQueue.Add(key)
return
}
// if we declared a constraint that has no usage (which this controller can calculate, prioritize it)
for constraint := range resourceQuota.Status.Hard {
if _, usageFound := resourceQuota.Status.Used[constraint]; !usageFound {
matchedResources := []api.ResourceName{constraint}
for _, evaluator := range rq.registry.Evaluators() {
if intersection := quota.Intersection(evaluator.MatchesResources(), matchedResources); len(intersection) != 0 {
rq.missingUsageQueue.Add(key)
return
}
}
}
}
// no special priority, go in normal recalc queue
rq.queue.Add(key)
}
// worker runs a worker thread that just dequeues items, processes them, and marks them done. // worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key. func (rq *ResourceQuotaController) worker(queue workqueue.RateLimitingInterface) func() {
func (rq *ResourceQuotaController) worker() {
workFunc := func() bool { workFunc := func() bool {
key, quit := rq.queue.Get() key, quit := queue.Get()
if quit { if quit {
return true return true
} }
defer rq.queue.Done(key) defer queue.Done(key)
err := rq.syncHandler(key.(string)) err := rq.syncHandler(key.(string))
if err == nil { if err == nil {
rq.queue.Forget(key) queue.Forget(key)
return false return false
} }
utilruntime.HandleError(err) utilruntime.HandleError(err)
rq.queue.AddRateLimited(key) queue.AddRateLimited(key)
return false return false
} }
return func() {
for { for {
if quit := workFunc(); quit { if quit := workFunc(); quit {
glog.Infof("resource quota controller worker shutting down") glog.Infof("resource quota controller worker shutting down")
return return
} }
} }
}
} }
// Run begins quota controller using the specified number of workers // Run begins quota controller using the specified number of workers
@ -196,7 +234,8 @@ func (rq *ResourceQuotaController) Run(workers int, stopCh <-chan struct{}) {
} }
// the workers that chug through the quota calculation backlog // the workers that chug through the quota calculation backlog
for i := 0; i < workers; i++ { for i := 0; i < workers; i++ {
go wait.Until(rq.worker, time.Second, stopCh) go wait.Until(rq.worker(rq.queue), time.Second, stopCh)
go wait.Until(rq.worker(rq.missingUsageQueue), time.Second, stopCh)
} }
// the timer for how often we do a full recalculation across all quotas // the timer for how often we do a full recalculation across all quotas
go wait.Until(func() { rq.enqueueAll() }, rq.resyncPeriod(), stopCh) go wait.Until(func() { rq.enqueueAll() }, rq.resyncPeriod(), stopCh)

View File

@ -26,6 +26,7 @@ import (
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"k8s.io/kubernetes/pkg/client/testing/core" "k8s.io/kubernetes/pkg/client/testing/core"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/quota/generic"
"k8s.io/kubernetes/pkg/quota/install" "k8s.io/kubernetes/pkg/quota/install"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
) )
@ -308,3 +309,159 @@ func TestSyncResourceQuotaNoChange(t *testing.T) {
t.Errorf("Expected actions:\n%v\n but got:\n%v\nDifference:\n%v", expectedActionSet, actionSet, expectedActionSet.Difference(actionSet)) t.Errorf("Expected actions:\n%v\n but got:\n%v\nDifference:\n%v", expectedActionSet, actionSet, expectedActionSet.Difference(actionSet))
} }
} }
func TestAddQuota(t *testing.T) {
kubeClient := fake.NewSimpleClientset()
resourceQuotaControllerOptions := &ResourceQuotaControllerOptions{
KubeClient: kubeClient,
ResyncPeriod: controller.NoResyncPeriodFunc,
Registry: install.NewRegistry(kubeClient),
GroupKindsToReplenish: []unversioned.GroupKind{
api.Kind("Pod"),
api.Kind("ReplicationController"),
api.Kind("PersistentVolumeClaim"),
},
ControllerFactory: NewReplenishmentControllerFactoryFromClient(kubeClient),
ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc,
}
quotaController := NewResourceQuotaController(resourceQuotaControllerOptions)
delete(quotaController.registry.(*generic.GenericRegistry).InternalEvaluators, api.Kind("Service"))
testCases := []struct {
name string
quota *api.ResourceQuota
expectedPriority bool
}{
{
name: "no status",
expectedPriority: true,
quota: &api.ResourceQuota{
ObjectMeta: api.ObjectMeta{
Namespace: "default",
Name: "rq",
},
Spec: api.ResourceQuotaSpec{
Hard: api.ResourceList{
api.ResourceCPU: resource.MustParse("4"),
},
},
},
},
{
name: "status, no usage",
expectedPriority: true,
quota: &api.ResourceQuota{
ObjectMeta: api.ObjectMeta{
Namespace: "default",
Name: "rq",
},
Spec: api.ResourceQuotaSpec{
Hard: api.ResourceList{
api.ResourceCPU: resource.MustParse("4"),
},
},
Status: api.ResourceQuotaStatus{
Hard: api.ResourceList{
api.ResourceCPU: resource.MustParse("4"),
},
},
},
},
{
name: "status, mismatch",
expectedPriority: true,
quota: &api.ResourceQuota{
ObjectMeta: api.ObjectMeta{
Namespace: "default",
Name: "rq",
},
Spec: api.ResourceQuotaSpec{
Hard: api.ResourceList{
api.ResourceCPU: resource.MustParse("4"),
},
},
Status: api.ResourceQuotaStatus{
Hard: api.ResourceList{
api.ResourceCPU: resource.MustParse("6"),
},
Used: api.ResourceList{
api.ResourceCPU: resource.MustParse("0"),
},
},
},
},
{
name: "status, missing usage, but don't care",
expectedPriority: false,
quota: &api.ResourceQuota{
ObjectMeta: api.ObjectMeta{
Namespace: "default",
Name: "rq",
},
Spec: api.ResourceQuotaSpec{
Hard: api.ResourceList{
api.ResourceServices: resource.MustParse("4"),
},
},
Status: api.ResourceQuotaStatus{
Hard: api.ResourceList{
api.ResourceServices: resource.MustParse("4"),
},
},
},
},
{
name: "ready",
expectedPriority: false,
quota: &api.ResourceQuota{
ObjectMeta: api.ObjectMeta{
Namespace: "default",
Name: "rq",
},
Spec: api.ResourceQuotaSpec{
Hard: api.ResourceList{
api.ResourceCPU: resource.MustParse("4"),
},
},
Status: api.ResourceQuotaStatus{
Hard: api.ResourceList{
api.ResourceCPU: resource.MustParse("4"),
},
Used: api.ResourceList{
api.ResourceCPU: resource.MustParse("0"),
},
},
},
},
}
for _, tc := range testCases {
quotaController.addQuota(tc.quota)
if tc.expectedPriority {
if e, a := 1, quotaController.missingUsageQueue.Len(); e != a {
t.Errorf("%s: expected %v, got %v", tc.name, e, a)
}
if e, a := 0, quotaController.queue.Len(); e != a {
t.Errorf("%s: expected %v, got %v", tc.name, e, a)
}
} else {
if e, a := 0, quotaController.missingUsageQueue.Len(); e != a {
t.Errorf("%s: expected %v, got %v", tc.name, e, a)
}
if e, a := 1, quotaController.queue.Len(); e != a {
t.Errorf("%s: expected %v, got %v", tc.name, e, a)
}
}
for quotaController.missingUsageQueue.Len() > 0 {
key, _ := quotaController.missingUsageQueue.Get()
quotaController.missingUsageQueue.Done(key)
}
for quotaController.queue.Len() > 0 {
key, _ := quotaController.queue.Get()
quotaController.queue.Done(key)
}
}
}