diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list.go index 53e10543198..9a5fc77b072 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/fifo_list.go @@ -33,9 +33,8 @@ type walkFunc func(*request) (ok bool) // Internal interface to abstract out the implementation details // of the underlying list used to maintain the requests. // -// Note that the FIFO list is not safe for concurrent use by multiple -// goroutines without additional locking or coordination. It rests with -// the user to ensure that the FIFO list is used with proper locking. +// Note that a fifo, including the removeFromFIFOFuncs returned from Enqueue, +// is not safe for concurrent use by multiple goroutines. type fifo interface { // Enqueue enqueues the specified request into the list and // returns a removeFromFIFOFunc function that can be used to remove the @@ -64,7 +63,7 @@ type fifo interface { } // the FIFO list implementation is not safe for concurrent use by multiple -// goroutines without additional locking or coordination. +// goroutines. type requestFIFO struct { *list.List diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index b2d60d4327d..d2023d8faea 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -51,6 +51,7 @@ type queueSetFactory struct { // promiseFactory returns a WriteOnce // - whose Set method is invoked with the queueSet locked, and // - whose Get method is invoked with the queueSet not locked. +// The parameters are the same as for `promise.NewWriteOnce`. type promiseFactory func(initial interface{}, doneCh <-chan struct{}, doneVal interface{}) promise.WriteOnce // promiseFactoryFactory returns the promiseFactory to use for the given queueSet @@ -94,8 +95,8 @@ type queueSet struct { // the current dispatching configuration. dCfg fq.DispatchingConfig - // If `config.DesiredNumQueues` is non-zero then dealer is not nil - // and is good for `config`. + // If `qCfg.DesiredNumQueues` is non-zero then dealer is not nil + // and is good for `qCfg`. dealer *shufflesharding.Dealer // queues may be longer than the desired number, while the excess @@ -206,9 +207,9 @@ func (qs *queueSet) BeginConfigChange(qCfg fq.QueuingConfig) (fq.QueueSetComplet dealer: dealer}, nil } -// SetConfiguration is used to set the configuration for a queueSet. +// setConfiguration is used to set the configuration for a queueSet. // Update handling for when fields are updated is handled here as well - -// eg: if DesiredNum is increased, SetConfiguration reconciles by +// eg: if DesiredNum is increased, setConfiguration reconciles by // adding more queues. func (qs *queueSet) setConfiguration(ctx context.Context, qCfg fq.QueuingConfig, dealer *shufflesharding.Dealer, dCfg fq.DispatchingConfig) { qs.lockAndSyncTime(ctx) @@ -247,8 +248,13 @@ type requestDecision int // Values passed through a request's decision const ( + // Serve this one decisionExecute requestDecision = iota + + // Reject this one due to APF queuing considerations decisionReject + + // This one's context timed out / was canceled decisionCancel ) @@ -257,7 +263,7 @@ const ( // executing at each point where there is a change in that quantity, // because the metrics --- and only the metrics --- track that // quantity per FlowSchema. -// The queueSet's promiseFactory is invoked once if the returns Request is non-nil, +// The queueSet's promiseFactory is invoked once if the returned Request is non-nil, // not invoked if the Request is nil. func (qs *queueSet) StartRequest(ctx context.Context, workEstimate *fqrequest.WorkEstimate, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (fq.Request, bool) { qs.lockAndSyncTime(ctx) @@ -380,7 +386,7 @@ func (req *request) wait() (bool, bool) { // TODO(aaron-prindle) add metrics for this case klog.V(5).Infof("QS(%s): Ejecting request %#+v %#+v from its queue", qs.qCfg.Name, req.descr1, req.descr2) // remove the request from the queue as it has timed out - req.removeFromQueueFn() + req.removeFromQueueLocked() qs.totRequestsWaiting-- metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1) req.NoteQueued(false) @@ -399,7 +405,7 @@ func (qs *queueSet) isIdleLocked() bool { } // lockAndSyncTime acquires the lock and updates the virtual time. -// Doing them together avoids the mistake of modify some queue state +// Doing them together avoids the mistake of modifying some queue state // before calling syncTimeLocked. func (qs *queueSet) lockAndSyncTime(ctx context.Context) { qs.lock.Lock() @@ -508,7 +514,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte // requests that are in the queue longer than the timeout if there are no new requests // We prefer the simplicity over the promptness, at least for now. - defer qs.boundNextDispatch(queue) + defer qs.boundNextDispatchLocked(queue) // Create a request and enqueue req := &request{ @@ -633,7 +639,7 @@ func (qs *queueSet) enqueueLocked(request *request) { klog.Infof("QS(%s) at t=%s R=%v: initialized queue %d start R due to request %#+v %#+v", qs.qCfg.Name, now.Format(nsTimeFmt), queue.nextDispatchR, queue.index, request.descr1, request.descr2) } } - queue.Enqueue(request) + request.removeFromQueueLocked = queue.requests.Enqueue(request) qs.totRequestsWaiting++ metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, 1) request.NoteQueued(true) @@ -687,12 +693,11 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f // return value indicates whether a request was dispatched; this will // be false when there are no requests waiting in any queue. func (qs *queueSet) dispatchLocked() bool { - queue := qs.findDispatchQueueLocked() + queue, request := qs.findDispatchQueueLocked() if queue == nil { return false } - request, ok := queue.Dequeue() - if !ok { // This should never happen. But if it does... + if request == nil { // This should never happen. But if it does... return false } request.startTime = qs.clock.Now() @@ -719,9 +724,9 @@ func (qs *queueSet) dispatchLocked() bool { } // When a request is dequeued for service -> qs.virtualStart += G * width queue.nextDispatchR += request.totalWork() - qs.boundNextDispatch(queue) + qs.boundNextDispatchLocked(queue) request.decision.Set(decisionExecute) - return ok + return true } // canAccommodateSeatsLocked returns true if this queueSet has enough @@ -749,8 +754,9 @@ func (qs *queueSet) canAccommodateSeatsLocked(seats int) bool { // findDispatchQueueLocked examines the queues in round robin order and // returns the first one of those for which the virtual finish time of -// the oldest waiting request is minimal. -func (qs *queueSet) findDispatchQueueLocked() *queue { +// the oldest waiting request is minimal, and also returns that request. +// Returns nils if the head of the selected queue can not be dispatched now. +func (qs *queueSet) findDispatchQueueLocked() (*queue, *request) { minVirtualFinish := MaxSeatSeconds sMin := MaxSeatSeconds dsMin := MaxSeatSeconds @@ -783,7 +789,7 @@ func (qs *queueSet) findDispatchQueueLocked() *queue { if oldestReqFromMinQueue == nil { // This cannot happen klog.ErrorS(errors.New("selected queue is empty"), "Impossible", "queueSet", qs.qCfg.Name) - return nil + return nil, nil } if !qs.canAccommodateSeatsLocked(oldestReqFromMinQueue.MaxSeats()) { // since we have not picked the queue with the minimum virtual finish @@ -792,8 +798,9 @@ func (qs *queueSet) findDispatchQueueLocked() *queue { klog.Infof("QS(%s): request %v %v seats %d cannot be dispatched from queue %d, waiting for currently executing requests to complete, %d requests are occupying %d seats and the limit is %d", qs.qCfg.Name, oldestReqFromMinQueue.descr1, oldestReqFromMinQueue.descr2, oldestReqFromMinQueue.MaxSeats(), minQueue.index, qs.totRequestsExecuting, qs.totSeatsInUse, qs.dCfg.ConcurrencyLimit) } - return nil + return nil, nil } + oldestReqFromMinQueue.removeFromQueueLocked() // If the requested final seats exceed capacity of that queue, // we reduce them to current capacity and adjust additional latency @@ -814,7 +821,7 @@ func (qs *queueSet) findDispatchQueueLocked() *queue { klog.ErrorS(errors.New("dispatch before arrival"), "Inconceivable!", "QS", qs.qCfg.Name, "queue", minQueue.index, "dispatchR", minQueue.nextDispatchR, "request", oldestReqFromMinQueue) } metrics.SetDispatchMetrics(qs.qCfg.Name, qs.currentR.ToFloat(), minQueue.nextDispatchR.ToFloat(), sMin.ToFloat(), sMax.ToFloat(), dsMin.ToFloat(), dsMax.ToFloat()) - return minQueue + return minQueue, oldestReqFromMinQueue } func ssMin(a, b SeatSeconds) SeatSeconds { @@ -921,18 +928,18 @@ func (qs *queueSet) finishRequestLocked(r *request) { // When a request finishes being served, and the actual service time was S, // the queue’s start R is decremented by (G - S)*width. r.queue.nextDispatchR -= SeatsTimesDuration(float64(r.InitialSeats()), qs.estimatedServiceDuration-actualServiceDuration) - qs.boundNextDispatch(r.queue) + qs.boundNextDispatchLocked(r.queue) } } -// boundNextDispatch applies the anti-windup hack. +// boundNextDispatchLocked applies the anti-windup hack. // We need a hack because all non-empty queues are allocated the same // number of seats. A queue that can not use all those seats and does // not go empty accumulates a progresively earlier `virtualStart` compared // to queues that are using more than they are allocated. // The following hack addresses the first side of that inequity, // by insisting that dispatch in the virtual world not precede arrival. -func (qs *queueSet) boundNextDispatch(queue *queue) { +func (qs *queueSet) boundNextDispatchLocked(queue *queue) { oldestReqFromMinQueue, _ := queue.requests.Peek() if oldestReqFromMinQueue == nil { return @@ -991,7 +998,7 @@ func (qs *queueSet) Dump(includeRequestDetails bool) debug.QueueSetDump { SeatsInUse: qs.totSeatsInUse, } for i, q := range qs.queues { - d.Queues[i] = q.dump(includeRequestDetails) + d.Queues[i] = q.dumpLocked(includeRequestDetails) } return d } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go index 4ec4e00b013..afa8155566c 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go @@ -1308,7 +1308,7 @@ func TestFindDispatchQueueLocked(t *testing.T) { minQueueExpected = test.queues[queueIdx] } - minQueueGot := qs.findDispatchQueueLocked() + minQueueGot, reqGot := qs.findDispatchQueueLocked() if minQueueExpected != minQueueGot { t.Errorf("Expected queue: %#v, but got: %#v", minQueueExpected, minQueueGot) } @@ -1317,6 +1317,10 @@ func TestFindDispatchQueueLocked(t *testing.T) { if robinIndexExpected != qs.robinIndex { t.Errorf("Expected robin index: %d for attempt: %d, but got: %d", robinIndexExpected, attempt, qs.robinIndex) } + + if (reqGot == nil) != (minQueueGot == nil) { + t.Errorf("reqGot=%p but minQueueGot=%p", reqGot, minQueueGot) + } } }) } @@ -1451,7 +1455,7 @@ func TestRequestWork(t *testing.T) { func newFIFO(requests ...*request) fifo { l := newRequestFIFO() for i := range requests { - l.Enqueue(requests[i]) + requests[i].removeFromQueueLocked = l.Enqueue(requests[i]) } return l } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go index d3fbea7eda1..d4f7d5d1d52 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go @@ -30,7 +30,7 @@ import ( ) // request is a temporary container for "requests" with additional -// tracking fields required for the functionality FQScheduler +// tracking fields required for QueueSet functionality. type request struct { ctx context.Context @@ -43,9 +43,6 @@ type request struct { // a queue. queue *queue - // startTime is the real time when the request began executing - startTime time.Time - // estimated amount of work of the request workEstimate completedWorkEstimate @@ -61,22 +58,29 @@ type request struct { // arrivalTime is the real time when the request entered this system arrivalTime time.Time - // arrivalR is R(arrivalTime). R is, confusingly, also called "virtual time". - // This field is meaningful only while the request is waiting in the virtual world. - arrivalR SeatSeconds - // descr1 and descr2 are not used in any logic but they appear in // log messages descr1, descr2 interface{} - // Indicates whether client has called Request::Wait() - waitStarted bool - queueNoteFn fq.QueueNoteFn + // The preceding fields are filled in at creation and not modified since; + // the following fields may be modified later and must only be accessed while + // holding the queueSet's lock. + // Removes this request from its queue. If the request is not put into a // a queue it will be nil. - removeFromQueueFn removeFromFIFOFunc + removeFromQueueLocked removeFromFIFOFunc + + // arrivalR is R(arrivalTime). R is, confusingly, also called "virtual time". + // This field is meaningful only while the request is waiting in the virtual world. + arrivalR SeatSeconds + + // startTime is the real time when the request began executing + startTime time.Time + + // Indicates whether client has called Request::Wait() + waitStarted bool } type completedWorkEstimate struct { @@ -85,8 +89,8 @@ type completedWorkEstimate struct { finalWork SeatSeconds // only final work } -// queue is an array of requests with additional metadata required for -// the FQScheduler +// queue is a sequence of requests that have arrived but not yet finished +// execution in both the real and virtual worlds. type queue struct { // The requests not yet executing in the real world are stored in a FIFO list. requests fifo @@ -95,9 +99,11 @@ type queue struct { // which the next request will be dispatched in the virtual world. nextDispatchR SeatSeconds - // requestsExecuting is the count in the real world + // requestsExecuting is the count in the real world. requestsExecuting int - index int + + // index is the position of this queue among those in its queueSet. + index int // seatsInUse is the total number of "seats" currently occupied // by all the requests that are currently executing in this queue. @@ -140,19 +146,7 @@ func (qs *queueSet) computeFinalWork(we *fcrequest.WorkEstimate) SeatSeconds { return SeatsTimesDuration(float64(we.FinalSeats), we.AdditionalLatency) } -// Enqueue enqueues a request into the queue and -// sets the removeFromQueueFn of the request appropriately. -func (q *queue) Enqueue(request *request) { - request.removeFromQueueFn = q.requests.Enqueue(request) -} - -// Dequeue dequeues a request from the queue -func (q *queue) Dequeue() (*request, bool) { - request, ok := q.requests.Dequeue() - return request, ok -} - -func (q *queue) dump(includeDetails bool) debug.QueueDump { +func (q *queue) dumpLocked(includeDetails bool) debug.QueueDump { digest := make([]debug.RequestDump, q.requests.Length()) i := 0 q.requests.Walk(func(r *request) bool {