Merge pull request #118886 from benluddy/apf-option-disable

KEP-1040: Deep disablement for APF based on --enable-priority-and-fairness.
This commit is contained in:
Kubernetes Prow Robot 2023-10-30 09:38:59 +01:00 committed by GitHub
commit 38ed3ef7b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 53 additions and 69 deletions

View File

@ -35,7 +35,6 @@ import (
serverstorage "k8s.io/apiserver/pkg/server/storage"
"k8s.io/apiserver/pkg/storageversion"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
"k8s.io/apiserver/pkg/util/openapi"
utilpeerproxy "k8s.io/apiserver/pkg/util/peerproxy"
clientgoinformers "k8s.io/client-go/informers"
@ -76,7 +75,25 @@ func BuildGenericConfig(
if lastErr = s.SecureServing.ApplyTo(&genericConfig.SecureServing, &genericConfig.LoopbackClientConfig); lastErr != nil {
return
}
if lastErr = s.Features.ApplyTo(genericConfig); lastErr != nil {
// Use protobufs for self-communication.
// Since not every generic apiserver has to support protobufs, we
// cannot default to it in generic apiserver and need to explicitly
// set it in kube-apiserver.
genericConfig.LoopbackClientConfig.ContentConfig.ContentType = "application/vnd.kubernetes.protobuf"
// Disable compression for self-communication, since we are going to be
// on a fast local network
genericConfig.LoopbackClientConfig.DisableCompression = true
kubeClientConfig := genericConfig.LoopbackClientConfig
clientgoExternalClient, err := clientgoclientset.NewForConfig(kubeClientConfig)
if err != nil {
lastErr = fmt.Errorf("failed to create real external clientset: %v", err)
return
}
versionedInformers = clientgoinformers.NewSharedInformerFactory(clientgoExternalClient, 10*time.Minute)
if lastErr = s.Features.ApplyTo(genericConfig, clientgoExternalClient, versionedInformers); lastErr != nil {
return
}
if lastErr = s.APIEnablement.ApplyTo(genericConfig, controlplane.DefaultAPIResourceConfigSource(), legacyscheme.Scheme); lastErr != nil {
@ -125,23 +142,6 @@ func BuildGenericConfig(
return
}
// Use protobufs for self-communication.
// Since not every generic apiserver has to support protobufs, we
// cannot default to it in generic apiserver and need to explicitly
// set it in kube-apiserver.
genericConfig.LoopbackClientConfig.ContentConfig.ContentType = "application/vnd.kubernetes.protobuf"
// Disable compression for self-communication, since we are going to be
// on a fast local network
genericConfig.LoopbackClientConfig.DisableCompression = true
kubeClientConfig := genericConfig.LoopbackClientConfig
clientgoExternalClient, err := clientgoclientset.NewForConfig(kubeClientConfig)
if err != nil {
lastErr = fmt.Errorf("failed to create real external clientset: %v", err)
return
}
versionedInformers = clientgoinformers.NewSharedInformerFactory(clientgoExternalClient, 10*time.Minute)
// Authentication.ApplyTo requires already applied OpenAPIConfig and EgressSelector if present
if lastErr = s.Authentication.ApplyTo(&genericConfig.Authentication, genericConfig.SecureServing, genericConfig.EgressSelector, genericConfig.OpenAPIConfig, genericConfig.OpenAPIV3Config, clientgoExternalClient, versionedInformers); lastErr != nil {
return
@ -161,10 +161,6 @@ func BuildGenericConfig(
return
}
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIPriorityAndFairness) && s.GenericServerRunOptions.EnablePriorityAndFairness {
genericConfig.FlowControl, lastErr = BuildPriorityAndFairness(s, clientgoExternalClient, versionedInformers)
}
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AggregatedDiscoveryEndpoint) {
genericConfig.AggregatedDiscoveryGroupManager = aggregated.NewResourceManager("apis")
}
@ -193,18 +189,6 @@ func BuildAuthorizer(s controlplaneapiserver.CompletedOptions, EgressSelector *e
return authorizationConfig.New()
}
// BuildPriorityAndFairness constructs the guts of the API Priority and Fairness filter
func BuildPriorityAndFairness(s controlplaneapiserver.CompletedOptions, extclient clientgoclientset.Interface, versionedInformer clientgoinformers.SharedInformerFactory) (utilflowcontrol.Interface, error) {
if s.GenericServerRunOptions.MaxRequestsInFlight+s.GenericServerRunOptions.MaxMutatingRequestsInFlight <= 0 {
return nil, fmt.Errorf("invalid configuration: MaxRequestsInFlight=%d and MaxMutatingRequestsInFlight=%d; they must add up to something positive", s.GenericServerRunOptions.MaxRequestsInFlight, s.GenericServerRunOptions.MaxMutatingRequestsInFlight)
}
return utilflowcontrol.New(
versionedInformer,
extclient.FlowcontrolV1beta3(),
s.GenericServerRunOptions.MaxRequestsInFlight+s.GenericServerRunOptions.MaxMutatingRequestsInFlight,
), nil
}
// CreatePeerEndpointLeaseReconciler creates a apiserver endpoint lease reconciliation loop
// The peer endpoint leases are used to find network locations of apiservers for peer proxy
func CreatePeerEndpointLeaseReconciler(c genericapiserver.Config, storageFactory serverstorage.StorageFactory) (reconcilers.PeerEndpointLeaseReconciler, error) {

View File

@ -51,7 +51,7 @@ func validateTokenRequest(options *Options) []error {
}
func validateAPIPriorityAndFairness(options *Options) []error {
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIPriorityAndFairness) && options.GenericServerRunOptions.EnablePriorityAndFairness {
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIPriorityAndFairness) && options.Features.EnablePriorityAndFairness {
// If none of the following runtime config options are specified,
// APF is assumed to be turned on. The internal APF controller uses
// v1beta3 so it should be enabled.

View File

@ -67,7 +67,7 @@ func TestValidateAPIPriorityAndFairness(t *testing.T) {
for _, test := range tests {
t.Run(test.runtimeConfig, func(t *testing.T) {
options := &Options{
GenericServerRunOptions: &genericoptions.ServerRunOptions{
Features: &genericoptions.FeatureOptions{
EnablePriorityAndFairness: true,
},
APIEnablement: genericoptions.NewAPIEnablementOptions(),
@ -192,6 +192,7 @@ func TestValidateOptions(t *testing.T) {
APIEnablement: genericoptions.NewAPIEnablementOptions(),
Metrics: &basemetrics.Options{},
ServiceAccountSigningKeyFile: "",
Features: &genericoptions.FeatureOptions{},
},
},
{
@ -215,6 +216,7 @@ func TestValidateOptions(t *testing.T) {
APIEnablement: genericoptions.NewAPIEnablementOptions(),
Metrics: &basemetrics.Options{},
ServiceAccountSigningKeyFile: "",
Features: &genericoptions.FeatureOptions{},
},
},
}

View File

@ -17,16 +17,24 @@ limitations under the License.
package options
import (
"fmt"
"github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/util/feature"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
)
type FeatureOptions struct {
EnableProfiling bool
DebugSocketPath string
EnableContentionProfiling bool
EnablePriorityAndFairness bool
}
func NewFeatureOptions() *FeatureOptions {
@ -36,6 +44,7 @@ func NewFeatureOptions() *FeatureOptions {
EnableProfiling: defaults.EnableProfiling,
DebugSocketPath: defaults.DebugSocketPath,
EnableContentionProfiling: defaults.EnableContentionProfiling,
EnablePriorityAndFairness: true,
}
}
@ -50,9 +59,11 @@ func (o *FeatureOptions) AddFlags(fs *pflag.FlagSet) {
"Enable block profiling, if profiling is enabled")
fs.StringVar(&o.DebugSocketPath, "debug-socket-path", o.DebugSocketPath,
"Use an unprotected (no authn/authz) unix-domain socket for profiling with the given path")
fs.BoolVar(&o.EnablePriorityAndFairness, "enable-priority-and-fairness", o.EnablePriorityAndFairness, ""+
"If true and the APIPriorityAndFairness feature gate is enabled, replace the max-in-flight handler with an enhanced one that queues and dispatches with priority and fairness")
}
func (o *FeatureOptions) ApplyTo(c *server.Config) error {
func (o *FeatureOptions) ApplyTo(c *server.Config, clientset kubernetes.Interface, informers informers.SharedInformerFactory) error {
if o == nil {
return nil
}
@ -61,6 +72,18 @@ func (o *FeatureOptions) ApplyTo(c *server.Config) error {
c.DebugSocketPath = o.DebugSocketPath
c.EnableContentionProfiling = o.EnableContentionProfiling
if o.EnablePriorityAndFairness && feature.DefaultFeatureGate.Enabled(features.APIPriorityAndFairness) {
if c.MaxRequestsInFlight+c.MaxMutatingRequestsInFlight <= 0 {
return fmt.Errorf("invalid configuration: MaxRequestsInFlight=%d and MaxMutatingRequestsInFlight=%d; they must add up to something positive", c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight)
}
c.FlowControl = utilflowcontrol.New(
informers,
clientset.FlowcontrolV1beta3(),
c.MaxRequestsInFlight+c.MaxMutatingRequestsInFlight,
)
}
return nil
}

View File

@ -17,20 +17,15 @@ limitations under the License.
package options
import (
"fmt"
"github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/apiserver/pkg/util/feature"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/component-base/featuregate"
"k8s.io/klog/v2"
)
// RecommendedOptions contains the recommended options for running an API server.
@ -122,17 +117,17 @@ func (o *RecommendedOptions) ApplyTo(config *server.RecommendedConfig) error {
if err := o.Audit.ApplyTo(&config.Config); err != nil {
return err
}
if err := o.Features.ApplyTo(&config.Config); err != nil {
return err
}
if err := o.CoreAPI.ApplyTo(config); err != nil {
return err
}
initializers, err := o.ExtraAdmissionInitializers(config)
kubeClient, err := kubernetes.NewForConfig(config.ClientConfig)
if err != nil {
return err
}
kubeClient, err := kubernetes.NewForConfig(config.ClientConfig)
if err := o.Features.ApplyTo(&config.Config, kubeClient, config.SharedInformerFactory); err != nil {
return err
}
initializers, err := o.ExtraAdmissionInitializers(config)
if err != nil {
return err
}
@ -144,21 +139,6 @@ func (o *RecommendedOptions) ApplyTo(config *server.RecommendedConfig) error {
initializers...); err != nil {
return err
}
if feature.DefaultFeatureGate.Enabled(features.APIPriorityAndFairness) {
if config.ClientConfig != nil {
if config.MaxRequestsInFlight+config.MaxMutatingRequestsInFlight <= 0 {
return fmt.Errorf("invalid configuration: MaxRequestsInFlight=%d and MaxMutatingRequestsInFlight=%d; they must add up to something positive", config.MaxRequestsInFlight, config.MaxMutatingRequestsInFlight)
}
config.FlowControl = utilflowcontrol.New(
config.SharedInformerFactory,
kubernetes.NewForConfigOrDie(config.ClientConfig).FlowcontrolV1beta3(),
config.MaxRequestsInFlight+config.MaxMutatingRequestsInFlight,
)
} else {
klog.Warningf("Neither kubeconfig is provided nor service-account is mounted, so APIPriorityAndFairness will be disabled")
}
}
return nil
}

View File

@ -62,8 +62,7 @@ type ServerRunOptions struct {
// decoded in a write request. 0 means no limit.
// We intentionally did not add a flag for this option. Users of the
// apiserver library can wire it to a flag.
MaxRequestBodyBytes int64
EnablePriorityAndFairness bool
MaxRequestBodyBytes int64
// ShutdownSendRetryAfter dictates when to initiate shutdown of the HTTP
// Server during the graceful termination of the apiserver. If true, we wait
@ -104,7 +103,6 @@ func NewServerRunOptions() *ServerRunOptions {
ShutdownWatchTerminationGracePeriod: defaults.ShutdownWatchTerminationGracePeriod,
JSONPatchMaxCopyBytes: defaults.JSONPatchMaxCopyBytes,
MaxRequestBodyBytes: defaults.MaxRequestBodyBytes,
EnablePriorityAndFairness: true,
ShutdownSendRetryAfter: false,
}
}
@ -325,9 +323,6 @@ func (s *ServerRunOptions) AddUniversalFlags(fs *pflag.FlagSet) {
"handler, which picks a randomized value above this number as the connection timeout, "+
"to spread out load.")
fs.BoolVar(&s.EnablePriorityAndFairness, "enable-priority-and-fairness", s.EnablePriorityAndFairness, ""+
"If true and the APIPriorityAndFairness feature gate is enabled, replace the max-in-flight handler with an enhanced one that queues and dispatches with priority and fairness")
fs.DurationVar(&s.ShutdownDelayDuration, "shutdown-delay-duration", s.ShutdownDelayDuration, ""+
"Time to delay the termination. During that time the server keeps serving requests normally. The endpoints /healthz and /livez "+
"will return success, but /readyz immediately returns failure. Graceful termination starts after this delay "+