diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index 251ebb24386..1a2632e3a7a 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -25,6 +25,7 @@ import ( "net/url" "strconv" "strings" + "time" "github.com/golang/glog" "github.com/spf13/cobra" @@ -41,6 +42,7 @@ import ( "k8s.io/kubernetes/pkg/apiserver/authenticator" "k8s.io/kubernetes/pkg/capabilities" "k8s.io/kubernetes/pkg/cloudprovider" + "k8s.io/kubernetes/pkg/controller/framework/informers" serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount" "k8s.io/kubernetes/pkg/genericapiserver" kubeletclient "k8s.io/kubernetes/pkg/kubelet/client" @@ -56,6 +58,7 @@ import ( "k8s.io/kubernetes/pkg/registry/rolebinding" rolebindingetcd "k8s.io/kubernetes/pkg/registry/rolebinding/etcd" "k8s.io/kubernetes/pkg/serviceaccount" + "k8s.io/kubernetes/pkg/util/wait" ) // NewAPIServerCommand creates a *cobra.Command object with default parameters @@ -243,7 +246,13 @@ func Run(s *options.APIServer) error { if err != nil { glog.Errorf("Failed to create clientset: %v", err) } - admissionController := admission.NewFromPlugins(client, admissionControlPluginNames, s.AdmissionControlConfigFile) + sharedInformers := informers.NewSharedInformerFactory(client, 10*time.Minute) + pluginInitializer := admission.NewPluginInitializer(sharedInformers) + + admissionController, err := admission.NewFromPlugins(client, admissionControlPluginNames, s.AdmissionControlConfigFile, pluginInitializer) + if err != nil { + glog.Fatalf("Failed to initialize plugins: %v", err) + } genericConfig := genericapiserver.NewConfig(s.ServerRunOptions) // TODO: Move the following to generic api server as well. @@ -278,6 +287,7 @@ func Run(s *options.APIServer) error { return err } + sharedInformers.Start(wait.NeverStop) m.Run(s.ServerRunOptions) return nil } diff --git a/federation/cmd/federation-apiserver/app/server.go b/federation/cmd/federation-apiserver/app/server.go index df5a5f8dcc0..605e106555f 100644 --- a/federation/cmd/federation-apiserver/app/server.go +++ b/federation/cmd/federation-apiserver/app/server.go @@ -21,6 +21,7 @@ package app import ( "strings" + "time" "github.com/golang/glog" "github.com/spf13/cobra" @@ -31,10 +32,12 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apiserver" "k8s.io/kubernetes/pkg/apiserver/authenticator" + "k8s.io/kubernetes/pkg/controller/framework/informers" "k8s.io/kubernetes/pkg/genericapiserver" genericoptions "k8s.io/kubernetes/pkg/genericapiserver/options" "k8s.io/kubernetes/pkg/registry/cachesize" "k8s.io/kubernetes/pkg/registry/generic" + "k8s.io/kubernetes/pkg/util/wait" ) // NewAPIServerCommand creates a *cobra.Command object with default parameters @@ -119,8 +122,13 @@ func Run(s *genericoptions.ServerRunOptions) error { if err != nil { glog.Errorf("Failed to create clientset: %v", err) } - admissionController := admission.NewFromPlugins(client, admissionControlPluginNames, s.AdmissionControlConfigFile) + sharedInformers := informers.NewSharedInformerFactory(client, 10*time.Minute) + pluginInitializer := admission.NewPluginInitializer(sharedInformers) + admissionController, err := admission.NewFromPlugins(client, admissionControlPluginNames, s.AdmissionControlConfigFile, pluginInitializer) + if err != nil { + glog.Fatalf("Failed to initialize plugins: %v", err) + } genericConfig := genericapiserver.NewConfig(s) // TODO: Move the following to generic api server as well. genericConfig.StorageFactory = storageFactory @@ -146,6 +154,7 @@ func Run(s *genericoptions.ServerRunOptions) error { installCoreAPIs(s, m, storageFactory) installExtensionsAPIs(s, m, storageFactory) + sharedInformers.Start(wait.NeverStop) m.Run(s) return nil } diff --git a/pkg/admission/chain.go b/pkg/admission/chain.go index 41d40b67292..a43bf8bb5b6 100644 --- a/pkg/admission/chain.go +++ b/pkg/admission/chain.go @@ -23,7 +23,7 @@ type chainAdmissionHandler []Interface // NewFromPlugins returns an admission.Interface that will enforce admission control decisions of all // the given plugins. -func NewFromPlugins(client clientset.Interface, pluginNames []string, configFilePath string) Interface { +func NewFromPlugins(client clientset.Interface, pluginNames []string, configFilePath string, plugInit PluginInitializer) (Interface, error) { plugins := []Interface{} for _, pluginName := range pluginNames { plugin := InitPlugin(pluginName, client, configFilePath) @@ -31,7 +31,12 @@ func NewFromPlugins(client clientset.Interface, pluginNames []string, configFile plugins = append(plugins, plugin) } } - return chainAdmissionHandler(plugins) + plugInit.Initialize(plugins) + // ensure that plugins have been properly initialized + if err := Validate(plugins); err != nil { + return nil, err + } + return chainAdmissionHandler(plugins), nil } // NewChainHandler creates a new chain handler from an array of handlers. Used for testing. diff --git a/pkg/admission/init.go b/pkg/admission/init.go new file mode 100644 index 00000000000..c4433ffce50 --- /dev/null +++ b/pkg/admission/init.go @@ -0,0 +1,63 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package admission + +import ( + "k8s.io/kubernetes/pkg/controller/framework/informers" +) + +// PluginInitializer is used for initialization of shareable resources between admission plugins. +// After initialization the resources have to be set separately +type PluginInitializer interface { + Initialize(plugins []Interface) +} + +type pluginInitializer struct { + informers informers.SharedInformerFactory +} + +// NewPluginInitializer constructs new instance of PluginInitializer +func NewPluginInitializer(sharedInformers informers.SharedInformerFactory) PluginInitializer { + plugInit := &pluginInitializer{ + informers: sharedInformers, + } + return plugInit +} + +// Initialize checks the initialization interfaces implemented by each plugin +// and provide the appropriate initialization data +func (i *pluginInitializer) Initialize(plugins []Interface) { + for _, plugin := range plugins { + if wantsInformerFactory, ok := plugin.(WantsInformerFactory); ok { + wantsInformerFactory.SetInformerFactory(i.informers) + } + } +} + +// Validate will call the Validate function in each plugin if they implement +// the Validator interface. +func Validate(plugins []Interface) error { + for _, plugin := range plugins { + if validater, ok := plugin.(Validator); ok { + err := validater.Validate() + if err != nil { + return err + } + } + } + return nil +} diff --git a/pkg/admission/types.go b/pkg/admission/types.go new file mode 100644 index 00000000000..4de01016eb1 --- /dev/null +++ b/pkg/admission/types.go @@ -0,0 +1,33 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package admission + +import ( + "k8s.io/kubernetes/pkg/controller/framework/informers" +) + +// Validator holds Validate functions, which are responsible for validation of initialized shared resources +// and should be implemented on admission plugins +type Validator interface { + Validate() error +} + +// WantsNamespaceInformer defines a function which sets NamespaceInformer for admission plugins that need it +type WantsInformerFactory interface { + SetInformerFactory(informers.SharedInformerFactory) + Validator +} diff --git a/pkg/client/cache/listers.go b/pkg/client/cache/listers.go index 09572f3fca0..4b96cf43cbd 100644 --- a/pkg/client/cache/listers.go +++ b/pkg/client/cache/listers.go @@ -693,3 +693,20 @@ func (s *StoreToCertificateRequestLister) List() (csrs certificates.CertificateS } return csrs, nil } + +// IndexerToNamespaceLister gives an Indexer List method +type IndexerToNamespaceLister struct { + Indexer +} + +// List returns a list of namespaces +func (i *IndexerToNamespaceLister) List(selector labels.Selector) (namespaces []*api.Namespace, err error) { + for _, m := range i.Indexer.List() { + namespace := m.(*api.Namespace) + if selector.Matches(labels.Set(namespace.Labels)) { + namespaces = append(namespaces, namespace) + } + } + + return namespaces, nil +} diff --git a/pkg/controller/framework/informers/core.go b/pkg/controller/framework/informers/core.go new file mode 100644 index 00000000000..744aaabad4c --- /dev/null +++ b/pkg/controller/framework/informers/core.go @@ -0,0 +1,273 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package informers + +import ( + "reflect" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client/cache" + "k8s.io/kubernetes/pkg/controller/framework" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/watch" +) + +// PodInformer is type of SharedIndexInformer which watches and lists all pods. +// Interface provides constructor for informer and lister for pods +type PodInformer interface { + Informer() framework.SharedIndexInformer + Lister() *cache.StoreToPodLister +} + +type podInformer struct { + *sharedInformerFactory +} + +// Informer checks whether podInformer exists in sharedInformerFactory and if not, it creates new informer of type +// podInformer and connects it to sharedInformerFactory +func (f *podInformer) Informer() framework.SharedIndexInformer { + f.lock.Lock() + defer f.lock.Unlock() + + informerObj := &api.Pod{} + informerType := reflect.TypeOf(informerObj) + informer, exists := f.informers[informerType] + if exists { + return informer + } + + informer = framework.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return f.client.Core().Pods(api.NamespaceAll).List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return f.client.Core().Pods(api.NamespaceAll).Watch(options) + }, + }, + informerObj, + f.defaultResync, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + ) + f.informers[informerType] = informer + + return informer +} + +// Lister returns lister for podInformer +func (f *podInformer) Lister() *cache.StoreToPodLister { + informer := f.Informer() + return &cache.StoreToPodLister{Indexer: informer.GetIndexer()} +} + +//***************************************************************************** + +// NamespaceInformer is type of SharedIndexInformer which watches and lists all namespaces. +// Interface provides constructor for informer and lister for namsespaces +type NamespaceInformer interface { + Informer() framework.SharedIndexInformer + Lister() *cache.IndexerToNamespaceLister +} + +type namespaceInformer struct { + *sharedInformerFactory +} + +// Informer checks whether namespaceInformer exists in sharedInformerFactory and if not, it creates new informer of type +// namespaceInformer and connects it to sharedInformerFactory +func (f *namespaceInformer) Informer() framework.SharedIndexInformer { + f.lock.Lock() + defer f.lock.Unlock() + informerObj := &api.Namespace{} + informerType := reflect.TypeOf(informerObj) + informer, exists := f.informers[informerType] + if exists { + return informer + } + informer = framework.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return f.client.Core().Namespaces().List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return f.client.Core().Namespaces().Watch(options) + }, + }, + informerObj, + f.defaultResync, + cache.Indexers{}, + ) + f.informers[informerType] = informer + + return informer +} + +// Lister returns lister for namespaceInformer +func (f *namespaceInformer) Lister() *cache.IndexerToNamespaceLister { + informer := f.Informer() + return &cache.IndexerToNamespaceLister{Indexer: informer.GetIndexer()} +} + +//***************************************************************************** + +// NodeInformer is type of SharedIndexInformer which watches and lists all nodes. +// Interface provides constructor for informer and lister for nodes +type NodeInformer interface { + Informer() framework.SharedIndexInformer + Lister() *cache.StoreToNodeLister +} + +type nodeInformer struct { + *sharedInformerFactory +} + +// Informer checks whether nodeInformer exists in sharedInformerFactory and if not, it creates new informer of type +// nodeInformer and connects it to sharedInformerFactory +func (f *nodeInformer) Informer() framework.SharedIndexInformer { + f.lock.Lock() + defer f.lock.Unlock() + + informerObj := &api.Node{} + informerType := reflect.TypeOf(informerObj) + informer, exists := f.informers[informerType] + if exists { + return informer + } + + informer = framework.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return f.client.Core().Nodes().List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return f.client.Core().Nodes().Watch(options) + }, + }, + informerObj, + f.defaultResync, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + ) + f.informers[informerType] = informer + + return informer +} + +// Lister returns lister for nodeInformer +func (f *nodeInformer) Lister() *cache.StoreToNodeLister { + informer := f.Informer() + return &cache.StoreToNodeLister{Store: informer.GetStore()} +} + +//***************************************************************************** + +// PVCInformer is type of SharedIndexInformer which watches and lists all persistent volume claims. +// Interface provides constructor for informer and lister for persistent volume claims +type PVCInformer interface { + Informer() framework.SharedIndexInformer + Lister() *cache.StoreToPVCFetcher +} + +type pvcInformer struct { + *sharedInformerFactory +} + +// Informer checks whether pvcInformer exists in sharedInformerFactory and if not, it creates new informer of type +// pvcInformer and connects it to sharedInformerFactory +func (f *pvcInformer) Informer() framework.SharedIndexInformer { + f.lock.Lock() + defer f.lock.Unlock() + + informerObj := &api.PersistentVolumeClaim{} + informerType := reflect.TypeOf(informerObj) + informer, exists := f.informers[informerType] + if exists { + return informer + } + + informer = framework.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return f.client.Core().PersistentVolumeClaims(api.NamespaceAll).List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return f.client.Core().PersistentVolumeClaims(api.NamespaceAll).Watch(options) + }, + }, + informerObj, + f.defaultResync, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + ) + f.informers[informerType] = informer + + return informer +} + +// Lister returns lister for pvcInformer +func (f *pvcInformer) Lister() *cache.StoreToPVCFetcher { + informer := f.Informer() + return &cache.StoreToPVCFetcher{Store: informer.GetStore()} +} + +//***************************************************************************** + +// PVInformer is type of SharedIndexInformer which watches and lists all persistent volumes. +// Interface provides constructor for informer and lister for persistent volumes +type PVInformer interface { + Informer() framework.SharedIndexInformer + Lister() *cache.StoreToPVFetcher +} + +type pvInformer struct { + *sharedInformerFactory +} + +// Informer checks whether pvInformer exists in sharedInformerFactory and if not, it creates new informer of type +// pvInformer and connects it to sharedInformerFactory +func (f *pvInformer) Informer() framework.SharedIndexInformer { + f.lock.Lock() + defer f.lock.Unlock() + + informerObj := &api.PersistentVolume{} + informerType := reflect.TypeOf(informerObj) + informer, exists := f.informers[informerType] + if exists { + return informer + } + + informer = framework.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return f.client.Core().PersistentVolumes().List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return f.client.Core().PersistentVolumes().Watch(options) + }, + }, + informerObj, + f.defaultResync, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + ) + f.informers[informerType] = informer + + return informer +} + +// Lister returns lister for pvInformer +func (f *pvInformer) Lister() *cache.StoreToPVFetcher { + informer := f.Informer() + return &cache.StoreToPVFetcher{Store: informer.GetStore()} +} diff --git a/pkg/controller/framework/informers/factory.go b/pkg/controller/framework/informers/factory.go index be973bfb541..850b42af00e 100644 --- a/pkg/controller/framework/informers/factory.go +++ b/pkg/controller/framework/informers/factory.go @@ -17,6 +17,8 @@ limitations under the License. package informers import ( + "reflect" + "sync" "time" "k8s.io/kubernetes/pkg/api" @@ -27,6 +29,70 @@ import ( "k8s.io/kubernetes/pkg/watch" ) +// SharedInformerFactory provides interface which holds unique informers for pods, nodes, namespaces, persistent volume +// claims and persistent volumes +type SharedInformerFactory interface { + // Start starts informers that can start AFTER the API server and controllers have started + Start(stopCh <-chan struct{}) + + Pods() PodInformer + Nodes() NodeInformer + Namespaces() NamespaceInformer + PersistentVolumeClaims() PVCInformer + PersistentVolumes() PVInformer +} + +type sharedInformerFactory struct { + client clientset.Interface + lock sync.Mutex + defaultResync time.Duration + informers map[reflect.Type]framework.SharedIndexInformer +} + +// NewSharedInformerFactory constructs a new instance of sharedInformerFactory +func NewSharedInformerFactory(client clientset.Interface, defaultResync time.Duration) SharedInformerFactory { + return &sharedInformerFactory{ + client: client, + defaultResync: defaultResync, + informers: make(map[reflect.Type]framework.SharedIndexInformer), + } +} + +// Start initializes all requested informers. +func (s *sharedInformerFactory) Start(stopCh <-chan struct{}) { + s.lock.Lock() + defer s.lock.Unlock() + + for _, informer := range s.informers { + go informer.Run(stopCh) + } +} + +// Pods returns a SharedIndexInformer that lists and watches all pods +func (f *sharedInformerFactory) Pods() PodInformer { + return &podInformer{sharedInformerFactory: f} +} + +// Nodes returns a SharedIndexInformer that lists and watches all nodes +func (f *sharedInformerFactory) Nodes() NodeInformer { + return &nodeInformer{sharedInformerFactory: f} +} + +// Namespaces returns a SharedIndexInformer that lists and watches all namespaces +func (f *sharedInformerFactory) Namespaces() NamespaceInformer { + return &namespaceInformer{sharedInformerFactory: f} +} + +// PersistentVolumeClaims returns a SharedIndexInformer that lists and watches all persistent volume claims +func (f *sharedInformerFactory) PersistentVolumeClaims() PVCInformer { + return &pvcInformer{sharedInformerFactory: f} +} + +// PersistentVolumes returns a SharedIndexInformer that lists and watches all persistent volumes +func (f *sharedInformerFactory) PersistentVolumes() PVInformer { + return &pvInformer{sharedInformerFactory: f} +} + // CreateSharedPodInformer returns a SharedIndexInformer that lists and watches all pods func CreateSharedPodInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer { sharedInformer := framework.NewSharedIndexInformer( @@ -118,3 +184,21 @@ func CreateSharedPVIndexInformer(client clientset.Interface, resyncPeriod time.D return sharedIndexInformer } + +// CreateSharedNamespaceIndexInformer returns a SharedIndexInformer that lists and watches namespaces +func CreateSharedNamespaceIndexInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer { + sharedIndexInformer := framework.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return client.Core().Namespaces().List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return client.Core().Namespaces().Watch(options) + }, + }, + &api.Namespace{}, + resyncPeriod, + cache.Indexers{}) + + return sharedIndexInformer +} diff --git a/plugin/pkg/admission/namespace/autoprovision/admission.go b/plugin/pkg/admission/namespace/autoprovision/admission.go index 76291927b25..3f248ef5982 100644 --- a/plugin/pkg/admission/namespace/autoprovision/admission.go +++ b/plugin/pkg/admission/namespace/autoprovision/admission.go @@ -21,12 +21,11 @@ import ( clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + "fmt" "k8s.io/kubernetes/pkg/admission" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" - "k8s.io/kubernetes/pkg/client/cache" - "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/watch" + "k8s.io/kubernetes/pkg/controller/framework/informers" ) func init() { @@ -40,10 +39,12 @@ func init() { // It is useful in deployments that do not want to restrict creation of a namespace prior to its usage. type provision struct { *admission.Handler - client clientset.Interface - store cache.Store + client clientset.Interface + informerFactory informers.SharedInformerFactory } +var _ = admission.WantsInformerFactory(&provision{}) + func (p *provision) Admit(a admission.Attributes) (err error) { // if we're here, then we've already passed authentication, so we're allowed to do what we're trying to do // if we're here, then the API server has found a route, which means that if we have a non-empty namespace @@ -59,7 +60,7 @@ func (p *provision) Admit(a admission.Attributes) (err error) { }, Status: api.NamespaceStatus{}, } - _, exists, err := p.store.Get(namespace) + _, exists, err := p.informerFactory.Namespaces().Informer().GetStore().Get(namespace) if err != nil { return admission.NewForbidden(a, err) } @@ -75,28 +76,19 @@ func (p *provision) Admit(a admission.Attributes) (err error) { // NewProvision creates a new namespace provision admission control handler func NewProvision(c clientset.Interface) admission.Interface { - store := cache.NewStore(cache.MetaNamespaceKeyFunc) - reflector := cache.NewReflector( - &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - return c.Core().Namespaces().List(options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return c.Core().Namespaces().Watch(options) - }, - }, - &api.Namespace{}, - store, - 0, - ) - reflector.Run() - return createProvision(c, store) -} - -func createProvision(c clientset.Interface, store cache.Store) admission.Interface { return &provision{ Handler: admission.NewHandler(admission.Create), client: c, - store: store, } } + +func (p *provision) SetInformerFactory(f informers.SharedInformerFactory) { + p.informerFactory = f +} + +func (p *provision) Validate() error { + if p.informerFactory == nil { + return fmt.Errorf("namespace autoprovision plugin needs SharedInformerFactory") + } + return nil +} diff --git a/plugin/pkg/admission/namespace/autoprovision/admission_test.go b/plugin/pkg/admission/namespace/autoprovision/admission_test.go index 7592e5ffa3d..13f68c1fa21 100644 --- a/plugin/pkg/admission/namespace/autoprovision/admission_test.go +++ b/plugin/pkg/admission/namespace/autoprovision/admission_test.go @@ -18,23 +18,28 @@ package autoprovision import ( "testing" + "time" "k8s.io/kubernetes/pkg/admission" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" - "k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" "k8s.io/kubernetes/pkg/client/testing/core" + "k8s.io/kubernetes/pkg/controller/framework/informers" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util/wait" ) // TestAdmission verifies a namespace is created on create requests for namespace managed resources func TestAdmission(t *testing.T) { namespace := "test" mockClient := &fake.Clientset{} + informerFactory := informers.NewSharedInformerFactory(mockClient, 5*time.Minute) + informerFactory.Namespaces() + informerFactory.Start(wait.NeverStop) handler := &provision{ - client: mockClient, - store: cache.NewStore(cache.MetaNamespaceKeyFunc), + client: mockClient, + informerFactory: informerFactory, } pod := api.Pod{ ObjectMeta: api.ObjectMeta{Name: "123", Namespace: namespace}, @@ -60,13 +65,14 @@ func TestAdmission(t *testing.T) { func TestAdmissionNamespaceExists(t *testing.T) { namespace := "test" mockClient := &fake.Clientset{} - store := cache.NewStore(cache.MetaNamespaceKeyFunc) - store.Add(&api.Namespace{ + informerFactory := informers.NewSharedInformerFactory(mockClient, 5*time.Minute) + informerFactory.Namespaces().Informer().GetStore().Add(&api.Namespace{ ObjectMeta: api.ObjectMeta{Name: namespace}, }) + informerFactory.Start(wait.NeverStop) handler := &provision{ - client: mockClient, - store: store, + client: mockClient, + informerFactory: informerFactory, } pod := api.Pod{ ObjectMeta: api.ObjectMeta{Name: "123", Namespace: namespace}, @@ -88,7 +94,7 @@ func TestAdmissionNamespaceExists(t *testing.T) { func TestIgnoreAdmission(t *testing.T) { namespace := "test" mockClient := &fake.Clientset{} - handler := admission.NewChainHandler(createProvision(mockClient, nil)) + handler := admission.NewChainHandler(NewProvision(mockClient)) pod := api.Pod{ ObjectMeta: api.ObjectMeta{Name: "123", Namespace: namespace}, Spec: api.PodSpec{ @@ -112,11 +118,12 @@ func TestAdmissionNamespaceExistsUnknownToHandler(t *testing.T) { mockClient.AddReactor("create", "namespaces", func(action core.Action) (bool, runtime.Object, error) { return true, nil, errors.NewAlreadyExists(api.Resource("namespaces"), namespace) }) - - store := cache.NewStore(cache.MetaNamespaceKeyFunc) + informerFactory := informers.NewSharedInformerFactory(mockClient, 5*time.Minute) + informerFactory.Namespaces() + informerFactory.Start(wait.NeverStop) handler := &provision{ - client: mockClient, - store: store, + client: mockClient, + informerFactory: informerFactory, } pod := api.Pod{ ObjectMeta: api.ObjectMeta{Name: "123", Namespace: namespace}, @@ -130,3 +137,17 @@ func TestAdmissionNamespaceExistsUnknownToHandler(t *testing.T) { t.Errorf("Unexpected error returned from admission handler") } } + +// TestAdmissionNamespaceValidate +func TestAdmissionNamespaceValidate(t *testing.T) { + mockClient := &fake.Clientset{} + informerFactory := informers.NewSharedInformerFactory(mockClient, 5*time.Minute) + handler := &provision{ + client: mockClient, + } + handler.SetInformerFactory(informerFactory) + err := handler.Validate() + if err != nil { + t.Errorf("Failed to initialize informer") + } +} diff --git a/plugin/pkg/admission/namespace/exists/admission.go b/plugin/pkg/admission/namespace/exists/admission.go index 42b6c7811c5..ab031ee607b 100644 --- a/plugin/pkg/admission/namespace/exists/admission.go +++ b/plugin/pkg/admission/namespace/exists/admission.go @@ -18,16 +18,14 @@ package exists import ( "io" - "time" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + "fmt" "k8s.io/kubernetes/pkg/admission" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" - "k8s.io/kubernetes/pkg/client/cache" - "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/watch" + "k8s.io/kubernetes/pkg/controller/framework/informers" ) func init() { @@ -41,10 +39,12 @@ func init() { // It is useful in deployments that want to enforce pre-declaration of a Namespace resource. type exists struct { *admission.Handler - client clientset.Interface - store cache.Store + client clientset.Interface + informerFactory informers.SharedInformerFactory } +var _ = admission.WantsInformerFactory(&exists{}) + func (e *exists) Admit(a admission.Attributes) (err error) { // if we're here, then we've already passed authentication, so we're allowed to do what we're trying to do // if we're here, then the API server has found a route, which means that if we have a non-empty namespace @@ -60,7 +60,7 @@ func (e *exists) Admit(a admission.Attributes) (err error) { }, Status: api.NamespaceStatus{}, } - _, exists, err := e.store.Get(namespace) + _, exists, err := e.informerFactory.Namespaces().Informer().GetStore().Get(namespace) if err != nil { return errors.NewInternalError(err) } @@ -82,24 +82,19 @@ func (e *exists) Admit(a admission.Attributes) (err error) { // NewExists creates a new namespace exists admission control handler func NewExists(c clientset.Interface) admission.Interface { - store := cache.NewStore(cache.MetaNamespaceKeyFunc) - reflector := cache.NewReflector( - &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - return c.Core().Namespaces().List(options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return c.Core().Namespaces().Watch(options) - }, - }, - &api.Namespace{}, - store, - 5*time.Minute, - ) - reflector.Run() return &exists{ client: c, - store: store, Handler: admission.NewHandler(admission.Create, admission.Update, admission.Delete), } } + +func (e *exists) SetInformerFactory(f informers.SharedInformerFactory) { + e.informerFactory = f +} + +func (e *exists) Validate() error { + if e.informerFactory == nil { + return fmt.Errorf("namespace exists plugin needs a namespace informer") + } + return nil +}