Add LimitRange informer

derekwaynecarr 2016-09-30 15:15:55 -04:00
parent 19e9f7e400
commit 3c0b35d6b1
5 changed files with 168 additions and 114 deletions


@@ -203,3 +203,42 @@ func (s *StoreToReplicationControllerLister) GetPodControllers(pod *api.Pod) (co
}
return
}
// StoreToLimitRangeLister helps list limit ranges
type StoreToLimitRangeLister struct {
Indexer Indexer
}
func (s *StoreToLimitRangeLister) List(selector labels.Selector) (ret []*api.LimitRange, err error) {
err = ListAll(s.Indexer, selector, func(m interface{}) {
ret = append(ret, m.(*api.LimitRange))
})
return ret, err
}
func (s *StoreToLimitRangeLister) LimitRanges(namespace string) storeLimitRangesNamespacer {
return storeLimitRangesNamespacer{s.Indexer, namespace}
}
type storeLimitRangesNamespacer struct {
indexer Indexer
namespace string
}
func (s storeLimitRangesNamespacer) List(selector labels.Selector) (ret []*api.LimitRange, err error) {
err = ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) {
ret = append(ret, m.(*api.LimitRange))
})
return ret, err
}
func (s storeLimitRangesNamespacer) Get(name string) (*api.LimitRange, error) {
obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name)
if err != nil {
return nil, err
}
if !exists {
return nil, errors.NewNotFound(api.Resource("limitrange"), name)
}
return obj.(*api.LimitRange), nil
}
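
Aside: a minimal usage sketch of the new lister, not part of this commit. It assumes an Indexer that an informer keeps populated; dumpLimitRanges and the "default"/"limits" names are placeholders.

import (
	"fmt"

	"k8s.io/kubernetes/pkg/client/cache"
	"k8s.io/kubernetes/pkg/labels"
)

// dumpLimitRanges reads limit ranges through the new StoreToLimitRangeLister.
func dumpLimitRanges(indexer cache.Indexer) error {
	lister := &cache.StoreToLimitRangeLister{Indexer: indexer}
	// Cluster-wide list under the empty selector.
	all, err := lister.List(labels.Everything())
	if err != nil {
		return err
	}
	for _, lr := range all {
		fmt.Printf("%s/%s\n", lr.Namespace, lr.Name)
	}
	// Namespace-scoped view plus a Get that surfaces NotFound errors.
	_, err = lister.LimitRanges("default").Get("limits")
	return err
}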


@@ -205,6 +205,42 @@ func (f *pvInformer) Lister() *cache.StoreToPVFetcher {
return &cache.StoreToPVFetcher{Store: informer.GetStore()}
}
//*****************************************************************************
// LimitRangeInformer is a type of SharedIndexInformer that watches and lists all limit ranges.
// The interface provides a constructor for the informer and a lister for limit ranges.
type LimitRangeInformer interface {
Informer() cache.SharedIndexInformer
Lister() *cache.StoreToLimitRangeLister
}
type limitRangeInformer struct {
*sharedInformerFactory
}
// Informer checks whether a limit-range informer already exists in the sharedInformerFactory and if not, it creates a new informer of type
// limitRangeInformer and connects it to the sharedInformerFactory
func (f *limitRangeInformer) Informer() cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
informerType := reflect.TypeOf(&api.LimitRange{})
informer, exists := f.informers[informerType]
if exists {
return informer
}
informer = NewLimitRangeInformer(f.client, f.defaultResync)
f.informers[informerType] = informer
return informer
}
// Lister returns a lister for the limitRangeInformer
func (f *limitRangeInformer) Lister() *cache.StoreToLimitRangeLister {
informer := f.Informer()
return &cache.StoreToLimitRangeLister{Indexer: informer.GetIndexer()}
}
// NewPodInformer returns a SharedIndexInformer that lists and watches all pods
func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
sharedIndexInformer := cache.NewSharedIndexInformer(
@@ -295,3 +331,21 @@ func NewNamespaceInformer(client clientset.Interface, resyncPeriod time.Duration
return sharedIndexInformer
}
// NewLimitRangeInformer returns a SharedIndexInformer that lists and watches all LimitRanges
func NewLimitRangeInformer(client clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
sharedIndexInformer := cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return client.Core().LimitRanges(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return client.Core().LimitRanges(api.NamespaceAll).Watch(options)
},
},
&api.LimitRange{},
resyncPeriod,
cache.Indexers{})
return sharedIndexInformer
}
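
Aside: a hedged sketch of driving NewLimitRangeInformer outside the factory; runLimitRangeInformer, the resync period, and the poll interval are assumptions, not part of the commit.

import (
	"time"

	"k8s.io/kubernetes/pkg/client/cache"
	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
	"k8s.io/kubernetes/pkg/controller/informers"
)

// runLimitRangeInformer starts the informer and waits for the initial LIST
// to land in the cache before handing back a lister over its indexer.
func runLimitRangeInformer(client clientset.Interface, stopCh <-chan struct{}) *cache.StoreToLimitRangeLister {
	informer := informers.NewLimitRangeInformer(client, 30*time.Second)
	go informer.Run(stopCh)
	for !informer.HasSynced() {
		time.Sleep(100 * time.Millisecond)
	}
	return &cache.StoreToLimitRangeLister{Indexer: informer.GetIndexer()}
}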


@@ -45,6 +45,8 @@ type SharedInformerFactory interface {
ClusterRoleBindings() ClusterRoleBindingInformer
Roles() RoleInformer
RoleBindings() RoleBindingInformer
LimitRanges() LimitRangeInformer
}
type sharedInformerFactory struct {
@@ -106,6 +108,7 @@ func (f *sharedInformerFactory) PersistentVolumes() PVInformer {
return &pvInformer{sharedInformerFactory: f}
}
// DaemonSets returns a DaemonSetInformer that lists and watches all daemon sets.
func (f *sharedInformerFactory) DaemonSets() DaemonSetInformer {
return &daemonSetInformer{sharedInformerFactory: f}
}
@@ -133,3 +136,8 @@ func (f *sharedInformerFactory) Roles() RoleInformer {
func (f *sharedInformerFactory) RoleBindings() RoleBindingInformer {
return &roleBindingInformer{sharedInformerFactory: f}
}
// LimitRanges returns a LimitRangeInformer that lists and watches all limit ranges.
func (f *sharedInformerFactory) LimitRanges() LimitRangeInformer {
return &limitRangeInformer{sharedInformerFactory: f}
}
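
Aside: the same informer reached through the shared factory; a sketch assuming the caller owns the stop channel (startLimitRangeLister is a made-up name, imports as in the previous sketch).

// startLimitRangeLister lazily registers the limit-range informer via
// Lister(), then starts every informer the factory knows about.
func startLimitRangeLister(client clientset.Interface, stopCh <-chan struct{}) *cache.StoreToLimitRangeLister {
	f := informers.NewSharedInformerFactory(client, 5*time.Minute)
	lister := f.LimitRanges().Lister() // Informer() runs under f's lock and is created only once
	f.Start(stopCh)
	return lister
}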


@@ -26,15 +26,16 @@ import (
lru "github.com/hashicorp/golang-lru"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/controller/informers"
"k8s.io/kubernetes/pkg/admission"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
utilerrors "k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/watch"
)
const (
@@ -52,7 +53,7 @@ type limitRanger struct {
*admission.Handler
client clientset.Interface
actions LimitRangerActions
indexer cache.Indexer
lister *cache.StoreToLimitRangeLister
// liveLookups holds the last few live lookups we've done to help amortize cost on repeated lookup failures.
// This lets us handle the case of latent caches by looking up actual results for a namespace on cache miss/no results.
@@ -66,6 +67,19 @@ type liveLookupEntry struct {
items []*api.LimitRange
}
func (l *limitRanger) SetInformerFactory(f informers.SharedInformerFactory) {
limitRangeInformer := f.LimitRanges().Informer()
l.SetReadyFunc(limitRangeInformer.HasSynced)
l.lister = f.LimitRanges().Lister()
}
func (l *limitRanger) Validate() error {
if l.lister == nil {
return fmt.Errorf("missing limitRange lister")
}
return nil
}
// Admit admits resources into the cluster that do not violate any defined LimitRange in the namespace
func (l *limitRanger) Admit(a admission.Attributes) (err error) {
if !l.actions.SupportsAttributes(a) {
@@ -81,13 +95,7 @@ func (l *limitRanger) Admit(a admission.Attributes) (err error) {
}
}
key := &api.LimitRange{
ObjectMeta: api.ObjectMeta{
Namespace: a.GetNamespace(),
Name: "",
},
}
items, err := l.indexer.Index("namespace", key)
items, err := l.lister.LimitRanges(a.GetNamespace()).List(labels.Everything())
if err != nil {
return admission.NewForbidden(a, fmt.Errorf("unable to %s %v at this time because there was an error enforcing limit ranges", a.GetOperation(), a.GetResource()))
}
@@ -122,7 +130,7 @@ func (l *limitRanger) Admit(a admission.Attributes) (err error) {
// ensure it meets each prescribed min/max
for i := range items {
limitRange := items[i].(*api.LimitRange)
limitRange := items[i]
if !l.actions.SupportsLimit(limitRange) {
continue
@@ -143,17 +151,6 @@ func NewLimitRanger(client clientset.Interface, actions LimitRangerActions) (adm
return nil, err
}
lw := &cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return client.Core().LimitRanges(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return client.Core().LimitRanges(api.NamespaceAll).Watch(options)
},
}
indexer, reflector := cache.NewNamespaceKeyedIndexerAndReflector(lw, &api.LimitRange{}, 0)
reflector.Run()
if actions == nil {
actions = &DefaultLimitRangerActions{}
}
@@ -162,7 +159,6 @@
Handler: admission.NewHandler(admission.Create, admission.Update),
client: client,
actions: actions,
indexer: indexer,
liveLookupCache: liveLookupCache,
liveTTL: time.Duration(30 * time.Second),
}, nil
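
Aside: how the two new methods are exercised end to end; a sketch that mirrors the test helper further below, with newReadyLimitRanger as a hypothetical name (imports as in the sketches above, plus "k8s.io/kubernetes/pkg/admission").

// newReadyLimitRanger builds the plugin, lets the plugin initializer inject
// the informer factory (SetInformerFactory), validates that the lister was
// set, and starts the factory so the SetReadyFunc(HasSynced) gate can open.
func newReadyLimitRanger(client clientset.Interface, stopCh <-chan struct{}) (admission.Interface, error) {
	handler, err := NewLimitRanger(client, &DefaultLimitRangerActions{})
	if err != nil {
		return nil, err
	}
	f := informers.NewSharedInformerFactory(client, 5*time.Minute)
	plugins := []admission.Interface{handler}
	admission.NewPluginInitializer(f, nil).Initialize(plugins)
	if err := admission.Validate(plugins); err != nil {
		return nil, err
	}
	f.Start(stopCh)
	return handler, nil
}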


@@ -17,6 +17,7 @@ limitations under the License.
package limitranger
import (
"fmt"
"strconv"
"testing"
"time"
@@ -24,10 +25,13 @@
"k8s.io/kubernetes/pkg/admission"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/api/unversioned"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"github.com/hashicorp/golang-lru"
"k8s.io/kubernetes/pkg/client/testing/core"
"k8s.io/kubernetes/pkg/controller/informers"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/wait"
)
func getComputeResourceList(cpu, memory string) api.ResourceList {
@@ -522,20 +526,16 @@ func TestPodLimitFuncApplyDefault(t *testing.T) {
}
func TestLimitRangerIgnoresSubresource(t *testing.T) {
client := fake.NewSimpleClientset()
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
handler := &limitRanger{
Handler: admission.NewHandler(admission.Create, admission.Update),
client: client,
actions: &DefaultLimitRangerActions{},
indexer: indexer,
}
limitRange := validLimitRangeNoDefaults()
testPod := validPod("testPod", 1, api.ResourceRequirements{})
mockClient := newMockClientForTest([]api.LimitRange{limitRange})
handler, informerFactory, err := newHandlerForTest(mockClient)
if err != nil {
t.Errorf("unexpected error initializing handler: %v", err)
}
informerFactory.Start(wait.NeverStop)
indexer.Add(&limitRange)
err := handler.Admit(admission.NewAttributesRecord(&testPod, nil, api.Kind("Pod").WithVersion("version"), limitRange.Namespace, "testPod", api.Resource("pods").WithVersion("version"), "", admission.Update, nil))
testPod := validPod("testPod", 1, api.ResourceRequirements{})
err = handler.Admit(admission.NewAttributesRecord(&testPod, nil, api.Kind("Pod").WithVersion("version"), limitRange.Namespace, "testPod", api.Resource("pods").WithVersion("version"), "", admission.Update, nil))
if err == nil {
t.Errorf("Expected an error since the pod did not specify resource limits in its update call")
}
@@ -547,28 +547,16 @@ func TestLimitRangerIgnoresSubresource(t *testing.T) {
}
func TestLimitRangerCacheMisses(t *testing.T) {
liveLookupCache, err := lru.New(10000)
if err != nil {
t.Fatal(err)
}
client := fake.NewSimpleClientset()
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
handler := &limitRanger{
Handler: admission.NewHandler(admission.Create, admission.Update),
client: client,
actions: &DefaultLimitRangerActions{},
indexer: indexer,
liveLookupCache: liveLookupCache,
}
func TestLimitRangerAdmitPod(t *testing.T) {
limitRange := validLimitRangeNoDefaults()
mockClient := newMockClientForTest([]api.LimitRange{limitRange})
handler, informerFactory, err := newHandlerForTest(mockClient)
if err != nil {
t.Errorf("unexpected error initializing handler: %v", err)
}
informerFactory.Start(wait.NeverStop)
testPod := validPod("testPod", 1, api.ResourceRequirements{})
// add to the lru cache
liveLookupCache.Add(limitRange.Namespace, liveLookupEntry{expiry: time.Now().Add(time.Duration(30 * time.Second)), items: []*api.LimitRange{&limitRange}})
err = handler.Admit(admission.NewAttributesRecord(&testPod, nil, api.Kind("Pod").WithVersion("version"), limitRange.Namespace, "testPod", api.Resource("pods").WithVersion("version"), "", admission.Update, nil))
if err == nil {
t.Errorf("Expected an error since the pod did not specify resource limits in its update call")
@@ -580,67 +568,36 @@ func TestLimitRangerCacheMisses(t *testing.T) {
}
}
func TestLimitRangerCacheAndLRUMisses(t *testing.T) {
liveLookupCache, err := lru.New(10000)
if err != nil {
t.Fatal(err)
}
limitRange := validLimitRangeNoDefaults()
client := fake.NewSimpleClientset(&limitRange)
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
handler := &limitRanger{
Handler: admission.NewHandler(admission.Create, admission.Update),
client: client,
actions: &DefaultLimitRangerActions{},
indexer: indexer,
liveLookupCache: liveLookupCache,
}
testPod := validPod("testPod", 1, api.ResourceRequirements{})
err = handler.Admit(admission.NewAttributesRecord(&testPod, nil, api.Kind("Pod").WithVersion("version"), limitRange.Namespace, "testPod", api.Resource("pods").WithVersion("version"), "", admission.Update, nil))
if err == nil {
t.Errorf("Expected an error since the pod did not specify resource limits in its update call")
}
err = handler.Admit(admission.NewAttributesRecord(&testPod, nil, api.Kind("Pod").WithVersion("version"), limitRange.Namespace, "testPod", api.Resource("pods").WithVersion("version"), "status", admission.Update, nil))
if err != nil {
t.Errorf("Should have ignored calls to any subresource of pod %v", err)
}
// newMockClientForTest creates a mock client configured for the specified list of limit ranges
func newMockClientForTest(limitRanges []api.LimitRange) *fake.Clientset {
mockClient := &fake.Clientset{}
mockClient.AddReactor("list", "limitranges", func(action core.Action) (bool, runtime.Object, error) {
limitRangeList := &api.LimitRangeList{
ListMeta: unversioned.ListMeta{
ResourceVersion: fmt.Sprintf("%d", len(limitRanges)),
},
}
for index, value := range limitRanges {
value.ResourceVersion = fmt.Sprintf("%d", index)
limitRangeList.Items = append(limitRangeList.Items, value)
}
return true, limitRangeList, nil
})
return mockClient
}
func TestLimitRangerCacheAndLRUExpiredMisses(t *testing.T) {
liveLookupCache, err := lru.New(10000)
// newHandlerForTest returns a handler configured for testing.
func newHandlerForTest(c clientset.Interface) (admission.Interface, informers.SharedInformerFactory, error) {
f := informers.NewSharedInformerFactory(c, 5*time.Minute)
handler, err := NewLimitRanger(c, &DefaultLimitRangerActions{})
if err != nil {
t.Fatal(err)
}
limitRange := validLimitRangeNoDefaults()
client := fake.NewSimpleClientset(&limitRange)
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
handler := &limitRanger{
Handler: admission.NewHandler(admission.Create, admission.Update),
client: client,
actions: &DefaultLimitRangerActions{},
indexer: indexer,
liveLookupCache: liveLookupCache,
}
testPod := validPod("testPod", 1, api.ResourceRequirements{})
// add to the lru cache
liveLookupCache.Add(limitRange.Namespace, liveLookupEntry{expiry: time.Now().Add(time.Duration(-30 * time.Second)), items: []*api.LimitRange{}})
err = handler.Admit(admission.NewAttributesRecord(&testPod, nil, api.Kind("Pod").WithVersion("version"), limitRange.Namespace, "testPod", api.Resource("pods").WithVersion("version"), "", admission.Update, nil))
if err == nil {
t.Errorf("Expected an error since the pod did not specify resource limits in its update call")
}
err = handler.Admit(admission.NewAttributesRecord(&testPod, nil, api.Kind("Pod").WithVersion("version"), limitRange.Namespace, "testPod", api.Resource("pods").WithVersion("version"), "status", admission.Update, nil))
if err != nil {
t.Errorf("Should have ignored calls to any subresource of pod %v", err)
return nil, f, err
}
plugins := []admission.Interface{handler}
pluginInitializer := admission.NewPluginInitializer(f, nil)
pluginInitializer.Initialize(plugins)
err = admission.Validate(plugins)
return handler, f, err
}
func validPersistentVolumeClaim(name string, resources api.ResourceRequirements) api.PersistentVolumeClaim {