Merge pull request #26709 from hodovska/master

Automatic merge from submit-queue

Allow shareable resources for admission control plugins.

Changes allow admission control plugins to share resources. This is done via new PluginInitialization structure. The structure can be extended for other resources, for now it is an shared informer for namespace plugins (NamespiceLifecycle, NamespaceAutoProvisioning, NamespaceExists).

If a plugins needs some kind of shared resource e.g. client, the client shall be added to PluginInitializer and Wants methods implemented to every plugin which will use it.
This commit is contained in:
k8s-merge-robot 2016-07-22 11:07:05 -07:00 committed by GitHub
commit df2cf16ddb
11 changed files with 567 additions and 65 deletions

View File

@ -25,6 +25,7 @@ import (
"net/url"
"strconv"
"strings"
"time"
"github.com/golang/glog"
"github.com/spf13/cobra"
@ -41,6 +42,7 @@ import (
"k8s.io/kubernetes/pkg/apiserver/authenticator"
"k8s.io/kubernetes/pkg/capabilities"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller/framework/informers"
serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount"
"k8s.io/kubernetes/pkg/genericapiserver"
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
@ -56,6 +58,7 @@ import (
"k8s.io/kubernetes/pkg/registry/rolebinding"
rolebindingetcd "k8s.io/kubernetes/pkg/registry/rolebinding/etcd"
"k8s.io/kubernetes/pkg/serviceaccount"
"k8s.io/kubernetes/pkg/util/wait"
)
// NewAPIServerCommand creates a *cobra.Command object with default parameters
@ -243,7 +246,13 @@ func Run(s *options.APIServer) error {
if err != nil {
glog.Errorf("Failed to create clientset: %v", err)
}
admissionController := admission.NewFromPlugins(client, admissionControlPluginNames, s.AdmissionControlConfigFile)
sharedInformers := informers.NewSharedInformerFactory(client, 10*time.Minute)
pluginInitializer := admission.NewPluginInitializer(sharedInformers)
admissionController, err := admission.NewFromPlugins(client, admissionControlPluginNames, s.AdmissionControlConfigFile, pluginInitializer)
if err != nil {
glog.Fatalf("Failed to initialize plugins: %v", err)
}
genericConfig := genericapiserver.NewConfig(s.ServerRunOptions)
// TODO: Move the following to generic api server as well.
@ -278,6 +287,7 @@ func Run(s *options.APIServer) error {
return err
}
sharedInformers.Start(wait.NeverStop)
m.Run(s.ServerRunOptions)
return nil
}

View File

@ -21,6 +21,7 @@ package app
import (
"strings"
"time"
"github.com/golang/glog"
"github.com/spf13/cobra"
@ -31,10 +32,12 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apiserver"
"k8s.io/kubernetes/pkg/apiserver/authenticator"
"k8s.io/kubernetes/pkg/controller/framework/informers"
"k8s.io/kubernetes/pkg/genericapiserver"
genericoptions "k8s.io/kubernetes/pkg/genericapiserver/options"
"k8s.io/kubernetes/pkg/registry/cachesize"
"k8s.io/kubernetes/pkg/registry/generic"
"k8s.io/kubernetes/pkg/util/wait"
)
// NewAPIServerCommand creates a *cobra.Command object with default parameters
@ -119,8 +122,13 @@ func Run(s *genericoptions.ServerRunOptions) error {
if err != nil {
glog.Errorf("Failed to create clientset: %v", err)
}
admissionController := admission.NewFromPlugins(client, admissionControlPluginNames, s.AdmissionControlConfigFile)
sharedInformers := informers.NewSharedInformerFactory(client, 10*time.Minute)
pluginInitializer := admission.NewPluginInitializer(sharedInformers)
admissionController, err := admission.NewFromPlugins(client, admissionControlPluginNames, s.AdmissionControlConfigFile, pluginInitializer)
if err != nil {
glog.Fatalf("Failed to initialize plugins: %v", err)
}
genericConfig := genericapiserver.NewConfig(s)
// TODO: Move the following to generic api server as well.
genericConfig.StorageFactory = storageFactory
@ -146,6 +154,7 @@ func Run(s *genericoptions.ServerRunOptions) error {
installCoreAPIs(s, m, storageFactory)
installExtensionsAPIs(s, m, storageFactory)
sharedInformers.Start(wait.NeverStop)
m.Run(s)
return nil
}

View File

@ -23,7 +23,7 @@ type chainAdmissionHandler []Interface
// NewFromPlugins returns an admission.Interface that will enforce admission control decisions of all
// the given plugins.
func NewFromPlugins(client clientset.Interface, pluginNames []string, configFilePath string) Interface {
func NewFromPlugins(client clientset.Interface, pluginNames []string, configFilePath string, plugInit PluginInitializer) (Interface, error) {
plugins := []Interface{}
for _, pluginName := range pluginNames {
plugin := InitPlugin(pluginName, client, configFilePath)
@ -31,7 +31,12 @@ func NewFromPlugins(client clientset.Interface, pluginNames []string, configFile
plugins = append(plugins, plugin)
}
}
return chainAdmissionHandler(plugins)
plugInit.Initialize(plugins)
// ensure that plugins have been properly initialized
if err := Validate(plugins); err != nil {
return nil, err
}
return chainAdmissionHandler(plugins), nil
}
// NewChainHandler creates a new chain handler from an array of handlers. Used for testing.

63
pkg/admission/init.go Normal file
View File

@ -0,0 +1,63 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package admission
import (
"k8s.io/kubernetes/pkg/controller/framework/informers"
)
// PluginInitializer is used for initialization of shareable resources between admission plugins.
// After initialization the resources have to be set separately
type PluginInitializer interface {
Initialize(plugins []Interface)
}
type pluginInitializer struct {
informers informers.SharedInformerFactory
}
// NewPluginInitializer constructs new instance of PluginInitializer
func NewPluginInitializer(sharedInformers informers.SharedInformerFactory) PluginInitializer {
plugInit := &pluginInitializer{
informers: sharedInformers,
}
return plugInit
}
// Initialize checks the initialization interfaces implemented by each plugin
// and provide the appropriate initialization data
func (i *pluginInitializer) Initialize(plugins []Interface) {
for _, plugin := range plugins {
if wantsInformerFactory, ok := plugin.(WantsInformerFactory); ok {
wantsInformerFactory.SetInformerFactory(i.informers)
}
}
}
// Validate will call the Validate function in each plugin if they implement
// the Validator interface.
func Validate(plugins []Interface) error {
for _, plugin := range plugins {
if validater, ok := plugin.(Validator); ok {
err := validater.Validate()
if err != nil {
return err
}
}
}
return nil
}

33
pkg/admission/types.go Normal file
View File

@ -0,0 +1,33 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package admission
import (
"k8s.io/kubernetes/pkg/controller/framework/informers"
)
// Validator holds Validate functions, which are responsible for validation of initialized shared resources
// and should be implemented on admission plugins
type Validator interface {
Validate() error
}
// WantsNamespaceInformer defines a function which sets NamespaceInformer for admission plugins that need it
type WantsInformerFactory interface {
SetInformerFactory(informers.SharedInformerFactory)
Validator
}

View File

@ -693,3 +693,20 @@ func (s *StoreToCertificateRequestLister) List() (csrs certificates.CertificateS
}
return csrs, nil
}
// IndexerToNamespaceLister gives an Indexer List method
type IndexerToNamespaceLister struct {
Indexer
}
// List returns a list of namespaces
func (i *IndexerToNamespaceLister) List(selector labels.Selector) (namespaces []*api.Namespace, err error) {
for _, m := range i.Indexer.List() {
namespace := m.(*api.Namespace)
if selector.Matches(labels.Set(namespace.Labels)) {
namespaces = append(namespaces, namespace)
}
}
return namespaces, nil
}

View File

@ -0,0 +1,273 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package informers
import (
"reflect"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/watch"
)
// PodInformer is type of SharedIndexInformer which watches and lists all pods.
// Interface provides constructor for informer and lister for pods
type PodInformer interface {
Informer() framework.SharedIndexInformer
Lister() *cache.StoreToPodLister
}
type podInformer struct {
*sharedInformerFactory
}
// Informer checks whether podInformer exists in sharedInformerFactory and if not, it creates new informer of type
// podInformer and connects it to sharedInformerFactory
func (f *podInformer) Informer() framework.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
informerObj := &api.Pod{}
informerType := reflect.TypeOf(informerObj)
informer, exists := f.informers[informerType]
if exists {
return informer
}
informer = framework.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return f.client.Core().Pods(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return f.client.Core().Pods(api.NamespaceAll).Watch(options)
},
},
informerObj,
f.defaultResync,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
f.informers[informerType] = informer
return informer
}
// Lister returns lister for podInformer
func (f *podInformer) Lister() *cache.StoreToPodLister {
informer := f.Informer()
return &cache.StoreToPodLister{Indexer: informer.GetIndexer()}
}
//*****************************************************************************
// NamespaceInformer is type of SharedIndexInformer which watches and lists all namespaces.
// Interface provides constructor for informer and lister for namsespaces
type NamespaceInformer interface {
Informer() framework.SharedIndexInformer
Lister() *cache.IndexerToNamespaceLister
}
type namespaceInformer struct {
*sharedInformerFactory
}
// Informer checks whether namespaceInformer exists in sharedInformerFactory and if not, it creates new informer of type
// namespaceInformer and connects it to sharedInformerFactory
func (f *namespaceInformer) Informer() framework.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
informerObj := &api.Namespace{}
informerType := reflect.TypeOf(informerObj)
informer, exists := f.informers[informerType]
if exists {
return informer
}
informer = framework.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return f.client.Core().Namespaces().List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return f.client.Core().Namespaces().Watch(options)
},
},
informerObj,
f.defaultResync,
cache.Indexers{},
)
f.informers[informerType] = informer
return informer
}
// Lister returns lister for namespaceInformer
func (f *namespaceInformer) Lister() *cache.IndexerToNamespaceLister {
informer := f.Informer()
return &cache.IndexerToNamespaceLister{Indexer: informer.GetIndexer()}
}
//*****************************************************************************
// NodeInformer is type of SharedIndexInformer which watches and lists all nodes.
// Interface provides constructor for informer and lister for nodes
type NodeInformer interface {
Informer() framework.SharedIndexInformer
Lister() *cache.StoreToNodeLister
}
type nodeInformer struct {
*sharedInformerFactory
}
// Informer checks whether nodeInformer exists in sharedInformerFactory and if not, it creates new informer of type
// nodeInformer and connects it to sharedInformerFactory
func (f *nodeInformer) Informer() framework.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
informerObj := &api.Node{}
informerType := reflect.TypeOf(informerObj)
informer, exists := f.informers[informerType]
if exists {
return informer
}
informer = framework.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return f.client.Core().Nodes().List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return f.client.Core().Nodes().Watch(options)
},
},
informerObj,
f.defaultResync,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
f.informers[informerType] = informer
return informer
}
// Lister returns lister for nodeInformer
func (f *nodeInformer) Lister() *cache.StoreToNodeLister {
informer := f.Informer()
return &cache.StoreToNodeLister{Store: informer.GetStore()}
}
//*****************************************************************************
// PVCInformer is type of SharedIndexInformer which watches and lists all persistent volume claims.
// Interface provides constructor for informer and lister for persistent volume claims
type PVCInformer interface {
Informer() framework.SharedIndexInformer
Lister() *cache.StoreToPVCFetcher
}
type pvcInformer struct {
*sharedInformerFactory
}
// Informer checks whether pvcInformer exists in sharedInformerFactory and if not, it creates new informer of type
// pvcInformer and connects it to sharedInformerFactory
func (f *pvcInformer) Informer() framework.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
informerObj := &api.PersistentVolumeClaim{}
informerType := reflect.TypeOf(informerObj)
informer, exists := f.informers[informerType]
if exists {
return informer
}
informer = framework.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return f.client.Core().PersistentVolumeClaims(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return f.client.Core().PersistentVolumeClaims(api.NamespaceAll).Watch(options)
},
},
informerObj,
f.defaultResync,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
f.informers[informerType] = informer
return informer
}
// Lister returns lister for pvcInformer
func (f *pvcInformer) Lister() *cache.StoreToPVCFetcher {
informer := f.Informer()
return &cache.StoreToPVCFetcher{Store: informer.GetStore()}
}
//*****************************************************************************
// PVInformer is type of SharedIndexInformer which watches and lists all persistent volumes.
// Interface provides constructor for informer and lister for persistent volumes
type PVInformer interface {
Informer() framework.SharedIndexInformer
Lister() *cache.StoreToPVFetcher
}
type pvInformer struct {
*sharedInformerFactory
}
// Informer checks whether pvInformer exists in sharedInformerFactory and if not, it creates new informer of type
// pvInformer and connects it to sharedInformerFactory
func (f *pvInformer) Informer() framework.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
informerObj := &api.PersistentVolume{}
informerType := reflect.TypeOf(informerObj)
informer, exists := f.informers[informerType]
if exists {
return informer
}
informer = framework.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return f.client.Core().PersistentVolumes().List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return f.client.Core().PersistentVolumes().Watch(options)
},
},
informerObj,
f.defaultResync,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
f.informers[informerType] = informer
return informer
}
// Lister returns lister for pvInformer
func (f *pvInformer) Lister() *cache.StoreToPVFetcher {
informer := f.Informer()
return &cache.StoreToPVFetcher{Store: informer.GetStore()}
}

