refactor quota evaluation to cleanly abstract the quota access

deads2k 2016-05-26 09:59:48 -04:00
parent f3359fe134
commit a28cf3963b
4 changed files with 318 additions and 209 deletions


@@ -18,8 +18,6 @@ package resourcequota
import (
"io"
"sort"
"strings"
"time"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
@@ -28,7 +26,6 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/quota"
"k8s.io/kubernetes/pkg/quota/install"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
)
func init() {
@@ -44,7 +41,7 @@ func init() {
type quotaAdmission struct {
*admission.Handler
evaluator *quotaEvaluator
evaluator Evaluator
}
type liveLookupEntry struct {
@@ -56,13 +53,13 @@ type liveLookupEntry struct {
// using the provided registry. The registry must have the capability to handle group/kinds that
// are persisted by the server this admission controller is intercepting
func NewResourceQuota(client clientset.Interface, registry quota.Registry, numEvaluators int, stopCh <-chan struct{}) (admission.Interface, error) {
evaluator, err := newQuotaEvaluator(client, registry)
quotaAccessor, err := newQuotaAccessor(client)
if err != nil {
return nil, err
}
go quotaAccessor.Run(stopCh)
defer utilruntime.HandleCrash()
go evaluator.Run(numEvaluators, stopCh)
evaluator := NewQuotaEvaluator(quotaAccessor, registry, numEvaluators, stopCh)
return &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
@@ -77,47 +74,5 @@ func (q *quotaAdmission) Admit(a admission.Attributes) (err error) {
return nil
}
// if we do not know how to evaluate usage for this kind, just ignore
evaluators := q.evaluator.registry.Evaluators()
evaluator, found := evaluators[a.GetKind().GroupKind()]
if !found {
return nil
}
// for this kind, check if the operation could mutate any quota resources
// if no resources tracked by quota are impacted, then just return
op := a.GetOperation()
operationResources := evaluator.OperationResources(op)
if len(operationResources) == 0 {
return nil
}
return q.evaluator.evaluate(a)
}
// prettyPrint formats a resource list for usage in errors
// it outputs resources sorted in increasing order
func prettyPrint(item api.ResourceList) string {
parts := []string{}
keys := []string{}
for key := range item {
keys = append(keys, string(key))
}
sort.Strings(keys)
for _, key := range keys {
value := item[api.ResourceName(key)]
constraint := key + "=" + value.String()
parts = append(parts, constraint)
}
return strings.Join(parts, ",")
}
// hasUsageStats returns true if for each hard constraint there is a value for its current usage
func hasUsageStats(resourceQuota *api.ResourceQuota) bool {
for resourceName := range resourceQuota.Status.Hard {
if _, found := resourceQuota.Status.Used[resourceName]; !found {
return false
}
}
return true
return q.evaluator.Evaluate(a)
}


@@ -144,12 +144,15 @@ func TestAdmissionIgnoresSubresources(t *testing.T) {
resourceQuota.Status.Used[api.ResourceMemory] = resource.MustParse("1Gi")
kubeClient := fake.NewSimpleClientset(resourceQuota)
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient))
evaluator.indexer = indexer
stopCh := make(chan struct{})
defer close(stopCh)
quotaAccessor, _ := newQuotaAccessor(kubeClient)
quotaAccessor.indexer = indexer
go quotaAccessor.Run(stopCh)
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
defer utilruntime.HandleCrash()
go evaluator.Run(5, stopCh)
handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
evaluator: evaluator,
@@ -185,12 +188,15 @@ func TestAdmitBelowQuotaLimit(t *testing.T) {
}
kubeClient := fake.NewSimpleClientset(resourceQuota)
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient))
evaluator.indexer = indexer
stopCh := make(chan struct{})
defer close(stopCh)
quotaAccessor, _ := newQuotaAccessor(kubeClient)
quotaAccessor.indexer = indexer
go quotaAccessor.Run(stopCh)
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
defer utilruntime.HandleCrash()
go evaluator.Run(5, stopCh)
handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
evaluator: evaluator,
@@ -265,12 +271,14 @@ func TestAdmitHandlesOldObjects(t *testing.T) {
// start up quota system
kubeClient := fake.NewSimpleClientset(resourceQuota)
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient))
evaluator.indexer = indexer
stopCh := make(chan struct{})
defer close(stopCh)
defer utilruntime.HandleCrash()
go evaluator.Run(5, stopCh)
quotaAccessor, _ := newQuotaAccessor(kubeClient)
quotaAccessor.indexer = indexer
go quotaAccessor.Run(stopCh)
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
evaluator: evaluator,
@@ -353,12 +361,15 @@ func TestAdmitExceedQuotaLimit(t *testing.T) {
}
kubeClient := fake.NewSimpleClientset(resourceQuota)
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient))
evaluator.indexer = indexer
stopCh := make(chan struct{})
defer close(stopCh)
quotaAccessor, _ := newQuotaAccessor(kubeClient)
quotaAccessor.indexer = indexer
go quotaAccessor.Run(stopCh)
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
defer utilruntime.HandleCrash()
go evaluator.Run(5, stopCh)
handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
evaluator: evaluator,
@@ -394,12 +405,15 @@ func TestAdmitEnforceQuotaConstraints(t *testing.T) {
}
kubeClient := fake.NewSimpleClientset(resourceQuota)
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient))
evaluator.indexer = indexer
stopCh := make(chan struct{})
defer close(stopCh)
quotaAccessor, _ := newQuotaAccessor(kubeClient)
quotaAccessor.indexer = indexer
go quotaAccessor.Run(stopCh)
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
defer utilruntime.HandleCrash()
go evaluator.Run(5, stopCh)
handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
evaluator: evaluator,
@@ -444,13 +458,16 @@ func TestAdmitPodInNamespaceWithoutQuota(t *testing.T) {
if err != nil {
t.Fatal(err)
}
evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient))
evaluator.indexer = indexer
evaluator.liveLookupCache = liveLookupCache
stopCh := make(chan struct{})
defer close(stopCh)
quotaAccessor, _ := newQuotaAccessor(kubeClient)
quotaAccessor.indexer = indexer
quotaAccessor.liveLookupCache = liveLookupCache
go quotaAccessor.Run(stopCh)
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
defer utilruntime.HandleCrash()
go evaluator.Run(5, stopCh)
handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
evaluator: evaluator,
@@ -508,12 +525,15 @@ func TestAdmitBelowTerminatingQuotaLimit(t *testing.T) {
}
kubeClient := fake.NewSimpleClientset(resourceQuotaTerminating, resourceQuotaNonTerminating)
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient))
evaluator.indexer = indexer
stopCh := make(chan struct{})
defer close(stopCh)
quotaAccessor, _ := newQuotaAccessor(kubeClient)
quotaAccessor.indexer = indexer
go quotaAccessor.Run(stopCh)
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
defer utilruntime.HandleCrash()
go evaluator.Run(5, stopCh)
handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
evaluator: evaluator,
@@ -610,12 +630,15 @@ func TestAdmitBelowBestEffortQuotaLimit(t *testing.T) {
}
kubeClient := fake.NewSimpleClientset(resourceQuotaBestEffort, resourceQuotaNotBestEffort)
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient))
evaluator.indexer = indexer
stopCh := make(chan struct{})
defer close(stopCh)
quotaAccessor, _ := newQuotaAccessor(kubeClient)
quotaAccessor.indexer = indexer
go quotaAccessor.Run(stopCh)
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
defer utilruntime.HandleCrash()
go evaluator.Run(5, stopCh)
handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
evaluator: evaluator,
@@ -699,12 +722,15 @@ func TestAdmitBestEffortQuotaLimitIgnoresBurstable(t *testing.T) {
}
kubeClient := fake.NewSimpleClientset(resourceQuota)
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient))
evaluator.indexer = indexer
stopCh := make(chan struct{})
defer close(stopCh)
quotaAccessor, _ := newQuotaAccessor(kubeClient)
quotaAccessor.indexer = indexer
go quotaAccessor.Run(stopCh)
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
defer utilruntime.HandleCrash()
go evaluator.Run(5, stopCh)
handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
evaluator: evaluator,
@@ -814,13 +840,16 @@ func TestAdmissionSetsMissingNamespace(t *testing.T) {
podEvaluator.GroupKind(): podEvaluator,
},
}
evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient))
evaluator.indexer = indexer
evaluator.registry = registry
stopCh := make(chan struct{})
defer close(stopCh)
quotaAccessor, _ := newQuotaAccessor(kubeClient)
quotaAccessor.indexer = indexer
go quotaAccessor.Run(stopCh)
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
evaluator.(*quotaEvaluator).registry = registry
defer utilruntime.HandleCrash()
go evaluator.Run(5, stopCh)
handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
evaluator: evaluator,


@@ -18,46 +18,36 @@ package resourcequota
import (
"fmt"
"sort"
"strings"
"sync"
"time"
"github.com/golang/glog"
lru "github.com/hashicorp/golang-lru"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/admission"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/quota"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage/etcd"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/util/workqueue"
"k8s.io/kubernetes/pkg/watch"
)
type quotaEvaluator struct {
client clientset.Interface
// Evaluator is used to see if quota constraints are satisfied.
type Evaluator interface {
// Evaluate takes an operation and checks to see if quota constraints are satisfied. It returns an error if they are not.
// The default implementation processes related operations in chunks when possible.
Evaluate(a admission.Attributes) error
}
type quotaEvaluator struct {
quotaAccessor QuotaAccessor
// indexer that holds quota objects by namespace
indexer cache.Indexer
// registry that knows how to measure usage for objects
registry quota.Registry
// liveLookups holds the last few live lookups we've done to help amortize cost on repeated lookup failures.
// This lets us handle the case of latent caches by looking up actual results for a namespace on cache miss/no results.
// We track the lookup result here so that for repeated requests, we don't look it up very often.
liveLookupCache *lru.Cache
liveTTL time.Duration
// updatedQuotas holds a cache of quotas that we've updated. This is used to pull the "really latest" during back to
// back quota evaluations that touch the same quota doc. This only works because we can compare etcd resourceVersions
// for the same resource as integers. Before this change: 22 updates with 12 conflicts. After this change: 15 updates with 0 conflicts.
updatedQuotas *lru.Cache
// TODO these are used together to bucket items by namespace and then batch them up for processing.
// The technique is valuable for rollup activities to avoid fanout and reduce resource contention.
// We could move this into a library if another component needed it.
@@ -67,6 +57,11 @@ type quotaEvaluator struct {
work map[string][]*admissionWaiter
dirtyWork map[string][]*admissionWaiter
inProgress sets.String
// these control the run method so that we can cleanly conform to the Evaluator interface
workers int
stopCh <-chan struct{}
init sync.Once
}
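
An aside on the TODO above: the bucket-and-batch technique can be sketched independently of the quota machinery. Below is a hypothetical, self-contained Go version — bucketQueue and its method names are invented for illustration (the real code builds on pkg/util/workqueue and admissionWaiter) — but the work/dirtyWork/inProgress choreography is the same idea.

package main

import (
	"fmt"
	"sync"
)

// bucketQueue groups items by key (here, a namespace) so a worker can claim a
// whole batch at once. Items that arrive for a key already being processed
// park in dirtyWork and become that key's next batch when doneWork is called.
type bucketQueue struct {
	mu         sync.Mutex
	cond       *sync.Cond
	order      []string            // keys with a ready batch, FIFO
	work       map[string][]string // ready items per key
	dirtyWork  map[string][]string // items that arrived while the key was in flight
	inProgress map[string]bool
}

func newBucketQueue() *bucketQueue {
	q := &bucketQueue{
		work:       map[string][]string{},
		dirtyWork:  map[string][]string{},
		inProgress: map[string]bool{},
	}
	q.cond = sync.NewCond(&q.mu)
	return q
}

func (q *bucketQueue) add(key, item string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.inProgress[key] {
		q.dirtyWork[key] = append(q.dirtyWork[key], item)
		return
	}
	if _, ok := q.work[key]; !ok {
		q.order = append(q.order, key)
	}
	q.work[key] = append(q.work[key], item)
	q.cond.Signal()
}

// getWork blocks until some key has a ready batch, then claims the whole batch.
func (q *bucketQueue) getWork() (string, []string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.order) == 0 {
		q.cond.Wait()
	}
	key := q.order[0]
	q.order = q.order[1:]
	items := q.work[key]
	delete(q.work, key)
	q.inProgress[key] = true
	return key, items
}

// doneWork releases the key and promotes anything that went dirty meanwhile.
func (q *bucketQueue) doneWork(key string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	delete(q.inProgress, key)
	if items := q.dirtyWork[key]; len(items) > 0 {
		delete(q.dirtyWork, key)
		q.work[key] = items
		q.order = append(q.order, key)
		q.cond.Signal()
	}
}

func main() {
	q := newBucketQueue()
	q.add("ns-a", "pod-1")
	q.add("ns-a", "pod-2")
	key, batch := q.getWork() // ns-a [pod-1 pod-2]
	q.add("ns-a", "pod-3")    // ns-a is in flight, so this parks in dirtyWork
	q.doneWork(key)
	fmt.Println(key, batch)
	fmt.Println(q.getWork()) // ns-a [pod-3]
}
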
type admissionWaiter struct {
@@ -98,52 +93,33 @@ func newAdmissionWaiter(a admission.Attributes) *admissionWaiter {
}
}
// newQuotaEvaluator configures an admission controller that can enforce quota constraints
// NewQuotaEvaluator configures an admission controller that can enforce quota constraints
// using the provided registry. The registry must have the capability to handle group/kinds that
// are persisted by the server this admission controller is intercepting
func newQuotaEvaluator(client clientset.Interface, registry quota.Registry) (*quotaEvaluator, error) {
liveLookupCache, err := lru.New(100)
if err != nil {
return nil, err
}
updatedCache, err := lru.New(100)
if err != nil {
return nil, err
}
lw := &cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return client.Core().ResourceQuotas(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return client.Core().ResourceQuotas(api.NamespaceAll).Watch(options)
},
}
indexer, reflector := cache.NewNamespaceKeyedIndexerAndReflector(lw, &api.ResourceQuota{}, 0)
reflector.Run()
func NewQuotaEvaluator(quotaAccessor QuotaAccessor, registry quota.Registry, workers int, stopCh <-chan struct{}) Evaluator {
return &quotaEvaluator{
client: client,
indexer: indexer,
registry: registry,
liveLookupCache: liveLookupCache,
liveTTL: time.Duration(30 * time.Second),
updatedQuotas: updatedCache,
quotaAccessor: quotaAccessor,
registry: registry,
queue: workqueue.New(),
work: map[string][]*admissionWaiter{},
dirtyWork: map[string][]*admissionWaiter{},
inProgress: sets.String{},
}, nil
workers: workers,
stopCh: stopCh,
}
}
// Run begins watching and syncing.
func (e *quotaEvaluator) Run(workers int, stopCh <-chan struct{}) {
func (e *quotaEvaluator) run() {
defer utilruntime.HandleCrash()
for i := 0; i < workers; i++ {
go wait.Until(e.doWork, time.Second, stopCh)
for i := 0; i < e.workers; i++ {
go wait.Until(e.doWork, time.Second, e.stopCh)
}
<-stopCh
<-e.stopCh
glog.Infof("Shutting down quota evaluator")
e.queue.ShutDown()
}
@@ -179,7 +155,7 @@ func (e *quotaEvaluator) checkAttributes(ns string, admissionAttributes []*admis
}
}()
quotas, err := e.getQuotas(ns)
quotas, err := e.quotaAccessor.GetQuotas(ns)
if err != nil {
for _, admissionAttribute := range admissionAttributes {
admissionAttribute.result = err
@@ -257,14 +233,9 @@ func (e *quotaEvaluator) checkQuotas(quotas []api.ResourceQuota, admissionAttrib
continue
}
if updatedQuota, err := e.client.Core().ResourceQuotas(newQuota.Namespace).UpdateStatus(&newQuota); err != nil {
if err := e.quotaAccessor.UpdateQuotaStatus(&newQuota); err != nil {
updatedFailedQuotas = append(updatedFailedQuotas, newQuota)
lastErr = err
} else {
// update our cache
e.updateCache(updatedQuota)
}
}
@@ -293,7 +264,7 @@ func (e *quotaEvaluator) checkQuotas(quotas []api.ResourceQuota, admissionAttrib
// you've added a new document, then updated an old one, your resource matches both and you're only checking one
// updates for these quota names failed. Get the current quotas in the namespace, compare by name, check to see if the
// resource versions have changed. If not, we're going to fall through and fail everything. If they all have, then we can try again
newQuotas, err := e.getQuotas(quotas[0].Namespace)
newQuotas, err := e.quotaAccessor.GetQuotas(quotas[0].Namespace)
if err != nil {
// this means that updates failed. Anything with a default deny error has failed and we need to let them know
for _, admissionAttribute := range admissionAttributes {
@@ -416,7 +387,25 @@ func (e *quotaEvaluator) checkRequest(quotas []api.ResourceQuota, a admission.At
return quotas, nil
}
func (e *quotaEvaluator) evaluate(a admission.Attributes) error {
func (e *quotaEvaluator) Evaluate(a admission.Attributes) error {
e.init.Do(func() {
go e.run()
})
// if we do not know how to evaluate usage for this kind, just ignore
evaluators := e.registry.Evaluators()
evaluator, found := evaluators[a.GetKind().GroupKind()]
if !found {
return nil
}
// for this kind, check if the operation could mutate any quota resources
// if no resources tracked by quota are impacted, then just return
op := a.GetOperation()
operationResources := evaluator.OperationResources(op)
if len(operationResources) == 0 {
return nil
}
waiter := newAdmissionWaiter(a)
e.addWork(waiter)
@@ -485,72 +474,29 @@ func (e *quotaEvaluator) getWork() (string, []*admissionWaiter, bool) {
return ns, []*admissionWaiter{}, false
}
func (e *quotaEvaluator) updateCache(quota *api.ResourceQuota) {
key := quota.Namespace + "/" + quota.Name
e.updatedQuotas.Add(key, quota)
// prettyPrint formats a resource list for usage in errors
// it outputs resources sorted in increasing order
func prettyPrint(item api.ResourceList) string {
parts := []string{}
keys := []string{}
for key := range item {
keys = append(keys, string(key))
}
sort.Strings(keys)
for _, key := range keys {
value := item[api.ResourceName(key)]
constraint := key + "=" + value.String()
parts = append(parts, constraint)
}
return strings.Join(parts, ",")
}
var etcdVersioner = etcd.APIObjectVersioner{}
// checkCache compares the passed quota against the value in the look-aside cache and returns the newer one.
// If the cache is out of date, it deletes the stale entry. This only works because etcd resourceVersions
// are monotonically increasing integers
func (e *quotaEvaluator) checkCache(quota *api.ResourceQuota) *api.ResourceQuota {
key := quota.Namespace + "/" + quota.Name
uncastCachedQuota, ok := e.updatedQuotas.Get(key)
if !ok {
return quota
}
cachedQuota := uncastCachedQuota.(*api.ResourceQuota)
if etcdVersioner.CompareResourceVersion(quota, cachedQuota) >= 0 {
e.updatedQuotas.Remove(key)
return quota
}
return cachedQuota
}
func (e *quotaEvaluator) getQuotas(namespace string) ([]api.ResourceQuota, error) {
// determine if there are any quotas in this namespace
// if there are no quotas, we don't need to do anything
items, err := e.indexer.Index("namespace", &api.ResourceQuota{ObjectMeta: api.ObjectMeta{Namespace: namespace, Name: ""}})
if err != nil {
return nil, fmt.Errorf("Error resolving quota.")
}
// if there are no items held in our indexer, check our live-lookup LRU; if that misses, do a live lookup to prime it.
if len(items) == 0 {
lruItemObj, ok := e.liveLookupCache.Get(namespace)
if !ok || lruItemObj.(liveLookupEntry).expiry.Before(time.Now()) {
// TODO: If there are multiple operations at the same time and cache has just expired,
// this may cause multiple List operations being issued at the same time.
// If there is already in-flight List() for a given namespace, we should wait until
// it is finished and cache is updated instead of doing the same, also to avoid
// throttling - see #22422 for details.
liveList, err := e.client.Core().ResourceQuotas(namespace).List(api.ListOptions{})
if err != nil {
return nil, err
}
newEntry := liveLookupEntry{expiry: time.Now().Add(e.liveTTL)}
for i := range liveList.Items {
newEntry.items = append(newEntry.items, &liveList.Items[i])
}
e.liveLookupCache.Add(namespace, newEntry)
lruItemObj = newEntry
}
lruEntry := lruItemObj.(liveLookupEntry)
for i := range lruEntry.items {
items = append(items, lruEntry.items[i])
// hasUsageStats returns true if for each hard constraint there is a value for its current usage
func hasUsageStats(resourceQuota *api.ResourceQuota) bool {
for resourceName := range resourceQuota.Status.Hard {
if _, found := resourceQuota.Status.Used[resourceName]; !found {
return false
}
}
resourceQuotas := []api.ResourceQuota{}
for i := range items {
quota := items[i].(*api.ResourceQuota)
quota = e.checkCache(quota)
// always make a copy. We're going to muck around with this and we should never mutate the originals
resourceQuotas = append(resourceQuotas, *quota)
}
return resourceQuotas, nil
return true
}


@@ -0,0 +1,179 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package resourcequota
import (
"fmt"
"time"
"github.com/golang/glog"
lru "github.com/hashicorp/golang-lru"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage/etcd"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/watch"
)
// QuotaAccessor abstracts the get/set logic from the rest of the Evaluator. This could be a test stub, a straight passthrough,
// or most commonly a series of deconflicting caches.
type QuotaAccessor interface {
// UpdateQuotaStatus is called to persist final status. This method should write to persistent storage.
// An error indicates that the write didn't complete successfully.
UpdateQuotaStatus(newQuota *api.ResourceQuota) error
// GetQuotas gets all possible quotas for a given namespace
GetQuotas(namespace string) ([]api.ResourceQuota, error)
}
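
To make "this could be a test stub" concrete, a minimal in-memory implementation of QuotaAccessor might look like the sketch below. fakeQuotaAccessor is a hypothetical name, not part of this commit; it assumes the surrounding file's package and imports (api, fmt).

type fakeQuotaAccessor struct {
	// quotas by namespace; stands in for the indexer/live-lookup machinery below
	quotas map[string][]api.ResourceQuota
}

func (f *fakeQuotaAccessor) UpdateQuotaStatus(newQuota *api.ResourceQuota) error {
	for i := range f.quotas[newQuota.Namespace] {
		if f.quotas[newQuota.Namespace][i].Name == newQuota.Name {
			f.quotas[newQuota.Namespace][i] = *newQuota
			return nil
		}
	}
	return fmt.Errorf("quota %s/%s not found", newQuota.Namespace, newQuota.Name)
}

func (f *fakeQuotaAccessor) GetQuotas(namespace string) ([]api.ResourceQuota, error) {
	// return a copy so callers can mutate the result without touching the stub's state
	return append([]api.ResourceQuota{}, f.quotas[namespace]...), nil
}
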
type quotaAccessor struct {
client clientset.Interface
// indexer that holds quota objects by namespace
indexer cache.Indexer
reflector *cache.Reflector
// liveLookups holds the last few live lookups we've done to help amortize cost on repeated lookup failures.
// This lets us handle the case of latent caches by looking up actual results for a namespace on cache miss/no results.
// We track the lookup result here so that for repeated requests, we don't look it up very often.
liveLookupCache *lru.Cache
liveTTL time.Duration
// updatedQuotas holds a cache of quotas that we've updated. This is used to pull the "really latest" during back to
// back quota evaluations that touch the same quota doc. This only works because we can compare etcd resourceVersions
// for the same resource as integers. Before this change: 22 updates with 12 conflicts. After this change: 15 updates with 0 conflicts.
updatedQuotas *lru.Cache
}
// newQuotaAccessor creates an object that conforms to the QuotaAccessor interface to be used to retrieve quota objects.
func newQuotaAccessor(client clientset.Interface) (*quotaAccessor, error) {
liveLookupCache, err := lru.New(100)
if err != nil {
return nil, err
}
updatedCache, err := lru.New(100)
if err != nil {
return nil, err
}
lw := &cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return client.Core().ResourceQuotas(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return client.Core().ResourceQuotas(api.NamespaceAll).Watch(options)
},
}
indexer, reflector := cache.NewNamespaceKeyedIndexerAndReflector(lw, &api.ResourceQuota{}, 0)
return &quotaAccessor{
client: client,
indexer: indexer,
reflector: reflector,
liveLookupCache: liveLookupCache,
liveTTL: time.Duration(30 * time.Second),
updatedQuotas: updatedCache,
}, nil
}
// Run begins watching and syncing.
func (e *quotaAccessor) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
e.reflector.RunUntil(stopCh)
<-stopCh
glog.Infof("Shutting down quota accessor")
}
func (e *quotaAccessor) UpdateQuotaStatus(newQuota *api.ResourceQuota) error {
updatedQuota, err := e.client.Core().ResourceQuotas(newQuota.Namespace).UpdateStatus(newQuota)
if err != nil {
return err
}
key := newQuota.Namespace + "/" + newQuota.Name
e.updatedQuotas.Add(key, updatedQuota)
return nil
}
var etcdVersioner = etcd.APIObjectVersioner{}
// checkCache compares the passed quota against the value in the look-aside cache and returns the newer one.
// If the cache is out of date, it deletes the stale entry. This only works because etcd resourceVersions
// are monotonically increasing integers
func (e *quotaAccessor) checkCache(quota *api.ResourceQuota) *api.ResourceQuota {
key := quota.Namespace + "/" + quota.Name
uncastCachedQuota, ok := e.updatedQuotas.Get(key)
if !ok {
return quota
}
cachedQuota := uncastCachedQuota.(*api.ResourceQuota)
if etcdVersioner.CompareResourceVersion(quota, cachedQuota) >= 0 {
e.updatedQuotas.Remove(key)
return quota
}
return cachedQuota
}
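
A hypothetical fragment (again assuming this file's imports) to make the monotonic-integer point concrete: resourceVersions are compared numerically, so a cached copy at "12" is newer than a lister copy at "9", even though a plain string comparison would order them the other way.

func exampleCheckCacheOrdering() {
	stale := &api.ResourceQuota{ObjectMeta: api.ObjectMeta{Namespace: "ns", Name: "q", ResourceVersion: "9"}}
	cached := &api.ResourceQuota{ObjectMeta: api.ObjectMeta{Namespace: "ns", Name: "q", ResourceVersion: "12"}}
	if etcdVersioner.CompareResourceVersion(stale, cached) < 0 {
		// the look-aside entry is strictly newer, so checkCache would return it
	}
}
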
func (e *quotaAccessor) GetQuotas(namespace string) ([]api.ResourceQuota, error) {
// determine if there are any quotas in this namespace
// if there are no quotas, we don't need to do anything
items, err := e.indexer.Index("namespace", &api.ResourceQuota{ObjectMeta: api.ObjectMeta{Namespace: namespace, Name: ""}})
if err != nil {
return nil, fmt.Errorf("Error resolving quota.")
}
// if there are no items held in our indexer, check our live-lookup LRU; if that misses, do a live lookup to prime it.
if len(items) == 0 {
lruItemObj, ok := e.liveLookupCache.Get(namespace)
if !ok || lruItemObj.(liveLookupEntry).expiry.Before(time.Now()) {
// TODO: If there are multiple operations at the same time and cache has just expired,
// this may cause multiple List operations being issued at the same time.
// If there is already in-flight List() for a given namespace, we should wait until
// it is finished and cache is updated instead of doing the same, also to avoid
// throttling - see #22422 for details.
liveList, err := e.client.Core().ResourceQuotas(namespace).List(api.ListOptions{})
if err != nil {
return nil, err
}
newEntry := liveLookupEntry{expiry: time.Now().Add(e.liveTTL)}
for i := range liveList.Items {
newEntry.items = append(newEntry.items, &liveList.Items[i])
}
e.liveLookupCache.Add(namespace, newEntry)
lruItemObj = newEntry
}
lruEntry := lruItemObj.(liveLookupEntry)
for i := range lruEntry.items {
items = append(items, lruEntry.items[i])
}
}
resourceQuotas := []api.ResourceQuota{}
for i := range items {
quota := items[i].(*api.ResourceQuota)
quota = e.checkCache(quota)
// always make a copy. We're going to muck around with this and we should never mutate the originals
resourceQuotas = append(resourceQuotas, *quota)
}
return resourceQuotas, nil
}