diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go index 44a2aed084e..c707884e133 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go @@ -204,7 +204,7 @@ type priorityLevelState struct { // reached through this pointer is mutable. pl *flowcontrol.PriorityLevelConfiguration - // qsCompleter holds the QueueSetCompleter derived from `config` + // qsCompleter holds the QueueSetCompleter derived from `pl` // and `queues`. qsCompleter fq.QueueSetCompleter @@ -255,12 +255,12 @@ type priorityLevelState struct { type seatDemandStats struct { avg float64 stdDev float64 - highWatermark float64 + highWatermark int smoothed float64 } func (stats *seatDemandStats) update(obs fq.IntegratorResults) { - stats.highWatermark = obs.Max + stats.highWatermark = int(math.Round(obs.Max)) if obs.Duration <= 0 { return } @@ -398,38 +398,63 @@ func (cfgCtlr *configController) updateBorrowing() { func (cfgCtlr *configController) updateBorrowingLocked(setCompleters bool, plStates map[string]*priorityLevelState) { items := make([]allocProblemItem, 0, len(plStates)) - plNames := make([]string, 0, len(plStates)) + nonExemptPLNames := make([]string, 0, len(plStates)) + idxOfNonExempt := map[string]int{} // items index of non-exempt classes + cclOfExempt := map[string]int{} // minCurrentCL of exempt classes + var minCLSum, minCurrentCLSum int // sums over non-exempt classes + remainingServerCL := cfgCtlr.nominalCLSum for plName, plState := range plStates { obs := plState.seatDemandIntegrator.Reset() plState.seatDemandStats.update(obs) - // Lower bound on this priority level's adjusted concurreny limit is the lesser of: - // - its seat demamd high watermark over the last adjustment period, and - // - its configured concurrency limit. - // BUT: we do not want this to be lower than the lower bound from configuration. - // See KEP-1040 for a more detailed explanation. - minCurrentCL := math.Max(float64(plState.minCL), math.Min(float64(plState.nominalCL), plState.seatDemandStats.highWatermark)) - plNames = append(plNames, plName) - items = append(items, allocProblemItem{ - lowerBound: minCurrentCL, - upperBound: float64(plState.maxCL), - target: math.Max(minCurrentCL, plState.seatDemandStats.smoothed), - }) + var minCurrentCL int + if plState.pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt { + minCurrentCL = max(plState.minCL, plState.seatDemandStats.highWatermark) + cclOfExempt[plName] = minCurrentCL + remainingServerCL -= minCurrentCL + } else { + // Lower bound on this priority level's adjusted concurreny limit is the lesser of: + // - its seat demamd high watermark over the last adjustment period, and + // - its configured concurrency limit. + // BUT: we do not want this to be lower than the lower bound from configuration. + // See KEP-1040 for a more detailed explanation. + minCurrentCL = max(plState.minCL, min(plState.nominalCL, plState.seatDemandStats.highWatermark)) + idxOfNonExempt[plName] = len(items) + nonExemptPLNames = append(nonExemptPLNames, plName) + items = append(items, allocProblemItem{ + lowerBound: float64(minCurrentCL), + upperBound: float64(plState.maxCL), + target: math.Max(float64(minCurrentCL), plState.seatDemandStats.smoothed), + }) + minCLSum += plState.minCL + minCurrentCLSum += minCurrentCL + } } if len(items) == 0 && cfgCtlr.nominalCLSum > 0 { klog.ErrorS(nil, "Impossible: no priority levels", "plStates", cfgCtlr.priorityLevelStates) return } - allocs, fairFrac, err := computeConcurrencyAllocation(cfgCtlr.nominalCLSum, items) - if err != nil { - klog.ErrorS(err, "Unable to derive new concurrency limits", "plNames", plNames, "items", items) - allocs = make([]float64, len(items)) - for idx, plName := range plNames { - plState := plStates[plName] - allocs[idx] = float64(plState.currentCL) + var allocs []float64 + var shareFrac, fairFrac float64 + var err error + if remainingServerCL <= minCLSum { + metrics.SetFairFrac(0) + } else if remainingServerCL <= minCurrentCLSum { + shareFrac = float64(remainingServerCL-minCLSum) / float64(minCurrentCLSum-minCLSum) + metrics.SetFairFrac(0) + } else { + allocs, fairFrac, err = computeConcurrencyAllocation(cfgCtlr.nominalCLSum, items) + if err != nil { + klog.ErrorS(err, "Unable to derive new concurrency limits", "plNames", nonExemptPLNames, "items", items) + allocs = make([]float64, len(items)) + for idx, plName := range nonExemptPLNames { + plState := plStates[plName] + allocs[idx] = float64(plState.currentCL) + } } + metrics.SetFairFrac(float64(fairFrac)) } - for idx, plName := range plNames { - plState := plStates[plName] + for plName, plState := range plStates { + idx, isNonExempt := idxOfNonExempt[plName] if setCompleters { qsCompleter, err := queueSetCompleterForPL(cfgCtlr.queueSetFactory, plState.queues, plState.pl, plState.reqsGaugePair, plState.execSeatsObs, @@ -440,10 +465,20 @@ func (cfgCtlr *configController) updateBorrowingLocked(setCompleters bool, plSta } plState.qsCompleter = qsCompleter } - currentCL := int(math.Round(float64(allocs[idx]))) + var currentCL int + if !isNonExempt { + currentCL = cclOfExempt[plName] + } else if remainingServerCL <= minCLSum { + currentCL = plState.minCL + } else if remainingServerCL <= minCurrentCLSum { + minCurrentCL := max(plState.minCL, min(plState.nominalCL, plState.seatDemandStats.highWatermark)) + currentCL = plState.minCL + int(math.Round(float64(minCurrentCL-plState.minCL)*shareFrac)) + } else { + currentCL = int(math.Round(float64(allocs[idx]))) + } relChange := relDiff(float64(currentCL), float64(plState.currentCL)) plState.currentCL = currentCL - metrics.NotePriorityLevelConcurrencyAdjustment(plState.pl.Name, plState.seatDemandStats.highWatermark, plState.seatDemandStats.avg, plState.seatDemandStats.stdDev, plState.seatDemandStats.smoothed, float64(items[idx].target), currentCL) + metrics.NotePriorityLevelConcurrencyAdjustment(plState.pl.Name, float64(plState.seatDemandStats.highWatermark), plState.seatDemandStats.avg, plState.seatDemandStats.stdDev, plState.seatDemandStats.smoothed, float64(items[idx].target), currentCL) logLevel := klog.Level(4) if relChange >= 0.05 { logLevel = 2 @@ -458,7 +493,6 @@ func (cfgCtlr *configController) updateBorrowingLocked(setCompleters bool, plSta klog.V(logLevel).InfoS("Update CurrentCL", "plName", plName, "seatDemandHighWatermark", plState.seatDemandStats.highWatermark, "seatDemandAvg", plState.seatDemandStats.avg, "seatDemandStdev", plState.seatDemandStats.stdDev, "seatDemandSmoothed", plState.seatDemandStats.smoothed, "fairFrac", fairFrac, "currentCL", currentCL, "concurrencyDenominator", concurrencyDenominator, "backstop", err != nil) plState.queues = plState.qsCompleter.Complete(fq.DispatchingConfig{ConcurrencyLimit: currentCL, ConcurrencyDenominator: concurrencyDenominator}) } - metrics.SetFairFrac(float64(fairFrac)) } // runWorker is the logic of the one and only worker goroutine. We diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/exempt_borrowing_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/exempt_borrowing_test.go new file mode 100644 index 00000000000..308c972f7cb --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/exempt_borrowing_test.go @@ -0,0 +1,164 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package flowcontrol + +import ( + "testing" + "time" + + fcboot "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap" + fqs "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset" + testeventclock "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/eventclock" + "k8s.io/apiserver/pkg/util/flowcontrol/metrics" + "k8s.io/client-go/informers" + clientsetfake "k8s.io/client-go/kubernetes/fake" + + flowcontrol "k8s.io/api/flowcontrol/v1" +) + +func TestUpdateBorrowing(t *testing.T) { + startTime := time.Now() + clk, _ := testeventclock.NewFake(startTime, 0, nil) + plcExempt := fcboot.MandatoryPriorityLevelConfigurationExempt + plcHigh := fcboot.SuggestedPriorityLevelConfigurationWorkloadHigh + plcMid := fcboot.SuggestedPriorityLevelConfigurationWorkloadLow + plcLow := fcboot.MandatoryPriorityLevelConfigurationCatchAll + plcs := []*flowcontrol.PriorityLevelConfiguration{plcHigh, plcExempt, plcMid, plcLow} + fses := []*flowcontrol.FlowSchema{} + k8sClient := clientsetfake.NewSimpleClientset(plcLow, plcExempt, plcHigh, plcMid) + informerFactory := informers.NewSharedInformerFactory(k8sClient, 0) + flowcontrolClient := k8sClient.FlowcontrolV1() + serverCL := int(*plcHigh.Spec.Limited.NominalConcurrencyShares+ + *plcMid.Spec.Limited.NominalConcurrencyShares+ + *plcLow.Spec.Limited.NominalConcurrencyShares) * 6 + config := TestableConfig{ + Name: "test", + Clock: clk, + AsFieldManager: "testfm", + FoundToDangling: func(found bool) bool { return !found }, + InformerFactory: informerFactory, + FlowcontrolClient: flowcontrolClient, + ServerConcurrencyLimit: serverCL, + ReqsGaugeVec: metrics.PriorityLevelConcurrencyGaugeVec, + ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec, + QueueSetFactory: fqs.NewQueueSetFactory(clk), + } + ctlr := newTestableController(config) + _ = ctlr.lockAndDigestConfigObjects(plcs, fses) + if ctlr.nominalCLSum != serverCL { + t.Fatalf("Unexpected rounding: nominalCLSum=%d", ctlr.nominalCLSum) + } + stateExempt := ctlr.priorityLevelStates[plcExempt.Name] + stateHigh := ctlr.priorityLevelStates[plcHigh.Name] + stateMid := ctlr.priorityLevelStates[plcMid.Name] + stateLow := ctlr.priorityLevelStates[plcLow.Name] + + // Scenario 1: everybody wants more than ServerConcurrencyLimit. + // Test the case of exempt borrowing so much that less than minCL + // is available to each non-exempt. + stateExempt.seatDemandIntegrator.Set(float64(serverCL + 100)) + stateHigh.seatDemandIntegrator.Set(float64(serverCL + 100)) + stateMid.seatDemandIntegrator.Set(float64(serverCL + 100)) + stateLow.seatDemandIntegrator.Set(float64(serverCL + 100)) + clk.SetTime(startTime.Add(borrowingAdjustmentPeriod)) + ctlr.updateBorrowing() + if expected, actual := serverCL+100, stateExempt.currentCL; expected != actual { + t.Errorf("Scenario 1: expected %d, got %d for exempt", expected, actual) + } else { + t.Logf("Scenario 1: expected and got %d for exempt", expected) + } + if expected, actual := stateHigh.minCL, stateHigh.currentCL; expected != actual { + t.Errorf("Scenario 1: expected %d, got %d for hi", expected, actual) + } else { + t.Logf("Scenario 1: expected and got %d for hi", expected) + } + if expected, actual := stateMid.minCL, stateMid.currentCL; expected != actual { + t.Errorf("Scenario 1: expected %d, got %d for mid", expected, actual) + } else { + t.Logf("Scenario 1: expected and got %d for mid", expected) + } + if expected, actual := stateLow.minCL, stateLow.currentCL; expected != actual { + t.Errorf("Scenario 1: expected %d, got %d for lo", expected, actual) + } else { + t.Logf("Scenario 1: expected and got %d for lo", expected) + } + + // Scenario 2: non-exempt want more than serverCL but get halfway between minCL and minCurrentCL. + expectedHigh := (stateHigh.nominalCL + stateHigh.minCL) / 2 + expectedMid := (stateMid.nominalCL + stateMid.minCL) / 2 + expectedLow := (stateLow.nominalCL + stateLow.minCL) / 2 + expectedExempt := serverCL - (expectedHigh + expectedMid + expectedLow) + stateExempt.seatDemandIntegrator.Set(float64(expectedExempt)) + clk.SetTime(startTime.Add(2 * borrowingAdjustmentPeriod)) + ctlr.updateBorrowing() + clk.SetTime(startTime.Add(3 * borrowingAdjustmentPeriod)) + ctlr.updateBorrowing() + if expected, actual := expectedExempt, stateExempt.currentCL; expected != actual { + t.Errorf("Scenario 2: expected %d, got %d for exempt", expected, actual) + } else { + t.Logf("Scenario 2: expected and got %d for exempt", expected) + } + if expected, actual := expectedHigh, stateHigh.currentCL; expected != actual { + t.Errorf("Scenario 2: expected %d, got %d for hi", expected, actual) + } else { + t.Logf("Scenario 2: expected and got %d for hi", expected) + } + if expected, actual := expectedMid, stateMid.currentCL; expected != actual { + t.Errorf("Scenario 2: expected %d, got %d for mid", expected, actual) + } else { + t.Logf("Scenario 2: expected and got %d for mid", expected) + } + if expected, actual := expectedLow, stateLow.currentCL; expected != actual { + t.Errorf("Scenario 2: expected %d, got %d for lo", expected, actual) + } else { + t.Logf("Scenario 2: expected and got %d for lo", expected) + } + + // Scenario 3: only mid is willing to lend, and exempt borrows all of that. + // Test the case of regular borrowing. + expectedHigh = stateHigh.nominalCL + expectedMid = stateMid.minCL + expectedLow = stateLow.nominalCL + expectedExempt = serverCL - (expectedHigh + expectedMid + expectedLow) + stateExempt.seatDemandIntegrator.Set(float64(expectedExempt)) + stateMid.seatDemandIntegrator.Set(float64(1)) + clk.SetTime(startTime.Add(4 * borrowingAdjustmentPeriod)) + ctlr.updateBorrowing() + clk.SetTime(startTime.Add(5 * borrowingAdjustmentPeriod)) + ctlr.updateBorrowing() + if expected, actual := expectedExempt, stateExempt.currentCL; expected != actual { + t.Errorf("Scenario 3: expected %d, got %d for exempt", expected, actual) + } else { + t.Logf("Scenario 3: expected and got %d for exempt", expected) + } + if expected, actual := expectedHigh, stateHigh.currentCL; expected != actual { + t.Errorf("Scenario 3: expected %d, got %d for hi", expected, actual) + } else { + t.Logf("Scenario 3: expected and got %d for hi", expected) + } + if expected, actual := expectedMid, stateMid.currentCL; expected != actual { + t.Errorf("Scenario 3: expected %d, got %d for mid", expected, actual) + } else { + t.Logf("Scenario 3: expected and got %d for mid", expected) + } + if expected, actual := expectedLow, stateLow.currentCL; expected != actual { + t.Errorf("Scenario 3: expected %d, got %d for lo", expected, actual) + } else { + t.Logf("Scenario 3: expected and got %d for lo", expected) + } + +}