Fix nits noticed in recent code review

Mike Spreitzer 2021-10-18 23:51:48 -05:00
parent b977200a5d
commit 1844a05277
4 changed files with 62 additions and 58 deletions

View File

@ -33,9 +33,8 @@ type walkFunc func(*request) (ok bool)
// Internal interface to abstract out the implementation details
// of the underlying list used to maintain the requests.
//
// Note that the FIFO list is not safe for concurrent use by multiple
// goroutines without additional locking or coordination. It rests with
// the user to ensure that the FIFO list is used with proper locking.
// Note that a fifo, including the removeFromFIFOFuncs returned from Enqueue,
// is not safe for concurrent use by multiple goroutines.
type fifo interface {
// Enqueue enqueues the specified request into the list and
// returns a removeFromFIFOFunc function that can be used to remove the
@ -64,7 +63,7 @@ type fifo interface {
}
// the FIFO list implementation is not safe for concurrent use by multiple
// goroutines without additional locking or coordination.
// goroutines.
type requestFIFO struct {
*list.List

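As a rough, self-contained sketch of the contract stated above (illustrative only, not the package's actual fifo implementation; the names sketchFIFO and newSketchFIFO are invented here), Enqueue hands back a closure that removes that same element later, and neither the list nor the closure does any locking of its own:

package main

import (
	"container/list"
	"fmt"
)

// sketchFIFO is a minimal list-backed FIFO whose Enqueue returns a removal
// closure. Nothing here is goroutine-safe; as the comment above says, the
// caller must provide its own locking around Enqueue, the returned closure,
// and every other operation.
type sketchFIFO struct{ l *list.List }

func newSketchFIFO() *sketchFIFO { return &sketchFIFO{l: list.New()} }

// Enqueue appends v and returns a func that removes that exact element later.
func (f *sketchFIFO) Enqueue(v interface{}) (remove func()) {
	e := f.l.PushBack(v)
	return func() { f.l.Remove(e) } // container/list makes this a no-op if already removed
}

func main() {
	f := newSketchFIFO()
	removeA := f.Enqueue("a")
	f.Enqueue("b")
	removeA()                            // e.g. the request timed out while waiting
	fmt.Println("remaining:", f.l.Len()) // remaining: 1
}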
View File

@ -51,6 +51,7 @@ type queueSetFactory struct {
// promiseFactory returns a WriteOnce
// - whose Set method is invoked with the queueSet locked, and
// - whose Get method is invoked with the queueSet not locked.
// The parameters are the same as for `promise.NewWriteOnce`.
type promiseFactory func(initial interface{}, doneCh <-chan struct{}, doneVal interface{}) promise.WriteOnce
// promiseFactoryFactory returns the promiseFactory to use for the given queueSet
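For orientation, here is a hedged sketch of a write-once promise with the shape described above; the real promise.NewWriteOnce is constructed so that Set runs with the queueSet locked and Get runs unlocked, whereas this stand-alone version (names invented) carries its own mutex:

package sketch

import "sync"

// writeOnceSketch resolves at most once: with initial if initial is non-nil,
// explicitly via Set, or with doneVal once doneCh is closed.
type writeOnceSketch struct {
	mu    sync.Mutex
	isSet bool
	value interface{}
	ready chan struct{}
}

func newWriteOnceSketch(initial interface{}, doneCh <-chan struct{}, doneVal interface{}) *writeOnceSketch {
	w := &writeOnceSketch{ready: make(chan struct{})}
	if initial != nil {
		w.value, w.isSet = initial, true
		close(w.ready)
		return w
	}
	go func() { // settle with doneVal if doneCh fires before anyone calls Set
		<-doneCh
		w.Set(doneVal)
	}()
	return w
}

// Set stores v unless a value is already stored; it reports whether it won.
func (w *writeOnceSketch) Set(v interface{}) bool {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.isSet {
		return false
	}
	w.value, w.isSet = v, true
	close(w.ready)
	return true
}

// Get blocks until a value is stored, then returns it.
func (w *writeOnceSketch) Get() interface{} {
	<-w.ready
	w.mu.Lock()
	defer w.mu.Unlock()
	return w.value
}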
@ -94,8 +95,8 @@ type queueSet struct {
// the current dispatching configuration.
dCfg fq.DispatchingConfig
// If `config.DesiredNumQueues` is non-zero then dealer is not nil
// and is good for `config`.
// If `qCfg.DesiredNumQueues` is non-zero then dealer is not nil
// and is good for `qCfg`.
dealer *shufflesharding.Dealer
// queues may be longer than the desired number, while the excess
@ -206,9 +207,9 @@ func (qs *queueSet) BeginConfigChange(qCfg fq.QueuingConfig) (fq.QueueSetComplet
dealer: dealer}, nil
}
// SetConfiguration is used to set the configuration for a queueSet.
// setConfiguration is used to set the configuration for a queueSet.
// Update handling for when fields are updated is handled here as well -
// eg: if DesiredNum is increased, SetConfiguration reconciles by
// eg: if DesiredNum is increased, setConfiguration reconciles by
// adding more queues.
func (qs *queueSet) setConfiguration(ctx context.Context, qCfg fq.QueuingConfig, dealer *shufflesharding.Dealer, dCfg fq.DispatchingConfig) {
qs.lockAndSyncTime(ctx)
@ -247,8 +248,13 @@ type requestDecision int
// Values passed through a request's decision
const (
// Serve this one
decisionExecute requestDecision = iota
// Reject this one due to APF queuing considerations
decisionReject
// This one's context timed out / was canceled
decisionCancel
)
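A waiter ultimately blocks on its decision promise and branches on one of these three values. A rough sketch of that consumption, assuming the surrounding package's requestDecision constants and the promise.WriteOnce interface from above (the real wait() also updates counters and metrics, and its return values differ):

// awaitDecision is illustrative only; it reduces the waiter to its control flow.
func awaitDecision(decision promise.WriteOnce) (execute, cancelled bool) {
	switch decision.Get() {
	case decisionExecute:
		return true, false
	case decisionCancel:
		return false, true
	default: // decisionReject
		return false, false
	}
}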
@ -257,7 +263,7 @@ const (
// executing at each point where there is a change in that quantity,
// because the metrics --- and only the metrics --- track that
// quantity per FlowSchema.
// The queueSet's promiseFactory is invoked once if the returns Request is non-nil,
// The queueSet's promiseFactory is invoked once if the returned Request is non-nil,
// not invoked if the Request is nil.
func (qs *queueSet) StartRequest(ctx context.Context, workEstimate *fqrequest.WorkEstimate, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (fq.Request, bool) {
qs.lockAndSyncTime(ctx)
@ -380,7 +386,7 @@ func (req *request) wait() (bool, bool) {
// TODO(aaron-prindle) add metrics for this case
klog.V(5).Infof("QS(%s): Ejecting request %#+v %#+v from its queue", qs.qCfg.Name, req.descr1, req.descr2)
// remove the request from the queue as it has timed out
req.removeFromQueueFn()
req.removeFromQueueLocked()
qs.totRequestsWaiting--
metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1)
req.NoteQueued(false)
@ -399,7 +405,7 @@ func (qs *queueSet) isIdleLocked() bool {
}
// lockAndSyncTime acquires the lock and updates the virtual time.
// Doing them together avoids the mistake of modify some queue state
// Doing them together avoids the mistake of modifying some queue state
// before calling syncTimeLocked.
func (qs *queueSet) lockAndSyncTime(ctx context.Context) {
qs.lock.Lock()
@ -508,7 +514,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte
// requests that are in the queue longer than the timeout if there are no new requests
// We prefer the simplicity over the promptness, at least for now.
defer qs.boundNextDispatch(queue)
defer qs.boundNextDispatchLocked(queue)
// Create a request and enqueue
req := &request{
@ -633,7 +639,7 @@ func (qs *queueSet) enqueueLocked(request *request) {
klog.Infof("QS(%s) at t=%s R=%v: initialized queue %d start R due to request %#+v %#+v", qs.qCfg.Name, now.Format(nsTimeFmt), queue.nextDispatchR, queue.index, request.descr1, request.descr2)
}
}
queue.Enqueue(request)
request.removeFromQueueLocked = queue.requests.Enqueue(request)
qs.totRequestsWaiting++
metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, 1)
request.NoteQueued(true)
@ -687,12 +693,11 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f
// return value indicates whether a request was dispatched; this will
// be false when there are no requests waiting in any queue.
func (qs *queueSet) dispatchLocked() bool {
queue := qs.findDispatchQueueLocked()
queue, request := qs.findDispatchQueueLocked()
if queue == nil {
return false
}
request, ok := queue.Dequeue()
if !ok { // This should never happen. But if it does...
if request == nil { // This should never happen. But if it does...
return false
}
request.startTime = qs.clock.Now()
@ -719,9 +724,9 @@ func (qs *queueSet) dispatchLocked() bool {
}
// When a request is dequeued for service -> qs.virtualStart += G * width
queue.nextDispatchR += request.totalWork()
qs.boundNextDispatch(queue)
qs.boundNextDispatchLocked(queue)
request.decision.Set(decisionExecute)
return ok
return true
}
// canAccommodateSeatsLocked returns true if this queueSet has enough
@ -749,8 +754,9 @@ func (qs *queueSet) canAccommodateSeatsLocked(seats int) bool {
// findDispatchQueueLocked examines the queues in round robin order and
// returns the first one of those for which the virtual finish time of
// the oldest waiting request is minimal.
func (qs *queueSet) findDispatchQueueLocked() *queue {
// the oldest waiting request is minimal, and also returns that request.
// Returns nils if the head of the selected queue can not be dispatched now.
func (qs *queueSet) findDispatchQueueLocked() (*queue, *request) {
minVirtualFinish := MaxSeatSeconds
sMin := MaxSeatSeconds
dsMin := MaxSeatSeconds
@ -783,7 +789,7 @@ func (qs *queueSet) findDispatchQueueLocked() *queue {
if oldestReqFromMinQueue == nil {
// This cannot happen
klog.ErrorS(errors.New("selected queue is empty"), "Impossible", "queueSet", qs.qCfg.Name)
return nil
return nil, nil
}
if !qs.canAccommodateSeatsLocked(oldestReqFromMinQueue.MaxSeats()) {
// since we have not picked the queue with the minimum virtual finish
@ -792,8 +798,9 @@ func (qs *queueSet) findDispatchQueueLocked() *queue {
klog.Infof("QS(%s): request %v %v seats %d cannot be dispatched from queue %d, waiting for currently executing requests to complete, %d requests are occupying %d seats and the limit is %d",
qs.qCfg.Name, oldestReqFromMinQueue.descr1, oldestReqFromMinQueue.descr2, oldestReqFromMinQueue.MaxSeats(), minQueue.index, qs.totRequestsExecuting, qs.totSeatsInUse, qs.dCfg.ConcurrencyLimit)
}
return nil
return nil, nil
}
oldestReqFromMinQueue.removeFromQueueLocked()
// If the requested final seats exceed capacity of that queue,
// we reduce them to current capacity and adjust additional latency
@ -814,7 +821,7 @@ func (qs *queueSet) findDispatchQueueLocked() *queue {
klog.ErrorS(errors.New("dispatch before arrival"), "Inconceivable!", "QS", qs.qCfg.Name, "queue", minQueue.index, "dispatchR", minQueue.nextDispatchR, "request", oldestReqFromMinQueue)
}
metrics.SetDispatchMetrics(qs.qCfg.Name, qs.currentR.ToFloat(), minQueue.nextDispatchR.ToFloat(), sMin.ToFloat(), sMax.ToFloat(), dsMin.ToFloat(), dsMax.ToFloat())
return minQueue
return minQueue, oldestReqFromMinQueue
}
func ssMin(a, b SeatSeconds) SeatSeconds {
@ -921,18 +928,18 @@ func (qs *queueSet) finishRequestLocked(r *request) {
// When a request finishes being served, and the actual service time was S,
// the queues start R is decremented by (G - S)*width.
r.queue.nextDispatchR -= SeatsTimesDuration(float64(r.InitialSeats()), qs.estimatedServiceDuration-actualServiceDuration)
qs.boundNextDispatch(r.queue)
qs.boundNextDispatchLocked(r.queue)
}
}
// boundNextDispatch applies the anti-windup hack.
// boundNextDispatchLocked applies the anti-windup hack.
// We need a hack because all non-empty queues are allocated the same
// number of seats. A queue that can not use all those seats and does
// not go empty accumulates a progressively earlier `virtualStart` compared
// to queues that are using more than they are allocated.
// The following hack addresses the first side of that inequity,
// by insisting that dispatch in the virtual world not precede arrival.
func (qs *queueSet) boundNextDispatch(queue *queue) {
func (qs *queueSet) boundNextDispatchLocked(queue *queue) {
oldestReqFromMinQueue, _ := queue.requests.Peek()
if oldestReqFromMinQueue == nil {
return
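Put differently, the bound is a clamp: the queue's next virtual dispatch time is raised to the virtual arrival time of its oldest waiting request whenever it has drifted earlier. A hedged sketch of just that arithmetic (the real function works on SeatSeconds and the queue's own fields rather than bare numbers):

// clampNextDispatch illustrates the "dispatch must not precede arrival" bound
// described above; it returns nextDispatchR, raised to oldestArrivalR if needed.
func clampNextDispatch(nextDispatchR, oldestArrivalR float64) float64 {
	if nextDispatchR < oldestArrivalR {
		return oldestArrivalR
	}
	return nextDispatchR
}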
@ -991,7 +998,7 @@ func (qs *queueSet) Dump(includeRequestDetails bool) debug.QueueSetDump {
SeatsInUse: qs.totSeatsInUse,
}
for i, q := range qs.queues {
d.Queues[i] = q.dump(includeRequestDetails)
d.Queues[i] = q.dumpLocked(includeRequestDetails)
}
return d
}

View File

@ -1308,7 +1308,7 @@ func TestFindDispatchQueueLocked(t *testing.T) {
minQueueExpected = test.queues[queueIdx]
}
minQueueGot := qs.findDispatchQueueLocked()
minQueueGot, reqGot := qs.findDispatchQueueLocked()
if minQueueExpected != minQueueGot {
t.Errorf("Expected queue: %#v, but got: %#v", minQueueExpected, minQueueGot)
}
@ -1317,6 +1317,10 @@ func TestFindDispatchQueueLocked(t *testing.T) {
if robinIndexExpected != qs.robinIndex {
t.Errorf("Expected robin index: %d for attempt: %d, but got: %d", robinIndexExpected, attempt, qs.robinIndex)
}
if (reqGot == nil) != (minQueueGot == nil) {
t.Errorf("reqGot=%p but minQueueGot=%p", reqGot, minQueueGot)
}
}
})
}
@ -1451,7 +1455,7 @@ func TestRequestWork(t *testing.T) {
func newFIFO(requests ...*request) fifo {
l := newRequestFIFO()
for i := range requests {
l.Enqueue(requests[i])
requests[i].removeFromQueueLocked = l.Enqueue(requests[i])
}
return l
}

View File

@ -30,7 +30,7 @@ import (
)
// request is a temporary container for "requests" with additional
// tracking fields required for the functionality FQScheduler
// tracking fields required for QueueSet functionality.
type request struct {
ctx context.Context
@ -43,9 +43,6 @@ type request struct {
// a queue.
queue *queue
// startTime is the real time when the request began executing
startTime time.Time
// estimated amount of work of the request
workEstimate completedWorkEstimate
@ -61,22 +58,29 @@ type request struct {
// arrivalTime is the real time when the request entered this system
arrivalTime time.Time
// arrivalR is R(arrivalTime). R is, confusingly, also called "virtual time".
// This field is meaningful only while the request is waiting in the virtual world.
arrivalR SeatSeconds
// descr1 and descr2 are not used in any logic but they appear in
// log messages
descr1, descr2 interface{}
// Indicates whether client has called Request::Wait()
waitStarted bool
queueNoteFn fq.QueueNoteFn
// The preceding fields are filled in at creation and not modified since;
// the following fields may be modified later and must only be accessed while
// holding the queueSet's lock.
// Removes this request from its queue. If the request is not put into
// a queue it will be nil.
removeFromQueueFn removeFromFIFOFunc
removeFromQueueLocked removeFromFIFOFunc
// arrivalR is R(arrivalTime). R is, confusingly, also called "virtual time".
// This field is meaningful only while the request is waiting in the virtual world.
arrivalR SeatSeconds
// startTime is the real time when the request began executing
startTime time.Time
// Indicates whether client has called Request::Wait()
waitStarted bool
}
type completedWorkEstimate struct {
@ -85,8 +89,8 @@ type completedWorkEstimate struct {
finalWork SeatSeconds // only final work
}
// queue is an array of requests with additional metadata required for
// the FQScheduler
// queue is a sequence of requests that have arrived but not yet finished
// execution in both the real and virtual worlds.
type queue struct {
// The requests not yet executing in the real world are stored in a FIFO list.
requests fifo
@ -95,9 +99,11 @@ type queue struct {
// which the next request will be dispatched in the virtual world.
nextDispatchR SeatSeconds
// requestsExecuting is the count in the real world
// requestsExecuting is the count in the real world.
requestsExecuting int
index int
// index is the position of this queue among those in its queueSet.
index int
// seatsInUse is the total number of "seats" currently occupied
// by all the requests that are currently executing in this queue.
@ -140,19 +146,7 @@ func (qs *queueSet) computeFinalWork(we *fcrequest.WorkEstimate) SeatSeconds {
return SeatsTimesDuration(float64(we.FinalSeats), we.AdditionalLatency)
}
// Enqueue enqueues a request into the queue and
// sets the removeFromQueueFn of the request appropriately.
func (q *queue) Enqueue(request *request) {
request.removeFromQueueFn = q.requests.Enqueue(request)
}
// Dequeue dequeues a request from the queue
func (q *queue) Dequeue() (*request, bool) {
request, ok := q.requests.Dequeue()
return request, ok
}
func (q *queue) dump(includeDetails bool) debug.QueueDump {
func (q *queue) dumpLocked(includeDetails bool) debug.QueueDump {
digest := make([]debug.RequestDump, q.requests.Length())
i := 0
q.requests.Walk(func(r *request) bool {