apiserver: finish implementation of borrowing in APF

Also make some design changes in response to issues exposed in testing
and review.

Do not remove the ambiguous old metric
`apiserver_flowcontrol_request_concurrency_limit`, because reviewers
thought that removal would be premature.  This creates a problem: that
metric cannot keep both of its old meanings.  I chose to have it report
the configured concurrency limit.
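
For reference, a rough sketch of the updated metrics helper, mirroring
the metrics diff below; the old ambiguous metric now carries only the
configured-limit meaning:

    func SetPriorityLevelConfiguration(priorityLevel string, nominalCL, minCL, maxCL int) {
        // The ambiguous old metric keeps only its "configured limit" meaning.
        apiserverRequestConcurrencyLimit.WithLabelValues(priorityLevel).Set(float64(nominalCL))
        apiserverNominalConcurrencyLimits.WithLabelValues(priorityLevel).Set(float64(nominalCL))
        apiserverMinimumConcurrencyLimits.WithLabelValues(priorityLevel).Set(float64(minCL))
        apiserverMaximumConcurrencyLimits.WithLabelValues(priorityLevel).Set(float64(maxCL))
    }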

Testing has revealed a design flaw, which concerns the initialization
of the seat demand state tracking.  The current design in the KEP is
as follows.

> Adjustment is also done on configuration change … For a newly
> introduced priority level, we set HighSeatDemand, AvgSeatDemand, and
> SmoothSeatDemand to NominalCL-LendableSD/2 and StDevSeatDemand to
> zero.

But this does not work out well at server startup.  As part of its
construction, the APF controller does a configuration change with zero
objects read, to initialize its request-handling state.  As always,
the two mandatory priority levels are implicitly added whenever they
are not read.  So this initial reconfig has one non-exempt priority
level, the mandatory one called catch-all, and that level gets its
SmoothSeatDemand initialized to the whole server concurrency limit.
From there it decays slowly, as per the regular design.  So for a
fairly long time, it appears to have a high demand and competes
strongly with the other priority levels.  Its Target is higher than
all the others, once they start to show up.  It properly gets a low
NominalCL once other levels show up, which actually makes it compete
harder for borrowing: it has an exceptionally high Target and a rather
low NominalCL.
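
To illustrate with hypothetical numbers: if the server concurrency
limit is 600, then during that zero-object reconfig catch-all is the
only non-exempt level, so its NominalCL is 600 and its SmoothSeatDemand
starts near 600.  Once the other default levels are read and
catch-all's nominal share shrinks to a few seats, it still carries that
huge SmoothSeatDemand (and hence a huge Target) until the smoothing
slowly decays it.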

I have considered the following fix.  The idea is that the designed
initialization is not appropriate before all the default objects are
read.  So the fix is to have a mode bit in the controller.  In the
initial state, those seat demand tracking variables are set to zero.
Once the config-producing controller detects that all the default
objects are pre-existing, it flips the mode bit.  In the later mode,
the seat demand tracking variables are initialized as originally
designed.

However, that still gives preferential treatment to the default
PriorityLevelConfiguration objects, over any that may be added later.

So I have made a universal and simpler fix: always initialize those
seat demand tracking variables to zero.  Even if a lot of load shows
up quickly, remember that adjustments are frequent (every 10 sec) and
the very next one will fully respond to that load.
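
A minimal sketch of the resulting initialization, matching the
queue-introduction path in the diff below:

    initialCL := concurrencyLimit - lendableCL/2 // initial CurrentCL: nominal minus half the lendable seats
    plState.seatDemandStats = seatDemandStats{}  // avg, stdDev, highWatermark, smoothed all start at zero
    plState.currentCL = initialCL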

Also: revise the logging logic to log at a numerically lower V level
when there is a change.
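
Concretely, as in updateBorrowingLocked below, the change is judged by
relative difference, and a change of at least 5% lowers the V threshold:

    relChange := relDiff(float64(currentCL), float64(plState.currentCL))
    logLevel := klog.Level(4)
    if relChange >= 0.05 {
        logLevel = 2 // a noticeable change is logged at V(2) instead of V(4)
    }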

Also: bug fix in float64close.
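
The bug was computing the difference from the raw inputs rather than
their NaN-normalized values; the fixed comparison is roughly:

    func float64close(x, y float64) bool {
        x0 := float64NaNTo0(x)
        y0 := float64NaNTo0(y)
        diff := math.Abs(x0 - y0) // previously math.Abs(x - y), which let NaN leak in
        den := math.Max(math.Abs(x0), math.Abs(y0))
        return den == 0 || diff/den < 1e-10
    }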

Also: separate imports in some files.

Co-authored-by: Han Kang <hankang@google.com>
Mike Spreitzer 2022-10-31 16:13:25 -07:00
parent 172b27c80c
commit feb4227788
13 changed files with 410 additions and 55 deletions


@ -21,6 +21,7 @@ import (
"testing"
"github.com/google/go-cmp/cmp"
flowcontrol "k8s.io/api/flowcontrol/v1beta3"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"


@ -21,6 +21,7 @@ import (
"testing"
"github.com/google/go-cmp/cmp"
flowcontrolv1alpha1 "k8s.io/api/flowcontrol/v1alpha1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/utils/pointer"


@ -21,6 +21,7 @@ import (
"testing"
"github.com/google/go-cmp/cmp"
flowcontrolv1beta1 "k8s.io/api/flowcontrol/v1beta1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/utils/pointer"


@ -21,6 +21,7 @@ import (
"testing"
"github.com/google/go-cmp/cmp"
flowcontrolv1beta2 "k8s.io/api/flowcontrol/v1beta2"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/utils/pointer"


@ -21,6 +21,7 @@ import (
"testing"
"github.com/google/go-cmp/cmp"
flowcontrolv1beta3 "k8s.io/api/flowcontrol/v1beta3"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/utils/pointer"


@ -1112,7 +1112,7 @@ func startAPFController(t *testing.T, stopCh <-chan struct{}, apfConfiguration [
// make sure that apf controller syncs the priority level configuration object we are using in this test.
// read the metrics and ensure the concurrency limit for our priority level is set to the expected value.
pollErr := wait.PollImmediate(100*time.Millisecond, 5*time.Second, func() (done bool, err error) {
if err := gaugeValueMatch("apiserver_flowcontrol_request_concurrency_limit", map[string]string{"priority_level": plName}, plConcurrency); err != nil {
if err := gaugeValueMatch("apiserver_flowcontrol_nominal_limit_seats", map[string]string{"priority_level": plName}, plConcurrency); err != nil {
t.Logf("polling retry - error: %s", err)
return false, nil
}


@ -177,6 +177,12 @@ type configController struct {
// name to the state for that level. Every name referenced from a
// member of `flowSchemas` has an entry here.
priorityLevelStates map[string]*priorityLevelState
// nominalCLSum is the sum of the nominalCL fields in the priorityLevelState records.
// This can exceed serverConcurrencyLimit because of the deliberate rounding up
// in the computation of the nominalCL values.
// This is tracked because it is an input to the allocation adjustment algorithm.
nominalCLSum int
}
type updateAttempt struct {
@ -225,6 +231,18 @@ type priorityLevelState struct {
// Periodically smoothed gets replaced with `max(envelope, A*smoothed + (1-A)*envelope)`,
// where A is seatDemandSmoothingCoefficient.
seatDemandStats seatDemandStats
// nominalCL is the nominal concurrency limit configured in the PriorityLevelConfiguration
nominalCL int
// minCL is the nominal limit less the lendable amount
minCL int
// maxCL is the nominal limit plus the amount that may be borrowed
maxCL int
// currentCL is the dynamically derived concurrency limit to impose for now
currentCL int
}
type seatDemandStats struct {
@ -234,15 +252,6 @@ type seatDemandStats struct {
smoothed float64
}
func newSeatDemandStats(val float64) seatDemandStats {
return seatDemandStats{
avg: val,
stdDev: 0,
highWatermark: val,
smoothed: val,
}
}
func (stats *seatDemandStats) update(obs fq.IntegratorResults) {
stats.avg = obs.Average
stats.stdDev = obs.Deviation
@ -368,12 +377,74 @@ func (cfgCtlr *configController) Run(stopCh <-chan struct{}) error {
func (cfgCtlr *configController) updateBorrowing() {
cfgCtlr.lock.Lock()
defer cfgCtlr.lock.Unlock()
for _, plState := range cfgCtlr.priorityLevelStates {
cfgCtlr.updateBorrowingLocked(true, cfgCtlr.priorityLevelStates)
}
func (cfgCtlr *configController) updateBorrowingLocked(setCompleters bool, plStates map[string]*priorityLevelState) {
items := make([]allocProblemItem, 0, len(plStates))
plNames := make([]string, 0, len(plStates))
for plName, plState := range plStates {
if plState.pl.Spec.Limited == nil {
continue
}
obs := plState.seatDemandIntegrator.Reset()
plState.seatDemandStats.update(obs)
metrics.NotePriorityLevelConcurrencyAdjustment(plState.pl.Name, plState.seatDemandStats.highWatermark, plState.seatDemandStats.avg, plState.seatDemandStats.stdDev, plState.seatDemandStats.smoothed /* TODO: add the designed rest for borrowing */)
// TODO: update the CurrentCL as described in https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/1040-priority-and-fairness#dispatching
// Lower bound on this priority level's adjusted concurrency limit is the lesser of:
// - its seat demand high watermark over the last adjustment period, and
// - its configured concurrency limit.
// BUT: we do not want this to be lower than the lower bound from configuration.
// See KEP-1040 for a more detailed explanation.
minCurrentCL := math.Max(float64(plState.minCL), math.Min(float64(plState.nominalCL), plState.seatDemandStats.highWatermark))
plNames = append(plNames, plName)
items = append(items, allocProblemItem{
lowerBound: minCurrentCL,
upperBound: float64(plState.maxCL),
target: math.Max(minCurrentCL, plState.seatDemandStats.smoothed),
})
}
if len(items) == 0 && cfgCtlr.nominalCLSum > 0 {
klog.ErrorS(nil, "Impossible: no non-exempt priority levels", "plStates", cfgCtlr.priorityLevelStates)
return
}
allocs, fairFrac, err := computeConcurrencyAllocation(cfgCtlr.nominalCLSum, items)
if err != nil {
klog.ErrorS(err, "Unable to derive new concurrency limits", "plNames", plNames, "items", items)
allocs = make([]float64, len(items))
for idx, plName := range plNames {
plState := plStates[plName]
if plState.pl.Spec.Limited == nil {
continue
}
allocs[idx] = float64(plState.currentCL)
}
}
for idx, plName := range plNames {
plState := plStates[plName]
if plState.pl.Spec.Limited == nil {
continue
}
if setCompleters {
qsCompleter, err := queueSetCompleterForPL(cfgCtlr.queueSetFactory, plState.queues,
plState.pl, cfgCtlr.requestWaitLimit, plState.reqsGaugePair, plState.execSeatsObs,
metrics.NewUnionGauge(plState.seatDemandIntegrator, plState.seatDemandRatioedGauge))
if err != nil {
klog.ErrorS(err, "Inconceivable! Configuration error in existing priority level", "pl", plState.pl)
continue
}
plState.qsCompleter = qsCompleter
}
currentCL := int(math.Round(float64(allocs[idx])))
relChange := relDiff(float64(currentCL), float64(plState.currentCL))
plState.currentCL = currentCL
metrics.NotePriorityLevelConcurrencyAdjustment(plState.pl.Name, plState.seatDemandStats.highWatermark, plState.seatDemandStats.avg, plState.seatDemandStats.stdDev, plState.seatDemandStats.smoothed, float64(items[idx].target), currentCL)
logLevel := klog.Level(4)
if relChange >= 0.05 {
logLevel = 2
}
klog.V(logLevel).InfoS("Update CurrentCL", "plName", plName, "seatDemandHighWatermark", plState.seatDemandStats.highWatermark, "seatDemandAvg", plState.seatDemandStats.avg, "seatDemandStdev", plState.seatDemandStats.stdDev, "seatDemandSmoothed", plState.seatDemandStats.smoothed, "fairFrac", fairFrac, "currentCL", currentCL, "backstop", err != nil)
plState.queues = plState.qsCompleter.Complete(fq.DispatchingConfig{ConcurrencyLimit: currentCL})
}
metrics.SetFairFrac(float64(fairFrac))
}
// runWorker is the logic of the one and only worker goroutine. We
@ -605,7 +676,9 @@ func (meal *cfgMeal) digestNewPLsLocked(newPLs []*flowcontrol.PriorityLevelConfi
seatDemandRatioedGauge: metrics.ApiserverSeatDemands.NewForLabelValuesSafe(0, 1, []string{pl.Name}),
}
}
qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, state.queues, pl, meal.cfgCtlr.requestWaitLimit, state.reqsGaugePair, state.execSeatsObs, metrics.NewUnionGauge(state.seatDemandIntegrator, state.seatDemandRatioedGauge))
qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, state.queues,
pl, meal.cfgCtlr.requestWaitLimit, state.reqsGaugePair, state.execSeatsObs,
metrics.NewUnionGauge(state.seatDemandIntegrator, state.seatDemandRatioedGauge))
if err != nil {
klog.Warningf("Ignoring PriorityLevelConfiguration object %s because its spec (%s) is broken: %s", pl.Name, fcfmt.Fmt(pl.Spec), err)
continue
@ -709,7 +782,9 @@ func (meal *cfgMeal) processOldPLsLocked() {
}
}
var err error
plState.qsCompleter, err = queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, plState.queues, plState.pl, meal.cfgCtlr.requestWaitLimit, plState.reqsGaugePair, plState.execSeatsObs, metrics.NewUnionGauge(plState.seatDemandIntegrator, plState.seatDemandRatioedGauge))
plState.qsCompleter, err = queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, plState.queues,
plState.pl, meal.cfgCtlr.requestWaitLimit, plState.reqsGaugePair, plState.execSeatsObs,
metrics.NewUnionGauge(plState.seatDemandIntegrator, plState.seatDemandRatioedGauge))
if err != nil {
// This can not happen because queueSetCompleterForPL already approved this config
panic(fmt.Sprintf("%s from name=%q spec=%s", err, plName, fcfmt.Fmt(plState.pl.Spec)))
@ -739,29 +814,48 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() {
continue
}
limited := plState.pl.Spec.Limited
// The use of math.Ceil here means that the results might sum
// to a little more than serverConcurrencyLimit but the
// difference will be negligible.
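// For example (hypothetical numbers): with serverConcurrencyLimit=10 and
// three levels of equal shares, each gets ceil(10/3) = 4, summing to 12;
// nominalCLSum records that sum as an input to the borrowing computation.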
concurrencyLimit := int(math.Ceil(float64(meal.cfgCtlr.serverConcurrencyLimit) * float64(plState.pl.Spec.Limited.NominalConcurrencyShares) / meal.shareSum))
metrics.UpdateSharedConcurrencyLimit(plName, concurrencyLimit)
metrics.SetPriorityLevelConfiguration(plName, concurrencyLimit /* TODO: pass min and max once new API is available */, concurrencyLimit, concurrencyLimit)
concurrencyLimit := int(math.Ceil(float64(meal.cfgCtlr.serverConcurrencyLimit) * float64(limited.NominalConcurrencyShares) / meal.shareSum))
var lendableCL, borrowingCL int
if limited.LendablePercent != nil {
lendableCL = int(math.Round(float64(concurrencyLimit) * float64(*limited.LendablePercent) / 100))
}
if limited.BorrowingLimitPercent != nil {
borrowingCL = int(math.Round(float64(concurrencyLimit) * float64(*limited.BorrowingLimitPercent) / 100))
} else {
borrowingCL = meal.cfgCtlr.serverConcurrencyLimit
}
metrics.SetPriorityLevelConfiguration(plName, concurrencyLimit, concurrencyLimit-lendableCL, concurrencyLimit+borrowingCL)
plState.seatDemandRatioedGauge.SetDenominator(float64(concurrencyLimit))
cfgChanged := plState.nominalCL != concurrencyLimit || plState.minCL != concurrencyLimit-lendableCL || plState.maxCL != concurrencyLimit+borrowingCL
plState.nominalCL = concurrencyLimit
plState.minCL = concurrencyLimit - lendableCL
plState.maxCL = concurrencyLimit + borrowingCL
meal.maxExecutingRequests += concurrencyLimit
var waitLimit int
if qCfg := plState.pl.Spec.Limited.LimitResponse.Queuing; qCfg != nil {
if qCfg := limited.LimitResponse.Queuing; qCfg != nil {
waitLimit = int(qCfg.Queues * qCfg.QueueLengthLimit)
}
meal.maxWaitingRequests += waitLimit
if plState.queues == nil {
klog.V(5).Infof("Introducing queues for priority level %q: config=%s, concurrencyLimit=%d, quiescing=%v (shares=%v, shareSum=%v)", plName, fcfmt.Fmt(plState.pl.Spec), concurrencyLimit, plState.quiescing, plState.pl.Spec.Limited.NominalConcurrencyShares, meal.shareSum)
plState.seatDemandStats = newSeatDemandStats(float64(concurrencyLimit))
// TODO: initialize as described in https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/1040-priority-and-fairness#dispatching once NominalCL values are implemented
initialCL := concurrencyLimit - lendableCL/2
klog.V(2).Infof("Introducing queues for priority level %q: config=%s, nominalCL=%d, lendableCL=%d, borrowingCL=%d, currentCL=%d, quiescing=%v (shares=%v, shareSum=%v)", plName, fcfmt.Fmt(plState.pl.Spec), concurrencyLimit, lendableCL, borrowingCL, initialCL, plState.quiescing, plState.pl.Spec.Limited.NominalConcurrencyShares, meal.shareSum)
plState.seatDemandStats = seatDemandStats{}
plState.currentCL = initialCL
} else {
klog.V(5).Infof("Retaining queues for priority level %q: config=%s, concurrencyLimit=%d, quiescing=%v, numPending=%d (shares=%v, shareSum=%v)", plName, fcfmt.Fmt(plState.pl.Spec), concurrencyLimit, plState.quiescing, plState.numPending, plState.pl.Spec.Limited.NominalConcurrencyShares, meal.shareSum)
logLevel := klog.Level(5)
if cfgChanged {
logLevel = 2
}
klog.V(logLevel).Infof("Retaining queues for priority level %q: config=%s, nominalCL=%d, lendableCL=%d, borrowingCL=%d, currentCL=%d, quiescing=%v, numPending=%d (shares=%v, shareSum=%v)", plName, fcfmt.Fmt(plState.pl.Spec), concurrencyLimit, lendableCL, borrowingCL, plState.currentCL, plState.quiescing, plState.numPending, plState.pl.Spec.Limited.NominalConcurrencyShares, meal.shareSum)
}
plState.queues = plState.qsCompleter.Complete(fq.DispatchingConfig{ConcurrencyLimit: concurrencyLimit})
}
meal.cfgCtlr.nominalCLSum = meal.maxExecutingRequests
meal.cfgCtlr.updateBorrowingLocked(false, meal.newPLStates)
}
// queueSetCompleterForPL returns an appropriate QueueSetCompleter for the
@ -847,7 +941,9 @@ func (meal *cfgMeal) imaginePL(proto *flowcontrol.PriorityLevelConfiguration, re
execSeatsObs := meal.cfgCtlr.execSeatsGaugeVec.NewForLabelValuesSafe(0, 1, labelValues)
seatDemandIntegrator := fq.NewNamedIntegrator(meal.cfgCtlr.clock, proto.Name)
seatDemandRatioedGauge := metrics.ApiserverSeatDemands.NewForLabelValuesSafe(0, 1, []string{proto.Name})
qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, nil, proto, requestWaitLimit, reqsGaugePair, execSeatsObs, metrics.NewUnionGauge(seatDemandIntegrator, seatDemandRatioedGauge))
qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, nil, proto,
requestWaitLimit, reqsGaugePair, execSeatsObs,
metrics.NewUnionGauge(seatDemandIntegrator, seatDemandRatioedGauge))
if err != nil {
// This can not happen because proto is one of the mandatory
// objects and these are not erroneous
@ -1002,3 +1098,12 @@ func hashFlowID(fsName, fDistinguisher string) uint64 {
hash.Sum(sum[:0])
return binary.LittleEndian.Uint64(sum[:8])
}
func relDiff(x, y float64) float64 {
diff := math.Abs(x - y)
den := math.Max(math.Abs(x), math.Abs(y))
if den == 0 {
return 0
}
return diff / den
}


@ -0,0 +1,251 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flowcontrol
import (
"context"
"fmt"
"sync"
"testing"
"time"
flowcontrol "k8s.io/api/flowcontrol/v1beta3"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints/request"
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/eventclock"
fqs "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset"
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
"k8s.io/client-go/informers"
clientsetfake "k8s.io/client-go/kubernetes/fake"
)
type borrowingTestConstraints struct {
lendable, borrowing int32
}
// TestBorrowing tests borrowing of concurrency between priority levels.
// It runs two scenarios, one where the borrowing hits the limit on
// lendable concurrency and one where the borrowing hits the limit on
// borrowing of concurrency.
// Both scenarios are the same except for the limits.
// The test defines two priority levels, identified as "flows" 0 and 1.
// Both priority levels have a nominal concurrency limit of 12.
// The test maintains 24 concurrent clients for priority level 0
// and 6 for level 1,
// using an exec func that simply sleeps for 250 ms, for
// 25 seconds. The first 10 seconds of behavior are ignored, allowing
// the borrowing to start at any point during that time. The test
// continues for another 15 seconds, and checks that the delivered
// concurrency is about 16 for flow 0 and 6 for flow 1.
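// Roughly why ~16 is expected (computed from the constants below): with
// ServerConcurrencyLimit=24 and equal shares, each level's nominal limit is 12.
// In the "lendable-limited" case level 1 may lend only round(33% of 12) = 4
// seats, so level 0 tops out at 12+4 = 16; in the "borrowing-limited" case
// level 0 may borrow at most round(33% of 12) = 4 seats, again reaching 16.
// Level 1's demand of 6 fits within its own nominal limit.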
func TestBorrowing(t *testing.T) {
clientsPerFlow := [2]int{24, 6}
metrics.Register()
for _, testCase := range []struct {
name string
constraints []borrowingTestConstraints
}{
{
name: "lendable-limited",
constraints: []borrowingTestConstraints{
{lendable: 50, borrowing: 67},
{lendable: 33, borrowing: 50},
}},
{
name: "borrowing-limited",
constraints: []borrowingTestConstraints{
{lendable: 50, borrowing: 33},
{lendable: 67, borrowing: 50},
}},
} {
t.Run(testCase.name, func(t *testing.T) {
fsObjs := make([]*flowcontrol.FlowSchema, 2)
plcObjs := make([]*flowcontrol.PriorityLevelConfiguration, 2)
usernames := make([]string, 2)
cfgObjs := []runtime.Object{}
for flow := 0; flow < 2; flow++ {
usernames[flow] = fmt.Sprintf("test-user%d", flow)
plName := fmt.Sprintf("test-pl%d", flow)
fsObjs[flow] = &flowcontrol.FlowSchema{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("test-fs%d", flow),
},
Spec: flowcontrol.FlowSchemaSpec{
MatchingPrecedence: 100,
PriorityLevelConfiguration: flowcontrol.PriorityLevelConfigurationReference{
Name: plName,
},
DistinguisherMethod: &flowcontrol.FlowDistinguisherMethod{
Type: flowcontrol.FlowDistinguisherMethodByUserType,
},
Rules: []flowcontrol.PolicyRulesWithSubjects{{
Subjects: []flowcontrol.Subject{{
Kind: flowcontrol.SubjectKindUser,
User: &flowcontrol.UserSubject{Name: usernames[flow]},
}},
NonResourceRules: []flowcontrol.NonResourcePolicyRule{{
Verbs: []string{"*"},
NonResourceURLs: []string{"*"},
}},
}},
},
}
plcObjs[flow] = &flowcontrol.PriorityLevelConfiguration{
ObjectMeta: metav1.ObjectMeta{
Name: plName,
},
Spec: flowcontrol.PriorityLevelConfigurationSpec{
Type: flowcontrol.PriorityLevelEnablementLimited,
Limited: &flowcontrol.LimitedPriorityLevelConfiguration{
NominalConcurrencyShares: 100,
LendablePercent: &testCase.constraints[flow].lendable,
BorrowingLimitPercent: &testCase.constraints[flow].borrowing,
LimitResponse: flowcontrol.LimitResponse{
Type: flowcontrol.LimitResponseTypeQueue,
Queuing: &flowcontrol.QueuingConfiguration{
Queues: 10,
HandSize: 2,
QueueLengthLimit: 10,
},
},
},
},
}
cfgObjs = append(cfgObjs, fsObjs[flow], plcObjs[flow])
}
clientset := clientsetfake.NewSimpleClientset(cfgObjs...)
informerFactory := informers.NewSharedInformerFactory(clientset, time.Second)
flowcontrolClient := clientset.FlowcontrolV1beta3()
clk := eventclock.Real{}
controller := newTestableController(TestableConfig{
Name: "Controller",
Clock: clk,
AsFieldManager: ConfigConsumerAsFieldManager,
FoundToDangling: func(found bool) bool { return !found },
InformerFactory: informerFactory,
FlowcontrolClient: flowcontrolClient,
ServerConcurrencyLimit: 24,
RequestWaitLimit: time.Minute,
ReqsGaugeVec: metrics.PriorityLevelConcurrencyGaugeVec,
ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec,
QueueSetFactory: fqs.NewQueueSetFactory(clk),
})
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Second)
stopCh := ctx.Done()
controllerCompletionCh := make(chan error)
informerFactory.Start(stopCh)
status := informerFactory.WaitForCacheSync(ctx.Done())
if names := unsynced(status); len(names) > 0 {
t.Fatalf("WaitForCacheSync did not successfully complete, resources=%#v", names)
}
go func() {
controllerCompletionCh <- controller.Run(stopCh)
}()
// ensure that the controller has run its first loop.
err := wait.PollImmediate(100*time.Millisecond, 5*time.Second, func() (done bool, err error) {
return controller.hasPriorityLevelState(plcObjs[0].Name), nil
})
if err != nil {
t.Errorf("expected the controller to reconcile the priority level configuration object: %s, error: %s", plcObjs[0].Name, err)
}
concIntegrators := make([]fq.Integrator, 2)
reqInfo := &request.RequestInfo{
IsResourceRequest: false,
Path: "/foobar",
Verb: "GET",
}
noteFn := func(fs *flowcontrol.FlowSchema, plc *flowcontrol.PriorityLevelConfiguration, fd string) {}
workEstr := func() fcrequest.WorkEstimate { return fcrequest.WorkEstimate{InitialSeats: 1} }
qnf := fq.QueueNoteFn(func(bool) {})
var startWG sync.WaitGroup
startWG.Add(clientsPerFlow[0] + clientsPerFlow[1])
// Launch the client threads for each flow (clientsPerFlow[flow] of them)
for flow := 0; flow < 2; flow++ {
username := usernames[flow]
flowUser := testUser{name: username}
rd := RequestDigest{
RequestInfo: reqInfo,
User: flowUser,
}
concIntegrator := fq.NewNamedIntegrator(clk, username)
concIntegrators[flow] = concIntegrator
exec := func() {
concIntegrator.Inc()
clk.Sleep(250 * time.Millisecond)
concIntegrator.Dec()
}
nThreads := clientsPerFlow[flow]
for thread := 0; thread < nThreads; thread++ {
go func() {
startWG.Done()
wait.Until(func() { controller.Handle(ctx, rd, noteFn, workEstr, qnf, exec) }, 0, ctx.Done())
}()
}
}
startWG.Wait()
// Make sure the controller has had time to sense the load and adjust
clk.Sleep(10 * time.Second)
// Start the stats that matter from now
for _, ci := range concIntegrators {
ci.Reset()
}
// Run for 15 seconds
clk.Sleep(15 * time.Second)
// Collect the delivered concurrency stats
results0 := concIntegrators[0].Reset()
results1 := concIntegrators[1].Reset()
// shut down all the async stuff
cancel()
// Do the checking
t.Log("waiting for the controller Run function to shutdown gracefully")
controllerErr := <-controllerCompletionCh
close(controllerCompletionCh)
if controllerErr != nil {
t.Errorf("expected nil error from controller Run function, but got: %#v", controllerErr)
}
if results0.Average < 15.5 || results0.Average > 16.1 {
t.Errorf("Flow 0 got average concurrency of %v but expected about 16", results0.Average)
} else {
t.Logf("Flow 0 got average concurrency of %v and expected about 16", results0.Average)
}
if results1.Average < 5.5 || results1.Average > 6.1 {
t.Errorf("Flow 1 got average concurrency of %v but expected about 6", results1.Average)
} else {
t.Logf("Flow 1 got average concurrency of %v and expected about 6", results1.Average)
}
})
}
}
type testUser struct{ name string }
func (tu testUser) GetName() string { return tu.name }
func (tu testUser) GetUID() string { return tu.name }
func (tu testUser) GetGroups() []string { return []string{user.AllAuthenticated} }
func (tu testUser) GetExtra() map[string][]string { return map[string][]string{} }


@ -1527,12 +1527,9 @@ func newExecSeatsGauge(clk clock.PassiveClock) metrics.RatioedGauge {
func float64close(x, y float64) bool {
x0 := float64NaNTo0(x)
y0 := float64NaNTo0(y)
diff := math.Abs(x - y)
diff := math.Abs(x0 - y0)
den := math.Max(math.Abs(x0), math.Abs(y0))
if den == 0 {
return diff < 1e-10
}
return diff/den < 1e-10
return den == 0 || diff/den < 1e-10
}
func uint64max(a, b uint64) uint64 {


@ -529,11 +529,6 @@ func AddRequestConcurrencyInUse(priorityLevel, flowSchema string, delta int) {
apiserverRequestConcurrencyInUse.WithLabelValues(priorityLevel, flowSchema).Add(float64(delta))
}
// UpdateSharedConcurrencyLimit updates the value for the concurrency limit in flow control
func UpdateSharedConcurrencyLimit(priorityLevel string, limit int) {
apiserverRequestConcurrencyLimit.WithLabelValues(priorityLevel).Set(float64(limit))
}
// AddReject increments the # of rejected requests for flow control
func AddReject(ctx context.Context, priorityLevel, flowSchema, reason string) {
apiserverRejectedRequestsTotal.WithContext(ctx).WithLabelValues(priorityLevel, flowSchema, reason).Add(1)
@ -585,19 +580,19 @@ func AddDispatchWithNoAccommodation(priorityLevel, flowSchema string) {
}
func SetPriorityLevelConfiguration(priorityLevel string, nominalCL, minCL, maxCL int) {
apiserverRequestConcurrencyLimit.WithLabelValues(priorityLevel).Set(float64(nominalCL))
apiserverNominalConcurrencyLimits.WithLabelValues(priorityLevel).Set(float64(nominalCL))
apiserverMinimumConcurrencyLimits.WithLabelValues(priorityLevel).Set(float64(minCL))
apiserverMaximumConcurrencyLimits.WithLabelValues(priorityLevel).Set(float64(maxCL))
}
func NotePriorityLevelConcurrencyAdjustment(priorityLevel string, seatDemandHWM, seatDemandAvg, seatDemandStdev, seatDemandSmoothed float64 /* TODO: seatDemandTarget float64, currentCL int */) {
func NotePriorityLevelConcurrencyAdjustment(priorityLevel string, seatDemandHWM, seatDemandAvg, seatDemandStdev, seatDemandSmoothed, seatDemandTarget float64, currentCL int) {
apiserverSeatDemandHighWatermarks.WithLabelValues(priorityLevel).Set(seatDemandHWM)
apiserverSeatDemandAverages.WithLabelValues(priorityLevel).Set(seatDemandAvg)
apiserverSeatDemandStandardDeviations.WithLabelValues(priorityLevel).Set(seatDemandStdev)
apiserverSeatDemandSmootheds.WithLabelValues(priorityLevel).Set(seatDemandSmoothed)
// TODO: the following once new API is available
// apiserverSeatDemandTargets.WithLabelValues(priorityLevel).Set(seatDemandTarget)
// apiserverCurrentConcurrencyLimits.WithLabelValues(priorityLevel).Set(currentCL)
apiserverSeatDemandTargets.WithLabelValues(priorityLevel).Set(seatDemandTarget)
apiserverCurrentConcurrencyLimits.WithLabelValues(priorityLevel).Set(float64(currentCL))
}
func SetFairFrac(fairFrac float64) {


@ -43,7 +43,7 @@ import (
)
const (
requestConcurrencyLimitMetricName = "apiserver_flowcontrol_request_concurrency_limit"
nominalConcurrencyLimitMetricName = "apiserver_flowcontrol_nominal_limit_seats"
priorityLevelLabelName = "priority_level"
)
@ -146,7 +146,7 @@ var _ = SIGDescribe("API priority and fairness", func() {
ginkgo.By("getting request concurrency from metrics")
for i := range clients {
realConcurrency, err := getPriorityLevelConcurrency(f.ClientSet, clients[i].priorityLevelName)
realConcurrency, err := getPriorityLevelNominalConcurrency(f.ClientSet, clients[i].priorityLevelName)
framework.ExpectNoError(err)
clients[i].concurrency = int32(float64(realConcurrency) * clients[i].concurrencyMultiplier)
if clients[i].concurrency < 1 {
@ -219,7 +219,7 @@ var _ = SIGDescribe("API priority and fairness", func() {
}
framework.Logf("getting real concurrency")
realConcurrency, err := getPriorityLevelConcurrency(f.ClientSet, priorityLevelName)
realConcurrency, err := getPriorityLevelNominalConcurrency(f.ClientSet, priorityLevelName)
framework.ExpectNoError(err)
for i := range clients {
clients[i].concurrency = int32(float64(realConcurrency) * clients[i].concurrencyMultiplier)
@ -280,7 +280,7 @@ func createPriorityLevel(f *framework.Framework, priorityLevelName string, nomin
}
}
func getPriorityLevelConcurrency(c clientset.Interface, priorityLevelName string) (int32, error) {
func getPriorityLevelNominalConcurrency(c clientset.Interface, priorityLevelName string) (int32, error) {
resp, err := c.CoreV1().RESTClient().Get().RequestURI("/metrics").DoRaw(context.TODO())
if err != nil {
return 0, err
@ -299,7 +299,7 @@ func getPriorityLevelConcurrency(c clientset.Interface, priorityLevelName string
return 0, err
}
for _, metric := range v {
if string(metric.Metric[model.MetricNameLabel]) != requestConcurrencyLimitMetricName {
if string(metric.Metric[model.MetricNameLabel]) != nominalConcurrencyLimitMetricName {
continue
}
if string(metric.Metric[priorityLevelLabelName]) != priorityLevelName {
@ -376,7 +376,7 @@ func waitForSteadyState(f *framework.Framework, flowSchemaName string, priorityL
// hasn't been achieved.
return false, nil
}
_, err = getPriorityLevelConcurrency(f.ClientSet, priorityLevelName)
_, err = getPriorityLevelNominalConcurrency(f.ClientSet, priorityLevelName)
if err != nil {
if err == errPriorityLevelNotFound {
return false, nil


@ -41,7 +41,7 @@ import (
)
const (
sharedConcurrencyMetricsName = "apiserver_flowcontrol_request_concurrency_limit"
nominalConcurrencyMetricsName = "apiserver_flowcontrol_nominal_limit_seats"
dispatchedRequestCountMetricsName = "apiserver_flowcontrol_dispatched_requests_total"
rejectedRequestCountMetricsName = "apiserver_flowcontrol_rejected_requests_total"
labelPriorityLevel = "priority_level"
@ -84,16 +84,16 @@ func TestPriorityLevelIsolation(t *testing.T) {
t.Error(err)
}
sharedConcurrency, err := getSharedConcurrencyOfPriorityLevel(loopbackClient)
nominalConcurrency, err := getNominalConcurrencyOfPriorityLevel(loopbackClient)
if err != nil {
t.Error(err)
}
if 1 != sharedConcurrency[priorityLevelNoxu1.Name] {
t.Errorf("unexpected shared concurrency %v instead of %v", sharedConcurrency[priorityLevelNoxu1.Name], 1)
if 1 != nominalConcurrency[priorityLevelNoxu1.Name] {
t.Errorf("unexpected shared concurrency %v instead of %v", nominalConcurrency[priorityLevelNoxu1.Name], 1)
}
if 1 != sharedConcurrency[priorityLevelNoxu2.Name] {
t.Errorf("unexpected shared concurrency %v instead of %v", sharedConcurrency[priorityLevelNoxu2.Name], 1)
if 1 != nominalConcurrency[priorityLevelNoxu2.Name] {
t.Errorf("unexpected shared concurrency %v instead of %v", nominalConcurrency[priorityLevelNoxu2.Name], 1)
}
stopCh := make(chan struct{})
@ -164,7 +164,7 @@ func getMetrics(c clientset.Interface) (string, error) {
return string(resp), err
}
func getSharedConcurrencyOfPriorityLevel(c clientset.Interface) (map[string]int, error) {
func getNominalConcurrencyOfPriorityLevel(c clientset.Interface) (map[string]int, error) {
resp, err := getMetrics(c)
if err != nil {
return nil, err
@ -188,7 +188,7 @@ func getSharedConcurrencyOfPriorityLevel(c clientset.Interface) (map[string]int,
}
for _, metric := range v {
switch name := string(metric.Metric[model.MetricNameLabel]); name {
case sharedConcurrencyMetricsName:
case nominalConcurrencyMetricsName:
concurrency[string(metric.Metric[labelPriorityLevel])] = int(metric.Value)
}
}
@ -230,6 +230,7 @@ func getRequestCountOfPriorityLevel(c clientset.Interface) (map[string]int, map[
}
func createPriorityLevelAndBindingFlowSchemaForUser(c clientset.Interface, username string, concurrencyShares, queuelength int) (*flowcontrol.PriorityLevelConfiguration, *flowcontrol.FlowSchema, error) {
i0 := int32(0)
pl, err := c.FlowcontrolV1beta3().PriorityLevelConfigurations().Create(context.Background(), &flowcontrol.PriorityLevelConfiguration{
ObjectMeta: metav1.ObjectMeta{
Name: username,
@ -238,6 +239,7 @@ func createPriorityLevelAndBindingFlowSchemaForUser(c clientset.Interface, usern
Type: flowcontrol.PriorityLevelEnablementLimited,
Limited: &flowcontrol.LimitedPriorityLevelConfiguration{
NominalConcurrencyShares: int32(concurrencyShares),
BorrowingLimitPercent: &i0,
LimitResponse: flowcontrol.LimitResponse{
Type: flowcontrol.LimitResponseTypeQueue,
Queuing: &flowcontrol.QueuingConfiguration{


@ -41,7 +41,7 @@ import (
)
const (
requestConcurrencyLimitMetricsName = "apiserver_flowcontrol_request_concurrency_limit"
nominalConcurrencyLimitMetricsName = "apiserver_flowcontrol_nominal_limit_seats"
requestExecutionSecondsSumName = "apiserver_flowcontrol_request_execution_seconds_sum"
requestExecutionSecondsCountName = "apiserver_flowcontrol_request_execution_seconds_count"
priorityLevelSeatUtilSumName = "apiserver_flowcontrol_priority_level_seat_utilization_sum"
@ -350,7 +350,7 @@ func getRequestMetricsSnapshot(c clientset.Interface) (metricSnapshot, error) {
entry.seatUtil.Sum = float64(metric.Value)
case priorityLevelSeatUtilCountName:
entry.seatUtil.Count = int(metric.Value)
case requestConcurrencyLimitMetricsName:
case nominalConcurrencyLimitMetricsName:
entry.availableSeats = int(metric.Value)
}
snapshot[plLabel] = entry