mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 12:43:23 +00:00
Merge pull request #104359 from mborsz/apflog
Add APF's priorityLevel to httplog.go
This commit is contained in:
commit
ef754331c4
@ -27,6 +27,7 @@ import (
|
|||||||
apitypes "k8s.io/apimachinery/pkg/types"
|
apitypes "k8s.io/apimachinery/pkg/types"
|
||||||
epmetrics "k8s.io/apiserver/pkg/endpoints/metrics"
|
epmetrics "k8s.io/apiserver/pkg/endpoints/metrics"
|
||||||
apirequest "k8s.io/apiserver/pkg/endpoints/request"
|
apirequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||||
|
"k8s.io/apiserver/pkg/server/httplog"
|
||||||
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
|
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
|
||||||
fcmetrics "k8s.io/apiserver/pkg/util/flowcontrol/metrics"
|
fcmetrics "k8s.io/apiserver/pkg/util/flowcontrol/metrics"
|
||||||
flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
|
flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
|
||||||
@ -55,6 +56,15 @@ var atomicMutatingWaiting, atomicReadOnlyWaiting int32
|
|||||||
// newInitializationSignal is defined for testing purposes.
|
// newInitializationSignal is defined for testing purposes.
|
||||||
var newInitializationSignal = utilflowcontrol.NewInitializationSignal
|
var newInitializationSignal = utilflowcontrol.NewInitializationSignal
|
||||||
|
|
||||||
|
func truncateLogField(s string) string {
|
||||||
|
const maxFieldLogLength = 64
|
||||||
|
|
||||||
|
if len(s) > maxFieldLogLength {
|
||||||
|
s = s[0:maxFieldLogLength]
|
||||||
|
}
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
// WithPriorityAndFairness limits the number of in-flight
|
// WithPriorityAndFairness limits the number of in-flight
|
||||||
// requests in a fine-grained way.
|
// requests in a fine-grained way.
|
||||||
func WithPriorityAndFairness(
|
func WithPriorityAndFairness(
|
||||||
@ -90,12 +100,16 @@ func WithPriorityAndFairness(
|
|||||||
}
|
}
|
||||||
|
|
||||||
var classification *PriorityAndFairnessClassification
|
var classification *PriorityAndFairnessClassification
|
||||||
note := func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration) {
|
note := func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) {
|
||||||
classification = &PriorityAndFairnessClassification{
|
classification = &PriorityAndFairnessClassification{
|
||||||
FlowSchemaName: fs.Name,
|
FlowSchemaName: fs.Name,
|
||||||
FlowSchemaUID: fs.UID,
|
FlowSchemaUID: fs.UID,
|
||||||
PriorityLevelName: pl.Name,
|
PriorityLevelName: pl.Name,
|
||||||
PriorityLevelUID: pl.UID}
|
PriorityLevelUID: pl.UID}
|
||||||
|
|
||||||
|
httplog.AddKeyValue(ctx, "apf_pl", truncateLogField(pl.Name))
|
||||||
|
httplog.AddKeyValue(ctx, "apf_fs", truncateLogField(fs.Name))
|
||||||
|
httplog.AddKeyValue(ctx, "apf_d", truncateLogField(flowDistinguisher))
|
||||||
}
|
}
|
||||||
|
|
||||||
var served bool
|
var served bool
|
||||||
|
@ -87,14 +87,14 @@ func (t fakeApfFilter) MaintainObservations(stopCh <-chan struct{}) {
|
|||||||
|
|
||||||
func (t fakeApfFilter) Handle(ctx context.Context,
|
func (t fakeApfFilter) Handle(ctx context.Context,
|
||||||
requestDigest utilflowcontrol.RequestDigest,
|
requestDigest utilflowcontrol.RequestDigest,
|
||||||
noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration),
|
noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string),
|
||||||
queueNoteFn fq.QueueNoteFn,
|
queueNoteFn fq.QueueNoteFn,
|
||||||
execFn func(),
|
execFn func(),
|
||||||
) {
|
) {
|
||||||
if t.mockDecision == decisionSkipFilter {
|
if t.mockDecision == decisionSkipFilter {
|
||||||
panic("Handle should not be invoked")
|
panic("Handle should not be invoked")
|
||||||
}
|
}
|
||||||
noteFn(bootstrap.SuggestedFlowSchemaGlobalDefault, bootstrap.SuggestedPriorityLevelConfigurationGlobalDefault)
|
noteFn(bootstrap.SuggestedFlowSchemaGlobalDefault, bootstrap.SuggestedPriorityLevelConfigurationGlobalDefault, requestDigest.User.GetName())
|
||||||
switch t.mockDecision {
|
switch t.mockDecision {
|
||||||
case decisionNoQueuingExecute:
|
case decisionNoQueuingExecute:
|
||||||
execFn()
|
execFn()
|
||||||
@ -390,7 +390,7 @@ func newFakeWatchApfFilter(capacity int) *fakeWatchApfFilter {
|
|||||||
|
|
||||||
func (f *fakeWatchApfFilter) Handle(ctx context.Context,
|
func (f *fakeWatchApfFilter) Handle(ctx context.Context,
|
||||||
requestDigest utilflowcontrol.RequestDigest,
|
requestDigest utilflowcontrol.RequestDigest,
|
||||||
_ func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration),
|
_ func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string),
|
||||||
_ fq.QueueNoteFn,
|
_ fq.QueueNoteFn,
|
||||||
execFn func(),
|
execFn func(),
|
||||||
) {
|
) {
|
||||||
@ -638,7 +638,7 @@ type fakeFilterRequestDigest struct {
|
|||||||
|
|
||||||
func (f *fakeFilterRequestDigest) Handle(ctx context.Context,
|
func (f *fakeFilterRequestDigest) Handle(ctx context.Context,
|
||||||
requestDigest utilflowcontrol.RequestDigest,
|
requestDigest utilflowcontrol.RequestDigest,
|
||||||
_ func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration),
|
_ func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string),
|
||||||
_ fq.QueueNoteFn, _ func(),
|
_ fq.QueueNoteFn, _ func(),
|
||||||
) {
|
) {
|
||||||
f.requestDigestGot = &requestDigest
|
f.requestDigestGot = &requestDigest
|
||||||
|
@ -775,7 +775,7 @@ func (immediateRequest) Finish(execute func()) bool {
|
|||||||
// The returned bool indicates whether the request is exempt from
|
// The returned bool indicates whether the request is exempt from
|
||||||
// limitation. The startWaitingTime is when the request started
|
// limitation. The startWaitingTime is when the request started
|
||||||
// waiting in its queue, or `Time{}` if this did not happen.
|
// waiting in its queue, or `Time{}` if this did not happen.
|
||||||
func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDigest, queueNoteFn fq.QueueNoteFn) (fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, isExempt bool, req fq.Request, startWaitingTime time.Time) {
|
func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDigest, queueNoteFn fq.QueueNoteFn) (fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, isExempt bool, req fq.Request, startWaitingTime time.Time, flowDistinguisher string) {
|
||||||
klog.V(7).Infof("startRequest(%#+v)", rd)
|
klog.V(7).Infof("startRequest(%#+v)", rd)
|
||||||
cfgCtlr.lock.Lock()
|
cfgCtlr.lock.Lock()
|
||||||
defer cfgCtlr.lock.Unlock()
|
defer cfgCtlr.lock.Unlock()
|
||||||
@ -807,13 +807,12 @@ func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDig
|
|||||||
plState := cfgCtlr.priorityLevelStates[plName]
|
plState := cfgCtlr.priorityLevelStates[plName]
|
||||||
if plState.pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt {
|
if plState.pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt {
|
||||||
klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, immediate", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName)
|
klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, immediate", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName)
|
||||||
return selectedFlowSchema, plState.pl, true, immediateRequest{}, time.Time{}
|
return selectedFlowSchema, plState.pl, true, immediateRequest{}, time.Time{}, ""
|
||||||
}
|
}
|
||||||
var numQueues int32
|
var numQueues int32
|
||||||
if plState.pl.Spec.Limited.LimitResponse.Type == flowcontrol.LimitResponseTypeQueue {
|
if plState.pl.Spec.Limited.LimitResponse.Type == flowcontrol.LimitResponseTypeQueue {
|
||||||
numQueues = plState.pl.Spec.Limited.LimitResponse.Queuing.Queues
|
numQueues = plState.pl.Spec.Limited.LimitResponse.Queuing.Queues
|
||||||
}
|
}
|
||||||
var flowDistinguisher string
|
|
||||||
var hashValue uint64
|
var hashValue uint64
|
||||||
if numQueues > 1 {
|
if numQueues > 1 {
|
||||||
flowDistinguisher = computeFlowDistinguisher(rd, selectedFlowSchema.Spec.DistinguisherMethod)
|
flowDistinguisher = computeFlowDistinguisher(rd, selectedFlowSchema.Spec.DistinguisherMethod)
|
||||||
@ -825,7 +824,7 @@ func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDig
|
|||||||
if idle {
|
if idle {
|
||||||
cfgCtlr.maybeReapLocked(plName, plState)
|
cfgCtlr.maybeReapLocked(plName, plState)
|
||||||
}
|
}
|
||||||
return selectedFlowSchema, plState.pl, false, req, startWaitingTime
|
return selectedFlowSchema, plState.pl, false, req, startWaitingTime, flowDistinguisher
|
||||||
}
|
}
|
||||||
|
|
||||||
// maybeReap will remove the last internal traces of the named
|
// maybeReap will remove the last internal traces of the named
|
||||||
|
@ -53,7 +53,7 @@ type Interface interface {
|
|||||||
// ctx is cancelled or times out.
|
// ctx is cancelled or times out.
|
||||||
Handle(ctx context.Context,
|
Handle(ctx context.Context,
|
||||||
requestDigest RequestDigest,
|
requestDigest RequestDigest,
|
||||||
noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration),
|
noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string),
|
||||||
queueNoteFn fq.QueueNoteFn,
|
queueNoteFn fq.QueueNoteFn,
|
||||||
execFn func(),
|
execFn func(),
|
||||||
)
|
)
|
||||||
@ -144,12 +144,12 @@ func NewTestable(config TestableConfig) Interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (cfgCtlr *configController) Handle(ctx context.Context, requestDigest RequestDigest,
|
func (cfgCtlr *configController) Handle(ctx context.Context, requestDigest RequestDigest,
|
||||||
noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration),
|
noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string),
|
||||||
queueNoteFn fq.QueueNoteFn,
|
queueNoteFn fq.QueueNoteFn,
|
||||||
execFn func()) {
|
execFn func()) {
|
||||||
fs, pl, isExempt, req, startWaitingTime := cfgCtlr.startRequest(ctx, requestDigest, queueNoteFn)
|
fs, pl, isExempt, req, startWaitingTime, flowDistinguisher := cfgCtlr.startRequest(ctx, requestDigest, queueNoteFn)
|
||||||
queued := startWaitingTime != time.Time{}
|
queued := startWaitingTime != time.Time{}
|
||||||
noteFn(fs, pl)
|
noteFn(fs, pl, flowDistinguisher)
|
||||||
if req == nil {
|
if req == nil {
|
||||||
if queued {
|
if queued {
|
||||||
metrics.ObserveWaitingDuration(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime))
|
metrics.ObserveWaitingDuration(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime))
|
||||||
|
@ -460,7 +460,7 @@ func checkNewFS(cts *ctlrTestState, rng *rand.Rand, trialName string, ftr *fsTes
|
|||||||
startWG.Add(1)
|
startWG.Add(1)
|
||||||
go func(matches, isResource bool, rdu RequestDigest) {
|
go func(matches, isResource bool, rdu RequestDigest) {
|
||||||
expectedMatch := matches && ftr.wellFormed && (fsPrecedes(fs, catchAlls[isResource]) || fs.Name == catchAlls[isResource].Name)
|
expectedMatch := matches && ftr.wellFormed && (fsPrecedes(fs, catchAlls[isResource]) || fs.Name == catchAlls[isResource].Name)
|
||||||
ctlr.Handle(ctx, rdu, func(matchFS *flowcontrol.FlowSchema, matchPL *flowcontrol.PriorityLevelConfiguration) {
|
ctlr.Handle(ctx, rdu, func(matchFS *flowcontrol.FlowSchema, matchPL *flowcontrol.PriorityLevelConfiguration, _ string) {
|
||||||
matchIsExempt := matchPL.Spec.Type == flowcontrol.PriorityLevelEnablementExempt
|
matchIsExempt := matchPL.Spec.Type == flowcontrol.PriorityLevelEnablementExempt
|
||||||
if testDebugLogs {
|
if testDebugLogs {
|
||||||
t.Logf("Considering FlowSchema %s, expectedMatch=%v, isResource=%v: Handle(%#+v) => note(fs=%s, pl=%s, isExempt=%v)", fs.Name, expectedMatch, isResource, rdu, matchFS.Name, matchPL.Name, matchIsExempt)
|
t.Logf("Considering FlowSchema %s, expectedMatch=%v, isResource=%v: Handle(%#+v) => note(fs=%s, pl=%s, isExempt=%v)", fs.Name, expectedMatch, isResource, rdu, matchFS.Name, matchPL.Name, matchIsExempt)
|
||||||
|
Loading…
Reference in New Issue
Block a user