diff --git a/pkg/controlplane/apiserver/config.go b/pkg/controlplane/apiserver/config.go index 52eed984f43..898225d6df0 100644 --- a/pkg/controlplane/apiserver/config.go +++ b/pkg/controlplane/apiserver/config.go @@ -35,7 +35,6 @@ import ( serverstorage "k8s.io/apiserver/pkg/server/storage" "k8s.io/apiserver/pkg/storageversion" utilfeature "k8s.io/apiserver/pkg/util/feature" - utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" "k8s.io/apiserver/pkg/util/openapi" utilpeerproxy "k8s.io/apiserver/pkg/util/peerproxy" clientgoinformers "k8s.io/client-go/informers" @@ -76,7 +75,25 @@ func BuildGenericConfig( if lastErr = s.SecureServing.ApplyTo(&genericConfig.SecureServing, &genericConfig.LoopbackClientConfig); lastErr != nil { return } - if lastErr = s.Features.ApplyTo(genericConfig); lastErr != nil { + + // Use protobufs for self-communication. + // Since not every generic apiserver has to support protobufs, we + // cannot default to it in generic apiserver and need to explicitly + // set it in kube-apiserver. + genericConfig.LoopbackClientConfig.ContentConfig.ContentType = "application/vnd.kubernetes.protobuf" + // Disable compression for self-communication, since we are going to be + // on a fast local network + genericConfig.LoopbackClientConfig.DisableCompression = true + + kubeClientConfig := genericConfig.LoopbackClientConfig + clientgoExternalClient, err := clientgoclientset.NewForConfig(kubeClientConfig) + if err != nil { + lastErr = fmt.Errorf("failed to create real external clientset: %v", err) + return + } + versionedInformers = clientgoinformers.NewSharedInformerFactory(clientgoExternalClient, 10*time.Minute) + + if lastErr = s.Features.ApplyTo(genericConfig, clientgoExternalClient, versionedInformers); lastErr != nil { return } if lastErr = s.APIEnablement.ApplyTo(genericConfig, controlplane.DefaultAPIResourceConfigSource(), legacyscheme.Scheme); lastErr != nil { @@ -125,23 +142,6 @@ func BuildGenericConfig( return } - // Use protobufs for self-communication. - // Since not every generic apiserver has to support protobufs, we - // cannot default to it in generic apiserver and need to explicitly - // set it in kube-apiserver. - genericConfig.LoopbackClientConfig.ContentConfig.ContentType = "application/vnd.kubernetes.protobuf" - // Disable compression for self-communication, since we are going to be - // on a fast local network - genericConfig.LoopbackClientConfig.DisableCompression = true - - kubeClientConfig := genericConfig.LoopbackClientConfig - clientgoExternalClient, err := clientgoclientset.NewForConfig(kubeClientConfig) - if err != nil { - lastErr = fmt.Errorf("failed to create real external clientset: %v", err) - return - } - versionedInformers = clientgoinformers.NewSharedInformerFactory(clientgoExternalClient, 10*time.Minute) - // Authentication.ApplyTo requires already applied OpenAPIConfig and EgressSelector if present if lastErr = s.Authentication.ApplyTo(&genericConfig.Authentication, genericConfig.SecureServing, genericConfig.EgressSelector, genericConfig.OpenAPIConfig, genericConfig.OpenAPIV3Config, clientgoExternalClient, versionedInformers); lastErr != nil { return @@ -161,10 +161,6 @@ func BuildGenericConfig( return } - if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIPriorityAndFairness) && s.GenericServerRunOptions.EnablePriorityAndFairness { - genericConfig.FlowControl, lastErr = BuildPriorityAndFairness(s, clientgoExternalClient, versionedInformers) - } - if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AggregatedDiscoveryEndpoint) { genericConfig.AggregatedDiscoveryGroupManager = aggregated.NewResourceManager("apis") } @@ -193,18 +189,6 @@ func BuildAuthorizer(s controlplaneapiserver.CompletedOptions, EgressSelector *e return authorizationConfig.New() } -// BuildPriorityAndFairness constructs the guts of the API Priority and Fairness filter -func BuildPriorityAndFairness(s controlplaneapiserver.CompletedOptions, extclient clientgoclientset.Interface, versionedInformer clientgoinformers.SharedInformerFactory) (utilflowcontrol.Interface, error) { - if s.GenericServerRunOptions.MaxRequestsInFlight+s.GenericServerRunOptions.MaxMutatingRequestsInFlight <= 0 { - return nil, fmt.Errorf("invalid configuration: MaxRequestsInFlight=%d and MaxMutatingRequestsInFlight=%d; they must add up to something positive", s.GenericServerRunOptions.MaxRequestsInFlight, s.GenericServerRunOptions.MaxMutatingRequestsInFlight) - } - return utilflowcontrol.New( - versionedInformer, - extclient.FlowcontrolV1beta3(), - s.GenericServerRunOptions.MaxRequestsInFlight+s.GenericServerRunOptions.MaxMutatingRequestsInFlight, - ), nil -} - // CreatePeerEndpointLeaseReconciler creates a apiserver endpoint lease reconciliation loop // The peer endpoint leases are used to find network locations of apiservers for peer proxy func CreatePeerEndpointLeaseReconciler(c genericapiserver.Config, storageFactory serverstorage.StorageFactory) (reconcilers.PeerEndpointLeaseReconciler, error) { diff --git a/pkg/controlplane/apiserver/options/validation.go b/pkg/controlplane/apiserver/options/validation.go index 879833ed2a7..a1b7fcb2ca4 100644 --- a/pkg/controlplane/apiserver/options/validation.go +++ b/pkg/controlplane/apiserver/options/validation.go @@ -51,7 +51,7 @@ func validateTokenRequest(options *Options) []error { } func validateAPIPriorityAndFairness(options *Options) []error { - if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIPriorityAndFairness) && options.GenericServerRunOptions.EnablePriorityAndFairness { + if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIPriorityAndFairness) && options.Features.EnablePriorityAndFairness { // If none of the following runtime config options are specified, // APF is assumed to be turned on. The internal APF controller uses // v1beta3 so it should be enabled. diff --git a/pkg/controlplane/apiserver/options/validation_test.go b/pkg/controlplane/apiserver/options/validation_test.go index b9327d0d429..13f1506dc5f 100644 --- a/pkg/controlplane/apiserver/options/validation_test.go +++ b/pkg/controlplane/apiserver/options/validation_test.go @@ -67,7 +67,7 @@ func TestValidateAPIPriorityAndFairness(t *testing.T) { for _, test := range tests { t.Run(test.runtimeConfig, func(t *testing.T) { options := &Options{ - GenericServerRunOptions: &genericoptions.ServerRunOptions{ + Features: &genericoptions.FeatureOptions{ EnablePriorityAndFairness: true, }, APIEnablement: genericoptions.NewAPIEnablementOptions(), @@ -192,6 +192,7 @@ func TestValidateOptions(t *testing.T) { APIEnablement: genericoptions.NewAPIEnablementOptions(), Metrics: &basemetrics.Options{}, ServiceAccountSigningKeyFile: "", + Features: &genericoptions.FeatureOptions{}, }, }, { @@ -215,6 +216,7 @@ func TestValidateOptions(t *testing.T) { APIEnablement: genericoptions.NewAPIEnablementOptions(), Metrics: &basemetrics.Options{}, ServiceAccountSigningKeyFile: "", + Features: &genericoptions.FeatureOptions{}, }, }, } diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/feature.go b/staging/src/k8s.io/apiserver/pkg/server/options/feature.go index 35596fba692..0f2ba362704 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/feature.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/feature.go @@ -17,16 +17,24 @@ limitations under the License. package options import ( + "fmt" + "github.com/spf13/pflag" "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/server" + "k8s.io/apiserver/pkg/util/feature" + utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" ) type FeatureOptions struct { EnableProfiling bool DebugSocketPath string EnableContentionProfiling bool + EnablePriorityAndFairness bool } func NewFeatureOptions() *FeatureOptions { @@ -36,6 +44,7 @@ func NewFeatureOptions() *FeatureOptions { EnableProfiling: defaults.EnableProfiling, DebugSocketPath: defaults.DebugSocketPath, EnableContentionProfiling: defaults.EnableContentionProfiling, + EnablePriorityAndFairness: true, } } @@ -50,9 +59,11 @@ func (o *FeatureOptions) AddFlags(fs *pflag.FlagSet) { "Enable block profiling, if profiling is enabled") fs.StringVar(&o.DebugSocketPath, "debug-socket-path", o.DebugSocketPath, "Use an unprotected (no authn/authz) unix-domain socket for profiling with the given path") + fs.BoolVar(&o.EnablePriorityAndFairness, "enable-priority-and-fairness", o.EnablePriorityAndFairness, ""+ + "If true and the APIPriorityAndFairness feature gate is enabled, replace the max-in-flight handler with an enhanced one that queues and dispatches with priority and fairness") } -func (o *FeatureOptions) ApplyTo(c *server.Config) error { +func (o *FeatureOptions) ApplyTo(c *server.Config, clientset kubernetes.Interface, informers informers.SharedInformerFactory) error { if o == nil { return nil } @@ -61,6 +72,18 @@ func (o *FeatureOptions) ApplyTo(c *server.Config) error { c.DebugSocketPath = o.DebugSocketPath c.EnableContentionProfiling = o.EnableContentionProfiling + if o.EnablePriorityAndFairness && feature.DefaultFeatureGate.Enabled(features.APIPriorityAndFairness) { + if c.MaxRequestsInFlight+c.MaxMutatingRequestsInFlight <= 0 { + return fmt.Errorf("invalid configuration: MaxRequestsInFlight=%d and MaxMutatingRequestsInFlight=%d; they must add up to something positive", c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight) + + } + c.FlowControl = utilflowcontrol.New( + informers, + clientset.FlowcontrolV1beta3(), + c.MaxRequestsInFlight+c.MaxMutatingRequestsInFlight, + ) + } + return nil } diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/recommended.go b/staging/src/k8s.io/apiserver/pkg/server/options/recommended.go index 5d031e202e0..eb7e67b3670 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/recommended.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/recommended.go @@ -17,20 +17,15 @@ limitations under the License. package options import ( - "fmt" - "github.com/spf13/pflag" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apiserver/pkg/admission" - "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/storage/storagebackend" "k8s.io/apiserver/pkg/util/feature" - utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" "k8s.io/component-base/featuregate" - "k8s.io/klog/v2" ) // RecommendedOptions contains the recommended options for running an API server. @@ -122,17 +117,17 @@ func (o *RecommendedOptions) ApplyTo(config *server.RecommendedConfig) error { if err := o.Audit.ApplyTo(&config.Config); err != nil { return err } - if err := o.Features.ApplyTo(&config.Config); err != nil { - return err - } if err := o.CoreAPI.ApplyTo(config); err != nil { return err } - initializers, err := o.ExtraAdmissionInitializers(config) + kubeClient, err := kubernetes.NewForConfig(config.ClientConfig) if err != nil { return err } - kubeClient, err := kubernetes.NewForConfig(config.ClientConfig) + if err := o.Features.ApplyTo(&config.Config, kubeClient, config.SharedInformerFactory); err != nil { + return err + } + initializers, err := o.ExtraAdmissionInitializers(config) if err != nil { return err } @@ -144,21 +139,6 @@ func (o *RecommendedOptions) ApplyTo(config *server.RecommendedConfig) error { initializers...); err != nil { return err } - if feature.DefaultFeatureGate.Enabled(features.APIPriorityAndFairness) { - if config.ClientConfig != nil { - if config.MaxRequestsInFlight+config.MaxMutatingRequestsInFlight <= 0 { - return fmt.Errorf("invalid configuration: MaxRequestsInFlight=%d and MaxMutatingRequestsInFlight=%d; they must add up to something positive", config.MaxRequestsInFlight, config.MaxMutatingRequestsInFlight) - - } - config.FlowControl = utilflowcontrol.New( - config.SharedInformerFactory, - kubernetes.NewForConfigOrDie(config.ClientConfig).FlowcontrolV1beta3(), - config.MaxRequestsInFlight+config.MaxMutatingRequestsInFlight, - ) - } else { - klog.Warningf("Neither kubeconfig is provided nor service-account is mounted, so APIPriorityAndFairness will be disabled") - } - } return nil } diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/server_run_options.go b/staging/src/k8s.io/apiserver/pkg/server/options/server_run_options.go index f9d574d5d2c..1373d8a4d73 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/server_run_options.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/server_run_options.go @@ -62,8 +62,7 @@ type ServerRunOptions struct { // decoded in a write request. 0 means no limit. // We intentionally did not add a flag for this option. Users of the // apiserver library can wire it to a flag. - MaxRequestBodyBytes int64 - EnablePriorityAndFairness bool + MaxRequestBodyBytes int64 // ShutdownSendRetryAfter dictates when to initiate shutdown of the HTTP // Server during the graceful termination of the apiserver. If true, we wait @@ -104,7 +103,6 @@ func NewServerRunOptions() *ServerRunOptions { ShutdownWatchTerminationGracePeriod: defaults.ShutdownWatchTerminationGracePeriod, JSONPatchMaxCopyBytes: defaults.JSONPatchMaxCopyBytes, MaxRequestBodyBytes: defaults.MaxRequestBodyBytes, - EnablePriorityAndFairness: true, ShutdownSendRetryAfter: false, } } @@ -325,9 +323,6 @@ func (s *ServerRunOptions) AddUniversalFlags(fs *pflag.FlagSet) { "handler, which picks a randomized value above this number as the connection timeout, "+ "to spread out load.") - fs.BoolVar(&s.EnablePriorityAndFairness, "enable-priority-and-fairness", s.EnablePriorityAndFairness, ""+ - "If true and the APIPriorityAndFairness feature gate is enabled, replace the max-in-flight handler with an enhanced one that queues and dispatches with priority and fairness") - fs.DurationVar(&s.ShutdownDelayDuration, "shutdown-delay-duration", s.ShutdownDelayDuration, ""+ "Time to delay the termination. During that time the server keeps serving requests normally. The endpoints /healthz and /livez "+ "will return success, but /readyz immediately returns failure. Graceful termination starts after this delay "+