From a28cf3963bd1b985f82ee9651b5fb8291afb40d5 Mon Sep 17 00:00:00 2001
From: deads2k
Date: Thu, 26 May 2016 09:59:48 -0400
Subject: [PATCH] refactor quota evaluation to cleanly abstract the quota
 access

---
 .../pkg/admission/resourcequota/admission.go  |  55 +----
 .../admission/resourcequota/admission_test.go |  95 ++++++---
 .../pkg/admission/resourcequota/controller.go | 198 +++++++-----------
 .../resourcequota/resource_access.go          | 179 ++++++++++++++++
 4 files changed, 318 insertions(+), 209 deletions(-)
 create mode 100644 plugin/pkg/admission/resourcequota/resource_access.go

diff --git a/plugin/pkg/admission/resourcequota/admission.go b/plugin/pkg/admission/resourcequota/admission.go
index 5ddd6db3dc4..ddc90994bc9 100644
--- a/plugin/pkg/admission/resourcequota/admission.go
+++ b/plugin/pkg/admission/resourcequota/admission.go
@@ -18,8 +18,6 @@ package resourcequota
 
 import (
 	"io"
-	"sort"
-	"strings"
 	"time"
 
 	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
@@ -28,7 +26,6 @@ import (
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/quota"
 	"k8s.io/kubernetes/pkg/quota/install"
-	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
 )
 
 func init() {
@@ -44,7 +41,7 @@ func init() {
 
 type quotaAdmission struct {
 	*admission.Handler
-	evaluator *quotaEvaluator
+	evaluator Evaluator
 }
 
 type liveLookupEntry struct {
@@ -56,13 +53,13 @@ type liveLookupEntry struct {
 // NewResourceQuota configures an admission controller that can enforce quota constraints
 // using the provided registry. The registry must have the capability to handle group/kinds that
 // are persisted by the server this admission controller is intercepting
 func NewResourceQuota(client clientset.Interface, registry quota.Registry, numEvaluators int, stopCh <-chan struct{}) (admission.Interface, error) {
-	evaluator, err := newQuotaEvaluator(client, registry)
+	quotaAccessor, err := newQuotaAccessor(client)
 	if err != nil {
 		return nil, err
 	}
+	go quotaAccessor.Run(stopCh)
 
-	defer utilruntime.HandleCrash()
-	go evaluator.Run(numEvaluators, stopCh)
+	evaluator := NewQuotaEvaluator(quotaAccessor, registry, numEvaluators, stopCh)
 
 	return &quotaAdmission{
 		Handler:   admission.NewHandler(admission.Create, admission.Update),
@@ -77,47 +74,5 @@ func (q *quotaAdmission) Admit(a admission.Attributes) (err error) {
 		return nil
 	}
 
-	// if we do not know how to evaluate use for this kind, just ignore
-	evaluators := q.evaluator.registry.Evaluators()
-	evaluator, found := evaluators[a.GetKind().GroupKind()]
-	if !found {
-		return nil
-	}
-
-	// for this kind, check if the operation could mutate any quota resources
-	// if no resources tracked by quota are impacted, then just return
-	op := a.GetOperation()
-	operationResources := evaluator.OperationResources(op)
-	if len(operationResources) == 0 {
-		return nil
-	}
-
-	return q.evaluator.evaluate(a)
-}
-
-// prettyPrint formats a resource list for usage in errors
-// it outputs resources sorted in increasing order
-func prettyPrint(item api.ResourceList) string {
-	parts := []string{}
-	keys := []string{}
-	for key := range item {
-		keys = append(keys, string(key))
-	}
-	sort.Strings(keys)
-	for _, key := range keys {
-		value := item[api.ResourceName(key)]
-		constraint := key + "=" + value.String()
-		parts = append(parts, constraint)
-	}
-	return strings.Join(parts, ",")
-}
-
-// hasUsageStats returns true if for each hard constraint there is a value for its current usage
-func hasUsageStats(resourceQuota *api.ResourceQuota) bool {
-	for resourceName := range resourceQuota.Status.Hard {
-		if _, found := resourceQuota.Status.Used[resourceName]; !found {
-			return false
-		}
-	}
-	return true
+	return q.evaluator.Evaluate(a)
 }
diff --git a/plugin/pkg/admission/resourcequota/admission_test.go b/plugin/pkg/admission/resourcequota/admission_test.go
index 7d650800133..7d337047b55 100644
--- a/plugin/pkg/admission/resourcequota/admission_test.go
+++ b/plugin/pkg/admission/resourcequota/admission_test.go
@@ -144,12 +144,15 @@ func TestAdmissionIgnoresSubresources(t *testing.T) {
 	resourceQuota.Status.Used[api.ResourceMemory] = resource.MustParse("1Gi")
 	kubeClient := fake.NewSimpleClientset(resourceQuota)
 	indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
-	evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient))
-	evaluator.indexer = indexer
 	stopCh := make(chan struct{})
 	defer close(stopCh)
+
+	quotaAccessor, _ := newQuotaAccessor(kubeClient)
+	quotaAccessor.indexer = indexer
+	go quotaAccessor.Run(stopCh)
+	evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
+
 	defer utilruntime.HandleCrash()
-	go evaluator.Run(5, stopCh)
 	handler := &quotaAdmission{
 		Handler:   admission.NewHandler(admission.Create, admission.Update),
 		evaluator: evaluator,
@@ -185,12 +188,15 @@ func TestAdmitBelowQuotaLimit(t *testing.T) {
 	}
 	kubeClient := fake.NewSimpleClientset(resourceQuota)
 	indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
-	evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient))
-	evaluator.indexer = indexer
 	stopCh := make(chan struct{})
 	defer close(stopCh)
+
+	quotaAccessor, _ := newQuotaAccessor(kubeClient)
+	quotaAccessor.indexer = indexer
+	go quotaAccessor.Run(stopCh)
+	evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
+
 	defer utilruntime.HandleCrash()
-	go evaluator.Run(5, stopCh)
 	handler := &quotaAdmission{
 		Handler:   admission.NewHandler(admission.Create, admission.Update),
 		evaluator: evaluator,
@@ -265,12 +271,14 @@ func TestAdmitHandlesOldObjects(t *testing.T) {
 	// start up quota system
 	kubeClient := fake.NewSimpleClientset(resourceQuota)
 	indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
-	evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient))
-	evaluator.indexer = indexer
 	stopCh := make(chan struct{})
 	defer close(stopCh)
-	defer utilruntime.HandleCrash()
-	go evaluator.Run(5, stopCh)
+
+	quotaAccessor, _ := newQuotaAccessor(kubeClient)
+	quotaAccessor.indexer = indexer
+	go quotaAccessor.Run(stopCh)
+	evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
+
 	handler := &quotaAdmission{
 		Handler:   admission.NewHandler(admission.Create, admission.Update),
 		evaluator: evaluator,
@@ -353,12 +361,15 @@ func TestAdmitExceedQuotaLimit(t *testing.T) {
 	}
 	kubeClient := fake.NewSimpleClientset(resourceQuota)
 	indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
-	evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient))
-	evaluator.indexer = indexer
 	stopCh := make(chan struct{})
 	defer close(stopCh)
+
+	quotaAccessor, _ := newQuotaAccessor(kubeClient)
+	quotaAccessor.indexer = indexer
+	go quotaAccessor.Run(stopCh)
+	evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
+
 	defer utilruntime.HandleCrash()
-	go evaluator.Run(5, stopCh)
 	handler := &quotaAdmission{
 		Handler:   admission.NewHandler(admission.Create, admission.Update),
 		evaluator: evaluator,
@@ -394,12 +405,15 @@ func TestAdmitEnforceQuotaConstraints(t *testing.T) {
 	}
 	kubeClient := fake.NewSimpleClientset(resourceQuota)
 	indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
-	evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient))
-	evaluator.indexer = indexer
 	stopCh := make(chan struct{})
 	defer close(stopCh)
+
+	quotaAccessor, _ := newQuotaAccessor(kubeClient)
+	quotaAccessor.indexer = indexer
+	go quotaAccessor.Run(stopCh)
+	evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
+
 	defer utilruntime.HandleCrash()
-	go evaluator.Run(5, stopCh)
 	handler := &quotaAdmission{
 		Handler:   admission.NewHandler(admission.Create, admission.Update),
 		evaluator: evaluator,
@@ -444,13 +458,16 @@ func TestAdmitPodInNamespaceWithoutQuota(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient))
-	evaluator.indexer = indexer
-	evaluator.liveLookupCache = liveLookupCache
 	stopCh := make(chan struct{})
 	defer close(stopCh)
+
+	quotaAccessor, _ := newQuotaAccessor(kubeClient)
+	quotaAccessor.indexer = indexer
+	quotaAccessor.liveLookupCache = liveLookupCache
+	go quotaAccessor.Run(stopCh)
+	evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
+
 	defer utilruntime.HandleCrash()
-	go evaluator.Run(5, stopCh)
 	handler := &quotaAdmission{
 		Handler:   admission.NewHandler(admission.Create, admission.Update),
 		evaluator: evaluator,
@@ -508,12 +525,15 @@ func TestAdmitBelowTerminatingQuotaLimit(t *testing.T) {
 	}
 	kubeClient := fake.NewSimpleClientset(resourceQuotaTerminating, resourceQuotaNonTerminating)
 	indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
-	evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient))
-	evaluator.indexer = indexer
 	stopCh := make(chan struct{})
 	defer close(stopCh)
+
+	quotaAccessor, _ := newQuotaAccessor(kubeClient)
+	quotaAccessor.indexer = indexer
+	go quotaAccessor.Run(stopCh)
+	evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
+
 	defer utilruntime.HandleCrash()
-	go evaluator.Run(5, stopCh)
 	handler := &quotaAdmission{
 		Handler:   admission.NewHandler(admission.Create, admission.Update),
 		evaluator: evaluator,
@@ -610,12 +630,15 @@ func TestAdmitBelowBestEffortQuotaLimit(t *testing.T) {
 	}
 	kubeClient := fake.NewSimpleClientset(resourceQuotaBestEffort, resourceQuotaNotBestEffort)
 	indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
-	evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient))
-	evaluator.indexer = indexer
 	stopCh := make(chan struct{})
 	defer close(stopCh)
+
+	quotaAccessor, _ := newQuotaAccessor(kubeClient)
+	quotaAccessor.indexer = indexer
+	go quotaAccessor.Run(stopCh)
+	evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
+
 	defer utilruntime.HandleCrash()
-	go evaluator.Run(5, stopCh)
 	handler := &quotaAdmission{
 		Handler:   admission.NewHandler(admission.Create, admission.Update),
 		evaluator: evaluator,
@@ -699,12 +722,15 @@ func TestAdmitBestEffortQuotaLimitIgnoresBurstable(t *testing.T) {
 	}
 	kubeClient := fake.NewSimpleClientset(resourceQuota)
 	indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
-	evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient))
-	evaluator.indexer = indexer
 	stopCh := make(chan struct{})
 	defer close(stopCh)
+
+	quotaAccessor, _ := newQuotaAccessor(kubeClient)
+	quotaAccessor.indexer = indexer
+	go quotaAccessor.Run(stopCh)
+	evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
+
 	defer utilruntime.HandleCrash()
-	go evaluator.Run(5, stopCh)
 	handler := &quotaAdmission{
 		Handler:   admission.NewHandler(admission.Create, admission.Update),
 		evaluator: evaluator,
@@ -814,13 +840,16 @@ func TestAdmissionSetsMissingNamespace(t *testing.T) {
 			podEvaluator.GroupKind(): podEvaluator,
 		},
 	}
-	evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient))
-	evaluator.indexer = indexer
-	evaluator.registry = registry
 	stopCh := make(chan struct{})
 	defer close(stopCh)
+
+	quotaAccessor, _ := newQuotaAccessor(kubeClient)
+	quotaAccessor.indexer = indexer
+	go quotaAccessor.Run(stopCh)
+	evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
+	evaluator.(*quotaEvaluator).registry = registry
+
 	defer utilruntime.HandleCrash()
-	go evaluator.Run(5, stopCh)
 	handler := &quotaAdmission{
 		Handler:   admission.NewHandler(admission.Create, admission.Update),
 		evaluator: evaluator,
diff --git a/plugin/pkg/admission/resourcequota/controller.go b/plugin/pkg/admission/resourcequota/controller.go
index 5431c9e1630..31fe1a1990d 100644
--- a/plugin/pkg/admission/resourcequota/controller.go
+++ b/plugin/pkg/admission/resourcequota/controller.go
@@ -18,46 +18,36 @@ package resourcequota
 
 import (
 	"fmt"
+	"sort"
+	"strings"
 	"sync"
 	"time"
 
 	"github.com/golang/glog"
-	lru "github.com/hashicorp/golang-lru"
-
-	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
 
 	"k8s.io/kubernetes/pkg/admission"
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/meta"
-	"k8s.io/kubernetes/pkg/client/cache"
 	"k8s.io/kubernetes/pkg/quota"
-	"k8s.io/kubernetes/pkg/runtime"
-	"k8s.io/kubernetes/pkg/storage/etcd"
 	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
 	"k8s.io/kubernetes/pkg/util/sets"
 	"k8s.io/kubernetes/pkg/util/wait"
 	"k8s.io/kubernetes/pkg/util/workqueue"
-	"k8s.io/kubernetes/pkg/watch"
 )
 
-type quotaEvaluator struct {
-	client clientset.Interface
+// Evaluator is used to see if quota constraints are satisfied.
+type Evaluator interface {
+	// Evaluate takes an operation and checks to see if quota constraints are satisfied. It returns an error if they are not.
+	// The default implementation process related operations in chunks when possible.
+	Evaluate(a admission.Attributes) error
+}
+
+type quotaEvaluator struct {
+	quotaAccessor QuotaAccessor
 
-	// indexer that holds quota objects by namespace
-	indexer cache.Indexer
 	// registry that knows how to measure usage for objects
 	registry quota.Registry
 
-	// liveLookups holds the last few live lookups we've done to help ammortize cost on repeated lookup failures.
-	// This let's us handle the case of latent caches, by looking up actual results for a namespace on cache miss/no results.
-	// We track the lookup result here so that for repeated requests, we don't look it up very often.
-	liveLookupCache *lru.Cache
-	liveTTL         time.Duration
-	// updatedQuotas holds a cache of quotas that we've updated. This is used to pull the "really latest" during back to
-	// back quota evaluations that touch the same quota doc. This only works because we can compare etcd resourceVersions
-	// for the same resource as integers. Before this change: 22 updates with 12 conflicts. after this change: 15 updates with 0 conflicts
-	updatedQuotas *lru.Cache
-
 	// TODO these are used together to bucket items by namespace and then batch them up for processing.
 	// The technique is valuable for rollup activities to avoid fanout and reduce resource contention.
 	// We could move this into a library if another component needed it.
@@ -67,6 +57,11 @@ type quotaEvaluator struct {
 	work       map[string][]*admissionWaiter
 	dirtyWork  map[string][]*admissionWaiter
 	inProgress sets.String
+
+	// controls the run method so that we can cleanly conform to the Evaluator interface
+	workers int
+	stopCh  <-chan struct{}
+	init    sync.Once
 }
 
 type admissionWaiter struct {
@@ -98,52 +93,33 @@ func newAdmissionWaiter(a admission.Attributes) *admissionWaiter {
 	}
 }
 
-// newQuotaEvaluator configures an admission controller that can enforce quota constraints
+// NewQuotaEvaluator configures an admission controller that can enforce quota constraints
 // using the provided registry. The registry must have the capability to handle group/kinds that
 // are persisted by the server this admission controller is intercepting
-func newQuotaEvaluator(client clientset.Interface, registry quota.Registry) (*quotaEvaluator, error) {
-	liveLookupCache, err := lru.New(100)
-	if err != nil {
-		return nil, err
-	}
-	updatedCache, err := lru.New(100)
-	if err != nil {
-		return nil, err
-	}
-	lw := &cache.ListWatch{
-		ListFunc: func(options api.ListOptions) (runtime.Object, error) {
-			return client.Core().ResourceQuotas(api.NamespaceAll).List(options)
-		},
-		WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
-			return client.Core().ResourceQuotas(api.NamespaceAll).Watch(options)
-		},
-	}
-	indexer, reflector := cache.NewNamespaceKeyedIndexerAndReflector(lw, &api.ResourceQuota{}, 0)
-
-	reflector.Run()
+func NewQuotaEvaluator(quotaAccessor QuotaAccessor, registry quota.Registry, workers int, stopCh <-chan struct{}) Evaluator {
 	return &quotaEvaluator{
-		client:          client,
-		indexer:         indexer,
-		registry:        registry,
-		liveLookupCache: liveLookupCache,
-		liveTTL:         time.Duration(30 * time.Second),
-		updatedQuotas:   updatedCache,
+		quotaAccessor: quotaAccessor,
+
+		registry: registry,
 
 		queue:      workqueue.New(),
 		work:       map[string][]*admissionWaiter{},
 		dirtyWork:  map[string][]*admissionWaiter{},
 		inProgress: sets.String{},
-	}, nil
+
+		workers: workers,
+		stopCh:  stopCh,
+	}
 }
 
 // Run begins watching and syncing.
-func (e *quotaEvaluator) Run(workers int, stopCh <-chan struct{}) {
+func (e *quotaEvaluator) run() {
 	defer utilruntime.HandleCrash()
 
-	for i := 0; i < workers; i++ {
-		go wait.Until(e.doWork, time.Second, stopCh)
+	for i := 0; i < e.workers; i++ {
+		go wait.Until(e.doWork, time.Second, e.stopCh)
 	}
-	<-stopCh
+	<-e.stopCh
 	glog.Infof("Shutting down quota evaluator")
 	e.queue.ShutDown()
 }
@@ -179,7 +155,7 @@ func (e *quotaEvaluator) checkAttributes(ns string, admissionAttributes []*admis
 		}
 	}()
 
-	quotas, err := e.getQuotas(ns)
+	quotas, err := e.quotaAccessor.GetQuotas(ns)
 	if err != nil {
 		for _, admissionAttribute := range admissionAttributes {
 			admissionAttribute.result = err
@@ -257,14 +233,9 @@ func (e *quotaEvaluator) checkQuotas(quotas []api.ResourceQuota, admissionAttrib
 			continue
 		}
 
-		if updatedQuota, err := e.client.Core().ResourceQuotas(newQuota.Namespace).UpdateStatus(&newQuota); err != nil {
+		if err := e.quotaAccessor.UpdateQuotaStatus(&newQuota); err != nil {
 			updatedFailedQuotas = append(updatedFailedQuotas, newQuota)
 			lastErr = err
-
-		} else {
-			// update our cache
-			e.updateCache(updatedQuota)
 		}
 	}
 
@@ -293,7 +264,7 @@ func (e *quotaEvaluator) checkQuotas(quotas []api.ResourceQuota, admissionAttrib
 	// you've added a new documented, then updated an old one, your resource matches both and you're only checking one
 	// updates for these quota names failed. Get the current quotas in the namespace, compare by name, check to see if the
 	// resource versions have changed. If not, we're going to fall through an fail everything. If they all have, then we can try again
-	newQuotas, err := e.getQuotas(quotas[0].Namespace)
+	newQuotas, err := e.quotaAccessor.GetQuotas(quotas[0].Namespace)
 	if err != nil {
 		// this means that updates failed. Anything with a default deny error has failed and we need to let them know
 		for _, admissionAttribute := range admissionAttributes {
@@ -416,7 +387,25 @@ func (e *quotaEvaluator) checkRequest(quotas []api.ResourceQuota, a admission.At
 	return quotas, nil
 }
 
-func (e *quotaEvaluator) evaluate(a admission.Attributes) error {
+func (e *quotaEvaluator) Evaluate(a admission.Attributes) error {
+	e.init.Do(func() {
+		go e.run()
+	})
+
+	// if we do not know how to evaluate use for this kind, just ignore
+	evaluators := e.registry.Evaluators()
+	evaluator, found := evaluators[a.GetKind().GroupKind()]
+	if !found {
+		return nil
+	}
+	// for this kind, check if the operation could mutate any quota resources
+	// if no resources tracked by quota are impacted, then just return
+	op := a.GetOperation()
+	operationResources := evaluator.OperationResources(op)
+	if len(operationResources) == 0 {
+		return nil
+	}
+
 	waiter := newAdmissionWaiter(a)
 	e.addWork(waiter)
 
@@ -485,72 +474,29 @@ func (e *quotaEvaluator) getWork() (string, []*admissionWaiter, bool) {
 	return ns, []*admissionWaiter{}, false
 }
 
-func (e *quotaEvaluator) updateCache(quota *api.ResourceQuota) {
-	key := quota.Namespace + "/" + quota.Name
-	e.updatedQuotas.Add(key, quota)
+// prettyPrint formats a resource list for usage in errors
+// it outputs resources sorted in increasing order
+func prettyPrint(item api.ResourceList) string {
+	parts := []string{}
+	keys := []string{}
+	for key := range item {
+		keys = append(keys, string(key))
+	}
+	sort.Strings(keys)
+	for _, key := range keys {
+		value := item[api.ResourceName(key)]
+		constraint := key + "=" + value.String()
+		parts = append(parts, constraint)
+	}
+	return strings.Join(parts, ",")
 }
 
-var etcdVersioner = etcd.APIObjectVersioner{}
-
-// checkCache compares the passed quota against the value in the look-aside cache and returns the newer
-// if the cache is out of date, it deletes the stale entry. This only works because of etcd resourceVersions
-// being monotonically increasing integers
-func (e *quotaEvaluator) checkCache(quota *api.ResourceQuota) *api.ResourceQuota {
-	key := quota.Namespace + "/" + quota.Name
-	uncastCachedQuota, ok := e.updatedQuotas.Get(key)
-	if !ok {
-		return quota
-	}
-	cachedQuota := uncastCachedQuota.(*api.ResourceQuota)
-
-	if etcdVersioner.CompareResourceVersion(quota, cachedQuota) >= 0 {
-		e.updatedQuotas.Remove(key)
-		return quota
-	}
-	return cachedQuota
-}
-
-func (e *quotaEvaluator) getQuotas(namespace string) ([]api.ResourceQuota, error) {
-	// determine if there are any quotas in this namespace
-	// if there are no quotas, we don't need to do anything
-	items, err := e.indexer.Index("namespace", &api.ResourceQuota{ObjectMeta: api.ObjectMeta{Namespace: namespace, Name: ""}})
-	if err != nil {
-		return nil, fmt.Errorf("Error resolving quota.")
-	}
-
-	// if there are no items held in our indexer, check our live-lookup LRU, if that misses, do the live lookup to prime it.
-	if len(items) == 0 {
-		lruItemObj, ok := e.liveLookupCache.Get(namespace)
-		if !ok || lruItemObj.(liveLookupEntry).expiry.Before(time.Now()) {
-			// TODO: If there are multiple operations at the same time and cache has just expired,
-			// this may cause multiple List operations being issued at the same time.
-			// If there is already in-flight List() for a given namespace, we should wait until
-			// it is finished and cache is updated instead of doing the same, also to avoid
-			// throttling - see #22422 for details.
-			liveList, err := e.client.Core().ResourceQuotas(namespace).List(api.ListOptions{})
-			if err != nil {
-				return nil, err
-			}
-			newEntry := liveLookupEntry{expiry: time.Now().Add(e.liveTTL)}
-			for i := range liveList.Items {
-				newEntry.items = append(newEntry.items, &liveList.Items[i])
-			}
-			e.liveLookupCache.Add(namespace, newEntry)
-			lruItemObj = newEntry
-		}
-		lruEntry := lruItemObj.(liveLookupEntry)
-		for i := range lruEntry.items {
-			items = append(items, lruEntry.items[i])
+// hasUsageStats returns true if for each hard constraint there is a value for its current usage
+func hasUsageStats(resourceQuota *api.ResourceQuota) bool {
+	for resourceName := range resourceQuota.Status.Hard {
+		if _, found := resourceQuota.Status.Used[resourceName]; !found {
+			return false
 		}
 	}
-
-	resourceQuotas := []api.ResourceQuota{}
-	for i := range items {
-		quota := items[i].(*api.ResourceQuota)
-		quota = e.checkCache(quota)
-		// always make a copy. We're going to muck around with this and we should never mutate the originals
-		resourceQuotas = append(resourceQuotas, *quota)
-	}
-
-	return resourceQuotas, nil
+	return true
 }
diff --git a/plugin/pkg/admission/resourcequota/resource_access.go b/plugin/pkg/admission/resourcequota/resource_access.go
new file mode 100644
index 00000000000..48882281c1a
--- /dev/null
+++ b/plugin/pkg/admission/resourcequota/resource_access.go
@@ -0,0 +1,179 @@
+/*
+Copyright 2016 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package resourcequota
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/golang/glog"
+	lru "github.com/hashicorp/golang-lru"
+
+	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
+
+	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/client/cache"
+	"k8s.io/kubernetes/pkg/runtime"
+	"k8s.io/kubernetes/pkg/storage/etcd"
+	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
+	"k8s.io/kubernetes/pkg/watch"
+)
+
+// QuotaAccessor abstracts the get/set logic from the rest of the Evaluator. This could be a test stub, a straight passthrough,
+// or most commonly a series of deconflicting caches.
+type QuotaAccessor interface {
+	// UpdateQuotaStatus is called to persist final status. This method should write to persistent storage.
+	// An error indicates that write didn't complete successfully.
+	UpdateQuotaStatus(newQuota *api.ResourceQuota) error
+
+	// GetQuotas gets all possible quotas for a given namespace
+	GetQuotas(namespace string) ([]api.ResourceQuota, error)
+}
+
+type quotaAccessor struct {
+	client clientset.Interface
+
+	// indexer that holds quota objects by namespace
+	indexer   cache.Indexer
+	reflector *cache.Reflector
+
+	// liveLookups holds the last few live lookups we've done to help ammortize cost on repeated lookup failures.
+	// This let's us handle the case of latent caches, by looking up actual results for a namespace on cache miss/no results.
+	// We track the lookup result here so that for repeated requests, we don't look it up very often.
+	liveLookupCache *lru.Cache
+	liveTTL         time.Duration
+	// updatedQuotas holds a cache of quotas that we've updated. This is used to pull the "really latest" during back to
+	// back quota evaluations that touch the same quota doc. This only works because we can compare etcd resourceVersions
+	// for the same resource as integers. Before this change: 22 updates with 12 conflicts. after this change: 15 updates with 0 conflicts
+	updatedQuotas *lru.Cache
+}
+
+// newQuotaAccessor creates an object that conforms to the QuotaAccessor interface to be used to retrieve quota objects.
+func newQuotaAccessor(client clientset.Interface) (*quotaAccessor, error) {
+	liveLookupCache, err := lru.New(100)
+	if err != nil {
+		return nil, err
+	}
+	updatedCache, err := lru.New(100)
+	if err != nil {
+		return nil, err
+	}
+	lw := &cache.ListWatch{
+		ListFunc: func(options api.ListOptions) (runtime.Object, error) {
+			return client.Core().ResourceQuotas(api.NamespaceAll).List(options)
+		},
+		WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
+			return client.Core().ResourceQuotas(api.NamespaceAll).Watch(options)
+		},
+	}
+	indexer, reflector := cache.NewNamespaceKeyedIndexerAndReflector(lw, &api.ResourceQuota{}, 0)
+
+	return &quotaAccessor{
+		client:          client,
+		indexer:         indexer,
+		reflector:       reflector,
+		liveLookupCache: liveLookupCache,
+		liveTTL:         time.Duration(30 * time.Second),
+		updatedQuotas:   updatedCache,
+	}, nil
+}
+
+// Run begins watching and syncing.
+func (e *quotaAccessor) Run(stopCh <-chan struct{}) {
+	defer utilruntime.HandleCrash()
+
+	e.reflector.RunUntil(stopCh)
+
+	<-stopCh
+	glog.Infof("Shutting down quota accessor")
+}
+
+func (e *quotaAccessor) UpdateQuotaStatus(newQuota *api.ResourceQuota) error {
+	updatedQuota, err := e.client.Core().ResourceQuotas(newQuota.Namespace).UpdateStatus(newQuota)
+	if err != nil {
+		return err
+	}
+
+	key := newQuota.Namespace + "/" + newQuota.Name
+	e.updatedQuotas.Add(key, updatedQuota)
+	return nil
+}
+
+var etcdVersioner = etcd.APIObjectVersioner{}
+
+// checkCache compares the passed quota against the value in the look-aside cache and returns the newer
+// if the cache is out of date, it deletes the stale entry. This only works because of etcd resourceVersions
+// being monotonically increasing integers
+func (e *quotaAccessor) checkCache(quota *api.ResourceQuota) *api.ResourceQuota {
+	key := quota.Namespace + "/" + quota.Name
+	uncastCachedQuota, ok := e.updatedQuotas.Get(key)
+	if !ok {
+		return quota
+	}
+	cachedQuota := uncastCachedQuota.(*api.ResourceQuota)
+
+	if etcdVersioner.CompareResourceVersion(quota, cachedQuota) >= 0 {
+		e.updatedQuotas.Remove(key)
+		return quota
+	}
+	return cachedQuota
+}
+
+func (e *quotaAccessor) GetQuotas(namespace string) ([]api.ResourceQuota, error) {
+	// determine if there are any quotas in this namespace
+	// if there are no quotas, we don't need to do anything
+	items, err := e.indexer.Index("namespace", &api.ResourceQuota{ObjectMeta: api.ObjectMeta{Namespace: namespace, Name: ""}})
+	if err != nil {
+		return nil, fmt.Errorf("Error resolving quota.")
+	}
+
+	// if there are no items held in our indexer, check our live-lookup LRU, if that misses, do the live lookup to prime it.
+	if len(items) == 0 {
+		lruItemObj, ok := e.liveLookupCache.Get(namespace)
+		if !ok || lruItemObj.(liveLookupEntry).expiry.Before(time.Now()) {
+			// TODO: If there are multiple operations at the same time and cache has just expired,
+			// this may cause multiple List operations being issued at the same time.
+			// If there is already in-flight List() for a given namespace, we should wait until
+			// it is finished and cache is updated instead of doing the same, also to avoid
+			// throttling - see #22422 for details.
+			liveList, err := e.client.Core().ResourceQuotas(namespace).List(api.ListOptions{})
+			if err != nil {
+				return nil, err
+			}
+			newEntry := liveLookupEntry{expiry: time.Now().Add(e.liveTTL)}
+			for i := range liveList.Items {
+				newEntry.items = append(newEntry.items, &liveList.Items[i])
+			}
+			e.liveLookupCache.Add(namespace, newEntry)
+			lruItemObj = newEntry
+		}
+		lruEntry := lruItemObj.(liveLookupEntry)
+		for i := range lruEntry.items {
+			items = append(items, lruEntry.items[i])
+		}
+	}
+
+	resourceQuotas := []api.ResourceQuota{}
+	for i := range items {
+		quota := items[i].(*api.ResourceQuota)
+		quota = e.checkCache(quota)
+		// always make a copy. We're going to muck around with this and we should never mutate the originals
+		resourceQuotas = append(resourceQuotas, *quota)
+	}
+
+	return resourceQuotas, nil
+}
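Illustration (not part of the patch above): the value of the new QuotaAccessor interface is that the evaluator only ever calls GetQuotas and UpdateQuotaStatus, so the reflector/LRU-backed accessor can be swapped for something trivial. The sketch below is a hypothetical in-memory stub written against the interface exactly as added in resource_access.go; the name fakeQuotaAccessor and its fields are invented here for illustration only.

package resourcequota

import "k8s.io/kubernetes/pkg/api"

// fakeQuotaAccessor is a hypothetical in-memory QuotaAccessor, e.g. for tests.
// It satisfies the interface without a client, an indexer, or the look-aside caches.
type fakeQuotaAccessor struct {
	// quotas holds the ResourceQuota objects to hand out, keyed by namespace.
	quotas map[string][]api.ResourceQuota
	// updated records every quota passed to UpdateQuotaStatus so a caller can inspect it.
	updated []api.ResourceQuota
}

// GetQuotas returns the canned quotas for the namespace; an empty result means no quota applies.
func (f *fakeQuotaAccessor) GetQuotas(namespace string) ([]api.ResourceQuota, error) {
	return f.quotas[namespace], nil
}

// UpdateQuotaStatus records the final status instead of writing it to persistent storage.
func (f *fakeQuotaAccessor) UpdateQuotaStatus(newQuota *api.ResourceQuota) error {
	f.updated = append(f.updated, *newQuota)
	return nil
}

With a stub like this, NewQuotaEvaluator(stub, registry, workers, stopCh) exercises the batching and checkQuotas logic without a reflector or live lookups, which is the decoupling this refactor is aiming for.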