Add metrics about watch counts seen by APF

This commit is contained in:
Mike Spreitzer 2021-10-25 03:31:47 -04:00
parent 945f960cfb
commit 154bf6aab3
12 changed files with 76 additions and 52 deletions

View File

@ -101,7 +101,7 @@ func WithPriorityAndFairness(
} }
var classification *PriorityAndFairnessClassification var classification *PriorityAndFairnessClassification
note := func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) { estimateWork := func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) flowcontrolrequest.WorkEstimate {
classification = &PriorityAndFairnessClassification{ classification = &PriorityAndFairnessClassification{
FlowSchemaName: fs.Name, FlowSchemaName: fs.Name,
FlowSchemaUID: fs.UID, FlowSchemaUID: fs.UID,
@ -111,6 +111,7 @@ func WithPriorityAndFairness(
httplog.AddKeyValue(ctx, "apf_pl", truncateLogField(pl.Name)) httplog.AddKeyValue(ctx, "apf_pl", truncateLogField(pl.Name))
httplog.AddKeyValue(ctx, "apf_fs", truncateLogField(fs.Name)) httplog.AddKeyValue(ctx, "apf_fs", truncateLogField(fs.Name))
httplog.AddKeyValue(ctx, "apf_fd", truncateLogField(flowDistinguisher)) httplog.AddKeyValue(ctx, "apf_fd", truncateLogField(flowDistinguisher))
return workEstimator(r, fs.Name, pl.Name)
} }
var served bool var served bool
@ -137,13 +138,9 @@ func WithPriorityAndFairness(
} }
} }
// find the estimated amount of work of the request
// TODO: Estimate cost should also take fcIfc.GetWatchCount(requestInfo) as a parameter.
workEstimate := workEstimator.EstimateWork(r)
digest := utilflowcontrol.RequestDigest{ digest := utilflowcontrol.RequestDigest{
RequestInfo: requestInfo, RequestInfo: requestInfo,
User: user, User: user,
WorkEstimate: workEstimate,
} }
if isWatchRequest { if isWatchRequest {
@ -179,7 +176,7 @@ func WithPriorityAndFairness(
execute := func() { execute := func() {
startedAt := time.Now() startedAt := time.Now()
defer func() { defer func() {
httplog.AddKeyValue(ctx, "apf_init_latency", time.Now().Sub(startedAt)) httplog.AddKeyValue(ctx, "apf_init_latency", time.Since(startedAt))
}() }()
noteExecutingDelta(1) noteExecutingDelta(1)
defer noteExecutingDelta(-1) defer noteExecutingDelta(-1)
@ -238,7 +235,7 @@ func WithPriorityAndFairness(
// Note that Handle will return irrespective of whether the request // Note that Handle will return irrespective of whether the request
// executes or is rejected. In the latter case, the function will return // executes or is rejected. In the latter case, the function will return
// without calling the passed `execute` function. // without calling the passed `execute` function.
fcIfc.Handle(handleCtx, digest, note, queueNote, execute) fcIfc.Handle(handleCtx, digest, estimateWork, queueNote, execute)
}() }()
select { select {
@ -269,7 +266,7 @@ func WithPriorityAndFairness(
handler.ServeHTTP(w, r) handler.ServeHTTP(w, r)
} }
fcIfc.Handle(ctx, digest, note, queueNote, execute) fcIfc.Handle(ctx, digest, estimateWork, queueNote, execute)
} }
if !served { if !served {

View File

@ -70,7 +70,7 @@ const (
decisionSkipFilter decisionSkipFilter
) )
var defaultRequestWorkEstimator = func(*http.Request) fcrequest.WorkEstimate { var defaultRequestWorkEstimator = func(req *http.Request, fsName, plName string) fcrequest.WorkEstimate {
return fcrequest.WorkEstimate{InitialSeats: 1} return fcrequest.WorkEstimate{InitialSeats: 1}
} }
@ -87,14 +87,14 @@ func (t fakeApfFilter) MaintainObservations(stopCh <-chan struct{}) {
func (t fakeApfFilter) Handle(ctx context.Context, func (t fakeApfFilter) Handle(ctx context.Context,
requestDigest utilflowcontrol.RequestDigest, requestDigest utilflowcontrol.RequestDigest,
noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string), workEstimator func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate,
queueNoteFn fq.QueueNoteFn, queueNoteFn fq.QueueNoteFn,
execFn func(), execFn func(),
) { ) {
if t.mockDecision == decisionSkipFilter { if t.mockDecision == decisionSkipFilter {
panic("Handle should not be invoked") panic("Handle should not be invoked")
} }
noteFn(bootstrap.SuggestedFlowSchemaGlobalDefault, bootstrap.SuggestedPriorityLevelConfigurationGlobalDefault, requestDigest.User.GetName()) workEstimator(bootstrap.SuggestedFlowSchemaGlobalDefault, bootstrap.SuggestedPriorityLevelConfigurationGlobalDefault, requestDigest.User.GetName())
switch t.mockDecision { switch t.mockDecision {
case decisionNoQueuingExecute: case decisionNoQueuingExecute:
execFn() execFn()
@ -390,7 +390,7 @@ func newFakeWatchApfFilter(capacity int) *fakeWatchApfFilter {
func (f *fakeWatchApfFilter) Handle(ctx context.Context, func (f *fakeWatchApfFilter) Handle(ctx context.Context,
requestDigest utilflowcontrol.RequestDigest, requestDigest utilflowcontrol.RequestDigest,
_ func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string), _ func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate,
_ fq.QueueNoteFn, _ fq.QueueNoteFn,
execFn func(), execFn func(),
) { ) {
@ -635,14 +635,16 @@ func TestContextClosesOnRequestProcessed(t *testing.T) {
type fakeFilterRequestDigest struct { type fakeFilterRequestDigest struct {
*fakeApfFilter *fakeApfFilter
requestDigestGot *utilflowcontrol.RequestDigest requestDigestGot *utilflowcontrol.RequestDigest
workEstimateGot fcrequest.WorkEstimate
} }
func (f *fakeFilterRequestDigest) Handle(ctx context.Context, func (f *fakeFilterRequestDigest) Handle(ctx context.Context,
requestDigest utilflowcontrol.RequestDigest, requestDigest utilflowcontrol.RequestDigest,
_ func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string), workEstimator func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate,
_ fq.QueueNoteFn, _ func(), _ fq.QueueNoteFn, _ func(),
) { ) {
f.requestDigestGot = &requestDigest f.requestDigestGot = &requestDigest
f.workEstimateGot = workEstimator(bootstrap.MandatoryFlowSchemaCatchAll, bootstrap.MandatoryPriorityLevelConfigurationCatchAll, "")
} }
func TestApfWithRequestDigest(t *testing.T) { func TestApfWithRequestDigest(t *testing.T) {
@ -652,17 +654,17 @@ func TestApfWithRequestDigest(t *testing.T) {
reqDigestExpected := &utilflowcontrol.RequestDigest{ reqDigestExpected := &utilflowcontrol.RequestDigest{
RequestInfo: &apirequest.RequestInfo{Verb: "get"}, RequestInfo: &apirequest.RequestInfo{Verb: "get"},
User: &user.DefaultInfo{Name: "foo"}, User: &user.DefaultInfo{Name: "foo"},
WorkEstimate: fcrequest.WorkEstimate{ }
workExpected := fcrequest.WorkEstimate{
InitialSeats: 5, InitialSeats: 5,
FinalSeats: 7, FinalSeats: 7,
AdditionalLatency: 3 * time.Second, AdditionalLatency: 3 * time.Second,
},
} }
handler := WithPriorityAndFairness(http.HandlerFunc(func(_ http.ResponseWriter, req *http.Request) {}), handler := WithPriorityAndFairness(http.HandlerFunc(func(_ http.ResponseWriter, req *http.Request) {}),
longRunningFunc, longRunningFunc,
fakeFilter, fakeFilter,
func(_ *http.Request) fcrequest.WorkEstimate { return reqDigestExpected.WorkEstimate }, func(_ *http.Request, _, _ string) fcrequest.WorkEstimate { return workExpected },
) )
w := httptest.NewRecorder() w := httptest.NewRecorder()
@ -678,6 +680,9 @@ func TestApfWithRequestDigest(t *testing.T) {
if !reflect.DeepEqual(reqDigestExpected, fakeFilter.requestDigestGot) { if !reflect.DeepEqual(reqDigestExpected, fakeFilter.requestDigestGot) {
t.Errorf("Expected RequestDigest to match, diff: %s", cmp.Diff(reqDigestExpected, fakeFilter.requestDigestGot)) t.Errorf("Expected RequestDigest to match, diff: %s", cmp.Diff(reqDigestExpected, fakeFilter.requestDigestGot))
} }
if !reflect.DeepEqual(workExpected, fakeFilter.workEstimateGot) {
t.Errorf("Expected WorkEstimate to match, diff: %s", cmp.Diff(workExpected, fakeFilter.workEstimateGot))
}
} }
func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) { func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {

View File

@ -91,7 +91,6 @@ type StartFunction func(ctx context.Context, hashValue uint64) (execute bool, af
type RequestDigest struct { type RequestDigest struct {
RequestInfo *request.RequestInfo RequestInfo *request.RequestInfo
User user.Info User user.Info
WorkEstimate fcrequest.WorkEstimate
} }
// `*configController` maintains eventual consistency with the API // `*configController` maintains eventual consistency with the API
@ -800,7 +799,7 @@ func (immediateRequest) Finish(execute func()) bool {
// The returned bool indicates whether the request is exempt from // The returned bool indicates whether the request is exempt from
// limitation. The startWaitingTime is when the request started // limitation. The startWaitingTime is when the request started
// waiting in its queue, or `Time{}` if this did not happen. // waiting in its queue, or `Time{}` if this did not happen.
func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDigest, queueNoteFn fq.QueueNoteFn) (fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, isExempt bool, req fq.Request, startWaitingTime time.Time, flowDistinguisher string) { func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDigest, workEstimator func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate, queueNoteFn fq.QueueNoteFn) (fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, isExempt bool, req fq.Request, startWaitingTime time.Time) {
klog.V(7).Infof("startRequest(%#+v)", rd) klog.V(7).Infof("startRequest(%#+v)", rd)
cfgCtlr.lock.RLock() cfgCtlr.lock.RLock()
defer cfgCtlr.lock.RUnlock() defer cfgCtlr.lock.RUnlock()
@ -832,24 +831,26 @@ func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDig
plState := cfgCtlr.priorityLevelStates[plName] plState := cfgCtlr.priorityLevelStates[plName]
if plState.pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt { if plState.pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt {
klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, immediate", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName) klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, immediate", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName)
return selectedFlowSchema, plState.pl, true, immediateRequest{}, time.Time{}, "" return selectedFlowSchema, plState.pl, true, immediateRequest{}, time.Time{}
} }
var numQueues int32 var numQueues int32
if plState.pl.Spec.Limited.LimitResponse.Type == flowcontrol.LimitResponseTypeQueue { if plState.pl.Spec.Limited.LimitResponse.Type == flowcontrol.LimitResponseTypeQueue {
numQueues = plState.pl.Spec.Limited.LimitResponse.Queuing.Queues numQueues = plState.pl.Spec.Limited.LimitResponse.Queuing.Queues
} }
var flowDistinguisher string
var hashValue uint64 var hashValue uint64
if numQueues > 1 { if numQueues > 1 {
flowDistinguisher = computeFlowDistinguisher(rd, selectedFlowSchema.Spec.DistinguisherMethod) flowDistinguisher = computeFlowDistinguisher(rd, selectedFlowSchema.Spec.DistinguisherMethod)
hashValue = hashFlowID(selectedFlowSchema.Name, flowDistinguisher) hashValue = hashFlowID(selectedFlowSchema.Name, flowDistinguisher)
} }
workEstimate := workEstimator(selectedFlowSchema, plState.pl, flowDistinguisher)
startWaitingTime = time.Now() startWaitingTime = time.Now()
klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, numQueues=%d", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName, numQueues) klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, numQueues=%d", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName, numQueues)
req, idle := plState.queues.StartRequest(ctx, &rd.WorkEstimate, hashValue, flowDistinguisher, selectedFlowSchema.Name, rd.RequestInfo, rd.User, queueNoteFn) req, idle := plState.queues.StartRequest(ctx, &workEstimate, hashValue, flowDistinguisher, selectedFlowSchema.Name, rd.RequestInfo, rd.User, queueNoteFn)
if idle { if idle {
cfgCtlr.maybeReapReadLocked(plName, plState) cfgCtlr.maybeReapReadLocked(plName, plState)
} }
return selectedFlowSchema, plState.pl, false, req, startWaitingTime, flowDistinguisher return selectedFlowSchema, plState.pl, false, req, startWaitingTime
} }
// maybeReap will remove the last internal traces of the named // maybeReap will remove the last internal traces of the named

View File

@ -26,6 +26,7 @@ import (
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/eventclock" "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/eventclock"
fqs "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset" fqs "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset"
"k8s.io/apiserver/pkg/util/flowcontrol/metrics" "k8s.io/apiserver/pkg/util/flowcontrol/metrics"
fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
kubeinformers "k8s.io/client-go/informers" kubeinformers "k8s.io/client-go/informers"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"k8s.io/utils/clock" "k8s.io/utils/clock"
@ -41,8 +42,9 @@ const ConfigConsumerAsFieldManager = "api-priority-and-fairness-config-consumer-
// Interface defines how the API Priority and Fairness filter interacts with the underlying system. // Interface defines how the API Priority and Fairness filter interacts with the underlying system.
type Interface interface { type Interface interface {
// Handle takes care of queuing and dispatching a request // Handle takes care of queuing and dispatching a request
// characterized by the given digest. The given `noteFn` will be // characterized by the given digest. The given `workEstimator` will be
// invoked with the results of request classification. If the // invoked with the results of request classification and must return the
// work parameters for the request. If the
// request is queued then `queueNoteFn` will be called twice, // request is queued then `queueNoteFn` will be called twice,
// first with `true` and then with `false`; otherwise // first with `true` and then with `false`; otherwise
// `queueNoteFn` will not be called at all. If Handle decides // `queueNoteFn` will not be called at all. If Handle decides
@ -53,7 +55,7 @@ type Interface interface {
// ctx is cancelled or times out. // ctx is cancelled or times out.
Handle(ctx context.Context, Handle(ctx context.Context,
requestDigest RequestDigest, requestDigest RequestDigest,
noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string), workEstimator func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate,
queueNoteFn fq.QueueNoteFn, queueNoteFn fq.QueueNoteFn,
execFn func(), execFn func(),
) )
@ -148,12 +150,11 @@ func NewTestable(config TestableConfig) Interface {
} }
func (cfgCtlr *configController) Handle(ctx context.Context, requestDigest RequestDigest, func (cfgCtlr *configController) Handle(ctx context.Context, requestDigest RequestDigest,
noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string), workEstimator func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate,
queueNoteFn fq.QueueNoteFn, queueNoteFn fq.QueueNoteFn,
execFn func()) { execFn func()) {
fs, pl, isExempt, req, startWaitingTime, flowDistinguisher := cfgCtlr.startRequest(ctx, requestDigest, queueNoteFn) fs, pl, isExempt, req, startWaitingTime := cfgCtlr.startRequest(ctx, requestDigest, workEstimator, queueNoteFn)
queued := startWaitingTime != time.Time{} queued := startWaitingTime != time.Time{}
noteFn(fs, pl, flowDistinguisher)
if req == nil { if req == nil {
if queued { if queued {
metrics.ObserveWaitingDuration(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime)) metrics.ObserveWaitingDuration(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime))

View File

@ -462,7 +462,7 @@ func checkNewFS(cts *ctlrTestState, rng *rand.Rand, trialName string, ftr *fsTes
startWG.Add(1) startWG.Add(1)
go func(matches, isResource bool, rdu RequestDigest) { go func(matches, isResource bool, rdu RequestDigest) {
expectedMatch := matches && ftr.wellFormed && (fsPrecedes(fs, catchAlls[isResource]) || fs.Name == catchAlls[isResource].Name) expectedMatch := matches && ftr.wellFormed && (fsPrecedes(fs, catchAlls[isResource]) || fs.Name == catchAlls[isResource].Name)
ctlr.Handle(ctx, rdu, func(matchFS *flowcontrol.FlowSchema, matchPL *flowcontrol.PriorityLevelConfiguration, _ string) { ctlr.Handle(ctx, rdu, func(matchFS *flowcontrol.FlowSchema, matchPL *flowcontrol.PriorityLevelConfiguration, _ string) fcrequest.WorkEstimate {
matchIsExempt := matchPL.Spec.Type == flowcontrol.PriorityLevelEnablementExempt matchIsExempt := matchPL.Spec.Type == flowcontrol.PriorityLevelEnablementExempt
if testDebugLogs { if testDebugLogs {
t.Logf("Considering FlowSchema %s, expectedMatch=%v, isResource=%v: Handle(%#+v) => note(fs=%s, pl=%s, isExempt=%v)", fs.Name, expectedMatch, isResource, rdu, matchFS.Name, matchPL.Name, matchIsExempt) t.Logf("Considering FlowSchema %s, expectedMatch=%v, isResource=%v: Handle(%#+v) => note(fs=%s, pl=%s, isExempt=%v)", fs.Name, expectedMatch, isResource, rdu, matchFS.Name, matchPL.Name, matchIsExempt)
@ -475,6 +475,7 @@ func checkNewFS(cts *ctlrTestState, rng *rand.Rand, trialName string, ftr *fsTes
t.Errorf("Fail at %s/%s: expected=%v, actual=%v", trialName, fs.Name, fs.Spec.PriorityLevelConfiguration.Name, matchPL.Name) t.Errorf("Fail at %s/%s: expected=%v, actual=%v", trialName, fs.Name, fs.Spec.PriorityLevelConfiguration.Name, matchPL.Name)
} }
} }
return fcrequest.WorkEstimate{InitialSeats: 1}
}, func(inQueue bool) { }, func(inQueue bool) {
}, func() { }, func() {
startWG.Done() startWG.Done()

View File

@ -27,7 +27,6 @@ import (
"k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/endpoints/request"
fcfmt "k8s.io/apiserver/pkg/util/flowcontrol/format" fcfmt "k8s.io/apiserver/pkg/util/flowcontrol/format"
fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
) )
func TestMatching(t *testing.T) { func TestMatching(t *testing.T) {
@ -103,7 +102,6 @@ func TestLiterals(t *testing.T) {
Parts: []string{"goodrscs", "eman"}, Parts: []string{"goodrscs", "eman"},
}, },
User: ui, User: ui,
WorkEstimate: fcrequest.WorkEstimate{InitialSeats: 1},
} }
reqRU := RequestDigest{ reqRU := RequestDigest{
RequestInfo: &request.RequestInfo{ RequestInfo: &request.RequestInfo{
@ -119,7 +117,6 @@ func TestLiterals(t *testing.T) {
Parts: []string{"goodrscs", "eman"}, Parts: []string{"goodrscs", "eman"},
}, },
User: ui, User: ui,
WorkEstimate: fcrequest.WorkEstimate{InitialSeats: 1},
} }
reqN := RequestDigest{ reqN := RequestDigest{
RequestInfo: &request.RequestInfo{ RequestInfo: &request.RequestInfo{
@ -128,7 +125,6 @@ func TestLiterals(t *testing.T) {
Verb: "goodverb", Verb: "goodverb",
}, },
User: ui, User: ui,
WorkEstimate: fcrequest.WorkEstimate{InitialSeats: 1},
} }
checkRules(t, true, reqRN, []flowcontrol.PolicyRulesWithSubjects{{ checkRules(t, true, reqRN, []flowcontrol.PolicyRulesWithSubjects{{
Subjects: []flowcontrol.Subject{{Kind: flowcontrol.SubjectKindUser, Subjects: []flowcontrol.Subject{{Kind: flowcontrol.SubjectKindUser,

View File

@ -289,6 +289,17 @@ var (
}, },
[]string{priorityLevel, flowSchema}, []string{priorityLevel, flowSchema},
) )
watchCountSamples = compbasemetrics.NewHistogramVec(
&compbasemetrics.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "watch_count_samples",
Help: "count of watchers for mutating requests in API Priority and Fairness",
Buckets: []float64{0, 1, 10, 100, 1000, 10000},
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{priorityLevel, flowSchema},
)
apiserverEpochAdvances = compbasemetrics.NewCounterVec( apiserverEpochAdvances = compbasemetrics.NewCounterVec(
&compbasemetrics.CounterOpts{ &compbasemetrics.CounterOpts{
Namespace: namespace, Namespace: namespace,
@ -315,6 +326,7 @@ var (
apiserverCurrentExecutingRequests, apiserverCurrentExecutingRequests,
apiserverRequestWaitingSeconds, apiserverRequestWaitingSeconds,
apiserverRequestExecutionSeconds, apiserverRequestExecutionSeconds,
watchCountSamples,
apiserverEpochAdvances, apiserverEpochAdvances,
}. }.
Append(PriorityLevelExecutionSeatsObserverGenerator.metrics()...). Append(PriorityLevelExecutionSeatsObserverGenerator.metrics()...).
@ -383,6 +395,12 @@ func ObserveExecutionDuration(ctx context.Context, priorityLevel, flowSchema str
apiserverRequestExecutionSeconds.WithContext(ctx).WithLabelValues(priorityLevel, flowSchema).Observe(executionTime.Seconds()) apiserverRequestExecutionSeconds.WithContext(ctx).WithLabelValues(priorityLevel, flowSchema).Observe(executionTime.Seconds())
} }
// ObserveWatchCount notes a sampling of a watch count
func ObserveWatchCount(ctx context.Context, priorityLevel, flowSchema string, count int) {
watchCountSamples.WithLabelValues(priorityLevel, flowSchema).Observe(float64(count))
}
// AddEpochAdvance notes an advance of the progress meter baseline for a given priority level
func AddEpochAdvance(ctx context.Context, priorityLevel string, success bool) { func AddEpochAdvance(ctx context.Context, priorityLevel string, success bool) {
apiserverEpochAdvances.WithContext(ctx).WithLabelValues(priorityLevel, strconv.FormatBool(success)).Inc() apiserverEpochAdvances.WithContext(ctx).WithLabelValues(priorityLevel, strconv.FormatBool(success)).Inc()
} }

View File

@ -40,7 +40,7 @@ type listWorkEstimator struct {
countGetterFn objectCountGetterFunc countGetterFn objectCountGetterFunc
} }
func (e *listWorkEstimator) estimate(r *http.Request) WorkEstimate { func (e *listWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLevelName string) WorkEstimate {
requestInfo, ok := apirequest.RequestInfoFrom(r.Context()) requestInfo, ok := apirequest.RequestInfoFrom(r.Context())
if !ok { if !ok {
// no RequestInfo should never happen, but to be on the safe side // no RequestInfo should never happen, but to be on the safe side

View File

@ -22,6 +22,7 @@ import (
"time" "time"
apirequest "k8s.io/apiserver/pkg/endpoints/request" apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
) )
const ( const (
@ -49,7 +50,9 @@ type mutatingWorkEstimator struct {
enabled bool enabled bool
} }
func (e *mutatingWorkEstimator) estimate(r *http.Request) WorkEstimate { func (e *mutatingWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLevelName string) WorkEstimate {
// TODO(wojtekt): Remove once we tune the algorithm to not fail
// scalability tests.
if !e.enabled { if !e.enabled {
return WorkEstimate{ return WorkEstimate{
InitialSeats: 1, InitialSeats: 1,
@ -67,6 +70,7 @@ func (e *mutatingWorkEstimator) estimate(r *http.Request) WorkEstimate {
} }
} }
watchCount := e.countFn(requestInfo) watchCount := e.countFn(requestInfo)
metrics.ObserveWatchCount(r.Context(), priorityLevelName, flowSchemaName, watchCount)
// The cost of the request associated with the watchers of that event // The cost of the request associated with the watchers of that event
// consists of three parts: // consists of three parts:

View File

@ -83,10 +83,10 @@ func NewWorkEstimator(objectCountFn objectCountGetterFunc, watchCountFn watchCou
// WorkEstimatorFunc returns the estimated work of a given request. // WorkEstimatorFunc returns the estimated work of a given request.
// This function will be used by the Priority & Fairness filter to // This function will be used by the Priority & Fairness filter to
// estimate the work of of incoming requests. // estimate the work of of incoming requests.
type WorkEstimatorFunc func(*http.Request) WorkEstimate type WorkEstimatorFunc func(request *http.Request, flowSchemaName, priorityLevelName string) WorkEstimate
func (e WorkEstimatorFunc) EstimateWork(r *http.Request) WorkEstimate { func (e WorkEstimatorFunc) EstimateWork(r *http.Request, flowSchemaName, priorityLevelName string) WorkEstimate {
return e(r) return e(r, flowSchemaName, priorityLevelName)
} }
type workEstimator struct { type workEstimator struct {
@ -96,7 +96,7 @@ type workEstimator struct {
mutatingWorkEstimator WorkEstimatorFunc mutatingWorkEstimator WorkEstimatorFunc
} }
func (e *workEstimator) estimate(r *http.Request) WorkEstimate { func (e *workEstimator) estimate(r *http.Request, flowSchemaName, priorityLevelName string) WorkEstimate {
requestInfo, ok := apirequest.RequestInfoFrom(r.Context()) requestInfo, ok := apirequest.RequestInfoFrom(r.Context())
if !ok { if !ok {
klog.ErrorS(fmt.Errorf("no RequestInfo found in context"), "Failed to estimate work for the request", "URI", r.RequestURI) klog.ErrorS(fmt.Errorf("no RequestInfo found in context"), "Failed to estimate work for the request", "URI", r.RequestURI)
@ -106,9 +106,9 @@ func (e *workEstimator) estimate(r *http.Request) WorkEstimate {
switch requestInfo.Verb { switch requestInfo.Verb {
case "list": case "list":
return e.listWorkEstimator.EstimateWork(r) return e.listWorkEstimator.EstimateWork(r, flowSchemaName, priorityLevelName)
case "create", "update", "patch", "delete": case "create", "update", "patch", "delete":
return e.mutatingWorkEstimator.EstimateWork(r) return e.mutatingWorkEstimator.EstimateWork(r, flowSchemaName, priorityLevelName)
} }
return WorkEstimate{InitialSeats: minimumSeats} return WorkEstimate{InitialSeats: minimumSeats}

View File

@ -410,7 +410,7 @@ func TestWorkEstimator(t *testing.T) {
req = req.WithContext(apirequest.WithRequestInfo(req.Context(), test.requestInfo)) req = req.WithContext(apirequest.WithRequestInfo(req.Context(), test.requestInfo))
} }
workestimateGot := estimator.EstimateWork(req) workestimateGot := estimator.EstimateWork(req, "testFS", "testPL")
if test.initialSeatsExpected != workestimateGot.InitialSeats { if test.initialSeatsExpected != workestimateGot.InitialSeats {
t.Errorf("Expected work estimate to match: %d initial seats, but got: %d", test.initialSeatsExpected, workestimateGot.InitialSeats) t.Errorf("Expected work estimate to match: %d initial seats, but got: %d", test.initialSeatsExpected, workestimateGot.InitialSeats)
} }

View File

@ -139,7 +139,8 @@ func (ft *fightTest) createController(invert bool, i int) {
FlowcontrolClient: fcIfc, FlowcontrolClient: fcIfc,
ServerConcurrencyLimit: 200, // server concurrency limit ServerConcurrencyLimit: 200, // server concurrency limit
RequestWaitLimit: time.Minute / 4, // request wait limit RequestWaitLimit: time.Minute / 4, // request wait limit
ObsPairGenerator: metrics.PriorityLevelConcurrencyObserverPairGenerator, ReqsObsPairGenerator: metrics.PriorityLevelConcurrencyObserverPairGenerator,
ExecSeatsObsGenerator: metrics.PriorityLevelExecutionSeatsObserverGenerator,
QueueSetFactory: fqtesting.NewNoRestraintFactory(), QueueSetFactory: fqtesting.NewNoRestraintFactory(),
}) })
ft.ctlrs[invert][i] = ctlr ft.ctlrs[invert][i] = ctlr