mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
apf: add "width" for request
all requests have a width of 1 to maintain current behavior.
This commit is contained in:
parent
7b2776b89f
commit
b50507d98b
@ -24,9 +24,10 @@ import (
|
|||||||
|
|
||||||
// QueueSetDump is an instant dump of queue-set.
|
// QueueSetDump is an instant dump of queue-set.
|
||||||
type QueueSetDump struct {
|
type QueueSetDump struct {
|
||||||
Queues []QueueDump
|
Queues []QueueDump
|
||||||
Waiting int
|
Waiting int
|
||||||
Executing int
|
Executing int
|
||||||
|
SeatsInUse int
|
||||||
}
|
}
|
||||||
|
|
||||||
// QueueDump is an instant dump of one queue in a queue-set.
|
// QueueDump is an instant dump of one queue in a queue-set.
|
||||||
@ -34,6 +35,7 @@ type QueueDump struct {
|
|||||||
Requests []RequestDump
|
Requests []RequestDump
|
||||||
VirtualStart float64
|
VirtualStart float64
|
||||||
ExecutingRequests int
|
ExecutingRequests int
|
||||||
|
SeatsInUse int
|
||||||
}
|
}
|
||||||
|
|
||||||
// RequestDump is an instant dump of one request pending in the queue.
|
// RequestDump is an instant dump of one request pending in the queue.
|
||||||
|
@ -108,6 +108,10 @@ type queueSet struct {
|
|||||||
// sum, over all the queues, of the number of requests executing
|
// sum, over all the queues, of the number of requests executing
|
||||||
// from that queue.
|
// from that queue.
|
||||||
totRequestsExecuting int
|
totRequestsExecuting int
|
||||||
|
|
||||||
|
// totSeatsInUse is the total number of "seats" in use by all the
|
||||||
|
// requests that are currently executing in this queueset.
|
||||||
|
totSeatsInUse int
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewQueueSetFactory creates a new QueueSetFactory object
|
// NewQueueSetFactory creates a new QueueSetFactory object
|
||||||
@ -233,6 +237,9 @@ const (
|
|||||||
// because the metrics --- and only the metrics --- track that
|
// because the metrics --- and only the metrics --- track that
|
||||||
// quantity per FlowSchema.
|
// quantity per FlowSchema.
|
||||||
func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (fq.Request, bool) {
|
func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (fq.Request, bool) {
|
||||||
|
// all request(s) have a width of 1, in keeping with the current behavior
|
||||||
|
width := 1.0
|
||||||
|
|
||||||
qs.lockAndSyncTime()
|
qs.lockAndSyncTime()
|
||||||
defer qs.lock.Unlock()
|
defer qs.lock.Unlock()
|
||||||
var req *request
|
var req *request
|
||||||
@ -241,12 +248,13 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, flowDist
|
|||||||
// Step 0:
|
// Step 0:
|
||||||
// Apply only concurrency limit, if zero queues desired
|
// Apply only concurrency limit, if zero queues desired
|
||||||
if qs.qCfg.DesiredNumQueues < 1 {
|
if qs.qCfg.DesiredNumQueues < 1 {
|
||||||
if qs.totRequestsExecuting >= qs.dCfg.ConcurrencyLimit {
|
if qs.totSeatsInUse >= qs.dCfg.ConcurrencyLimit {
|
||||||
klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v because %d are executing and the limit is %d", qs.qCfg.Name, fsName, descr1, descr2, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit)
|
klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v because %d seats are in use (%d are executing) and the limit is %d",
|
||||||
|
qs.qCfg.Name, fsName, descr1, descr2, qs.totSeatsInUse, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit)
|
||||||
metrics.AddReject(ctx, qs.qCfg.Name, fsName, "concurrency-limit")
|
metrics.AddReject(ctx, qs.qCfg.Name, fsName, "concurrency-limit")
|
||||||
return nil, qs.isIdleLocked()
|
return nil, qs.isIdleLocked()
|
||||||
}
|
}
|
||||||
req = qs.dispatchSansQueueLocked(ctx, flowDistinguisher, fsName, descr1, descr2)
|
req = qs.dispatchSansQueueLocked(ctx, width, flowDistinguisher, fsName, descr1, descr2)
|
||||||
return req, false
|
return req, false
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -257,7 +265,7 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, flowDist
|
|||||||
// 3) Reject current request if there are not enough concurrency shares and
|
// 3) Reject current request if there are not enough concurrency shares and
|
||||||
// we are at max queue length
|
// we are at max queue length
|
||||||
// 4) If not rejected, create a request and enqueue
|
// 4) If not rejected, create a request and enqueue
|
||||||
req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(ctx, hashValue, flowDistinguisher, fsName, descr1, descr2, queueNoteFn)
|
req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(ctx, width, hashValue, flowDistinguisher, fsName, descr1, descr2, queueNoteFn)
|
||||||
// req == nil means that the request was rejected - no remaining
|
// req == nil means that the request was rejected - no remaining
|
||||||
// concurrency shares and at max queue length already
|
// concurrency shares and at max queue length already
|
||||||
if req == nil {
|
if req == nil {
|
||||||
@ -310,6 +318,11 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, flowDist
|
|||||||
return req, false
|
return req, false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Seats returns the number of seats this request requires.
|
||||||
|
func (req *request) Seats() int {
|
||||||
|
return int(math.Ceil(req.width))
|
||||||
|
}
|
||||||
|
|
||||||
func (req *request) NoteQueued(inQueue bool) {
|
func (req *request) NoteQueued(inQueue bool) {
|
||||||
if req.queueNoteFn != nil {
|
if req.queueNoteFn != nil {
|
||||||
req.queueNoteFn(inQueue)
|
req.queueNoteFn(inQueue)
|
||||||
@ -427,7 +440,7 @@ func (qs *queueSet) getVirtualTimeRatioLocked() float64 {
|
|||||||
// returns the enqueued request on a successful enqueue
|
// returns the enqueued request on a successful enqueue
|
||||||
// returns nil in the case that there is no available concurrency or
|
// returns nil in the case that there is no available concurrency or
|
||||||
// the queuelengthlimit has been reached
|
// the queuelengthlimit has been reached
|
||||||
func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) *request {
|
func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Context, width float64, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) *request {
|
||||||
// Start with the shuffle sharding, to pick a queue.
|
// Start with the shuffle sharding, to pick a queue.
|
||||||
queueIdx := qs.chooseQueueIndexLocked(hashValue, descr1, descr2)
|
queueIdx := qs.chooseQueueIndexLocked(hashValue, descr1, descr2)
|
||||||
queue := qs.queues[queueIdx]
|
queue := qs.queues[queueIdx]
|
||||||
@ -449,6 +462,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte
|
|||||||
descr1: descr1,
|
descr1: descr1,
|
||||||
descr2: descr2,
|
descr2: descr2,
|
||||||
queueNoteFn: queueNoteFn,
|
queueNoteFn: queueNoteFn,
|
||||||
|
width: width,
|
||||||
}
|
}
|
||||||
if ok := qs.rejectOrEnqueueLocked(req); !ok {
|
if ok := qs.rejectOrEnqueueLocked(req); !ok {
|
||||||
return nil
|
return nil
|
||||||
@ -522,7 +536,7 @@ func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool {
|
|||||||
queue := request.queue
|
queue := request.queue
|
||||||
curQueueLength := queue.requests.Length()
|
curQueueLength := queue.requests.Length()
|
||||||
// rejects the newly arrived request if resource criteria not met
|
// rejects the newly arrived request if resource criteria not met
|
||||||
if qs.totRequestsExecuting >= qs.dCfg.ConcurrencyLimit &&
|
if qs.totSeatsInUse >= qs.dCfg.ConcurrencyLimit &&
|
||||||
curQueueLength >= qs.qCfg.QueueLengthLimit {
|
curQueueLength >= qs.qCfg.QueueLengthLimit {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
@ -556,7 +570,7 @@ func (qs *queueSet) enqueueLocked(request *request) {
|
|||||||
// queue, increment the count of the number executing, and send true
|
// queue, increment the count of the number executing, and send true
|
||||||
// to the request's channel.
|
// to the request's channel.
|
||||||
func (qs *queueSet) dispatchAsMuchAsPossibleLocked() {
|
func (qs *queueSet) dispatchAsMuchAsPossibleLocked() {
|
||||||
for qs.totRequestsWaiting != 0 && qs.totRequestsExecuting < qs.dCfg.ConcurrencyLimit {
|
for qs.totRequestsWaiting != 0 && qs.totSeatsInUse < qs.dCfg.ConcurrencyLimit {
|
||||||
ok := qs.dispatchLocked()
|
ok := qs.dispatchLocked()
|
||||||
if !ok {
|
if !ok {
|
||||||
break
|
break
|
||||||
@ -564,7 +578,7 @@ func (qs *queueSet) dispatchAsMuchAsPossibleLocked() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, flowDistinguisher, fsName string, descr1, descr2 interface{}) *request {
|
func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, width float64, flowDistinguisher, fsName string, descr1, descr2 interface{}) *request {
|
||||||
now := qs.clock.Now()
|
now := qs.clock.Now()
|
||||||
req := &request{
|
req := &request{
|
||||||
qs: qs,
|
qs: qs,
|
||||||
@ -576,9 +590,11 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, flowDistinguish
|
|||||||
arrivalTime: now,
|
arrivalTime: now,
|
||||||
descr1: descr1,
|
descr1: descr1,
|
||||||
descr2: descr2,
|
descr2: descr2,
|
||||||
|
width: width,
|
||||||
}
|
}
|
||||||
req.decision.SetLocked(decisionExecute)
|
req.decision.SetLocked(decisionExecute)
|
||||||
qs.totRequestsExecuting++
|
qs.totRequestsExecuting++
|
||||||
|
qs.totSeatsInUse += req.Seats()
|
||||||
metrics.AddRequestsExecuting(ctx, qs.qCfg.Name, fsName, 1)
|
metrics.AddRequestsExecuting(ctx, qs.qCfg.Name, fsName, 1)
|
||||||
qs.obsPair.RequestsExecuting.Add(1)
|
qs.obsPair.RequestsExecuting.Add(1)
|
||||||
if klog.V(5).Enabled() {
|
if klog.V(5).Enabled() {
|
||||||
@ -608,7 +624,9 @@ func (qs *queueSet) dispatchLocked() bool {
|
|||||||
// problem because other overhead is also included.
|
// problem because other overhead is also included.
|
||||||
qs.totRequestsWaiting--
|
qs.totRequestsWaiting--
|
||||||
qs.totRequestsExecuting++
|
qs.totRequestsExecuting++
|
||||||
|
qs.totSeatsInUse += request.Seats()
|
||||||
queue.requestsExecuting++
|
queue.requestsExecuting++
|
||||||
|
queue.seatsInUse += request.Seats()
|
||||||
metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, -1)
|
metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, -1)
|
||||||
request.NoteQueued(false)
|
request.NoteQueued(false)
|
||||||
metrics.AddRequestsExecuting(request.ctx, qs.qCfg.Name, request.fsName, 1)
|
metrics.AddRequestsExecuting(request.ctx, qs.qCfg.Name, request.fsName, 1)
|
||||||
@ -620,7 +638,7 @@ func (qs *queueSet) dispatchLocked() bool {
|
|||||||
queue.index, queue.virtualStart, queue.requests.Length(), queue.requestsExecuting)
|
queue.index, queue.virtualStart, queue.requests.Length(), queue.requestsExecuting)
|
||||||
}
|
}
|
||||||
// When a request is dequeued for service -> qs.virtualStart += G
|
// When a request is dequeued for service -> qs.virtualStart += G
|
||||||
queue.virtualStart += qs.estimatedServiceTime
|
queue.virtualStart += qs.estimatedServiceTime * float64(request.Seats())
|
||||||
request.decision.SetLocked(decisionExecute)
|
request.decision.SetLocked(decisionExecute)
|
||||||
return ok
|
return ok
|
||||||
}
|
}
|
||||||
@ -682,7 +700,7 @@ func (qs *queueSet) selectQueueLocked() *queue {
|
|||||||
// queue here. If the last virtual start time (excluding estimated cost)
|
// queue here. If the last virtual start time (excluding estimated cost)
|
||||||
// falls behind the global virtual time, we update the latest virtual
|
// falls behind the global virtual time, we update the latest virtual
|
||||||
// start by: <latest global virtual time> + <previously estimated cost>
|
// start by: <latest global virtual time> + <previously estimated cost>
|
||||||
previouslyEstimatedServiceTime := float64(minQueue.requestsExecuting) * qs.estimatedServiceTime
|
previouslyEstimatedServiceTime := float64(minQueue.seatsInUse) * qs.estimatedServiceTime
|
||||||
if qs.virtualTime > minQueue.virtualStart-previouslyEstimatedServiceTime {
|
if qs.virtualTime > minQueue.virtualStart-previouslyEstimatedServiceTime {
|
||||||
// per-queue virtual time should not fall behind the global
|
// per-queue virtual time should not fall behind the global
|
||||||
minQueue.virtualStart = qs.virtualTime + previouslyEstimatedServiceTime
|
minQueue.virtualStart = qs.virtualTime + previouslyEstimatedServiceTime
|
||||||
@ -710,6 +728,7 @@ func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) bool
|
|||||||
func (qs *queueSet) finishRequestLocked(r *request) {
|
func (qs *queueSet) finishRequestLocked(r *request) {
|
||||||
now := qs.clock.Now()
|
now := qs.clock.Now()
|
||||||
qs.totRequestsExecuting--
|
qs.totRequestsExecuting--
|
||||||
|
qs.totSeatsInUse -= r.Seats()
|
||||||
metrics.AddRequestsExecuting(r.ctx, qs.qCfg.Name, r.fsName, -1)
|
metrics.AddRequestsExecuting(r.ctx, qs.qCfg.Name, r.fsName, -1)
|
||||||
qs.obsPair.RequestsExecuting.Add(-1)
|
qs.obsPair.RequestsExecuting.Add(-1)
|
||||||
|
|
||||||
@ -724,10 +743,11 @@ func (qs *queueSet) finishRequestLocked(r *request) {
|
|||||||
|
|
||||||
// When a request finishes being served, and the actual service time was S,
|
// When a request finishes being served, and the actual service time was S,
|
||||||
// the queue’s virtual start time is decremented by G - S.
|
// the queue’s virtual start time is decremented by G - S.
|
||||||
r.queue.virtualStart -= qs.estimatedServiceTime - S
|
r.queue.virtualStart -= (qs.estimatedServiceTime * float64(r.Seats())) - S
|
||||||
|
|
||||||
// request has finished, remove from requests executing
|
// request has finished, remove from requests executing
|
||||||
r.queue.requestsExecuting--
|
r.queue.requestsExecuting--
|
||||||
|
r.queue.seatsInUse -= r.Seats()
|
||||||
|
|
||||||
if klog.V(6).Enabled() {
|
if klog.V(6).Enabled() {
|
||||||
klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing",
|
klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing",
|
||||||
@ -784,9 +804,10 @@ func (qs *queueSet) Dump(includeRequestDetails bool) debug.QueueSetDump {
|
|||||||
qs.lock.Lock()
|
qs.lock.Lock()
|
||||||
defer qs.lock.Unlock()
|
defer qs.lock.Unlock()
|
||||||
d := debug.QueueSetDump{
|
d := debug.QueueSetDump{
|
||||||
Queues: make([]debug.QueueDump, len(qs.queues)),
|
Queues: make([]debug.QueueDump, len(qs.queues)),
|
||||||
Waiting: qs.totRequestsWaiting,
|
Waiting: qs.totRequestsWaiting,
|
||||||
Executing: qs.totRequestsExecuting,
|
Executing: qs.totRequestsExecuting,
|
||||||
|
SeatsInUse: qs.totSeatsInUse,
|
||||||
}
|
}
|
||||||
for i, q := range qs.queues {
|
for i, q := range qs.queues {
|
||||||
d.Queues[i] = q.dump(includeRequestDetails)
|
d.Queues[i] = q.dump(includeRequestDetails)
|
||||||
|
@ -43,6 +43,9 @@ type request struct {
|
|||||||
// startTime is the real time when the request began executing
|
// startTime is the real time when the request began executing
|
||||||
startTime time.Time
|
startTime time.Time
|
||||||
|
|
||||||
|
// width of the request
|
||||||
|
width float64
|
||||||
|
|
||||||
// decision gets set to a `requestDecision` indicating what to do
|
// decision gets set to a `requestDecision` indicating what to do
|
||||||
// with this request. It gets set exactly once, when the request
|
// with this request. It gets set exactly once, when the request
|
||||||
// is removed from its queue. The value will be decisionReject,
|
// is removed from its queue. The value will be decisionReject,
|
||||||
@ -80,6 +83,10 @@ type queue struct {
|
|||||||
|
|
||||||
requestsExecuting int
|
requestsExecuting int
|
||||||
index int
|
index int
|
||||||
|
|
||||||
|
// seatsInUse is the total number of "seats" currently occupied
|
||||||
|
// by all the requests that are currently executing in this queue.
|
||||||
|
seatsInUse int
|
||||||
}
|
}
|
||||||
|
|
||||||
// Enqueue enqueues a request into the queue and
|
// Enqueue enqueues a request into the queue and
|
||||||
@ -129,5 +136,6 @@ func (q *queue) dump(includeDetails bool) debug.QueueDump {
|
|||||||
VirtualStart: q.virtualStart,
|
VirtualStart: q.virtualStart,
|
||||||
Requests: digest,
|
Requests: digest,
|
||||||
ExecutingRequests: q.requestsExecuting,
|
ExecutingRequests: q.requestsExecuting,
|
||||||
|
SeatsInUse: q.seatsInUse,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user