Allow shareable resources for admission control plugins

This commit is contained in:
Dominika Hodovska 2016-06-08 15:04:38 +02:00
parent 976ca09d71
commit fc0a3c6dcb
10 changed files with 216 additions and 89 deletions

View File

@ -25,6 +25,7 @@ import (
"net/url"
"strconv"
"strings"
"time"
"github.com/golang/glog"
"github.com/spf13/cobra"
@ -41,6 +42,7 @@ import (
"k8s.io/kubernetes/pkg/apiserver/authenticator"
"k8s.io/kubernetes/pkg/capabilities"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller/framework/informers"
serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount"
"k8s.io/kubernetes/pkg/genericapiserver"
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
@ -243,6 +245,11 @@ func Run(s *options.APIServer) error {
if err != nil {
glog.Errorf("Failed to create clientset: %v", err)
}
namespaceInformer := informers.CreateSharedNamespaceIndexInformer(client, 5*time.Minute)
pluginInit := admission.NewPluginInitializer()
pluginInit.SetNamespaceInformer(namespaceInformer)
admissionController := admission.NewFromPlugins(client, admissionControlPluginNames, s.AdmissionControlConfigFile)
genericConfig := genericapiserver.NewConfig(s.ServerRunOptions)

View File

@ -21,6 +21,7 @@ package app
import (
"strings"
"time"
"github.com/golang/glog"
"github.com/spf13/cobra"
@ -31,6 +32,7 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apiserver"
"k8s.io/kubernetes/pkg/apiserver/authenticator"
"k8s.io/kubernetes/pkg/controller/framework/informers"
"k8s.io/kubernetes/pkg/genericapiserver"
genericoptions "k8s.io/kubernetes/pkg/genericapiserver/options"
"k8s.io/kubernetes/pkg/registry/cachesize"
@ -119,8 +121,11 @@ func Run(s *genericoptions.ServerRunOptions) error {
if err != nil {
glog.Errorf("Failed to create clientset: %v", err)
}
admissionController := admission.NewFromPlugins(client, admissionControlPluginNames, s.AdmissionControlConfigFile)
namespaceInformer := informers.CreateSharedNamespaceIndexInformer(client, 5*time.Minute)
pluginInit := admission.NewPluginInitializer()
pluginInit.SetNamespaceInformer(namespaceInformer)
admissionController := admission.NewFromPlugins(client, admissionControlPluginNames, s.AdmissionControlConfigFile)
genericConfig := genericapiserver.NewConfig(s)
// TODO: Move the following to generic api server as well.
genericConfig.StorageFactory = storageFactory

71
pkg/admission/init.go Normal file
View File

@ -0,0 +1,71 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package admission
import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/controller/framework"
"reflect"
)
// PluginInitializer is used for Initialization of shareable resources between admission plugins
// After Initialization the resources have to be set separately
type PluginInitializer interface {
Initialize(plugins []Interface)
SetNamespaceInformer(namespaceInformer framework.SharedIndexInformer)
}
type pluginInitializer struct {
informers map[reflect.Type]framework.SharedIndexInformer
}
// NewPluginInitializer constructs new instance of PluginInitializer
func NewPluginInitializer() PluginInitializer {
plugInit := &pluginInitializer{
informers: make(map[reflect.Type]framework.SharedIndexInformer),
}
return plugInit
}
// SetNamespaceInformer sets unique namespaceInformer for instance of PluginInitializer
func (i *pluginInitializer) SetNamespaceInformer(namespaceInformer framework.SharedIndexInformer) {
i.informers[reflect.TypeOf(&api.Namespace{})] = namespaceInformer
}
// Initialize will check the initialization interfaces implemented by each plugin
// and provide the appropriate initialization data
func (i *pluginInitializer) Initialize(plugins []Interface) {
for _, plugin := range plugins {
if wantsNamespaceInformer, ok := plugin.(WantsNamespaceInformer); ok {
wantsNamespaceInformer.SetNamespaceInformer(i.informers[reflect.TypeOf(&api.Namespace{})])
}
}
}
// Validate will call the Validate function in each plugin if they implement
// the Validator interface.
func Validate(plugins []Interface) error {
for _, plugin := range plugins {
if validater, ok := plugin.(Validator); ok {
err := validater.Validate()
if err != nil {
return err
}
}
}
return nil
}

30
pkg/admission/types.go Normal file
View File

@ -0,0 +1,30 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package admission
import "k8s.io/kubernetes/pkg/controller/framework"
// Validator holds Validate functions, which are responsible for validation of initialized shared resources
// and should be implemented on admission plugins
type Validator interface {
Validate() error
}
// WantsNamespaceInformer defines a function witch sets NamespaceInformer for admission plugins that need it
type WantsNamespaceInformer interface {
SetNamespaceInformer(framework.SharedIndexInformer)
}

View File

@ -118,3 +118,21 @@ func CreateSharedPVIndexInformer(client clientset.Interface, resyncPeriod time.D
return sharedIndexInformer
}
// CreateSharedNamespaceIndexInformer returns a SharedIndexInformer that lists and watches namespaces
func CreateSharedNamespaceIndexInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer {
sharedIndexInformer := framework.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return client.Core().Namespaces().List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return client.Core().Namespaces().Watch(options)
},
},
&api.Namespace{},
resyncPeriod,
cache.Indexers{})
return sharedIndexInformer
}

View File

@ -21,12 +21,11 @@ import (
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"fmt"
"k8s.io/kubernetes/pkg/admission"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/watch"
"k8s.io/kubernetes/pkg/controller/framework"
)
func init() {
@ -40,10 +39,12 @@ func init() {
// It is useful in deployments that do not want to restrict creation of a namespace prior to its usage.
type provision struct {
*admission.Handler
client clientset.Interface
store cache.Store
client clientset.Interface
informer framework.SharedIndexInformer
}
var _ = admission.WantsNamespaceInformer(&provision{})
func (p *provision) Admit(a admission.Attributes) (err error) {
// if we're here, then we've already passed authentication, so we're allowed to do what we're trying to do
// if we're here, then the API server has found a route, which means that if we have a non-empty namespace
@ -59,7 +60,7 @@ func (p *provision) Admit(a admission.Attributes) (err error) {
},
Status: api.NamespaceStatus{},
}
_, exists, err := p.store.Get(namespace)
_, exists, err := p.informer.GetStore().Get(namespace)
if err != nil {
return admission.NewForbidden(a, err)
}
@ -75,28 +76,19 @@ func (p *provision) Admit(a admission.Attributes) (err error) {
// NewProvision creates a new namespace provision admission control handler
func NewProvision(c clientset.Interface) admission.Interface {
store := cache.NewStore(cache.MetaNamespaceKeyFunc)
reflector := cache.NewReflector(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return c.Core().Namespaces().List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return c.Core().Namespaces().Watch(options)
},
},
&api.Namespace{},
store,
0,
)
reflector.Run()
return createProvision(c, store)
}
func createProvision(c clientset.Interface, store cache.Store) admission.Interface {
return &provision{
Handler: admission.NewHandler(admission.Create),
client: c,
store: store,
}
}
func (p *provision) SetNamespaceInformer(c framework.SharedIndexInformer) {
p.informer = c
}
func (p *provision) Validate() error {
if p.informer == nil {
return fmt.Errorf("namespace autoprovision plugin needs a namespace informer")
}
return nil
}

View File

@ -18,13 +18,14 @@ package autoprovision
import (
"testing"
"time"
"k8s.io/kubernetes/pkg/admission"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"k8s.io/kubernetes/pkg/client/testing/core"
"k8s.io/kubernetes/pkg/controller/framework/informers"
"k8s.io/kubernetes/pkg/runtime"
)
@ -33,8 +34,8 @@ func TestAdmission(t *testing.T) {
namespace := "test"
mockClient := &fake.Clientset{}
handler := &provision{
client: mockClient,
store: cache.NewStore(cache.MetaNamespaceKeyFunc),
client: mockClient,
informer: informers.CreateSharedNamespaceIndexInformer(mockClient, 5*time.Minute),
}
pod := api.Pod{
ObjectMeta: api.ObjectMeta{Name: "123", Namespace: namespace},
@ -60,13 +61,13 @@ func TestAdmission(t *testing.T) {
func TestAdmissionNamespaceExists(t *testing.T) {
namespace := "test"
mockClient := &fake.Clientset{}
store := cache.NewStore(cache.MetaNamespaceKeyFunc)
store.Add(&api.Namespace{
informer := informers.CreateSharedNamespaceIndexInformer(mockClient, 5*time.Minute)
informer.GetStore().Add(&api.Namespace{
ObjectMeta: api.ObjectMeta{Name: namespace},
})
handler := &provision{
client: mockClient,
store: store,
client: mockClient,
informer: informer,
}
pod := api.Pod{
ObjectMeta: api.ObjectMeta{Name: "123", Namespace: namespace},
@ -88,7 +89,7 @@ func TestAdmissionNamespaceExists(t *testing.T) {
func TestIgnoreAdmission(t *testing.T) {
namespace := "test"
mockClient := &fake.Clientset{}
handler := admission.NewChainHandler(createProvision(mockClient, nil))
handler := admission.NewChainHandler(NewProvision(mockClient))
pod := api.Pod{
ObjectMeta: api.ObjectMeta{Name: "123", Namespace: namespace},
Spec: api.PodSpec{
@ -113,10 +114,9 @@ func TestAdmissionNamespaceExistsUnknownToHandler(t *testing.T) {
return true, nil, errors.NewAlreadyExists(api.Resource("namespaces"), namespace)
})
store := cache.NewStore(cache.MetaNamespaceKeyFunc)
handler := &provision{
client: mockClient,
store: store,
client: mockClient,
informer: informers.CreateSharedNamespaceIndexInformer(mockClient, 5*time.Minute),
}
pod := api.Pod{
ObjectMeta: api.ObjectMeta{Name: "123", Namespace: namespace},
@ -130,3 +130,17 @@ func TestAdmissionNamespaceExistsUnknownToHandler(t *testing.T) {
t.Errorf("Unexpected error returned from admission handler")
}
}
// TestAdmissionNamespaceValidate
func TestAdmissionNamespaceValidate(t *testing.T) {
mockClient := &fake.Clientset{}
informer := informers.CreateSharedNamespaceIndexInformer(mockClient, 5*time.Minute)
handler := &provision{
client: mockClient,
}
handler.SetNamespaceInformer(informer)
err := handler.Validate()
if err != nil {
t.Errorf("Failed to initialize informer")
}
}

View File

@ -18,16 +18,14 @@ package exists
import (
"io"
"time"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"fmt"
"k8s.io/kubernetes/pkg/admission"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/watch"
"k8s.io/kubernetes/pkg/controller/framework"
)
func init() {
@ -41,10 +39,12 @@ func init() {
// It is useful in deployments that want to enforce pre-declaration of a Namespace resource.
type exists struct {
*admission.Handler
client clientset.Interface
store cache.Store
client clientset.Interface
informer framework.SharedIndexInformer
}
var _ = admission.WantsNamespaceInformer(&exists{})
func (e *exists) Admit(a admission.Attributes) (err error) {
// if we're here, then we've already passed authentication, so we're allowed to do what we're trying to do
// if we're here, then the API server has found a route, which means that if we have a non-empty namespace
@ -60,7 +60,7 @@ func (e *exists) Admit(a admission.Attributes) (err error) {
},
Status: api.NamespaceStatus{},
}
_, exists, err := e.store.Get(namespace)
_, exists, err := e.informer.GetStore().Get(namespace)
if err != nil {
return errors.NewInternalError(err)
}
@ -82,24 +82,19 @@ func (e *exists) Admit(a admission.Attributes) (err error) {
// NewExists creates a new namespace exists admission control handler
func NewExists(c clientset.Interface) admission.Interface {
store := cache.NewStore(cache.MetaNamespaceKeyFunc)
reflector := cache.NewReflector(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return c.Core().Namespaces().List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return c.Core().Namespaces().Watch(options)
},
},
&api.Namespace{},
store,
5*time.Minute,
)
reflector.Run()
return &exists{
client: c,
store: store,
Handler: admission.NewHandler(admission.Create, admission.Update, admission.Delete),
}
}
func (e *exists) SetNamespaceInformer(c framework.SharedIndexInformer) {
e.informer = c
}
func (e *exists) Validate() error {
if e.informer == nil {
return fmt.Errorf("namespace exists plugin needs a namespace informer")
}
return nil
}

View File

@ -19,17 +19,14 @@ package lifecycle
import (
"fmt"
"io"
"time"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/admission"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/watch"
)
const PluginName = "NamespaceLifecycle"
@ -45,10 +42,12 @@ func init() {
type lifecycle struct {
*admission.Handler
client clientset.Interface
store cache.Store
informer framework.SharedIndexInformer
immortalNamespaces sets.String
}
var _ = admission.WantsNamespaceInformer(&lifecycle{})
func (l *lifecycle) Admit(a admission.Attributes) (err error) {
// prevent deletion of immortal namespaces
if a.GetOperation() == admission.Delete && a.GetKind().GroupKind() == api.Kind("Namespace") && l.immortalNamespaces.Has(a.GetName()) {
@ -65,7 +64,7 @@ func (l *lifecycle) Admit(a admission.Attributes) (err error) {
// this will cause a live lookup of the namespace to get its latest state even
// before the watch notification is received.
if a.GetOperation() == admission.Delete {
l.store.Delete(&api.Namespace{
l.informer.GetStore().Delete(&api.Namespace{
ObjectMeta: api.ObjectMeta{
Name: a.GetName(),
},
@ -74,7 +73,7 @@ func (l *lifecycle) Admit(a admission.Attributes) (err error) {
return nil
}
namespaceObj, exists, err := l.store.Get(&api.Namespace{
namespaceObj, exists, err := l.informer.GetStore().Get(&api.Namespace{
ObjectMeta: api.ObjectMeta{
Name: a.GetNamespace(),
Namespace: "",
@ -112,25 +111,19 @@ func (l *lifecycle) Admit(a admission.Attributes) (err error) {
// NewLifecycle creates a new namespace lifecycle admission control handler
func NewLifecycle(c clientset.Interface, immortalNamespaces sets.String) admission.Interface {
store := cache.NewStore(cache.MetaNamespaceKeyFunc)
reflector := cache.NewReflector(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return c.Core().Namespaces().List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return c.Core().Namespaces().Watch(options)
},
},
&api.Namespace{},
store,
5*time.Minute,
)
reflector.Run()
return &lifecycle{
Handler: admission.NewHandler(admission.Create, admission.Update, admission.Delete),
client: c,
store: store,
immortalNamespaces: immortalNamespaces,
}
}
func (l *lifecycle) SetNamespaceInformer(c framework.SharedIndexInformer) {
l.informer = c
}
func (l *lifecycle) Validate() error {
if l.informer == nil {
return fmt.Errorf("namespace lifecycle plugin needs a namespace informer")
}
return nil
}

View File

@ -20,13 +20,14 @@ import (
"fmt"
"sync"
"testing"
"time"
"k8s.io/kubernetes/pkg/admission"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"k8s.io/kubernetes/pkg/client/testing/core"
"k8s.io/kubernetes/pkg/client/unversioned/testclient"
"k8s.io/kubernetes/pkg/controller/framework/informers"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/sets"
)
@ -44,9 +45,10 @@ func TestAdmission(t *testing.T) {
}
var namespaceLock sync.RWMutex
store := cache.NewStore(cache.MetaNamespaceKeyFunc)
store.Add(namespaceObj)
mockClient := fake.NewSimpleClientset()
informer := informers.CreateSharedNamespaceIndexInformer(mockClient, 5*time.Minute)
informer.GetStore().Add(namespaceObj)
mockClient.PrependReactor("get", "namespaces", func(action core.Action) (bool, runtime.Object, error) {
namespaceLock.RLock()
defer namespaceLock.RUnlock()
@ -62,7 +64,7 @@ func TestAdmission(t *testing.T) {
})
lfhandler := NewLifecycle(mockClient, sets.NewString("default")).(*lifecycle)
lfhandler.store = store
lfhandler.informer = informer
handler := admission.NewChainHandler(lfhandler)
pod := api.Pod{
ObjectMeta: api.ObjectMeta{Name: "123", Namespace: namespaceObj.Name},
@ -87,7 +89,7 @@ func TestAdmission(t *testing.T) {
namespaceLock.Lock()
namespaceObj.Status.Phase = api.NamespaceTerminating
namespaceLock.Unlock()
store.Add(namespaceObj)
informer.GetStore().Add(namespaceObj)
// verify create operations in the namespace cause an error
err = handler.Admit(admission.NewAttributesRecord(&pod, nil, api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil))