From 300751393b4d47c3c5522a4812db9414c4234f96 Mon Sep 17 00:00:00 2001 From: Monis Khan Date: Mon, 16 Apr 2018 13:15:40 -0400 Subject: [PATCH] Use a dynamic RESTMapper for admission plugins This change updates the REST mapper used by all admission plugins to be backed by cached discovery information. This cache is updated every ten seconds via a post start hook and will not attempt to update on calls to RESTMapping. It solely relies on the hook to keep the cache in sync with discovery. This prevents issues with the OwnerReferencesPermissionEnforcement admission plugin when it is used with custom resources that set blockOwnerDeletion. Signed-off-by: Monis Khan --- cmd/kube-apiserver/app/BUILD | 3 ++ cmd/kube-apiserver/app/server.go | 43 +++++++++++++++---- .../etcd/etcd_storage_path_test.go | 4 +- test/integration/examples/apiserver_test.go | 6 +-- 4 files changed, 42 insertions(+), 14 deletions(-) diff --git a/cmd/kube-apiserver/app/BUILD b/cmd/kube-apiserver/app/BUILD index 876316b7aaa..36001672690 100644 --- a/cmd/kube-apiserver/app/BUILD +++ b/cmd/kube-apiserver/app/BUILD @@ -59,6 +59,7 @@ go_library( "//vendor/k8s.io/apiextensions-apiserver/pkg/apiserver:go_default_library", "//vendor/k8s.io/apiextensions-apiserver/pkg/client/informers/internalversion:go_default_library", "//vendor/k8s.io/apiextensions-apiserver/pkg/cmd/server:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library", @@ -78,6 +79,8 @@ go_library( "//vendor/k8s.io/apiserver/pkg/server/storage:go_default_library", "//vendor/k8s.io/apiserver/pkg/storage/etcd3/preflight:go_default_library", "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library", + "//vendor/k8s.io/client-go/discovery:go_default_library", + "//vendor/k8s.io/client-go/discovery/cached:go_default_library", "//vendor/k8s.io/client-go/informers:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index ffec2d8fdeb..df212452688 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -35,6 +35,7 @@ import ( "github.com/golang/glog" "github.com/spf13/cobra" + "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" utilerrors "k8s.io/apimachinery/pkg/util/errors" @@ -54,6 +55,8 @@ import ( serverstorage "k8s.io/apiserver/pkg/server/storage" "k8s.io/apiserver/pkg/storage/etcd3/preflight" utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/client-go/discovery" + cacheddiscovery "k8s.io/client-go/discovery/cached" clientgoinformers "k8s.io/client-go/informers" clientgoclientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -156,7 +159,7 @@ func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan return nil, err } - kubeAPIServerConfig, sharedInformers, versionedInformers, insecureServingOptions, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport) + kubeAPIServerConfig, sharedInformers, versionedInformers, insecureServingOptions, serviceResolver, pluginInitializer, admissionPostStartHook, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport) if err != nil { return nil, err } @@ -171,7 +174,7 @@ func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan return nil, err } - kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer, sharedInformers, versionedInformers) + kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer, sharedInformers, versionedInformers, admissionPostStartHook) if err != nil { return nil, err } @@ -218,15 +221,17 @@ func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan } // CreateKubeAPIServer creates and wires a workable kube-apiserver -func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, delegateAPIServer genericapiserver.DelegationTarget, sharedInformers informers.SharedInformerFactory, versionedInformers clientgoinformers.SharedInformerFactory) (*master.Master, error) { +func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, delegateAPIServer genericapiserver.DelegationTarget, sharedInformers informers.SharedInformerFactory, versionedInformers clientgoinformers.SharedInformerFactory, admissionPostStartHook genericapiserver.PostStartHookFunc) (*master.Master, error) { kubeAPIServer, err := kubeAPIServerConfig.Complete(versionedInformers).New(delegateAPIServer) if err != nil { return nil, err } - kubeAPIServer.GenericAPIServer.AddPostStartHook("start-kube-apiserver-informers", func(context genericapiserver.PostStartHookContext) error { + + kubeAPIServer.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-informers", func(context genericapiserver.PostStartHookContext) error { sharedInformers.Start(context.StopCh) return nil }) + kubeAPIServer.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-admission-initializer", admissionPostStartHook) return kubeAPIServer, nil } @@ -288,10 +293,11 @@ func CreateKubeAPIServerConfig( insecureServingInfo *kubeserver.InsecureServingInfo, serviceResolver aggregatorapiserver.ServiceResolver, pluginInitializers []admission.PluginInitializer, + admissionPostStartHook genericapiserver.PostStartHookFunc, lastErr error, ) { var genericConfig *genericapiserver.Config - genericConfig, sharedInformers, versionedInformers, insecureServingInfo, serviceResolver, pluginInitializers, lastErr = BuildGenericConfig(s.ServerRunOptions, proxyTransport) + genericConfig, sharedInformers, versionedInformers, insecureServingInfo, serviceResolver, pluginInitializers, admissionPostStartHook, lastErr = BuildGenericConfig(s.ServerRunOptions, proxyTransport) if lastErr != nil { return } @@ -413,6 +419,7 @@ func BuildGenericConfig( insecureServingInfo *kubeserver.InsecureServingInfo, serviceResolver aggregatorapiserver.ServiceResolver, pluginInitializers []admission.PluginInitializer, + admissionPostStartHook genericapiserver.PostStartHookFunc, lastErr error, ) { genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs) @@ -539,7 +546,7 @@ func BuildGenericConfig( }, } } - pluginInitializers, err = BuildAdmissionPluginInitializers( + pluginInitializers, admissionPostStartHook, err = BuildAdmissionPluginInitializers( s, client, sharedInformers, @@ -571,7 +578,7 @@ func BuildAdmissionPluginInitializers( sharedInformers informers.SharedInformerFactory, serviceResolver aggregatorapiserver.ServiceResolver, webhookAuthWrapper webhookconfig.AuthenticationInfoResolverWrapper, -) ([]admission.PluginInitializer, error) { +) ([]admission.PluginInitializer, genericapiserver.PostStartHookFunc, error) { var cloudConfig []byte if s.CloudProvider.CloudConfigFile != "" { @@ -582,15 +589,33 @@ func BuildAdmissionPluginInitializers( } } - // TODO: use a dynamic restmapper. See https://github.com/kubernetes/kubernetes/pull/42615. + // TODO: drop this REST mapper once client is guaranteed to be non-nil + // See KUBE_API_VERSIONS comment above restMapper := legacyscheme.Registry.RESTMapper() + admissionPostStartHook := func(_ genericapiserver.PostStartHookContext) error { + return nil + } + + // We have a functional client so we can use that to build our discovery backed REST mapper + if client != nil && client.Discovery() != nil { + // Use a discovery client capable of being refreshed. + discoveryClient := cacheddiscovery.NewMemCacheClient(client.Discovery()) + discoveryRESTMapper := discovery.NewDeferredDiscoveryRESTMapper(discoveryClient, meta.InterfacesForUnstructured) + + restMapper = discoveryRESTMapper + admissionPostStartHook = func(context genericapiserver.PostStartHookContext) error { + discoveryRESTMapper.Reset() + go utilwait.Until(discoveryRESTMapper.Reset, 10*time.Second, context.StopCh) + return nil + } + } quotaConfiguration := quotainstall.NewQuotaConfigurationForAdmission() kubePluginInitializer := kubeapiserveradmission.NewPluginInitializer(client, sharedInformers, cloudConfig, restMapper, quotaConfiguration) webhookPluginInitializer := webhookinit.NewPluginInitializer(webhookAuthWrapper, serviceResolver) - return []admission.PluginInitializer{webhookPluginInitializer, kubePluginInitializer}, nil + return []admission.PluginInitializer{webhookPluginInitializer, kubePluginInitializer}, admissionPostStartHook, nil } // BuildAuthenticator constructs the authenticator diff --git a/test/integration/etcd/etcd_storage_path_test.go b/test/integration/etcd/etcd_storage_path_test.go index a1f5c17dbe5..0b1564f4e4c 100644 --- a/test/integration/etcd/etcd_storage_path_test.go +++ b/test/integration/etcd/etcd_storage_path_test.go @@ -744,14 +744,14 @@ func startRealMasterOrDie(t *testing.T, certDir string) (*allClient, clientv3.KV if err != nil { t.Fatal(err) } - kubeAPIServerConfig, sharedInformers, versionedInformers, _, _, _, err := app.CreateKubeAPIServerConfig(completedOptions, tunneler, proxyTransport) + kubeAPIServerConfig, sharedInformers, versionedInformers, _, _, _, admissionPostStartHook, err := app.CreateKubeAPIServerConfig(completedOptions, tunneler, proxyTransport) if err != nil { t.Fatal(err) } kubeAPIServerConfig.ExtraConfig.APIResourceConfigSource = &allResourceSource{} // force enable all resources - kubeAPIServer, err := app.CreateKubeAPIServer(kubeAPIServerConfig, genericapiserver.NewEmptyDelegate(), sharedInformers, versionedInformers) + kubeAPIServer, err := app.CreateKubeAPIServer(kubeAPIServerConfig, genericapiserver.NewEmptyDelegate(), sharedInformers, versionedInformers, admissionPostStartHook) if err != nil { t.Fatal(err) } diff --git a/test/integration/examples/apiserver_test.go b/test/integration/examples/apiserver_test.go index 9fc1df3947d..68cf9cff041 100644 --- a/test/integration/examples/apiserver_test.go +++ b/test/integration/examples/apiserver_test.go @@ -113,7 +113,7 @@ func TestAggregatedAPIServer(t *testing.T) { if err != nil { t.Fatal(err) } - kubeAPIServerConfig, sharedInformers, versionedInformers, _, _, _, err := app.CreateKubeAPIServerConfig(completedOptions, tunneler, proxyTransport) + kubeAPIServerConfig, sharedInformers, versionedInformers, _, _, _, admissionPostStartHook, err := app.CreateKubeAPIServerConfig(completedOptions, tunneler, proxyTransport) if err != nil { t.Fatal(err) } @@ -124,7 +124,7 @@ func TestAggregatedAPIServer(t *testing.T) { kubeAPIServerClientConfig.ServerName = "" kubeClientConfigValue.Store(kubeAPIServerClientConfig) - kubeAPIServer, err := app.CreateKubeAPIServer(kubeAPIServerConfig, genericapiserver.NewEmptyDelegate(), sharedInformers, versionedInformers) + kubeAPIServer, err := app.CreateKubeAPIServer(kubeAPIServerConfig, genericapiserver.NewEmptyDelegate(), sharedInformers, versionedInformers, admissionPostStartHook) if err != nil { t.Fatal(err) } @@ -135,7 +135,7 @@ func TestAggregatedAPIServer(t *testing.T) { }() // just use json because everyone speaks it - err = wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (done bool, err error) { + err = wait.PollImmediate(time.Second, time.Minute, func() (done bool, err error) { obj := kubeClientConfigValue.Load() if obj == nil { return false, nil