diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go index ad043d7f427..1f341935d3e 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go @@ -27,6 +27,7 @@ import ( apitypes "k8s.io/apimachinery/pkg/types" epmetrics "k8s.io/apiserver/pkg/endpoints/metrics" apirequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/server/httplog" utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" fcmetrics "k8s.io/apiserver/pkg/util/flowcontrol/metrics" flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request" @@ -55,6 +56,15 @@ var atomicMutatingWaiting, atomicReadOnlyWaiting int32 // newInitializationSignal is defined for testing purposes. var newInitializationSignal = utilflowcontrol.NewInitializationSignal +func truncateLogField(s string) string { + const maxFieldLogLength = 64 + + if len(s) > maxFieldLogLength { + s = s[0:maxFieldLogLength] + } + return s +} + // WithPriorityAndFairness limits the number of in-flight // requests in a fine-grained way. func WithPriorityAndFairness( @@ -90,12 +100,16 @@ func WithPriorityAndFairness( } var classification *PriorityAndFairnessClassification - note := func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration) { + note := func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) { classification = &PriorityAndFairnessClassification{ FlowSchemaName: fs.Name, FlowSchemaUID: fs.UID, PriorityLevelName: pl.Name, PriorityLevelUID: pl.UID} + + httplog.AddKeyValue(ctx, "apf_pl", truncateLogField(pl.Name)) + httplog.AddKeyValue(ctx, "apf_fs", truncateLogField(fs.Name)) + httplog.AddKeyValue(ctx, "apf_d", truncateLogField(flowDistinguisher)) } var served bool diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go index 91b2c9dde97..c84345c0f97 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go @@ -87,14 +87,14 @@ func (t fakeApfFilter) MaintainObservations(stopCh <-chan struct{}) { func (t fakeApfFilter) Handle(ctx context.Context, requestDigest utilflowcontrol.RequestDigest, - noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration), + noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string), queueNoteFn fq.QueueNoteFn, execFn func(), ) { if t.mockDecision == decisionSkipFilter { panic("Handle should not be invoked") } - noteFn(bootstrap.SuggestedFlowSchemaGlobalDefault, bootstrap.SuggestedPriorityLevelConfigurationGlobalDefault) + noteFn(bootstrap.SuggestedFlowSchemaGlobalDefault, bootstrap.SuggestedPriorityLevelConfigurationGlobalDefault, requestDigest.User.GetName()) switch t.mockDecision { case decisionNoQueuingExecute: execFn() @@ -390,7 +390,7 @@ func newFakeWatchApfFilter(capacity int) *fakeWatchApfFilter { func (f *fakeWatchApfFilter) Handle(ctx context.Context, requestDigest utilflowcontrol.RequestDigest, - _ func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration), + _ func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string), _ fq.QueueNoteFn, execFn func(), ) { @@ -638,7 +638,7 @@ type fakeFilterRequestDigest struct { func (f *fakeFilterRequestDigest) Handle(ctx context.Context, requestDigest utilflowcontrol.RequestDigest, - _ func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration), + _ func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string), _ fq.QueueNoteFn, _ func(), ) { f.requestDigestGot = &requestDigest diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go index 440259fbf63..bde6e79178a 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go @@ -775,7 +775,7 @@ func (immediateRequest) Finish(execute func()) bool { // The returned bool indicates whether the request is exempt from // limitation. The startWaitingTime is when the request started // waiting in its queue, or `Time{}` if this did not happen. -func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDigest, queueNoteFn fq.QueueNoteFn) (fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, isExempt bool, req fq.Request, startWaitingTime time.Time) { +func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDigest, queueNoteFn fq.QueueNoteFn) (fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, isExempt bool, req fq.Request, startWaitingTime time.Time, flowDistinguisher string) { klog.V(7).Infof("startRequest(%#+v)", rd) cfgCtlr.lock.Lock() defer cfgCtlr.lock.Unlock() @@ -807,13 +807,12 @@ func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDig plState := cfgCtlr.priorityLevelStates[plName] if plState.pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt { klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, immediate", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName) - return selectedFlowSchema, plState.pl, true, immediateRequest{}, time.Time{} + return selectedFlowSchema, plState.pl, true, immediateRequest{}, time.Time{}, "" } var numQueues int32 if plState.pl.Spec.Limited.LimitResponse.Type == flowcontrol.LimitResponseTypeQueue { numQueues = plState.pl.Spec.Limited.LimitResponse.Queuing.Queues } - var flowDistinguisher string var hashValue uint64 if numQueues > 1 { flowDistinguisher = computeFlowDistinguisher(rd, selectedFlowSchema.Spec.DistinguisherMethod) @@ -825,7 +824,7 @@ func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDig if idle { cfgCtlr.maybeReapLocked(plName, plState) } - return selectedFlowSchema, plState.pl, false, req, startWaitingTime + return selectedFlowSchema, plState.pl, false, req, startWaitingTime, flowDistinguisher } // maybeReap will remove the last internal traces of the named diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go index 2268f6c94ca..32bcaf3e277 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go @@ -53,7 +53,7 @@ type Interface interface { // ctx is cancelled or times out. Handle(ctx context.Context, requestDigest RequestDigest, - noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration), + noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string), queueNoteFn fq.QueueNoteFn, execFn func(), ) @@ -144,12 +144,12 @@ func NewTestable(config TestableConfig) Interface { } func (cfgCtlr *configController) Handle(ctx context.Context, requestDigest RequestDigest, - noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration), + noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string), queueNoteFn fq.QueueNoteFn, execFn func()) { - fs, pl, isExempt, req, startWaitingTime := cfgCtlr.startRequest(ctx, requestDigest, queueNoteFn) + fs, pl, isExempt, req, startWaitingTime, flowDistinguisher := cfgCtlr.startRequest(ctx, requestDigest, queueNoteFn) queued := startWaitingTime != time.Time{} - noteFn(fs, pl) + noteFn(fs, pl, flowDistinguisher) if req == nil { if queued { metrics.ObserveWaitingDuration(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime)) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go index 9766bde7755..d36eca7c4d9 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go @@ -460,7 +460,7 @@ func checkNewFS(cts *ctlrTestState, rng *rand.Rand, trialName string, ftr *fsTes startWG.Add(1) go func(matches, isResource bool, rdu RequestDigest) { expectedMatch := matches && ftr.wellFormed && (fsPrecedes(fs, catchAlls[isResource]) || fs.Name == catchAlls[isResource].Name) - ctlr.Handle(ctx, rdu, func(matchFS *flowcontrol.FlowSchema, matchPL *flowcontrol.PriorityLevelConfiguration) { + ctlr.Handle(ctx, rdu, func(matchFS *flowcontrol.FlowSchema, matchPL *flowcontrol.PriorityLevelConfiguration, _ string) { matchIsExempt := matchPL.Spec.Type == flowcontrol.PriorityLevelEnablementExempt if testDebugLogs { t.Logf("Considering FlowSchema %s, expectedMatch=%v, isResource=%v: Handle(%#+v) => note(fs=%s, pl=%s, isExempt=%v)", fs.Name, expectedMatch, isResource, rdu, matchFS.Name, matchPL.Name, matchIsExempt)