fix limitranger to handle latent caches without live lookups every time

deads2k
2016-02-18 10:06:16 -05:00
parent 03d66e083a
commit 24d5329130
2 changed files with 148 additions and 8 deletions
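The gist of the change: when the watch-backed indexer has no LimitRanges for a namespace, the plugin no longer admits the request outright; it consults a small TTL'd LRU of recent live lookups and, only on a miss or an expired entry, does a live List against the API to prime it. Below is a minimal, self-contained sketch of that pattern; the names used (lookup, fetchLive, entry) are illustrative stand-ins, not the plugin's actual types, and the real change follows in the diff.

package main

import (
	"fmt"
	"time"

	lru "github.com/hashicorp/golang-lru"
)

// entry mirrors the idea of a cached live-lookup result with an expiry.
type entry struct {
	expiry time.Time
	items  []string
}

// lookup consults the local (watch-backed) cache first; when it has nothing
// for the namespace, it falls back to the LRU of live results, and only on a
// miss or an expired entry does it perform a live lookup and cache the result.
func lookup(local map[string][]string, live *lru.Cache, ttl time.Duration,
	namespace string, fetchLive func(string) []string) []string {

	if items, ok := local[namespace]; ok && len(items) > 0 {
		return items
	}
	if obj, ok := live.Get(namespace); ok {
		if e := obj.(entry); e.expiry.After(time.Now()) {
			return e.items
		}
	}
	// LRU miss or stale entry: do one live lookup and prime the LRU.
	items := fetchLive(namespace)
	live.Add(namespace, entry{expiry: time.Now().Add(ttl), items: items})
	return items
}

func main() {
	liveCache, _ := lru.New(128)
	calls := 0
	fetchLive := func(ns string) []string {
		calls++
		return []string{ns + "/limits"}
	}
	local := map[string][]string{} // simulate a latent (still-empty) watch cache

	lookup(local, liveCache, 30*time.Second, "demo", fetchLive)
	lookup(local, liveCache, 30*time.Second, "demo", fetchLive)
	fmt.Println("live lookups:", calls) // prints 1: the second call is served from the LRU
}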

View File

@@ -21,6 +21,9 @@ import (
"io"
"sort"
"strings"
"time"
"github.com/hashicorp/golang-lru"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
@@ -40,7 +43,7 @@ const (
func init() {
admission.RegisterPlugin("LimitRanger", func(client clientset.Interface, config io.Reader) (admission.Interface, error) {
return NewLimitRanger(client, Limit), nil
return NewLimitRanger(client, Limit)
})
}
@@ -50,6 +53,17 @@ type limitRanger struct {
client clientset.Interface
limitFunc LimitFunc
indexer cache.Indexer
// liveLookups holds the last few live lookups we've done to help amortize cost on repeated lookup failures.
// This lets us handle the case of latent caches by looking up actual results for a namespace on cache miss/no results.
// We track the lookup result here so that for repeated requests, we don't look it up very often.
liveLookupCache *lru.Cache
liveTTL time.Duration
}
type liveLookupEntry struct {
expiry time.Time
items []*api.LimitRange
}
// Admit admits resources into cluster that do not violate any defined LimitRange in the namespace
@@ -79,8 +93,28 @@ func (l *limitRanger) Admit(a admission.Attributes) (err error) {
if err != nil {
return admission.NewForbidden(a, fmt.Errorf("Unable to %s %v at this time because there was an error enforcing limit ranges", a.GetOperation(), a.GetResource()))
}
// if there are no items held in our indexer, check our live-lookup LRU; if that misses, do the live lookup to prime it.
if len(items) == 0 {
return nil
lruItemObj, ok := l.liveLookupCache.Get(a.GetNamespace())
if !ok || lruItemObj.(liveLookupEntry).expiry.Before(time.Now()) {
liveList, err := l.client.Core().LimitRanges(a.GetNamespace()).List(api.ListOptions{})
if err != nil {
return admission.NewForbidden(a, err)
}
newEntry := liveLookupEntry{expiry: time.Now().Add(l.liveTTL)}
for i := range liveList.Items {
newEntry.items = append(newEntry.items, &liveList.Items[i])
}
l.liveLookupCache.Add(a.GetNamespace(), newEntry)
lruItemObj = newEntry
}
lruEntry := lruItemObj.(liveLookupEntry)
for i := range lruEntry.items {
items = append(items, lruEntry.items[i])
}
}
// ensure it meets each prescribed min/max
@@ -95,7 +129,12 @@ func (l *limitRanger) Admit(a admission.Attributes) (err error) {
}
// NewLimitRanger returns an object that enforces limits based on the supplied limit function
func NewLimitRanger(client clientset.Interface, limitFunc LimitFunc) admission.Interface {
func NewLimitRanger(client clientset.Interface, limitFunc LimitFunc) (admission.Interface, error) {
liveLookupCache, err := lru.New(10000)
if err != nil {
return nil, err
}
lw := &cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return client.Core().LimitRanges(api.NamespaceAll).List(options)
@@ -107,11 +146,13 @@ func NewLimitRanger(client clientset.Interface, limitFunc LimitFunc) admission.I
indexer, reflector := cache.NewNamespaceKeyedIndexerAndReflector(lw, &api.LimitRange{}, 0)
reflector.Run()
return &limitRanger{
Handler: admission.NewHandler(admission.Create, admission.Update),
client: client,
limitFunc: limitFunc,
indexer: indexer,
}
Handler: admission.NewHandler(admission.Create, admission.Update),
client: client,
limitFunc: limitFunc,
indexer: indexer,
liveLookupCache: liveLookupCache,
liveTTL: time.Duration(30 * time.Second),
}, nil
}
// Min returns the lesser of its 2 arguments
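Note on the sizing above: when the live lookup succeeds, a namespace whose LimitRanges are not yet in the watch cache costs at most one live List per liveTTL (30 seconds), since the cached entry is reused until it expires, and the LRU holds results for up to 10,000 namespaces.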

View File

@@ -19,6 +19,9 @@ package limitranger
import (
"strconv"
"testing"
"time"
"github.com/hashicorp/golang-lru"
"k8s.io/kubernetes/pkg/admission"
"k8s.io/kubernetes/pkg/api"
@@ -453,3 +456,99 @@ func TestLimitRangerIgnoresSubresource(t *testing.T) {
}
}
func TestLimitRangerCacheMisses(t *testing.T) {
liveLookupCache, err := lru.New(10000)
if err != nil {
t.Fatal(err)
}
client := fake.NewSimpleClientset()
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
handler := &limitRanger{
Handler: admission.NewHandler(admission.Create, admission.Update),
client: client,
limitFunc: Limit,
indexer: indexer,
liveLookupCache: liveLookupCache,
}
limitRange := validLimitRangeNoDefaults()
testPod := validPod("testPod", 1, api.ResourceRequirements{})
// add to the lru cache
liveLookupCache.Add(limitRange.Namespace, liveLookupEntry{expiry: time.Now().Add(time.Duration(30 * time.Second)), items: []*api.LimitRange{&limitRange}})
err = handler.Admit(admission.NewAttributesRecord(&testPod, api.Kind("Pod"), limitRange.Namespace, "testPod", api.Resource("pods"), "", admission.Update, nil))
if err == nil {
t.Errorf("Expected an error since the pod did not specify resource limits in its update call")
}
err = handler.Admit(admission.NewAttributesRecord(&testPod, api.Kind("Pod"), limitRange.Namespace, "testPod", api.Resource("pods"), "status", admission.Update, nil))
if err != nil {
t.Errorf("Should have ignored calls to any subresource of pod %v", err)
}
}
func TestLimitRangerCacheAndLRUMisses(t *testing.T) {
liveLookupCache, err := lru.New(10000)
if err != nil {
t.Fatal(err)
}
limitRange := validLimitRangeNoDefaults()
client := fake.NewSimpleClientset(&limitRange)
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
handler := &limitRanger{
Handler: admission.NewHandler(admission.Create, admission.Update),
client: client,
limitFunc: Limit,
indexer: indexer,
liveLookupCache: liveLookupCache,
}
testPod := validPod("testPod", 1, api.ResourceRequirements{})
err = handler.Admit(admission.NewAttributesRecord(&testPod, api.Kind("Pod"), limitRange.Namespace, "testPod", api.Resource("pods"), "", admission.Update, nil))
if err == nil {
t.Errorf("Expected an error since the pod did not specify resource limits in its update call")
}
err = handler.Admit(admission.NewAttributesRecord(&testPod, api.Kind("Pod"), limitRange.Namespace, "testPod", api.Resource("pods"), "status", admission.Update, nil))
if err != nil {
t.Errorf("Should have ignored calls to any subresource of pod %v", err)
}
}
func TestLimitRangerCacheAndLRUExpiredMisses(t *testing.T) {
liveLookupCache, err := lru.New(10000)
if err != nil {
t.Fatal(err)
}
limitRange := validLimitRangeNoDefaults()
client := fake.NewSimpleClientset(&limitRange)
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
handler := &limitRanger{
Handler: admission.NewHandler(admission.Create, admission.Update),
client: client,
limitFunc: Limit,
indexer: indexer,
liveLookupCache: liveLookupCache,
}
testPod := validPod("testPod", 1, api.ResourceRequirements{})
// add to the lru cache
liveLookupCache.Add(limitRange.Namespace, liveLookupEntry{expiry: time.Now().Add(time.Duration(-30 * time.Second)), items: []*api.LimitRange{}})
err = handler.Admit(admission.NewAttributesRecord(&testPod, api.Kind("Pod"), limitRange.Namespace, "testPod", api.Resource("pods"), "", admission.Update, nil))
if err == nil {
t.Errorf("Expected an error since the pod did not specify resource limits in its update call")
}
err = handler.Admit(admission.NewAttributesRecord(&testPod, api.Kind("Pod"), limitRange.Namespace, "testPod", api.Resource("pods"), "status", admission.Update, nil))
if err != nil {
t.Errorf("Should have ignored calls to any subresource of pod %v", err)
}
}