diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/debug/dump.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/debug/dump.go
index 5e44676491b..9d50c3e21a2 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/debug/dump.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/debug/dump.go
@@ -24,9 +24,10 @@ import (
 
 // QueueSetDump is an instant dump of queue-set.
 type QueueSetDump struct {
-	Queues    []QueueDump
-	Waiting   int
-	Executing int
+	Queues     []QueueDump
+	Waiting    int
+	Executing  int
+	SeatsInUse int
 }
 
 // QueueDump is an instant dump of one queue in a queue-set.
@@ -34,6 +35,7 @@ type QueueDump struct {
 	Requests          []RequestDump
 	VirtualStart      float64
 	ExecutingRequests int
+	SeatsInUse        int
 }
 
 // RequestDump is an instant dump of one requests pending in the queue.
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go
index d3d1a6b6245..34f2bc370af 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go
@@ -108,6 +108,10 @@ type queueSet struct {
 	// sum, over all the queues, of the number of requests executing
 	// from that queue.
 	totRequestsExecuting int
+
+	// totSeatsInUse is the number of total "seats" in use by all the
+	// request(s) that are currently executing in this queueset.
+	totSeatsInUse int
 }
 
 // NewQueueSetFactory creates a new QueueSetFactory object
@@ -233,6 +237,9 @@ const (
 // because the metrics --- and only the metrics --- track that
 // quantity per FlowSchema.
 func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (fq.Request, bool) {
+	// all request(s) have a width of 1, in keeping with the current behavior
+	width := 1.0
+
 	qs.lockAndSyncTime()
 	defer qs.lock.Unlock()
 	var req *request
@@ -241,12 +248,13 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, flowDist
 	// Step 0:
 	// Apply only concurrency limit, if zero queues desired
 	if qs.qCfg.DesiredNumQueues < 1 {
-		if qs.totRequestsExecuting >= qs.dCfg.ConcurrencyLimit {
-			klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v because %d are executing and the limit is %d", qs.qCfg.Name, fsName, descr1, descr2, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit)
+		if qs.totSeatsInUse >= qs.dCfg.ConcurrencyLimit {
+			klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v because %d seats are in use (%d are executing) and the limit is %d",
+				qs.qCfg.Name, fsName, descr1, descr2, qs.totSeatsInUse, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit)
 			metrics.AddReject(ctx, qs.qCfg.Name, fsName, "concurrency-limit")
 			return nil, qs.isIdleLocked()
 		}
-		req = qs.dispatchSansQueueLocked(ctx, flowDistinguisher, fsName, descr1, descr2)
+		req = qs.dispatchSansQueueLocked(ctx, width, flowDistinguisher, fsName, descr1, descr2)
 		return req, false
 	}
 
@@ -257,7 +265,7 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, flowDist
 	// 3) Reject current request if there is not enough concurrency shares and
 	// we are at max queue length
 	// 4) If not rejected, create a request and enqueue
-	req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(ctx, hashValue, flowDistinguisher, fsName, descr1, descr2, queueNoteFn)
+	req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(ctx, width, hashValue, flowDistinguisher, fsName, descr1, descr2, queueNoteFn)
 	// req == nil means that the request was rejected - no remaining
 	// concurrency shares and at max queue length already
 	if req == nil {
@@ -310,6 +318,11 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, flowDist
 	return req, false
 }
 
+// Seats returns the number of seats this request requires.
+func (req *request) Seats() int {
+	return int(math.Ceil(req.width))
+}
+
 func (req *request) NoteQueued(inQueue bool) {
 	if req.queueNoteFn != nil {
 		req.queueNoteFn(inQueue)
 	}
 }
@@ -427,7 +440,7 @@ func (qs *queueSet) getVirtualTimeRatioLocked() float64 {
 // returns the enqueud request on a successful enqueue
 // returns nil in the case that there is no available concurrency or
 // the queuelengthlimit has been reached
-func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) *request {
+func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Context, width float64, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) *request {
 	// Start with the shuffle sharding, to pick a queue.
 	queueIdx := qs.chooseQueueIndexLocked(hashValue, descr1, descr2)
 	queue := qs.queues[queueIdx]
@@ -449,6 +462,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte
 		descr1:      descr1,
 		descr2:      descr2,
 		queueNoteFn: queueNoteFn,
+		width:       width,
 	}
 	if ok := qs.rejectOrEnqueueLocked(req); !ok {
 		return nil
 	}
@@ -522,7 +536,7 @@ func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool {
 	queue := request.queue
 	curQueueLength := queue.requests.Length()
 	// rejects the newly arrived request if resource criteria not met
-	if qs.totRequestsExecuting >= qs.dCfg.ConcurrencyLimit &&
+	if qs.totSeatsInUse >= qs.dCfg.ConcurrencyLimit &&
 		curQueueLength >= qs.qCfg.QueueLengthLimit {
 		return false
 	}
@@ -556,7 +570,7 @@ func (qs *queueSet) enqueueLocked(request *request) {
 // queue, increment the count of the number executing, and send true
 // to the request's channel.
 func (qs *queueSet) dispatchAsMuchAsPossibleLocked() {
-	for qs.totRequestsWaiting != 0 && qs.totRequestsExecuting < qs.dCfg.ConcurrencyLimit {
+	for qs.totRequestsWaiting != 0 && qs.totSeatsInUse < qs.dCfg.ConcurrencyLimit {
 		ok := qs.dispatchLocked()
 		if !ok {
 			break
@@ -564,7 +578,7 @@
 	}
 }
 
-func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, flowDistinguisher, fsName string, descr1, descr2 interface{}) *request {
+func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, width float64, flowDistinguisher, fsName string, descr1, descr2 interface{}) *request {
 	now := qs.clock.Now()
 	req := &request{
 		qs:          qs,
@@ -576,9 +590,11 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, flowDistinguish
 		arrivalTime: now,
 		descr1:      descr1,
 		descr2:      descr2,
+		width:       width,
 	}
 	req.decision.SetLocked(decisionExecute)
 	qs.totRequestsExecuting++
+	qs.totSeatsInUse += req.Seats()
 	metrics.AddRequestsExecuting(ctx, qs.qCfg.Name, fsName, 1)
 	qs.obsPair.RequestsExecuting.Add(1)
 	if klog.V(5).Enabled() {
@@ -608,7 +624,9 @@ func (qs *queueSet) dispatchLocked() bool {
 	// problem because other overhead is also included.
 	qs.totRequestsWaiting--
 	qs.totRequestsExecuting++
+	qs.totSeatsInUse += request.Seats()
 	queue.requestsExecuting++
+	queue.seatsInUse += request.Seats()
 	metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, -1)
 	request.NoteQueued(false)
 	metrics.AddRequestsExecuting(request.ctx, qs.qCfg.Name, request.fsName, 1)
@@ -620,7 +638,7 @@ func (qs *queueSet) dispatchLocked() bool {
 			queue.index, queue.virtualStart, queue.requests.Length(), queue.requestsExecuting)
 	}
 	// When a request is dequeued for service -> qs.virtualStart += G
-	queue.virtualStart += qs.estimatedServiceTime
+	queue.virtualStart += qs.estimatedServiceTime * float64(request.Seats())
 	request.decision.SetLocked(decisionExecute)
 	return ok
 }
@@ -682,7 +700,7 @@ func (qs *queueSet) selectQueueLocked() *queue {
 	// queue here. if the last virtual start time (excluded estimated cost)
 	// falls behind the global virtual time, we update the latest virtual
 	// start by:
-	previouslyEstimatedServiceTime := float64(minQueue.requestsExecuting) * qs.estimatedServiceTime
+	previouslyEstimatedServiceTime := float64(minQueue.seatsInUse) * qs.estimatedServiceTime
 	if qs.virtualTime > minQueue.virtualStart-previouslyEstimatedServiceTime {
 		// per-queue virtual time should not fall behind the global
 		minQueue.virtualStart = qs.virtualTime + previouslyEstimatedServiceTime
@@ -710,6 +728,7 @@ func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) bool
 func (qs *queueSet) finishRequestLocked(r *request) {
 	now := qs.clock.Now()
 	qs.totRequestsExecuting--
+	qs.totSeatsInUse -= r.Seats()
 	metrics.AddRequestsExecuting(r.ctx, qs.qCfg.Name, r.fsName, -1)
 	qs.obsPair.RequestsExecuting.Add(-1)
 
@@ -724,10 +743,11 @@ func (qs *queueSet) finishRequestLocked(r *request) {
 
 	// When a request finishes being served, and the actual service time was S,
 	// the queue’s virtual start time is decremented by G - S.
-	r.queue.virtualStart -= qs.estimatedServiceTime - S
+	r.queue.virtualStart -= (qs.estimatedServiceTime * float64(r.Seats())) - S
 
 	// request has finished, remove from requests executing
 	r.queue.requestsExecuting--
+	r.queue.seatsInUse -= r.Seats()
 
 	if klog.V(6).Enabled() {
 		klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing",
@@ -784,9 +804,10 @@ func (qs *queueSet) Dump(includeRequestDetails bool) debug.QueueSetDump {
 	qs.lock.Lock()
 	defer qs.lock.Unlock()
 	d := debug.QueueSetDump{
-		Queues:    make([]debug.QueueDump, len(qs.queues)),
-		Waiting:   qs.totRequestsWaiting,
-		Executing: qs.totRequestsExecuting,
+		Queues:     make([]debug.QueueDump, len(qs.queues)),
+		Waiting:    qs.totRequestsWaiting,
+		Executing:  qs.totRequestsExecuting,
+		SeatsInUse: qs.totSeatsInUse,
 	}
 	for i, q := range qs.queues {
 		d.Queues[i] = q.dump(includeRequestDetails)
diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go
index 0345f9ed90a..7309d3bf04e 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go
@@ -43,6 +43,9 @@ type request struct {
 	// startTime is the real time when the request began executing
 	startTime time.Time
 
+	// width of the request
+	width float64
+
 	// decision gets set to a `requestDecision` indicating what to do
 	// with this request. It gets set exactly once, when the request
 	// is removed from its queue. The value will be decisionReject,
@@ -80,6 +83,10 @@ type queue struct {
 
 	requestsExecuting int
 	index             int
+
+	// seatsInUse is the total number of "seats" currently occupied
+	// by all the requests that are currently executing in this queue.
+	seatsInUse int
 }
 
 // Enqueue enqueues a request into the queue and
@@ -129,5 +136,6 @@ func (q *queue) dump(includeDetails bool) debug.QueueDump {
 		VirtualStart:      q.virtualStart,
 		Requests:          digest,
 		ExecutingRequests: q.requestsExecuting,
+		SeatsInUse:        q.seatsInUse,
 	}
 }
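For readers tracing the accounting above, the following is a minimal, self-contained Go sketch of the seat model this patch introduces; it is not part of the change itself. The miniRequest and miniQueueSet types and the single shared virtualStart are simplified stand-ins for the real request, queueSet, and per-queue state. The patch itself still assigns every request a width of 1, so seats and executing requests remain equal for now, but the arithmetic below shows how wider requests would be handled.

package main

import (
	"fmt"
	"math"
)

// miniRequest stands in for the queueset request type.
type miniRequest struct {
	width float64 // fractional widths round up to whole seats
}

// seats mirrors request.Seats() in the patch: the ceiling of width.
func (r miniRequest) seats() int {
	return int(math.Ceil(r.width))
}

// miniQueueSet stands in for a queueSet with a single queue.
type miniQueueSet struct {
	concurrencyLimit     int
	totRequestsExecuting int
	totSeatsInUse        int
	virtualStart         float64
	estimatedServiceTime float64 // G in the fair-queuing comments
}

// startExecuting mimics the dispatch path: admission is gated on seats
// in use rather than on the number of executing requests, and the
// virtual start time advances by G * seats instead of G.
func (qs *miniQueueSet) startExecuting(r miniRequest) bool {
	if qs.totSeatsInUse >= qs.concurrencyLimit {
		return false // all seats occupied; reject or keep queued
	}
	qs.totRequestsExecuting++
	qs.totSeatsInUse += r.seats()
	qs.virtualStart += qs.estimatedServiceTime * float64(r.seats())
	return true
}

// finish mimics finishRequestLocked: release the seats and correct the
// virtual start by (G * seats) - S, where S is the actual service time.
func (qs *miniQueueSet) finish(r miniRequest, actualServiceTime float64) {
	qs.totRequestsExecuting--
	qs.totSeatsInUse -= r.seats()
	qs.virtualStart -= (qs.estimatedServiceTime * float64(r.seats())) - actualServiceTime
}

func main() {
	qs := &miniQueueSet{concurrencyLimit: 10, estimatedServiceTime: 0.06}
	wide := miniRequest{width: 2.5} // would occupy ceil(2.5) = 3 seats
	fmt.Println(qs.startExecuting(wide), qs.totSeatsInUse) // true 3
	qs.finish(wide, 0.05)
	fmt.Println(qs.totSeatsInUse, qs.virtualStart) // 0 0.05
}

Note the net effect on the virtual clock: dispatch adds G * seats and completion subtracts (G * seats) - S, so once a request finishes its queue has been charged exactly its actual service time S, just as before the patch; the seat multiplier only changes the provisional charge while the request is still running.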