diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go index 8c34b2b0615..3ee3867456d 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go @@ -23,6 +23,7 @@ import ( "encoding/json" "fmt" "math" + "math/rand" "sort" "sync" "time" @@ -34,8 +35,10 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" apitypes "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/clock" utilerrors "k8s.io/apimachinery/pkg/util/errors" utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" fcboot "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap" "k8s.io/apiserver/pkg/authentication/user" @@ -53,6 +56,8 @@ import ( flowcontrollister "k8s.io/client-go/listers/flowcontrol/v1beta1" ) +const timeFmt = "2006-01-02T15:04:05.999" + // This file contains a simple local (to the apiserver) controller // that digests API Priority and Fairness config objects (FlowSchema // and PriorityLevelConfiguration) into the data structure that the @@ -85,9 +90,19 @@ type RequestDigest struct { // this type and cfgMeal follow the convention that the suffix // "Locked" means that the caller must hold the configController lock. type configController struct { + name string // varies in tests of fighting controllers + clock clock.PassiveClock queueSetFactory fq.QueueSetFactory obsPairGenerator metrics.TimedObserverPairGenerator + // How this controller appears in an ObjectMeta ManagedFieldsEntry.Manager + asFieldManager string + + // Given a boolean indicating whether a FlowSchema's referenced + // PriorityLevelConfig exists, return a boolean indicating whether + // the reference is dangling + foundToDangling func(bool) bool + // configQueue holds `(interface{})(0)` when the configuration // objects need to be reprocessed. configQueue workqueue.RateLimitingInterface @@ -122,6 +137,18 @@ type configController struct { // name to the state for that level. Every name referenced from a // member of `flowSchemas` has an entry here. priorityLevelStates map[string]*priorityLevelState + + // the most recent update attempts, ordered by increasing age. + // Consumer trims to keep only the last minute's worth of entries. + // The controller uses this to limit itself to at most six updates + // to a given FlowSchema in any minute. + // This may only be accessed from the one and only worker goroutine. + mostRecentUpdates []updateAttempt +} + +type updateAttempt struct { + timeUpdated time.Time + updatedItems sets.String // FlowSchema names } // priorityLevelState holds the state specific to a priority level. 
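To make the new write-throttling bookkeeping concrete before the hunks that use it: the sketch below (not part of the patch) mimics what `mostRecentUpdates` and `updateAttempt` are for. Attempts are remembered newest-first, entries older than one minute are trimmed, and a FlowSchema that has already been written six times in the past minute has its next write delayed. The real logic lives in `shouldDelayUpdate` and `addUpdateResult` further down in this diff; the names `updateWindow`, `attempt`, `shouldDelay`, the 5-second spacing, and the "global-default" schema name here are illustrative only.

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/sets"
)

// attempt and updateWindow are simplified stand-ins for the patch's
// updateAttempt and cfgCtlr.mostRecentUpdates (newest attempt first).
type attempt struct {
	at    time.Time
	items sets.String // FlowSchema names written in this attempt
}

type updateWindow []attempt

// shouldDelay reports whether name was already written more than five
// times in the minute before now, trimming entries older than a minute.
func (w *updateWindow) shouldDelay(name string, now time.Time) bool {
	cutoff := now.Add(-time.Minute)
	writes := 0
	for i, a := range *w {
		if cutoff.After(a.at) {
			*w = (*w)[:i] // this and the older entries are no longer relevant
			return false
		}
		if a.items.Has(name) {
			writes++
			if writes > 5 {
				return true
			}
		}
	}
	return false
}

func main() {
	var w updateWindow
	start := time.Now()
	for i := 1; i <= 7; i++ {
		now := start.Add(time.Duration(i) * 5 * time.Second)
		if w.shouldDelay("global-default", now) {
			fmt.Printf("write %d: delayed\n", i) // the 7th attempt inside one minute
			continue
		}
		// record the attempt, newest first, as addUpdateResult does
		w = append(updateWindow{{at: now, items: sets.NewString("global-default")}}, w...)
		fmt.Printf("write %d: issued\n", i)
	}
}

Note that, as in the patch, a delayed write is not dropped: digestConfigObjects reports a randomized 30-74 second delay and processNextWorkItem requeues the work item with AddAfter, so the write is retried later.
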
@@ -154,14 +181,18 @@ type priorityLevelState struct { // NewTestableController is extra flexible to facilitate testing func newTestableController(config TestableConfig) *configController { cfgCtlr := &configController{ + name: config.Name, + clock: config.Clock, queueSetFactory: config.QueueSetFactory, obsPairGenerator: config.ObsPairGenerator, + asFieldManager: config.AsFieldManager, + foundToDangling: config.FoundToDangling, serverConcurrencyLimit: config.ServerConcurrencyLimit, requestWaitLimit: config.RequestWaitLimit, flowcontrolClient: config.FlowcontrolClient, priorityLevelStates: make(map[string]*priorityLevelState), } - klog.V(2).Infof("NewTestableController with serverConcurrencyLimit=%d, requestWaitLimit=%s", cfgCtlr.serverConcurrencyLimit, cfgCtlr.requestWaitLimit) + klog.V(2).Infof("NewTestableController %q with serverConcurrencyLimit=%d, requestWaitLimit=%s, name=%s, asFieldManager=%q", cfgCtlr.name, cfgCtlr.serverConcurrencyLimit, cfgCtlr.requestWaitLimit, cfgCtlr.name, cfgCtlr.asFieldManager) // Start with longish delay because conflicts will be between // different processes, so take some time to go away. cfgCtlr.configQueue = workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(200*time.Millisecond, 8*time.Hour), "priority_and_fairness_config_queue") @@ -177,40 +208,60 @@ func newTestableController(config TestableConfig) *configController { pli.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { pl := obj.(*flowcontrol.PriorityLevelConfiguration) - klog.V(7).Infof("Triggered API priority and fairness config reloading due to creation of PLC %s", pl.Name) + klog.V(7).Infof("Triggered API priority and fairness config reloading in %s due to creation of PLC %s", cfgCtlr.name, pl.Name) cfgCtlr.configQueue.Add(0) }, UpdateFunc: func(oldObj, newObj interface{}) { newPL := newObj.(*flowcontrol.PriorityLevelConfiguration) oldPL := oldObj.(*flowcontrol.PriorityLevelConfiguration) if !apiequality.Semantic.DeepEqual(oldPL.Spec, newPL.Spec) { - klog.V(7).Infof("Triggered API priority and fairness config reloading due to spec update of PLC %s", newPL.Name) + klog.V(7).Infof("Triggered API priority and fairness config reloading in %s due to spec update of PLC %s", cfgCtlr.name, newPL.Name) cfgCtlr.configQueue.Add(0) + } else { + klog.V(7).Infof("No trigger API priority and fairness config reloading in %s due to spec non-change of PLC %s", cfgCtlr.name, newPL.Name) } }, DeleteFunc: func(obj interface{}) { name, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) - klog.V(7).Infof("Triggered API priority and fairness config reloading due to deletion of PLC %s", name) + klog.V(7).Infof("Triggered API priority and fairness config reloading in %s due to deletion of PLC %s", cfgCtlr.name, name) cfgCtlr.configQueue.Add(0) }}) fsi.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { fs := obj.(*flowcontrol.FlowSchema) - klog.V(7).Infof("Triggered API priority and fairness config reloading due to creation of FS %s", fs.Name) + klog.V(7).Infof("Triggered API priority and fairness config reloading in %s due to creation of FS %s", cfgCtlr.name, fs.Name) cfgCtlr.configQueue.Add(0) }, UpdateFunc: func(oldObj, newObj interface{}) { newFS := newObj.(*flowcontrol.FlowSchema) oldFS := oldObj.(*flowcontrol.FlowSchema) - if !apiequality.Semantic.DeepEqual(oldFS.Spec, newFS.Spec) { - klog.V(7).Infof("Triggered API priority and fairness config reloading due to spec update of FS %s", newFS.Name) + 
// Changes to either Spec or Status are relevant. The + // concern is that we might, in some future release, want + // different behavior than is implemented now. One of the + // hardest questions is how does an operator roll out the + // new release in a cluster with multiple kube-apiservers + // --- in a way that works no matter what servers crash + // and restart when. If this handler reacts only to + // changes in Spec then we have a scenario in which the + // rollout leaves the old Status in place. The scenario + // ends with this subsequence: deploy the last new server + // before deleting the last old server, and in between + // those two operations the last old server crashes and + // recovers. The chosen solution is making this controller + // insist on maintaining the particular state that it + // establishes. + if !(apiequality.Semantic.DeepEqual(oldFS.Spec, newFS.Spec) && + apiequality.Semantic.DeepEqual(oldFS.Status, newFS.Status)) { + klog.V(7).Infof("Triggered API priority and fairness config reloading in %s due to spec and/or status update of FS %s", cfgCtlr.name, newFS.Name) cfgCtlr.configQueue.Add(0) + } else { + klog.V(7).Infof("No trigger of API priority and fairness config reloading in %s due to spec and status non-change of FS %s", cfgCtlr.name, newFS.Name) } }, DeleteFunc: func(obj interface{}) { name, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) - klog.V(7).Infof("Triggered API priority and fairness config reloading due to deletion of FS %s", name) + klog.V(7).Infof("Triggered API priority and fairness config reloading in %s due to deletion of FS %s", cfgCtlr.name, name) cfgCtlr.configQueue.Add(0) }}) @@ -253,11 +304,16 @@ func (cfgCtlr *configController) Run(stopCh <-chan struct{}) error { return nil } +// runWorker is the logic of the one and only worker goroutine. We +// limit the number to one in order to obviate explicit +// synchronization around access to `cfgCtlr.mostRecentUpdates`. func (cfgCtlr *configController) runWorker() { for cfgCtlr.processNextWorkItem() { } } +// processNextWorkItem works on one entry from the work queue. +// Only invoke this in the one and only worker goroutine. func (cfgCtlr *configController) processNextWorkItem() bool { obj, shutdown := cfgCtlr.configQueue.Get() if shutdown { @@ -266,9 +322,14 @@ func (cfgCtlr *configController) processNextWorkItem() bool { func(obj interface{}) { defer cfgCtlr.configQueue.Done(obj) - if !cfgCtlr.syncOne() { + specificDelay, err := cfgCtlr.syncOne(map[string]string{}) + switch { + case err != nil: + klog.Error(err) cfgCtlr.configQueue.AddRateLimited(obj) - } else { + case specificDelay > 0: + cfgCtlr.configQueue.AddAfter(obj, specificDelay) + default: cfgCtlr.configQueue.Forget(obj) } }(obj) @@ -276,27 +337,22 @@ func (cfgCtlr *configController) processNextWorkItem() bool { return true } -// syncOne attempts to sync all the API Priority and Fairness config -// objects. It either succeeds and returns `true` or logs an error -// and returns `false`. -func (cfgCtlr *configController) syncOne() bool { +// syncOne does one full synchronization. It reads all the API +// objects that configure API Priority and Fairness and updates the +// local configController accordingly. 
+// Only invoke this in the one and only worker goroutine +func (cfgCtlr *configController) syncOne(flowSchemaRVs map[string]string) (specificDelay time.Duration, err error) { + klog.V(5).Infof("%s syncOne at %s", cfgCtlr.name, cfgCtlr.clock.Now().Format(timeFmt)) all := labels.Everything() newPLs, err := cfgCtlr.plLister.List(all) if err != nil { - klog.Errorf("Unable to list PriorityLevelConfiguration objects: %s", err.Error()) - return false + return 0, fmt.Errorf("unable to list PriorityLevelConfiguration objects: %w", err) } newFSs, err := cfgCtlr.fsLister.List(all) if err != nil { - klog.Errorf("Unable to list FlowSchema objects: %s", err.Error()) - return false + return 0, fmt.Errorf("unable to list FlowSchema objects: %w", err) } - err = cfgCtlr.digestConfigObjects(newPLs, newFSs) - if err == nil { - return true - } - klog.Error(err) - return false + return cfgCtlr.digestConfigObjects(newPLs, newFSs, flowSchemaRVs) } // cfgMeal is the data involved in the process of digesting the API @@ -336,35 +392,79 @@ type fsStatusUpdate struct { // digestConfigObjects is given all the API objects that configure // cfgCtlr and writes its consequent new configState. -func (cfgCtlr *configController) digestConfigObjects(newPLs []*flowcontrol.PriorityLevelConfiguration, newFSs []*flowcontrol.FlowSchema) error { +// Only invoke this in the one and only worker goroutine +func (cfgCtlr *configController) digestConfigObjects(newPLs []*flowcontrol.PriorityLevelConfiguration, newFSs []*flowcontrol.FlowSchema, flowSchemaRVs map[string]string) (time.Duration, error) { fsStatusUpdates := cfgCtlr.lockAndDigestConfigObjects(newPLs, newFSs) var errs []error + currResult := updateAttempt{ + timeUpdated: cfgCtlr.clock.Now(), + updatedItems: sets.String{}, + } + var suggestedDelay time.Duration for _, fsu := range fsStatusUpdates { + // if we should skip this name, indicate we will need a delay, but continue with other entries + if cfgCtlr.shouldDelayUpdate(fsu.flowSchema.Name) { + if suggestedDelay == 0 { + suggestedDelay = time.Duration(30+rand.Intn(45)) * time.Second + } + continue + } + + // if we are going to issue an update, be sure we track every name we update so we know if we update it too often. 
+ currResult.updatedItems.Insert(fsu.flowSchema.Name) + enc, err := json.Marshal(fsu.condition) if err != nil { // should never happen because these conditions are created here and well formed panic(fmt.Sprintf("Failed to json.Marshall(%#+v): %s", fsu.condition, err.Error())) } - klog.V(4).Infof("Writing Condition %s to FlowSchema %s because its previous value was %s", string(enc), fsu.flowSchema.Name, fcfmt.Fmt(fsu.oldValue)) + klog.V(4).Infof("%s writing Condition %s to FlowSchema %s, which had ResourceVersion=%s, because its previous value was %s", cfgCtlr.name, string(enc), fsu.flowSchema.Name, fsu.flowSchema.ResourceVersion, fcfmt.Fmt(fsu.oldValue)) fsIfc := cfgCtlr.flowcontrolClient.FlowSchemas() patchBytes := []byte(fmt.Sprintf(`{"status": {"conditions": [ %s ] } }`, string(enc))) - patchOptions := metav1.PatchOptions{FieldManager: ConfigConsumerAsFieldManager} - _, err = fsIfc.Patch(context.TODO(), fsu.flowSchema.Name, apitypes.StrategicMergePatchType, patchBytes, patchOptions, "status") + patchOptions := metav1.PatchOptions{FieldManager: cfgCtlr.asFieldManager} + patchedFlowSchema, err := fsIfc.Patch(context.TODO(), fsu.flowSchema.Name, apitypes.StrategicMergePatchType, patchBytes, patchOptions, "status") if err == nil { - continue - } - if apierrors.IsNotFound(err) { + key, _ := cache.MetaNamespaceKeyFunc(patchedFlowSchema) + flowSchemaRVs[key] = patchedFlowSchema.ResourceVersion + } else if apierrors.IsNotFound(err) { // This object has been deleted. A notification is coming // and nothing more needs to be done here. - klog.V(5).Infof("Attempted update of concurrently deleted FlowSchema %s; nothing more needs to be done", fsu.flowSchema.Name) + klog.V(5).Infof("%s at %s: attempted update of concurrently deleted FlowSchema %s; nothing more needs to be done", cfgCtlr.name, cfgCtlr.clock.Now().Format(timeFmt), fsu.flowSchema.Name) } else { errs = append(errs, errors.Wrap(err, fmt.Sprintf("failed to set a status.condition for FlowSchema %s", fsu.flowSchema.Name))) } } - if len(errs) == 0 { - return nil + cfgCtlr.addUpdateResult(currResult) + + return suggestedDelay, utilerrors.NewAggregate(errs) +} + +// shouldDelayUpdate checks to see if a flowschema has been updated too often and returns true if a delay is needed. +// Only invoke this in the one and only worker goroutine +func (cfgCtlr *configController) shouldDelayUpdate(flowSchemaName string) bool { + numUpdatesInPastMinute := 0 + oneMinuteAgo := cfgCtlr.clock.Now().Add(-1 * time.Minute) + for idx, update := range cfgCtlr.mostRecentUpdates { + if oneMinuteAgo.After(update.timeUpdated) { + // this and the remaining items are no longer relevant + cfgCtlr.mostRecentUpdates = cfgCtlr.mostRecentUpdates[:idx] + return false + } + if update.updatedItems.Has(flowSchemaName) { + numUpdatesInPastMinute++ + if numUpdatesInPastMinute > 5 { + return true + } + } } - return utilerrors.NewAggregate(errs) + return false +} + +// addUpdateResult adds the result. It isn't a ring buffer because +// this is small and rate limited. +// Only invoke this in the one and only worker goroutine +func (cfgCtlr *configController) addUpdateResult(result updateAttempt) { + cfgCtlr.mostRecentUpdates = append([]updateAttempt{result}, cfgCtlr.mostRecentUpdates...) 
} func (cfgCtlr *configController) lockAndDigestConfigObjects(newPLs []*flowcontrol.PriorityLevelConfiguration, newFSs []*flowcontrol.FlowSchema) []fsStatusUpdate { @@ -448,7 +548,7 @@ func (meal *cfgMeal) digestFlowSchemasLocked(newFSs []*flowcontrol.FlowSchema) { // // TODO: consider not even trying if server is not handling // requests yet. - meal.presyncFlowSchemaStatus(fs, !goodPriorityRef, fs.Spec.PriorityLevelConfiguration.Name) + meal.presyncFlowSchemaStatus(fs, meal.cfgCtlr.foundToDangling(goodPriorityRef), fs.Spec.PriorityLevelConfiguration.Name) if !goodPriorityRef { klog.V(6).Infof("Ignoring FlowSchema %s because of bad priority level reference %q", fs.Name, fs.Spec.PriorityLevelConfiguration.Name) @@ -612,12 +712,13 @@ func (meal *cfgMeal) presyncFlowSchemaStatus(fs *flowcontrol.FlowSchema, isDangl if danglingCondition.Status == desiredStatus && danglingCondition.Reason == desiredReason && danglingCondition.Message == desiredMessage { return } + now := meal.cfgCtlr.clock.Now() meal.fsStatusUpdates = append(meal.fsStatusUpdates, fsStatusUpdate{ flowSchema: fs, condition: flowcontrol.FlowSchemaCondition{ Type: flowcontrol.FlowSchemaConditionDangling, Status: desiredStatus, - LastTransitionTime: metav1.Now(), + LastTransitionTime: metav1.NewTime(now), Reason: desiredReason, Message: desiredMessage, }, @@ -710,7 +811,9 @@ func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDig return selectedFlowSchema, plState.pl, false, req, startWaitingTime } -// Call this after getting a clue that the given priority level is undesired and idle +// maybeReap will remove the last internal traces of the named +// priority level if it has no more use. Call this after getting a +// clue that the given priority level is undesired and idle. func (cfgCtlr *configController) maybeReap(plName string) { cfgCtlr.lock.Lock() defer cfgCtlr.lock.Unlock() @@ -730,8 +833,11 @@ func (cfgCtlr *configController) maybeReap(plName string) { cfgCtlr.configQueue.Add(0) } -// Call this if both (1) plState.queues is non-nil and reported being -// idle, and (2) cfgCtlr's lock has not been released since then. +// maybeReapLocked requires the cfgCtlr's lock to already be held and +// will remove the last internal traces of the named priority level if +// it has no more use. Call this if both (1) plState.queues is +// non-nil and reported being idle, and (2) cfgCtlr's lock has not +// been released since then. 
func (cfgCtlr *configController) maybeReapLocked(plName string, plState *priorityLevelState) { if !(plState.quiescing && plState.numPending == 0) { return diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go index e9564d4d51c..825ae09ce3d 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go @@ -78,18 +78,43 @@ func New( requestWaitLimit time.Duration, ) Interface { grc := counter.NoOp{} + clk := clock.RealClock{} return NewTestable(TestableConfig{ + Name: "Controller", + Clock: clk, + AsFieldManager: ConfigConsumerAsFieldManager, + FoundToDangling: func(found bool) bool { return !found }, InformerFactory: informerFactory, FlowcontrolClient: flowcontrolClient, ServerConcurrencyLimit: serverConcurrencyLimit, RequestWaitLimit: requestWaitLimit, ObsPairGenerator: metrics.PriorityLevelConcurrencyObserverPairGenerator, - QueueSetFactory: fqs.NewQueueSetFactory(&clock.RealClock{}, grc), + QueueSetFactory: fqs.NewQueueSetFactory(clk, grc), }) } // TestableConfig carries the parameters to an implementation that is testable type TestableConfig struct { + // Name of the controller + Name string + + // Clock to use in timing deliberate delays + Clock clock.PassiveClock + + // AsFieldManager is the string to use in the metadata for + // server-side apply. Normally this is + // `ConfigConsumerAsFieldManager`. This is exposed as a parameter + // so that a test of competing controllers can supply different + // values. + AsFieldManager string + + // FoundToDangling maps the boolean indicating whether a + // FlowSchema's referenced PLC exists to the boolean indicating + // that FlowSchema's status should indicate a dangling reference. + // This is a parameter so that we can write tests of what happens + // when servers disagree on that bit of Status. 
+ FoundToDangling func(bool) bool + // InformerFactory to use in building the controller InformerFactory kubeinformers.SharedInformerFactory diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go index 985728ae73c..92f820916ca 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go @@ -28,6 +28,7 @@ import ( flowcontrol "k8s.io/api/flowcontrol/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" fcboot "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap" @@ -251,10 +252,14 @@ func TestConfigConsumer(t *testing.T) { queues: map[string]*ctlrTestQueueSet{}, } ctlr := newTestableController(TestableConfig{ + Name: "Controller", + Clock: clock.RealClock{}, + AsFieldManager: ConfigConsumerAsFieldManager, + FoundToDangling: func(found bool) bool { return !found }, InformerFactory: informerFactory, FlowcontrolClient: flowcontrolClient, - ServerConcurrencyLimit: 100, - RequestWaitLimit: time.Minute, + ServerConcurrencyLimit: 100, // server concurrency limit + RequestWaitLimit: time.Minute, // request wait limit ObsPairGenerator: metrics.PriorityLevelConcurrencyObserverPairGenerator, QueueSetFactory: cts, }) @@ -378,12 +383,16 @@ func TestAPFControllerWithGracefulShutdown(t *testing.T) { queues: map[string]*ctlrTestQueueSet{}, } controller := newTestableController(TestableConfig{ - informerFactory, - flowcontrolClient, - 100, - time.Minute, - metrics.PriorityLevelConcurrencyObserverPairGenerator, - cts, + Name: "Controller", + Clock: clock.RealClock{}, + AsFieldManager: ConfigConsumerAsFieldManager, + FoundToDangling: func(found bool) bool { return !found }, + InformerFactory: informerFactory, + FlowcontrolClient: flowcontrolClient, + ServerConcurrencyLimit: 100, + RequestWaitLimit: time.Minute, + ObsPairGenerator: metrics.PriorityLevelConcurrencyObserverPairGenerator, + QueueSetFactory: cts, }) stopCh, controllerCompletedCh := make(chan struct{}), make(chan struct{}) diff --git a/test/integration/apiserver/flowcontrol/concurrency_test.go b/test/integration/apiserver/flowcontrol/concurrency_test.go index df0b5d24a29..4127947b232 100644 --- a/test/integration/apiserver/flowcontrol/concurrency_test.go +++ b/test/integration/apiserver/flowcontrol/concurrency_test.go @@ -49,7 +49,7 @@ const ( timeout = time.Second * 10 ) -func setup(t testing.TB) (*httptest.Server, *rest.Config, framework.CloseFunc) { +func setup(t testing.TB, maxReadonlyRequestsInFlight, MaxMutatingRequestsInFlight int) (*httptest.Server, *rest.Config, framework.CloseFunc) { opts := framework.MasterConfigOptions{EtcdOptions: framework.DefaultEtcdOptions()} opts.EtcdOptions.DefaultStorageMediaType = "application/vnd.kubernetes.protobuf" masterConfig := framework.NewIntegrationTestMasterConfigWithOptions(&opts) @@ -58,8 +58,8 @@ func setup(t testing.TB) (*httptest.Server, *rest.Config, framework.CloseFunc) { Group: "flowcontrol.apiserver.k8s.io", Version: "v1alpha1", }) - masterConfig.GenericConfig.MaxRequestsInFlight = 1 - masterConfig.GenericConfig.MaxMutatingRequestsInFlight = 1 + masterConfig.GenericConfig.MaxRequestsInFlight = maxReadonlyRequestsInFlight + masterConfig.GenericConfig.MaxMutatingRequestsInFlight = MaxMutatingRequestsInFlight masterConfig.GenericConfig.OpenAPIConfig = framework.DefaultOpenAPIConfig() 
masterConfig.ExtraConfig.APIResourceConfigSource = resourceConfig _, s, closeFn := framework.RunAMaster(masterConfig) @@ -70,7 +70,7 @@ func setup(t testing.TB) (*httptest.Server, *rest.Config, framework.CloseFunc) { func TestPriorityLevelIsolation(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.APIPriorityAndFairness, true)() // NOTE: disabling the feature should fail the test - _, loopbackConfig, closeFn := setup(t) + _, loopbackConfig, closeFn := setup(t, 1, 1) defer closeFn() loopbackClient := clientset.NewForConfigOrDie(loopbackConfig) diff --git a/test/integration/apiserver/flowcontrol/fight_test.go b/test/integration/apiserver/flowcontrol/fight_test.go new file mode 100644 index 00000000000..51a5f6a5131 --- /dev/null +++ b/test/integration/apiserver/flowcontrol/fight_test.go @@ -0,0 +1,192 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package flowcontrol + +import ( + "context" + "fmt" + "math" + "math/rand" + "sync" + "testing" + "time" + + flowcontrol "k8s.io/api/flowcontrol/v1beta1" + "k8s.io/apimachinery/pkg/util/clock" + genericfeatures "k8s.io/apiserver/pkg/features" + utilfeature "k8s.io/apiserver/pkg/util/feature" + utilfc "k8s.io/apiserver/pkg/util/flowcontrol" + fqtesting "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing" + "k8s.io/apiserver/pkg/util/flowcontrol/metrics" + "k8s.io/client-go/informers" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" + featuregatetesting "k8s.io/component-base/featuregate/testing" +) + +/* fightTest configures a test of how API Priority and Fairness config + controllers fight when they disagree on how to set FlowSchemaStatus. + In particular, they set the condition that indicates integrity of + the reference to the PriorityLevelConfiguration. The scenario tested is + two teams of controllers, where the controllers in one team set the + condition normally and the controllers in the other team set the condition + to the opposite value. + + This is a behavioral test: it instantiates these controllers and runs them + almost normally. The test aims to run the controllers for a little under + 2 minutes. The test takes clock readings to get upper and lower bounds on + how long each controller ran, and calculates consequent bounds on the number + of writes that should happen to each FlowSchemaStatus. The test creates + an informer to observe the writes. The calculated lower bound on the + number of writes is very lax, assuming only that one write can be done + every 10 seconds. 
+*/ +type fightTest struct { + t *testing.T + ctx context.Context + loopbackConfig *rest.Config + teamSize int + stopCh chan struct{} + now time.Time + clk *clock.FakeClock + ctlrs map[bool][]utilfc.Interface + + countsMutex sync.Mutex + + // writeCounts maps FlowSchema.Name to number of writes + writeCounts map[string]int +} + +func newFightTest(t *testing.T, loopbackConfig *rest.Config, teamSize int) *fightTest { + now := time.Now() + ft := &fightTest{ + t: t, + ctx: context.Background(), + loopbackConfig: loopbackConfig, + teamSize: teamSize, + stopCh: make(chan struct{}), + now: now, + clk: clock.NewFakeClock(now), + ctlrs: map[bool][]utilfc.Interface{ + false: make([]utilfc.Interface, teamSize), + true: make([]utilfc.Interface, teamSize)}, + writeCounts: map[string]int{}, + } + return ft +} + +func (ft *fightTest) createMainInformer() { + myConfig := rest.CopyConfig(ft.loopbackConfig) + myConfig = rest.AddUserAgent(myConfig, "audience") + myClientset := clientset.NewForConfigOrDie(myConfig) + informerFactory := informers.NewSharedInformerFactory(myClientset, 0) + inf := informerFactory.Flowcontrol().V1beta1().FlowSchemas().Informer() + inf.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + fs := obj.(*flowcontrol.FlowSchema) + ft.countWrite(fs) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + fs := newObj.(*flowcontrol.FlowSchema) + ft.countWrite(fs) + }, + }) + go inf.Run(ft.stopCh) + if !cache.WaitForCacheSync(ft.stopCh, inf.HasSynced) { + ft.t.Errorf("Failed to sync main informer cache") + } +} + +func (ft *fightTest) countWrite(fs *flowcontrol.FlowSchema) { + ft.countsMutex.Lock() + defer ft.countsMutex.Unlock() + ft.writeCounts[fs.Name]++ +} + +func (ft *fightTest) createController(invert bool, i int) { + fieldMgr := fmt.Sprintf("testController%d%v", i, invert) + myConfig := rest.CopyConfig(ft.loopbackConfig) + myConfig = rest.AddUserAgent(myConfig, fieldMgr) + myClientset := clientset.NewForConfigOrDie(myConfig) + fcIfc := myClientset.FlowcontrolV1beta1() + informerFactory := informers.NewSharedInformerFactory(myClientset, 0) + foundToDangling := func(found bool) bool { return !found } + if invert { + foundToDangling = func(found bool) bool { return found } + } + ctlr := utilfc.NewTestable(utilfc.TestableConfig{ + Name: fieldMgr, + FoundToDangling: foundToDangling, + Clock: clock.RealClock{}, + AsFieldManager: fieldMgr, + InformerFactory: informerFactory, + FlowcontrolClient: fcIfc, + ServerConcurrencyLimit: 200, // server concurrency limit + RequestWaitLimit: time.Minute / 4, // request wait limit + ObsPairGenerator: metrics.PriorityLevelConcurrencyObserverPairGenerator, + QueueSetFactory: fqtesting.NewNoRestraintFactory(), + }) + ft.ctlrs[invert][i] = ctlr + informerFactory.Start(ft.stopCh) + go ctlr.Run(ft.stopCh) +} + +func (ft *fightTest) evaluate(tBeforeCreate, tAfterCreate time.Time) { + tBeforeLock := time.Now() + ft.countsMutex.Lock() + defer ft.countsMutex.Unlock() + tAfterLock := time.Now() + minFightSecs := tBeforeLock.Sub(tAfterCreate).Seconds() + maxFightSecs := tAfterLock.Sub(tBeforeCreate).Seconds() + minTotalWrites := int(minFightSecs / 10) + maxWritesPerWriter := 6 * int(math.Ceil(maxFightSecs/60)) + maxTotalWrites := (1 + ft.teamSize*2) * maxWritesPerWriter + for flowSchemaName, writeCount := range ft.writeCounts { + if writeCount < minTotalWrites { + ft.t.Errorf("There were a total of %d writes to FlowSchema %s but there should have been at least %d from %s to %s", writeCount, flowSchemaName, minTotalWrites, 
tAfterCreate, tBeforeLock) + } else if writeCount > maxTotalWrites { + ft.t.Errorf("There were a total of %d writes to FlowSchema %s but there should have been no more than %d from %s to %s", writeCount, flowSchemaName, maxTotalWrites, tBeforeCreate, tAfterLock) + } else { + ft.t.Logf("There were a total of %d writes to FlowSchema %s over %v, %v seconds", writeCount, flowSchemaName, minFightSecs, maxFightSecs) + } + } +} +func TestConfigConsumerFight(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.APIPriorityAndFairness, true)() + _, loopbackConfig, closeFn := setup(t, 100, 100) + defer closeFn() + const teamSize = 3 + ft := newFightTest(t, loopbackConfig, teamSize) + tBeforeCreate := time.Now() + ft.createMainInformer() + ft.foreach(ft.createController) + tAfterCreate := time.Now() + time.Sleep(110 * time.Second) + ft.evaluate(tBeforeCreate, tAfterCreate) + close(ft.stopCh) +} + +func (ft *fightTest) foreach(visit func(invert bool, i int)) { + for i := 0; i < ft.teamSize; i++ { + // The order of the following enumeration is not deterministic, + // and that is good. + invert := rand.Intn(2) == 0 + visit(invert, i) + visit(!invert, i) + } +} diff --git a/vendor/modules.txt b/vendor/modules.txt index ed7fcb88092..09ff2b70973 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1840,6 +1840,7 @@ k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset +k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing k8s.io/apiserver/pkg/util/flowcontrol/format k8s.io/apiserver/pkg/util/flowcontrol/metrics k8s.io/apiserver/pkg/util/flushwriter
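
A quick worked check of the bounds that fightTest.evaluate derives, assuming (per TestConfigConsumerFight) that the controllers fight for roughly the 110-second sleep: the lax lower bound allows one write per 10 seconds, the upper bound allows each writer six writes per FlowSchema per minute, and the writers counted by the `1 + ft.teamSize*2` term are the 2*teamSize test controllers plus, presumably, the apiserver's own built-in config consumer. The durations below are illustrative stand-ins for the test's actual clock readings.

package main

import (
	"fmt"
	"math"
)

func main() {
	// Illustrative stand-ins for evaluate()'s clock readings; the real test
	// sleeps 110 s between creating the controllers and evaluating.
	minFightSecs := 108.0 // lower bound on how long every controller ran
	maxFightSecs := 112.0 // upper bound on how long any controller ran
	teamSize := 3

	minTotalWrites := int(minFightSecs / 10)                  // lax floor: one write per 10 s
	maxWritesPerWriter := 6 * int(math.Ceil(maxFightSecs/60)) // six status writes per FlowSchema per minute
	maxTotalWrites := (1 + teamSize*2) * maxWritesPerWriter   // 2*teamSize test controllers + the server's own

	fmt.Println(minTotalWrites, maxWritesPerWriter, maxTotalWrites) // prints: 10 12 84
}

So with teamSize=3 each FlowSchema should see roughly between 10 and 84 status writes over the run: fewer than the floor means the fight never got going, more than the ceiling means the per-minute throttle in digestConfigObjects is not holding.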