From f2875274423dac61293069f79eddf1c397e7376a Mon Sep 17 00:00:00 2001 From: hzxuzhonghu Date: Wed, 29 Nov 2017 23:12:19 +0800 Subject: [PATCH] admission registration use shared informer instead of poll --- .../configuration/mutating_webhook_manager.go | 100 ++++++++------- .../mutating_webhook_manager_test.go | 118 ++++++++++++++++-- .../validating_webhook_manager.go | 90 +++++++------ .../validating_webhook_manager_test.go | 118 ++++++++++++++++-- .../plugin/webhook/mutating/admission.go | 9 +- .../plugin/webhook/validating/admission.go | 11 +- 6 files changed, 316 insertions(+), 130 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/admission/configuration/mutating_webhook_manager.go b/staging/src/k8s.io/apiserver/pkg/admission/configuration/mutating_webhook_manager.go index bf4d0eabf98..fbde83b7d82 100644 --- a/staging/src/k8s.io/apiserver/pkg/admission/configuration/mutating_webhook_manager.go +++ b/staging/src/k8s.io/apiserver/pkg/admission/configuration/mutating_webhook_manager.go @@ -18,84 +18,82 @@ package configuration import ( "fmt" - "reflect" "sort" - - "github.com/golang/glog" + "sync/atomic" "k8s.io/api/admissionregistration/v1beta1" - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/labels" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + admissionregistrationinformers "k8s.io/client-go/informers/admissionregistration/v1beta1" + admissionregistrationlisters "k8s.io/client-go/listers/admissionregistration/v1beta1" + "k8s.io/client-go/tools/cache" ) -type MutatingWebhookConfigurationLister interface { - List(opts metav1.ListOptions) (*v1beta1.MutatingWebhookConfigurationList, error) -} - // MutatingWebhookConfigurationManager collects the mutating webhook objects so that they can be called. type MutatingWebhookConfigurationManager struct { - *poller + ready int32 + configuration *atomic.Value + hasSynced func() bool + lister admissionregistrationlisters.MutatingWebhookConfigurationLister } -func NewMutatingWebhookConfigurationManager(c MutatingWebhookConfigurationLister) *MutatingWebhookConfigurationManager { - getFn := func() (runtime.Object, error) { - list, err := c.List(metav1.ListOptions{}) - if err != nil { - if errors.IsNotFound(err) || errors.IsForbidden(err) { - glog.V(5).Infof("MutatingWebhookConfiguration are disabled due to an error: %v", err) - return nil, ErrDisabled - } - return nil, err - } - return mergeMutatingWebhookConfigurations(list), nil +func NewMutatingWebhookConfigurationManager(informer admissionregistrationinformers.MutatingWebhookConfigurationInformer) *MutatingWebhookConfigurationManager { + manager := &MutatingWebhookConfigurationManager{ + ready: 0, + configuration: &atomic.Value{}, + hasSynced: informer.Informer().HasSynced, + lister: informer.Lister(), } - return &MutatingWebhookConfigurationManager{ - newPoller(getFn), - } + // Start with an empty list + manager.configuration.Store(&v1beta1.MutatingWebhookConfiguration{}) + + // On any change, rebuild the config + informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(_ interface{}) { manager.updateConfiguration() }, + UpdateFunc: func(_, _ interface{}) { manager.updateConfiguration() }, + DeleteFunc: func(_ interface{}) { manager.updateConfiguration() }, + }) + + return manager } // Webhooks returns the merged MutatingWebhookConfiguration. -func (im *MutatingWebhookConfigurationManager) Webhooks() (*v1beta1.MutatingWebhookConfiguration, error) { - configuration, err := im.poller.configuration() +func (m *MutatingWebhookConfigurationManager) Webhooks() (*v1beta1.MutatingWebhookConfiguration, error) { + if atomic.LoadInt32(&m.ready) == 0 { + if !m.hasSynced() { + // Return an error until we've synced + return nil, fmt.Errorf("mutating webhook configuration is not ready") + } + // Remember we're ready + atomic.StoreInt32(&m.ready, 1) + } + return m.configuration.Load().(*v1beta1.MutatingWebhookConfiguration), nil +} + +func (m *MutatingWebhookConfigurationManager) updateConfiguration() { + configurations, err := m.lister.List(labels.Everything()) if err != nil { - return nil, err + utilruntime.HandleError(fmt.Errorf("error updating configuration: %v", err)) + return } - mutatingWebhookConfiguration, ok := configuration.(*v1beta1.MutatingWebhookConfiguration) - if !ok { - return nil, fmt.Errorf("expected type %v, got type %v", reflect.TypeOf(mutatingWebhookConfiguration), reflect.TypeOf(configuration)) - } - return mutatingWebhookConfiguration, nil + m.configuration.Store(mergeMutatingWebhookConfigurations(configurations)) } -func (im *MutatingWebhookConfigurationManager) Run(stopCh <-chan struct{}) { - im.poller.Run(stopCh) -} - -func mergeMutatingWebhookConfigurations( - list *v1beta1.MutatingWebhookConfigurationList, -) *v1beta1.MutatingWebhookConfiguration { - configurations := append([]v1beta1.MutatingWebhookConfiguration{}, list.Items...) +func mergeMutatingWebhookConfigurations(configurations []*v1beta1.MutatingWebhookConfiguration) *v1beta1.MutatingWebhookConfiguration { var ret v1beta1.MutatingWebhookConfiguration // The internal order of webhooks for each configuration is provided by the user // but configurations themselves can be in any order. As we are going to run these // webhooks in serial, they are sorted here to have a deterministic order. - sort.Sort(byName(configurations)) + sort.SliceStable(configurations, MutatingWebhookConfigurationSorter(configurations).ByName) for _, c := range configurations { ret.Webhooks = append(ret.Webhooks, c.Webhooks...) } return &ret } -// byName sorts MutatingWebhookConfiguration by name. These objects are all in -// cluster namespace (aka no namespace) thus they all have unique names. -type byName []v1beta1.MutatingWebhookConfiguration +type MutatingWebhookConfigurationSorter []*v1beta1.MutatingWebhookConfiguration -func (x byName) Len() int { return len(x) } - -func (x byName) Swap(i, j int) { x[i], x[j] = x[j], x[i] } - -func (x byName) Less(i, j int) bool { - return x[i].ObjectMeta.Name < x[j].ObjectMeta.Name +func (a MutatingWebhookConfigurationSorter) ByName(i, j int) bool { + return a[i].Name < a[j].Name } diff --git a/staging/src/k8s.io/apiserver/pkg/admission/configuration/mutating_webhook_manager_test.go b/staging/src/k8s.io/apiserver/pkg/admission/configuration/mutating_webhook_manager_test.go index 97333880b09..e6b50aa2320 100644 --- a/staging/src/k8s.io/apiserver/pkg/admission/configuration/mutating_webhook_manager_test.go +++ b/staging/src/k8s.io/apiserver/pkg/admission/configuration/mutating_webhook_manager_test.go @@ -17,24 +17,118 @@ limitations under the License. package configuration import ( + "fmt" + "reflect" "testing" + "time" "k8s.io/api/admissionregistration/v1beta1" - "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/labels" + admissionregistrationlisters "k8s.io/client-go/listers/admissionregistration/v1beta1" + "k8s.io/client-go/tools/cache" ) -type disabledMutatingWebhookConfigLister struct{} - -func (l *disabledMutatingWebhookConfigLister) List(options metav1.ListOptions) (*v1beta1.MutatingWebhookConfigurationList, error) { - return nil, errors.NewNotFound(schema.GroupResource{Group: "admissionregistration", Resource: "MutatingWebhookConfigurations"}, "") +type fakeMutatingWebhookConfigSharedInformer struct { + informer *fakeMutatingWebhookConfigInformer + lister *fakeMutatingWebhookConfigLister } -func TestMutatingWebhookConfigDisabled(t *testing.T) { - manager := NewMutatingWebhookConfigurationManager(&disabledMutatingWebhookConfigLister{}) - manager.sync() - _, err := manager.Webhooks() - if err.Error() != ErrDisabled.Error() { - t.Errorf("expected %v, got %v", ErrDisabled, err) + +func (f *fakeMutatingWebhookConfigSharedInformer) Informer() cache.SharedIndexInformer { + return f.informer +} +func (f *fakeMutatingWebhookConfigSharedInformer) Lister() admissionregistrationlisters.MutatingWebhookConfigurationLister { + return f.lister +} + +type fakeMutatingWebhookConfigInformer struct { + eventHandler cache.ResourceEventHandler + hasSynced bool +} + +func (f *fakeMutatingWebhookConfigInformer) AddEventHandler(handler cache.ResourceEventHandler) { + fmt.Println("added handler") + f.eventHandler = handler +} +func (f *fakeMutatingWebhookConfigInformer) AddEventHandlerWithResyncPeriod(handler cache.ResourceEventHandler, resyncPeriod time.Duration) { + panic("unsupported") +} +func (f *fakeMutatingWebhookConfigInformer) GetStore() cache.Store { + panic("unsupported") +} +func (f *fakeMutatingWebhookConfigInformer) GetController() cache.Controller { + panic("unsupported") +} +func (f *fakeMutatingWebhookConfigInformer) Run(stopCh <-chan struct{}) { + panic("unsupported") +} +func (f *fakeMutatingWebhookConfigInformer) HasSynced() bool { + return f.hasSynced +} +func (f *fakeMutatingWebhookConfigInformer) LastSyncResourceVersion() string { + panic("unsupported") +} +func (f *fakeMutatingWebhookConfigInformer) AddIndexers(indexers cache.Indexers) error { + panic("unsupported") +} +func (f *fakeMutatingWebhookConfigInformer) GetIndexer() cache.Indexer { panic("unsupported") } + +type fakeMutatingWebhookConfigLister struct { + list []*v1beta1.MutatingWebhookConfiguration + err error +} + +func (f *fakeMutatingWebhookConfigLister) List(selector labels.Selector) (ret []*v1beta1.MutatingWebhookConfiguration, err error) { + return f.list, f.err +} + +func (f *fakeMutatingWebhookConfigLister) Get(name string) (*v1beta1.MutatingWebhookConfiguration, error) { + panic("unsupported") +} + +func TestGetMutatingWebhookConfig(t *testing.T) { + informer := &fakeMutatingWebhookConfigSharedInformer{ + informer: &fakeMutatingWebhookConfigInformer{}, + lister: &fakeMutatingWebhookConfigLister{}, + } + + // unsynced, error retrieving list + informer.informer.hasSynced = false + informer.lister.list = nil + informer.lister.err = fmt.Errorf("mutating webhook configuration is not ready") + manager := NewMutatingWebhookConfigurationManager(informer) + if _, err := manager.Webhooks(); err == nil { + t.Errorf("expected err, but got none") + } + + // list found, still unsynced + informer.informer.hasSynced = false + informer.lister.list = []*v1beta1.MutatingWebhookConfiguration{} + informer.lister.err = nil + if _, err := manager.Webhooks(); err == nil { + t.Errorf("expected err, but got none") + } + + // items populated, still unsynced + webhookContainer := &v1beta1.MutatingWebhookConfiguration{ + ObjectMeta: metav1.ObjectMeta{Name: "webhook1"}, + Webhooks: []v1beta1.Webhook{{Name: "webhook1.1"}}, + } + informer.informer.hasSynced = false + informer.lister.list = []*v1beta1.MutatingWebhookConfiguration{webhookContainer.DeepCopy()} + informer.lister.err = nil + informer.informer.eventHandler.OnAdd(webhookContainer.DeepCopy()) + if _, err := manager.Webhooks(); err == nil { + t.Errorf("expected err, but got none") + } + + // sync completed + informer.informer.hasSynced = true + hooks, err := manager.Webhooks() + if err != nil { + t.Errorf("unexpected err: %v", err) + } + if !reflect.DeepEqual(hooks.Webhooks, webhookContainer.Webhooks) { + t.Errorf("Expected\n%#v\ngot\n%#v", webhookContainer.Webhooks, hooks.Webhooks) } } diff --git a/staging/src/k8s.io/apiserver/pkg/admission/configuration/validating_webhook_manager.go b/staging/src/k8s.io/apiserver/pkg/admission/configuration/validating_webhook_manager.go index 8f9fd34daae..f93068b8037 100644 --- a/staging/src/k8s.io/apiserver/pkg/admission/configuration/validating_webhook_manager.go +++ b/staging/src/k8s.io/apiserver/pkg/admission/configuration/validating_webhook_manager.go @@ -18,67 +18,81 @@ package configuration import ( "fmt" - "reflect" - - "github.com/golang/glog" + "sort" + "sync/atomic" "k8s.io/api/admissionregistration/v1beta1" - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/labels" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + admissionregistrationinformers "k8s.io/client-go/informers/admissionregistration/v1beta1" + admissionregistrationlisters "k8s.io/client-go/listers/admissionregistration/v1beta1" + "k8s.io/client-go/tools/cache" ) -type ValidatingWebhookConfigurationLister interface { - List(opts metav1.ListOptions) (*v1beta1.ValidatingWebhookConfigurationList, error) -} - // ValidatingWebhookConfigurationManager collects the validating webhook objects so that they can be called. type ValidatingWebhookConfigurationManager struct { - *poller + ready int32 + configuration *atomic.Value + hasSynced func() bool + lister admissionregistrationlisters.ValidatingWebhookConfigurationLister } -func NewValidatingWebhookConfigurationManager(c ValidatingWebhookConfigurationLister) *ValidatingWebhookConfigurationManager { - getFn := func() (runtime.Object, error) { - list, err := c.List(metav1.ListOptions{}) - if err != nil { - if errors.IsNotFound(err) || errors.IsForbidden(err) { - glog.V(5).Infof("ValidatingWebhookConfiguration are disabled due to an error: %v", err) - return nil, ErrDisabled - } - return nil, err - } - return mergeValidatingWebhookConfigurations(list), nil +func NewValidatingWebhookConfigurationManager(informer admissionregistrationinformers.ValidatingWebhookConfigurationInformer) *ValidatingWebhookConfigurationManager { + manager := &ValidatingWebhookConfigurationManager{ + ready: 0, + configuration: &atomic.Value{}, + hasSynced: informer.Informer().HasSynced, + lister: informer.Lister(), } - return &ValidatingWebhookConfigurationManager{ - newPoller(getFn), - } + // Start with an empty list + manager.configuration.Store(&v1beta1.ValidatingWebhookConfiguration{}) + + // On any change, rebuild the config + informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(_ interface{}) { manager.updateConfiguration() }, + UpdateFunc: func(_, _ interface{}) { manager.updateConfiguration() }, + DeleteFunc: func(_ interface{}) { manager.updateConfiguration() }, + }) + + return manager } // Webhooks returns the merged ValidatingWebhookConfiguration. -func (im *ValidatingWebhookConfigurationManager) Webhooks() (*v1beta1.ValidatingWebhookConfiguration, error) { - configuration, err := im.poller.configuration() - if err != nil { - return nil, err +func (v *ValidatingWebhookConfigurationManager) Webhooks() (*v1beta1.ValidatingWebhookConfiguration, error) { + if atomic.LoadInt32(&v.ready) == 0 { + if !v.hasSynced() { + // Return an error until we've synced + return nil, fmt.Errorf("validating webhook configuration is not ready") + } + // Remember we're ready + atomic.StoreInt32(&v.ready, 1) } - validatingWebhookConfiguration, ok := configuration.(*v1beta1.ValidatingWebhookConfiguration) - if !ok { - return nil, fmt.Errorf("expected type %v, got type %v", reflect.TypeOf(validatingWebhookConfiguration), reflect.TypeOf(configuration)) - } - return validatingWebhookConfiguration, nil + return v.configuration.Load().(*v1beta1.ValidatingWebhookConfiguration), nil } -func (im *ValidatingWebhookConfigurationManager) Run(stopCh <-chan struct{}) { - im.poller.Run(stopCh) +func (v *ValidatingWebhookConfigurationManager) updateConfiguration() { + configurations, err := v.lister.List(labels.Everything()) + if err != nil { + utilruntime.HandleError(fmt.Errorf("error updating configuration: %v", err)) + return + } + v.configuration.Store(mergeValidatingWebhookConfigurations(configurations)) } func mergeValidatingWebhookConfigurations( - list *v1beta1.ValidatingWebhookConfigurationList, + configurations []*v1beta1.ValidatingWebhookConfiguration, ) *v1beta1.ValidatingWebhookConfiguration { - configurations := list.Items + sort.SliceStable(configurations, ValidatingWebhookConfigurationSorter(configurations).ByName) var ret v1beta1.ValidatingWebhookConfiguration for _, c := range configurations { ret.Webhooks = append(ret.Webhooks, c.Webhooks...) } return &ret } + +type ValidatingWebhookConfigurationSorter []*v1beta1.ValidatingWebhookConfiguration + +func (a ValidatingWebhookConfigurationSorter) ByName(i, j int) bool { + return a[i].Name < a[j].Name +} diff --git a/staging/src/k8s.io/apiserver/pkg/admission/configuration/validating_webhook_manager_test.go b/staging/src/k8s.io/apiserver/pkg/admission/configuration/validating_webhook_manager_test.go index 60ba5367325..929d7b2cfcf 100644 --- a/staging/src/k8s.io/apiserver/pkg/admission/configuration/validating_webhook_manager_test.go +++ b/staging/src/k8s.io/apiserver/pkg/admission/configuration/validating_webhook_manager_test.go @@ -17,24 +17,118 @@ limitations under the License. package configuration import ( + "fmt" + "reflect" "testing" + "time" "k8s.io/api/admissionregistration/v1beta1" - "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/labels" + admissionregistrationlisters "k8s.io/client-go/listers/admissionregistration/v1beta1" + "k8s.io/client-go/tools/cache" ) -type disabledValidatingWebhookConfigLister struct{} - -func (l *disabledValidatingWebhookConfigLister) List(options metav1.ListOptions) (*v1beta1.ValidatingWebhookConfigurationList, error) { - return nil, errors.NewNotFound(schema.GroupResource{Group: "admissionregistration", Resource: "ValidatingWebhookConfigurations"}, "") +type fakeValidatingWebhookConfigSharedInformer struct { + informer *fakeValidatingWebhookConfigInformer + lister *fakeValidatingWebhookConfigLister } -func TestWebhookConfigDisabled(t *testing.T) { - manager := NewValidatingWebhookConfigurationManager(&disabledValidatingWebhookConfigLister{}) - manager.sync() - _, err := manager.Webhooks() - if err.Error() != ErrDisabled.Error() { - t.Errorf("expected %v, got %v", ErrDisabled, err) + +func (f *fakeValidatingWebhookConfigSharedInformer) Informer() cache.SharedIndexInformer { + return f.informer +} +func (f *fakeValidatingWebhookConfigSharedInformer) Lister() admissionregistrationlisters.ValidatingWebhookConfigurationLister { + return f.lister +} + +type fakeValidatingWebhookConfigInformer struct { + eventHandler cache.ResourceEventHandler + hasSynced bool +} + +func (f *fakeValidatingWebhookConfigInformer) AddEventHandler(handler cache.ResourceEventHandler) { + fmt.Println("added handler") + f.eventHandler = handler +} +func (f *fakeValidatingWebhookConfigInformer) AddEventHandlerWithResyncPeriod(handler cache.ResourceEventHandler, resyncPeriod time.Duration) { + panic("unsupported") +} +func (f *fakeValidatingWebhookConfigInformer) GetStore() cache.Store { + panic("unsupported") +} +func (f *fakeValidatingWebhookConfigInformer) GetController() cache.Controller { + panic("unsupported") +} +func (f *fakeValidatingWebhookConfigInformer) Run(stopCh <-chan struct{}) { + panic("unsupported") +} +func (f *fakeValidatingWebhookConfigInformer) HasSynced() bool { + return f.hasSynced +} +func (f *fakeValidatingWebhookConfigInformer) LastSyncResourceVersion() string { + panic("unsupported") +} +func (f *fakeValidatingWebhookConfigInformer) AddIndexers(indexers cache.Indexers) error { + panic("unsupported") +} +func (f *fakeValidatingWebhookConfigInformer) GetIndexer() cache.Indexer { panic("unsupported") } + +type fakeValidatingWebhookConfigLister struct { + list []*v1beta1.ValidatingWebhookConfiguration + err error +} + +func (f *fakeValidatingWebhookConfigLister) List(selector labels.Selector) (ret []*v1beta1.ValidatingWebhookConfiguration, err error) { + return f.list, f.err +} + +func (f *fakeValidatingWebhookConfigLister) Get(name string) (*v1beta1.ValidatingWebhookConfiguration, error) { + panic("unsupported") +} + +func TestGettValidatingWebhookConfig(t *testing.T) { + informer := &fakeValidatingWebhookConfigSharedInformer{ + informer: &fakeValidatingWebhookConfigInformer{}, + lister: &fakeValidatingWebhookConfigLister{}, + } + + // unsynced, error retrieving list + informer.informer.hasSynced = false + informer.lister.list = nil + informer.lister.err = fmt.Errorf("validating webhook configuration is not ready") + manager := NewValidatingWebhookConfigurationManager(informer) + if _, err := manager.Webhooks(); err == nil { + t.Errorf("expected err, but got none") + } + + // list found, still unsynced + informer.informer.hasSynced = false + informer.lister.list = []*v1beta1.ValidatingWebhookConfiguration{} + informer.lister.err = nil + if _, err := manager.Webhooks(); err == nil { + t.Errorf("expected err, but got none") + } + + // items populated, still unsynced + webhookContainer := &v1beta1.ValidatingWebhookConfiguration{ + ObjectMeta: metav1.ObjectMeta{Name: "webhook1"}, + Webhooks: []v1beta1.Webhook{{Name: "webhook1.1"}}, + } + informer.informer.hasSynced = false + informer.lister.list = []*v1beta1.ValidatingWebhookConfiguration{webhookContainer.DeepCopy()} + informer.lister.err = nil + informer.informer.eventHandler.OnAdd(webhookContainer.DeepCopy()) + if _, err := manager.Webhooks(); err == nil { + t.Errorf("expected err, but got none") + } + + // sync completed + informer.informer.hasSynced = true + hooks, err := manager.Webhooks() + if err != nil { + t.Errorf("unexpected err: %v", err) + } + if !reflect.DeepEqual(hooks.Webhooks, webhookContainer.Webhooks) { + t.Errorf("Expected\n%#v\ngot\n%#v", webhookContainer.Webhooks, hooks.Webhooks) } } diff --git a/staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/mutating/admission.go b/staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/mutating/admission.go index 6d62a36f629..0f50edf6a3e 100644 --- a/staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/mutating/admission.go +++ b/staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/mutating/admission.go @@ -35,7 +35,6 @@ import ( "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/runtime/serializer/json" utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/admission/configuration" genericadmissioninit "k8s.io/apiserver/pkg/admission/initializer" @@ -69,7 +68,6 @@ func Register(plugins *admission.Plugins) { // WebhookSource can list dynamic webhook plugins. type WebhookSource interface { - Run(stopCh <-chan struct{}) Webhooks() (*v1beta1.MutatingWebhookConfiguration, error) } @@ -146,7 +144,6 @@ func (a *MutatingWebhook) SetScheme(scheme *runtime.Scheme) { // WantsExternalKubeClientSet defines a function which sets external ClientSet for admission plugins that need it func (a *MutatingWebhook) SetExternalKubeClientSet(client clientset.Interface) { a.namespaceMatcher.Client = client - a.hookSource = configuration.NewMutatingWebhookConfigurationManager(client.AdmissionregistrationV1beta1().MutatingWebhookConfigurations()) } // SetExternalKubeInformerFactory implements the WantsExternalKubeInformerFactory interface. @@ -154,6 +151,7 @@ func (a *MutatingWebhook) SetExternalKubeInformerFactory(f informers.SharedInfor namespaceInformer := f.Core().V1().Namespaces() a.namespaceMatcher.NamespaceLister = namespaceInformer.Lister() a.SetReadyFunc(namespaceInformer.Informer().HasSynced) + a.hookSource = configuration.NewMutatingWebhookConfigurationManager(f.Admissionregistration().V1beta1().MutatingWebhookConfigurations()) } // ValidateInitialization implements the InitializationValidator interface. @@ -176,16 +174,11 @@ func (a *MutatingWebhook) ValidateInitialization() error { if a.defaulter == nil { return fmt.Errorf("MutatingWebhook.defaulter is not properly setup") } - go a.hookSource.Run(wait.NeverStop) return nil } func (a *MutatingWebhook) loadConfiguration(attr admission.Attributes) (*v1beta1.MutatingWebhookConfiguration, error) { hookConfig, err := a.hookSource.Webhooks() - // if Webhook configuration is disabled, fail open - if err == configuration.ErrDisabled { - return &v1beta1.MutatingWebhookConfiguration{}, nil - } if err != nil { e := apierrors.NewServerTimeout(attr.GetResource().GroupResource(), string(attr.GetOperation()), 1) e.ErrStatus.Message = fmt.Sprintf("Unable to refresh the Webhook configuration: %v", err) diff --git a/staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/validating/admission.go b/staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/validating/admission.go index f68e46fa585..f14d65083de 100644 --- a/staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/validating/admission.go +++ b/staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/validating/admission.go @@ -34,7 +34,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/serializer" utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/admission/configuration" genericadmissioninit "k8s.io/apiserver/pkg/admission/initializer" @@ -68,7 +67,6 @@ func Register(plugins *admission.Plugins) { // WebhookSource can list dynamic webhook plugins. type WebhookSource interface { - Run(stopCh <-chan struct{}) Webhooks() (*v1beta1.ValidatingWebhookConfiguration, error) } @@ -141,7 +139,6 @@ func (a *ValidatingAdmissionWebhook) SetScheme(scheme *runtime.Scheme) { // WantsExternalKubeClientSet defines a function which sets external ClientSet for admission plugins that need it func (a *ValidatingAdmissionWebhook) SetExternalKubeClientSet(client clientset.Interface) { a.namespaceMatcher.Client = client - a.hookSource = configuration.NewValidatingWebhookConfigurationManager(client.AdmissionregistrationV1beta1().ValidatingWebhookConfigurations()) } // SetExternalKubeInformerFactory implements the WantsExternalKubeInformerFactory interface. @@ -149,12 +146,13 @@ func (a *ValidatingAdmissionWebhook) SetExternalKubeInformerFactory(f informers. namespaceInformer := f.Core().V1().Namespaces() a.namespaceMatcher.NamespaceLister = namespaceInformer.Lister() a.SetReadyFunc(namespaceInformer.Informer().HasSynced) + a.hookSource = configuration.NewValidatingWebhookConfigurationManager(f.Admissionregistration().V1beta1().ValidatingWebhookConfigurations()) } // ValidateInitialization implements the InitializationValidator interface. func (a *ValidatingAdmissionWebhook) ValidateInitialization() error { if a.hookSource == nil { - return fmt.Errorf("ValidatingAdmissionWebhook admission plugin requires a Kubernetes client to be provided") + return fmt.Errorf("ValidatingAdmissionWebhook admission plugin requires a Kubernetes informer to be provided") } if err := a.namespaceMatcher.Validate(); err != nil { return fmt.Errorf("ValidatingAdmissionWebhook.namespaceMatcher is not properly setup: %v", err) @@ -165,16 +163,11 @@ func (a *ValidatingAdmissionWebhook) ValidateInitialization() error { if err := a.convertor.Validate(); err != nil { return fmt.Errorf("ValidatingAdmissionWebhook.convertor is not properly setup: %v", err) } - go a.hookSource.Run(wait.NeverStop) return nil } func (a *ValidatingAdmissionWebhook) loadConfiguration(attr admission.Attributes) (*v1beta1.ValidatingWebhookConfiguration, error) { hookConfig, err := a.hookSource.Webhooks() - // if Webhook configuration is disabled, fail open - if err == configuration.ErrDisabled { - return &v1beta1.ValidatingWebhookConfiguration{}, nil - } if err != nil { e := apierrors.NewServerTimeout(attr.GetResource().GroupResource(), string(attr.GetOperation()), 1) e.ErrStatus.Message = fmt.Sprintf("Unable to refresh the Webhook configuration: %v", err)