mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-27 13:37:30 +00:00
Merge pull request #124736 from MikeSpreitzer/exempt-borrows-more
More assertive borrowing by exempt
This commit is contained in:
commit
d040043edb
@ -204,7 +204,7 @@ type priorityLevelState struct {
|
|||||||
// reached through this pointer is mutable.
|
// reached through this pointer is mutable.
|
||||||
pl *flowcontrol.PriorityLevelConfiguration
|
pl *flowcontrol.PriorityLevelConfiguration
|
||||||
|
|
||||||
// qsCompleter holds the QueueSetCompleter derived from `config`
|
// qsCompleter holds the QueueSetCompleter derived from `pl`
|
||||||
// and `queues`.
|
// and `queues`.
|
||||||
qsCompleter fq.QueueSetCompleter
|
qsCompleter fq.QueueSetCompleter
|
||||||
|
|
||||||
@ -255,12 +255,12 @@ type priorityLevelState struct {
|
|||||||
type seatDemandStats struct {
|
type seatDemandStats struct {
|
||||||
avg float64
|
avg float64
|
||||||
stdDev float64
|
stdDev float64
|
||||||
highWatermark float64
|
highWatermark int
|
||||||
smoothed float64
|
smoothed float64
|
||||||
}
|
}
|
||||||
|
|
||||||
func (stats *seatDemandStats) update(obs fq.IntegratorResults) {
|
func (stats *seatDemandStats) update(obs fq.IntegratorResults) {
|
||||||
stats.highWatermark = obs.Max
|
stats.highWatermark = int(math.Round(obs.Max))
|
||||||
if obs.Duration <= 0 {
|
if obs.Duration <= 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -398,38 +398,63 @@ func (cfgCtlr *configController) updateBorrowing() {
|
|||||||
|
|
||||||
func (cfgCtlr *configController) updateBorrowingLocked(setCompleters bool, plStates map[string]*priorityLevelState) {
|
func (cfgCtlr *configController) updateBorrowingLocked(setCompleters bool, plStates map[string]*priorityLevelState) {
|
||||||
items := make([]allocProblemItem, 0, len(plStates))
|
items := make([]allocProblemItem, 0, len(plStates))
|
||||||
plNames := make([]string, 0, len(plStates))
|
nonExemptPLNames := make([]string, 0, len(plStates))
|
||||||
|
idxOfNonExempt := map[string]int{} // items index of non-exempt classes
|
||||||
|
cclOfExempt := map[string]int{} // minCurrentCL of exempt classes
|
||||||
|
var minCLSum, minCurrentCLSum int // sums over non-exempt classes
|
||||||
|
remainingServerCL := cfgCtlr.nominalCLSum
|
||||||
for plName, plState := range plStates {
|
for plName, plState := range plStates {
|
||||||
obs := plState.seatDemandIntegrator.Reset()
|
obs := plState.seatDemandIntegrator.Reset()
|
||||||
plState.seatDemandStats.update(obs)
|
plState.seatDemandStats.update(obs)
|
||||||
|
var minCurrentCL int
|
||||||
|
if plState.pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt {
|
||||||
|
minCurrentCL = max(plState.minCL, plState.seatDemandStats.highWatermark)
|
||||||
|
cclOfExempt[plName] = minCurrentCL
|
||||||
|
remainingServerCL -= minCurrentCL
|
||||||
|
} else {
|
||||||
// Lower bound on this priority level's adjusted concurreny limit is the lesser of:
|
// Lower bound on this priority level's adjusted concurreny limit is the lesser of:
|
||||||
// - its seat demamd high watermark over the last adjustment period, and
|
// - its seat demamd high watermark over the last adjustment period, and
|
||||||
// - its configured concurrency limit.
|
// - its configured concurrency limit.
|
||||||
// BUT: we do not want this to be lower than the lower bound from configuration.
|
// BUT: we do not want this to be lower than the lower bound from configuration.
|
||||||
// See KEP-1040 for a more detailed explanation.
|
// See KEP-1040 for a more detailed explanation.
|
||||||
minCurrentCL := math.Max(float64(plState.minCL), math.Min(float64(plState.nominalCL), plState.seatDemandStats.highWatermark))
|
minCurrentCL = max(plState.minCL, min(plState.nominalCL, plState.seatDemandStats.highWatermark))
|
||||||
plNames = append(plNames, plName)
|
idxOfNonExempt[plName] = len(items)
|
||||||
|
nonExemptPLNames = append(nonExemptPLNames, plName)
|
||||||
items = append(items, allocProblemItem{
|
items = append(items, allocProblemItem{
|
||||||
lowerBound: minCurrentCL,
|
lowerBound: float64(minCurrentCL),
|
||||||
upperBound: float64(plState.maxCL),
|
upperBound: float64(plState.maxCL),
|
||||||
target: math.Max(minCurrentCL, plState.seatDemandStats.smoothed),
|
target: math.Max(float64(minCurrentCL), plState.seatDemandStats.smoothed),
|
||||||
})
|
})
|
||||||
|
minCLSum += plState.minCL
|
||||||
|
minCurrentCLSum += minCurrentCL
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if len(items) == 0 && cfgCtlr.nominalCLSum > 0 {
|
if len(items) == 0 && cfgCtlr.nominalCLSum > 0 {
|
||||||
klog.ErrorS(nil, "Impossible: no priority levels", "plStates", cfgCtlr.priorityLevelStates)
|
klog.ErrorS(nil, "Impossible: no priority levels", "plStates", cfgCtlr.priorityLevelStates)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
allocs, fairFrac, err := computeConcurrencyAllocation(cfgCtlr.nominalCLSum, items)
|
var allocs []float64
|
||||||
|
var shareFrac, fairFrac float64
|
||||||
|
var err error
|
||||||
|
if remainingServerCL <= minCLSum {
|
||||||
|
metrics.SetFairFrac(0)
|
||||||
|
} else if remainingServerCL <= minCurrentCLSum {
|
||||||
|
shareFrac = float64(remainingServerCL-minCLSum) / float64(minCurrentCLSum-minCLSum)
|
||||||
|
metrics.SetFairFrac(0)
|
||||||
|
} else {
|
||||||
|
allocs, fairFrac, err = computeConcurrencyAllocation(cfgCtlr.nominalCLSum, items)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.ErrorS(err, "Unable to derive new concurrency limits", "plNames", plNames, "items", items)
|
klog.ErrorS(err, "Unable to derive new concurrency limits", "plNames", nonExemptPLNames, "items", items)
|
||||||
allocs = make([]float64, len(items))
|
allocs = make([]float64, len(items))
|
||||||
for idx, plName := range plNames {
|
for idx, plName := range nonExemptPLNames {
|
||||||
plState := plStates[plName]
|
plState := plStates[plName]
|
||||||
allocs[idx] = float64(plState.currentCL)
|
allocs[idx] = float64(plState.currentCL)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for idx, plName := range plNames {
|
metrics.SetFairFrac(float64(fairFrac))
|
||||||
plState := plStates[plName]
|
}
|
||||||
|
for plName, plState := range plStates {
|
||||||
|
idx, isNonExempt := idxOfNonExempt[plName]
|
||||||
if setCompleters {
|
if setCompleters {
|
||||||
qsCompleter, err := queueSetCompleterForPL(cfgCtlr.queueSetFactory, plState.queues,
|
qsCompleter, err := queueSetCompleterForPL(cfgCtlr.queueSetFactory, plState.queues,
|
||||||
plState.pl, plState.reqsGaugePair, plState.execSeatsObs,
|
plState.pl, plState.reqsGaugePair, plState.execSeatsObs,
|
||||||
@ -440,10 +465,20 @@ func (cfgCtlr *configController) updateBorrowingLocked(setCompleters bool, plSta
|
|||||||
}
|
}
|
||||||
plState.qsCompleter = qsCompleter
|
plState.qsCompleter = qsCompleter
|
||||||
}
|
}
|
||||||
currentCL := int(math.Round(float64(allocs[idx])))
|
var currentCL int
|
||||||
|
if !isNonExempt {
|
||||||
|
currentCL = cclOfExempt[plName]
|
||||||
|
} else if remainingServerCL <= minCLSum {
|
||||||
|
currentCL = plState.minCL
|
||||||
|
} else if remainingServerCL <= minCurrentCLSum {
|
||||||
|
minCurrentCL := max(plState.minCL, min(plState.nominalCL, plState.seatDemandStats.highWatermark))
|
||||||
|
currentCL = plState.minCL + int(math.Round(float64(minCurrentCL-plState.minCL)*shareFrac))
|
||||||
|
} else {
|
||||||
|
currentCL = int(math.Round(float64(allocs[idx])))
|
||||||
|
}
|
||||||
relChange := relDiff(float64(currentCL), float64(plState.currentCL))
|
relChange := relDiff(float64(currentCL), float64(plState.currentCL))
|
||||||
plState.currentCL = currentCL
|
plState.currentCL = currentCL
|
||||||
metrics.NotePriorityLevelConcurrencyAdjustment(plState.pl.Name, plState.seatDemandStats.highWatermark, plState.seatDemandStats.avg, plState.seatDemandStats.stdDev, plState.seatDemandStats.smoothed, float64(items[idx].target), currentCL)
|
metrics.NotePriorityLevelConcurrencyAdjustment(plState.pl.Name, float64(plState.seatDemandStats.highWatermark), plState.seatDemandStats.avg, plState.seatDemandStats.stdDev, plState.seatDemandStats.smoothed, float64(items[idx].target), currentCL)
|
||||||
logLevel := klog.Level(4)
|
logLevel := klog.Level(4)
|
||||||
if relChange >= 0.05 {
|
if relChange >= 0.05 {
|
||||||
logLevel = 2
|
logLevel = 2
|
||||||
@ -458,7 +493,6 @@ func (cfgCtlr *configController) updateBorrowingLocked(setCompleters bool, plSta
|
|||||||
klog.V(logLevel).InfoS("Update CurrentCL", "plName", plName, "seatDemandHighWatermark", plState.seatDemandStats.highWatermark, "seatDemandAvg", plState.seatDemandStats.avg, "seatDemandStdev", plState.seatDemandStats.stdDev, "seatDemandSmoothed", plState.seatDemandStats.smoothed, "fairFrac", fairFrac, "currentCL", currentCL, "concurrencyDenominator", concurrencyDenominator, "backstop", err != nil)
|
klog.V(logLevel).InfoS("Update CurrentCL", "plName", plName, "seatDemandHighWatermark", plState.seatDemandStats.highWatermark, "seatDemandAvg", plState.seatDemandStats.avg, "seatDemandStdev", plState.seatDemandStats.stdDev, "seatDemandSmoothed", plState.seatDemandStats.smoothed, "fairFrac", fairFrac, "currentCL", currentCL, "concurrencyDenominator", concurrencyDenominator, "backstop", err != nil)
|
||||||
plState.queues = plState.qsCompleter.Complete(fq.DispatchingConfig{ConcurrencyLimit: currentCL, ConcurrencyDenominator: concurrencyDenominator})
|
plState.queues = plState.qsCompleter.Complete(fq.DispatchingConfig{ConcurrencyLimit: currentCL, ConcurrencyDenominator: concurrencyDenominator})
|
||||||
}
|
}
|
||||||
metrics.SetFairFrac(float64(fairFrac))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// runWorker is the logic of the one and only worker goroutine. We
|
// runWorker is the logic of the one and only worker goroutine. We
|
||||||
|
@ -0,0 +1,164 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2024 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package flowcontrol
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
fcboot "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
|
||||||
|
fqs "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset"
|
||||||
|
testeventclock "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/eventclock"
|
||||||
|
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
|
||||||
|
"k8s.io/client-go/informers"
|
||||||
|
clientsetfake "k8s.io/client-go/kubernetes/fake"
|
||||||
|
|
||||||
|
flowcontrol "k8s.io/api/flowcontrol/v1"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestUpdateBorrowing(t *testing.T) {
|
||||||
|
startTime := time.Now()
|
||||||
|
clk, _ := testeventclock.NewFake(startTime, 0, nil)
|
||||||
|
plcExempt := fcboot.MandatoryPriorityLevelConfigurationExempt
|
||||||
|
plcHigh := fcboot.SuggestedPriorityLevelConfigurationWorkloadHigh
|
||||||
|
plcMid := fcboot.SuggestedPriorityLevelConfigurationWorkloadLow
|
||||||
|
plcLow := fcboot.MandatoryPriorityLevelConfigurationCatchAll
|
||||||
|
plcs := []*flowcontrol.PriorityLevelConfiguration{plcHigh, plcExempt, plcMid, plcLow}
|
||||||
|
fses := []*flowcontrol.FlowSchema{}
|
||||||
|
k8sClient := clientsetfake.NewSimpleClientset(plcLow, plcExempt, plcHigh, plcMid)
|
||||||
|
informerFactory := informers.NewSharedInformerFactory(k8sClient, 0)
|
||||||
|
flowcontrolClient := k8sClient.FlowcontrolV1()
|
||||||
|
serverCL := int(*plcHigh.Spec.Limited.NominalConcurrencyShares+
|
||||||
|
*plcMid.Spec.Limited.NominalConcurrencyShares+
|
||||||
|
*plcLow.Spec.Limited.NominalConcurrencyShares) * 6
|
||||||
|
config := TestableConfig{
|
||||||
|
Name: "test",
|
||||||
|
Clock: clk,
|
||||||
|
AsFieldManager: "testfm",
|
||||||
|
FoundToDangling: func(found bool) bool { return !found },
|
||||||
|
InformerFactory: informerFactory,
|
||||||
|
FlowcontrolClient: flowcontrolClient,
|
||||||
|
ServerConcurrencyLimit: serverCL,
|
||||||
|
ReqsGaugeVec: metrics.PriorityLevelConcurrencyGaugeVec,
|
||||||
|
ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec,
|
||||||
|
QueueSetFactory: fqs.NewQueueSetFactory(clk),
|
||||||
|
}
|
||||||
|
ctlr := newTestableController(config)
|
||||||
|
_ = ctlr.lockAndDigestConfigObjects(plcs, fses)
|
||||||
|
if ctlr.nominalCLSum != serverCL {
|
||||||
|
t.Fatalf("Unexpected rounding: nominalCLSum=%d", ctlr.nominalCLSum)
|
||||||
|
}
|
||||||
|
stateExempt := ctlr.priorityLevelStates[plcExempt.Name]
|
||||||
|
stateHigh := ctlr.priorityLevelStates[plcHigh.Name]
|
||||||
|
stateMid := ctlr.priorityLevelStates[plcMid.Name]
|
||||||
|
stateLow := ctlr.priorityLevelStates[plcLow.Name]
|
||||||
|
|
||||||
|
// Scenario 1: everybody wants more than ServerConcurrencyLimit.
|
||||||
|
// Test the case of exempt borrowing so much that less than minCL
|
||||||
|
// is available to each non-exempt.
|
||||||
|
stateExempt.seatDemandIntegrator.Set(float64(serverCL + 100))
|
||||||
|
stateHigh.seatDemandIntegrator.Set(float64(serverCL + 100))
|
||||||
|
stateMid.seatDemandIntegrator.Set(float64(serverCL + 100))
|
||||||
|
stateLow.seatDemandIntegrator.Set(float64(serverCL + 100))
|
||||||
|
clk.SetTime(startTime.Add(borrowingAdjustmentPeriod))
|
||||||
|
ctlr.updateBorrowing()
|
||||||
|
if expected, actual := serverCL+100, stateExempt.currentCL; expected != actual {
|
||||||
|
t.Errorf("Scenario 1: expected %d, got %d for exempt", expected, actual)
|
||||||
|
} else {
|
||||||
|
t.Logf("Scenario 1: expected and got %d for exempt", expected)
|
||||||
|
}
|
||||||
|
if expected, actual := stateHigh.minCL, stateHigh.currentCL; expected != actual {
|
||||||
|
t.Errorf("Scenario 1: expected %d, got %d for hi", expected, actual)
|
||||||
|
} else {
|
||||||
|
t.Logf("Scenario 1: expected and got %d for hi", expected)
|
||||||
|
}
|
||||||
|
if expected, actual := stateMid.minCL, stateMid.currentCL; expected != actual {
|
||||||
|
t.Errorf("Scenario 1: expected %d, got %d for mid", expected, actual)
|
||||||
|
} else {
|
||||||
|
t.Logf("Scenario 1: expected and got %d for mid", expected)
|
||||||
|
}
|
||||||
|
if expected, actual := stateLow.minCL, stateLow.currentCL; expected != actual {
|
||||||
|
t.Errorf("Scenario 1: expected %d, got %d for lo", expected, actual)
|
||||||
|
} else {
|
||||||
|
t.Logf("Scenario 1: expected and got %d for lo", expected)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Scenario 2: non-exempt want more than serverCL but get halfway between minCL and minCurrentCL.
|
||||||
|
expectedHigh := (stateHigh.nominalCL + stateHigh.minCL) / 2
|
||||||
|
expectedMid := (stateMid.nominalCL + stateMid.minCL) / 2
|
||||||
|
expectedLow := (stateLow.nominalCL + stateLow.minCL) / 2
|
||||||
|
expectedExempt := serverCL - (expectedHigh + expectedMid + expectedLow)
|
||||||
|
stateExempt.seatDemandIntegrator.Set(float64(expectedExempt))
|
||||||
|
clk.SetTime(startTime.Add(2 * borrowingAdjustmentPeriod))
|
||||||
|
ctlr.updateBorrowing()
|
||||||
|
clk.SetTime(startTime.Add(3 * borrowingAdjustmentPeriod))
|
||||||
|
ctlr.updateBorrowing()
|
||||||
|
if expected, actual := expectedExempt, stateExempt.currentCL; expected != actual {
|
||||||
|
t.Errorf("Scenario 2: expected %d, got %d for exempt", expected, actual)
|
||||||
|
} else {
|
||||||
|
t.Logf("Scenario 2: expected and got %d for exempt", expected)
|
||||||
|
}
|
||||||
|
if expected, actual := expectedHigh, stateHigh.currentCL; expected != actual {
|
||||||
|
t.Errorf("Scenario 2: expected %d, got %d for hi", expected, actual)
|
||||||
|
} else {
|
||||||
|
t.Logf("Scenario 2: expected and got %d for hi", expected)
|
||||||
|
}
|
||||||
|
if expected, actual := expectedMid, stateMid.currentCL; expected != actual {
|
||||||
|
t.Errorf("Scenario 2: expected %d, got %d for mid", expected, actual)
|
||||||
|
} else {
|
||||||
|
t.Logf("Scenario 2: expected and got %d for mid", expected)
|
||||||
|
}
|
||||||
|
if expected, actual := expectedLow, stateLow.currentCL; expected != actual {
|
||||||
|
t.Errorf("Scenario 2: expected %d, got %d for lo", expected, actual)
|
||||||
|
} else {
|
||||||
|
t.Logf("Scenario 2: expected and got %d for lo", expected)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Scenario 3: only mid is willing to lend, and exempt borrows all of that.
|
||||||
|
// Test the case of regular borrowing.
|
||||||
|
expectedHigh = stateHigh.nominalCL
|
||||||
|
expectedMid = stateMid.minCL
|
||||||
|
expectedLow = stateLow.nominalCL
|
||||||
|
expectedExempt = serverCL - (expectedHigh + expectedMid + expectedLow)
|
||||||
|
stateExempt.seatDemandIntegrator.Set(float64(expectedExempt))
|
||||||
|
stateMid.seatDemandIntegrator.Set(float64(1))
|
||||||
|
clk.SetTime(startTime.Add(4 * borrowingAdjustmentPeriod))
|
||||||
|
ctlr.updateBorrowing()
|
||||||
|
clk.SetTime(startTime.Add(5 * borrowingAdjustmentPeriod))
|
||||||
|
ctlr.updateBorrowing()
|
||||||
|
if expected, actual := expectedExempt, stateExempt.currentCL; expected != actual {
|
||||||
|
t.Errorf("Scenario 3: expected %d, got %d for exempt", expected, actual)
|
||||||
|
} else {
|
||||||
|
t.Logf("Scenario 3: expected and got %d for exempt", expected)
|
||||||
|
}
|
||||||
|
if expected, actual := expectedHigh, stateHigh.currentCL; expected != actual {
|
||||||
|
t.Errorf("Scenario 3: expected %d, got %d for hi", expected, actual)
|
||||||
|
} else {
|
||||||
|
t.Logf("Scenario 3: expected and got %d for hi", expected)
|
||||||
|
}
|
||||||
|
if expected, actual := expectedMid, stateMid.currentCL; expected != actual {
|
||||||
|
t.Errorf("Scenario 3: expected %d, got %d for mid", expected, actual)
|
||||||
|
} else {
|
||||||
|
t.Logf("Scenario 3: expected and got %d for mid", expected)
|
||||||
|
}
|
||||||
|
if expected, actual := expectedLow, stateLow.currentCL; expected != actual {
|
||||||
|
t.Errorf("Scenario 3: expected %d, got %d for lo", expected, actual)
|
||||||
|
} else {
|
||||||
|
t.Logf("Scenario 3: expected and got %d for lo", expected)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user