mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-09-13 21:25:09 +00:00
Use a dynamic RESTMapper for admission plugins
This change updates the REST mapper used by all admission plugins to be backed by cached discovery information. This cache is updated every ten seconds via a post start hook and will not attempt to update on calls to RESTMapping. It solely relies on the hook to keep the cache in sync with discovery. This prevents issues with the OwnerReferencesPermissionEnforcement admission plugin when it is used with custom resources that set blockOwnerDeletion. Signed-off-by: Monis Khan <mkhan@redhat.com>
This commit is contained in:
@@ -59,6 +59,7 @@ go_library(
|
||||
"//vendor/k8s.io/apiextensions-apiserver/pkg/apiserver:go_default_library",
|
||||
"//vendor/k8s.io/apiextensions-apiserver/pkg/client/informers/internalversion:go_default_library",
|
||||
"//vendor/k8s.io/apiextensions-apiserver/pkg/cmd/server:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library",
|
||||
@@ -78,6 +79,8 @@ go_library(
|
||||
"//vendor/k8s.io/apiserver/pkg/server/storage:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/storage/etcd3/preflight:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||
"//vendor/k8s.io/client-go/discovery:go_default_library",
|
||||
"//vendor/k8s.io/client-go/discovery/cached:go_default_library",
|
||||
"//vendor/k8s.io/client-go/informers:go_default_library",
|
||||
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
|
||||
"//vendor/k8s.io/client-go/rest:go_default_library",
|
||||
|
@@ -35,6 +35,7 @@ import (
|
||||
"github.com/golang/glog"
|
||||
"github.com/spf13/cobra"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||
@@ -54,6 +55,8 @@ import (
|
||||
serverstorage "k8s.io/apiserver/pkg/server/storage"
|
||||
"k8s.io/apiserver/pkg/storage/etcd3/preflight"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/client-go/discovery"
|
||||
cacheddiscovery "k8s.io/client-go/discovery/cached"
|
||||
clientgoinformers "k8s.io/client-go/informers"
|
||||
clientgoclientset "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/rest"
|
||||
@@ -156,7 +159,7 @@ func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan
|
||||
return nil, err
|
||||
}
|
||||
|
||||
kubeAPIServerConfig, sharedInformers, versionedInformers, insecureServingOptions, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport)
|
||||
kubeAPIServerConfig, sharedInformers, versionedInformers, insecureServingOptions, serviceResolver, pluginInitializer, admissionPostStartHook, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -171,7 +174,7 @@ func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan
|
||||
return nil, err
|
||||
}
|
||||
|
||||
kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer, sharedInformers, versionedInformers)
|
||||
kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer, sharedInformers, versionedInformers, admissionPostStartHook)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -218,15 +221,17 @@ func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan
|
||||
}
|
||||
|
||||
// CreateKubeAPIServer creates and wires a workable kube-apiserver
|
||||
func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, delegateAPIServer genericapiserver.DelegationTarget, sharedInformers informers.SharedInformerFactory, versionedInformers clientgoinformers.SharedInformerFactory) (*master.Master, error) {
|
||||
func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, delegateAPIServer genericapiserver.DelegationTarget, sharedInformers informers.SharedInformerFactory, versionedInformers clientgoinformers.SharedInformerFactory, admissionPostStartHook genericapiserver.PostStartHookFunc) (*master.Master, error) {
|
||||
kubeAPIServer, err := kubeAPIServerConfig.Complete(versionedInformers).New(delegateAPIServer)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
kubeAPIServer.GenericAPIServer.AddPostStartHook("start-kube-apiserver-informers", func(context genericapiserver.PostStartHookContext) error {
|
||||
|
||||
kubeAPIServer.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-informers", func(context genericapiserver.PostStartHookContext) error {
|
||||
sharedInformers.Start(context.StopCh)
|
||||
return nil
|
||||
})
|
||||
kubeAPIServer.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-admission-initializer", admissionPostStartHook)
|
||||
|
||||
return kubeAPIServer, nil
|
||||
}
|
||||
@@ -288,10 +293,11 @@ func CreateKubeAPIServerConfig(
|
||||
insecureServingInfo *kubeserver.InsecureServingInfo,
|
||||
serviceResolver aggregatorapiserver.ServiceResolver,
|
||||
pluginInitializers []admission.PluginInitializer,
|
||||
admissionPostStartHook genericapiserver.PostStartHookFunc,
|
||||
lastErr error,
|
||||
) {
|
||||
var genericConfig *genericapiserver.Config
|
||||
genericConfig, sharedInformers, versionedInformers, insecureServingInfo, serviceResolver, pluginInitializers, lastErr = BuildGenericConfig(s.ServerRunOptions, proxyTransport)
|
||||
genericConfig, sharedInformers, versionedInformers, insecureServingInfo, serviceResolver, pluginInitializers, admissionPostStartHook, lastErr = BuildGenericConfig(s.ServerRunOptions, proxyTransport)
|
||||
if lastErr != nil {
|
||||
return
|
||||
}
|
||||
@@ -413,6 +419,7 @@ func BuildGenericConfig(
|
||||
insecureServingInfo *kubeserver.InsecureServingInfo,
|
||||
serviceResolver aggregatorapiserver.ServiceResolver,
|
||||
pluginInitializers []admission.PluginInitializer,
|
||||
admissionPostStartHook genericapiserver.PostStartHookFunc,
|
||||
lastErr error,
|
||||
) {
|
||||
genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs)
|
||||
@@ -539,7 +546,7 @@ func BuildGenericConfig(
|
||||
},
|
||||
}
|
||||
}
|
||||
pluginInitializers, err = BuildAdmissionPluginInitializers(
|
||||
pluginInitializers, admissionPostStartHook, err = BuildAdmissionPluginInitializers(
|
||||
s,
|
||||
client,
|
||||
sharedInformers,
|
||||
@@ -571,7 +578,7 @@ func BuildAdmissionPluginInitializers(
|
||||
sharedInformers informers.SharedInformerFactory,
|
||||
serviceResolver aggregatorapiserver.ServiceResolver,
|
||||
webhookAuthWrapper webhookconfig.AuthenticationInfoResolverWrapper,
|
||||
) ([]admission.PluginInitializer, error) {
|
||||
) ([]admission.PluginInitializer, genericapiserver.PostStartHookFunc, error) {
|
||||
var cloudConfig []byte
|
||||
|
||||
if s.CloudProvider.CloudConfigFile != "" {
|
||||
@@ -582,15 +589,33 @@ func BuildAdmissionPluginInitializers(
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: use a dynamic restmapper. See https://github.com/kubernetes/kubernetes/pull/42615.
|
||||
// TODO: drop this REST mapper once client is guaranteed to be non-nil
|
||||
// See KUBE_API_VERSIONS comment above
|
||||
restMapper := legacyscheme.Registry.RESTMapper()
|
||||
admissionPostStartHook := func(_ genericapiserver.PostStartHookContext) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// We have a functional client so we can use that to build our discovery backed REST mapper
|
||||
if client != nil && client.Discovery() != nil {
|
||||
// Use a discovery client capable of being refreshed.
|
||||
discoveryClient := cacheddiscovery.NewMemCacheClient(client.Discovery())
|
||||
discoveryRESTMapper := discovery.NewDeferredDiscoveryRESTMapper(discoveryClient, meta.InterfacesForUnstructured)
|
||||
|
||||
restMapper = discoveryRESTMapper
|
||||
admissionPostStartHook = func(context genericapiserver.PostStartHookContext) error {
|
||||
discoveryRESTMapper.Reset()
|
||||
go utilwait.Until(discoveryRESTMapper.Reset, 10*time.Second, context.StopCh)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
quotaConfiguration := quotainstall.NewQuotaConfigurationForAdmission()
|
||||
|
||||
kubePluginInitializer := kubeapiserveradmission.NewPluginInitializer(client, sharedInformers, cloudConfig, restMapper, quotaConfiguration)
|
||||
webhookPluginInitializer := webhookinit.NewPluginInitializer(webhookAuthWrapper, serviceResolver)
|
||||
|
||||
return []admission.PluginInitializer{webhookPluginInitializer, kubePluginInitializer}, nil
|
||||
return []admission.PluginInitializer{webhookPluginInitializer, kubePluginInitializer}, admissionPostStartHook, nil
|
||||
}
|
||||
|
||||
// BuildAuthenticator constructs the authenticator
|
||||
|
@@ -744,14 +744,14 @@ func startRealMasterOrDie(t *testing.T, certDir string) (*allClient, clientv3.KV
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
kubeAPIServerConfig, sharedInformers, versionedInformers, _, _, _, err := app.CreateKubeAPIServerConfig(completedOptions, tunneler, proxyTransport)
|
||||
kubeAPIServerConfig, sharedInformers, versionedInformers, _, _, _, admissionPostStartHook, err := app.CreateKubeAPIServerConfig(completedOptions, tunneler, proxyTransport)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
kubeAPIServerConfig.ExtraConfig.APIResourceConfigSource = &allResourceSource{} // force enable all resources
|
||||
|
||||
kubeAPIServer, err := app.CreateKubeAPIServer(kubeAPIServerConfig, genericapiserver.NewEmptyDelegate(), sharedInformers, versionedInformers)
|
||||
kubeAPIServer, err := app.CreateKubeAPIServer(kubeAPIServerConfig, genericapiserver.NewEmptyDelegate(), sharedInformers, versionedInformers, admissionPostStartHook)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@@ -113,7 +113,7 @@ func TestAggregatedAPIServer(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
kubeAPIServerConfig, sharedInformers, versionedInformers, _, _, _, err := app.CreateKubeAPIServerConfig(completedOptions, tunneler, proxyTransport)
|
||||
kubeAPIServerConfig, sharedInformers, versionedInformers, _, _, _, admissionPostStartHook, err := app.CreateKubeAPIServerConfig(completedOptions, tunneler, proxyTransport)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -124,7 +124,7 @@ func TestAggregatedAPIServer(t *testing.T) {
|
||||
kubeAPIServerClientConfig.ServerName = ""
|
||||
kubeClientConfigValue.Store(kubeAPIServerClientConfig)
|
||||
|
||||
kubeAPIServer, err := app.CreateKubeAPIServer(kubeAPIServerConfig, genericapiserver.NewEmptyDelegate(), sharedInformers, versionedInformers)
|
||||
kubeAPIServer, err := app.CreateKubeAPIServer(kubeAPIServerConfig, genericapiserver.NewEmptyDelegate(), sharedInformers, versionedInformers, admissionPostStartHook)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -135,7 +135,7 @@ func TestAggregatedAPIServer(t *testing.T) {
|
||||
}()
|
||||
|
||||
// just use json because everyone speaks it
|
||||
err = wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (done bool, err error) {
|
||||
err = wait.PollImmediate(time.Second, time.Minute, func() (done bool, err error) {
|
||||
obj := kubeClientConfigValue.Load()
|
||||
if obj == nil {
|
||||
return false, nil
|
||||
|
Reference in New Issue
Block a user