apf: ensure exempt request notes the classification

This commit is contained in:
Abu Kashem 2021-12-05 11:29:15 -05:00
parent 0153febd9f
commit 8b2dd74c27
No known key found for this signature in database
GPG Key ID: 33A4FA7088DB68A9
5 changed files with 49 additions and 19 deletions

View File

@ -101,7 +101,7 @@ func WithPriorityAndFairness(
} }
var classification *PriorityAndFairnessClassification var classification *PriorityAndFairnessClassification
estimateWork := func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) flowcontrolrequest.WorkEstimate { noteFn := func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) {
classification = &PriorityAndFairnessClassification{ classification = &PriorityAndFairnessClassification{
FlowSchemaName: fs.Name, FlowSchemaName: fs.Name,
FlowSchemaUID: fs.UID, FlowSchemaUID: fs.UID,
@ -111,7 +111,19 @@ func WithPriorityAndFairness(
httplog.AddKeyValue(ctx, "apf_pl", truncateLogField(pl.Name)) httplog.AddKeyValue(ctx, "apf_pl", truncateLogField(pl.Name))
httplog.AddKeyValue(ctx, "apf_fs", truncateLogField(fs.Name)) httplog.AddKeyValue(ctx, "apf_fs", truncateLogField(fs.Name))
httplog.AddKeyValue(ctx, "apf_fd", truncateLogField(flowDistinguisher)) httplog.AddKeyValue(ctx, "apf_fd", truncateLogField(flowDistinguisher))
return workEstimator(r, fs.Name, pl.Name) }
// estimateWork is called, if at all, after noteFn
estimateWork := func() flowcontrolrequest.WorkEstimate {
if classification == nil {
// workEstimator is being invoked before classification of
// the request has completed, we should never be here though.
klog.ErrorS(fmt.Errorf("workEstimator is being invoked before classification of the request has completed"),
"Using empty FlowSchema and PriorityLevelConfiguration name", "verb", r.Method, "URI", r.RequestURI)
return workEstimator(r, "", "")
}
return workEstimator(r, classification.FlowSchemaName, classification.PriorityLevelName)
} }
var served bool var served bool
@ -235,7 +247,7 @@ func WithPriorityAndFairness(
// Note that Handle will return irrespective of whether the request // Note that Handle will return irrespective of whether the request
// executes or is rejected. In the latter case, the function will return // executes or is rejected. In the latter case, the function will return
// without calling the passed `execute` function. // without calling the passed `execute` function.
fcIfc.Handle(handleCtx, digest, estimateWork, queueNote, execute) fcIfc.Handle(handleCtx, digest, noteFn, estimateWork, queueNote, execute)
}() }()
select { select {
@ -266,7 +278,7 @@ func WithPriorityAndFairness(
handler.ServeHTTP(w, r) handler.ServeHTTP(w, r)
} }
fcIfc.Handle(ctx, digest, estimateWork, queueNote, execute) fcIfc.Handle(ctx, digest, noteFn, estimateWork, queueNote, execute)
} }
if !served { if !served {

View File

@ -87,14 +87,15 @@ func (t fakeApfFilter) MaintainObservations(stopCh <-chan struct{}) {
func (t fakeApfFilter) Handle(ctx context.Context, func (t fakeApfFilter) Handle(ctx context.Context,
requestDigest utilflowcontrol.RequestDigest, requestDigest utilflowcontrol.RequestDigest,
workEstimator func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate, noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string),
workEstimator func() fcrequest.WorkEstimate,
queueNoteFn fq.QueueNoteFn, queueNoteFn fq.QueueNoteFn,
execFn func(), execFn func(),
) { ) {
if t.mockDecision == decisionSkipFilter { if t.mockDecision == decisionSkipFilter {
panic("Handle should not be invoked") panic("Handle should not be invoked")
} }
workEstimator(bootstrap.SuggestedFlowSchemaGlobalDefault, bootstrap.SuggestedPriorityLevelConfigurationGlobalDefault, requestDigest.User.GetName()) noteFn(bootstrap.SuggestedFlowSchemaGlobalDefault, bootstrap.SuggestedPriorityLevelConfigurationGlobalDefault, requestDigest.User.GetName())
switch t.mockDecision { switch t.mockDecision {
case decisionNoQueuingExecute: case decisionNoQueuingExecute:
execFn() execFn()
@ -390,7 +391,8 @@ func newFakeWatchApfFilter(capacity int) *fakeWatchApfFilter {
func (f *fakeWatchApfFilter) Handle(ctx context.Context, func (f *fakeWatchApfFilter) Handle(ctx context.Context,
requestDigest utilflowcontrol.RequestDigest, requestDigest utilflowcontrol.RequestDigest,
_ func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate, _ func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string),
_ func() fcrequest.WorkEstimate,
_ fq.QueueNoteFn, _ fq.QueueNoteFn,
execFn func(), execFn func(),
) { ) {
@ -640,11 +642,13 @@ type fakeFilterRequestDigest struct {
func (f *fakeFilterRequestDigest) Handle(ctx context.Context, func (f *fakeFilterRequestDigest) Handle(ctx context.Context,
requestDigest utilflowcontrol.RequestDigest, requestDigest utilflowcontrol.RequestDigest,
workEstimator func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate, noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string),
workEstimator func() fcrequest.WorkEstimate,
_ fq.QueueNoteFn, _ func(), _ fq.QueueNoteFn, _ func(),
) { ) {
f.requestDigestGot = &requestDigest f.requestDigestGot = &requestDigest
f.workEstimateGot = workEstimator(bootstrap.MandatoryFlowSchemaCatchAll, bootstrap.MandatoryPriorityLevelConfigurationCatchAll, "") noteFn(bootstrap.MandatoryFlowSchemaCatchAll, bootstrap.MandatoryPriorityLevelConfigurationCatchAll, "")
f.workEstimateGot = workEstimator()
} }
func TestApfWithRequestDigest(t *testing.T) { func TestApfWithRequestDigest(t *testing.T) {

View File

@ -799,7 +799,10 @@ func (immediateRequest) Finish(execute func()) bool {
// The returned bool indicates whether the request is exempt from // The returned bool indicates whether the request is exempt from
// limitation. The startWaitingTime is when the request started // limitation. The startWaitingTime is when the request started
// waiting in its queue, or `Time{}` if this did not happen. // waiting in its queue, or `Time{}` if this did not happen.
func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDigest, workEstimator func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate, queueNoteFn fq.QueueNoteFn) (fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, isExempt bool, req fq.Request, startWaitingTime time.Time) { func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDigest,
noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string),
workEstimator func() fcrequest.WorkEstimate,
queueNoteFn fq.QueueNoteFn) (fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, isExempt bool, req fq.Request, startWaitingTime time.Time) {
klog.V(7).Infof("startRequest(%#+v)", rd) klog.V(7).Infof("startRequest(%#+v)", rd)
cfgCtlr.lock.RLock() cfgCtlr.lock.RLock()
defer cfgCtlr.lock.RUnlock() defer cfgCtlr.lock.RUnlock()
@ -830,6 +833,7 @@ func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDig
plName := selectedFlowSchema.Spec.PriorityLevelConfiguration.Name plName := selectedFlowSchema.Spec.PriorityLevelConfiguration.Name
plState := cfgCtlr.priorityLevelStates[plName] plState := cfgCtlr.priorityLevelStates[plName]
if plState.pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt { if plState.pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt {
noteFn(selectedFlowSchema, plState.pl, "")
klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, immediate", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName) klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, immediate", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName)
return selectedFlowSchema, plState.pl, true, immediateRequest{}, time.Time{} return selectedFlowSchema, plState.pl, true, immediateRequest{}, time.Time{}
} }
@ -843,7 +847,10 @@ func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDig
flowDistinguisher = computeFlowDistinguisher(rd, selectedFlowSchema.Spec.DistinguisherMethod) flowDistinguisher = computeFlowDistinguisher(rd, selectedFlowSchema.Spec.DistinguisherMethod)
hashValue = hashFlowID(selectedFlowSchema.Name, flowDistinguisher) hashValue = hashFlowID(selectedFlowSchema.Name, flowDistinguisher)
} }
workEstimate := workEstimator(selectedFlowSchema, plState.pl, flowDistinguisher)
noteFn(selectedFlowSchema, plState.pl, flowDistinguisher)
workEstimate := workEstimator()
startWaitingTime = time.Now() startWaitingTime = time.Now()
klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, numQueues=%d", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName, numQueues) klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, numQueues=%d", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName, numQueues)
req, idle := plState.queues.StartRequest(ctx, &workEstimate, hashValue, flowDistinguisher, selectedFlowSchema.Name, rd.RequestInfo, rd.User, queueNoteFn) req, idle := plState.queues.StartRequest(ctx, &workEstimate, hashValue, flowDistinguisher, selectedFlowSchema.Name, rd.RequestInfo, rd.User, queueNoteFn)

View File

@ -42,10 +42,14 @@ const ConfigConsumerAsFieldManager = "api-priority-and-fairness-config-consumer-
// Interface defines how the API Priority and Fairness filter interacts with the underlying system. // Interface defines how the API Priority and Fairness filter interacts with the underlying system.
type Interface interface { type Interface interface {
// Handle takes care of queuing and dispatching a request // Handle takes care of queuing and dispatching a request
// characterized by the given digest. The given `workEstimator` will be // characterized by the given digest. The given `noteFn` will be
// invoked with the results of request classification and must return the // invoked with the results of request classification.
// work parameters for the request. If the // The given `workEstimator` is called, if at all, after noteFn.
// request is queued then `queueNoteFn` will be called twice, // `workEstimator` will be invoked only when the request
// is classified as non 'exempt'.
// 'workEstimator', when invoked, must return the
// work parameters for the request.
// If the request is queued then `queueNoteFn` will be called twice,
// first with `true` and then with `false`; otherwise // first with `true` and then with `false`; otherwise
// `queueNoteFn` will not be called at all. If Handle decides // `queueNoteFn` will not be called at all. If Handle decides
// that the request should be executed then `execute()` will be // that the request should be executed then `execute()` will be
@ -55,7 +59,8 @@ type Interface interface {
// ctx is cancelled or times out. // ctx is cancelled or times out.
Handle(ctx context.Context, Handle(ctx context.Context,
requestDigest RequestDigest, requestDigest RequestDigest,
workEstimator func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate, noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string),
workEstimator func() fcrequest.WorkEstimate,
queueNoteFn fq.QueueNoteFn, queueNoteFn fq.QueueNoteFn,
execFn func(), execFn func(),
) )
@ -150,10 +155,11 @@ func NewTestable(config TestableConfig) Interface {
} }
func (cfgCtlr *configController) Handle(ctx context.Context, requestDigest RequestDigest, func (cfgCtlr *configController) Handle(ctx context.Context, requestDigest RequestDigest,
workEstimator func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate, noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string),
workEstimator func() fcrequest.WorkEstimate,
queueNoteFn fq.QueueNoteFn, queueNoteFn fq.QueueNoteFn,
execFn func()) { execFn func()) {
fs, pl, isExempt, req, startWaitingTime := cfgCtlr.startRequest(ctx, requestDigest, workEstimator, queueNoteFn) fs, pl, isExempt, req, startWaitingTime := cfgCtlr.startRequest(ctx, requestDigest, noteFn, workEstimator, queueNoteFn)
queued := startWaitingTime != time.Time{} queued := startWaitingTime != time.Time{}
if req == nil { if req == nil {
if queued { if queued {

View File

@ -462,7 +462,7 @@ func checkNewFS(cts *ctlrTestState, rng *rand.Rand, trialName string, ftr *fsTes
startWG.Add(1) startWG.Add(1)
go func(matches, isResource bool, rdu RequestDigest) { go func(matches, isResource bool, rdu RequestDigest) {
expectedMatch := matches && ftr.wellFormed && (fsPrecedes(fs, catchAlls[isResource]) || fs.Name == catchAlls[isResource].Name) expectedMatch := matches && ftr.wellFormed && (fsPrecedes(fs, catchAlls[isResource]) || fs.Name == catchAlls[isResource].Name)
ctlr.Handle(ctx, rdu, func(matchFS *flowcontrol.FlowSchema, matchPL *flowcontrol.PriorityLevelConfiguration, _ string) fcrequest.WorkEstimate { ctlr.Handle(ctx, rdu, func(matchFS *flowcontrol.FlowSchema, matchPL *flowcontrol.PriorityLevelConfiguration, _ string) {
matchIsExempt := matchPL.Spec.Type == flowcontrol.PriorityLevelEnablementExempt matchIsExempt := matchPL.Spec.Type == flowcontrol.PriorityLevelEnablementExempt
if testDebugLogs { if testDebugLogs {
t.Logf("Considering FlowSchema %s, expectedMatch=%v, isResource=%v: Handle(%#+v) => note(fs=%s, pl=%s, isExempt=%v)", fs.Name, expectedMatch, isResource, rdu, matchFS.Name, matchPL.Name, matchIsExempt) t.Logf("Considering FlowSchema %s, expectedMatch=%v, isResource=%v: Handle(%#+v) => note(fs=%s, pl=%s, isExempt=%v)", fs.Name, expectedMatch, isResource, rdu, matchFS.Name, matchPL.Name, matchIsExempt)
@ -475,6 +475,7 @@ func checkNewFS(cts *ctlrTestState, rng *rand.Rand, trialName string, ftr *fsTes
t.Errorf("Fail at %s/%s: expected=%v, actual=%v", trialName, fs.Name, fs.Spec.PriorityLevelConfiguration.Name, matchPL.Name) t.Errorf("Fail at %s/%s: expected=%v, actual=%v", trialName, fs.Name, fs.Spec.PriorityLevelConfiguration.Name, matchPL.Name)
} }
} }
}, func() fcrequest.WorkEstimate {
return fcrequest.WorkEstimate{InitialSeats: 1} return fcrequest.WorkEstimate{InitialSeats: 1}
}, func(inQueue bool) { }, func(inQueue bool) {
}, func() { }, func() {