diff --git a/plugin/pkg/admission/limitranger/admission.go b/plugin/pkg/admission/limitranger/admission.go index aed3e0e5e94..0a2ddaf2942 100644 --- a/plugin/pkg/admission/limitranger/admission.go +++ b/plugin/pkg/admission/limitranger/admission.go @@ -21,6 +21,9 @@ import ( "io" "sort" "strings" + "time" + + "github.com/hashicorp/golang-lru" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" @@ -40,7 +43,7 @@ const ( func init() { admission.RegisterPlugin("LimitRanger", func(client clientset.Interface, config io.Reader) (admission.Interface, error) { - return NewLimitRanger(client, Limit), nil + return NewLimitRanger(client, Limit) }) } @@ -50,6 +53,17 @@ type limitRanger struct { client clientset.Interface limitFunc LimitFunc indexer cache.Indexer + + // liveLookups holds the last few live lookups we've done to help ammortize cost on repeated lookup failures. + // This let's us handle the case of latent caches, by looking up actual results for a namespace on cache miss/no results. + // We track the lookup result here so that for repeated requests, we don't look it up very often. + liveLookupCache *lru.Cache + liveTTL time.Duration +} + +type liveLookupEntry struct { + expiry time.Time + items []*api.LimitRange } // Admit admits resources into cluster that do not violate any defined LimitRange in the namespace @@ -79,8 +93,28 @@ func (l *limitRanger) Admit(a admission.Attributes) (err error) { if err != nil { return admission.NewForbidden(a, fmt.Errorf("Unable to %s %v at this time because there was an error enforcing limit ranges", a.GetOperation(), a.GetResource())) } + + // if there are no items held in our indexer, check our live-lookup LRU, if that misses, do the live lookup to prime it. if len(items) == 0 { - return nil + lruItemObj, ok := l.liveLookupCache.Get(a.GetNamespace()) + if !ok || lruItemObj.(liveLookupEntry).expiry.Before(time.Now()) { + liveList, err := l.client.Core().LimitRanges(a.GetNamespace()).List(api.ListOptions{}) + if err != nil { + return admission.NewForbidden(a, err) + } + newEntry := liveLookupEntry{expiry: time.Now().Add(l.liveTTL)} + for i := range liveList.Items { + newEntry.items = append(newEntry.items, &liveList.Items[i]) + } + l.liveLookupCache.Add(a.GetNamespace(), newEntry) + lruItemObj = newEntry + } + lruEntry := lruItemObj.(liveLookupEntry) + + for i := range lruEntry.items { + items = append(items, lruEntry.items[i]) + } + } // ensure it meets each prescribed min/max @@ -95,7 +129,12 @@ func (l *limitRanger) Admit(a admission.Attributes) (err error) { } // NewLimitRanger returns an object that enforces limits based on the supplied limit function -func NewLimitRanger(client clientset.Interface, limitFunc LimitFunc) admission.Interface { +func NewLimitRanger(client clientset.Interface, limitFunc LimitFunc) (admission.Interface, error) { + liveLookupCache, err := lru.New(10000) + if err != nil { + return nil, err + } + lw := &cache.ListWatch{ ListFunc: func(options api.ListOptions) (runtime.Object, error) { return client.Core().LimitRanges(api.NamespaceAll).List(options) @@ -107,11 +146,13 @@ func NewLimitRanger(client clientset.Interface, limitFunc LimitFunc) admission.I indexer, reflector := cache.NewNamespaceKeyedIndexerAndReflector(lw, &api.LimitRange{}, 0) reflector.Run() return &limitRanger{ - Handler: admission.NewHandler(admission.Create, admission.Update), - client: client, - limitFunc: limitFunc, - indexer: indexer, - } + Handler: admission.NewHandler(admission.Create, admission.Update), + client: client, + limitFunc: limitFunc, + indexer: indexer, + liveLookupCache: liveLookupCache, + liveTTL: time.Duration(30 * time.Second), + }, nil } // Min returns the lesser of its 2 arguments diff --git a/plugin/pkg/admission/limitranger/admission_test.go b/plugin/pkg/admission/limitranger/admission_test.go index f13040f6480..e0279ff0f6a 100644 --- a/plugin/pkg/admission/limitranger/admission_test.go +++ b/plugin/pkg/admission/limitranger/admission_test.go @@ -19,6 +19,9 @@ package limitranger import ( "strconv" "testing" + "time" + + "github.com/hashicorp/golang-lru" "k8s.io/kubernetes/pkg/admission" "k8s.io/kubernetes/pkg/api" @@ -453,3 +456,99 @@ func TestLimitRangerIgnoresSubresource(t *testing.T) { } } + +func TestLimitRangerCacheMisses(t *testing.T) { + liveLookupCache, err := lru.New(10000) + if err != nil { + t.Fatal(err) + } + + client := fake.NewSimpleClientset() + indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}) + handler := &limitRanger{ + Handler: admission.NewHandler(admission.Create, admission.Update), + client: client, + limitFunc: Limit, + indexer: indexer, + liveLookupCache: liveLookupCache, + } + + limitRange := validLimitRangeNoDefaults() + testPod := validPod("testPod", 1, api.ResourceRequirements{}) + + // add to the lru cache + liveLookupCache.Add(limitRange.Namespace, liveLookupEntry{expiry: time.Now().Add(time.Duration(30 * time.Second)), items: []*api.LimitRange{&limitRange}}) + + err = handler.Admit(admission.NewAttributesRecord(&testPod, api.Kind("Pod"), limitRange.Namespace, "testPod", api.Resource("pods"), "", admission.Update, nil)) + if err == nil { + t.Errorf("Expected an error since the pod did not specify resource limits in its update call") + } + + err = handler.Admit(admission.NewAttributesRecord(&testPod, api.Kind("Pod"), limitRange.Namespace, "testPod", api.Resource("pods"), "status", admission.Update, nil)) + if err != nil { + t.Errorf("Should have ignored calls to any subresource of pod %v", err) + } +} + +func TestLimitRangerCacheAndLRUMisses(t *testing.T) { + liveLookupCache, err := lru.New(10000) + if err != nil { + t.Fatal(err) + } + + limitRange := validLimitRangeNoDefaults() + client := fake.NewSimpleClientset(&limitRange) + indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}) + handler := &limitRanger{ + Handler: admission.NewHandler(admission.Create, admission.Update), + client: client, + limitFunc: Limit, + indexer: indexer, + liveLookupCache: liveLookupCache, + } + + testPod := validPod("testPod", 1, api.ResourceRequirements{}) + + err = handler.Admit(admission.NewAttributesRecord(&testPod, api.Kind("Pod"), limitRange.Namespace, "testPod", api.Resource("pods"), "", admission.Update, nil)) + if err == nil { + t.Errorf("Expected an error since the pod did not specify resource limits in its update call") + } + + err = handler.Admit(admission.NewAttributesRecord(&testPod, api.Kind("Pod"), limitRange.Namespace, "testPod", api.Resource("pods"), "status", admission.Update, nil)) + if err != nil { + t.Errorf("Should have ignored calls to any subresource of pod %v", err) + } +} + +func TestLimitRangerCacheAndLRUExpiredMisses(t *testing.T) { + liveLookupCache, err := lru.New(10000) + if err != nil { + t.Fatal(err) + } + + limitRange := validLimitRangeNoDefaults() + client := fake.NewSimpleClientset(&limitRange) + indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}) + handler := &limitRanger{ + Handler: admission.NewHandler(admission.Create, admission.Update), + client: client, + limitFunc: Limit, + indexer: indexer, + liveLookupCache: liveLookupCache, + } + + testPod := validPod("testPod", 1, api.ResourceRequirements{}) + + // add to the lru cache + liveLookupCache.Add(limitRange.Namespace, liveLookupEntry{expiry: time.Now().Add(time.Duration(-30 * time.Second)), items: []*api.LimitRange{}}) + + err = handler.Admit(admission.NewAttributesRecord(&testPod, api.Kind("Pod"), limitRange.Namespace, "testPod", api.Resource("pods"), "", admission.Update, nil)) + if err == nil { + t.Errorf("Expected an error since the pod did not specify resource limits in its update call") + } + + err = handler.Admit(admission.NewAttributesRecord(&testPod, api.Kind("Pod"), limitRange.Namespace, "testPod", api.Resource("pods"), "status", admission.Update, nil)) + if err != nil { + t.Errorf("Should have ignored calls to any subresource of pod %v", err) + } +}