View File

@ -17,6 +17,8 @@ limitations under the License.
package informers
import (
"reflect"
"sync"
"time"
"k8s.io/kubernetes/pkg/api"
@ -27,6 +29,70 @@ import (
"k8s.io/kubernetes/pkg/watch"
)
// SharedInformerFactory provides interface which holds unique informers for pods, nodes, namespaces, persistent volume
// claims and persistent volumes
type SharedInformerFactory interface {
// Start starts informers that can start AFTER the API server and controllers have started
Start(stopCh <-chan struct{})
Pods() PodInformer
Nodes() NodeInformer
Namespaces() NamespaceInformer
PersistentVolumeClaims() PVCInformer
PersistentVolumes() PVInformer
}
type sharedInformerFactory struct {
client clientset.Interface
lock sync.Mutex
defaultResync time.Duration
informers map[reflect.Type]framework.SharedIndexInformer
}
// NewSharedInformerFactory constructs a new instance of sharedInformerFactory
func NewSharedInformerFactory(client clientset.Interface, defaultResync time.Duration) SharedInformerFactory {
return &sharedInformerFactory{
client: client,
defaultResync: defaultResync,
informers: make(map[reflect.Type]framework.SharedIndexInformer),
}
}
// Start initializes all requested informers.
func (s *sharedInformerFactory) Start(stopCh <-chan struct{}) {
s.lock.Lock()
defer s.lock.Unlock()
for _, informer := range s.informers {
go informer.Run(stopCh)
}
}
// Pods returns a SharedIndexInformer that lists and watches all pods
func (f *sharedInformerFactory) Pods() PodInformer {
return &podInformer{sharedInformerFactory: f}
}
// Nodes returns a SharedIndexInformer that lists and watches all nodes
func (f *sharedInformerFactory) Nodes() NodeInformer {
return &nodeInformer{sharedInformerFactory: f}
}
// Namespaces returns a SharedIndexInformer that lists and watches all namespaces
func (f *sharedInformerFactory) Namespaces() NamespaceInformer {
return &namespaceInformer{sharedInformerFactory: f}
}
// PersistentVolumeClaims returns a SharedIndexInformer that lists and watches all persistent volume claims
func (f *sharedInformerFactory) PersistentVolumeClaims() PVCInformer {
return &pvcInformer{sharedInformerFactory: f}
}
// PersistentVolumes returns a SharedIndexInformer that lists and watches all persistent volumes
func (f *sharedInformerFactory) PersistentVolumes() PVInformer {
return &pvInformer{sharedInformerFactory: f}
}
// CreateSharedPodInformer returns a SharedIndexInformer that lists and watches all pods
func CreateSharedPodInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer {
sharedInformer := framework.NewSharedIndexInformer(
@ -118,3 +184,21 @@ func CreateSharedPVIndexInformer(client clientset.Interface, resyncPeriod time.D
return sharedIndexInformer
}
// CreateSharedNamespaceIndexInformer returns a SharedIndexInformer that lists and watches namespaces
func CreateSharedNamespaceIndexInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer {
sharedIndexInformer := framework.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return client.Core().Namespaces().List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return client.Core().Namespaces().Watch(options)
},
},
&api.Namespace{},
resyncPeriod,
cache.Indexers{})
return sharedIndexInformer
}

