Factory for SharedIndexInformers

This commit is contained in:
Dominika Hodovska 2016-06-30 10:50:41 +02:00
parent ba40a528e1
commit 037d116add
12 changed files with 449 additions and 85 deletions

View File

@ -58,6 +58,7 @@ import (
"k8s.io/kubernetes/pkg/registry/rolebinding"
rolebindingetcd "k8s.io/kubernetes/pkg/registry/rolebinding/etcd"
"k8s.io/kubernetes/pkg/serviceaccount"
"k8s.io/kubernetes/pkg/util/wait"
)
// NewAPIServerCommand creates a *cobra.Command object with default parameters
@ -245,14 +246,12 @@ func Run(s *options.APIServer) error {
if err != nil {
glog.Errorf("Failed to create clientset: %v", err)
}
sharedInformers := informers.NewSharedInformerFactory(client, 10*time.Minute)
pluginInitializer := admission.NewPluginInitializer(sharedInformers)
namespaceInformer := informers.CreateSharedNamespaceIndexInformer(client, 5*time.Minute)
pluginInit := admission.NewPluginInitializer()
pluginInit.SetNamespaceInformer(namespaceInformer)
admissionController, err := admission.NewFromPlugins(client, admissionControlPluginNames, s.AdmissionControlConfigFile, pluginInit)
admissionController, err := admission.NewFromPlugins(client, admissionControlPluginNames, s.AdmissionControlConfigFile, pluginInitializer)
if err != nil {
glog.Errorf("Failed to initialize plugins: %v", err)
glog.Fatalf("Failed to initialize plugins: %v", err)
}
genericConfig := genericapiserver.NewConfig(s.ServerRunOptions)
@ -288,6 +287,7 @@ func Run(s *options.APIServer) error {
return err
}
sharedInformers.Start(wait.NeverStop)
m.Run(s.ServerRunOptions)
return nil
}

View File

