From 037d116add1f4a59db2767d21aa6189a4c9cabb0 Mon Sep 17 00:00:00 2001 From: Dominika Hodovska Date: Thu, 30 Jun 2016 10:50:41 +0200 Subject: [PATCH] Factory for SharedIndexInformers --- cmd/kube-apiserver/app/server.go | 12 +- .../cmd/federation-apiserver/app/server.go | 11 +- pkg/admission/init.go | 28 +- pkg/admission/types.go | 13 +- pkg/client/cache/listers.go | 17 ++ pkg/controller/framework/informers/core.go | 273 ++++++++++++++++++ pkg/controller/framework/informers/factory.go | 66 +++++ .../namespace/autoprovision/admission.go | 18 +- .../namespace/autoprovision/admission_test.go | 29 +- .../admission/namespace/exists/admission.go | 16 +- .../namespace/lifecycle/admission.go | 39 ++- .../namespace/lifecycle/admission_test.go | 12 +- 12 files changed, 449 insertions(+), 85 deletions(-) create mode 100644 pkg/controller/framework/informers/core.go diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index 27683ea241c..1a2632e3a7a 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -58,6 +58,7 @@ import ( "k8s.io/kubernetes/pkg/registry/rolebinding" rolebindingetcd "k8s.io/kubernetes/pkg/registry/rolebinding/etcd" "k8s.io/kubernetes/pkg/serviceaccount" + "k8s.io/kubernetes/pkg/util/wait" ) // NewAPIServerCommand creates a *cobra.Command object with default parameters @@ -245,14 +246,12 @@ func Run(s *options.APIServer) error { if err != nil { glog.Errorf("Failed to create clientset: %v", err) } + sharedInformers := informers.NewSharedInformerFactory(client, 10*time.Minute) + pluginInitializer := admission.NewPluginInitializer(sharedInformers) - namespaceInformer := informers.CreateSharedNamespaceIndexInformer(client, 5*time.Minute) - pluginInit := admission.NewPluginInitializer() - pluginInit.SetNamespaceInformer(namespaceInformer) - - admissionController, err := admission.NewFromPlugins(client, admissionControlPluginNames, s.AdmissionControlConfigFile, pluginInit) + admissionController, err := admission.NewFromPlugins(client, admissionControlPluginNames, s.AdmissionControlConfigFile, pluginInitializer) if err != nil { - glog.Errorf("Failed to initialize plugins: %v", err) + glog.Fatalf("Failed to initialize plugins: %v", err) } genericConfig := genericapiserver.NewConfig(s.ServerRunOptions) @@ -288,6 +287,7 @@ func Run(s *options.APIServer) error { return err } + sharedInformers.Start(wait.NeverStop) m.Run(s.ServerRunOptions) return nil } diff --git a/federation/cmd/federation-apiserver/app/server.go b/federation/cmd/federation-apiserver/app/server.go index ce457d641b5..605e106555f 100644 --- a/federation/cmd/federation-apiserver/app/server.go +++ b/federation/cmd/federation-apiserver/app/server.go @@ -37,6 +37,7 @@ import ( genericoptions "k8s.io/kubernetes/pkg/genericapiserver/options" "k8s.io/kubernetes/pkg/registry/cachesize" "k8s.io/kubernetes/pkg/registry/generic" + "k8s.io/kubernetes/pkg/util/wait" ) // NewAPIServerCommand creates a *cobra.Command object with default parameters @@ -121,13 +122,12 @@ func Run(s *genericoptions.ServerRunOptions) error { if err != nil { glog.Errorf("Failed to create clientset: %v", err) } - namespaceInformer := informers.CreateSharedNamespaceIndexInformer(client, 5*time.Minute) - pluginInit := admission.NewPluginInitializer() - pluginInit.SetNamespaceInformer(namespaceInformer) + sharedInformers := informers.NewSharedInformerFactory(client, 10*time.Minute) + pluginInitializer := admission.NewPluginInitializer(sharedInformers) - admissionController, err := admission.NewFromPlugins(client, admissionControlPluginNames, s.AdmissionControlConfigFile, pluginInit) + admissionController, err := admission.NewFromPlugins(client, admissionControlPluginNames, s.AdmissionControlConfigFile, pluginInitializer) if err != nil { - glog.Errorf("Failed to initialize plugins: %v", err) + glog.Fatalf("Failed to initialize plugins: %v", err) } genericConfig := genericapiserver.NewConfig(s) // TODO: Move the following to generic api server as well. @@ -154,6 +154,7 @@ func Run(s *genericoptions.ServerRunOptions) error { installCoreAPIs(s, m, storageFactory) installExtensionsAPIs(s, m, storageFactory) + sharedInformers.Start(wait.NeverStop) m.Run(s) return nil } diff --git a/pkg/admission/init.go b/pkg/admission/init.go index 254dcc9672a..c4433ffce50 100644 --- a/pkg/admission/init.go +++ b/pkg/admission/init.go @@ -1,5 +1,5 @@ /* -Copyright 2016 The Kubernetes Authors All rights reserved. +Copyright 2016 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -17,41 +17,33 @@ limitations under the License. package admission import ( - "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/controller/framework" - "reflect" + "k8s.io/kubernetes/pkg/controller/framework/informers" ) -// PluginInitializer is used for Initialization of shareable resources between admission plugins -// After Initialization the resources have to be set separately +// PluginInitializer is used for initialization of shareable resources between admission plugins. +// After initialization the resources have to be set separately type PluginInitializer interface { Initialize(plugins []Interface) - SetNamespaceInformer(namespaceInformer framework.SharedIndexInformer) } type pluginInitializer struct { - informers map[reflect.Type]framework.SharedIndexInformer + informers informers.SharedInformerFactory } // NewPluginInitializer constructs new instance of PluginInitializer -func NewPluginInitializer() PluginInitializer { +func NewPluginInitializer(sharedInformers informers.SharedInformerFactory) PluginInitializer { plugInit := &pluginInitializer{ - informers: make(map[reflect.Type]framework.SharedIndexInformer), + informers: sharedInformers, } return plugInit } -// SetNamespaceInformer sets unique namespaceInformer for instance of PluginInitializer -func (i *pluginInitializer) SetNamespaceInformer(namespaceInformer framework.SharedIndexInformer) { - i.informers[reflect.TypeOf(&api.Namespace{})] = namespaceInformer -} - -// Initialize will check the initialization interfaces implemented by each plugin +// Initialize checks the initialization interfaces implemented by each plugin // and provide the appropriate initialization data func (i *pluginInitializer) Initialize(plugins []Interface) { for _, plugin := range plugins { - if wantsNamespaceInformer, ok := plugin.(WantsNamespaceInformer); ok { - wantsNamespaceInformer.SetNamespaceInformer(i.informers[reflect.TypeOf(&api.Namespace{})]) + if wantsInformerFactory, ok := plugin.(WantsInformerFactory); ok { + wantsInformerFactory.SetInformerFactory(i.informers) } } } diff --git a/pkg/admission/types.go b/pkg/admission/types.go index a3739e1915f..4de01016eb1 100644 --- a/pkg/admission/types.go +++ b/pkg/admission/types.go @@ -1,5 +1,5 @@ /* -Copyright 2016 The Kubernetes Authors All rights reserved. +Copyright 2016 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -16,7 +16,9 @@ limitations under the License. package admission -import "k8s.io/kubernetes/pkg/controller/framework" +import ( + "k8s.io/kubernetes/pkg/controller/framework/informers" +) // Validator holds Validate functions, which are responsible for validation of initialized shared resources // and should be implemented on admission plugins @@ -24,7 +26,8 @@ type Validator interface { Validate() error } -// WantsNamespaceInformer defines a function witch sets NamespaceInformer for admission plugins that need it -type WantsNamespaceInformer interface { - SetNamespaceInformer(framework.SharedIndexInformer) +// WantsNamespaceInformer defines a function which sets NamespaceInformer for admission plugins that need it +type WantsInformerFactory interface { + SetInformerFactory(informers.SharedInformerFactory) + Validator } diff --git a/pkg/client/cache/listers.go b/pkg/client/cache/listers.go index 09572f3fca0..4b96cf43cbd 100644 --- a/pkg/client/cache/listers.go +++ b/pkg/client/cache/listers.go @@ -693,3 +693,20 @@ func (s *StoreToCertificateRequestLister) List() (csrs certificates.CertificateS } return csrs, nil } + +// IndexerToNamespaceLister gives an Indexer List method +type IndexerToNamespaceLister struct { + Indexer +} + +// List returns a list of namespaces +func (i *IndexerToNamespaceLister) List(selector labels.Selector) (namespaces []*api.Namespace, err error) { + for _, m := range i.Indexer.List() { + namespace := m.(*api.Namespace) + if selector.Matches(labels.Set(namespace.Labels)) { + namespaces = append(namespaces, namespace) + } + } + + return namespaces, nil +} diff --git a/pkg/controller/framework/informers/core.go b/pkg/controller/framework/informers/core.go new file mode 100644 index 00000000000..744aaabad4c --- /dev/null +++ b/pkg/controller/framework/informers/core.go @@ -0,0 +1,273 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package informers + +import ( + "reflect" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client/cache" + "k8s.io/kubernetes/pkg/controller/framework" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/watch" +) + +// PodInformer is type of SharedIndexInformer which watches and lists all pods. +// Interface provides constructor for informer and lister for pods +type PodInformer interface { + Informer() framework.SharedIndexInformer + Lister() *cache.StoreToPodLister +} + +type podInformer struct { + *sharedInformerFactory +} + +// Informer checks whether podInformer exists in sharedInformerFactory and if not, it creates new informer of type +// podInformer and connects it to sharedInformerFactory +func (f *podInformer) Informer() framework.SharedIndexInformer { + f.lock.Lock() + defer f.lock.Unlock() + + informerObj := &api.Pod{} + informerType := reflect.TypeOf(informerObj) + informer, exists := f.informers[informerType] + if exists { + return informer + } + + informer = framework.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return f.client.Core().Pods(api.NamespaceAll).List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return f.client.Core().Pods(api.NamespaceAll).Watch(options) + }, + }, + informerObj, + f.defaultResync, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + ) + f.informers[informerType] = informer + + return informer +} + +// Lister returns lister for podInformer +func (f *podInformer) Lister() *cache.StoreToPodLister { + informer := f.Informer() + return &cache.StoreToPodLister{Indexer: informer.GetIndexer()} +} + +//***************************************************************************** + +// NamespaceInformer is type of SharedIndexInformer which watches and lists all namespaces. +// Interface provides constructor for informer and lister for namsespaces +type NamespaceInformer interface { + Informer() framework.SharedIndexInformer + Lister() *cache.IndexerToNamespaceLister +} + +type namespaceInformer struct { + *sharedInformerFactory +} + +// Informer checks whether namespaceInformer exists in sharedInformerFactory and if not, it creates new informer of type +// namespaceInformer and connects it to sharedInformerFactory +func (f *namespaceInformer) Informer() framework.SharedIndexInformer { + f.lock.Lock() + defer f.lock.Unlock() + informerObj := &api.Namespace{} + informerType := reflect.TypeOf(informerObj) + informer, exists := f.informers[informerType] + if exists { + return informer + } + informer = framework.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return f.client.Core().Namespaces().List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return f.client.Core().Namespaces().Watch(options) + }, + }, + informerObj, + f.defaultResync, + cache.Indexers{}, + ) + f.informers[informerType] = informer + + return informer +} + +// Lister returns lister for namespaceInformer +func (f *namespaceInformer) Lister() *cache.IndexerToNamespaceLister { + informer := f.Informer() + return &cache.IndexerToNamespaceLister{Indexer: informer.GetIndexer()} +} + +//***************************************************************************** + +// NodeInformer is type of SharedIndexInformer which watches and lists all nodes. +// Interface provides constructor for informer and lister for nodes +type NodeInformer interface { + Informer() framework.SharedIndexInformer + Lister() *cache.StoreToNodeLister +} + +type nodeInformer struct { + *sharedInformerFactory +} + +// Informer checks whether nodeInformer exists in sharedInformerFactory and if not, it creates new informer of type +// nodeInformer and connects it to sharedInformerFactory +func (f *nodeInformer) Informer() framework.SharedIndexInformer { + f.lock.Lock() + defer f.lock.Unlock() + + informerObj := &api.Node{} + informerType := reflect.TypeOf(informerObj) + informer, exists := f.informers[informerType] + if exists { + return informer + } + + informer = framework.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return f.client.Core().Nodes().List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return f.client.Core().Nodes().Watch(options) + }, + }, + informerObj, + f.defaultResync, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + ) + f.informers[informerType] = informer + + return informer +} + +// Lister returns lister for nodeInformer +func (f *nodeInformer) Lister() *cache.StoreToNodeLister { + informer := f.Informer() + return &cache.StoreToNodeLister{Store: informer.GetStore()} +} + +//***************************************************************************** + +// PVCInformer is type of SharedIndexInformer which watches and lists all persistent volume claims. +// Interface provides constructor for informer and lister for persistent volume claims +type PVCInformer interface { + Informer() framework.SharedIndexInformer + Lister() *cache.StoreToPVCFetcher +} + +type pvcInformer struct { + *sharedInformerFactory +} + +// Informer checks whether pvcInformer exists in sharedInformerFactory and if not, it creates new informer of type +// pvcInformer and connects it to sharedInformerFactory +func (f *pvcInformer) Informer() framework.SharedIndexInformer { + f.lock.Lock() + defer f.lock.Unlock() + + informerObj := &api.PersistentVolumeClaim{} + informerType := reflect.TypeOf(informerObj) + informer, exists := f.informers[informerType] + if exists { + return informer + } + + informer = framework.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return f.client.Core().PersistentVolumeClaims(api.NamespaceAll).List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return f.client.Core().PersistentVolumeClaims(api.NamespaceAll).Watch(options) + }, + }, + informerObj, + f.defaultResync, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + ) + f.informers[informerType] = informer + + return informer +} + +// Lister returns lister for pvcInformer +func (f *pvcInformer) Lister() *cache.StoreToPVCFetcher { + informer := f.Informer() + return &cache.StoreToPVCFetcher{Store: informer.GetStore()} +} + +//***************************************************************************** + +// PVInformer is type of SharedIndexInformer which watches and lists all persistent volumes. +// Interface provides constructor for informer and lister for persistent volumes +type PVInformer interface { + Informer() framework.SharedIndexInformer + Lister() *cache.StoreToPVFetcher +} + +type pvInformer struct { + *sharedInformerFactory +} + +// Informer checks whether pvInformer exists in sharedInformerFactory and if not, it creates new informer of type +// pvInformer and connects it to sharedInformerFactory +func (f *pvInformer) Informer() framework.SharedIndexInformer { + f.lock.Lock() + defer f.lock.Unlock() + + informerObj := &api.PersistentVolume{} + informerType := reflect.TypeOf(informerObj) + informer, exists := f.informers[informerType] + if exists { + return informer + } + + informer = framework.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return f.client.Core().PersistentVolumes().List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return f.client.Core().PersistentVolumes().Watch(options) + }, + }, + informerObj, + f.defaultResync, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + ) + f.informers[informerType] = informer + + return informer +} + +// Lister returns lister for pvInformer +func (f *pvInformer) Lister() *cache.StoreToPVFetcher { + informer := f.Informer() + return &cache.StoreToPVFetcher{Store: informer.GetStore()} +} diff --git a/pkg/controller/framework/informers/factory.go b/pkg/controller/framework/informers/factory.go index bec2bdb91b7..850b42af00e 100644 --- a/pkg/controller/framework/informers/factory.go +++ b/pkg/controller/framework/informers/factory.go @@ -17,6 +17,8 @@ limitations under the License. package informers import ( + "reflect" + "sync" "time" "k8s.io/kubernetes/pkg/api" @@ -27,6 +29,70 @@ import ( "k8s.io/kubernetes/pkg/watch" ) +// SharedInformerFactory provides interface which holds unique informers for pods, nodes, namespaces, persistent volume +// claims and persistent volumes +type SharedInformerFactory interface { + // Start starts informers that can start AFTER the API server and controllers have started + Start(stopCh <-chan struct{}) + + Pods() PodInformer + Nodes() NodeInformer + Namespaces() NamespaceInformer + PersistentVolumeClaims() PVCInformer + PersistentVolumes() PVInformer +} + +type sharedInformerFactory struct { + client clientset.Interface + lock sync.Mutex + defaultResync time.Duration + informers map[reflect.Type]framework.SharedIndexInformer +} + +// NewSharedInformerFactory constructs a new instance of sharedInformerFactory +func NewSharedInformerFactory(client clientset.Interface, defaultResync time.Duration) SharedInformerFactory { + return &sharedInformerFactory{ + client: client, + defaultResync: defaultResync, + informers: make(map[reflect.Type]framework.SharedIndexInformer), + } +} + +// Start initializes all requested informers. +func (s *sharedInformerFactory) Start(stopCh <-chan struct{}) { + s.lock.Lock() + defer s.lock.Unlock() + + for _, informer := range s.informers { + go informer.Run(stopCh) + } +} + +// Pods returns a SharedIndexInformer that lists and watches all pods +func (f *sharedInformerFactory) Pods() PodInformer { + return &podInformer{sharedInformerFactory: f} +} + +// Nodes returns a SharedIndexInformer that lists and watches all nodes +func (f *sharedInformerFactory) Nodes() NodeInformer { + return &nodeInformer{sharedInformerFactory: f} +} + +// Namespaces returns a SharedIndexInformer that lists and watches all namespaces +func (f *sharedInformerFactory) Namespaces() NamespaceInformer { + return &namespaceInformer{sharedInformerFactory: f} +} + +// PersistentVolumeClaims returns a SharedIndexInformer that lists and watches all persistent volume claims +func (f *sharedInformerFactory) PersistentVolumeClaims() PVCInformer { + return &pvcInformer{sharedInformerFactory: f} +} + +// PersistentVolumes returns a SharedIndexInformer that lists and watches all persistent volumes +func (f *sharedInformerFactory) PersistentVolumes() PVInformer { + return &pvInformer{sharedInformerFactory: f} +} + // CreateSharedPodInformer returns a SharedIndexInformer that lists and watches all pods func CreateSharedPodInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer { sharedInformer := framework.NewSharedIndexInformer( diff --git a/plugin/pkg/admission/namespace/autoprovision/admission.go b/plugin/pkg/admission/namespace/autoprovision/admission.go index 819a8135a3d..3f248ef5982 100644 --- a/plugin/pkg/admission/namespace/autoprovision/admission.go +++ b/plugin/pkg/admission/namespace/autoprovision/admission.go @@ -25,7 +25,7 @@ import ( "k8s.io/kubernetes/pkg/admission" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" - "k8s.io/kubernetes/pkg/controller/framework" + "k8s.io/kubernetes/pkg/controller/framework/informers" ) func init() { @@ -39,11 +39,11 @@ func init() { // It is useful in deployments that do not want to restrict creation of a namespace prior to its usage. type provision struct { *admission.Handler - client clientset.Interface - informer framework.SharedIndexInformer + client clientset.Interface + informerFactory informers.SharedInformerFactory } -var _ = admission.WantsNamespaceInformer(&provision{}) +var _ = admission.WantsInformerFactory(&provision{}) func (p *provision) Admit(a admission.Attributes) (err error) { // if we're here, then we've already passed authentication, so we're allowed to do what we're trying to do @@ -60,7 +60,7 @@ func (p *provision) Admit(a admission.Attributes) (err error) { }, Status: api.NamespaceStatus{}, } - _, exists, err := p.informer.GetStore().Get(namespace) + _, exists, err := p.informerFactory.Namespaces().Informer().GetStore().Get(namespace) if err != nil { return admission.NewForbidden(a, err) } @@ -82,13 +82,13 @@ func NewProvision(c clientset.Interface) admission.Interface { } } -func (p *provision) SetNamespaceInformer(c framework.SharedIndexInformer) { - p.informer = c +func (p *provision) SetInformerFactory(f informers.SharedInformerFactory) { + p.informerFactory = f } func (p *provision) Validate() error { - if p.informer == nil { - return fmt.Errorf("namespace autoprovision plugin needs a namespace informer") + if p.informerFactory == nil { + return fmt.Errorf("namespace autoprovision plugin needs SharedInformerFactory") } return nil } diff --git a/plugin/pkg/admission/namespace/autoprovision/admission_test.go b/plugin/pkg/admission/namespace/autoprovision/admission_test.go index 4faf1cca934..13f68c1fa21 100644 --- a/plugin/pkg/admission/namespace/autoprovision/admission_test.go +++ b/plugin/pkg/admission/namespace/autoprovision/admission_test.go @@ -27,15 +27,19 @@ import ( "k8s.io/kubernetes/pkg/client/testing/core" "k8s.io/kubernetes/pkg/controller/framework/informers" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util/wait" ) // TestAdmission verifies a namespace is created on create requests for namespace managed resources func TestAdmission(t *testing.T) { namespace := "test" mockClient := &fake.Clientset{} + informerFactory := informers.NewSharedInformerFactory(mockClient, 5*time.Minute) + informerFactory.Namespaces() + informerFactory.Start(wait.NeverStop) handler := &provision{ - client: mockClient, - informer: informers.CreateSharedNamespaceIndexInformer(mockClient, 5*time.Minute), + client: mockClient, + informerFactory: informerFactory, } pod := api.Pod{ ObjectMeta: api.ObjectMeta{Name: "123", Namespace: namespace}, @@ -61,13 +65,14 @@ func TestAdmission(t *testing.T) { func TestAdmissionNamespaceExists(t *testing.T) { namespace := "test" mockClient := &fake.Clientset{} - informer := informers.CreateSharedNamespaceIndexInformer(mockClient, 5*time.Minute) - informer.GetStore().Add(&api.Namespace{ + informerFactory := informers.NewSharedInformerFactory(mockClient, 5*time.Minute) + informerFactory.Namespaces().Informer().GetStore().Add(&api.Namespace{ ObjectMeta: api.ObjectMeta{Name: namespace}, }) + informerFactory.Start(wait.NeverStop) handler := &provision{ - client: mockClient, - informer: informer, + client: mockClient, + informerFactory: informerFactory, } pod := api.Pod{ ObjectMeta: api.ObjectMeta{Name: "123", Namespace: namespace}, @@ -113,10 +118,12 @@ func TestAdmissionNamespaceExistsUnknownToHandler(t *testing.T) { mockClient.AddReactor("create", "namespaces", func(action core.Action) (bool, runtime.Object, error) { return true, nil, errors.NewAlreadyExists(api.Resource("namespaces"), namespace) }) - + informerFactory := informers.NewSharedInformerFactory(mockClient, 5*time.Minute) + informerFactory.Namespaces() + informerFactory.Start(wait.NeverStop) handler := &provision{ - client: mockClient, - informer: informers.CreateSharedNamespaceIndexInformer(mockClient, 5*time.Minute), + client: mockClient, + informerFactory: informerFactory, } pod := api.Pod{ ObjectMeta: api.ObjectMeta{Name: "123", Namespace: namespace}, @@ -134,11 +141,11 @@ func TestAdmissionNamespaceExistsUnknownToHandler(t *testing.T) { // TestAdmissionNamespaceValidate func TestAdmissionNamespaceValidate(t *testing.T) { mockClient := &fake.Clientset{} - informer := informers.CreateSharedNamespaceIndexInformer(mockClient, 5*time.Minute) + informerFactory := informers.NewSharedInformerFactory(mockClient, 5*time.Minute) handler := &provision{ client: mockClient, } - handler.SetNamespaceInformer(informer) + handler.SetInformerFactory(informerFactory) err := handler.Validate() if err != nil { t.Errorf("Failed to initialize informer") diff --git a/plugin/pkg/admission/namespace/exists/admission.go b/plugin/pkg/admission/namespace/exists/admission.go index a189ce79a94..ab031ee607b 100644 --- a/plugin/pkg/admission/namespace/exists/admission.go +++ b/plugin/pkg/admission/namespace/exists/admission.go @@ -25,7 +25,7 @@ import ( "k8s.io/kubernetes/pkg/admission" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" - "k8s.io/kubernetes/pkg/controller/framework" + "k8s.io/kubernetes/pkg/controller/framework/informers" ) func init() { @@ -39,11 +39,11 @@ func init() { // It is useful in deployments that want to enforce pre-declaration of a Namespace resource. type exists struct { *admission.Handler - client clientset.Interface - informer framework.SharedIndexInformer + client clientset.Interface + informerFactory informers.SharedInformerFactory } -var _ = admission.WantsNamespaceInformer(&exists{}) +var _ = admission.WantsInformerFactory(&exists{}) func (e *exists) Admit(a admission.Attributes) (err error) { // if we're here, then we've already passed authentication, so we're allowed to do what we're trying to do @@ -60,7 +60,7 @@ func (e *exists) Admit(a admission.Attributes) (err error) { }, Status: api.NamespaceStatus{}, } - _, exists, err := e.informer.GetStore().Get(namespace) + _, exists, err := e.informerFactory.Namespaces().Informer().GetStore().Get(namespace) if err != nil { return errors.NewInternalError(err) } @@ -88,12 +88,12 @@ func NewExists(c clientset.Interface) admission.Interface { } } -func (e *exists) SetNamespaceInformer(c framework.SharedIndexInformer) { - e.informer = c +func (e *exists) SetInformerFactory(f informers.SharedInformerFactory) { + e.informerFactory = f } func (e *exists) Validate() error { - if e.informer == nil { + if e.informerFactory == nil { return fmt.Errorf("namespace exists plugin needs a namespace informer") } return nil diff --git a/plugin/pkg/admission/namespace/lifecycle/admission.go b/plugin/pkg/admission/namespace/lifecycle/admission.go index 4e928fc6737..e75f56397c4 100644 --- a/plugin/pkg/admission/namespace/lifecycle/admission.go +++ b/plugin/pkg/admission/namespace/lifecycle/admission.go @@ -19,14 +19,17 @@ package lifecycle import ( "fmt" "io" + "time" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/admission" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" - "k8s.io/kubernetes/pkg/controller/framework" + "k8s.io/kubernetes/pkg/client/cache" + "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util/sets" + "k8s.io/kubernetes/pkg/watch" ) const PluginName = "NamespaceLifecycle" @@ -42,12 +45,10 @@ func init() { type lifecycle struct { *admission.Handler client clientset.Interface - informer framework.SharedIndexInformer + store cache.Store immortalNamespaces sets.String } -var _ = admission.WantsNamespaceInformer(&lifecycle{}) - func (l *lifecycle) Admit(a admission.Attributes) (err error) { // prevent deletion of immortal namespaces if a.GetOperation() == admission.Delete && a.GetKind().GroupKind() == api.Kind("Namespace") && l.immortalNamespaces.Has(a.GetName()) { @@ -64,7 +65,7 @@ func (l *lifecycle) Admit(a admission.Attributes) (err error) { // this will cause a live lookup of the namespace to get its latest state even // before the watch notification is received. if a.GetOperation() == admission.Delete { - l.informer.GetStore().Delete(&api.Namespace{ + l.store.Delete(&api.Namespace{ ObjectMeta: api.ObjectMeta{ Name: a.GetName(), }, @@ -73,7 +74,7 @@ func (l *lifecycle) Admit(a admission.Attributes) (err error) { return nil } - namespaceObj, exists, err := l.informer.GetStore().Get(&api.Namespace{ + namespaceObj, exists, err := l.store.Get(&api.Namespace{ ObjectMeta: api.ObjectMeta{ Name: a.GetNamespace(), Namespace: "", @@ -111,19 +112,25 @@ func (l *lifecycle) Admit(a admission.Attributes) (err error) { // NewLifecycle creates a new namespace lifecycle admission control handler func NewLifecycle(c clientset.Interface, immortalNamespaces sets.String) admission.Interface { + store := cache.NewStore(cache.MetaNamespaceKeyFunc) + reflector := cache.NewReflector( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return c.Core().Namespaces().List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return c.Core().Namespaces().Watch(options) + }, + }, + &api.Namespace{}, + store, + 5*time.Minute, + ) + reflector.Run() return &lifecycle{ Handler: admission.NewHandler(admission.Create, admission.Update, admission.Delete), client: c, + store: store, immortalNamespaces: immortalNamespaces, } } -func (l *lifecycle) SetNamespaceInformer(c framework.SharedIndexInformer) { - l.informer = c -} - -func (l *lifecycle) Validate() error { - if l.informer == nil { - return fmt.Errorf("namespace lifecycle plugin needs a namespace informer") - } - return nil -} diff --git a/plugin/pkg/admission/namespace/lifecycle/admission_test.go b/plugin/pkg/admission/namespace/lifecycle/admission_test.go index c52ff8c159e..99597f48c18 100644 --- a/plugin/pkg/admission/namespace/lifecycle/admission_test.go +++ b/plugin/pkg/admission/namespace/lifecycle/admission_test.go @@ -20,14 +20,13 @@ import ( "fmt" "sync" "testing" - "time" "k8s.io/kubernetes/pkg/admission" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" "k8s.io/kubernetes/pkg/client/testing/core" "k8s.io/kubernetes/pkg/client/unversioned/testclient" - "k8s.io/kubernetes/pkg/controller/framework/informers" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util/sets" ) @@ -45,10 +44,9 @@ func TestAdmission(t *testing.T) { } var namespaceLock sync.RWMutex + store := cache.NewStore(cache.MetaNamespaceKeyFunc) + store.Add(namespaceObj) mockClient := fake.NewSimpleClientset() - informer := informers.CreateSharedNamespaceIndexInformer(mockClient, 5*time.Minute) - informer.GetStore().Add(namespaceObj) - mockClient.PrependReactor("get", "namespaces", func(action core.Action) (bool, runtime.Object, error) { namespaceLock.RLock() defer namespaceLock.RUnlock() @@ -64,7 +62,7 @@ func TestAdmission(t *testing.T) { }) lfhandler := NewLifecycle(mockClient, sets.NewString("default")).(*lifecycle) - lfhandler.informer = informer + lfhandler.store = store handler := admission.NewChainHandler(lfhandler) pod := api.Pod{ ObjectMeta: api.ObjectMeta{Name: "123", Namespace: namespaceObj.Name}, @@ -89,7 +87,7 @@ func TestAdmission(t *testing.T) { namespaceLock.Lock() namespaceObj.Status.Phase = api.NamespaceTerminating namespaceLock.Unlock() - informer.GetStore().Add(namespaceObj) + store.Add(namespaceObj) // verify create operations in the namespace cause an error err = handler.Admit(admission.NewAttributesRecord(&pod, nil, api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil))