diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go
index c4bbe15119e..b9a6c54bdfe 100644
--- a/staging/src/k8s.io/apiserver/pkg/server/config.go
+++ b/staging/src/k8s.io/apiserver/pkg/server/config.go
@@ -222,9 +222,6 @@ type Config struct {
 	// If not specify any in flags, then genericapiserver will only enable defaultAPIResourceConfig.
 	MergedResourceConfig *serverstore.ResourceConfig
 
-	// RequestWidthEstimator is used to estimate the "width" of the incoming request(s).
-	RequestWidthEstimator flowcontrolrequest.WidthEstimatorFunc
-
 	// lifecycleSignals provides access to the various signals
 	// that happen during lifecycle of the apiserver.
 	// it's intentionally marked private as it should never be overridden.
@@ -352,9 +349,8 @@ func NewConfig(codecs serializer.CodecFactory) *Config {
 
 		// Default to treating watch as a long-running operation
 		// Generic API servers have no inherent long-running subresources
-		LongRunningFunc:       genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()),
-		RequestWidthEstimator: flowcontrolrequest.DefaultWidthEstimator,
-		lifecycleSignals:      newLifecycleSignals(),
+		LongRunningFunc:  genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()),
+		lifecycleSignals: newLifecycleSignals(),
 
 		APIServerID:           id,
 		StorageVersionManager: storageversion.NewDefaultManager(),
@@ -747,7 +743,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
 	if c.FlowControl != nil {
 		handler = filterlatency.TrackCompleted(handler)
-		handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, c.RequestWidthEstimator)
+		handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, flowcontrolrequest.DefaultWorkEstimator)
 		handler = filterlatency.TrackStarted(handler, "priorityandfairness")
 	} else {
 		handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc)
diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go
index c89cbbd86a1..ad043d7f427 100644
--- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go
+++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go
@@ -61,7 +61,7 @@ func WithPriorityAndFairness(
 	handler http.Handler,
 	longRunningRequestCheck apirequest.LongRunningRequestCheck,
 	fcIfc utilflowcontrol.Interface,
-	widthEstimator flowcontrolrequest.WidthEstimatorFunc,
+	workEstimator flowcontrolrequest.WorkEstimatorFunc,
 ) http.Handler {
 	if fcIfc == nil {
 		klog.Warningf("priority and fairness support not found, skipping")
@@ -122,11 +122,14 @@ func WithPriorityAndFairness(
 			}
 		}
 
-		// find the estimated "width" of the request
-		// TODO: Maybe just make it costEstimator and let it return additionalLatency too for the watch?
+		// find the estimated amount of work of the request
 		// TODO: Estimate cost should also take fcIfc.GetWatchCount(requestInfo) as a parameter.
-		width := widthEstimator.EstimateWidth(r)
-		digest := utilflowcontrol.RequestDigest{RequestInfo: requestInfo, User: user, Width: width}
+		workEstimate := workEstimator.EstimateWork(r)
+		digest := utilflowcontrol.RequestDigest{
+			RequestInfo:  requestInfo,
+			User:         user,
+			WorkEstimate: workEstimate,
+		}
 
 		if isWatchRequest {
 			// This channel blocks calling handler.ServeHTTP() until closed, and is closed inside execute().
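With this change WithPriorityAndFairness accepts any function with the WorkEstimatorFunc signature, and DefaultBuildHandlerChain hard-wires flowcontrolrequest.DefaultWorkEstimator instead of reading an estimator off Config. A hedged sketch of what a non-default estimator could look like against this revision of the API; the verb-based weighting and the five-seat figure are invented for illustration, not part of the patch:

```go
package main

import (
	"fmt"
	"net/http"

	fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
)

// listHeavyEstimator is a hypothetical WorkEstimatorFunc that charges
// non-watch GETs five seats and everything else the default one seat.
func listHeavyEstimator(r *http.Request) fcrequest.WorkEstimate {
	if r.Method == http.MethodGet && r.URL.Query().Get("watch") != "true" {
		return fcrequest.WorkEstimate{Seats: 5} // invented weight
	}
	return fcrequest.WorkEstimate{Seats: 1}
}

func main() {
	var estimator fcrequest.WorkEstimatorFunc = listHeavyEstimator
	r, _ := http.NewRequest(http.MethodGet, "/api/v1/pods", nil)
	// EstimateWork is the method the filter invokes on the estimator.
	fmt.Println(estimator.EstimateWork(r).Seats) // prints 5
}
```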
diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go
index 7ebf1c5167b..91b2c9dde97 100644
--- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go
@@ -70,7 +70,9 @@ const (
 	decisionSkipFilter
 )
 
-var defaultRequestWidthEstimator = func(*http.Request) fcrequest.Width { return fcrequest.Width{Seats: 1} }
+var defaultRequestWorkEstimator = func(*http.Request) fcrequest.WorkEstimate {
+	return fcrequest.WorkEstimate{Seats: 1}
+}
 
 type fakeApfFilter struct {
 	mockDecision mockDecision
@@ -165,7 +167,7 @@ func newApfHandlerWithFilter(t *testing.T, flowControlFilter utilflowcontrol.Int
 
 	apfHandler := WithPriorityAndFairness(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		onExecute()
-	}), longRunningRequestCheck, flowControlFilter, defaultRequestWidthEstimator)
+	}), longRunningRequestCheck, flowControlFilter, defaultRequestWorkEstimator)
 
 	handler := apifilters.WithRequestInfo(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		r = r.WithContext(apirequest.WithUser(r.Context(), &user.DefaultInfo{
@@ -649,7 +651,7 @@ func TestApfWithRequestDigest(t *testing.T) {
 	reqDigestExpected := &utilflowcontrol.RequestDigest{
 		RequestInfo: &apirequest.RequestInfo{Verb: "get"},
 		User:        &user.DefaultInfo{Name: "foo"},
-		Width: fcrequest.Width{
+		WorkEstimate: fcrequest.WorkEstimate{
 			Seats: 5,
 		},
 	}
@@ -657,7 +659,7 @@ func TestApfWithRequestDigest(t *testing.T) {
 	handler := WithPriorityAndFairness(http.HandlerFunc(func(_ http.ResponseWriter, req *http.Request) {}),
 		longRunningFunc,
 		fakeFilter,
-		func(_ *http.Request) fcrequest.Width { return reqDigestExpected.Width },
+		func(_ *http.Request) fcrequest.WorkEstimate { return reqDigestExpected.WorkEstimate },
 	)
 
 	w := httptest.NewRecorder()
@@ -1171,7 +1173,7 @@ func newHandlerChain(t *testing.T, handler http.Handler, filter utilflowcontrol.
 	requestInfoFactory := &apirequest.RequestInfoFactory{APIPrefixes: sets.NewString("apis", "api"), GrouplessAPIPrefixes: sets.NewString("api")}
 	longRunningRequestCheck := BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString("proxy"))
 
-	apfHandler := WithPriorityAndFairness(handler, longRunningRequestCheck, filter, defaultRequestWidthEstimator)
+	apfHandler := WithPriorityAndFairness(handler, longRunningRequestCheck, filter, defaultRequestWorkEstimator)
 
 	// add the handler in the chain that adds the specified user to the request context
 	handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go
index bd9fd9ad921..919bda40e96 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go
@@ -80,9 +80,9 @@ type StartFunction func(ctx context.Context, hashValue uint64) (execute bool, af
 
 // RequestDigest holds necessary info from request for flow-control
 type RequestDigest struct {
-	RequestInfo *request.RequestInfo
-	User        user.Info
-	Width       fcrequest.Width
+	RequestInfo  *request.RequestInfo
+	User         user.Info
+	WorkEstimate fcrequest.WorkEstimate
 }
 
 // `*configController` maintains eventual consistency with the API
@@ -327,7 +327,7 @@ func (cfgCtlr *configController) processNextWorkItem() bool {
 
 	func(obj interface{}) {
 		defer cfgCtlr.configQueue.Done(obj)
-		specificDelay, err := cfgCtlr.syncOne(map[string]string{})
+		specificDelay, err := cfgCtlr.syncOne()
 		switch {
 		case err != nil:
 			klog.Error(err)
@@ -346,7 +346,7 @@ func (cfgCtlr *configController) processNextWorkItem() bool {
 // objects that configure API Priority and Fairness and updates the
 // local configController accordingly.
 // Only invoke this in the one and only worker goroutine
-func (cfgCtlr *configController) syncOne(flowSchemaRVs map[string]string) (specificDelay time.Duration, err error) {
+func (cfgCtlr *configController) syncOne() (specificDelay time.Duration, err error) {
 	klog.V(5).Infof("%s syncOne at %s", cfgCtlr.name, cfgCtlr.clock.Now().Format(timeFmt))
 	all := labels.Everything()
 	newPLs, err := cfgCtlr.plLister.List(all)
@@ -357,7 +357,7 @@ func (cfgCtlr *configController) syncOne(flowSchemaRVs map[string]string) (speci
 	if err != nil {
 		return 0, fmt.Errorf("unable to list FlowSchema objects: %w", err)
 	}
-	return cfgCtlr.digestConfigObjects(newPLs, newFSs, flowSchemaRVs)
+	return cfgCtlr.digestConfigObjects(newPLs, newFSs)
 }
 
 // cfgMeal is the data involved in the process of digesting the API
@@ -398,7 +398,7 @@ type fsStatusUpdate struct {
 // digestConfigObjects is given all the API objects that configure
 // cfgCtlr and writes its consequent new configState.
 // Only invoke this in the one and only worker goroutine
-func (cfgCtlr *configController) digestConfigObjects(newPLs []*flowcontrol.PriorityLevelConfiguration, newFSs []*flowcontrol.FlowSchema, flowSchemaRVs map[string]string) (time.Duration, error) {
+func (cfgCtlr *configController) digestConfigObjects(newPLs []*flowcontrol.PriorityLevelConfiguration, newFSs []*flowcontrol.FlowSchema) (time.Duration, error) {
 	fsStatusUpdates := cfgCtlr.lockAndDigestConfigObjects(newPLs, newFSs)
 	var errs []error
 	currResult := updateAttempt{
@@ -427,16 +427,15 @@ func (cfgCtlr *configController) digestConfigObjects(newPLs []*flowcontrol.Prior
 		fsIfc := cfgCtlr.flowcontrolClient.FlowSchemas()
 		patchBytes := []byte(fmt.Sprintf(`{"status": {"conditions": [ %s ] } }`, string(enc)))
 		patchOptions := metav1.PatchOptions{FieldManager: cfgCtlr.asFieldManager}
-		patchedFlowSchema, err := fsIfc.Patch(context.TODO(), fsu.flowSchema.Name, apitypes.StrategicMergePatchType, patchBytes, patchOptions, "status")
-		if err == nil {
-			key, _ := cache.MetaNamespaceKeyFunc(patchedFlowSchema)
-			flowSchemaRVs[key] = patchedFlowSchema.ResourceVersion
-		} else if apierrors.IsNotFound(err) {
-			// This object has been deleted. A notification is coming
-			// and nothing more needs to be done here.
-			klog.V(5).Infof("%s at %s: attempted update of concurrently deleted FlowSchema %s; nothing more needs to be done", cfgCtlr.name, cfgCtlr.clock.Now().Format(timeFmt), fsu.flowSchema.Name)
-		} else {
-			errs = append(errs, fmt.Errorf("failed to set a status.condition for FlowSchema %s: %w", fsu.flowSchema.Name, err))
+		_, err = fsIfc.Patch(context.TODO(), fsu.flowSchema.Name, apitypes.StrategicMergePatchType, patchBytes, patchOptions, "status")
+		if err != nil {
+			if apierrors.IsNotFound(err) {
+				// This object has been deleted. A notification is coming
+				// and nothing more needs to be done here.
+				klog.V(5).Infof("%s at %s: attempted update of concurrently deleted FlowSchema %s; nothing more needs to be done", cfgCtlr.name, cfgCtlr.clock.Now().Format(timeFmt), fsu.flowSchema.Name)
+			} else {
+				errs = append(errs, fmt.Errorf("failed to set a status.condition for FlowSchema %s: %w", fsu.flowSchema.Name, err))
+			}
 		}
 	}
 	cfgCtlr.addUpdateResult(currResult)
@@ -809,7 +808,7 @@ func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDig
 	}
 	startWaitingTime = time.Now()
 	klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, numQueues=%d", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName, numQueues)
-	req, idle := plState.queues.StartRequest(ctx, &rd.Width, hashValue, flowDistinguisher, selectedFlowSchema.Name, rd.RequestInfo, rd.User, queueNoteFn)
+	req, idle := plState.queues.StartRequest(ctx, &rd.WorkEstimate, hashValue, flowDistinguisher, selectedFlowSchema.Name, rd.RequestInfo, rd.User, queueNoteFn)
 	if idle {
 		cfgCtlr.maybeReapLocked(plName, plState)
 	}
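RequestDigest is the value startRequest feeds into the fair queuing machinery, and it now carries a WorkEstimate instead of a Width; besides the rename, syncOne and digestConfigObjects drop the flowSchemaRVs map, so the status patch no longer records patched ResourceVersions. A self-contained sketch of the digest shape at this revision; the user and request values are made up for illustration:

```go
package main

import (
	"fmt"

	"k8s.io/apiserver/pkg/authentication/user"
	"k8s.io/apiserver/pkg/endpoints/request"
	utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
	fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
)

func main() {
	// A digest as the filter would assemble it for a simple read request;
	// startRequest later passes &digest.WorkEstimate to QueueSet.StartRequest.
	digest := utilflowcontrol.RequestDigest{
		RequestInfo:  &request.RequestInfo{Verb: "get", Resource: "pods"},
		User:         &user.DefaultInfo{Name: "alice"},
		WorkEstimate: fcrequest.WorkEstimate{Seats: 1},
	}
	fmt.Printf("verb=%s seats=%d\n", digest.RequestInfo.Verb, digest.WorkEstimate.Seats)
}
```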
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go
index 08b7722b0cd..9766bde7755 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go
@@ -140,7 +140,7 @@ func (cqs *ctlrTestQueueSet) IsIdle() bool {
 	return cqs.countActive == 0
 }
 
-func (cqs *ctlrTestQueueSet) StartRequest(ctx context.Context, width *fcrequest.Width, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (req fq.Request, idle bool) {
+func (cqs *ctlrTestQueueSet) StartRequest(ctx context.Context, width *fcrequest.WorkEstimate, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (req fq.Request, idle bool) {
 	cqs.cts.lock.Lock()
 	defer cqs.cts.lock.Unlock()
 	cqs.countActive++
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go
index 2de44c350d1..add5d4cd185 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go
@@ -81,7 +81,7 @@ type QueueSet interface {
 	// was idle at the moment of the return.  Otherwise idle==false
 	// and the client must call the Finish method of the Request
 	// exactly once.
-	StartRequest(ctx context.Context, width *request.Width, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn QueueNoteFn) (req Request, idle bool)
+	StartRequest(ctx context.Context, width *request.WorkEstimate, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn QueueNoteFn) (req Request, idle bool)
 
 	// UpdateObservations makes sure any time-based statistics have
 	// caught up with the current clock reading
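The QueueSet contract around the renamed parameter is unchanged: a nil req means the request was rejected, and a non-nil req obliges the caller to invoke Finish exactly once. A hedged caller-side sketch that compiles against the interfaces alone; the helper name and the distinguisher strings are invented, and queue-set construction is elided:

```go
package sketch

import (
	"context"

	fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
	fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
)

// executeViaQueueSet is a hypothetical helper showing the calling
// convention: hand StartRequest a *WorkEstimate, then either give up on a
// nil Request or call Finish exactly once with the work to run.
func executeViaQueueSet(ctx context.Context, qs fq.QueueSet, work *fcrequest.WorkEstimate, hash uint64, serve func()) (idle bool) {
	req, idle := qs.StartRequest(ctx, work, hash, "distinguisher", "my-flowschema", "descr1", "descr2", nil)
	if req == nil {
		return idle // rejected: queue full or not enough seats free
	}
	// Finish waits for a dispatch decision, runs serve if the request is
	// allowed to execute, and releases the request's seats afterwards.
	return req.Finish(serve)
}
```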
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list_test.go
index 4abedd1d789..bdaca72c8be 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list_test.go
@@ -154,7 +154,7 @@ func TestFIFOSeatsSum(t *testing.T) {
 	list := newRequestFIFO()
 
 	newRequest := func(width uint) *request {
-		return &request{width: fcrequest.Width{Seats: width}}
+		return &request{workEstimate: fcrequest.WorkEstimate{Seats: width}}
 	}
 	arrival := []*request{newRequest(1), newRequest(2), newRequest(3)}
 	removeFn := make([]removeFromFIFOFunc, 0)
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go
index e4938e9f2e0..a5c7b1a4d20 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go
@@ -235,7 +235,7 @@ const (
 // executing at each point where there is a change in that quantity,
 // because the metrics --- and only the metrics --- track that
 // quantity per FlowSchema.
-func (qs *queueSet) StartRequest(ctx context.Context, width *fqrequest.Width, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (fq.Request, bool) {
+func (qs *queueSet) StartRequest(ctx context.Context, workEstimate *fqrequest.WorkEstimate, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (fq.Request, bool) {
 	qs.lockAndSyncTime()
 	defer qs.lock.Unlock()
 	var req *request
@@ -244,13 +244,13 @@ func (qs *queueSet) StartRequest(ctx context.Context, width *fqrequest.Width, ha
 	// Step 0:
 	// Apply only concurrency limit, if zero queues desired
 	if qs.qCfg.DesiredNumQueues < 1 {
-		if !qs.canAccommodateSeatsLocked(int(width.Seats)) {
+		if !qs.canAccommodateSeatsLocked(int(workEstimate.Seats)) {
 			klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v because %d seats are asked for, %d seats are in use (%d are executing) and the limit is %d",
-				qs.qCfg.Name, fsName, descr1, descr2, width, qs.totSeatsInUse, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit)
+				qs.qCfg.Name, fsName, descr1, descr2, workEstimate, qs.totSeatsInUse, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit)
 			metrics.AddReject(ctx, qs.qCfg.Name, fsName, "concurrency-limit")
 			return nil, qs.isIdleLocked()
 		}
-		req = qs.dispatchSansQueueLocked(ctx, width, flowDistinguisher, fsName, descr1, descr2)
+		req = qs.dispatchSansQueueLocked(ctx, workEstimate, flowDistinguisher, fsName, descr1, descr2)
 		return req, false
 	}
 
@@ -261,7 +261,7 @@ func (qs *queueSet) StartRequest(ctx context.Context, width *fqrequest.Width, ha
 	// 3) Reject current request if there is not enough concurrency shares and
 	// we are at max queue length
 	// 4) If not rejected, create a request and enqueue
-	req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(ctx, width, hashValue, flowDistinguisher, fsName, descr1, descr2, queueNoteFn)
+	req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(ctx, workEstimate, hashValue, flowDistinguisher, fsName, descr1, descr2, queueNoteFn)
 	// req == nil means that the request was rejected - no remaining
 	// concurrency shares and at max queue length already
 	if req == nil {
@@ -316,7 +316,7 @@
 
 // Seats returns the number of seats this request requires.
 func (req *request) Seats() int {
-	return int(req.width.Seats)
+	return int(req.workEstimate.Seats)
 }
 
 func (req *request) NoteQueued(inQueue bool) {
@@ -437,7 +437,7 @@ func (qs *queueSet) getVirtualTimeRatioLocked() float64 {
 // returns the enqueud request on a successful enqueue
 // returns nil in the case that there is no available concurrency or
 // the queuelengthlimit has been reached
-func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Context, width *fqrequest.Width, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) *request {
+func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Context, workEstimate *fqrequest.WorkEstimate, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) *request {
 	// Start with the shuffle sharding, to pick a queue.
 	queueIdx := qs.chooseQueueIndexLocked(hashValue, descr1, descr2)
 	queue := qs.queues[queueIdx]
@@ -459,7 +459,7 @@
 		descr1:      descr1,
 		descr2:      descr2,
 		queueNoteFn: queueNoteFn,
-		width:       *width,
+		workEstimate: *workEstimate,
 	}
 	if ok := qs.rejectOrEnqueueLocked(req); !ok {
 		return nil
@@ -476,7 +476,7 @@ func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64, descr1, descr2 inte
 	// the dealer uses the current desired number of queues, which is no larger than the number in `qs.queues`.
 	qs.dealer.Deal(hashValue, func(queueIdx int) {
 		// TODO: Consider taking into account `additional latency` of requests
-		// in addition to their widths.
+		// in addition to their seats.
 		// Ideally, this should be based on projected completion time in the
 		// virtual world of the youngest request in the queue.
 		thisSeatsSum := qs.queues[queueIdx].requests.SeatsSum()
@@ -579,7 +579,7 @@ func (qs *queueSet) dispatchAsMuchAsPossibleLocked() {
 	}
 }
 
-func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, width *fqrequest.Width, flowDistinguisher, fsName string, descr1, descr2 interface{}) *request {
+func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *fqrequest.WorkEstimate, flowDistinguisher, fsName string, descr1, descr2 interface{}) *request {
 	// does not call metrics.SetDispatchMetrics because there is no queuing and thus no interesting virtual world
 	now := qs.clock.Now()
 	req := &request{
@@ -592,7 +592,7 @@
 		arrivalTime: now,
 		descr1:      descr1,
 		descr2:      descr2,
-		width:       *width,
+		workEstimate: *workEstimate,
 	}
 	req.decision.SetLocked(decisionExecute)
 	qs.totRequestsExecuting++
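The seat bookkeeping behind canAccommodateSeatsLocked and SeatsSum is untouched by the rename; only the field feeding it is now workEstimate.Seats. A simplified, self-contained model of the admission rule, not apiserver code, and it ignores the zero-requests-executing borderline case the real implementation handles:

```go
package main

import "fmt"

// miniQueueSet models just the seat arithmetic: a request fits without
// queuing only if its estimated seats keep total usage at or under the
// concurrency limit.
type miniQueueSet struct {
	concurrencyLimit int
	totSeatsInUse    int
}

func (qs *miniQueueSet) canAccommodateSeats(seats int) bool {
	return qs.totSeatsInUse+seats <= qs.concurrencyLimit
}

func main() {
	qs := &miniQueueSet{concurrencyLimit: 10, totSeatsInUse: 8}
	fmt.Println(qs.canAccommodateSeats(1)) // true: 8+1 <= 10
	fmt.Println(qs.canAccommodateSeats(5)) // false: 8+5 > 10
}
```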
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go
index 3e85daaba32..84442abc362 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go
@@ -227,7 +227,7 @@ func (ust *uniformScenarioThread) callK(k int) {
 	if k >= ust.nCalls {
 		return
 	}
-	req, idle := ust.uss.qs.StartRequest(context.Background(), &fcrequest.Width{Seats: 1}, ust.uc.hash, "", ust.fsName, ust.uss.name, []int{ust.i, ust.j, k}, nil)
+	req, idle := ust.uss.qs.StartRequest(context.Background(), &fcrequest.WorkEstimate{Seats: 1}, ust.uc.hash, "", ust.fsName, ust.uss.name, []int{ust.i, ust.j, k}, nil)
 	ust.uss.t.Logf("%s: %d, %d, %d got req=%p, idle=%v", ust.uss.clk.Now().Format(nsTimeFmt), ust.i, ust.j, k, req, idle)
 	if req == nil {
 		atomic.AddUint64(&ust.uss.failedCount, 1)
@@ -672,7 +672,7 @@ func TestContextCancel(t *testing.T) {
 	ctx1 := context.Background()
 	b2i := map[bool]int{false: 0, true: 1}
 	var qnc [2][2]int32
-	req1, _ := qs.StartRequest(ctx1, &fcrequest.Width{Seats: 1}, 1, "", "fs1", "test", "one", func(inQueue bool) { atomic.AddInt32(&qnc[0][b2i[inQueue]], 1) })
+	req1, _ := qs.StartRequest(ctx1, &fcrequest.WorkEstimate{Seats: 1}, 1, "", "fs1", "test", "one", func(inQueue bool) { atomic.AddInt32(&qnc[0][b2i[inQueue]], 1) })
 	if req1 == nil {
 		t.Error("Request rejected")
 		return
@@ -700,7 +700,7 @@ func TestContextCancel(t *testing.T) {
 		counter.Add(1)
 		cancel2()
 	}()
-	req2, idle2a := qs.StartRequest(ctx2, &fcrequest.Width{Seats: 1}, 2, "", "fs2", "test", "two", func(inQueue bool) { atomic.AddInt32(&qnc[1][b2i[inQueue]], 1) })
+	req2, idle2a := qs.StartRequest(ctx2, &fcrequest.WorkEstimate{Seats: 1}, 2, "", "fs2", "test", "two", func(inQueue bool) { atomic.AddInt32(&qnc[1][b2i[inQueue]], 1) })
 	if idle2a {
 		t.Error("2nd StartRequest returned idle")
 	}
@@ -759,7 +759,7 @@ func TestTotalRequestsExecutingWithPanic(t *testing.T) {
 	}
 
 	ctx := context.Background()
-	req, _ := qs.StartRequest(ctx, &fcrequest.Width{Seats: 1}, 1, "", "fs", "test", "one", func(inQueue bool) {})
+	req, _ := qs.StartRequest(ctx, &fcrequest.WorkEstimate{Seats: 1}, 1, "", "fs", "test", "one", func(inQueue bool) {})
 	if req == nil {
 		t.Fatal("expected a Request object from StartRequest, but got nil")
 	}
@@ -812,13 +812,13 @@ func TestSelectQueueLocked(t *testing.T) {
 			{
 				virtualStart: 200,
 				requests: newFIFO(
-					&request{width: fcrequest.Width{Seats: 1}},
+					&request{workEstimate: fcrequest.WorkEstimate{Seats: 1}},
 				),
 			},
 			{
 				virtualStart: 100,
 				requests: newFIFO(
-					&request{width: fcrequest.Width{Seats: 1}},
+					&request{workEstimate: fcrequest.WorkEstimate{Seats: 1}},
 				),
 			},
 		},
@@ -835,7 +835,7 @@ func TestSelectQueueLocked(t *testing.T) {
 			{
 				virtualStart: 200,
 				requests: newFIFO(
-					&request{width: fcrequest.Width{Seats: 1}},
+					&request{workEstimate: fcrequest.WorkEstimate{Seats: 1}},
 				),
 			},
 		},
@@ -852,13 +852,13 @@ func TestSelectQueueLocked(t *testing.T) {
 			{
 				virtualStart: 200,
 				requests: newFIFO(
-					&request{width: fcrequest.Width{Seats: 50}},
+					&request{workEstimate: fcrequest.WorkEstimate{Seats: 50}},
 				),
 			},
 			{
 				virtualStart: 100,
 				requests: newFIFO(
-					&request{width: fcrequest.Width{Seats: 25}},
+					&request{workEstimate: fcrequest.WorkEstimate{Seats: 25}},
 				),
 			},
 		},
@@ -875,13 +875,13 @@ func TestSelectQueueLocked(t *testing.T) {
 			{
 				virtualStart: 200,
 				requests: newFIFO(
-					&request{width: fcrequest.Width{Seats: 10}},
+					&request{workEstimate: fcrequest.WorkEstimate{Seats: 10}},
 				),
 			},
 			{
 				virtualStart: 100,
 				requests: newFIFO(
-					&request{width: fcrequest.Width{Seats: 25}},
+					&request{workEstimate: fcrequest.WorkEstimate{Seats: 25}},
 				),
 			},
 		},
@@ -898,13 +898,13 @@ func TestSelectQueueLocked(t *testing.T) {
 			{
 				virtualStart: 200,
 				requests: newFIFO(
-					&request{width: fcrequest.Width{Seats: 10}},
+					&request{workEstimate: fcrequest.WorkEstimate{Seats: 10}},
 				),
 			},
 			{
 				virtualStart: 100,
 				requests: newFIFO(
-					&request{width: fcrequest.Width{Seats: 25}},
+					&request{workEstimate: fcrequest.WorkEstimate{Seats: 25}},
 				),
 			},
 		},
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go
index 7f121d707aa..72943cf5a3c 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go
@@ -44,8 +44,8 @@ type request struct {
 	// startTime is the real time when the request began executing
 	startTime time.Time
 
-	// width of the request
-	width fcrequest.Width
+	// estimated amount of work of the request
	workEstimate fcrequest.WorkEstimate
 
 	// decision gets set to a `requestDecision` indicating what to do
 	// with this request.  It gets set exactly once, when the request
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go
index dadd2f5b1d7..09bb89d1838 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go
@@ -56,7 +56,7 @@ func (noRestraint) IsIdle() bool {
 	return false
 }
 
-func (noRestraint) StartRequest(ctx context.Context, width *fcrequest.Width, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (fq.Request, bool) {
+func (noRestraint) StartRequest(ctx context.Context, workEstimate *fcrequest.WorkEstimate, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (fq.Request, bool) {
 	return noRestraintRequest{}, false
 }
 
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/match_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/match_test.go
index 936f10f0fda..0ad992ce9c3 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/match_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/match_test.go
@@ -102,8 +102,8 @@ func TestLiterals(t *testing.T) {
 			Name:      "eman",
 			Parts:     []string{"goodrscs", "eman"},
 		},
-		User:  ui,
-		Width: fcrequest.Width{Seats: 1},
+		User:         ui,
+		WorkEstimate: fcrequest.WorkEstimate{Seats: 1},
 	}
 	reqRU := RequestDigest{
 		RequestInfo: &request.RequestInfo{
@@ -118,8 +118,8 @@ func TestLiterals(t *testing.T) {
 			Name:      "eman",
 			Parts:     []string{"goodrscs", "eman"},
 		},
-		User:  ui,
-		Width: fcrequest.Width{Seats: 1},
+		User:         ui,
+		WorkEstimate: fcrequest.WorkEstimate{Seats: 1},
 	}
 	reqN := RequestDigest{
 		RequestInfo: &request.RequestInfo{
@@ -127,8 +127,8 @@ func TestLiterals(t *testing.T) {
 			Path: "/openapi/v2",
 			Verb: "goodverb",
 		},
-		User:  ui,
-		Width: fcrequest.Width{Seats: 1},
+		User:         ui,
+		WorkEstimate: fcrequest.WorkEstimate{Seats: 1},
 	}
 	checkRules(t, true, reqRN, []flowcontrol.PolicyRulesWithSubjects{{
 		Subjects: []flowcontrol.Subject{{Kind: flowcontrol.SubjectKindUser,
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go
index c5fa478a711..f2f4fdab94d 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go
@@ -20,28 +20,28 @@ import (
 	"net/http"
 )
 
-type Width struct {
+type WorkEstimate struct {
 	// Seats represents the number of seats associated with this request
 	Seats uint
 }
 
-// DefaultWidthEstimator returns returns '1' as the "width"
-// of the given request.
+// DefaultWorkEstimator returns a WorkEstimate with the default
+// number of seats (1) for the given request.
 //
-// TODO: when we plumb in actual "width" handling for different
+// TODO: when we plumb in actual work estimate handling for different
 // type of request(s) this function will iterate through a chain
-// of widthEstimator instance(s).
-func DefaultWidthEstimator(_ *http.Request) Width {
-	return Width{
+// of workEstimator instance(s).
+func DefaultWorkEstimator(_ *http.Request) WorkEstimate {
+	return WorkEstimate{
 		Seats: 1,
 	}
 }
 
-// WidthEstimatorFunc returns the estimated "width" of a given request.
+// WorkEstimatorFunc returns the estimated work of a given request.
 // This function will be used by the Priority & Fairness filter to
-// estimate the "width" of incoming requests.
-type WidthEstimatorFunc func(*http.Request) Width
+// estimate the work of incoming requests.
+type WorkEstimatorFunc func(*http.Request) WorkEstimate
 
-func (e WidthEstimatorFunc) EstimateWidth(r *http.Request) Width {
+func (e WorkEstimatorFunc) EstimateWork(r *http.Request) WorkEstimate {
 	return e(r)
 }
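The TODO in width.go anticipates a chain of workEstimator instances, though nothing in this patch builds one. A hypothetical helper sketching the shape such a chain could take; the watch weighting and the max-wins combination rule are assumptions for illustration only:

```go
package main

import (
	"fmt"
	"net/http"

	fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
)

// chainEstimators returns an estimator that asks each estimator in turn
// and keeps the largest seat count, falling back to one seat.
func chainEstimators(estimators ...fcrequest.WorkEstimatorFunc) fcrequest.WorkEstimatorFunc {
	return func(r *http.Request) fcrequest.WorkEstimate {
		best := fcrequest.WorkEstimate{Seats: 1}
		for _, e := range estimators {
			if est := e.EstimateWork(r); est.Seats > best.Seats {
				best = est
			}
		}
		return best
	}
}

func main() {
	watchHeavy := fcrequest.WorkEstimatorFunc(func(r *http.Request) fcrequest.WorkEstimate {
		if r.URL.Query().Get("watch") == "true" {
			return fcrequest.WorkEstimate{Seats: 3} // invented weight
		}
		return fcrequest.WorkEstimate{Seats: 1}
	})
	combined := chainEstimators(watchHeavy, fcrequest.DefaultWorkEstimator)
	r, _ := http.NewRequest(http.MethodGet, "/api/v1/pods?watch=true", nil)
	fmt.Println(combined.EstimateWork(r).Seats) // prints 3
}
```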