commit 9351ea291a
Merge pull request #105873 from MikeSpreitzer/more-seat-metrics

More seat metrics for APF
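This merge threads a work-estimator callback through the API Priority and Fairness (APF) machinery — replacing the WorkEstimate field that used to ride inside RequestDigest — so the estimate is computed only after the request has been classified, and it adds a second observer tracking seats occupied across all phases of execution, plus new seat-count and watch-count histograms. One shape recurs in nearly every hunk below: the callback that replaces the old noteFn. Restated here for orientation (the alias name is illustrative, not part of the commit; the diff spells the signature out inline everywhere):

    // Illustrative alias for the callback threaded through Handle/startRequest below.
    type workEstimatorFunc func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate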
@@ -101,7 +101,7 @@ func WithPriorityAndFairness(
 		}
 
 		var classification *PriorityAndFairnessClassification
-		note := func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) {
+		estimateWork := func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) flowcontrolrequest.WorkEstimate {
 			classification = &PriorityAndFairnessClassification{
 				FlowSchemaName: fs.Name,
 				FlowSchemaUID:  fs.UID,
@@ -111,6 +111,7 @@ func WithPriorityAndFairness(
 			httplog.AddKeyValue(ctx, "apf_pl", truncateLogField(pl.Name))
 			httplog.AddKeyValue(ctx, "apf_fs", truncateLogField(fs.Name))
 			httplog.AddKeyValue(ctx, "apf_fd", truncateLogField(flowDistinguisher))
+			return workEstimator(r, fs.Name, pl.Name)
 		}
 
 		var served bool
@@ -137,13 +138,9 @@ func WithPriorityAndFairness(
 			}
 		}
 
-		// find the estimated amount of work of the request
-		// TODO: Estimate cost should also take fcIfc.GetWatchCount(requestInfo) as a parameter.
-		workEstimate := workEstimator.EstimateWork(r)
 		digest := utilflowcontrol.RequestDigest{
 			RequestInfo: requestInfo,
 			User:        user,
-			WorkEstimate: workEstimate,
 		}
 
 		if isWatchRequest {
@@ -179,7 +176,7 @@ func WithPriorityAndFairness(
 		execute := func() {
 			startedAt := time.Now()
 			defer func() {
-				httplog.AddKeyValue(ctx, "apf_init_latency", time.Now().Sub(startedAt))
+				httplog.AddKeyValue(ctx, "apf_init_latency", time.Since(startedAt))
 			}()
 			noteExecutingDelta(1)
 			defer noteExecutingDelta(-1)
@@ -238,7 +235,7 @@ func WithPriorityAndFairness(
 			// Note that Handle will return irrespective of whether the request
 			// executes or is rejected. In the latter case, the function will return
 			// without calling the passed `execute` function.
-			fcIfc.Handle(handleCtx, digest, note, queueNote, execute)
+			fcIfc.Handle(handleCtx, digest, estimateWork, queueNote, execute)
 		}()
 
 		select {
@@ -269,7 +266,7 @@ func WithPriorityAndFairness(
 			handler.ServeHTTP(w, r)
 		}
 
-		fcIfc.Handle(ctx, digest, note, queueNote, execute)
+		fcIfc.Handle(ctx, digest, estimateWork, queueNote, execute)
 	}
 
 	if !served {
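The six filter hunks above fold two callbacks into one: the closure that used to merely note the classification (note) now also prices the request and returns the estimate (estimateWork), so a single invocation both annotates the httplog and produces the WorkEstimate. A minimal runnable sketch of that producer/consumer shape, with placeholder names (handle, estimateWork) and an int standing in for WorkEstimate:

    package main

    import "fmt"

    // handle plays the role of fcIfc.Handle: one callback invocation tells it
    // both the classification outcome and the price of the request.
    func handle(estimateWork func(fsName, plName string) int) {
    	seats := estimateWork("global-default", "global-default")
    	fmt.Printf("admitting request occupying %d seat(s)\n", seats)
    }

    func main() {
    	var noted string // stands in for the httplog key/values in the real filter
    	estimateWork := func(fsName, plName string) int {
    		noted = fsName + "/" + plName // side effect: record the classification
    		return 1                      // then return the work estimate
    	}
    	handle(estimateWork)
    	fmt.Println("noted:", noted)
    }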
@@ -70,7 +70,7 @@ const (
 	decisionSkipFilter
 )
 
-var defaultRequestWorkEstimator = func(*http.Request) fcrequest.WorkEstimate {
+var defaultRequestWorkEstimator = func(req *http.Request, fsName, plName string) fcrequest.WorkEstimate {
 	return fcrequest.WorkEstimate{InitialSeats: 1}
 }
 
@@ -87,14 +87,14 @@ func (t fakeApfFilter) MaintainObservations(stopCh <-chan struct{}) {
 
 func (t fakeApfFilter) Handle(ctx context.Context,
 	requestDigest utilflowcontrol.RequestDigest,
-	noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string),
+	workEstimator func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate,
 	queueNoteFn fq.QueueNoteFn,
 	execFn func(),
 ) {
 	if t.mockDecision == decisionSkipFilter {
 		panic("Handle should not be invoked")
 	}
-	noteFn(bootstrap.SuggestedFlowSchemaGlobalDefault, bootstrap.SuggestedPriorityLevelConfigurationGlobalDefault, requestDigest.User.GetName())
+	workEstimator(bootstrap.SuggestedFlowSchemaGlobalDefault, bootstrap.SuggestedPriorityLevelConfigurationGlobalDefault, requestDigest.User.GetName())
 	switch t.mockDecision {
 	case decisionNoQueuingExecute:
 		execFn()
@@ -390,7 +390,7 @@ func newFakeWatchApfFilter(capacity int) *fakeWatchApfFilter {
 
 func (f *fakeWatchApfFilter) Handle(ctx context.Context,
 	requestDigest utilflowcontrol.RequestDigest,
-	_ func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string),
+	_ func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate,
 	_ fq.QueueNoteFn,
 	execFn func(),
 ) {
@@ -635,14 +635,16 @@ func TestContextClosesOnRequestProcessed(t *testing.T) {
 type fakeFilterRequestDigest struct {
 	*fakeApfFilter
 	requestDigestGot *utilflowcontrol.RequestDigest
+	workEstimateGot  fcrequest.WorkEstimate
 }
 
 func (f *fakeFilterRequestDigest) Handle(ctx context.Context,
 	requestDigest utilflowcontrol.RequestDigest,
-	_ func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string),
+	workEstimator func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate,
 	_ fq.QueueNoteFn, _ func(),
 ) {
 	f.requestDigestGot = &requestDigest
+	f.workEstimateGot = workEstimator(bootstrap.MandatoryFlowSchemaCatchAll, bootstrap.MandatoryPriorityLevelConfigurationCatchAll, "")
 }
 
 func TestApfWithRequestDigest(t *testing.T) {
@@ -652,17 +654,17 @@ func TestApfWithRequestDigest(t *testing.T) {
 	reqDigestExpected := &utilflowcontrol.RequestDigest{
 		RequestInfo: &apirequest.RequestInfo{Verb: "get"},
 		User:        &user.DefaultInfo{Name: "foo"},
-		WorkEstimate: fcrequest.WorkEstimate{
-			InitialSeats:      5,
-			FinalSeats:        7,
-			AdditionalLatency: 3 * time.Second,
-		},
 	}
+	workExpected := fcrequest.WorkEstimate{
+		InitialSeats:      5,
+		FinalSeats:        7,
+		AdditionalLatency: 3 * time.Second,
+	}
 
 	handler := WithPriorityAndFairness(http.HandlerFunc(func(_ http.ResponseWriter, req *http.Request) {}),
 		longRunningFunc,
 		fakeFilter,
-		func(_ *http.Request) fcrequest.WorkEstimate { return reqDigestExpected.WorkEstimate },
+		func(_ *http.Request, _, _ string) fcrequest.WorkEstimate { return workExpected },
 	)
 
 	w := httptest.NewRecorder()
@@ -678,6 +680,9 @@ func TestApfWithRequestDigest(t *testing.T) {
 	if !reflect.DeepEqual(reqDigestExpected, fakeFilter.requestDigestGot) {
 		t.Errorf("Expected RequestDigest to match, diff: %s", cmp.Diff(reqDigestExpected, fakeFilter.requestDigestGot))
 	}
+	if !reflect.DeepEqual(workExpected, fakeFilter.workEstimateGot) {
+		t.Errorf("Expected WorkEstimate to match, diff: %s", cmp.Diff(workExpected, fakeFilter.workEstimateGot))
+	}
 }
 
 func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {
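These test hunks use the standard fake pattern: the fake's Handle invokes the supplied workEstimator with bootstrap objects and stashes the returned estimate so the test can assert on it afterwards. A reduced, runnable sketch of that capture technique (simplified types and hypothetical names, not the commit's code):

    package main

    import "fmt"

    type fakeFilter struct{ estimateGot int }

    // Handle calls the estimator exactly as the real interface would,
    // keeping the result for later assertions.
    func (f *fakeFilter) Handle(workEstimator func(fsName, plName string) int) {
    	f.estimateGot = workEstimator("catch-all", "catch-all")
    }

    func main() {
    	f := &fakeFilter{}
    	f.Handle(func(_, _ string) int { return 5 })
    	fmt.Println("captured estimate:", f.estimateGot) // assert against the expected value
    }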
@@ -89,9 +89,8 @@ type StartFunction func(ctx context.Context, hashValue uint64) (execute bool, af
 
 // RequestDigest holds necessary info from request for flow-control
 type RequestDigest struct {
 	RequestInfo *request.RequestInfo
 	User        user.Info
-	WorkEstimate fcrequest.WorkEstimate
 }
 
 // `*configController` maintains eventual consistency with the API
@@ -100,10 +99,11 @@ type RequestDigest struct {
 // this type and cfgMeal follow the convention that the suffix
 // "Locked" means that the caller must hold the configController lock.
 type configController struct {
 	name            string // varies in tests of fighting controllers
 	clock           clock.PassiveClock
 	queueSetFactory fq.QueueSetFactory
-	obsPairGenerator metrics.TimedObserverPairGenerator
+	reqsObsPairGenerator  metrics.TimedObserverPairGenerator
+	execSeatsObsGenerator metrics.TimedObserverGenerator
 
 	// How this controller appears in an ObjectMeta ManagedFieldsEntry.Manager
 	asFieldManager string
@@ -192,8 +192,11 @@ type priorityLevelState struct {
 	// returned StartFunction
 	numPending int
 
-	// Observers tracking number waiting, executing
-	obsPair metrics.TimedObserverPair
+	// Observers tracking number of requests waiting, executing
+	reqsObsPair metrics.TimedObserverPair
+
+	// Observer of number of seats occupied throughout execution
+	execSeatsObs metrics.TimedObserver
 }
 
 // NewTestableController is extra flexible to facilitate testing
@@ -202,7 +205,8 @@ func newTestableController(config TestableConfig) *configController {
 		name:                   config.Name,
 		clock:                  config.Clock,
 		queueSetFactory:        config.QueueSetFactory,
-		obsPairGenerator:       config.ObsPairGenerator,
+		reqsObsPairGenerator:   config.ReqsObsPairGenerator,
+		execSeatsObsGenerator:  config.ExecSeatsObsGenerator,
 		asFieldManager:         config.AsFieldManager,
 		foundToDangling:        config.FoundToDangling,
 		serverConcurrencyLimit: config.ServerConcurrencyLimit,
@@ -534,9 +538,10 @@ func (meal *cfgMeal) digestNewPLsLocked(newPLs []*flowcontrol.PriorityLevelConfi
 	for _, pl := range newPLs {
 		state := meal.cfgCtlr.priorityLevelStates[pl.Name]
 		if state == nil {
-			state = &priorityLevelState{obsPair: meal.cfgCtlr.obsPairGenerator.Generate(1, 1, []string{pl.Name})}
+			labelValues := []string{pl.Name}
+			state = &priorityLevelState{reqsObsPair: meal.cfgCtlr.reqsObsPairGenerator.Generate(1, 1, labelValues), execSeatsObs: meal.cfgCtlr.execSeatsObsGenerator.Generate(1, 1, labelValues)}
 		}
-		qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, state.queues, pl, meal.cfgCtlr.requestWaitLimit, state.obsPair)
+		qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, state.queues, pl, meal.cfgCtlr.requestWaitLimit, state.reqsObsPair, state.execSeatsObs)
 		if err != nil {
 			klog.Warningf("Ignoring PriorityLevelConfiguration object %s because its spec (%s) is broken: %s", pl.Name, fcfmt.Fmt(pl.Spec), err)
 			continue
@@ -639,7 +644,7 @@ func (meal *cfgMeal) processOldPLsLocked() {
 			}
 		}
 		var err error
-		plState.qsCompleter, err = queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, plState.queues, plState.pl, meal.cfgCtlr.requestWaitLimit, plState.obsPair)
+		plState.qsCompleter, err = queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, plState.queues, plState.pl, meal.cfgCtlr.requestWaitLimit, plState.reqsObsPair, plState.execSeatsObs)
 		if err != nil {
 			// This can not happen because queueSetCompleterForPL already approved this config
 			panic(fmt.Sprintf("%s from name=%q spec=%s", err, plName, fcfmt.Fmt(plState.pl.Spec)))
@@ -688,7 +693,7 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() {
 // given priority level configuration. Returns nil if that config
 // does not call for limiting. Returns nil and an error if the given
 // object is malformed in a way that is a problem for this package.
-func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration, intPair metrics.TimedObserverPair) (fq.QueueSetCompleter, error) {
+func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration, reqsIntPair metrics.TimedObserverPair, execSeatsObs metrics.TimedObserver) (fq.QueueSetCompleter, error) {
 	if (pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt) != (pl.Spec.Limited == nil) {
 		return nil, errors.New("broken union structure at the top")
 	}
@@ -717,7 +722,7 @@ func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flow
 	if queues != nil {
 		qsc, err = queues.BeginConfigChange(qcQS)
 	} else {
-		qsc, err = qsf.BeginConstruction(qcQS, intPair)
+		qsc, err = qsf.BeginConstruction(qcQS, reqsIntPair, execSeatsObs)
 	}
 	if err != nil {
 		err = fmt.Errorf("priority level %q has QueuingConfiguration %#+v, which is invalid: %w", pl.Name, qcAPI, err)
@@ -762,17 +767,20 @@ func (meal *cfgMeal) presyncFlowSchemaStatus(fs *flowcontrol.FlowSchema, isDangl
 // that does not actually exist (right now) as a real API object.
 func (meal *cfgMeal) imaginePL(proto *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration) {
 	klog.V(3).Infof("No %s PriorityLevelConfiguration found, imagining one", proto.Name)
-	obsPair := meal.cfgCtlr.obsPairGenerator.Generate(1, 1, []string{proto.Name})
-	qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, nil, proto, requestWaitLimit, obsPair)
+	labelValues := []string{proto.Name}
+	reqsObsPair := meal.cfgCtlr.reqsObsPairGenerator.Generate(1, 1, labelValues)
+	execSeatsObs := meal.cfgCtlr.execSeatsObsGenerator.Generate(1, 1, labelValues)
+	qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, nil, proto, requestWaitLimit, reqsObsPair, execSeatsObs)
 	if err != nil {
 		// This can not happen because proto is one of the mandatory
 		// objects and these are not erroneous
 		panic(err)
 	}
 	meal.newPLStates[proto.Name] = &priorityLevelState{
 		pl:          proto,
 		qsCompleter: qsCompleter,
-		obsPair:     obsPair,
+		reqsObsPair:  reqsObsPair,
+		execSeatsObs: execSeatsObs,
 	}
 	if proto.Spec.Limited != nil {
 		meal.shareSum += float64(proto.Spec.Limited.AssuredConcurrencyShares)
@@ -791,7 +799,7 @@ func (immediateRequest) Finish(execute func()) bool {
 // The returned bool indicates whether the request is exempt from
 // limitation. The startWaitingTime is when the request started
 // waiting in its queue, or `Time{}` if this did not happen.
-func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDigest, queueNoteFn fq.QueueNoteFn) (fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, isExempt bool, req fq.Request, startWaitingTime time.Time, flowDistinguisher string) {
+func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDigest, workEstimator func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate, queueNoteFn fq.QueueNoteFn) (fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, isExempt bool, req fq.Request, startWaitingTime time.Time) {
 	klog.V(7).Infof("startRequest(%#+v)", rd)
 	cfgCtlr.lock.RLock()
 	defer cfgCtlr.lock.RUnlock()
@@ -823,24 +831,26 @@ func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDig
 	plState := cfgCtlr.priorityLevelStates[plName]
 	if plState.pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt {
 		klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, immediate", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName)
-		return selectedFlowSchema, plState.pl, true, immediateRequest{}, time.Time{}, ""
+		return selectedFlowSchema, plState.pl, true, immediateRequest{}, time.Time{}
 	}
 	var numQueues int32
 	if plState.pl.Spec.Limited.LimitResponse.Type == flowcontrol.LimitResponseTypeQueue {
 		numQueues = plState.pl.Spec.Limited.LimitResponse.Queuing.Queues
 	}
+	var flowDistinguisher string
 	var hashValue uint64
 	if numQueues > 1 {
 		flowDistinguisher = computeFlowDistinguisher(rd, selectedFlowSchema.Spec.DistinguisherMethod)
 		hashValue = hashFlowID(selectedFlowSchema.Name, flowDistinguisher)
 	}
+	workEstimate := workEstimator(selectedFlowSchema, plState.pl, flowDistinguisher)
 	startWaitingTime = time.Now()
 	klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, numQueues=%d", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName, numQueues)
-	req, idle := plState.queues.StartRequest(ctx, &rd.WorkEstimate, hashValue, flowDistinguisher, selectedFlowSchema.Name, rd.RequestInfo, rd.User, queueNoteFn)
+	req, idle := plState.queues.StartRequest(ctx, &workEstimate, hashValue, flowDistinguisher, selectedFlowSchema.Name, rd.RequestInfo, rd.User, queueNoteFn)
 	if idle {
 		cfgCtlr.maybeReapReadLocked(plName, plState)
 	}
-	return selectedFlowSchema, plState.pl, false, req, startWaitingTime, flowDistinguisher
+	return selectedFlowSchema, plState.pl, false, req, startWaitingTime
 }
 
 // maybeReap will remove the last internal traces of the named
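The controller hunks reorder startRequest so estimation happens as late as possible: classify, derive the flow distinguisher (now a local rather than a return value), only then invoke the estimator, and hand the fresh estimate — not a digest field — to the queue set. A compressed, runnable restatement of that ordering with stub helpers (all names hypothetical):

    package main

    import "fmt"

    func computeDistinguisher(user string) string { return user } // stub
    func enqueue(fs, dist string, seats int) {
    	fmt.Printf("queued fs=%s dist=%s seats=%d\n", fs, dist, seats)
    }

    func startRequest(user string, workEstimator func(fs, pl, dist string) int) {
    	fs, pl := "service-accounts", "workload-low" // 1. classification
    	dist := computeDistinguisher(user)           // 2. flow distinguisher
    	seats := workEstimator(fs, pl, dist)         // 3. estimate, only after classification
    	enqueue(fs, dist, seats)                     // 4. the estimate feeds StartRequest
    }

    func main() {
    	startRequest("alice", func(fs, pl, dist string) int { return 1 })
    }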
@@ -26,6 +26,7 @@ import (
 	"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/eventclock"
 	fqs "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset"
 	"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
+	fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
 	kubeinformers "k8s.io/client-go/informers"
 	"k8s.io/klog/v2"
 	"k8s.io/utils/clock"
@@ -41,8 +42,9 @@ const ConfigConsumerAsFieldManager = "api-priority-and-fairness-config-consumer-
 // Interface defines how the API Priority and Fairness filter interacts with the underlying system.
 type Interface interface {
 	// Handle takes care of queuing and dispatching a request
-	// characterized by the given digest. The given `noteFn` will be
-	// invoked with the results of request classification. If the
+	// characterized by the given digest. The given `workEstimator` will be
+	// invoked with the results of request classification and must return the
+	// work parameters for the request. If the
 	// request is queued then `queueNoteFn` will be called twice,
 	// first with `true` and then with `false`; otherwise
 	// `queueNoteFn` will not be called at all. If Handle decides
@@ -53,7 +55,7 @@ type Interface interface {
 	// ctx is cancelled or times out.
 	Handle(ctx context.Context,
 		requestDigest RequestDigest,
-		noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string),
+		workEstimator func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate,
 		queueNoteFn fq.QueueNoteFn,
 		execFn func(),
 	)
@@ -92,7 +94,8 @@ func New(
 		FlowcontrolClient:      flowcontrolClient,
 		ServerConcurrencyLimit: serverConcurrencyLimit,
 		RequestWaitLimit:       requestWaitLimit,
-		ObsPairGenerator:       metrics.PriorityLevelConcurrencyObserverPairGenerator,
+		ReqsObsPairGenerator:   metrics.PriorityLevelConcurrencyObserverPairGenerator,
+		ExecSeatsObsGenerator:  metrics.PriorityLevelExecutionSeatsObserverGenerator,
 		QueueSetFactory:        fqs.NewQueueSetFactory(clk),
 	})
 }
@@ -131,8 +134,11 @@ type TestableConfig struct {
 	// RequestWaitLimit configured on the server
 	RequestWaitLimit time.Duration
 
-	// ObsPairGenerator for metrics
-	ObsPairGenerator metrics.TimedObserverPairGenerator
+	// ObsPairGenerator for metrics about requests
+	ReqsObsPairGenerator metrics.TimedObserverPairGenerator
+
+	// TimedObserverPairGenerator for metrics about seats occupied by all phases of execution
+	ExecSeatsObsGenerator metrics.TimedObserverGenerator
 
 	// QueueSetFactory for the queuing implementation
 	QueueSetFactory fq.QueueSetFactory
@@ -144,12 +150,11 @@ func NewTestable(config TestableConfig) Interface {
 }
 
 func (cfgCtlr *configController) Handle(ctx context.Context, requestDigest RequestDigest,
-	noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string),
+	workEstimator func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate,
 	queueNoteFn fq.QueueNoteFn,
 	execFn func()) {
-	fs, pl, isExempt, req, startWaitingTime, flowDistinguisher := cfgCtlr.startRequest(ctx, requestDigest, queueNoteFn)
+	fs, pl, isExempt, req, startWaitingTime := cfgCtlr.startRequest(ctx, requestDigest, workEstimator, queueNoteFn)
 	queued := startWaitingTime != time.Time{}
-	noteFn(fs, pl, flowDistinguisher)
 	if req == nil {
 		if queued {
 			metrics.ObserveWaitingDuration(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime))
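For callers of Interface, the contract change is small: the middle argument is no longer a pure notification hook but must return the request's work parameters. The cheapest conforming estimator — the one the filter's default and the fuzz test elsewhere in this diff both use — charges one initial seat. A fragment of that minimal form, assuming the surrounding imports of the callers above:

    // Minimal estimator a caller can pass to Handle (mirrors
    // `return fcrequest.WorkEstimate{InitialSeats: 1}` in the hunks above).
    estimator := func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate {
    	return fcrequest.WorkEstimate{InitialSeats: 1}
    }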
@@ -105,7 +105,7 @@ type ctlrTestRequest struct {
 	descr1, descr2 interface{}
 }
 
-func (cts *ctlrTestState) BeginConstruction(qc fq.QueuingConfig, ip metrics.TimedObserverPair) (fq.QueueSetCompleter, error) {
+func (cts *ctlrTestState) BeginConstruction(qc fq.QueuingConfig, rip metrics.TimedObserverPair, eso metrics.TimedObserver) (fq.QueueSetCompleter, error) {
 	return ctlrTestQueueSetCompleter{cts, nil, qc}, nil
 }
 
@@ -261,7 +261,8 @@ func TestConfigConsumer(t *testing.T) {
 		FlowcontrolClient:      flowcontrolClient,
 		ServerConcurrencyLimit: 100,         // server concurrency limit
 		RequestWaitLimit:       time.Minute, // request wait limit
-		ObsPairGenerator:       metrics.PriorityLevelConcurrencyObserverPairGenerator,
+		ReqsObsPairGenerator:   metrics.PriorityLevelConcurrencyObserverPairGenerator,
+		ExecSeatsObsGenerator:  metrics.PriorityLevelExecutionSeatsObserverGenerator,
 		QueueSetFactory:        cts,
 	})
 	cts.cfgCtlr = ctlr
@@ -392,7 +393,8 @@ func TestAPFControllerWithGracefulShutdown(t *testing.T) {
 		FlowcontrolClient:      flowcontrolClient,
 		ServerConcurrencyLimit: 100,
 		RequestWaitLimit:       time.Minute,
-		ObsPairGenerator:       metrics.PriorityLevelConcurrencyObserverPairGenerator,
+		ReqsObsPairGenerator:   metrics.PriorityLevelConcurrencyObserverPairGenerator,
+		ExecSeatsObsGenerator:  metrics.PriorityLevelExecutionSeatsObserverGenerator,
 		QueueSetFactory:        cts,
 	})
 
@@ -460,7 +462,7 @@ func checkNewFS(cts *ctlrTestState, rng *rand.Rand, trialName string, ftr *fsTes
 			startWG.Add(1)
 			go func(matches, isResource bool, rdu RequestDigest) {
 				expectedMatch := matches && ftr.wellFormed && (fsPrecedes(fs, catchAlls[isResource]) || fs.Name == catchAlls[isResource].Name)
-				ctlr.Handle(ctx, rdu, func(matchFS *flowcontrol.FlowSchema, matchPL *flowcontrol.PriorityLevelConfiguration, _ string) {
+				ctlr.Handle(ctx, rdu, func(matchFS *flowcontrol.FlowSchema, matchPL *flowcontrol.PriorityLevelConfiguration, _ string) fcrequest.WorkEstimate {
 					matchIsExempt := matchPL.Spec.Type == flowcontrol.PriorityLevelEnablementExempt
 					if testDebugLogs {
 						t.Logf("Considering FlowSchema %s, expectedMatch=%v, isResource=%v: Handle(%#+v) => note(fs=%s, pl=%s, isExempt=%v)", fs.Name, expectedMatch, isResource, rdu, matchFS.Name, matchPL.Name, matchIsExempt)
@@ -473,6 +475,7 @@ func checkNewFS(cts *ctlrTestState, rng *rand.Rand, trialName string, ftr *fsTes
 						t.Errorf("Fail at %s/%s: expected=%v, actual=%v", trialName, fs.Name, fs.Spec.PriorityLevelConfiguration.Name, matchPL.Name)
 					}
 				}
+				return fcrequest.WorkEstimate{InitialSeats: 1}
 			}, func(inQueue bool) {
 			}, func() {
 				startWG.Done()
@@ -31,8 +31,11 @@ import (
 // are separated so that errors from the first phase can be found
 // before committing to a concurrency allotment for the second.
 type QueueSetFactory interface {
-	// BeginConstruction does the first phase of creating a QueueSet
-	BeginConstruction(QueuingConfig, metrics.TimedObserverPair) (QueueSetCompleter, error)
+	// BeginConstruction does the first phase of creating a QueueSet.
+	// The TimedObserverPair observes number of requests,
+	// execution covering just the regular phase.
+	// The TimedObserver observes number of seats occupied through all phases of execution.
+	BeginConstruction(QueuingConfig, metrics.TimedObserverPair, metrics.TimedObserver) (QueueSetCompleter, error)
 }
 
 // QueueSetCompleter finishes the two-step process of creating or
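BeginConstruction now takes two observation hooks. Based on the metrics.TimedObserver usage visible in the queueset hunks below (Add, SetX1), the contract appears to be: Add adjusts a tracked variable X, and SetX1 sets the denominator against which X is reported — here the concurrency limit. A hedged sketch of a toy in-memory observer satisfying that shape (this is not the real metrics implementation, just an illustration of the assumed semantics):

    package main

    import "fmt"

    // toyObserver mimics the Add/Set/SetX1 surface the queue set relies on:
    // it tracks a variable X against a scale X1 and reports the ratio X/X1.
    type toyObserver struct{ x, x1 float64 }

    func (o *toyObserver) Add(d float64)   { o.x += d }
    func (o *toyObserver) Set(v float64)   { o.x = v }
    func (o *toyObserver) SetX1(v float64) { o.x1 = v }
    func (o *toyObserver) ratio() float64  { return o.x / o.x1 }

    func main() {
    	seats := &toyObserver{}
    	seats.SetX1(10) // concurrency limit of the priority level
    	seats.Add(3)    // a request occupying 3 seats starts executing
    	fmt.Printf("seat utilization: %.0f%%\n", seats.ratio()*100) // 30%
    	seats.Add(-3)   // the request finishes
    }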
@@ -60,11 +60,12 @@ type promiseFactoryFactory func(*queueSet) promiseFactory
 // `*queueSetCompleter` implements QueueSetCompleter. Exactly one of
 // the fields `factory` and `theSet` is non-nil.
 type queueSetCompleter struct {
 	factory *queueSetFactory
-	obsPair metrics.TimedObserverPair
-	theSet  *queueSet
-	qCfg    fq.QueuingConfig
-	dealer  *shufflesharding.Dealer
+	reqsObsPair  metrics.TimedObserverPair
+	execSeatsObs metrics.TimedObserver
+	theSet       *queueSet
+	qCfg         fq.QueuingConfig
+	dealer       *shufflesharding.Dealer
 }
 
 // queueSet implements the Fair Queuing for Server Requests technique
@@ -79,7 +80,10 @@ type queueSetCompleter struct {
 type queueSet struct {
 	clock                    eventclock.Interface
 	estimatedServiceDuration time.Duration
-	obsPair                  metrics.TimedObserverPair
+
+	reqsObsPair metrics.TimedObserverPair // .RequestsExecuting covers regular phase only
+
+	execSeatsObs metrics.TimedObserver // for all phases of execution
 
 	promiseFactory promiseFactory
 
@@ -144,16 +148,17 @@ func newTestableQueueSetFactory(c eventclock.Interface, promiseFactoryFactory pr
 	}
 }
 
-func (qsf *queueSetFactory) BeginConstruction(qCfg fq.QueuingConfig, obsPair metrics.TimedObserverPair) (fq.QueueSetCompleter, error) {
+func (qsf *queueSetFactory) BeginConstruction(qCfg fq.QueuingConfig, reqsObsPair metrics.TimedObserverPair, execSeatsObs metrics.TimedObserver) (fq.QueueSetCompleter, error) {
 	dealer, err := checkConfig(qCfg)
 	if err != nil {
 		return nil, err
 	}
 	return &queueSetCompleter{
 		factory: qsf,
-		obsPair: obsPair,
-		qCfg:    qCfg,
-		dealer:  dealer}, nil
+		reqsObsPair:  reqsObsPair,
+		execSeatsObs: execSeatsObs,
+		qCfg:         qCfg,
+		dealer:       dealer}, nil
 }
 
 // checkConfig returns a non-nil Dealer if the config is valid and
@@ -176,7 +181,8 @@ func (qsc *queueSetCompleter) Complete(dCfg fq.DispatchingConfig) fq.QueueSet {
 		qs = &queueSet{
 			clock:                    qsc.factory.clock,
 			estimatedServiceDuration: 3 * time.Millisecond,
-			obsPair:                  qsc.obsPair,
+			reqsObsPair:              qsc.reqsObsPair,
+			execSeatsObs:             qsc.execSeatsObs,
 			qCfg:                     qsc.qCfg,
 			currentR:                 0,
 			lastRealTime:             qsc.factory.clock.Now(),
@@ -237,8 +243,9 @@ func (qs *queueSet) setConfiguration(ctx context.Context, qCfg fq.QueuingConfig,
 	if qll < 1 {
 		qll = 1
 	}
-	qs.obsPair.RequestsWaiting.SetX1(float64(qll))
-	qs.obsPair.RequestsExecuting.SetX1(float64(dCfg.ConcurrencyLimit))
+	qs.reqsObsPair.RequestsWaiting.SetX1(float64(qll))
+	qs.reqsObsPair.RequestsExecuting.SetX1(float64(dCfg.ConcurrencyLimit))
+	qs.execSeatsObs.SetX1(float64(dCfg.ConcurrencyLimit))
 
 	qs.dispatchAsMuchAsPossibleLocked()
 }
@@ -391,7 +398,7 @@ func (req *request) wait() (bool, bool) {
 		metrics.AddReject(req.ctx, qs.qCfg.Name, req.fsName, "cancelled")
 		metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1)
 		req.NoteQueued(false)
-		qs.obsPair.RequestsWaiting.Add(-1)
+		qs.reqsObsPair.RequestsWaiting.Add(-1)
 	}
 	return false, qs.isIdleLocked()
 }
@@ -602,7 +609,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName s
 	// remove timed out requests from queue
 	if timeoutCount > 0 {
 		qs.totRequestsWaiting -= timeoutCount
-		qs.obsPair.RequestsWaiting.Add(float64(-timeoutCount))
+		qs.reqsObsPair.RequestsWaiting.Add(float64(-timeoutCount))
 	}
 }
 
@@ -638,7 +645,7 @@ func (qs *queueSet) enqueueLocked(request *request) {
 	qs.totRequestsWaiting++
 	metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, 1)
 	request.NoteQueued(true)
-	qs.obsPair.RequestsWaiting.Add(1)
+	qs.reqsObsPair.RequestsWaiting.Add(1)
 }
 
 // dispatchAsMuchAsPossibleLocked does as many dispatches as possible now.
@@ -667,7 +674,8 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f
 	qs.totSeatsInUse += req.MaxSeats()
 	metrics.AddRequestsExecuting(ctx, qs.qCfg.Name, fsName, 1)
 	metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, fsName, req.MaxSeats())
-	qs.obsPair.RequestsExecuting.Add(1)
+	qs.reqsObsPair.RequestsExecuting.Add(1)
+	qs.execSeatsObs.Add(float64(req.MaxSeats()))
 	if klog.V(5).Enabled() {
 		klog.Infof("QS(%s) at t=%s R=%v: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, fsName, descr1, descr2, qs.totRequestsExecuting)
 	}
@@ -690,7 +698,7 @@ func (qs *queueSet) dispatchLocked() bool {
 	qs.totRequestsWaiting--
 	metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, -1)
 	request.NoteQueued(false)
-	qs.obsPair.RequestsWaiting.Add(-1)
+	qs.reqsObsPair.RequestsWaiting.Add(-1)
 	defer qs.boundNextDispatchLocked(queue)
 	if !request.decision.Set(decisionExecute) {
 		return true
@@ -707,7 +715,8 @@ func (qs *queueSet) dispatchLocked() bool {
 	queue.seatsInUse += request.MaxSeats()
 	metrics.AddRequestsExecuting(request.ctx, qs.qCfg.Name, request.fsName, 1)
 	metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, request.fsName, request.MaxSeats())
-	qs.obsPair.RequestsExecuting.Add(1)
+	qs.reqsObsPair.RequestsExecuting.Add(1)
+	qs.execSeatsObs.Add(float64(request.MaxSeats()))
 	if klog.V(6).Enabled() {
 		klog.Infof("QS(%s) at t=%s R=%v: dispatching request %#+v %#+v work %v from queue %d with start R %v, queue will have %d waiting & %d requests occupying %d seats, set will have %d seats occupied",
 			qs.qCfg.Name, request.startTime.Format(nsTimeFmt), qs.currentR, request.descr1, request.descr2,
@@ -848,7 +857,7 @@ func (qs *queueSet) finishRequestLocked(r *request) {
 	now := qs.clock.Now()
 	qs.totRequestsExecuting--
 	metrics.AddRequestsExecuting(r.ctx, qs.qCfg.Name, r.fsName, -1)
-	qs.obsPair.RequestsExecuting.Add(-1)
+	qs.reqsObsPair.RequestsExecuting.Add(-1)
 
 	actualServiceDuration := now.Sub(r.startTime)
 
@@ -860,6 +869,7 @@ func (qs *queueSet) finishRequestLocked(r *request) {
 
 		qs.totSeatsInUse -= r.MaxSeats()
 		metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, r.fsName, -r.MaxSeats())
+		qs.execSeatsObs.Add(-float64(r.MaxSeats()))
 		if r.queue != nil {
 			r.queue.seatsInUse -= r.MaxSeats()
 		}
@@ -973,8 +983,9 @@ func removeQueueAndUpdateIndexes(queues []*queue, index int) []*queue {
 }
 
 func (qs *queueSet) UpdateObservations() {
-	qs.obsPair.RequestsWaiting.Add(0)
-	qs.obsPair.RequestsExecuting.Add(0)
+	qs.reqsObsPair.RequestsWaiting.Add(0)
+	qs.reqsObsPair.RequestsExecuting.Add(0)
+	qs.execSeatsObs.Add(0)
 }
 
 func (qs *queueSet) Dump(includeRequestDetails bool) debug.QueueSetDump {
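Taken together, the queueset hunks give execSeatsObs a simple lifecycle: SetX1(concurrencyLimit) at (re)configuration, Add(+MaxSeats()) on both dispatch paths, Add(-MaxSeats()) in finishRequestLocked, and Add(0) in UpdateObservations to force a sample. A condensed, runnable restatement of just that accounting (the observer and helpers are stand-ins, not the real types):

    package main

    import "fmt"

    type observer interface{ Add(float64) }

    type printObs struct{ total float64 }

    func (p *printObs) Add(d float64) { p.total += d; fmt.Println("seats in use:", p.total) }

    // dispatch/finish mirror the two places the queue set touches the seat observer.
    func dispatch(obs observer, maxSeats int) { obs.Add(float64(maxSeats)) }
    func finish(obs observer, maxSeats int)   { obs.Add(-float64(maxSeats)) }

    func main() {
    	obs := &printObs{}
    	dispatch(obs, 3) // request holding 3 seats starts executing
    	dispatch(obs, 1)
    	finish(obs, 3) // seats are released when the request finishes
    	finish(obs, 1)
    }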
@@ -445,7 +445,7 @@ func TestNoRestraint(t *testing.T) {
 		t.Run(testCase.name, func(t *testing.T) {
 			now := time.Now()
 			clk, counter := testeventclock.NewFake(now, 0, nil)
-			nrc, err := test.NewNoRestraintFactory().BeginConstruction(fq.QueuingConfig{}, newObserverPair(clk))
+			nrc, err := test.NewNoRestraintFactory().BeginConstruction(fq.QueuingConfig{}, newObserverPair(clk), newExecSeatsObserver(clk))
 			if err != nil {
 				t.Fatal(err)
 			}
@@ -481,7 +481,7 @@ func TestBaseline(t *testing.T) {
 		HandSize:         3,
 		RequestWaitLimit: 10 * time.Minute,
 	}
-	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
+	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -550,7 +550,7 @@ func TestSeparations(t *testing.T) {
 		HandSize:         3,
 		RequestWaitLimit: 10 * time.Minute,
 	}
-	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
+	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -589,7 +589,7 @@ func TestUniformFlowsHandSize1(t *testing.T) {
 		HandSize:         1,
 		RequestWaitLimit: 10 * time.Minute,
 	}
-	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
+	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -626,7 +626,7 @@ func TestUniformFlowsHandSize3(t *testing.T) {
 		HandSize:         3,
 		RequestWaitLimit: 10 * time.Minute,
 	}
-	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
+	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -662,7 +662,7 @@ func TestDifferentFlowsExpectEqual(t *testing.T) {
 		HandSize:         1,
 		RequestWaitLimit: 10 * time.Minute,
 	}
-	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
+	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -702,7 +702,7 @@ func TestSeatSecondsRollover(t *testing.T) {
 		HandSize:         1,
 		RequestWaitLimit: 40 * Quarter,
 	}
-	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
+	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -740,7 +740,7 @@ func TestDifferentFlowsExpectUnequal(t *testing.T) {
 		HandSize:         1,
 		RequestWaitLimit: 10 * time.Minute,
 	}
-	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
+	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -777,7 +777,7 @@ func TestDifferentWidths(t *testing.T) {
 		HandSize:         7,
 		RequestWaitLimit: 10 * time.Minute,
 	}
-	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
+	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -813,7 +813,7 @@ func TestTooWide(t *testing.T) {
 		HandSize:         7,
 		RequestWaitLimit: 10 * time.Minute,
 	}
-	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
+	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -874,7 +874,7 @@ func TestWindup(t *testing.T) {
 		HandSize:         1,
 		RequestWaitLimit: 10 * time.Minute,
 	}
-	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
+	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -909,7 +909,7 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) {
 		Name:             "TestDifferentFlowsWithoutQueuing",
 		DesiredNumQueues: 0,
 	}
-	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
+	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -945,7 +945,7 @@ func TestTimeout(t *testing.T) {
 		HandSize:         1,
 		RequestWaitLimit: 0,
 	}
-	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
+	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -996,7 +996,7 @@ func TestContextCancel(t *testing.T) {
 		HandSize:         1,
 		RequestWaitLimit: 15 * time.Second,
 	}
-	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
+	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -1102,7 +1102,7 @@ func TestTotalRequestsExecutingWithPanic(t *testing.T) {
 		DesiredNumQueues: 0,
 		RequestWaitLimit: 15 * time.Second,
 	}
-	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
+	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk), newExecSeatsObserver(clk))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -1357,7 +1357,8 @@ func TestFinishRequestLocked(t *testing.T) {
 			qs := &queueSet{
 				clock:                    clk,
 				estimatedServiceDuration: time.Second,
-				obsPair:                  newObserverPair(clk),
+				reqsObsPair:              newObserverPair(clk),
+				execSeatsObs:             newExecSeatsObserver(clk),
 			}
 			queue := &queue{
 				requests: newRequestFIFO(),
@@ -1463,3 +1464,7 @@ func newFIFO(requests ...*request) fifo {
 func newObserverPair(clk clock.PassiveClock) metrics.TimedObserverPair {
 	return metrics.PriorityLevelConcurrencyObserverPairGenerator.Generate(1, 1, []string{"test"})
 }
+
+func newExecSeatsObserver(clk clock.PassiveClock) metrics.TimedObserver {
+	return metrics.PriorityLevelExecutionSeatsObserverGenerator.Generate(1, 1, []string{"test"})
+}
@@ -40,7 +40,7 @@ type noRestraint struct{}
 
 type noRestraintRequest struct{}
 
-func (noRestraintFactory) BeginConstruction(fq.QueuingConfig, metrics.TimedObserverPair) (fq.QueueSetCompleter, error) {
+func (noRestraintFactory) BeginConstruction(fq.QueuingConfig, metrics.TimedObserverPair, metrics.TimedObserver) (fq.QueueSetCompleter, error) {
 	return noRestraintCompleter{}, nil
 }
 
@@ -56,7 +56,8 @@ func genPL(rng *rand.Rand, name string) *flowcontrol.PriorityLevelConfiguration
 			HandSize:         hs,
 			QueueLengthLimit: 5}
 	}
-	_, err := queueSetCompleterForPL(noRestraintQSF, nil, plc, time.Minute, metrics.PriorityLevelConcurrencyObserverPairGenerator.Generate(1, 1, []string{"test"}))
+	labelVals := []string{"test"}
+	_, err := queueSetCompleterForPL(noRestraintQSF, nil, plc, time.Minute, metrics.PriorityLevelConcurrencyObserverPairGenerator.Generate(1, 1, labelVals), metrics.PriorityLevelExecutionSeatsObserverGenerator.Generate(1, 1, labelVals))
 	if err != nil {
 		panic(err)
 	}
@@ -27,7 +27,6 @@ import (
 	"k8s.io/apiserver/pkg/authentication/user"
 	"k8s.io/apiserver/pkg/endpoints/request"
 	fcfmt "k8s.io/apiserver/pkg/util/flowcontrol/format"
-	fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
 )
 
 func TestMatching(t *testing.T) {
@@ -102,8 +101,7 @@ func TestLiterals(t *testing.T) {
 			Name:  "eman",
 			Parts: []string{"goodrscs", "eman"},
 		},
 		User: ui,
-		WorkEstimate: fcrequest.WorkEstimate{InitialSeats: 1},
 	}
 	reqRU := RequestDigest{
 		RequestInfo: &request.RequestInfo{
@@ -118,8 +116,7 @@ func TestLiterals(t *testing.T) {
 			Name:  "eman",
 			Parts: []string{"goodrscs", "eman"},
 		},
 		User: ui,
-		WorkEstimate: fcrequest.WorkEstimate{InitialSeats: 1},
 	}
 	reqN := RequestDigest{
 		RequestInfo: &request.RequestInfo{
@@ -127,8 +124,7 @@ func TestLiterals(t *testing.T) {
 			Path: "/openapi/v2",
 			Verb: "goodverb",
 		},
 		User: ui,
-		WorkEstimate: fcrequest.WorkEstimate{InitialSeats: 1},
 	}
 	checkRules(t, true, reqRN, []flowcontrol.PolicyRulesWithSubjects{{
 		Subjects: []flowcontrol.Subject{{Kind: flowcontrol.SubjectKindUser,
@ -104,6 +104,28 @@ var (
 		},
 		[]string{priorityLevel, flowSchema},
 	)
+	// PriorityLevelExecutionSeatsObserverGenerator creates observers of seats occupied throughout execution for priority levels
+	PriorityLevelExecutionSeatsObserverGenerator = NewSampleAndWaterMarkHistogramsGenerator(clock.RealClock{}, time.Millisecond,
+		&compbasemetrics.HistogramOpts{
+			Namespace:      namespace,
+			Subsystem:      subsystem,
+			Name:           "priority_level_seat_count_samples",
+			Help:           "Periodic observations of the number of requests",
+			Buckets:        []float64{0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1},
+			ConstLabels:    map[string]string{phase: "executing"},
+			StabilityLevel: compbasemetrics.ALPHA,
+		},
+		&compbasemetrics.HistogramOpts{
+			Namespace:      namespace,
+			Subsystem:      subsystem,
+			Name:           "priority_level_seat_count_watermarks",
+			Help:           "Watermarks of the number of requests",
+			Buckets:        []float64{0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1},
+			ConstLabels:    map[string]string{phase: "executing"},
+			StabilityLevel: compbasemetrics.ALPHA,
+		},
+		[]string{priorityLevel},
+	)
 	// PriorityLevelConcurrencyObserverPairGenerator creates pairs that observe concurrency for priority levels
 	PriorityLevelConcurrencyObserverPairGenerator = NewSampleAndWaterMarkHistogramsPairGenerator(clock.RealClock{}, time.Millisecond,
 		&compbasemetrics.HistogramOpts{
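The new generator hands out one TimedObserver per priority level; the generator periodically samples the observed value into priority_level_seat_count_samples and records extrema into priority_level_seat_count_watermarks. A sketch of driving one, assuming the Observe/Add/SetX1 methods of TimedObserver used elsewhere in this package (the label value and seat counts are placeholders):

labelVals := []string{"workload-low"} // placeholder priority-level name
execSeatsObs := PriorityLevelExecutionSeatsObserverGenerator.Generate(1, 1, labelVals)

// Assumption: SetX1 sets the denominator so samples land in the 0-1 buckets
// as a fraction of the concurrency limit.
execSeatsObs.SetX1(200) // concurrency limit of this priority level
execSeatsObs.Add(3)     // a request starts executing, occupying 3 seats
execSeatsObs.Add(-3)    // ...and releases them when it finishes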
@ -267,6 +289,17 @@ var (
 		},
 		[]string{priorityLevel, flowSchema},
 	)
+	watchCountSamples = compbasemetrics.NewHistogramVec(
+		&compbasemetrics.HistogramOpts{
+			Namespace:      namespace,
+			Subsystem:      subsystem,
+			Name:           "watch_count_samples",
+			Help:           "count of watchers for mutating requests in API Priority and Fairness",
+			Buckets:        []float64{0, 1, 10, 100, 1000, 10000},
+			StabilityLevel: compbasemetrics.ALPHA,
+		},
+		[]string{priorityLevel, flowSchema},
+	)
 	apiserverEpochAdvances = compbasemetrics.NewCounterVec(
 		&compbasemetrics.CounterOpts{
 			Namespace: namespace,
@ -293,8 +326,10 @@ var (
 		apiserverCurrentExecutingRequests,
 		apiserverRequestWaitingSeconds,
 		apiserverRequestExecutionSeconds,
+		watchCountSamples,
 		apiserverEpochAdvances,
 	}.
+		Append(PriorityLevelExecutionSeatsObserverGenerator.metrics()...).
 		Append(PriorityLevelConcurrencyObserverPairGenerator.metrics()...).
 		Append(ReadWriteConcurrencyObserverPairGenerator.metrics()...)
 )
@ -360,6 +395,12 @@ func ObserveExecutionDuration(ctx context.Context, priorityLevel, flowSchema str
 	apiserverRequestExecutionSeconds.WithContext(ctx).WithLabelValues(priorityLevel, flowSchema).Observe(executionTime.Seconds())
 }
 
+// ObserveWatchCount notes a sampling of a watch count
+func ObserveWatchCount(ctx context.Context, priorityLevel, flowSchema string, count int) {
+	watchCountSamples.WithLabelValues(priorityLevel, flowSchema).Observe(float64(count))
+}
+
+// AddEpochAdvance notes an advance of the progress meter baseline for a given priority level
 func AddEpochAdvance(ctx context.Context, priorityLevel string, success bool) {
 	apiserverEpochAdvances.WithContext(ctx).WithLabelValues(priorityLevel, strconv.FormatBool(success)).Inc()
 }
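ObserveWatchCount is deliberately thin: one call records one sample into watch_count_samples under the request's priority level and flow schema. A usage sketch (the label values are placeholders):

import (
	"context"

	"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
)

func noteRelevantWatchers(ctx context.Context, watchCount int) {
	// "workload-low" and "service-accounts" are placeholder label values.
	metrics.ObserveWatchCount(ctx, "workload-low", "service-accounts", watchCount)
}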
@ -79,7 +79,7 @@ type sampleAndWaterMarkObserverGenerator struct {
 	waterMarks *compbasemetrics.HistogramVec
 }
 
-var _ TimedObserverGenerator = (*sampleAndWaterMarkObserverGenerator)(nil)
+var _ TimedObserverGenerator = SampleAndWaterMarkObserverGenerator{}
 
 // NewSampleAndWaterMarkHistogramsGenerator makes a new one
 func NewSampleAndWaterMarkHistogramsGenerator(clock clock.PassiveClock, samplePeriod time.Duration, sampleOpts, waterMarkOpts *compbasemetrics.HistogramOpts, labelNames []string) SampleAndWaterMarkObserverGenerator {
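The rewritten assertion checks the exported value type rather than a pointer to the unexported struct, matching what NewSampleAndWaterMarkHistogramsGenerator actually returns. The pattern in isolation, with toy types:

type Greeter interface{ Greet() string }

type englishGreeter struct{}

func (englishGreeter) Greet() string { return "hello" }

// Compile-time check: the build fails if englishGreeter ever stops
// satisfying Greeter. It costs nothing at runtime.
var _ Greeter = englishGreeter{}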
@ -40,7 +40,7 @@ type listWorkEstimator struct {
 	countGetterFn objectCountGetterFunc
 }
 
-func (e *listWorkEstimator) estimate(r *http.Request) WorkEstimate {
+func (e *listWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLevelName string) WorkEstimate {
 	requestInfo, ok := apirequest.RequestInfoFrom(r.Context())
 	if !ok {
 		// no RequestInfo should never happen, but to be on the safe side
@ -22,6 +22,7 @@ import (
 	"time"
 
 	apirequest "k8s.io/apiserver/pkg/endpoints/request"
+	"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
 )
 
 const (
@ -49,7 +50,9 @@ type mutatingWorkEstimator struct {
 	enabled bool
 }
 
-func (e *mutatingWorkEstimator) estimate(r *http.Request) WorkEstimate {
+func (e *mutatingWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLevelName string) WorkEstimate {
+	// TODO(wojtekt): Remove once we tune the algorithm to not fail
+	// scalability tests.
 	if !e.enabled {
 		return WorkEstimate{
 			InitialSeats: 1,
@ -67,6 +70,7 @@ func (e *mutatingWorkEstimator) estimate(r *http.Request) WorkEstimate {
 		}
 	}
 	watchCount := e.countFn(requestInfo)
+	metrics.ObserveWatchCount(r.Context(), priorityLevelName, flowSchemaName, watchCount)
 
 	// The cost of the request associated with the watchers of that event
 	// consists of three parts:
@ -83,10 +83,10 @@ func NewWorkEstimator(objectCountFn objectCountGetterFunc, watchCountFn watchCou
 // WorkEstimatorFunc returns the estimated work of a given request.
 // This function will be used by the Priority & Fairness filter to
 // estimate the work of incoming requests.
-type WorkEstimatorFunc func(*http.Request) WorkEstimate
+type WorkEstimatorFunc func(request *http.Request, flowSchemaName, priorityLevelName string) WorkEstimate
 
-func (e WorkEstimatorFunc) EstimateWork(r *http.Request) WorkEstimate {
-	return e(r)
+func (e WorkEstimatorFunc) EstimateWork(r *http.Request, flowSchemaName, priorityLevelName string) WorkEstimate {
+	return e(r, flowSchemaName, priorityLevelName)
 }
 
 type workEstimator struct {
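Any function with the new three-argument shape is now a WorkEstimatorFunc, and EstimateWork forwards the flow-schema and priority-level names unchanged. A sketch of a custom estimator (the flat one-seat costing is invented for illustration):

import (
	"net/http"

	fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
)

// flatEstimator charges every request a single seat; a real estimator could
// use flowSchemaName and priorityLevelName for per-level metrics, as the
// mutating estimator above does.
var flatEstimator fcrequest.WorkEstimatorFunc = func(r *http.Request, flowSchemaName, priorityLevelName string) fcrequest.WorkEstimate {
	return fcrequest.WorkEstimate{InitialSeats: 1}
}

A caller would then invoke flatEstimator.EstimateWork(req, "my-fs", "my-pl"), with the two names being whatever the filter classified the request under.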
@ -96,7 +96,7 @@ type workEstimator struct {
 	mutatingWorkEstimator WorkEstimatorFunc
 }
 
-func (e *workEstimator) estimate(r *http.Request) WorkEstimate {
+func (e *workEstimator) estimate(r *http.Request, flowSchemaName, priorityLevelName string) WorkEstimate {
 	requestInfo, ok := apirequest.RequestInfoFrom(r.Context())
 	if !ok {
 		klog.ErrorS(fmt.Errorf("no RequestInfo found in context"), "Failed to estimate work for the request", "URI", r.RequestURI)
@ -106,9 +106,9 @@ func (e *workEstimator) estimate(r *http.Request) WorkEstimate {
 
 	switch requestInfo.Verb {
 	case "list":
-		return e.listWorkEstimator.EstimateWork(r)
+		return e.listWorkEstimator.EstimateWork(r, flowSchemaName, priorityLevelName)
 	case "create", "update", "patch", "delete":
-		return e.mutatingWorkEstimator.EstimateWork(r)
+		return e.mutatingWorkEstimator.EstimateWork(r, flowSchemaName, priorityLevelName)
 	}
 
 	return WorkEstimate{InitialSeats: minimumSeats}
@ -410,7 +410,7 @@ func TestWorkEstimator(t *testing.T) {
 				req = req.WithContext(apirequest.WithRequestInfo(req.Context(), test.requestInfo))
 			}
 
-			workestimateGot := estimator.EstimateWork(req)
+			workestimateGot := estimator.EstimateWork(req, "testFS", "testPL")
 			if test.initialSeatsExpected != workestimateGot.InitialSeats {
 				t.Errorf("Expected work estimate to match: %d initial seats, but got: %d", test.initialSeatsExpected, workestimateGot.InitialSeats)
 			}
@ -139,7 +139,8 @@ func (ft *fightTest) createController(invert bool, i int) {
 		FlowcontrolClient:      fcIfc,
 		ServerConcurrencyLimit: 200,             // server concurrency limit
 		RequestWaitLimit:       time.Minute / 4, // request wait limit
-		ObsPairGenerator:       metrics.PriorityLevelConcurrencyObserverPairGenerator,
+		ReqsObsPairGenerator:   metrics.PriorityLevelConcurrencyObserverPairGenerator,
+		ExecSeatsObsGenerator:  metrics.PriorityLevelExecutionSeatsObserverGenerator,
 		QueueSetFactory:        fqtesting.NewNoRestraintFactory(),
 	})
 	ft.ctlrs[invert][i] = ctlr
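The fight-test config now wires both generators: the renamed ReqsObsPairGenerator for request counts and the new ExecSeatsObsGenerator for execution seats. A sketch of the full construction, assuming the TestableConfig type and NewTestable constructor this test appears to call (neither name is shown in this hunk):

ctlr := utilflowcontrol.NewTestable(utilflowcontrol.TestableConfig{ // assumed constructor/type names
	FlowcontrolClient:      fcIfc,
	ServerConcurrencyLimit: 200,
	RequestWaitLimit:       time.Minute / 4,
	ReqsObsPairGenerator:   metrics.PriorityLevelConcurrencyObserverPairGenerator,
	ExecSeatsObsGenerator:  metrics.PriorityLevelExecutionSeatsObserverGenerator,
	QueueSetFactory:        fqtesting.NewNoRestraintFactory(),
})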