mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-26 05:03:09 +00:00
add ability to pre-configure poststarthooks for apiservers
This commit is contained in:
parent
c2c821534b
commit
f14f4c933e
@ -64,6 +64,8 @@ func createAggregatorConfig(
|
|||||||
// make a shallow copy to let us twiddle a few things
|
// make a shallow copy to let us twiddle a few things
|
||||||
// most of the config actually remains the same. We only need to mess with a couple items related to the particulars of the aggregator
|
// most of the config actually remains the same. We only need to mess with a couple items related to the particulars of the aggregator
|
||||||
genericConfig := kubeAPIServerConfig
|
genericConfig := kubeAPIServerConfig
|
||||||
|
genericConfig.PostStartHooks = map[string]genericapiserver.PostStartHookConfigEntry{}
|
||||||
|
genericConfig.RESTOptionsGetter = nil
|
||||||
|
|
||||||
// override genericConfig.AdmissionControl with kube-aggregator's scheme,
|
// override genericConfig.AdmissionControl with kube-aggregator's scheme,
|
||||||
// because aggregator apiserver should use its own scheme to convert its own resources.
|
// because aggregator apiserver should use its own scheme to convert its own resources.
|
||||||
|
@ -48,6 +48,8 @@ func createAPIExtensionsConfig(
|
|||||||
// make a shallow copy to let us twiddle a few things
|
// make a shallow copy to let us twiddle a few things
|
||||||
// most of the config actually remains the same. We only need to mess with a couple items related to the particulars of the apiextensions
|
// most of the config actually remains the same. We only need to mess with a couple items related to the particulars of the apiextensions
|
||||||
genericConfig := kubeAPIServerConfig
|
genericConfig := kubeAPIServerConfig
|
||||||
|
genericConfig.PostStartHooks = map[string]genericapiserver.PostStartHookConfigEntry{}
|
||||||
|
genericConfig.RESTOptionsGetter = nil
|
||||||
|
|
||||||
// override genericConfig.AdmissionControl with apiextensions' scheme,
|
// override genericConfig.AdmissionControl with apiextensions' scheme,
|
||||||
// because apiextentions apiserver should use its own scheme to convert resources.
|
// because apiextentions apiserver should use its own scheme to convert resources.
|
||||||
|
@ -168,7 +168,7 @@ func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
kubeAPIServerConfig, insecureServingInfo, serviceResolver, pluginInitializer, admissionPostStartHook, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport)
|
kubeAPIServerConfig, insecureServingInfo, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -184,7 +184,7 @@ func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer, admissionPostStartHook)
|
kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -211,14 +211,12 @@ func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan
|
|||||||
}
|
}
|
||||||
|
|
||||||
// CreateKubeAPIServer creates and wires a workable kube-apiserver
|
// CreateKubeAPIServer creates and wires a workable kube-apiserver
|
||||||
func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, delegateAPIServer genericapiserver.DelegationTarget, admissionPostStartHook genericapiserver.PostStartHookFunc) (*master.Master, error) {
|
func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, delegateAPIServer genericapiserver.DelegationTarget) (*master.Master, error) {
|
||||||
kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
|
kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
kubeAPIServer.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-admission-initializer", admissionPostStartHook)
|
|
||||||
|
|
||||||
return kubeAPIServer, nil
|
return kubeAPIServer, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -273,25 +271,20 @@ func CreateKubeAPIServerConfig(
|
|||||||
nodeTunneler tunneler.Tunneler,
|
nodeTunneler tunneler.Tunneler,
|
||||||
proxyTransport *http.Transport,
|
proxyTransport *http.Transport,
|
||||||
) (
|
) (
|
||||||
config *master.Config,
|
*master.Config,
|
||||||
insecureServingInfo *genericapiserver.DeprecatedInsecureServingInfo,
|
*genericapiserver.DeprecatedInsecureServingInfo,
|
||||||
serviceResolver aggregatorapiserver.ServiceResolver,
|
aggregatorapiserver.ServiceResolver,
|
||||||
pluginInitializers []admission.PluginInitializer,
|
[]admission.PluginInitializer,
|
||||||
admissionPostStartHook genericapiserver.PostStartHookFunc,
|
error,
|
||||||
lastErr error,
|
|
||||||
) {
|
) {
|
||||||
var genericConfig *genericapiserver.Config
|
genericConfig, versionedInformers, insecureServingInfo, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, err := buildGenericConfig(s.ServerRunOptions, proxyTransport)
|
||||||
var storageFactory *serverstorage.DefaultStorageFactory
|
if err != nil {
|
||||||
var versionedInformers clientgoinformers.SharedInformerFactory
|
return nil, nil, nil, nil, err
|
||||||
genericConfig, versionedInformers, insecureServingInfo, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, lastErr = buildGenericConfig(s.ServerRunOptions, proxyTransport)
|
|
||||||
if lastErr != nil {
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, port, err := net.SplitHostPort(s.Etcd.StorageConfig.Transport.ServerList[0]); err == nil && port != "0" && len(port) != 0 {
|
if _, port, err := net.SplitHostPort(s.Etcd.StorageConfig.Transport.ServerList[0]); err == nil && port != "0" && len(port) != 0 {
|
||||||
if err := utilwait.PollImmediate(etcdRetryInterval, etcdRetryLimit*etcdRetryInterval, preflight.EtcdConnection{ServerList: s.Etcd.StorageConfig.Transport.ServerList}.CheckEtcdServers); err != nil {
|
if err := utilwait.PollImmediate(etcdRetryInterval, etcdRetryLimit*etcdRetryInterval, preflight.EtcdConnection{ServerList: s.Etcd.StorageConfig.Transport.ServerList}.CheckEtcdServers); err != nil {
|
||||||
lastErr = fmt.Errorf("error waiting for etcd connection: %v", err)
|
return nil, nil, nil, nil, fmt.Errorf("error waiting for etcd connection: %v", err)
|
||||||
return
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -306,31 +299,31 @@ func CreateKubeAPIServerConfig(
|
|||||||
PerConnectionBandwidthLimitBytesPerSec: s.MaxConnectionBytesPerSec,
|
PerConnectionBandwidthLimitBytesPerSec: s.MaxConnectionBytesPerSec,
|
||||||
})
|
})
|
||||||
|
|
||||||
serviceIPRange, apiServerServiceIP, lastErr := master.ServiceIPRange(s.PrimaryServiceClusterIPRange)
|
serviceIPRange, apiServerServiceIP, err := master.ServiceIPRange(s.PrimaryServiceClusterIPRange)
|
||||||
if lastErr != nil {
|
if err != nil {
|
||||||
return
|
return nil, nil, nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// defaults to empty range and ip
|
// defaults to empty range and ip
|
||||||
var secondaryServiceIPRange net.IPNet
|
var secondaryServiceIPRange net.IPNet
|
||||||
// process secondary range only if provided by user
|
// process secondary range only if provided by user
|
||||||
if s.SecondaryServiceClusterIPRange.IP != nil {
|
if s.SecondaryServiceClusterIPRange.IP != nil {
|
||||||
secondaryServiceIPRange, _, lastErr = master.ServiceIPRange(s.SecondaryServiceClusterIPRange)
|
secondaryServiceIPRange, _, err = master.ServiceIPRange(s.SecondaryServiceClusterIPRange)
|
||||||
if lastErr != nil {
|
if err != nil {
|
||||||
return
|
return nil, nil, nil, nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
clientCA, lastErr := readCAorNil(s.Authentication.ClientCert.ClientCA)
|
clientCA, err := readCAorNil(s.Authentication.ClientCert.ClientCA)
|
||||||
if lastErr != nil {
|
if err != nil {
|
||||||
return
|
return nil, nil, nil, nil, err
|
||||||
}
|
}
|
||||||
requestHeaderProxyCA, lastErr := readCAorNil(s.Authentication.RequestHeader.ClientCAFile)
|
requestHeaderProxyCA, err := readCAorNil(s.Authentication.RequestHeader.ClientCAFile)
|
||||||
if lastErr != nil {
|
if err != nil {
|
||||||
return
|
return nil, nil, nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
config = &master.Config{
|
config := &master.Config{
|
||||||
GenericConfig: genericConfig,
|
GenericConfig: genericConfig,
|
||||||
ExtraConfig: master.ExtraConfig{
|
ExtraConfig: master.ExtraConfig{
|
||||||
ClientCARegistrationHook: master.ClientCARegistrationHook{
|
ClientCARegistrationHook: master.ClientCARegistrationHook{
|
||||||
@ -369,6 +362,9 @@ func CreateKubeAPIServerConfig(
|
|||||||
VersionedInformers: versionedInformers,
|
VersionedInformers: versionedInformers,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
if err := config.GenericConfig.AddPostStartHook("start-kube-apiserver-admission-initializer", admissionPostStartHook); err != nil {
|
||||||
|
return nil, nil, nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
if nodeTunneler != nil {
|
if nodeTunneler != nil {
|
||||||
// Use the nodeTunneler's dialer to connect to the kubelet
|
// Use the nodeTunneler's dialer to connect to the kubelet
|
||||||
@ -379,7 +375,7 @@ func CreateKubeAPIServerConfig(
|
|||||||
config.ExtraConfig.KubeletClientConfig.Lookup = config.GenericConfig.EgressSelector.Lookup
|
config.ExtraConfig.KubeletClientConfig.Lookup = config.GenericConfig.EgressSelector.Lookup
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
return config, insecureServingInfo, serviceResolver, pluginInitializers, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// BuildGenericConfig takes the master server options and produces the genericapiserver.Config associated with it
|
// BuildGenericConfig takes the master server options and produces the genericapiserver.Config associated with it
|
||||||
|
@ -23,6 +23,7 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
goruntime "runtime"
|
goruntime "runtime"
|
||||||
|
"runtime/debug"
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
@ -116,6 +117,8 @@ type Config struct {
|
|||||||
EnableMetrics bool
|
EnableMetrics bool
|
||||||
|
|
||||||
DisabledPostStartHooks sets.String
|
DisabledPostStartHooks sets.String
|
||||||
|
// done values in this values for this map are ignored.
|
||||||
|
PostStartHooks map[string]PostStartHookConfigEntry
|
||||||
|
|
||||||
// Version will enable the /version endpoint if non-nil
|
// Version will enable the /version endpoint if non-nil
|
||||||
Version *version.Info
|
Version *version.Info
|
||||||
@ -282,6 +285,7 @@ func NewConfig(codecs serializer.CodecFactory) *Config {
|
|||||||
HandlerChainWaitGroup: new(utilwaitgroup.SafeWaitGroup),
|
HandlerChainWaitGroup: new(utilwaitgroup.SafeWaitGroup),
|
||||||
LegacyAPIGroupPrefixes: sets.NewString(DefaultLegacyAPIPrefix),
|
LegacyAPIGroupPrefixes: sets.NewString(DefaultLegacyAPIPrefix),
|
||||||
DisabledPostStartHooks: sets.NewString(),
|
DisabledPostStartHooks: sets.NewString(),
|
||||||
|
PostStartHooks: map[string]PostStartHookConfigEntry{},
|
||||||
HealthzChecks: append([]healthz.HealthChecker{}, defaultHealthChecks...),
|
HealthzChecks: append([]healthz.HealthChecker{}, defaultHealthChecks...),
|
||||||
ReadyzChecks: append([]healthz.HealthChecker{}, defaultHealthChecks...),
|
ReadyzChecks: append([]healthz.HealthChecker{}, defaultHealthChecks...),
|
||||||
LivezChecks: append([]healthz.HealthChecker{}, defaultHealthChecks...),
|
LivezChecks: append([]healthz.HealthChecker{}, defaultHealthChecks...),
|
||||||
@ -391,6 +395,36 @@ func (c *Config) AddHealthChecks(healthChecks ...healthz.HealthChecker) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AddPostStartHook allows you to add a PostStartHook that will later be added to the server itself in a New call.
|
||||||
|
// Name conflicts will cause an error.
|
||||||
|
func (c *Config) AddPostStartHook(name string, hook PostStartHookFunc) error {
|
||||||
|
if len(name) == 0 {
|
||||||
|
return fmt.Errorf("missing name")
|
||||||
|
}
|
||||||
|
if hook == nil {
|
||||||
|
return fmt.Errorf("hook func may not be nil: %q", name)
|
||||||
|
}
|
||||||
|
if c.DisabledPostStartHooks.Has(name) {
|
||||||
|
klog.V(1).Infof("skipping %q because it was explicitly disabled", name)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if postStartHook, exists := c.PostStartHooks[name]; exists {
|
||||||
|
// this is programmer error, but it can be hard to debug
|
||||||
|
return fmt.Errorf("unable to add %q because it was already registered by: %s", name, postStartHook.originatingStack)
|
||||||
|
}
|
||||||
|
c.PostStartHooks[name] = PostStartHookConfigEntry{hook: hook, originatingStack: string(debug.Stack())}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// AddPostStartHookOrDie allows you to add a PostStartHook, but dies on failure.
|
||||||
|
func (c *Config) AddPostStartHookOrDie(name string, hook PostStartHookFunc) {
|
||||||
|
if err := c.AddPostStartHook(name, hook); err != nil {
|
||||||
|
klog.Fatalf("Error registering PostStartHook %q: %v", name, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Complete fills in any fields not set that are required to have valid data and can be derived
|
// Complete fills in any fields not set that are required to have valid data and can be derived
|
||||||
// from other fields. If you're going to `ApplyOptions`, do that first. It's mutating the receiver.
|
// from other fields. If you're going to `ApplyOptions`, do that first. It's mutating the receiver.
|
||||||
func (c *Config) Complete(informers informers.SharedInformerFactory) CompletedConfig {
|
func (c *Config) Complete(informers informers.SharedInformerFactory) CompletedConfig {
|
||||||
@ -552,6 +586,7 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// first add poststarthooks from delegated targets
|
||||||
for k, v := range delegationTarget.PostStartHooks() {
|
for k, v := range delegationTarget.PostStartHooks() {
|
||||||
s.postStartHooks[k] = v
|
s.postStartHooks[k] = v
|
||||||
}
|
}
|
||||||
@ -560,6 +595,13 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
|
|||||||
s.preShutdownHooks[k] = v
|
s.preShutdownHooks[k] = v
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// add poststarthooks that were preconfigured. Using the add method will give us an error if the same name has already been registered.
|
||||||
|
for name, preconfiguredPostStartHook := range c.PostStartHooks {
|
||||||
|
if err := s.AddPostStartHook(name, preconfiguredPostStartHook.hook); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
genericApiServerHookName := "generic-apiserver-start-informers"
|
genericApiServerHookName := "generic-apiserver-start-informers"
|
||||||
if c.SharedInformerFactory != nil && !s.isPostStartHookRegistered(genericApiServerHookName) {
|
if c.SharedInformerFactory != nil && !s.isPostStartHookRegistered(genericApiServerHookName) {
|
||||||
err := s.AddPostStartHook(genericApiServerHookName, func(context PostStartHookContext) error {
|
err := s.AddPostStartHook(genericApiServerHookName, func(context PostStartHookContext) error {
|
||||||
@ -613,7 +655,6 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
|
|||||||
handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc, c.RequestTimeout)
|
handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc, c.RequestTimeout)
|
||||||
handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.HandlerChainWaitGroup)
|
handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.HandlerChainWaitGroup)
|
||||||
handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
|
handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
|
||||||
handler = genericapifilters.WithCacheControl(handler)
|
|
||||||
handler = genericfilters.WithPanicRecovery(handler)
|
handler = genericfilters.WithPanicRecovery(handler)
|
||||||
return handler
|
return handler
|
||||||
}
|
}
|
||||||
|
@ -66,6 +66,13 @@ type postStartHookEntry struct {
|
|||||||
done chan struct{}
|
done chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type PostStartHookConfigEntry struct {
|
||||||
|
hook PostStartHookFunc
|
||||||
|
// originatingStack holds the stack that registered postStartHooks. This allows us to show a more helpful message
|
||||||
|
// for duplicate registration.
|
||||||
|
originatingStack string
|
||||||
|
}
|
||||||
|
|
||||||
type preShutdownHookEntry struct {
|
type preShutdownHookEntry struct {
|
||||||
hook PreShutdownHookFunc
|
hook PreShutdownHookFunc
|
||||||
}
|
}
|
||||||
@ -76,9 +83,10 @@ func (s *GenericAPIServer) AddPostStartHook(name string, hook PostStartHookFunc)
|
|||||||
return fmt.Errorf("missing name")
|
return fmt.Errorf("missing name")
|
||||||
}
|
}
|
||||||
if hook == nil {
|
if hook == nil {
|
||||||
return nil
|
return fmt.Errorf("hook func may not be nil: %q", name)
|
||||||
}
|
}
|
||||||
if s.disabledPostStartHooks.Has(name) {
|
if s.disabledPostStartHooks.Has(name) {
|
||||||
|
klog.V(1).Infof("skipping %q because it was explicitly disabled", name)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -118,7 +118,7 @@ func TestAggregatedAPIServer(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
kubeAPIServerConfig, _, _, _, admissionPostStartHook, err := app.CreateKubeAPIServerConfig(completedOptions, tunneler, proxyTransport)
|
kubeAPIServerConfig, _, _, _, err := app.CreateKubeAPIServerConfig(completedOptions, tunneler, proxyTransport)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -129,7 +129,7 @@ func TestAggregatedAPIServer(t *testing.T) {
|
|||||||
kubeAPIServerClientConfig.ServerName = ""
|
kubeAPIServerClientConfig.ServerName = ""
|
||||||
kubeClientConfigValue.Store(kubeAPIServerClientConfig)
|
kubeClientConfigValue.Store(kubeAPIServerClientConfig)
|
||||||
|
|
||||||
kubeAPIServer, err := app.CreateKubeAPIServer(kubeAPIServerConfig, genericapiserver.NewEmptyDelegate(), admissionPostStartHook)
|
kubeAPIServer, err := app.CreateKubeAPIServer(kubeAPIServerConfig, genericapiserver.NewEmptyDelegate())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -113,7 +113,7 @@ func StartTestServer(t *testing.T, stopCh <-chan struct{}, setup TestServerSetup
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
kubeAPIServerConfig, _, _, _, admissionPostStartHook, err := app.CreateKubeAPIServerConfig(completedOptions, tunneler, proxyTransport)
|
kubeAPIServerConfig, _, _, _, err := app.CreateKubeAPIServerConfig(completedOptions, tunneler, proxyTransport)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -121,7 +121,7 @@ func StartTestServer(t *testing.T, stopCh <-chan struct{}, setup TestServerSetup
|
|||||||
if setup.ModifyServerConfig != nil {
|
if setup.ModifyServerConfig != nil {
|
||||||
setup.ModifyServerConfig(kubeAPIServerConfig)
|
setup.ModifyServerConfig(kubeAPIServerConfig)
|
||||||
}
|
}
|
||||||
kubeAPIServer, err := app.CreateKubeAPIServer(kubeAPIServerConfig, genericapiserver.NewEmptyDelegate(), admissionPostStartHook)
|
kubeAPIServer, err := app.CreateKubeAPIServer(kubeAPIServerConfig, genericapiserver.NewEmptyDelegate())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user