View File

@ -21,12 +21,11 @@ import (
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"fmt"
"k8s.io/kubernetes/pkg/admission"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/watch"
"k8s.io/kubernetes/pkg/controller/framework/informers"
)
func init() {
@ -40,10 +39,12 @@ func init() {
// It is useful in deployments that do not want to restrict creation of a namespace prior to its usage.
type provision struct {
*admission.Handler
client clientset.Interface
store cache.Store
client clientset.Interface
informerFactory informers.SharedInformerFactory
}
var _ = admission.WantsInformerFactory(&provision{})
func (p *provision) Admit(a admission.Attributes) (err error) {
// if we're here, then we've already passed authentication, so we're allowed to do what we're trying to do
// if we're here, then the API server has found a route, which means that if we have a non-empty namespace
@ -59,7 +60,7 @@ func (p *provision) Admit(a admission.Attributes) (err error) {
},
Status: api.NamespaceStatus{},
}
_, exists, err := p.store.Get(namespace)
_, exists, err := p.informerFactory.Namespaces().Informer().GetStore().Get(namespace)
if err != nil {
return admission.NewForbidden(a, err)
}
@ -75,28 +76,19 @@ func (p *provision) Admit(a admission.Attributes) (err error) {
// NewProvision creates a new namespace provision admission control handler
func NewProvision(c clientset.Interface) admission.Interface {
store := cache.NewStore(cache.MetaNamespaceKeyFunc)
reflector := cache.NewReflector(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return c.Core().Namespaces().List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return c.Core().Namespaces().Watch(options)
},
},
&api.Namespace{},
store,
0,
)
reflector.Run()
return createProvision(c, store)
}
func createProvision(c clientset.Interface, store cache.Store) admission.Interface {
return &provision{
Handler: admission.NewHandler(admission.Create),
client: c,
store: store,
}
}
func (p *provision) SetInformerFactory(f informers.SharedInformerFactory) {
p.informerFactory = f
}
func (p *provision) Validate() error {
if p.informerFactory == nil {
return fmt.Errorf("namespace autoprovision plugin needs SharedInformerFactory")
}
return nil
}

View File

@ -18,23 +18,28 @@ package autoprovision
import (
"testing"
"time"
"k8s.io/kubernetes/pkg/admission"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"k8s.io/kubernetes/pkg/client/testing/core"
"k8s.io/kubernetes/pkg/controller/framework/informers"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/wait"
)
// TestAdmission verifies a namespace is created on create requests for namespace managed resources
func TestAdmission(t *testing.T) {
namespace := "test"
mockClient := &fake.Clientset{}
informerFactory := informers.NewSharedInformerFactory(mockClient, 5*time.Minute)
informerFactory.Namespaces()
informerFactory.Start(wait.NeverStop)
handler := &provision{
client: mockClient,
store: cache.NewStore(cache.MetaNamespaceKeyFunc),
client: mockClient,
informerFactory: informerFactory,
}
pod := api.Pod{
ObjectMeta: api.ObjectMeta{Name: "123", Namespace: namespace},
@ -60,13 +65,14 @@ func TestAdmission(t *testing.T) {
func TestAdmissionNamespaceExists(t *testing.T) {
namespace := "test"
mockClient := &fake.Clientset{}
store := cache.NewStore(cache.MetaNamespaceKeyFunc)
store.Add(&api.Namespace{
informerFactory := informers.NewSharedInformerFactory(mockClient, 5*time.Minute)
informerFactory.Namespaces().Informer().GetStore().Add(&api.Namespace{
ObjectMeta: api.ObjectMeta{Name: namespace},
})
informerFactory.Start(wait.NeverStop)
handler := &provision{
client: mockClient,
store: store,
client: mockClient,
informerFactory: informerFactory,
}
pod := api.Pod{
ObjectMeta: api.ObjectMeta{Name: "123", Namespace: namespace},
@ -88,7 +94,7 @@ func TestAdmissionNamespaceExists(t *testing.T) {
func TestIgnoreAdmission(t *testing.T) {
namespace := "test"
mockClient := &fake.Clientset{}
handler := admission.NewChainHandler(createProvision(mockClient, nil))
handler := admission.NewChainHandler(NewProvision(mockClient))
pod := api.Pod{
ObjectMeta: api.ObjectMeta{Name: "123", Namespace: namespace},
Spec: api.PodSpec{
@ -112,11 +118,12 @@ func TestAdmissionNamespaceExistsUnknownToHandler(t *testing.T) {
mockClient.AddReactor("create", "namespaces", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, errors.NewAlreadyExists(api.Resource("namespaces"), namespace)
})
store := cache.NewStore(cache.MetaNamespaceKeyFunc)
informerFactory := informers.NewSharedInformerFactory(mockClient, 5*time.Minute)
informerFactory.Namespaces()
informerFactory.Start(wait.NeverStop)
handler := &provision{
client: mockClient,
store: store,
client: mockClient,
informerFactory: informerFactory,
}
pod := api.Pod{
ObjectMeta: api.ObjectMeta{Name: "123", Namespace: namespace},
@ -130,3 +137,17 @@ func TestAdmissionNamespaceExistsUnknownToHandler(t *testing.T) {
t.Errorf("Unexpected error returned from admission handler")
}
}
// TestAdmissionNamespaceValidate
func TestAdmissionNamespaceValidate(t *testing.T) {
mockClient := &fake.Clientset{}
informerFactory := informers.NewSharedInformerFactory(mockClient, 5*time.Minute)
handler := &provision{
client: mockClient,
}
handler.SetInformerFactory(informerFactory)
err := handler.Validate()
if err != nil {
t.Errorf("Failed to initialize informer")
}
}

View File

@ -18,16 +18,14 @@ package exists
import (
"io"
"time"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"fmt"
"k8s.io/kubernetes/pkg/admission"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/watch"
"k8s.io/kubernetes/pkg/controller/framework/informers"
)
func init() {
@ -41,10 +39,12 @@ func init() {
// It is useful in deployments that want to enforce pre-declaration of a Namespace resource.
type exists struct {
*admission.Handler
client clientset.Interface
store cache.Store
client clientset.Interface
informerFactory informers.SharedInformerFactory
}
var _ = admission.WantsInformerFactory(&exists{})
func (e *exists) Admit(a admission.Attributes) (err error) {
// if we're here, then we've already passed authentication, so we're allowed to do what we're trying to do
// if we're here, then the API server has found a route, which means that if we have a non-empty namespace
@ -60,7 +60,7 @@ func (e *exists) Admit(a admission.Attributes) (err error) {
},
Status: api.NamespaceStatus{},
}
_, exists, err := e.store.Get(namespace)
_, exists, err := e.informerFactory.Namespaces().Informer().GetStore().Get(namespace)
if err != nil {
return errors.NewInternalError(err)
}
@ -82,24 +82,19 @@ func (e *exists) Admit(a admission.Attributes) (err error) {
// NewExists creates a new namespace exists admission control handler
func NewExists(c clientset.Interface) admission.Interface {
store := cache.NewStore(cache.MetaNamespaceKeyFunc)
reflector := cache.NewReflector(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return c.Core().Namespaces().List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return c.Core().Namespaces().Watch(options)
},
},
&api.Namespace{},
store,
5*time.Minute,
)
reflector.Run()
return &exists{
client: c,
store: store,
Handler: admission.NewHandler(admission.Create, admission.Update, admission.Delete),
}
}
func (e *exists) SetInformerFactory(f informers.SharedInformerFactory) {
e.informerFactory = f
}
func (e *exists) Validate() error {
if e.informerFactory == nil {
return fmt.Errorf("namespace exists plugin needs a namespace informer")
}
return nil
}