Merge pull request #102185 from tkashem/apf-seats-in-use

apf: introduce the concept of width for a request
This commit is contained in:
Kubernetes Prow Robot 2021-05-21 08:41:23 -07:00 committed by GitHub
commit e5bee02452
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 48 additions and 17 deletions

View File

@ -24,9 +24,10 @@ import (
// QueueSetDump is an instant dump of queue-set.
type QueueSetDump struct {
Queues []QueueDump
Waiting int
Executing int
Queues []QueueDump
Waiting int
Executing int
SeatsInUse int
}
// QueueDump is an instant dump of one queue in a queue-set.
@ -34,6 +35,7 @@ type QueueDump struct {
Requests []RequestDump
VirtualStart float64
ExecutingRequests int
SeatsInUse int
}
// RequestDump is an instant dump of one requests pending in the queue.

View File

@ -108,6 +108,10 @@ type queueSet struct {
// sum, over all the queues, of the number of requests executing
// from that queue.
totRequestsExecuting int
// totSeatsInUse is the number of total "seats" in use by all the
// request(s) that are currently executing in this queueset.
totSeatsInUse int
}
// NewQueueSetFactory creates a new QueueSetFactory object
@ -233,6 +237,9 @@ const (
// because the metrics --- and only the metrics --- track that
// quantity per FlowSchema.
func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (fq.Request, bool) {
// all request(s) have a width of 1, in keeping with the current behavior
width := 1.0
qs.lockAndSyncTime()
defer qs.lock.Unlock()
var req *request
@ -241,12 +248,13 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, flowDist
// Step 0:
// Apply only concurrency limit, if zero queues desired
if qs.qCfg.DesiredNumQueues < 1 {
if qs.totRequestsExecuting >= qs.dCfg.ConcurrencyLimit {
klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v because %d are executing and the limit is %d", qs.qCfg.Name, fsName, descr1, descr2, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit)
if qs.totSeatsInUse >= qs.dCfg.ConcurrencyLimit {
klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v because %d seats are in use (%d are executing) and the limit is %d",
qs.qCfg.Name, fsName, descr1, descr2, qs.totSeatsInUse, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit)
metrics.AddReject(ctx, qs.qCfg.Name, fsName, "concurrency-limit")
return nil, qs.isIdleLocked()
}
req = qs.dispatchSansQueueLocked(ctx, flowDistinguisher, fsName, descr1, descr2)
req = qs.dispatchSansQueueLocked(ctx, width, flowDistinguisher, fsName, descr1, descr2)
return req, false
}
@ -257,7 +265,7 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, flowDist
// 3) Reject current request if there is not enough concurrency shares and
// we are at max queue length
// 4) If not rejected, create a request and enqueue
req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(ctx, hashValue, flowDistinguisher, fsName, descr1, descr2, queueNoteFn)
req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(ctx, width, hashValue, flowDistinguisher, fsName, descr1, descr2, queueNoteFn)
// req == nil means that the request was rejected - no remaining
// concurrency shares and at max queue length already
if req == nil {
@ -310,6 +318,11 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, flowDist
return req, false
}
// Seats returns the number of seats this request requires.
func (req *request) Seats() int {
return int(math.Ceil(req.width))
}
func (req *request) NoteQueued(inQueue bool) {
if req.queueNoteFn != nil {
req.queueNoteFn(inQueue)
@ -427,7 +440,7 @@ func (qs *queueSet) getVirtualTimeRatioLocked() float64 {
// returns the enqueud request on a successful enqueue
// returns nil in the case that there is no available concurrency or
// the queuelengthlimit has been reached
func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) *request {
func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Context, width float64, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) *request {
// Start with the shuffle sharding, to pick a queue.
queueIdx := qs.chooseQueueIndexLocked(hashValue, descr1, descr2)
queue := qs.queues[queueIdx]
@ -449,6 +462,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte
descr1: descr1,
descr2: descr2,
queueNoteFn: queueNoteFn,
width: width,
}
if ok := qs.rejectOrEnqueueLocked(req); !ok {
return nil
@ -522,7 +536,7 @@ func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool {
queue := request.queue
curQueueLength := queue.requests.Length()
// rejects the newly arrived request if resource criteria not met
if qs.totRequestsExecuting >= qs.dCfg.ConcurrencyLimit &&
if qs.totSeatsInUse >= qs.dCfg.ConcurrencyLimit &&
curQueueLength >= qs.qCfg.QueueLengthLimit {
return false
}
@ -556,7 +570,7 @@ func (qs *queueSet) enqueueLocked(request *request) {
// queue, increment the count of the number executing, and send true
// to the request's channel.
func (qs *queueSet) dispatchAsMuchAsPossibleLocked() {
for qs.totRequestsWaiting != 0 && qs.totRequestsExecuting < qs.dCfg.ConcurrencyLimit {
for qs.totRequestsWaiting != 0 && qs.totSeatsInUse < qs.dCfg.ConcurrencyLimit {
ok := qs.dispatchLocked()
if !ok {
break
@ -564,7 +578,7 @@ func (qs *queueSet) dispatchAsMuchAsPossibleLocked() {
}
}
func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, flowDistinguisher, fsName string, descr1, descr2 interface{}) *request {
func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, width float64, flowDistinguisher, fsName string, descr1, descr2 interface{}) *request {
now := qs.clock.Now()
req := &request{
qs: qs,
@ -576,9 +590,11 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, flowDistinguish
arrivalTime: now,
descr1: descr1,
descr2: descr2,
width: width,
}
req.decision.SetLocked(decisionExecute)
qs.totRequestsExecuting++
qs.totSeatsInUse += req.Seats()
metrics.AddRequestsExecuting(ctx, qs.qCfg.Name, fsName, 1)
qs.obsPair.RequestsExecuting.Add(1)
if klog.V(5).Enabled() {
@ -608,7 +624,9 @@ func (qs *queueSet) dispatchLocked() bool {
// problem because other overhead is also included.
qs.totRequestsWaiting--
qs.totRequestsExecuting++
qs.totSeatsInUse += request.Seats()
queue.requestsExecuting++
queue.seatsInUse += request.Seats()
metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, -1)
request.NoteQueued(false)
metrics.AddRequestsExecuting(request.ctx, qs.qCfg.Name, request.fsName, 1)
@ -620,7 +638,7 @@ func (qs *queueSet) dispatchLocked() bool {
queue.index, queue.virtualStart, queue.requests.Length(), queue.requestsExecuting)
}
// When a request is dequeued for service -> qs.virtualStart += G
queue.virtualStart += qs.estimatedServiceTime
queue.virtualStart += qs.estimatedServiceTime * float64(request.Seats())
request.decision.SetLocked(decisionExecute)
return ok
}
@ -682,7 +700,7 @@ func (qs *queueSet) selectQueueLocked() *queue {
// queue here. if the last virtual start time (excluded estimated cost)
// falls behind the global virtual time, we update the latest virtual
// start by: <latest global virtual time> + <previously estimated cost>
previouslyEstimatedServiceTime := float64(minQueue.requestsExecuting) * qs.estimatedServiceTime
previouslyEstimatedServiceTime := float64(minQueue.seatsInUse) * qs.estimatedServiceTime
if qs.virtualTime > minQueue.virtualStart-previouslyEstimatedServiceTime {
// per-queue virtual time should not fall behind the global
minQueue.virtualStart = qs.virtualTime + previouslyEstimatedServiceTime
@ -710,6 +728,7 @@ func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) bool
func (qs *queueSet) finishRequestLocked(r *request) {
now := qs.clock.Now()
qs.totRequestsExecuting--
qs.totSeatsInUse -= r.Seats()
metrics.AddRequestsExecuting(r.ctx, qs.qCfg.Name, r.fsName, -1)
qs.obsPair.RequestsExecuting.Add(-1)
@ -724,10 +743,11 @@ func (qs *queueSet) finishRequestLocked(r *request) {
// When a request finishes being served, and the actual service time was S,
// the queues virtual start time is decremented by G - S.
r.queue.virtualStart -= qs.estimatedServiceTime - S
r.queue.virtualStart -= (qs.estimatedServiceTime * float64(r.Seats())) - S
// request has finished, remove from requests executing
r.queue.requestsExecuting--
r.queue.seatsInUse -= r.Seats()
if klog.V(6).Enabled() {
klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing",
@ -784,9 +804,10 @@ func (qs *queueSet) Dump(includeRequestDetails bool) debug.QueueSetDump {
qs.lock.Lock()
defer qs.lock.Unlock()
d := debug.QueueSetDump{
Queues: make([]debug.QueueDump, len(qs.queues)),
Waiting: qs.totRequestsWaiting,
Executing: qs.totRequestsExecuting,
Queues: make([]debug.QueueDump, len(qs.queues)),
Waiting: qs.totRequestsWaiting,
Executing: qs.totRequestsExecuting,
SeatsInUse: qs.totSeatsInUse,
}
for i, q := range qs.queues {
d.Queues[i] = q.dump(includeRequestDetails)

View File

@ -43,6 +43,9 @@ type request struct {
// startTime is the real time when the request began executing
startTime time.Time
// width of the request
width float64
// decision gets set to a `requestDecision` indicating what to do
// with this request. It gets set exactly once, when the request
// is removed from its queue. The value will be decisionReject,
@ -80,6 +83,10 @@ type queue struct {
requestsExecuting int
index int
// seatsInUse is the total number of "seats" currently occupied
// by all the requests that are currently executing in this queue.
seatsInUse int
}
// Enqueue enqueues a request into the queue and
@ -129,5 +136,6 @@ func (q *queue) dump(includeDetails bool) debug.QueueDump {
VirtualStart: q.virtualStart,
Requests: digest,
ExecutingRequests: q.requestsExecuting,
SeatsInUse: q.seatsInUse,
}
}