diff --git a/pkg/client/cache/listers_core.go b/pkg/client/cache/listers_core.go index 5a137773b85..df8f4281dd6 100644 --- a/pkg/client/cache/listers_core.go +++ b/pkg/client/cache/listers_core.go @@ -203,3 +203,42 @@ func (s *StoreToReplicationControllerLister) GetPodControllers(pod *api.Pod) (co } return } + +// StoreToLimitRangeLister helps list limit ranges +type StoreToLimitRangeLister struct { + Indexer Indexer +} + +func (s *StoreToLimitRangeLister) List(selector labels.Selector) (ret []*api.LimitRange, err error) { + err = ListAll(s.Indexer, selector, func(m interface{}) { + ret = append(ret, m.(*api.LimitRange)) + }) + return ret, err +} + +func (s *StoreToLimitRangeLister) LimitRanges(namespace string) storeLimitRangesNamespacer { + return storeLimitRangesNamespacer{s.Indexer, namespace} +} + +type storeLimitRangesNamespacer struct { + indexer Indexer + namespace string +} + +func (s storeLimitRangesNamespacer) List(selector labels.Selector) (ret []*api.LimitRange, err error) { + err = ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) { + ret = append(ret, m.(*api.LimitRange)) + }) + return ret, err +} + +func (s storeLimitRangesNamespacer) Get(name string) (*api.LimitRange, error) { + obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(api.Resource("limitrange"), name) + } + return obj.(*api.LimitRange), nil +} diff --git a/pkg/controller/informers/core.go b/pkg/controller/informers/core.go index 3931b3d8a3e..2f5687fc2fc 100644 --- a/pkg/controller/informers/core.go +++ b/pkg/controller/informers/core.go @@ -205,6 +205,42 @@ func (f *pvInformer) Lister() *cache.StoreToPVFetcher { return &cache.StoreToPVFetcher{Store: informer.GetStore()} } +//***************************************************************************** + +// LimitRangeInformer is type of SharedIndexInformer which watches and lists all limit ranges. +// Interface provides constructor for informer and lister for limit ranges. +type LimitRangeInformer interface { + Informer() cache.SharedIndexInformer + Lister() *cache.StoreToLimitRangeLister +} + +type limitRangeInformer struct { + *sharedInformerFactory +} + +// Informer checks whether pvcInformer exists in sharedInformerFactory and if not, it creates new informer of type +// limitRangeInformer and connects it to sharedInformerFactory +func (f *limitRangeInformer) Informer() cache.SharedIndexInformer { + f.lock.Lock() + defer f.lock.Unlock() + + informerType := reflect.TypeOf(&api.LimitRange{}) + informer, exists := f.informers[informerType] + if exists { + return informer + } + informer = NewLimitRangeInformer(f.client, f.defaultResync) + f.informers[informerType] = informer + + return informer +} + +// Lister returns lister for limitRangeInformer +func (f *limitRangeInformer) Lister() *cache.StoreToLimitRangeLister { + informer := f.Informer() + return &cache.StoreToLimitRangeLister{Indexer: informer.GetIndexer()} +} + // NewPodInformer returns a SharedIndexInformer that lists and watches all pods func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { sharedIndexInformer := cache.NewSharedIndexInformer( @@ -295,3 +331,21 @@ func NewNamespaceInformer(client clientset.Interface, resyncPeriod time.Duration return sharedIndexInformer } + +// NewLimitRangeInformer returns a SharedIndexInformer that lists and watches all LimitRanges +func NewLimitRangeInformer(client clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + sharedIndexInformer := cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return client.Core().LimitRanges(api.NamespaceAll).List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return client.Core().LimitRanges(api.NamespaceAll).Watch(options) + }, + }, + &api.LimitRange{}, + resyncPeriod, + cache.Indexers{}) + + return sharedIndexInformer +} diff --git a/pkg/controller/informers/factory.go b/pkg/controller/informers/factory.go index a9f50aa4522..b8401ec36ee 100644 --- a/pkg/controller/informers/factory.go +++ b/pkg/controller/informers/factory.go @@ -45,6 +45,8 @@ type SharedInformerFactory interface { ClusterRoleBindings() ClusterRoleBindingInformer Roles() RoleInformer RoleBindings() RoleBindingInformer + + LimitRanges() LimitRangeInformer } type sharedInformerFactory struct { @@ -106,6 +108,7 @@ func (f *sharedInformerFactory) PersistentVolumes() PVInformer { return &pvInformer{sharedInformerFactory: f} } +// DaemonSets returns a SharedIndexInformer that lists and watches all daemon sets. func (f *sharedInformerFactory) DaemonSets() DaemonSetInformer { return &daemonSetInformer{sharedInformerFactory: f} } @@ -133,3 +136,8 @@ func (f *sharedInformerFactory) Roles() RoleInformer { func (f *sharedInformerFactory) RoleBindings() RoleBindingInformer { return &roleBindingInformer{sharedInformerFactory: f} } + +// LimitRanges returns a SharedIndexInformer that lists and watches all limit ranges. +func (f *sharedInformerFactory) LimitRanges() LimitRangeInformer { + return &limitRangeInformer{sharedInformerFactory: f} +} diff --git a/plugin/pkg/admission/limitranger/admission.go b/plugin/pkg/admission/limitranger/admission.go index e95abcb1382..ab7ffd20c54 100644 --- a/plugin/pkg/admission/limitranger/admission.go +++ b/plugin/pkg/admission/limitranger/admission.go @@ -26,15 +26,16 @@ import ( lru "github.com/hashicorp/golang-lru" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + "k8s.io/kubernetes/pkg/controller/informers" "k8s.io/kubernetes/pkg/admission" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/meta" "k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/pkg/client/cache" + "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" utilerrors "k8s.io/kubernetes/pkg/util/errors" - "k8s.io/kubernetes/pkg/watch" ) const ( @@ -52,7 +53,7 @@ type limitRanger struct { *admission.Handler client clientset.Interface actions LimitRangerActions - indexer cache.Indexer + lister *cache.StoreToLimitRangeLister // liveLookups holds the last few live lookups we've done to help ammortize cost on repeated lookup failures. // This let's us handle the case of latent caches, by looking up actual results for a namespace on cache miss/no results. @@ -66,6 +67,19 @@ type liveLookupEntry struct { items []*api.LimitRange } +func (l *limitRanger) SetInformerFactory(f informers.SharedInformerFactory) { + limitRangeInformer := f.LimitRanges().Informer() + l.SetReadyFunc(limitRangeInformer.HasSynced) + l.lister = f.LimitRanges().Lister() +} + +func (l *limitRanger) Validate() error { + if l.lister == nil { + return fmt.Errorf("missing limitRange lister") + } + return nil +} + // Admit admits resources into cluster that do not violate any defined LimitRange in the namespace func (l *limitRanger) Admit(a admission.Attributes) (err error) { if !l.actions.SupportsAttributes(a) { @@ -81,13 +95,7 @@ func (l *limitRanger) Admit(a admission.Attributes) (err error) { } } - key := &api.LimitRange{ - ObjectMeta: api.ObjectMeta{ - Namespace: a.GetNamespace(), - Name: "", - }, - } - items, err := l.indexer.Index("namespace", key) + items, err := l.lister.LimitRanges(a.GetNamespace()).List(labels.Everything()) if err != nil { return admission.NewForbidden(a, fmt.Errorf("unable to %s %v at this time because there was an error enforcing limit ranges", a.GetOperation(), a.GetResource())) } @@ -122,7 +130,7 @@ func (l *limitRanger) Admit(a admission.Attributes) (err error) { // ensure it meets each prescribed min/max for i := range items { - limitRange := items[i].(*api.LimitRange) + limitRange := items[i] if !l.actions.SupportsLimit(limitRange) { continue @@ -143,17 +151,6 @@ func NewLimitRanger(client clientset.Interface, actions LimitRangerActions) (adm return nil, err } - lw := &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - return client.Core().LimitRanges(api.NamespaceAll).List(options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return client.Core().LimitRanges(api.NamespaceAll).Watch(options) - }, - } - indexer, reflector := cache.NewNamespaceKeyedIndexerAndReflector(lw, &api.LimitRange{}, 0) - reflector.Run() - if actions == nil { actions = &DefaultLimitRangerActions{} } @@ -162,7 +159,6 @@ func NewLimitRanger(client clientset.Interface, actions LimitRangerActions) (adm Handler: admission.NewHandler(admission.Create, admission.Update), client: client, actions: actions, - indexer: indexer, liveLookupCache: liveLookupCache, liveTTL: time.Duration(30 * time.Second), }, nil diff --git a/plugin/pkg/admission/limitranger/admission_test.go b/plugin/pkg/admission/limitranger/admission_test.go index 355bdf3e1a4..693a80c1ba9 100644 --- a/plugin/pkg/admission/limitranger/admission_test.go +++ b/plugin/pkg/admission/limitranger/admission_test.go @@ -17,6 +17,7 @@ limitations under the License. package limitranger import ( + "fmt" "strconv" "testing" "time" @@ -24,10 +25,13 @@ import ( "k8s.io/kubernetes/pkg/admission" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/resource" - "k8s.io/kubernetes/pkg/client/cache" + "k8s.io/kubernetes/pkg/api/unversioned" + clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" - - "github.com/hashicorp/golang-lru" + "k8s.io/kubernetes/pkg/client/testing/core" + "k8s.io/kubernetes/pkg/controller/informers" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util/wait" ) func getComputeResourceList(cpu, memory string) api.ResourceList { @@ -522,20 +526,16 @@ func TestPodLimitFuncApplyDefault(t *testing.T) { } func TestLimitRangerIgnoresSubresource(t *testing.T) { - client := fake.NewSimpleClientset() - indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}) - handler := &limitRanger{ - Handler: admission.NewHandler(admission.Create, admission.Update), - client: client, - actions: &DefaultLimitRangerActions{}, - indexer: indexer, - } - limitRange := validLimitRangeNoDefaults() - testPod := validPod("testPod", 1, api.ResourceRequirements{}) + mockClient := newMockClientForTest([]api.LimitRange{limitRange}) + handler, informerFactory, err := newHandlerForTest(mockClient) + if err != nil { + t.Errorf("unexpected error initializing handler: %v", err) + } + informerFactory.Start(wait.NeverStop) - indexer.Add(&limitRange) - err := handler.Admit(admission.NewAttributesRecord(&testPod, nil, api.Kind("Pod").WithVersion("version"), limitRange.Namespace, "testPod", api.Resource("pods").WithVersion("version"), "", admission.Update, nil)) + testPod := validPod("testPod", 1, api.ResourceRequirements{}) + err = handler.Admit(admission.NewAttributesRecord(&testPod, nil, api.Kind("Pod").WithVersion("version"), limitRange.Namespace, "testPod", api.Resource("pods").WithVersion("version"), "", admission.Update, nil)) if err == nil { t.Errorf("Expected an error since the pod did not specify resource limits in its update call") } @@ -547,28 +547,16 @@ func TestLimitRangerIgnoresSubresource(t *testing.T) { } -func TestLimitRangerCacheMisses(t *testing.T) { - liveLookupCache, err := lru.New(10000) - if err != nil { - t.Fatal(err) - } - - client := fake.NewSimpleClientset() - indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}) - handler := &limitRanger{ - Handler: admission.NewHandler(admission.Create, admission.Update), - client: client, - actions: &DefaultLimitRangerActions{}, - indexer: indexer, - liveLookupCache: liveLookupCache, - } - +func TestLimitRangerAdmitPod(t *testing.T) { limitRange := validLimitRangeNoDefaults() + mockClient := newMockClientForTest([]api.LimitRange{limitRange}) + handler, informerFactory, err := newHandlerForTest(mockClient) + if err != nil { + t.Errorf("unexpected error initializing handler: %v", err) + } + informerFactory.Start(wait.NeverStop) + testPod := validPod("testPod", 1, api.ResourceRequirements{}) - - // add to the lru cache - liveLookupCache.Add(limitRange.Namespace, liveLookupEntry{expiry: time.Now().Add(time.Duration(30 * time.Second)), items: []*api.LimitRange{&limitRange}}) - err = handler.Admit(admission.NewAttributesRecord(&testPod, nil, api.Kind("Pod").WithVersion("version"), limitRange.Namespace, "testPod", api.Resource("pods").WithVersion("version"), "", admission.Update, nil)) if err == nil { t.Errorf("Expected an error since the pod did not specify resource limits in its update call") @@ -580,67 +568,36 @@ func TestLimitRangerCacheMisses(t *testing.T) { } } -func TestLimitRangerCacheAndLRUMisses(t *testing.T) { - liveLookupCache, err := lru.New(10000) - if err != nil { - t.Fatal(err) - } - - limitRange := validLimitRangeNoDefaults() - client := fake.NewSimpleClientset(&limitRange) - indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}) - handler := &limitRanger{ - Handler: admission.NewHandler(admission.Create, admission.Update), - client: client, - actions: &DefaultLimitRangerActions{}, - indexer: indexer, - liveLookupCache: liveLookupCache, - } - - testPod := validPod("testPod", 1, api.ResourceRequirements{}) - - err = handler.Admit(admission.NewAttributesRecord(&testPod, nil, api.Kind("Pod").WithVersion("version"), limitRange.Namespace, "testPod", api.Resource("pods").WithVersion("version"), "", admission.Update, nil)) - if err == nil { - t.Errorf("Expected an error since the pod did not specify resource limits in its update call") - } - - err = handler.Admit(admission.NewAttributesRecord(&testPod, nil, api.Kind("Pod").WithVersion("version"), limitRange.Namespace, "testPod", api.Resource("pods").WithVersion("version"), "status", admission.Update, nil)) - if err != nil { - t.Errorf("Should have ignored calls to any subresource of pod %v", err) - } +// newMockClientForTest creates a mock client that returns a client configured for the specified list of limit ranges +func newMockClientForTest(limitRanges []api.LimitRange) *fake.Clientset { + mockClient := &fake.Clientset{} + mockClient.AddReactor("list", "limitranges", func(action core.Action) (bool, runtime.Object, error) { + limitRangeList := &api.LimitRangeList{ + ListMeta: unversioned.ListMeta{ + ResourceVersion: fmt.Sprintf("%d", len(limitRanges)), + }, + } + for index, value := range limitRanges { + value.ResourceVersion = fmt.Sprintf("%d", index) + limitRangeList.Items = append(limitRangeList.Items, value) + } + return true, limitRangeList, nil + }) + return mockClient } -func TestLimitRangerCacheAndLRUExpiredMisses(t *testing.T) { - liveLookupCache, err := lru.New(10000) +// newHandlerForTest returns a handler configured for testing. +func newHandlerForTest(c clientset.Interface) (admission.Interface, informers.SharedInformerFactory, error) { + f := informers.NewSharedInformerFactory(c, 5*time.Minute) + handler, err := NewLimitRanger(c, &DefaultLimitRangerActions{}) if err != nil { - t.Fatal(err) - } - - limitRange := validLimitRangeNoDefaults() - client := fake.NewSimpleClientset(&limitRange) - indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}) - handler := &limitRanger{ - Handler: admission.NewHandler(admission.Create, admission.Update), - client: client, - actions: &DefaultLimitRangerActions{}, - indexer: indexer, - liveLookupCache: liveLookupCache, - } - - testPod := validPod("testPod", 1, api.ResourceRequirements{}) - - // add to the lru cache - liveLookupCache.Add(limitRange.Namespace, liveLookupEntry{expiry: time.Now().Add(time.Duration(-30 * time.Second)), items: []*api.LimitRange{}}) - - err = handler.Admit(admission.NewAttributesRecord(&testPod, nil, api.Kind("Pod").WithVersion("version"), limitRange.Namespace, "testPod", api.Resource("pods").WithVersion("version"), "", admission.Update, nil)) - if err == nil { - t.Errorf("Expected an error since the pod did not specify resource limits in its update call") - } - - err = handler.Admit(admission.NewAttributesRecord(&testPod, nil, api.Kind("Pod").WithVersion("version"), limitRange.Namespace, "testPod", api.Resource("pods").WithVersion("version"), "status", admission.Update, nil)) - if err != nil { - t.Errorf("Should have ignored calls to any subresource of pod %v", err) + return nil, f, err } + plugins := []admission.Interface{handler} + pluginInitializer := admission.NewPluginInitializer(f, nil) + pluginInitializer.Initialize(plugins) + err = admission.Validate(plugins) + return handler, f, err } func validPersistentVolumeClaim(name string, resources api.ResourceRequirements) api.PersistentVolumeClaim {