@ -37,6 +37,7 @@ import (
genericoptions "k8s.io/kubernetes/pkg/genericapiserver/options"
"k8s.io/kubernetes/pkg/registry/cachesize"
"k8s.io/kubernetes/pkg/registry/generic"
"k8s.io/kubernetes/pkg/util/wait"
)
// NewAPIServerCommand creates a *cobra.Command object with default parameters
@ -121,13 +122,12 @@ func Run(s *genericoptions.ServerRunOptions) error {
if err != nil {
glog.Errorf("Failed to create clientset: %v", err)
}
namespaceInformer := informers.CreateSharedNamespaceIndexInformer(client, 5*time.Minute)
pluginInit := admission.NewPluginInitializer()
pluginInit.SetNamespaceInformer(namespaceInformer)
sharedInformers := informers.NewSharedInformerFactory(client, 10*time.Minute)
pluginInitializer := admission.NewPluginInitializer(sharedInformers)
admissionController, err := admission.NewFromPlugins(client, admissionControlPluginNames, s.AdmissionControlConfigFile, pluginInit)
admissionController, err := admission.NewFromPlugins(client, admissionControlPluginNames, s.AdmissionControlConfigFile, pluginInitializer)
if err != nil {
glog.Errorf("Failed to initialize plugins: %v", err)
glog.Fatalf("Failed to initialize plugins: %v", err)
}
genericConfig := genericapiserver.NewConfig(s)
// TODO: Move the following to generic api server as well.
@ -154,6 +154,7 @@ func Run(s *genericoptions.ServerRunOptions) error {
installCoreAPIs(s, m, storageFactory)
installExtensionsAPIs(s, m, storageFactory)
sharedInformers.Start(wait.NeverStop)
m.Run(s)
return nil
}

View File

@ -1,5 +1,5 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -17,41 +17,33 @@ limitations under the License.
package admission
import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/controller/framework"
"reflect"
"k8s.io/kubernetes/pkg/controller/framework/informers"
)
// PluginInitializer is used for Initialization of shareable resources between admission plugins
// After Initialization the resources have to be set separately
// PluginInitializer is used for initialization of shareable resources between admission plugins.
// After initialization the resources have to be set separately
type PluginInitializer interface {
Initialize(plugins []Interface)
SetNamespaceInformer(namespaceInformer framework.SharedIndexInformer)
}
type pluginInitializer struct {
informers map[reflect.Type]framework.SharedIndexInformer
informers informers.SharedInformerFactory
}
// NewPluginInitializer constructs new instance of PluginInitializer
func NewPluginInitializer() PluginInitializer {
func NewPluginInitializer(sharedInformers informers.SharedInformerFactory) PluginInitializer {
plugInit := &pluginInitializer{
informers: make(map[reflect.Type]framework.SharedIndexInformer),
informers: sharedInformers,
}
return plugInit
}
// SetNamespaceInformer sets unique namespaceInformer for instance of PluginInitializer
func (i *pluginInitializer) SetNamespaceInformer(namespaceInformer framework.SharedIndexInformer) {
i.informers[reflect.TypeOf(&api.Namespace{})] = namespaceInformer
}
// Initialize will check the initialization interfaces implemented by each plugin
// Initialize checks the initialization interfaces implemented by each plugin
// and provide the appropriate initialization data
func (i *pluginInitializer) Initialize(plugins []Interface) {
for _, plugin := range plugins {
if wantsNamespaceInformer, ok := plugin.(WantsNamespaceInformer); ok {
wantsNamespaceInformer.SetNamespaceInformer(i.informers[reflect.TypeOf(&api.Namespace{})])
if wantsInformerFactory, ok := plugin.(WantsInformerFactory); ok {
wantsInformerFactory.SetInformerFactory(i.informers)
}
}
}

View File

@ -1,5 +1,5 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -16,7 +16,9 @@ limitations under the License.
package admission
import "k8s.io/kubernetes/pkg/controller/framework"
import (
"k8s.io/kubernetes/pkg/controller/framework/informers"
)
// Validator holds Validate functions, which are responsible for validation of initialized shared resources
// and should be implemented on admission plugins
@ -24,7 +26,8 @@ type Validator interface {
Validate() error
}
// WantsNamespaceInformer defines a function witch sets NamespaceInformer for admission plugins that need it
type WantsNamespaceInformer interface {
SetNamespaceInformer(framework.SharedIndexInformer)
// WantsNamespaceInformer defines a function which sets NamespaceInformer for admission plugins that need it
type WantsInformerFactory interface {
SetInformerFactory(informers.SharedInformerFactory)
Validator
}

View File

@ -693,3 +693,20 @@ func (s *StoreToCertificateRequestLister) List() (csrs certificates.CertificateS
}
return csrs, nil
}
// IndexerToNamespaceLister gives an Indexer List method
type IndexerToNamespaceLister struct {
Indexer
}
// List returns a list of namespaces
func (i *IndexerToNamespaceLister) List(selector labels.Selector) (namespaces []*api.Namespace, err error) {
for _, m := range i.Indexer.List() {
namespace := m.(*api.Namespace)
if selector.Matches(labels.Set(namespace.Labels)) {
namespaces = append(namespaces, namespace)
}
}
return namespaces, nil
}

View File

@ -0,0 +1,273 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package informers
import (
"reflect"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/watch"
)
// PodInformer is type of SharedIndexInformer which watches and lists all pods.
// Interface provides constructor for informer and lister for pods
type PodInformer interface {
Informer() framework.SharedIndexInformer
Lister() *cache.StoreToPodLister
}
type podInformer struct {
*sharedInformerFactory
}
// Informer checks whether podInformer exists in sharedInformerFactory and if not, it creates new informer of type
// podInformer and connects it to sharedInformerFactory
func (f *podInformer) Informer() framework.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
informerObj := &api.Pod{}
informerType := reflect.TypeOf(informerObj)
informer, exists := f.informers[informerType]
if exists {
return informer
}
informer = framework.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return f.client.Core().Pods(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return f.client.Core().Pods(api.NamespaceAll).Watch(options)
},
},
informerObj,
f.defaultResync,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
f.informers[informerType] = informer
return informer
}
// Lister returns lister for podInformer
func (f *podInformer) Lister() *cache.StoreToPodLister {
informer := f.Informer()
return &cache.StoreToPodLister{Indexer: informer.GetIndexer()}
}
//*****************************************************************************
// NamespaceInformer is type of SharedIndexInformer which watches and lists all namespaces.
// Interface provides constructor for informer and lister for namsespaces
type NamespaceInformer interface {
Informer() framework.SharedIndexInformer
Lister() *cache.IndexerToNamespaceLister
}
type namespaceInformer struct {
*sharedInformerFactory
}
// Informer checks whether namespaceInformer exists in sharedInformerFactory and if not, it creates new informer of type
// namespaceInformer and connects it to sharedInformerFactory
func (f *namespaceInformer) Informer() framework.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
informerObj := &api.Namespace{}
informerType := reflect.TypeOf(informerObj)
informer, exists := f.informers[informerType]
if exists {
return informer
}
informer = framework.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return f.client.Core().Namespaces().List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return f.client.Core().Namespaces().Watch(options)
},
},
informerObj,
f.defaultResync,
cache.Indexers{},
)
f.informers[informerType] = informer
return informer
}
// Lister returns lister for namespaceInformer
func (f *namespaceInformer) Lister() *cache.IndexerToNamespaceLister {
informer := f.Informer()
return &cache.IndexerToNamespaceLister{Indexer: informer.GetIndexer()}
}
//*****************************************************************************
// NodeInformer is type of SharedIndexInformer which watches and lists all nodes.
// Interface provides constructor for informer and lister for nodes
type NodeInformer interface {
Informer() framework.SharedIndexInformer
Lister() *cache.StoreToNodeLister
}
type nodeInformer struct {
*sharedInformerFactory
}
// Informer checks whether nodeInformer exists in sharedInformerFactory and if not, it creates new informer of type
// nodeInformer and connects it to sharedInformerFactory
func (f *nodeInformer) Informer() framework.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
informerObj := &api.Node{}
informerType := reflect.TypeOf(informerObj)
informer, exists := f.informers[informerType]
if exists {
return informer
}
informer = framework.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return f.client.Core().Nodes().List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return f.client.Core().Nodes().Watch(options)
},
},
informerObj,
f.defaultResync,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
f.informers[informerType] = informer
return informer
}
// Lister returns lister for nodeInformer
func (f *nodeInformer) Lister() *cache.StoreToNodeLister {
informer := f.Informer()
return &cache.StoreToNodeLister{Store: informer.GetStore()}
}
//*****************************************************************************
// PVCInformer is type of SharedIndexInformer which watches and lists all persistent volume claims.
// Interface provides constructor for informer and lister for persistent volume claims
type PVCInformer interface {
Informer() framework.SharedIndexInformer
Lister() *cache.StoreToPVCFetcher
}
type pvcInformer struct {
*sharedInformerFactory
}
// Informer checks whether pvcInformer exists in sharedInformerFactory and if not, it creates new informer of type
// pvcInformer and connects it to sharedInformerFactory
func (f *pvcInformer) Informer() framework.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
informerObj := &api.PersistentVolumeClaim{}
informerType := reflect.TypeOf(informerObj)
informer, exists := f.informers[informerType]
if exists {
return informer
}
informer = framework.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return f.client.Core().PersistentVolumeClaims(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return f.client.Core().PersistentVolumeClaims(api.NamespaceAll).Watch(options)
},
},
informerObj,
f.defaultResync,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
f.informers[informerType] = informer
return informer
}
// Lister returns lister for pvcInformer
func (f *pvcInformer) Lister() *cache.StoreToPVCFetcher {
informer := f.Informer()
return &cache.StoreToPVCFetcher{Store: informer.GetStore()}
}
//*****************************************************************************
// PVInformer is type of SharedIndexInformer which watches and lists all persistent volumes.
// Interface provides constructor for informer and lister for persistent volumes
type PVInformer interface {
Informer() framework.SharedIndexInformer
Lister() *cache.StoreToPVFetcher
}
type pvInformer struct {
*sharedInformerFactory
}
// Informer checks whether pvInformer exists in sharedInformerFactory and if not, it creates new informer of type
// pvInformer and connects it to sharedInformerFactory
func (f *pvInformer) Informer() framework.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
informerObj := &api.PersistentVolume{}
informerType := reflect.TypeOf(informerObj)
informer, exists := f.informers[informerType]
if exists {
return informer
}
informer = framework.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return f.client.Core().PersistentVolumes().List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return f.client.Core().PersistentVolumes().Watch(options)
},
},
informerObj,
f.defaultResync,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
f.informers[informerType] = informer
return informer
}
// Lister returns lister for pvInformer
func (f *pvInformer) Lister() *cache.StoreToPVFetcher {
informer := f.Informer()
return &cache.StoreToPVFetcher{Store: informer.GetStore()}
}

