diff --git a/pkg/apis/flowcontrol/internalbootstrap/defaults_test.go b/pkg/apis/flowcontrol/internalbootstrap/defaults_test.go index 3e2482b01ce..740b281329e 100644 --- a/pkg/apis/flowcontrol/internalbootstrap/defaults_test.go +++ b/pkg/apis/flowcontrol/internalbootstrap/defaults_test.go @@ -21,6 +21,7 @@ import ( "testing" "github.com/google/go-cmp/cmp" + flowcontrol "k8s.io/api/flowcontrol/v1beta3" apiequality "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap" diff --git a/pkg/apis/flowcontrol/v1alpha1/defaults_test.go b/pkg/apis/flowcontrol/v1alpha1/defaults_test.go index f5a3f54fc2b..35baccb63ae 100644 --- a/pkg/apis/flowcontrol/v1alpha1/defaults_test.go +++ b/pkg/apis/flowcontrol/v1alpha1/defaults_test.go @@ -21,6 +21,7 @@ import ( "testing" "github.com/google/go-cmp/cmp" + flowcontrolv1alpha1 "k8s.io/api/flowcontrol/v1alpha1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/utils/pointer" diff --git a/pkg/apis/flowcontrol/v1beta1/defaults_test.go b/pkg/apis/flowcontrol/v1beta1/defaults_test.go index e6a5a356651..f5a83f521f2 100644 --- a/pkg/apis/flowcontrol/v1beta1/defaults_test.go +++ b/pkg/apis/flowcontrol/v1beta1/defaults_test.go @@ -21,6 +21,7 @@ import ( "testing" "github.com/google/go-cmp/cmp" + flowcontrolv1beta1 "k8s.io/api/flowcontrol/v1beta1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/utils/pointer" diff --git a/pkg/apis/flowcontrol/v1beta2/defaults_test.go b/pkg/apis/flowcontrol/v1beta2/defaults_test.go index 4d9f07b7ee7..f1d33daf67e 100644 --- a/pkg/apis/flowcontrol/v1beta2/defaults_test.go +++ b/pkg/apis/flowcontrol/v1beta2/defaults_test.go @@ -21,6 +21,7 @@ import ( "testing" "github.com/google/go-cmp/cmp" + flowcontrolv1beta2 "k8s.io/api/flowcontrol/v1beta2" "k8s.io/apimachinery/pkg/runtime" "k8s.io/utils/pointer" diff --git a/pkg/apis/flowcontrol/v1beta3/defaults_test.go b/pkg/apis/flowcontrol/v1beta3/defaults_test.go index b36334e96b5..cb42cb8c262 100644 --- a/pkg/apis/flowcontrol/v1beta3/defaults_test.go +++ b/pkg/apis/flowcontrol/v1beta3/defaults_test.go @@ -21,6 +21,7 @@ import ( "testing" "github.com/google/go-cmp/cmp" + flowcontrolv1beta3 "k8s.io/api/flowcontrol/v1beta3" "k8s.io/apimachinery/pkg/runtime" "k8s.io/utils/pointer" diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go index 88e3f9d22ad..3f093293ef5 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go @@ -1112,7 +1112,7 @@ func startAPFController(t *testing.T, stopCh <-chan struct{}, apfConfiguration [ // make sure that apf controller syncs the priority level configuration object we are using in this test. // read the metrics and ensure the concurrency limit for our priority level is set to the expected value. 
pollErr := wait.PollImmediate(100*time.Millisecond, 5*time.Second, func() (done bool, err error) { - if err := gaugeValueMatch("apiserver_flowcontrol_request_concurrency_limit", map[string]string{"priority_level": plName}, plConcurrency); err != nil { + if err := gaugeValueMatch("apiserver_flowcontrol_nominal_limit_seats", map[string]string{"priority_level": plName}, plConcurrency); err != nil { t.Logf("polling retry - error: %s", err) return false, nil } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go index 2cc4be2550c..5718dd9e866 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go @@ -177,6 +177,12 @@ type configController struct { // name to the state for that level. Every name referenced from a // member of `flowSchemas` has an entry here. priorityLevelStates map[string]*priorityLevelState + + // nominalCLSum is the sum of the nominalCL fields in the priorityLevelState records. + // This can exceed serverConcurrencyLimit because of the deliberate rounding up + // in the computation of the nominalCL values. + // This is tracked because it is an input to the allocation adjustment algorithm. + nominalCLSum int } type updateAttempt struct { @@ -225,6 +231,18 @@ type priorityLevelState struct { // Periodically smoothed gets replaced with `max(envelope, A*smoothed + (1-A)*envelope)`, // where A is seatDemandSmoothingCoefficient. seatDemandStats seatDemandStats + + // nominalCL is the nominal concurrency limit configured in the PriorityLevelConfiguration + nominalCL int + + // minCL is the nominal limit less the lendable amount + minCL int + + // maxCL is the nominal limit plus the amount that may be borrowed + maxCL int + + // currentCL is the dynamically derived concurrency limit to impose for now + currentCL int } type seatDemandStats struct { @@ -234,15 +252,6 @@ type seatDemandStats struct { smoothed float64 } -func newSeatDemandStats(val float64) seatDemandStats { - return seatDemandStats{ - avg: val, - stdDev: 0, - highWatermark: val, - smoothed: val, - } -} - func (stats *seatDemandStats) update(obs fq.IntegratorResults) { stats.avg = obs.Average stats.stdDev = obs.Deviation @@ -368,12 +377,74 @@ func (cfgCtlr *configController) Run(stopCh <-chan struct{}) error { func (cfgCtlr *configController) updateBorrowing() { cfgCtlr.lock.Lock() defer cfgCtlr.lock.Unlock() - for _, plState := range cfgCtlr.priorityLevelStates { + cfgCtlr.updateBorrowingLocked(true, cfgCtlr.priorityLevelStates) +} + +func (cfgCtlr *configController) updateBorrowingLocked(setCompleters bool, plStates map[string]*priorityLevelState) { + items := make([]allocProblemItem, 0, len(plStates)) + plNames := make([]string, 0, len(plStates)) + for plName, plState := range plStates { + if plState.pl.Spec.Limited == nil { + continue + } obs := plState.seatDemandIntegrator.Reset() plState.seatDemandStats.update(obs) - metrics.NotePriorityLevelConcurrencyAdjustment(plState.pl.Name, plState.seatDemandStats.highWatermark, plState.seatDemandStats.avg, plState.seatDemandStats.stdDev, plState.seatDemandStats.smoothed /* TODO: add the designed rest for borrowing */) - // TODO: updathe CurrentCL as described in https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/1040-priority-and-fairness#dispatching + // Lower bound on this priority level's adjusted concurrency limit is the lesser of: + // - its seat 
demand high watermark over the last adjustment period, and + // - its configured concurrency limit. + // BUT: we do not want this to be lower than the lower bound from configuration. + // See KEP-1040 for a more detailed explanation. + minCurrentCL := math.Max(float64(plState.minCL), math.Min(float64(plState.nominalCL), plState.seatDemandStats.highWatermark)) + plNames = append(plNames, plName) + items = append(items, allocProblemItem{ + lowerBound: minCurrentCL, + upperBound: float64(plState.maxCL), + target: math.Max(minCurrentCL, plState.seatDemandStats.smoothed), + }) } + if len(items) == 0 && cfgCtlr.nominalCLSum > 0 { + klog.ErrorS(nil, "Impossible: no non-exempt priority levels", "plStates", cfgCtlr.priorityLevelStates) + return + } + allocs, fairFrac, err := computeConcurrencyAllocation(cfgCtlr.nominalCLSum, items) + if err != nil { + klog.ErrorS(err, "Unable to derive new concurrency limits", "plNames", plNames, "items", items) + allocs = make([]float64, len(items)) + for idx, plName := range plNames { + plState := plStates[plName] + if plState.pl.Spec.Limited == nil { + continue + } + allocs[idx] = float64(plState.currentCL) + } + } + for idx, plName := range plNames { + plState := plStates[plName] + if plState.pl.Spec.Limited == nil { + continue + } + if setCompleters { + qsCompleter, err := queueSetCompleterForPL(cfgCtlr.queueSetFactory, plState.queues, + plState.pl, cfgCtlr.requestWaitLimit, plState.reqsGaugePair, plState.execSeatsObs, + metrics.NewUnionGauge(plState.seatDemandIntegrator, plState.seatDemandRatioedGauge)) + if err != nil { + klog.ErrorS(err, "Inconceivable! Configuration error in existing priority level", "pl", plState.pl) + continue + } + plState.qsCompleter = qsCompleter + } + currentCL := int(math.Round(float64(allocs[idx]))) + relChange := relDiff(float64(currentCL), float64(plState.currentCL)) + plState.currentCL = currentCL + metrics.NotePriorityLevelConcurrencyAdjustment(plState.pl.Name, plState.seatDemandStats.highWatermark, plState.seatDemandStats.avg, plState.seatDemandStats.stdDev, plState.seatDemandStats.smoothed, float64(items[idx].target), currentCL) + logLevel := klog.Level(4) + if relChange >= 0.05 { + logLevel = 2 + } + klog.V(logLevel).InfoS("Update CurrentCL", "plName", plName, "seatDemandHighWatermark", plState.seatDemandStats.highWatermark, "seatDemandAvg", plState.seatDemandStats.avg, "seatDemandStdev", plState.seatDemandStats.stdDev, "seatDemandSmoothed", plState.seatDemandStats.smoothed, "fairFrac", fairFrac, "currentCL", currentCL, "backstop", err != nil) + plState.queues = plState.qsCompleter.Complete(fq.DispatchingConfig{ConcurrencyLimit: currentCL}) + } + metrics.SetFairFrac(float64(fairFrac)) } // runWorker is the logic of the one and only worker goroutine. 
We @@ -605,7 +676,9 @@ func (meal *cfgMeal) digestNewPLsLocked(newPLs []*flowcontrol.PriorityLevelConfi seatDemandRatioedGauge: metrics.ApiserverSeatDemands.NewForLabelValuesSafe(0, 1, []string{pl.Name}), } } - qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, state.queues, pl, meal.cfgCtlr.requestWaitLimit, state.reqsGaugePair, state.execSeatsObs, metrics.NewUnionGauge(state.seatDemandIntegrator, state.seatDemandRatioedGauge)) + qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, state.queues, + pl, meal.cfgCtlr.requestWaitLimit, state.reqsGaugePair, state.execSeatsObs, + metrics.NewUnionGauge(state.seatDemandIntegrator, state.seatDemandRatioedGauge)) if err != nil { klog.Warningf("Ignoring PriorityLevelConfiguration object %s because its spec (%s) is broken: %s", pl.Name, fcfmt.Fmt(pl.Spec), err) continue @@ -709,7 +782,9 @@ func (meal *cfgMeal) processOldPLsLocked() { } } var err error - plState.qsCompleter, err = queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, plState.queues, plState.pl, meal.cfgCtlr.requestWaitLimit, plState.reqsGaugePair, plState.execSeatsObs, metrics.NewUnionGauge(plState.seatDemandIntegrator, plState.seatDemandRatioedGauge)) + plState.qsCompleter, err = queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, plState.queues, + plState.pl, meal.cfgCtlr.requestWaitLimit, plState.reqsGaugePair, plState.execSeatsObs, + metrics.NewUnionGauge(plState.seatDemandIntegrator, plState.seatDemandRatioedGauge)) if err != nil { // This can not happen because queueSetCompleterForPL already approved this config panic(fmt.Sprintf("%s from name=%q spec=%s", err, plName, fcfmt.Fmt(plState.pl.Spec))) @@ -739,29 +814,48 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() { continue } + limited := plState.pl.Spec.Limited // The use of math.Ceil here means that the results might sum // to a little more than serverConcurrencyLimit but the // difference will be negligible. 
- concurrencyLimit := int(math.Ceil(float64(meal.cfgCtlr.serverConcurrencyLimit) * float64(plState.pl.Spec.Limited.NominalConcurrencyShares) / meal.shareSum)) - metrics.UpdateSharedConcurrencyLimit(plName, concurrencyLimit) - metrics.SetPriorityLevelConfiguration(plName, concurrencyLimit /* TODO: pass min and max once new API is available */, concurrencyLimit, concurrencyLimit) + concurrencyLimit := int(math.Ceil(float64(meal.cfgCtlr.serverConcurrencyLimit) * float64(limited.NominalConcurrencyShares) / meal.shareSum)) + var lendableCL, borrowingCL int + if limited.LendablePercent != nil { + lendableCL = int(math.Round(float64(concurrencyLimit) * float64(*limited.LendablePercent) / 100)) + } + if limited.BorrowingLimitPercent != nil { + borrowingCL = int(math.Round(float64(concurrencyLimit) * float64(*limited.BorrowingLimitPercent) / 100)) + } else { + borrowingCL = meal.cfgCtlr.serverConcurrencyLimit + } + metrics.SetPriorityLevelConfiguration(plName, concurrencyLimit, concurrencyLimit-lendableCL, concurrencyLimit+borrowingCL) plState.seatDemandRatioedGauge.SetDenominator(float64(concurrencyLimit)) + cfgChanged := plState.nominalCL != concurrencyLimit || plState.minCL != concurrencyLimit-lendableCL || plState.maxCL != concurrencyLimit+borrowingCL + plState.nominalCL = concurrencyLimit + plState.minCL = concurrencyLimit - lendableCL + plState.maxCL = concurrencyLimit + borrowingCL meal.maxExecutingRequests += concurrencyLimit var waitLimit int - if qCfg := plState.pl.Spec.Limited.LimitResponse.Queuing; qCfg != nil { + if qCfg := limited.LimitResponse.Queuing; qCfg != nil { waitLimit = int(qCfg.Queues * qCfg.QueueLengthLimit) } meal.maxWaitingRequests += waitLimit if plState.queues == nil { - klog.V(5).Infof("Introducing queues for priority level %q: config=%s, concurrencyLimit=%d, quiescing=%v (shares=%v, shareSum=%v)", plName, fcfmt.Fmt(plState.pl.Spec), concurrencyLimit, plState.quiescing, plState.pl.Spec.Limited.NominalConcurrencyShares, meal.shareSum) - plState.seatDemandStats = newSeatDemandStats(float64(concurrencyLimit)) - // TODO: initialize as described in https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/1040-priority-and-fairness#dispatching once NominalCL values are implemented + initialCL := concurrencyLimit - lendableCL/2 + klog.V(2).Infof("Introducing queues for priority level %q: config=%s, nominalCL=%d, lendableCL=%d, borrowingCL=%d, currentCL=%d, quiescing=%v (shares=%v, shareSum=%v)", plName, fcfmt.Fmt(plState.pl.Spec), concurrencyLimit, lendableCL, borrowingCL, initialCL, plState.quiescing, plState.pl.Spec.Limited.NominalConcurrencyShares, meal.shareSum) + plState.seatDemandStats = seatDemandStats{} + plState.currentCL = initialCL } else { - klog.V(5).Infof("Retaining queues for priority level %q: config=%s, concurrencyLimit=%d, quiescing=%v, numPending=%d (shares=%v, shareSum=%v)", plName, fcfmt.Fmt(plState.pl.Spec), concurrencyLimit, plState.quiescing, plState.numPending, plState.pl.Spec.Limited.NominalConcurrencyShares, meal.shareSum) + logLevel := klog.Level(5) + if cfgChanged { + logLevel = 2 + } + klog.V(logLevel).Infof("Retaining queues for priority level %q: config=%s, nominalCL=%d, lendableCL=%d, borrowingCL=%d, currentCL=%d, quiescing=%v, numPending=%d (shares=%v, shareSum=%v)", plName, fcfmt.Fmt(plState.pl.Spec), concurrencyLimit, lendableCL, borrowingCL, plState.currentCL, plState.quiescing, plState.numPending, plState.pl.Spec.Limited.NominalConcurrencyShares, meal.shareSum) } - plState.queues = 
plState.qsCompleter.Complete(fq.DispatchingConfig{ConcurrencyLimit: concurrencyLimit}) } + meal.cfgCtlr.nominalCLSum = meal.maxExecutingRequests + meal.cfgCtlr.updateBorrowingLocked(false, meal.newPLStates) } // queueSetCompleterForPL returns an appropriate QueueSetCompleter for the @@ -847,7 +941,9 @@ func (meal *cfgMeal) imaginePL(proto *flowcontrol.PriorityLevelConfiguration, re execSeatsObs := meal.cfgCtlr.execSeatsGaugeVec.NewForLabelValuesSafe(0, 1, labelValues) seatDemandIntegrator := fq.NewNamedIntegrator(meal.cfgCtlr.clock, proto.Name) seatDemandRatioedGauge := metrics.ApiserverSeatDemands.NewForLabelValuesSafe(0, 1, []string{proto.Name}) - qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, nil, proto, requestWaitLimit, reqsGaugePair, execSeatsObs, metrics.NewUnionGauge(seatDemandIntegrator, seatDemandRatioedGauge)) + qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, nil, proto, + requestWaitLimit, reqsGaugePair, execSeatsObs, + metrics.NewUnionGauge(seatDemandIntegrator, seatDemandRatioedGauge)) if err != nil { // This can not happen because proto is one of the mandatory // objects and these are not erroneous @@ -1002,3 +1098,12 @@ func hashFlowID(fsName, fDistinguisher string) uint64 { hash.Sum(sum[:0]) return binary.LittleEndian.Uint64(sum[:8]) } + +func relDiff(x, y float64) float64 { + diff := math.Abs(x - y) + den := math.Max(math.Abs(x), math.Abs(y)) + if den == 0 { + return 0 + } + return diff / den +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/borrowing_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/borrowing_test.go new file mode 100644 index 00000000000..8511730f03c --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/borrowing_test.go @@ -0,0 +1,251 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package flowcontrol + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + flowcontrol "k8s.io/api/flowcontrol/v1beta3" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apiserver/pkg/authentication/user" + "k8s.io/apiserver/pkg/endpoints/request" + fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" + "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/eventclock" + fqs "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset" + "k8s.io/apiserver/pkg/util/flowcontrol/metrics" + fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request" + "k8s.io/client-go/informers" + clientsetfake "k8s.io/client-go/kubernetes/fake" +) + +type borrowingTestConstraints struct { + lendable, borrowing int32 +} + +// TestBorrowing tests borrowing of concurrency between priority levels. +// It runs two scenarios, one where the borrowing hits the limit on +// lendable concurrency and one where the borrowing hits the limit on +// borrowing of concurrency. +// Both scenarios are the same except for the limits. +// The test defines two priority levels, identified as "flows" 0 and 1. 
+// Both priority levels have a nominal concurrency limit of 12. +// The test maintains 24 concurrent clients for priority level 0 +// and 6 for level 1, +// using an exec func that simply sleeps for 250 ms, for +// 25 seconds. The first 10 seconds of behavior are ignored, allowing +// the borrowing to start at any point during that time. The test +// continues for another 15 seconds, and checks that the delivered +// concurrency is about 16 for flow 0 and 6 for flow 1. +func TestBorrowing(t *testing.T) { + clientsPerFlow := [2]int{24, 6} + metrics.Register() + for _, testCase := range []struct { + name string + constraints []borrowingTestConstraints + }{ + { + name: "lendable-limited", + constraints: []borrowingTestConstraints{ + {lendable: 50, borrowing: 67}, + {lendable: 33, borrowing: 50}, + }}, + { + name: "borrowing-limited", + constraints: []borrowingTestConstraints{ + {lendable: 50, borrowing: 33}, + {lendable: 67, borrowing: 50}, + }}, + } { + t.Run(testCase.name, func(t *testing.T) { + fsObjs := make([]*flowcontrol.FlowSchema, 2) + plcObjs := make([]*flowcontrol.PriorityLevelConfiguration, 2) + usernames := make([]string, 2) + cfgObjs := []runtime.Object{} + for flow := 0; flow < 2; flow++ { + usernames[flow] = fmt.Sprintf("test-user%d", flow) + plName := fmt.Sprintf("test-pl%d", flow) + fsObjs[flow] = &flowcontrol.FlowSchema{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("test-fs%d", flow), + }, + Spec: flowcontrol.FlowSchemaSpec{ + MatchingPrecedence: 100, + PriorityLevelConfiguration: flowcontrol.PriorityLevelConfigurationReference{ + Name: plName, + }, + DistinguisherMethod: &flowcontrol.FlowDistinguisherMethod{ + Type: flowcontrol.FlowDistinguisherMethodByUserType, + }, + Rules: []flowcontrol.PolicyRulesWithSubjects{{ + Subjects: []flowcontrol.Subject{{ + Kind: flowcontrol.SubjectKindUser, + User: &flowcontrol.UserSubject{Name: usernames[flow]}, + }}, + NonResourceRules: []flowcontrol.NonResourcePolicyRule{{ + Verbs: []string{"*"}, + NonResourceURLs: []string{"*"}, + }}, + }}, + }, + } + plcObjs[flow] = &flowcontrol.PriorityLevelConfiguration{ + ObjectMeta: metav1.ObjectMeta{ + Name: plName, + }, + Spec: flowcontrol.PriorityLevelConfigurationSpec{ + Type: flowcontrol.PriorityLevelEnablementLimited, + Limited: &flowcontrol.LimitedPriorityLevelConfiguration{ + NominalConcurrencyShares: 100, + LendablePercent: &testCase.constraints[flow].lendable, + BorrowingLimitPercent: &testCase.constraints[flow].borrowing, + LimitResponse: flowcontrol.LimitResponse{ + Type: flowcontrol.LimitResponseTypeQueue, + Queuing: &flowcontrol.QueuingConfiguration{ + Queues: 10, + HandSize: 2, + QueueLengthLimit: 10, + }, + }, + }, + }, + } + cfgObjs = append(cfgObjs, fsObjs[flow], plcObjs[flow]) + } + clientset := clientsetfake.NewSimpleClientset(cfgObjs...) 
+ informerFactory := informers.NewSharedInformerFactory(clientset, time.Second) + flowcontrolClient := clientset.FlowcontrolV1beta3() + clk := eventclock.Real{} + controller := newTestableController(TestableConfig{ + Name: "Controller", + Clock: clk, + AsFieldManager: ConfigConsumerAsFieldManager, + FoundToDangling: func(found bool) bool { return !found }, + InformerFactory: informerFactory, + FlowcontrolClient: flowcontrolClient, + ServerConcurrencyLimit: 24, + RequestWaitLimit: time.Minute, + ReqsGaugeVec: metrics.PriorityLevelConcurrencyGaugeVec, + ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec, + QueueSetFactory: fqs.NewQueueSetFactory(clk), + }) + + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Second) + stopCh := ctx.Done() + controllerCompletionCh := make(chan error) + + informerFactory.Start(stopCh) + + status := informerFactory.WaitForCacheSync(ctx.Done()) + if names := unsynced(status); len(names) > 0 { + t.Fatalf("WaitForCacheSync did not successfully complete, resources=%#v", names) + } + + go func() { + controllerCompletionCh <- controller.Run(stopCh) + }() + + // ensure that the controller has run its first loop. + err := wait.PollImmediate(100*time.Millisecond, 5*time.Second, func() (done bool, err error) { + return controller.hasPriorityLevelState(plcObjs[0].Name), nil + }) + if err != nil { + t.Errorf("expected the controller to reconcile the priority level configuration object: %s, error: %s", plcObjs[0].Name, err) + } + + concIntegrators := make([]fq.Integrator, 2) + reqInfo := &request.RequestInfo{ + IsResourceRequest: false, + Path: "/foobar", + Verb: "GET", + } + noteFn := func(fs *flowcontrol.FlowSchema, plc *flowcontrol.PriorityLevelConfiguration, fd string) {} + workEstr := func() fcrequest.WorkEstimate { return fcrequest.WorkEstimate{InitialSeats: 1} } + qnf := fq.QueueNoteFn(func(bool) {}) + var startWG sync.WaitGroup + startWG.Add(clientsPerFlow[0] + clientsPerFlow[1]) + // Launch clientsPerFlow[flow] client threads for each flow + for flow := 0; flow < 2; flow++ { + username := usernames[flow] + flowUser := testUser{name: username} + rd := RequestDigest{ + RequestInfo: reqInfo, + User: flowUser, + } + concIntegrator := fq.NewNamedIntegrator(clk, username) + concIntegrators[flow] = concIntegrator + exec := func() { + concIntegrator.Inc() + clk.Sleep(250 * time.Millisecond) + concIntegrator.Dec() + } + nThreads := clientsPerFlow[flow] + for thread := 0; thread < nThreads; thread++ { + go func() { + startWG.Done() + wait.Until(func() { controller.Handle(ctx, rd, noteFn, workEstr, qnf, exec) }, 0, ctx.Done()) + }() + } + } + startWG.Wait() + // Make sure the controller has had time to sense the load and adjust + clk.Sleep(10 * time.Second) + // Start the stats that matter from now + for _, ci := range concIntegrators { + ci.Reset() + } + // Run for 15 seconds + clk.Sleep(15 * time.Second) + // Collect the delivered concurrency stats + results0 := concIntegrators[0].Reset() + results1 := concIntegrators[1].Reset() + // shut down all the async stuff + cancel() + + // Do the checking + + t.Log("waiting for the controller Run function to shutdown gracefully") + controllerErr := <-controllerCompletionCh + close(controllerCompletionCh) + if controllerErr != nil { + t.Errorf("expected nil error from controller Run function, but got: %#v", controllerErr) + } + if results0.Average < 15.5 || results0.Average > 16.1 { + t.Errorf("Flow 0 got average concurrency of %v but expected about 16", results0.Average) + } else { + t.Logf("Flow 0 got average concurrency 
of %v and expected about 16", results0.Average) + } + if results1.Average < 5.5 || results1.Average > 6.1 { + t.Errorf("Flow 1 got average concurrency of %v but expected about 6", results1.Average) + } else { + t.Logf("Flow 1 got average concurrency of %v and expected about 6", results1.Average) + } + }) + } +} + +type testUser struct{ name string } + +func (tu testUser) GetName() string { return tu.name } +func (tu testUser) GetUID() string { return tu.name } +func (tu testUser) GetGroups() []string { return []string{user.AllAuthenticated} } +func (tu testUser) GetExtra() map[string][]string { return map[string][]string{} } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go index 3a9f81ac635..d6ae14de2aa 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go @@ -1527,12 +1527,9 @@ func newExecSeatsGauge(clk clock.PassiveClock) metrics.RatioedGauge { func float64close(x, y float64) bool { x0 := float64NaNTo0(x) y0 := float64NaNTo0(y) - diff := math.Abs(x - y) + diff := math.Abs(x0 - y0) den := math.Max(math.Abs(x0), math.Abs(y0)) - if den == 0 { - return diff < 1e-10 - } - return diff/den < 1e-10 + return den == 0 || diff/den < 1e-10 } func uint64max(a, b uint64) uint64 { diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go index 30094fdba9c..7cb05df6c89 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go @@ -529,11 +529,6 @@ func AddRequestConcurrencyInUse(priorityLevel, flowSchema string, delta int) { apiserverRequestConcurrencyInUse.WithLabelValues(priorityLevel, flowSchema).Add(float64(delta)) } -// UpdateSharedConcurrencyLimit updates the value for the concurrency limit in flow control -func UpdateSharedConcurrencyLimit(priorityLevel string, limit int) { - apiserverRequestConcurrencyLimit.WithLabelValues(priorityLevel).Set(float64(limit)) -} - // AddReject increments the # of rejected requests for flow control func AddReject(ctx context.Context, priorityLevel, flowSchema, reason string) { apiserverRejectedRequestsTotal.WithContext(ctx).WithLabelValues(priorityLevel, flowSchema, reason).Add(1) @@ -585,19 +580,19 @@ func AddDispatchWithNoAccommodation(priorityLevel, flowSchema string) { } func SetPriorityLevelConfiguration(priorityLevel string, nominalCL, minCL, maxCL int) { + apiserverRequestConcurrencyLimit.WithLabelValues(priorityLevel).Set(float64(nominalCL)) apiserverNominalConcurrencyLimits.WithLabelValues(priorityLevel).Set(float64(nominalCL)) apiserverMinimumConcurrencyLimits.WithLabelValues(priorityLevel).Set(float64(minCL)) apiserverMaximumConcurrencyLimits.WithLabelValues(priorityLevel).Set(float64(maxCL)) } -func NotePriorityLevelConcurrencyAdjustment(priorityLevel string, seatDemandHWM, seatDemandAvg, seatDemandStdev, seatDemandSmoothed float64 /* TODO: seatDemandTarget float64, currentCL int */) { +func NotePriorityLevelConcurrencyAdjustment(priorityLevel string, seatDemandHWM, seatDemandAvg, seatDemandStdev, seatDemandSmoothed, seatDemandTarget float64, currentCL int) { apiserverSeatDemandHighWatermarks.WithLabelValues(priorityLevel).Set(seatDemandHWM) 
apiserverSeatDemandAverages.WithLabelValues(priorityLevel).Set(seatDemandAvg) apiserverSeatDemandStandardDeviations.WithLabelValues(priorityLevel).Set(seatDemandStdev) apiserverSeatDemandSmootheds.WithLabelValues(priorityLevel).Set(seatDemandSmoothed) - // TODO: the following once new API is available - // apiserverSeatDemandTargets.WithLabelValues(priorityLevel).Set(seatDemandTarget) - // apiserverCurrentConcurrencyLimits.WithLabelValues(priorityLevel).Set(currentCL) + apiserverSeatDemandTargets.WithLabelValues(priorityLevel).Set(seatDemandTarget) + apiserverCurrentConcurrencyLimits.WithLabelValues(priorityLevel).Set(float64(currentCL)) } func SetFairFrac(fairFrac float64) { diff --git a/test/e2e/apimachinery/flowcontrol.go b/test/e2e/apimachinery/flowcontrol.go index 64da49b28b0..7ef5ecb8c96 100644 --- a/test/e2e/apimachinery/flowcontrol.go +++ b/test/e2e/apimachinery/flowcontrol.go @@ -43,7 +43,7 @@ import ( ) const ( - requestConcurrencyLimitMetricName = "apiserver_flowcontrol_request_concurrency_limit" + nominalConcurrencyLimitMetricName = "apiserver_flowcontrol_nominal_limit_seats" priorityLevelLabelName = "priority_level" ) @@ -146,7 +146,7 @@ var _ = SIGDescribe("API priority and fairness", func() { ginkgo.By("getting request concurrency from metrics") for i := range clients { - realConcurrency, err := getPriorityLevelConcurrency(f.ClientSet, clients[i].priorityLevelName) + realConcurrency, err := getPriorityLevelNominalConcurrency(f.ClientSet, clients[i].priorityLevelName) framework.ExpectNoError(err) clients[i].concurrency = int32(float64(realConcurrency) * clients[i].concurrencyMultiplier) if clients[i].concurrency < 1 { @@ -219,7 +219,7 @@ var _ = SIGDescribe("API priority and fairness", func() { } framework.Logf("getting real concurrency") - realConcurrency, err := getPriorityLevelConcurrency(f.ClientSet, priorityLevelName) + realConcurrency, err := getPriorityLevelNominalConcurrency(f.ClientSet, priorityLevelName) framework.ExpectNoError(err) for i := range clients { clients[i].concurrency = int32(float64(realConcurrency) * clients[i].concurrencyMultiplier) @@ -280,7 +280,7 @@ func createPriorityLevel(f *framework.Framework, priorityLevelName string, nomin } } -func getPriorityLevelConcurrency(c clientset.Interface, priorityLevelName string) (int32, error) { +func getPriorityLevelNominalConcurrency(c clientset.Interface, priorityLevelName string) (int32, error) { resp, err := c.CoreV1().RESTClient().Get().RequestURI("/metrics").DoRaw(context.TODO()) if err != nil { return 0, err @@ -299,7 +299,7 @@ func getPriorityLevelConcurrency(c clientset.Interface, priorityLevelName string return 0, err } for _, metric := range v { - if string(metric.Metric[model.MetricNameLabel]) != requestConcurrencyLimitMetricName { + if string(metric.Metric[model.MetricNameLabel]) != nominalConcurrencyLimitMetricName { continue } if string(metric.Metric[priorityLevelLabelName]) != priorityLevelName { @@ -376,7 +376,7 @@ func waitForSteadyState(f *framework.Framework, flowSchemaName string, priorityL // hasn't been achieved. 
return false, nil } - _, err = getPriorityLevelConcurrency(f.ClientSet, priorityLevelName) + _, err = getPriorityLevelNominalConcurrency(f.ClientSet, priorityLevelName) if err != nil { if err == errPriorityLevelNotFound { return false, nil diff --git a/test/integration/apiserver/flowcontrol/concurrency_test.go b/test/integration/apiserver/flowcontrol/concurrency_test.go index 5d3ae1f3e01..a090c7528f5 100644 --- a/test/integration/apiserver/flowcontrol/concurrency_test.go +++ b/test/integration/apiserver/flowcontrol/concurrency_test.go @@ -41,7 +41,7 @@ import ( ) const ( - sharedConcurrencyMetricsName = "apiserver_flowcontrol_request_concurrency_limit" + nominalConcurrencyMetricsName = "apiserver_flowcontrol_nominal_limit_seats" dispatchedRequestCountMetricsName = "apiserver_flowcontrol_dispatched_requests_total" rejectedRequestCountMetricsName = "apiserver_flowcontrol_rejected_requests_total" labelPriorityLevel = "priority_level" @@ -84,16 +84,16 @@ func TestPriorityLevelIsolation(t *testing.T) { t.Error(err) } - sharedConcurrency, err := getSharedConcurrencyOfPriorityLevel(loopbackClient) + nominalConcurrency, err := getNominalConcurrencyOfPriorityLevel(loopbackClient) if err != nil { t.Error(err) } - if 1 != sharedConcurrency[priorityLevelNoxu1.Name] { - t.Errorf("unexpected shared concurrency %v instead of %v", sharedConcurrency[priorityLevelNoxu1.Name], 1) + if 1 != nominalConcurrency[priorityLevelNoxu1.Name] { + t.Errorf("unexpected shared concurrency %v instead of %v", nominalConcurrency[priorityLevelNoxu1.Name], 1) } - if 1 != sharedConcurrency[priorityLevelNoxu2.Name] { - t.Errorf("unexpected shared concurrency %v instead of %v", sharedConcurrency[priorityLevelNoxu2.Name], 1) + if 1 != nominalConcurrency[priorityLevelNoxu2.Name] { + t.Errorf("unexpected shared concurrency %v instead of %v", nominalConcurrency[priorityLevelNoxu2.Name], 1) } stopCh := make(chan struct{}) @@ -164,7 +164,7 @@ func getMetrics(c clientset.Interface) (string, error) { return string(resp), err } -func getSharedConcurrencyOfPriorityLevel(c clientset.Interface) (map[string]int, error) { +func getNominalConcurrencyOfPriorityLevel(c clientset.Interface) (map[string]int, error) { resp, err := getMetrics(c) if err != nil { return nil, err @@ -188,7 +188,7 @@ func getSharedConcurrencyOfPriorityLevel(c clientset.Interface) (map[string]int, } for _, metric := range v { switch name := string(metric.Metric[model.MetricNameLabel]); name { - case sharedConcurrencyMetricsName: + case nominalConcurrencyMetricsName: concurrency[string(metric.Metric[labelPriorityLevel])] = int(metric.Value) } } @@ -230,6 +230,7 @@ func getRequestCountOfPriorityLevel(c clientset.Interface) (map[string]int, map[ } func createPriorityLevelAndBindingFlowSchemaForUser(c clientset.Interface, username string, concurrencyShares, queuelength int) (*flowcontrol.PriorityLevelConfiguration, *flowcontrol.FlowSchema, error) { + i0 := int32(0) pl, err := c.FlowcontrolV1beta3().PriorityLevelConfigurations().Create(context.Background(), &flowcontrol.PriorityLevelConfiguration{ ObjectMeta: metav1.ObjectMeta{ Name: username, @@ -238,6 +239,7 @@ func createPriorityLevelAndBindingFlowSchemaForUser(c clientset.Interface, usern Type: flowcontrol.PriorityLevelEnablementLimited, Limited: &flowcontrol.LimitedPriorityLevelConfiguration{ NominalConcurrencyShares: int32(concurrencyShares), + BorrowingLimitPercent: &i0, LimitResponse: flowcontrol.LimitResponse{ Type: flowcontrol.LimitResponseTypeQueue, Queuing: &flowcontrol.QueuingConfiguration{ diff --git 
a/test/integration/apiserver/flowcontrol/concurrency_util_test.go b/test/integration/apiserver/flowcontrol/concurrency_util_test.go index fb1198810d6..e5b90bae99f 100644 --- a/test/integration/apiserver/flowcontrol/concurrency_util_test.go +++ b/test/integration/apiserver/flowcontrol/concurrency_util_test.go @@ -41,7 +41,7 @@ import ( ) const ( - requestConcurrencyLimitMetricsName = "apiserver_flowcontrol_request_concurrency_limit" + nominalConcurrencyLimitMetricsName = "apiserver_flowcontrol_nominal_limit_seats" requestExecutionSecondsSumName = "apiserver_flowcontrol_request_execution_seconds_sum" requestExecutionSecondsCountName = "apiserver_flowcontrol_request_execution_seconds_count" priorityLevelSeatUtilSumName = "apiserver_flowcontrol_priority_level_seat_utilization_sum" @@ -350,7 +350,7 @@ func getRequestMetricsSnapshot(c clientset.Interface) (metricSnapshot, error) { entry.seatUtil.Sum = float64(metric.Value) case priorityLevelSeatUtilCountName: entry.seatUtil.Count = int(metric.Value) - case requestConcurrencyLimitMetricsName: + case nominalConcurrencyLimitMetricsName: entry.availableSeats = int(metric.Value) } snapshot[plLabel] = entry
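
The arithmetic that finishQueueSetReconfigsLocked applies to each limited priority level can be read directly off the patch: the nominal limit is the level's share of the server concurrency limit rounded up, the minimum is nominal minus the lendable portion, the maximum is nominal plus the borrowing allowance (defaulting to the whole server limit when BorrowingLimitPercent is nil), and the initial currentCL sits halfway between the minimum and the nominal limit. The standalone Go sketch below restates that computation; deriveLimits and plLimits are illustrative names, not identifiers from the patch.

package main

import (
	"fmt"
	"math"
)

// plLimits mirrors the nominalCL/minCL/maxCL/currentCL fields that the patch
// adds to priorityLevelState (illustrative re-declaration, not the real type).
type plLimits struct {
	nominalCL, minCL, maxCL, initialCL int
}

// deriveLimits (hypothetical name) restates the arithmetic in
// finishQueueSetReconfigsLocked: nominal is this level's share of the server
// concurrency limit rounded up; min subtracts the lendable portion; max adds
// the borrowing allowance, which defaults to the whole server limit when
// borrowingLimitPercent is nil.
func deriveLimits(serverConcurrencyLimit int, shares int32, shareSum float64, lendablePercent, borrowingLimitPercent *int32) plLimits {
	nominal := int(math.Ceil(float64(serverConcurrencyLimit) * float64(shares) / shareSum))
	var lendable, borrowing int
	if lendablePercent != nil {
		lendable = int(math.Round(float64(nominal) * float64(*lendablePercent) / 100))
	}
	if borrowingLimitPercent != nil {
		borrowing = int(math.Round(float64(nominal) * float64(*borrowingLimitPercent) / 100))
	} else {
		borrowing = serverConcurrencyLimit
	}
	return plLimits{
		nominalCL: nominal,
		minCL:     nominal - lendable,
		maxCL:     nominal + borrowing,
		initialCL: nominal - lendable/2, // the currentCL used when queues are first introduced
	}
}

func main() {
	// Numbers from TestBorrowing's "lendable-limited" case for flow 0:
	// server limit 24, two levels of 100 shares each (shareSum 200),
	// lendable 50%, borrowing limit 67%.
	lendable, borrowing := int32(50), int32(67)
	fmt.Printf("%+v\n", deriveLimits(24, 100, 200, &lendable, &borrowing))
	// prints {nominalCL:12 minCL:6 maxCL:20 initialCL:9}
}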
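
Similarly, updateBorrowingLocked turns each level's seat-demand statistics into an allocProblemItem before calling computeConcurrencyAllocation: the lower bound is the seat-demand high watermark clamped into [minCL, nominalCL], the upper bound is maxCL, and the target is the smoothed seat demand, never below the lower bound. A minimal sketch of just that step, with allocProblemItem re-declared locally and borrowingItem as a hypothetical helper:

package main

import (
	"fmt"
	"math"
)

// allocProblemItem is re-declared here only for illustration; the real type
// lives beside computeConcurrencyAllocation in the flowcontrol package.
type allocProblemItem struct {
	lowerBound, upperBound, target float64
}

// borrowingItem (hypothetical helper) mirrors the per-level computation in
// updateBorrowingLocked: clamp the seat-demand high watermark into
// [minCL, nominalCL] to get the lower bound, use maxCL as the upper bound,
// and target the smoothed seat demand, never below the lower bound.
func borrowingItem(minCL, nominalCL, maxCL int, highWatermark, smoothed float64) allocProblemItem {
	minCurrentCL := math.Max(float64(minCL), math.Min(float64(nominalCL), highWatermark))
	return allocProblemItem{
		lowerBound: minCurrentCL,
		upperBound: float64(maxCL),
		target:     math.Max(minCurrentCL, smoothed),
	}
}

func main() {
	// A busy level whose demand exceeds its nominal limit of 12 ...
	fmt.Println(borrowingItem(6, 12, 20, 24, 22)) // {12 20 22}: asks to grow toward maxCL
	// ... and an idle level whose demand sits below its minimum of 8.
	fmt.Println(borrowingItem(8, 12, 16, 5, 4)) // {8 16 8}: lends down to its minCL
}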