From f14f4c933ee01d219ffbfa846875171e179019b7 Mon Sep 17 00:00:00 2001 From: David Eads Date: Wed, 28 Aug 2019 08:53:20 -0400 Subject: [PATCH] add ability to pre-configure poststarthooks for apiservers --- cmd/kube-apiserver/app/aggregator.go | 2 + cmd/kube-apiserver/app/apiextensions.go | 2 + cmd/kube-apiserver/app/server.go | 62 +++++++++---------- .../src/k8s.io/apiserver/pkg/server/config.go | 43 ++++++++++++- .../src/k8s.io/apiserver/pkg/server/hooks.go | 10 ++- test/integration/examples/apiserver_test.go | 4 +- test/integration/framework/test_server.go | 4 +- 7 files changed, 88 insertions(+), 39 deletions(-) diff --git a/cmd/kube-apiserver/app/aggregator.go b/cmd/kube-apiserver/app/aggregator.go index d1d3bcf21cf..fde0903dcfb 100644 --- a/cmd/kube-apiserver/app/aggregator.go +++ b/cmd/kube-apiserver/app/aggregator.go @@ -64,6 +64,8 @@ func createAggregatorConfig( // make a shallow copy to let us twiddle a few things // most of the config actually remains the same. We only need to mess with a couple items related to the particulars of the aggregator genericConfig := kubeAPIServerConfig + genericConfig.PostStartHooks = map[string]genericapiserver.PostStartHookConfigEntry{} + genericConfig.RESTOptionsGetter = nil // override genericConfig.AdmissionControl with kube-aggregator's scheme, // because aggregator apiserver should use its own scheme to convert its own resources. diff --git a/cmd/kube-apiserver/app/apiextensions.go b/cmd/kube-apiserver/app/apiextensions.go index 5c8e148bceb..a6ae80d5436 100644 --- a/cmd/kube-apiserver/app/apiextensions.go +++ b/cmd/kube-apiserver/app/apiextensions.go @@ -48,6 +48,8 @@ func createAPIExtensionsConfig( // make a shallow copy to let us twiddle a few things // most of the config actually remains the same. We only need to mess with a couple items related to the particulars of the apiextensions genericConfig := kubeAPIServerConfig + genericConfig.PostStartHooks = map[string]genericapiserver.PostStartHookConfigEntry{} + genericConfig.RESTOptionsGetter = nil // override genericConfig.AdmissionControl with apiextensions' scheme, // because apiextentions apiserver should use its own scheme to convert resources. diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index f014af7e752..ee8dabd4f37 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -168,7 +168,7 @@ func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan return nil, err } - kubeAPIServerConfig, insecureServingInfo, serviceResolver, pluginInitializer, admissionPostStartHook, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport) + kubeAPIServerConfig, insecureServingInfo, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport) if err != nil { return nil, err } @@ -184,7 +184,7 @@ func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan return nil, err } - kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer, admissionPostStartHook) + kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer) if err != nil { return nil, err } @@ -211,14 +211,12 @@ func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan } // CreateKubeAPIServer creates and wires a workable kube-apiserver -func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, delegateAPIServer genericapiserver.DelegationTarget, admissionPostStartHook genericapiserver.PostStartHookFunc) (*master.Master, error) { +func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, delegateAPIServer genericapiserver.DelegationTarget) (*master.Master, error) { kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer) if err != nil { return nil, err } - kubeAPIServer.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-admission-initializer", admissionPostStartHook) - return kubeAPIServer, nil } @@ -273,25 +271,20 @@ func CreateKubeAPIServerConfig( nodeTunneler tunneler.Tunneler, proxyTransport *http.Transport, ) ( - config *master.Config, - insecureServingInfo *genericapiserver.DeprecatedInsecureServingInfo, - serviceResolver aggregatorapiserver.ServiceResolver, - pluginInitializers []admission.PluginInitializer, - admissionPostStartHook genericapiserver.PostStartHookFunc, - lastErr error, + *master.Config, + *genericapiserver.DeprecatedInsecureServingInfo, + aggregatorapiserver.ServiceResolver, + []admission.PluginInitializer, + error, ) { - var genericConfig *genericapiserver.Config - var storageFactory *serverstorage.DefaultStorageFactory - var versionedInformers clientgoinformers.SharedInformerFactory - genericConfig, versionedInformers, insecureServingInfo, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, lastErr = buildGenericConfig(s.ServerRunOptions, proxyTransport) - if lastErr != nil { - return + genericConfig, versionedInformers, insecureServingInfo, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, err := buildGenericConfig(s.ServerRunOptions, proxyTransport) + if err != nil { + return nil, nil, nil, nil, err } if _, port, err := net.SplitHostPort(s.Etcd.StorageConfig.Transport.ServerList[0]); err == nil && port != "0" && len(port) != 0 { if err := utilwait.PollImmediate(etcdRetryInterval, etcdRetryLimit*etcdRetryInterval, preflight.EtcdConnection{ServerList: s.Etcd.StorageConfig.Transport.ServerList}.CheckEtcdServers); err != nil { - lastErr = fmt.Errorf("error waiting for etcd connection: %v", err) - return + return nil, nil, nil, nil, fmt.Errorf("error waiting for etcd connection: %v", err) } } @@ -306,31 +299,31 @@ func CreateKubeAPIServerConfig( PerConnectionBandwidthLimitBytesPerSec: s.MaxConnectionBytesPerSec, }) - serviceIPRange, apiServerServiceIP, lastErr := master.ServiceIPRange(s.PrimaryServiceClusterIPRange) - if lastErr != nil { - return + serviceIPRange, apiServerServiceIP, err := master.ServiceIPRange(s.PrimaryServiceClusterIPRange) + if err != nil { + return nil, nil, nil, nil, err } // defaults to empty range and ip var secondaryServiceIPRange net.IPNet // process secondary range only if provided by user if s.SecondaryServiceClusterIPRange.IP != nil { - secondaryServiceIPRange, _, lastErr = master.ServiceIPRange(s.SecondaryServiceClusterIPRange) - if lastErr != nil { - return + secondaryServiceIPRange, _, err = master.ServiceIPRange(s.SecondaryServiceClusterIPRange) + if err != nil { + return nil, nil, nil, nil, err } } - clientCA, lastErr := readCAorNil(s.Authentication.ClientCert.ClientCA) - if lastErr != nil { - return + clientCA, err := readCAorNil(s.Authentication.ClientCert.ClientCA) + if err != nil { + return nil, nil, nil, nil, err } - requestHeaderProxyCA, lastErr := readCAorNil(s.Authentication.RequestHeader.ClientCAFile) - if lastErr != nil { - return + requestHeaderProxyCA, err := readCAorNil(s.Authentication.RequestHeader.ClientCAFile) + if err != nil { + return nil, nil, nil, nil, err } - config = &master.Config{ + config := &master.Config{ GenericConfig: genericConfig, ExtraConfig: master.ExtraConfig{ ClientCARegistrationHook: master.ClientCARegistrationHook{ @@ -369,6 +362,9 @@ func CreateKubeAPIServerConfig( VersionedInformers: versionedInformers, }, } + if err := config.GenericConfig.AddPostStartHook("start-kube-apiserver-admission-initializer", admissionPostStartHook); err != nil { + return nil, nil, nil, nil, err + } if nodeTunneler != nil { // Use the nodeTunneler's dialer to connect to the kubelet @@ -379,7 +375,7 @@ func CreateKubeAPIServerConfig( config.ExtraConfig.KubeletClientConfig.Lookup = config.GenericConfig.EgressSelector.Lookup } - return + return config, insecureServingInfo, serviceResolver, pluginInitializers, nil } // BuildGenericConfig takes the master server options and produces the genericapiserver.Config associated with it diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index b75422a6e09..1def7e7a853 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -23,6 +23,7 @@ import ( "net" "net/http" goruntime "runtime" + "runtime/debug" "sort" "strconv" "strings" @@ -116,6 +117,8 @@ type Config struct { EnableMetrics bool DisabledPostStartHooks sets.String + // done values in this values for this map are ignored. + PostStartHooks map[string]PostStartHookConfigEntry // Version will enable the /version endpoint if non-nil Version *version.Info @@ -282,6 +285,7 @@ func NewConfig(codecs serializer.CodecFactory) *Config { HandlerChainWaitGroup: new(utilwaitgroup.SafeWaitGroup), LegacyAPIGroupPrefixes: sets.NewString(DefaultLegacyAPIPrefix), DisabledPostStartHooks: sets.NewString(), + PostStartHooks: map[string]PostStartHookConfigEntry{}, HealthzChecks: append([]healthz.HealthChecker{}, defaultHealthChecks...), ReadyzChecks: append([]healthz.HealthChecker{}, defaultHealthChecks...), LivezChecks: append([]healthz.HealthChecker{}, defaultHealthChecks...), @@ -391,6 +395,36 @@ func (c *Config) AddHealthChecks(healthChecks ...healthz.HealthChecker) { } } +// AddPostStartHook allows you to add a PostStartHook that will later be added to the server itself in a New call. +// Name conflicts will cause an error. +func (c *Config) AddPostStartHook(name string, hook PostStartHookFunc) error { + if len(name) == 0 { + return fmt.Errorf("missing name") + } + if hook == nil { + return fmt.Errorf("hook func may not be nil: %q", name) + } + if c.DisabledPostStartHooks.Has(name) { + klog.V(1).Infof("skipping %q because it was explicitly disabled", name) + return nil + } + + if postStartHook, exists := c.PostStartHooks[name]; exists { + // this is programmer error, but it can be hard to debug + return fmt.Errorf("unable to add %q because it was already registered by: %s", name, postStartHook.originatingStack) + } + c.PostStartHooks[name] = PostStartHookConfigEntry{hook: hook, originatingStack: string(debug.Stack())} + + return nil +} + +// AddPostStartHookOrDie allows you to add a PostStartHook, but dies on failure. +func (c *Config) AddPostStartHookOrDie(name string, hook PostStartHookFunc) { + if err := c.AddPostStartHook(name, hook); err != nil { + klog.Fatalf("Error registering PostStartHook %q: %v", name, err) + } +} + // Complete fills in any fields not set that are required to have valid data and can be derived // from other fields. If you're going to `ApplyOptions`, do that first. It's mutating the receiver. func (c *Config) Complete(informers informers.SharedInformerFactory) CompletedConfig { @@ -552,6 +586,7 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G } } + // first add poststarthooks from delegated targets for k, v := range delegationTarget.PostStartHooks() { s.postStartHooks[k] = v } @@ -560,6 +595,13 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G s.preShutdownHooks[k] = v } + // add poststarthooks that were preconfigured. Using the add method will give us an error if the same name has already been registered. + for name, preconfiguredPostStartHook := range c.PostStartHooks { + if err := s.AddPostStartHook(name, preconfiguredPostStartHook.hook); err != nil { + return nil, err + } + } + genericApiServerHookName := "generic-apiserver-start-informers" if c.SharedInformerFactory != nil && !s.isPostStartHookRegistered(genericApiServerHookName) { err := s.AddPostStartHook(genericApiServerHookName, func(context PostStartHookContext) error { @@ -613,7 +655,6 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler { handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc, c.RequestTimeout) handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.HandlerChainWaitGroup) handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver) - handler = genericapifilters.WithCacheControl(handler) handler = genericfilters.WithPanicRecovery(handler) return handler } diff --git a/staging/src/k8s.io/apiserver/pkg/server/hooks.go b/staging/src/k8s.io/apiserver/pkg/server/hooks.go index fba876d2573..04e7f830234 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/hooks.go +++ b/staging/src/k8s.io/apiserver/pkg/server/hooks.go @@ -66,6 +66,13 @@ type postStartHookEntry struct { done chan struct{} } +type PostStartHookConfigEntry struct { + hook PostStartHookFunc + // originatingStack holds the stack that registered postStartHooks. This allows us to show a more helpful message + // for duplicate registration. + originatingStack string +} + type preShutdownHookEntry struct { hook PreShutdownHookFunc } @@ -76,9 +83,10 @@ func (s *GenericAPIServer) AddPostStartHook(name string, hook PostStartHookFunc) return fmt.Errorf("missing name") } if hook == nil { - return nil + return fmt.Errorf("hook func may not be nil: %q", name) } if s.disabledPostStartHooks.Has(name) { + klog.V(1).Infof("skipping %q because it was explicitly disabled", name) return nil } diff --git a/test/integration/examples/apiserver_test.go b/test/integration/examples/apiserver_test.go index b93387f5616..e10af0c3318 100644 --- a/test/integration/examples/apiserver_test.go +++ b/test/integration/examples/apiserver_test.go @@ -118,7 +118,7 @@ func TestAggregatedAPIServer(t *testing.T) { if err != nil { t.Fatal(err) } - kubeAPIServerConfig, _, _, _, admissionPostStartHook, err := app.CreateKubeAPIServerConfig(completedOptions, tunneler, proxyTransport) + kubeAPIServerConfig, _, _, _, err := app.CreateKubeAPIServerConfig(completedOptions, tunneler, proxyTransport) if err != nil { t.Fatal(err) } @@ -129,7 +129,7 @@ func TestAggregatedAPIServer(t *testing.T) { kubeAPIServerClientConfig.ServerName = "" kubeClientConfigValue.Store(kubeAPIServerClientConfig) - kubeAPIServer, err := app.CreateKubeAPIServer(kubeAPIServerConfig, genericapiserver.NewEmptyDelegate(), admissionPostStartHook) + kubeAPIServer, err := app.CreateKubeAPIServer(kubeAPIServerConfig, genericapiserver.NewEmptyDelegate()) if err != nil { t.Fatal(err) } diff --git a/test/integration/framework/test_server.go b/test/integration/framework/test_server.go index a92e1d2f49c..c0f15e75972 100644 --- a/test/integration/framework/test_server.go +++ b/test/integration/framework/test_server.go @@ -113,7 +113,7 @@ func StartTestServer(t *testing.T, stopCh <-chan struct{}, setup TestServerSetup if err != nil { t.Fatal(err) } - kubeAPIServerConfig, _, _, _, admissionPostStartHook, err := app.CreateKubeAPIServerConfig(completedOptions, tunneler, proxyTransport) + kubeAPIServerConfig, _, _, _, err := app.CreateKubeAPIServerConfig(completedOptions, tunneler, proxyTransport) if err != nil { t.Fatal(err) } @@ -121,7 +121,7 @@ func StartTestServer(t *testing.T, stopCh <-chan struct{}, setup TestServerSetup if setup.ModifyServerConfig != nil { setup.ModifyServerConfig(kubeAPIServerConfig) } - kubeAPIServer, err := app.CreateKubeAPIServer(kubeAPIServerConfig, genericapiserver.NewEmptyDelegate(), admissionPostStartHook) + kubeAPIServer, err := app.CreateKubeAPIServer(kubeAPIServerConfig, genericapiserver.NewEmptyDelegate()) if err != nil { t.Fatal(err) }