View File

@ -17,6 +17,8 @@ limitations under the License.
package informers
import (
"reflect"
"sync"
"time"
"k8s.io/kubernetes/pkg/api"
@ -27,6 +29,70 @@ import (
"k8s.io/kubernetes/pkg/watch"
)
// SharedInformerFactory provides interface which holds unique informers for pods, nodes, namespaces, persistent volume
// claims and persistent volumes
type SharedInformerFactory interface {
// Start starts informers that can start AFTER the API server and controllers have started
Start(stopCh <-chan struct{})
Pods() PodInformer
Nodes() NodeInformer
Namespaces() NamespaceInformer
PersistentVolumeClaims() PVCInformer
PersistentVolumes() PVInformer
}
type sharedInformerFactory struct {
client clientset.Interface
lock sync.Mutex
defaultResync time.Duration
informers map[reflect.Type]framework.SharedIndexInformer
}
// NewSharedInformerFactory constructs a new instance of sharedInformerFactory
func NewSharedInformerFactory(client clientset.Interface, defaultResync time.Duration) SharedInformerFactory {
return &sharedInformerFactory{
client: client,
defaultResync: defaultResync,
informers: make(map[reflect.Type]framework.SharedIndexInformer),
}
}
// Start initializes all requested informers.
func (s *sharedInformerFactory) Start(stopCh <-chan struct{}) {
s.lock.Lock()
defer s.lock.Unlock()
for _, informer := range s.informers {
go informer.Run(stopCh)
}
}
// Pods returns a SharedIndexInformer that lists and watches all pods
func (f *sharedInformerFactory) Pods() PodInformer {
return &podInformer{sharedInformerFactory: f}
}
// Nodes returns a SharedIndexInformer that lists and watches all nodes
func (f *sharedInformerFactory) Nodes() NodeInformer {
return &nodeInformer{sharedInformerFactory: f}
}
// Namespaces returns a SharedIndexInformer that lists and watches all namespaces
func (f *sharedInformerFactory) Namespaces() NamespaceInformer {
return &namespaceInformer{sharedInformerFactory: f}
}
// PersistentVolumeClaims returns a SharedIndexInformer that lists and watches all persistent volume claims
func (f *sharedInformerFactory) PersistentVolumeClaims() PVCInformer {
return &pvcInformer{sharedInformerFactory: f}
}
// PersistentVolumes returns a SharedIndexInformer that lists and watches all persistent volumes
func (f *sharedInformerFactory) PersistentVolumes() PVInformer {
return &pvInformer{sharedInformerFactory: f}
}
// CreateSharedPodInformer returns a SharedIndexInformer that lists and watches all pods
func CreateSharedPodInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer {
sharedInformer := framework.NewSharedIndexInformer(

View File

@ -25,7 +25,7 @@ import (
"k8s.io/kubernetes/pkg/admission"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/controller/framework/informers"
)
func init() {
@ -39,11 +39,11 @@ func init() {
// It is useful in deployments that do not want to restrict creation of a namespace prior to its usage.
type provision struct {
*admission.Handler
client clientset.Interface
informer framework.SharedIndexInformer
client clientset.Interface
informerFactory informers.SharedInformerFactory
}
var _ = admission.WantsNamespaceInformer(&provision{})
var _ = admission.WantsInformerFactory(&provision{})
func (p *provision) Admit(a admission.Attributes) (err error) {
// if we're here, then we've already passed authentication, so we're allowed to do what we're trying to do
@ -60,7 +60,7 @@ func (p *provision) Admit(a admission.Attributes) (err error) {
},
Status: api.NamespaceStatus{},
}
_, exists, err := p.informer.GetStore().Get(namespace)
_, exists, err := p.informerFactory.Namespaces().Informer().GetStore().Get(namespace)
if err != nil {
return admission.NewForbidden(a, err)
}
@ -82,13 +82,13 @@ func NewProvision(c clientset.Interface) admission.Interface {
}
}
func (p *provision) SetNamespaceInformer(c framework.SharedIndexInformer) {
p.informer = c
func (p *provision) SetInformerFactory(f informers.SharedInformerFactory) {
p.informerFactory = f
}
func (p *provision) Validate() error {
if p.informer == nil {
return fmt.Errorf("namespace autoprovision plugin needs a namespace informer")
if p.informerFactory == nil {
return fmt.Errorf("namespace autoprovision plugin needs SharedInformerFactory")
}
return nil
}

View File

@ -27,15 +27,19 @@ import (
"k8s.io/kubernetes/pkg/client/testing/core"
"k8s.io/kubernetes/pkg/controller/framework/informers"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/wait"
)
// TestAdmission verifies a namespace is created on create requests for namespace managed resources
func TestAdmission(t *testing.T) {
namespace := "test"
mockClient := &fake.Clientset{}
informerFactory := informers.NewSharedInformerFactory(mockClient, 5*time.Minute)
informerFactory.Namespaces()
informerFactory.Start(wait.NeverStop)
handler := &provision{
client: mockClient,
informer: informers.CreateSharedNamespaceIndexInformer(mockClient, 5*time.Minute),
client: mockClient,
informerFactory: informerFactory,
}
pod := api.Pod{
ObjectMeta: api.ObjectMeta{Name: "123", Namespace: namespace},
@ -61,13 +65,14 @@ func TestAdmission(t *testing.T) {
func TestAdmissionNamespaceExists(t *testing.T) {
namespace := "test"
mockClient := &fake.Clientset{}
informer := informers.CreateSharedNamespaceIndexInformer(mockClient, 5*time.Minute)
informer.GetStore().Add(&api.Namespace{
informerFactory := informers.NewSharedInformerFactory(mockClient, 5*time.Minute)
informerFactory.Namespaces().Informer().GetStore().Add(&api.Namespace{
ObjectMeta: api.ObjectMeta{Name: namespace},
})
informerFactory.Start(wait.NeverStop)
handler := &provision{
client: mockClient,
informer: informer,
client: mockClient,
informerFactory: informerFactory,
}
pod := api.Pod{
ObjectMeta: api.ObjectMeta{Name: "123", Namespace: namespace},
@ -113,10 +118,12 @@ func TestAdmissionNamespaceExistsUnknownToHandler(t *testing.T) {
mockClient.AddReactor("create", "namespaces", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, errors.NewAlreadyExists(api.Resource("namespaces"), namespace)
})
informerFactory := informers.NewSharedInformerFactory(mockClient, 5*time.Minute)
informerFactory.Namespaces()
informerFactory.Start(wait.NeverStop)
handler := &provision{
client: mockClient,
informer: informers.CreateSharedNamespaceIndexInformer(mockClient, 5*time.Minute),
client: mockClient,
informerFactory: informerFactory,
}
pod := api.Pod{
ObjectMeta: api.ObjectMeta{Name: "123", Namespace: namespace},
@ -134,11 +141,11 @@ func TestAdmissionNamespaceExistsUnknownToHandler(t *testing.T) {
// TestAdmissionNamespaceValidate
func TestAdmissionNamespaceValidate(t *testing.T) {
mockClient := &fake.Clientset{}
informer := informers.CreateSharedNamespaceIndexInformer(mockClient, 5*time.Minute)
informerFactory := informers.NewSharedInformerFactory(mockClient, 5*time.Minute)
handler := &provision{
client: mockClient,
}
handler.SetNamespaceInformer(informer)
handler.SetInformerFactory(informerFactory)
err := handler.Validate()
if err != nil {
t.Errorf("Failed to initialize informer")

View File

@ -25,7 +25,7 @@ import (
"k8s.io/kubernetes/pkg/admission"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/controller/framework/informers"
)
func init() {
@ -39,11 +39,11 @@ func init() {
// It is useful in deployments that want to enforce pre-declaration of a Namespace resource.
type exists struct {
*admission.Handler
client clientset.Interface
informer framework.SharedIndexInformer
client clientset.Interface
informerFactory informers.SharedInformerFactory
}
var _ = admission.WantsNamespaceInformer(&exists{})
var _ = admission.WantsInformerFactory(&exists{})
func (e *exists) Admit(a admission.Attributes) (err error) {
// if we're here, then we've already passed authentication, so we're allowed to do what we're trying to do
@ -60,7 +60,7 @@ func (e *exists) Admit(a admission.Attributes) (err error) {
},
Status: api.NamespaceStatus{},
}
_, exists, err := e.informer.GetStore().Get(namespace)
_, exists, err := e.informerFactory.Namespaces().Informer().GetStore().Get(namespace)
if err != nil {
return errors.NewInternalError(err)
}
@ -88,12 +88,12 @@ func NewExists(c clientset.Interface) admission.Interface {
}
}
func (e *exists) SetNamespaceInformer(c framework.SharedIndexInformer) {
e.informer = c
func (e *exists) SetInformerFactory(f informers.SharedInformerFactory) {
e.informerFactory = f
}
func (e *exists) Validate() error {
if e.informer == nil {
if e.informerFactory == nil {
return fmt.Errorf("namespace exists plugin needs a namespace informer")
}
return nil

View File

@ -19,14 +19,17 @@ package lifecycle
import (
"fmt"
"io"
"time"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/admission"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/watch"
)
const PluginName = "NamespaceLifecycle"
@ -42,12 +45,10 @@ func init() {
type lifecycle struct {
*admission.Handler
client clientset.Interface
informer framework.SharedIndexInformer
store cache.Store
immortalNamespaces sets.String
}
var _ = admission.WantsNamespaceInformer(&lifecycle{})
func (l *lifecycle) Admit(a admission.Attributes) (err error) {
// prevent deletion of immortal namespaces
if a.GetOperation() == admission.Delete && a.GetKind().GroupKind() == api.Kind("Namespace") && l.immortalNamespaces.Has(a.GetName()) {
@ -64,7 +65,7 @@ func (l *lifecycle) Admit(a admission.Attributes) (err error) {
// this will cause a live lookup of the namespace to get its latest state even
// before the watch notification is received.
if a.GetOperation() == admission.Delete {
l.informer.GetStore().Delete(&api.Namespace{
l.store.Delete(&api.Namespace{
ObjectMeta: api.ObjectMeta{
Name: a.GetName(),
},
@ -73,7 +74,7 @@ func (l *lifecycle) Admit(a admission.Attributes) (err error) {
return nil
}
namespaceObj, exists, err := l.informer.GetStore().Get(&api.Namespace{
namespaceObj, exists, err := l.store.Get(&api.Namespace{
ObjectMeta: api.ObjectMeta{
Name: a.GetNamespace(),
Namespace: "",
@ -111,19 +112,25 @@ func (l *lifecycle) Admit(a admission.Attributes) (err error) {
// NewLifecycle creates a new namespace lifecycle admission control handler
func NewLifecycle(c clientset.Interface, immortalNamespaces sets.String) admission.Interface {
store := cache.NewStore(cache.MetaNamespaceKeyFunc)
reflector := cache.NewReflector(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return c.Core().Namespaces().List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return c.Core().Namespaces().Watch(options)
},
},
&api.Namespace{},
store,
5*time.Minute,
)
reflector.Run()
return &lifecycle{
Handler: admission.NewHandler(admission.Create, admission.Update, admission.Delete),
client: c,
store: store,
immortalNamespaces: immortalNamespaces,
}
}
func (l *lifecycle) SetNamespaceInformer(c framework.SharedIndexInformer) {
l.informer = c
}
func (l *lifecycle) Validate() error {
if l.informer == nil {
return fmt.Errorf("namespace lifecycle plugin needs a namespace informer")
}
return nil
}

View File

@ -20,14 +20,13 @@ import (
"fmt"
"sync"
"testing"
"time"
"k8s.io/kubernetes/pkg/admission"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"k8s.io/kubernetes/pkg/client/testing/core"
"k8s.io/kubernetes/pkg/client/unversioned/testclient"
"k8s.io/kubernetes/pkg/controller/framework/informers"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/sets"
)
@ -45,10 +44,9 @@ func TestAdmission(t *testing.T) {
}
var namespaceLock sync.RWMutex
store := cache.NewStore(cache.MetaNamespaceKeyFunc)
store.Add(namespaceObj)
mockClient := fake.NewSimpleClientset()
informer := informers.CreateSharedNamespaceIndexInformer(mockClient, 5*time.Minute)
informer.GetStore().Add(namespaceObj)
mockClient.PrependReactor("get", "namespaces", func(action core.Action) (bool, runtime.Object, error) {
namespaceLock.RLock()
defer namespaceLock.RUnlock()
@ -64,7 +62,7 @@ func TestAdmission(t *testing.T) {
})
lfhandler := NewLifecycle(mockClient, sets.NewString("default")).(*lifecycle)
lfhandler.informer = informer
lfhandler.store = store
handler := admission.NewChainHandler(lfhandler)
pod := api.Pod{
ObjectMeta: api.ObjectMeta{Name: "123", Namespace: namespaceObj.Name},
@ -89,7 +87,7 @@ func TestAdmission(t *testing.T) {
namespaceLock.Lock()
namespaceObj.Status.Phase = api.NamespaceTerminating
namespaceLock.Unlock()
informer.GetStore().Add(namespaceObj)
store.Add(namespaceObj)
// verify create operations in the namespace cause an error
err = handler.Admit(admission.NewAttributesRecord(&pod, nil, api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil))