From 1c31b2bdc65377f502c2306dbdf32a802eb1afb7 Mon Sep 17 00:00:00 2001 From: Mike Spreitzer Date: Wed, 13 Nov 2019 01:52:05 -0500 Subject: [PATCH] Brushing up queueset (1) Replaced random-looking assortment of counter increments and decrements with something hopefully more principalled-looking. Most importantly, introduced the MutablePromise abstraction to neatly wrap up the complicated business of unioning multiple sources of unblocking. (2) Improved debug logging. (3) Somewhat more interesting test cases, and a bug fix wrt round robin index. --- api/openapi-spec/swagger.json | 2 +- staging/src/k8s.io/apiserver/BUILD | 1 + .../pkg/util/flowcontrol/fairqueuing/BUILD | 5 +- .../util/flowcontrol/fairqueuing/interface.go | 24 +- .../flowcontrol/fairqueuing/queueset/BUILD | 9 +- .../flowcontrol/fairqueuing/queueset/doc.go | 120 ++++ .../fairqueuing/queueset/queueset.go | 572 ++++++++++-------- .../fairqueuing/queueset/queueset_test.go | 55 +- .../fairqueuing/{ => queueset}/types.go | 54 +- .../fairqueuing/testing/clock/BUILD | 1 + .../fairqueuing/testing/clock/event_clock.go | 31 +- .../fairqueuing/testing/no-restraint.go | 2 +- .../apiserver/pkg/util/promise/interface.go | 42 ++ .../promise/lockingpromise/lockingpromise.go | 78 +++ .../lockingpromise/lockingpromise_test.go | 70 +++ .../src/k8s.io/component-base/metrics/BUILD | 1 + 16 files changed, 741 insertions(+), 326 deletions(-) create mode 100644 staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/doc.go rename staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/{ => queueset}/types.go (56%) create mode 100644 staging/src/k8s.io/apiserver/pkg/util/promise/interface.go create mode 100644 staging/src/k8s.io/apiserver/pkg/util/promise/lockingpromise/lockingpromise.go create mode 100644 staging/src/k8s.io/apiserver/pkg/util/promise/lockingpromise/lockingpromise_test.go diff --git a/api/openapi-spec/swagger.json b/api/openapi-spec/swagger.json index b0f6caf72f6..e3753083a43 100644 --- a/api/openapi-spec/swagger.json +++ b/api/openapi-spec/swagger.json @@ -20742,7 +20742,7 @@ }, "info": { "title": "Kubernetes", - "version": "v1.17.0" + "version": "v1.18.0" }, "paths": { "/api/": { diff --git a/staging/src/k8s.io/apiserver/BUILD b/staging/src/k8s.io/apiserver/BUILD index 6e41d74175f..a6b511f8dd9 100644 --- a/staging/src/k8s.io/apiserver/BUILD +++ b/staging/src/k8s.io/apiserver/BUILD @@ -47,6 +47,7 @@ filegroup( "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:all-srcs", "//staging/src/k8s.io/apiserver/pkg/util/flushwriter:all-srcs", "//staging/src/k8s.io/apiserver/pkg/util/openapi:all-srcs", + "//staging/src/k8s.io/apiserver/pkg/util/promise:all-srcs", "//staging/src/k8s.io/apiserver/pkg/util/proxy:all-srcs", "//staging/src/k8s.io/apiserver/pkg/util/shufflesharding:all-srcs", "//staging/src/k8s.io/apiserver/pkg/util/term:all-srcs", diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/BUILD b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/BUILD index ad83f0ec182..6707c197d4b 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/BUILD @@ -2,10 +2,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "go_default_library", - srcs = [ - "interface.go", - "types.go", - ], + srcs = ["interface.go"], importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing", importpath = "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing", visibility = ["//visibility:public"], diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go index 2de7f92f498..137907ecc45 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go @@ -49,17 +49,19 @@ type QueueSet interface { // triggering state and the required call). Quiesce(EmptyHandler) - // Wait uses the given hashValue as the source of entropy - // as it shuffle-shards a request into a queue and waits for - // a decision on what to do with that request. If tryAnother==true - // at return then the QueueSet has become undesirable and the client - // should try to find a different QueueSet to use; execute and - // afterExecution are irrelevant in this case. Otherwise, if execute - // then the client should start executing the request and, once the - // request finishes execution or is canceled, call afterExecution(). - // Otherwise the client should not execute the - // request and afterExecution is irrelevant. - Wait(ctx context.Context, hashValue uint64) (tryAnother, execute bool, afterExecution func()) + // Wait uses the given hashValue as the source of entropy as it + // shuffle-shards a request into a queue and waits for a decision + // on what to do with that request. The descr1 and descr2 values + // play no role in the logic but appear in log messages. If + // tryAnother==true at return then the QueueSet has become + // undesirable and the client should try to find a different + // QueueSet to use; execute and afterExecution are irrelevant in + // this case. Otherwise, if execute then the client should start + // executing the request and, once the request finishes execution + // or is canceled, call afterExecution(). Otherwise the client + // should not execute the request and afterExecution is + // irrelevant. + Wait(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (tryAnother, execute bool, afterExecution func()) } // QueueSetConfig defines the configuration of a QueueSet. diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/BUILD b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/BUILD index 0b6eed63251..e29bab63f34 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/BUILD @@ -2,7 +2,11 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", - srcs = ["queueset.go"], + srcs = [ + "doc.go", + "queueset.go", + "types.go", + ], importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset", importpath = "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset", visibility = ["//visibility:public"], @@ -12,6 +16,8 @@ go_library( "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/promise:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/promise/lockingpromise:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/shufflesharding:go_default_library", "//vendor/github.com/pkg/errors:go_default_library", "//vendor/k8s.io/klog:go_default_library", @@ -27,6 +33,7 @@ go_test( "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock:go_default_library", + "//vendor/k8s.io/klog:go_default_library", ], ) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/doc.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/doc.go new file mode 100644 index 00000000000..d9431378ad3 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/doc.go @@ -0,0 +1,120 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queueset + +// This package implements a technique called "fair queuing for server +// requests". One QueueSet is a set of queues operating according to +// this technique. + +// Fair queuing for server requests is inspired by the fair queuing +// technique from the world of networking. You can find a good paper +// on that at https://dl.acm.org/citation.cfm?doid=75247.75248 or +// http://people.csail.mit.edu/imcgraw/links/research/pubs/networks/WFQ.pdf +// and there is an implementation outline in the Wikipedia article at +// https://en.wikipedia.org/wiki/Fair_queuing . + +// Fair queuing for server requests differs from traditional fair +// queuing in three ways: (1) we are dispatching requests to be +// executed within a process rather than transmitting packets on a +// network link, (2) multiple requests can be executing at once, and +// (3) the service time (execution duration) is not known until the +// execution completes. + +// The first two differences can easily be handled by straightforward +// adaptation of the concept called "R(t)" in the original paper and +// "virtual time" in the implementation outline. In that +// implementation outline, the notation now() is used to mean reading +// the virtual clock. In the original paper’s terms, "R(t)" is the +// number of "rounds" that have been completed at real time t, where a +// round consists of virtually transmitting one bit from every +// non-empty queue in the router (regardless of which queue holds the +// packet that is really being transmitted at the moment); in this +// conception, a packet is considered to be "in" its queue until the +// packet’s transmission is finished. For our problem, we can define a +// round to be giving one nanosecond of CPU to every non-empty queue +// in the apiserver (where emptiness is judged based on both queued +// and executing requests from that queue), and define R(t) = (server +// start time) + (1 ns) * (number of rounds since server start). Let +// us write NEQ(t) for that number of non-empty queues in the +// apiserver at time t. Let us also write C for the concurrency +// limit. In the original paper, the partial derivative of R(t) with +// respect to t is +// +// 1 / NEQ(t) . + +// To generalize from transmitting one packet at a time to executing C +// requests at a time, that derivative becomes +// +// C / NEQ(t) . + +// However, sometimes there are fewer than C requests available to +// execute. For a given queue "q", let us also write "reqs(q, t)" for +// the number of requests of that queue that are executing at that +// time. The total number of requests executing is sum[over q] +// reqs(q, t) and if that is less than C then virtual time is not +// advancing as fast as it would if all C seats were occupied; in this +// case the numerator of the quotient in that derivative should be +// adjusted proportionally. Putting it all together for fair queing +// for server requests: at a particular time t, the partial derivative +// of R(t) with respect to t is +// +// min( C, sum[over q] reqs(q, t) ) / NEQ(t) . +// +// In terms of the implementation outline, this is the rate at which +// virtual time is advancing at time t (in virtual nanoseconds per +// real nanosecond). Where the networking implementation outline adds +// packet size to a virtual time, in our version this corresponds to +// adding a service time (i.e., duration) to virtual time. + +// The third difference is handled by modifying the algorithm to +// dispatch based on an initial guess at the request’s service time +// (duration) and then make the corresponding adjustments once the +// request’s actual service time is known. This is similar, although +// not exactly isomorphic, to the original paper’s adjustment by +// `$delta` for the sake of promptness. + +// For implementation simplicity (see below), let us use the same +// initial service time guess for every request; call that duration +// G. A good choice might be the service time limit (1 +// minute). Different guesses will give slightly different dynamics, +// but any positive number can be used for G without ruining the +// long-term behavior. + +// As in ordinary fair queuing, there is a bound on divergence from +// the ideal. In plain fair queuing the bound is one packet; in our +// version it is C requests. + +// To support efficiently making the necessary adjustments once a +// request’s actual service time is known, the virtual finish time of +// a request and the last virtual finish time of a queue are not +// represented directly but instead computed from queue length, +// request position in the queue, and an alternate state variable that +// holds the queue’s virtual start time. While the queue is empty and +// has no requests executing: the value of its virtual start time +// variable is ignored and its last virtual finish time is considered +// to be in the virtual past. When a request arrives to an empty queue +// with no requests executing, the queue’s virtual start time is set +// to the current virtual time. The virtual finish time of request +// number J in the queue (counting from J=1 for the head) is J * G + +// (queue's virtual start time). While the queue is non-empty: the +// last virtual finish time of the queue is the virtual finish time of +// the last request in the queue. While the queue is empty and has a +// request executing: the last virtual finish time is the queue’s +// virtual start time. When a request is dequeued for service the +// queue’s virtual start time is advanced by G. When a request +// finishes being served, and the actual service time was S, the +// queue’s virtual start time is decremented by G - S. diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index 8be7dceb188..e71870a7481 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -29,6 +29,7 @@ import ( "k8s.io/apiserver/pkg/util/flowcontrol/counter" fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" "k8s.io/apiserver/pkg/util/flowcontrol/metrics" + "k8s.io/apiserver/pkg/util/promise/lockingpromise" "k8s.io/apiserver/pkg/util/shufflesharding" "k8s.io/klog" ) @@ -48,63 +49,66 @@ func NewQueueSetFactory(c clock.PassiveClock, counter counter.GoRoutineCounter) } } +// queueSet implements the Fair Queuing for Server Requests technique +// described in this package's doc, and a pointer to one implements +// the QueueSet interface. The clock, GoRoutineCounter, and estimated +// service time should not be changed; the fields listed after the +// lock must be accessed only while holding the lock. +type queueSet struct { + clock clock.PassiveClock + counter counter.GoRoutineCounter + estimatedServiceTime float64 + + lock sync.Mutex + config fq.QueueSetConfig + + // queues may be longer than the desired number, while the excess + // queues are still draining. + queues []*queue + virtualTime float64 + lastRealTime time.Time + + // robinIndex is the index of the last queue dispatched + robinIndex int + + // numRequestsEnqueued is the number of requests currently waiting + // in a queue (eg: incremeneted on Enqueue, decremented on Dequue) + numRequestsEnqueued int + + emptyHandler fq.EmptyHandler + dealer *shufflesharding.Dealer +} + // NewQueueSet creates a new QueueSet object // There is a new QueueSet created for each priority level. func (qsf queueSetFactory) NewQueueSet(config fq.QueueSetConfig) (fq.QueueSet, error) { - return newQueueSet(config, qsf.clock, qsf.counter) -} - -// queueSet is a fair queuing implementation designed with three major differences: -// 1) dispatches requests to be served rather than requests to be transmitted -// 2) serves multiple requests at once -// 3) a request's service time is not known until it finishes -// implementation of: -// https://github.com/kubernetes/enhancements/blob/master/keps/sig-api-machinery/20190228-priority-and-fairness.md -type queueSet struct { - lock sync.Mutex - config fq.QueueSetConfig - counter counter.GoRoutineCounter - clock clock.PassiveClock - queues []*fq.Queue - virtualTime float64 - estimatedServiceTime float64 - lastRealTime time.Time - robinIndex int - // numRequestsEnqueued is the number of requests currently enqueued - // (eg: incremeneted on Enqueue, decremented on Dequue) - numRequestsEnqueued int - emptyHandler fq.EmptyHandler - dealer *shufflesharding.Dealer -} - -// initQueues is a helper method for initializing an array of n queues -func initQueues(n, baseIndex int) []*fq.Queue { - fqqueues := make([]*fq.Queue, n) - for i := 0; i < n; i++ { - fqqueues[i] = &fq.Queue{Index: baseIndex + i, Requests: make([]*fq.Request, 0)} - } - return fqqueues -} - -// newQueueSet creates a new queueSet from passed in parameters -func newQueueSet(config fq.QueueSetConfig, c clock.PassiveClock, counter counter.GoRoutineCounter) (*queueSet, error) { dealer, err := shufflesharding.NewDealer(config.DesiredNumQueues, config.HandSize) if err != nil { return nil, errors.Wrap(err, "shuffle sharding dealer creation failed") } fq := &queueSet{ - config: config, - counter: counter, - queues: initQueues(config.DesiredNumQueues, 0), - clock: c, - virtualTime: 0, - lastRealTime: c.Now(), - dealer: dealer, + config: config, + counter: qsf.counter, + queues: createQueues(config.DesiredNumQueues, 0), + clock: qsf.clock, + virtualTime: 0, + estimatedServiceTime: 60, + lastRealTime: qsf.clock.Now(), + dealer: dealer, } return fq, nil } +// createQueues is a helper method for initializing an array of n queues +func createQueues(n, baseIndex int) []*queue { + fqqueues := make([]*queue, n) + for i := 0; i < n; i++ { + fqqueues[i] = &queue{Index: baseIndex + i, Requests: make([]*request, 0)} + } + return fqqueues +} + // SetConfiguration is used to set the configuration for a queueSet // update handling for when fields are updated is handled here as well - // eg: if DesiredNum is increased, SetConfiguration reconciles by @@ -124,13 +128,13 @@ func (qs *queueSet) SetConfiguration(config fq.QueueSetConfig) error { numQueues := len(qs.queues) if config.DesiredNumQueues > numQueues { qs.queues = append(qs.queues, - initQueues(config.DesiredNumQueues-numQueues, len(qs.queues))...) + createQueues(config.DesiredNumQueues-numQueues, len(qs.queues))...) } qs.config = config qs.dealer = dealer - qs.dequeueWithChannelLockedAsMuchAsPossibleLocked() + qs.dispatchAsMuchAsPossibleLocked() return nil } @@ -147,38 +151,47 @@ func (qs *queueSet) SetConfiguration(config fq.QueueSetConfig) error { func (qs *queueSet) Quiesce(eh fq.EmptyHandler) { qs.lock.Lock() defer qs.lock.Unlock() + qs.emptyHandler = eh if eh == nil { - qs.emptyHandler = eh return } // Here we check whether there are any requests queued or executing and // if not then fork an invocation of the EmptyHandler. qs.maybeForkEmptyHandlerLocked() - - qs.emptyHandler = eh } -// Wait uses the given hashValue as the source of entropy -// as it shuffle-shards a request into a queue and waits for -// a decision on what to do with that request. If tryAnother==true -// at return then the QueueSet has become undesirable and the client -// should try to find a different QueueSet to use; execute and -// afterExecution are irrelevant in this case. Otherwise, if execute -// then the client should start executing the request and, once the -// request finishes execution or is canceled, call afterExecution(). -// Otherwise the client should not execute the -// request and afterExecution is irrelevant. -func (qs *queueSet) Wait(ctx context.Context, hashValue uint64) (tryAnother, execute bool, afterExecution func()) { - var req *fq.Request - shouldReturn, tryAnother, execute, afterExecution := func() ( - shouldReturn, tryAnother, execute bool, afterExecution func()) { +// Values passed through a request's Decision +const ( + DecisionExecute = "execute" + DecisionReject = "reject" + DecisionCancel = "cancel" + DecisionTryAnother = "tryAnother" +) +// Wait uses the given hashValue as the source of entropy as it +// shuffle-shards a request into a queue and waits for a decision on +// what to do with that request. The descr1 and descr2 values play no +// role in the logic but appear in log messages; we use two because +// the main client characterizes a request by two items that, if +// bundled together in a larger data structure, would lose interesting +// details when formatted. If tryAnother==true at return then the +// QueueSet has become undesirable and the client should try to find a +// different QueueSet to use; execute and afterExecution are +// irrelevant in this case. Otherwise, if execute then the client +// should start executing the request and, once the request finishes +// execution or is canceled, call afterExecution(). Otherwise the +// client should not execute the request and afterExecution is +// irrelevant. +func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (tryAnother, execute bool, afterExecution func()) { + var req *request + decision := func() string { qs.lockAndSyncTime() defer qs.lock.Unlock() // A call to Wait while the system is quiescing will be rebuffed by // returning `tryAnother=true`. if qs.emptyHandler != nil { - return true, true, false, nil + klog.V(5).Infof("QS(%s): rebuffing request %#+v %#+v with TryAnother", qs.config.Name, descr1, descr2) + return DecisionTryAnother } // ======================================================================== @@ -188,58 +201,70 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64) (tryAnother, exe // 3) Reject current request if there is not enough concurrency shares and // we are at max queue length // 4) If not rejected, create a request and enqueue - req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(hashValue) + req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(hashValue, descr1, descr2) // req == nil means that the request was rejected - no remaining // concurrency shares and at max queue length already if req == nil { + klog.V(5).Infof("QS(%s): rejecting request %#+v %#+v due to queue full", qs.config.Name, descr1, descr2) metrics.AddReject(qs.config.Name, "queue-full") - return true, false, false, func() {} + return DecisionReject } // ======================================================================== // Step 2: - // 1) The next step is to invoke the method that dequeues as much as possible. + // The next step is to invoke the method that dequeues as much + // as possible. + // This method runs a loop, as long as there are non-empty + // queues and the number currently executing is less than the + // assured concurrency value. The body of the loop uses the + // fair queuing technique to pick a queue and dispatch a + // request from that queue. + qs.dispatchAsMuchAsPossibleLocked() - // This method runs a loop, as long as there - // are non-empty queues and the number currently executing is less than the - // assured concurrency value. The body of the loop uses the fair queuing - // technique to pick a queue, dequeue the request at the head of that - // queue, increment the count of the number executing, and send true to - // the request's channel. - qs.dequeueWithChannelLockedAsMuchAsPossibleLocked() - return false, false, false, func() {} - }() - if shouldReturn { - return tryAnother, execute, afterExecution - } + // ======================================================================== + // Step 3: - // ======================================================================== - // Step 3: - // After that method finishes its loop and returns, the final step in Wait - // is to `select` (wait) on a message from the enqueud request's channel - // and return appropriately. While waiting this thread does no additional - // work so we decrement the go routine counter - qs.counter.Add(-1) - - select { - case execute := <-req.DequeueChannel: - if execute { - // execute the request - return false, true, func() { - qs.finishRequestAndDequeueWithChannelAsMuchAsPossible(req) - } + // Set up a relay from the context's Done channel to the world + // of well-counted goroutines. We Are Told that every + // request's context's Done channel gets closed by the time + // the request is done being processed. + doneCh := ctx.Done() + if doneCh != nil { + qs.preCreateOrUnblockGoroutine() + go func() { + defer runtime.HandleCrash() + qs.goroutineDoneOrBlocked() + select { + case <-doneCh: + klog.V(6).Infof("QS(%s): Context of request %#+v %#+v is Done", qs.config.Name, descr1, descr2) + req.Decision.Set(DecisionCancel) + } + qs.goroutineDoneOrBlocked() + }() } - klog.V(5).Infof("request timed out after being enqueued\n") - metrics.AddReject(qs.config.Name, "time-out") - return false, false, func() {} - case <-ctx.Done(): - klog.V(5).Infof("request cancelled\n") - func() { - qs.lockAndSyncTime() - defer qs.lock.Unlock() + // ======================================================================== + // Step 4: + // The final step in Wait is to wait on a decision from + // somewhere and then act on it. + decisionAny := req.Decision.GetLocked() + var decisionStr string + switch d := decisionAny.(type) { + case string: + decisionStr = d + default: + klog.Errorf("QS(%s): Impossible decision %#+v (of type %T) for request %#+v %#+v", qs.config.Name, decisionAny, decisionAny, descr1, descr2) + decisionStr = DecisionExecute + } + switch decisionStr { + case DecisionReject: + klog.V(5).Infof("QS(%s): request %#+v %#+v timed out after being enqueued\n", qs.config.Name, descr1, descr2) + metrics.AddReject(qs.config.Name, "time-out") + case DecisionCancel: + qs.syncTimeLocked() // TODO(aaron-prindle) add metrics to these two cases - if req.Enqueued { + if req.IsWaiting { + klog.V(5).Infof("QS(%s): Ejecting request %#+v %#+v from its queue", qs.config.Name, descr1, descr2) // remove the request from the queue as it has timed out for i := range req.Queue.Requests { if req == req.Queue.Requests[i] { @@ -254,47 +279,62 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64) (tryAnother, exe // then a call to the EmptyHandler should be forked. qs.maybeForkEmptyHandlerLocked() } else { - // At this point we know that req was in its queue earlier and another - // goroutine has removed req from its queue and called qs.counter.Add(1) - // in anticipation of unblocking this goroutine through the other arm of this - // select. In this case we need to decrement the counter because this goroutine - // was actually unblocked through a different code path. - qs.counter.Add(-1) + klog.V(5).Infof("QS(%s): request %#+v %#+v canceled shortly after dispatch", qs.config.Name, descr1, descr2) } - }() + } + return decisionStr + }() + switch decision { + case DecisionTryAnother: + return true, false, func() {} + case DecisionReject: return false, false, func() {} + case DecisionCancel: + return false, false, func() {} + default: + if decision != DecisionExecute { + klog.Errorf("Impossible decision %q", decision) + } + return false, true, func() { + qs.finishRequestAndDispatchAsMuchAsPossible(req) + } } } -// syncTimeLocked is used to sync the time of the queueSet by looking at the elapsed -// time since the last sync and this value based on the 'virtualtime ratio' -// which scales inversely to the # of active flows +// lockAndSyncTime acquires the lock and updates the virtual time. +// Doing them together avoids the mistake of modify some queue state +// before calling syncTimeLocked. +func (qs *queueSet) lockAndSyncTime() { + qs.lock.Lock() + qs.syncTimeLocked() +} + +// syncTimeLocked updates the virtual time based on the assumption +// that the current state of the queues has been in effect since +// `qs.lastRealTime`. Thus, it should be invoked after acquiring the +// lock and before modifying the state of any queue. func (qs *queueSet) syncTimeLocked() { realNow := qs.clock.Now() timesincelast := realNow.Sub(qs.lastRealTime).Seconds() qs.lastRealTime = realNow - var virtualTimeRatio float64 + qs.virtualTime += timesincelast * qs.getVirtualTimeRatio() +} +// getVirtualTimeRatio calculates the rate at which virtual time has +// been advancing, according to the logic in `doc.go`. +func (qs *queueSet) getVirtualTimeRatio() float64 { activeQueues := 0 reqs := 0 for _, queue := range qs.queues { reqs += queue.RequestsExecuting - if len(queue.Requests) > 0 || queue.RequestsExecuting > 0 { activeQueues++ } } - if activeQueues != 0 { - // TODO(aaron-prindle) document the math.Min usage - virtualTimeRatio = math.Min(float64(reqs), float64(qs.config.ConcurrencyLimit)) / float64(activeQueues) + if activeQueues == 0 { + return 0 } - - qs.virtualTime += timesincelast * virtualTimeRatio -} - -func (qs *queueSet) lockAndSyncTime() { - qs.lock.Lock() - qs.syncTimeLocked() + return math.Min(float64(reqs), float64(qs.config.ConcurrencyLimit)) / float64(activeQueues) } // timeoutOldRequestsAndRejectOrEnqueueLocked encapsulates the logic required @@ -307,9 +347,9 @@ func (qs *queueSet) lockAndSyncTime() { // returns the enqueud request on a successful enqueue // returns nil in the case that there is no available concurrency or // the queuelengthlimit has been reached -func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(hashValue uint64) *fq.Request { +func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(hashValue uint64, descr1, descr2 interface{}) *request { // Start with the shuffle sharding, to pick a queue. - queueIdx := qs.chooseQueueIndexLocked(hashValue) + queueIdx := qs.chooseQueueIndexLocked(hashValue, descr1, descr2) queue := qs.queues[queueIdx] // The next step is the logic to reject requests that have been waiting too long qs.removeTimedOutRequestsFromQueueLocked(queue) @@ -318,10 +358,12 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(hashValue uint64) // We prefer the simplicity over the promptness, at least for now. // Create a request and enqueue - req := &fq.Request{ - DequeueChannel: make(chan bool, 1), - RealEnqueueTime: qs.clock.Now(), - Queue: queue, + req := &request{ + Decision: lockingpromise.NewLockingPromise(&qs.lock, qs.counter), + ArrivalTime: qs.clock.Now(), + Queue: queue, + descr1: descr1, + descr2: descr2, } if ok := qs.rejectOrEnqueueLocked(req); !ok { return nil @@ -330,9 +372,26 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(hashValue uint64) return req } +// chooseQueueIndexLocked uses shuffle sharding to select a queue index +// using the given hashValue and the shuffle sharding parameters of the queueSet. +func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64, descr1, descr2 interface{}) int { + bestQueueIdx := -1 + bestQueueLen := int(math.MaxInt32) + // the dealer uses the current desired number of queues, which is no larger than the number in `qs.queues`. + qs.dealer.Deal(hashValue, func(queueIdx int) { + thisLen := len(qs.queues[queueIdx].Requests) + klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of length %d", qs.config.Name, descr1, descr2, queueIdx, thisLen) + if thisLen < bestQueueLen { + bestQueueIdx, bestQueueLen = queueIdx, thisLen + } + }) + klog.V(6).Infof("QS(%s): For request %#+v %#+v chose queue %d, had %d waiting & %d executing", qs.config.Name, descr1, descr2, bestQueueIdx, bestQueueLen, qs.queues[bestQueueIdx].RequestsExecuting) + return bestQueueIdx +} + // removeTimedOutRequestsFromQueueLocked rejects old requests that have been enqueued // past the requestWaitLimit -func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *fq.Queue) { +func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue) { timeoutIdx := -1 now := qs.clock.Now() reqs := queue.Requests @@ -343,10 +402,8 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *fq.Queue) { // now - requestWaitLimit = waitLimit waitLimit := now.Add(-qs.config.RequestWaitLimit) for i, req := range reqs { - if waitLimit.After(req.RealEnqueueTime) { - qs.counter.Add(1) - req.DequeueChannel <- false - close(req.DequeueChannel) + if waitLimit.After(req.ArrivalTime) { + req.Decision.SetLocked(DecisionReject) // get index for timed out requests timeoutIdx = i } else { @@ -364,57 +421,9 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *fq.Queue) { } } -// getRequestsExecutingLocked gets the # of requests which are "executing": -// this is the# of requests/requests which have been dequeued but have not had -// finished (via the FinishRequest method invoked after service) -func (qs *queueSet) getRequestsExecutingLocked() int { - total := 0 - for _, queue := range qs.queues { - total += queue.RequestsExecuting - } - return total -} - -// chooseQueueIndexLocked uses shuffle sharding to select a queue index -// using the given hashValue and the shuffle sharding parameters of the queueSet. -func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64) int { - bestQueueIdx := -1 - bestQueueLen := int(math.MaxInt32) - // DesiredNum is used here instead of numQueues to omit quiescing queues - qs.dealer.Deal(hashValue, func(queueIdx int) { - thisLen := len(qs.queues[queueIdx].Requests) - if thisLen < bestQueueLen { - bestQueueIdx, bestQueueLen = queueIdx, thisLen - } - }) - return bestQueueIdx -} - -// updateQueueVirtualStartTimeLocked updates the virtual start time for a queue -// this is done when a new request is enqueued. For more info see: -// https://github.com/kubernetes/enhancements/blob/master/keps/sig-api-machinery/20190228-priority-and-fairness.md#dispatching -func (qs *queueSet) updateQueueVirtualStartTimeLocked(request *fq.Request, queue *fq.Queue) { - // When a request arrives to an empty queue with no requests executing: - // len(queue.Requests) == 1 as enqueue has just happened prior (vs == 0) - if len(queue.Requests) == 1 && queue.RequestsExecuting == 0 { - // the queue’s virtual start time is set to the virtual time. - queue.VirtualStart = qs.virtualTime - } -} - -// enqueues a request into an queueSet -func (qs *queueSet) enqueueLocked(request *fq.Request) { - queue := request.Queue - queue.Enqueue(request) - qs.updateQueueVirtualStartTimeLocked(request, queue) - qs.numRequestsEnqueued++ - - metrics.UpdateFlowControlRequestsInQueue(qs.config.Name, qs.numRequestsEnqueued) -} - // rejectOrEnqueueLocked rejects or enqueues the newly arrived request if // resource criteria isn't met -func (qs *queueSet) rejectOrEnqueueLocked(request *fq.Request) bool { +func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool { queue := request.Queue curQueueLength := len(queue.Requests) // rejects the newly arrived request if resource criteria not met @@ -427,13 +436,81 @@ func (qs *queueSet) rejectOrEnqueueLocked(request *fq.Request) bool { return true } -// selectQueueLocked selects the minimum virtualFinish time from the set of queues +// enqueues a request into an queueSet +func (qs *queueSet) enqueueLocked(request *request) { + queue := request.Queue + if len(queue.Requests) == 0 && queue.RequestsExecuting == 0 { + // the queue’s virtual start time is set to the virtual time. + queue.VirtualStart = qs.virtualTime + if klog.V(6) { + klog.Infof("QS(%s) at r=%s v=%.9fs: initialized queue %d virtual start time due to request %#+v %#+v", qs.config.Name, qs.clock.Now().Format(nsTimeFmt), queue.VirtualStart, queue.Index, request.descr1, request.descr2) + } + } + queue.Enqueue(request) + qs.numRequestsEnqueued++ + metrics.UpdateFlowControlRequestsInQueue(qs.config.Name, qs.numRequestsEnqueued) +} + +// getRequestsExecutingLocked gets the # of requests which are "executing": +// this is the # of requests which have been dispatched but have not +// finished (via the finishRequestLocked method invoked after service) +func (qs *queueSet) getRequestsExecutingLocked() int { + total := 0 + for _, queue := range qs.queues { + total += queue.RequestsExecuting + } + return total +} + +// dispatchAsMuchAsPossibleLocked runs a loop, as long as there +// are non-empty queues and the number currently executing is less than the +// assured concurrency value. The body of the loop uses the fair queuing +// technique to pick a queue, dequeue the request at the head of that +// queue, increment the count of the number executing, and send true +// to the request's channel. +func (qs *queueSet) dispatchAsMuchAsPossibleLocked() { + for qs.numRequestsEnqueued != 0 && qs.getRequestsExecutingLocked() < qs.config.ConcurrencyLimit { + _, ok := qs.dispatchLocked() + if !ok { + break + } + } +} + +// dispatchLocked is a convenience method for dequeueing requests that +// require a message to be sent through the requests channel +// this is a required pattern for the QueueSet the queueSet supports +func (qs *queueSet) dispatchLocked() (*request, bool) { + queue := qs.selectQueueLocked() + if queue == nil { + return nil, false + } + request, ok := queue.Dequeue() + if !ok { + return nil, false + } + request.StartTime = qs.clock.Now() + // request dequeued, service has started + queue.RequestsExecuting++ + qs.numRequestsEnqueued-- + if klog.V(6) { + klog.Infof("QS(%s) at r=%s v=%.9fs: dispatching request %#+v %#+v from queue %d with virtual start time %.9fs, queue will have %d waiting & %d executing", qs.config.Name, request.StartTime.Format(nsTimeFmt), qs.virtualTime, request.descr1, request.descr2, queue.Index, queue.VirtualStart, len(queue.Requests), queue.RequestsExecuting) + } + // When a request is dequeued for service -> qs.VirtualStart += G + queue.VirtualStart += qs.estimatedServiceTime + metrics.UpdateFlowControlRequestsExecuting(qs.config.Name, queue.RequestsExecuting) + request.Decision.SetLocked(DecisionExecute) + return request, ok +} + +/// selectQueueLocked selects the minimum virtualFinish time from the set of queues // the starting queue is selected via roundrobin -func (qs *queueSet) selectQueueLocked() *fq.Queue { +func (qs *queueSet) selectQueueLocked() *queue { minVirtualFinish := math.Inf(1) - var minQueue *fq.Queue + var minQueue *queue var minIndex int for range qs.queues { + qs.robinIndex = (qs.robinIndex + 1) % len(qs.queues) queue := qs.queues[qs.robinIndex] if len(queue.Requests) != 0 { currentVirtualFinish := queue.GetVirtualFinish(0, qs.estimatedServiceTime) @@ -443,7 +520,6 @@ func (qs *queueSet) selectQueueLocked() *fq.Queue { minIndex = qs.robinIndex } } - qs.robinIndex = (qs.robinIndex + 1) % len(qs.queues) } // we set the round robin indexing to start at the chose queue // for the next round. This way the non-selected queues @@ -452,69 +528,22 @@ func (qs *queueSet) selectQueueLocked() *fq.Queue { return minQueue } -// dequeue dequeues a request from the queueSet -func (qs *queueSet) dequeueLocked() (*fq.Request, bool) { - queue := qs.selectQueueLocked() - if queue == nil { - return nil, false - } - request, ok := queue.Dequeue() - if !ok { - return nil, false - } - // When a request is dequeued for service -> qs.VirtualStart += G - queue.VirtualStart += qs.estimatedServiceTime - request.StartTime = qs.clock.Now() - // request dequeued, service has started - queue.RequestsExecuting++ - metrics.UpdateFlowControlRequestsExecuting(qs.config.Name, queue.RequestsExecuting) - qs.numRequestsEnqueued-- - return request, ok +// finishRequestAndDispatchAsMuchAsPossible is a convenience method +// which calls finishRequest for a given request and then dispatches +// as many requests as possible. This is all of what needs to be done +// once a request finishes execution or is canceled. +func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) { + qs.lockAndSyncTime() + defer qs.lock.Unlock() + + qs.finishRequestLocked(req) + qs.dispatchAsMuchAsPossibleLocked() } -// dequeueWithChannelLockedAsMuchAsPossibleLocked runs a loop, as long as there -// are non-empty queues and the number currently executing is less than the -// assured concurrency value. The body of the loop uses the fair queuing -// technique to pick a queue, dequeue the request at the head of that -// queue, increment the count of the number executing, and send true -// to the request's channel. -func (qs *queueSet) dequeueWithChannelLockedAsMuchAsPossibleLocked() { - for qs.numRequestsEnqueued != 0 && qs.getRequestsExecutingLocked() < qs.config.ConcurrencyLimit { - _, ok := qs.dequeueWithChannelLocked() - if !ok { - break - } - } -} - -// dequeueWithChannelLocked is a convenience method for dequeueing requests that -// require a message to be sent through the requests channel -// this is a required pattern for the QueueSet the queueSet supports -func (qs *queueSet) dequeueWithChannelLocked() (*fq.Request, bool) { - req, ok := qs.dequeueLocked() - if !ok { - return nil, false - } - qs.counter.Add(1) - req.DequeueChannel <- true - close(req.DequeueChannel) - return req, ok -} - -// removeQueueAndUpdateIndexes uses reslicing to remove an index from a slice -// and then updates the 'Index' field of the queues to be correct -func removeQueueAndUpdateIndexes(queues []*fq.Queue, index int) []*fq.Queue { - keptQueues := append(queues[:index], queues[index+1:]...) - for i := index; i < len(keptQueues); i++ { - keptQueues[i].Index-- - } - return keptQueues -} - -// finishRequestLocked is a callback that should be used when a previously dequeued request -// has completed it's service. This callback updates important state in the -// queueSet -func (qs *queueSet) finishRequestLocked(r *fq.Request) { +// finishRequestLocked is a callback that should be used when a +// previously dispatched request has completed it's service. This +// callback updates important state in the queueSet +func (qs *queueSet) finishRequestLocked(r *request) { S := qs.clock.Since(r.StartTime).Seconds() // When a request finishes being served, and the actual service time was S, @@ -524,8 +553,12 @@ func (qs *queueSet) finishRequestLocked(r *fq.Request) { // request has finished, remove from requests executing r.Queue.RequestsExecuting-- + if klog.V(6) { + klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing", qs.config.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.Queue.Index, r.Queue.VirtualStart, S, len(r.Queue.Requests), r.Queue.RequestsExecuting) + } + // Logic to remove quiesced queues - // >= as QueueIdx=25 is out of bounds for DesiredNum=25 [0...24] + // >= as Index=25 is out of bounds for DesiredNum=25 [0...24] if r.Queue.Index >= qs.config.DesiredNumQueues && len(r.Queue.Requests) == 0 && r.Queue.RequestsExecuting == 0 { @@ -544,26 +577,39 @@ func (qs *queueSet) finishRequestLocked(r *fq.Request) { } } +// removeQueueAndUpdateIndexes uses reslicing to remove an index from a slice +// and then updates the 'Index' field of the queues to be correct +func removeQueueAndUpdateIndexes(queues []*queue, index int) []*queue { + keptQueues := append(queues[:index], queues[index+1:]...) + for i := index; i < len(keptQueues); i++ { + keptQueues[i].Index-- + } + return keptQueues +} + func (qs *queueSet) maybeForkEmptyHandlerLocked() { if qs.emptyHandler != nil && qs.numRequestsEnqueued == 0 && qs.getRequestsExecutingLocked() == 0 { - qs.counter.Add(1) + qs.preCreateOrUnblockGoroutine() go func(eh fq.EmptyHandler) { defer runtime.HandleCrash() - defer qs.counter.Add(-1) + defer qs.goroutineDoneOrBlocked() eh.HandleEmpty() }(qs.emptyHandler) } } -// finishRequestAndDequeueWithChannelAsMuchAsPossible is a convenience method which calls finishRequest -// for a given request and then dequeues as many requests as possible -// and updates that request's channel signifying it is is dequeued -// this is a callback used for the filter that the queueSet supports -func (qs *queueSet) finishRequestAndDequeueWithChannelAsMuchAsPossible(req *fq.Request) { - qs.lockAndSyncTime() - defer qs.lock.Unlock() - - qs.finishRequestLocked(req) - qs.dequeueWithChannelLockedAsMuchAsPossibleLocked() +// preCreateOrUnblockGoroutine needs to be called before creating a +// goroutine associated with this queueSet or unblocking a blocked +// one, to properly update the accounting used in testing. +func (qs *queueSet) preCreateOrUnblockGoroutine() { + qs.counter.Add(1) +} + +// goroutineDoneOrBlocked needs to be called at the end of every +// goroutine associated with this queueSet or when such a goroutine is +// about to wait on some other goroutine to do something; this is to +// properly update the accounting used in testing. +func (qs *queueSet) goroutineDoneOrBlocked() { + qs.counter.Add(-1) } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go index 2b9e30cb963..17cd5fa1b0a 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go @@ -27,6 +27,7 @@ import ( fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" test "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing" "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock" + "k8s.io/klog" ) type uniformScenario []uniformClient @@ -41,21 +42,23 @@ type uniformClient struct { thinkDuration time.Duration } +const nsTimeFmt = "2006-01-02 15:04:05.000000000" + // exerciseQueueSetUniformScenario runs a scenario based on the given set of uniform clients. // Each uniform client specifies a number of threads, each of which alternates between thinking // and making a synchronous request through the QueueSet. // This function measures how much concurrency each client got, on average, over -// the intial totalDuration and tests to see whether they all got about the same amount. +// the initial evalDuration and tests to see whether they all got about the same amount. // Each client needs to be demanding enough to use this amount, otherwise the fair result // is not equal amounts and the simple test in this function would not accurately test fairness. // expectPass indicates whether the QueueSet is expected to be fair. // expectedAllRequests indicates whether all requests are expected to get dispatched. -func exerciseQueueSetUniformScenario(t *testing.T, qs fq.QueueSet, sc uniformScenario, - totalDuration time.Duration, expectPass bool, expectedAllRequests bool, +func exerciseQueueSetUniformScenario(t *testing.T, name string, qs fq.QueueSet, sc uniformScenario, + evalDuration time.Duration, expectPass bool, expectedAllRequests bool, clk *clock.FakeEventClock, counter counter.GoRoutineCounter) { now := time.Now() - t.Logf("%s: Start", clk.Now().Format("2006-01-02 15:04:05.000000000")) + t.Logf("%s: Start %s, clk=%p, grc=%p", clk.Now().Format(nsTimeFmt), name, clk, counter) integrators := make([]test.Integrator, len(sc)) var failedCount uint64 for i, uc := range sc { @@ -66,8 +69,8 @@ func exerciseQueueSetUniformScenario(t *testing.T, qs fq.QueueSet, sc uniformSce for k := 0; k < uc.nCalls; k++ { ClockWait(clk, counter, uc.thinkDuration) for { - tryAnother, execute, afterExecute := qs.Wait(context.Background(), uc.hash) - t.Logf("%s: %d, %d, %d got q=%v, e=%v", clk.Now().Format("2006-01-02 15:04:05.000000000"), i, j, k, tryAnother, execute) + tryAnother, execute, afterExecute := qs.Wait(context.Background(), uc.hash, name, []int{i, j, k}) + t.Logf("%s: %d, %d, %d got a=%v, e=%v", clk.Now().Format(nsTimeFmt), i, j, k, tryAnother, execute) if tryAnother { continue } @@ -86,10 +89,10 @@ func exerciseQueueSetUniformScenario(t *testing.T, qs fq.QueueSet, sc uniformSce }(i, j, uc, integrators[i]) } } - lim := now.Add(totalDuration) + lim := now.Add(evalDuration) clk.Run(&lim) clk.SetTime(lim) - t.Logf("%s: End", clk.Now().Format("2006-01-02 15:04:05.000000000")) + t.Logf("%s: End", clk.Now().Format(nsTimeFmt)) results := make([]test.IntegratorResults, len(sc)) var sumOfAvg float64 for i := range sc { @@ -132,6 +135,10 @@ func ClockWait(clk *clock.FakeEventClock, counter counter.GoRoutineCounter, dura } } +func init() { + klog.InitFlags(nil) +} + // TestNoRestraint should fail because the dummy QueueSet exercises no control func TestNoRestraint(t *testing.T) { now := time.Now() @@ -142,7 +149,7 @@ func TestNoRestraint(t *testing.T) { if err != nil { t.Fatalf("QueueSet creation failed with %v", err) } - exerciseQueueSetUniformScenario(t, nr, []uniformClient{ + exerciseQueueSetUniformScenario(t, "NoRestraint", nr, []uniformClient{ {1001001001, 5, 10, time.Second, time.Second}, {2002002002, 2, 10, time.Second, time.Second / 2}, }, time.Second*10, false, true, clk, counter) @@ -155,10 +162,10 @@ func TestUniformFlows(t *testing.T) { qsf := NewQueueSetFactory(clk, counter) config := fq.QueueSetConfig{ Name: "TestUniformFlows", - ConcurrencyLimit: 100, - DesiredNumQueues: 128, - QueueLengthLimit: 128, - HandSize: 1, + ConcurrencyLimit: 4, + DesiredNumQueues: 8, + QueueLengthLimit: 6, + HandSize: 3, RequestWaitLimit: 10 * time.Minute, } qs, err := qsf.NewQueueSet(config) @@ -166,10 +173,10 @@ func TestUniformFlows(t *testing.T) { t.Fatalf("QueueSet creation failed with %v", err) } - exerciseQueueSetUniformScenario(t, qs, []uniformClient{ + exerciseQueueSetUniformScenario(t, "UniformFlows", qs, []uniformClient{ {1001001001, 5, 10, time.Second, time.Second}, {2002002002, 5, 10, time.Second, time.Second}, - }, time.Second*10, true, true, clk, counter) + }, time.Second*20, true, true, clk, counter) } func TestDifferentFlows(t *testing.T) { @@ -179,10 +186,10 @@ func TestDifferentFlows(t *testing.T) { qsf := NewQueueSetFactory(clk, counter) config := fq.QueueSetConfig{ Name: "TestDifferentFlows", - ConcurrencyLimit: 1, - DesiredNumQueues: 128, - QueueLengthLimit: 128, - HandSize: 1, + ConcurrencyLimit: 4, + DesiredNumQueues: 8, + QueueLengthLimit: 6, + HandSize: 3, RequestWaitLimit: 10 * time.Minute, } qs, err := qsf.NewQueueSet(config) @@ -190,10 +197,10 @@ func TestDifferentFlows(t *testing.T) { t.Fatalf("QueueSet creation failed with %v", err) } - exerciseQueueSetUniformScenario(t, qs, []uniformClient{ - {1001001001, 5, 10, time.Second, time.Second}, - {2002002002, 2, 5, time.Second, time.Second / 2}, - }, time.Second*10, true, true, clk, counter) + exerciseQueueSetUniformScenario(t, "DifferentFlows", qs, []uniformClient{ + {1001001001, 6, 10, time.Second, time.Second}, + {2002002002, 4, 15, time.Second, time.Second / 2}, + }, time.Second*20, true, true, clk, counter) } func TestTimeout(t *testing.T) { @@ -214,7 +221,7 @@ func TestTimeout(t *testing.T) { t.Fatalf("QueueSet creation failed with %v", err) } - exerciseQueueSetUniformScenario(t, qs, []uniformClient{ + exerciseQueueSetUniformScenario(t, "Timeout", qs, []uniformClient{ {1001001001, 5, 100, time.Second, time.Second}, }, time.Second*10, true, false, clk, counter) } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/types.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go similarity index 56% rename from staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/types.go rename to staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go index 5ed89ad8969..581954985cf 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/types.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go @@ -14,56 +14,70 @@ See the License for the specific language governing permissions and limitations under the License. */ -package fairqueuing +package queueset import ( "time" + + "k8s.io/apiserver/pkg/util/promise" ) -// Request is a temporary container for "requests" with additional tracking fields +// request is a temporary container for "requests" with additional tracking fields // required for the functionality FQScheduler -type Request struct { - //TODO(aaron-prindle) seq is only used for testing, this was abstracted - // via an interface before, keeping this for now - QueueIdx int +type request struct { + Queue *queue - Queue *Queue - StartTime time.Time - DequeueChannel chan bool - RealEnqueueTime time.Time - Enqueued bool + // StartTime is the clock time when the request began executing + StartTime time.Time + + // Decision gets set to the decision about what to do with this request + Decision promise.LockingMutable + + // ArrivalTime is when the request entered this system + ArrivalTime time.Time + + // IsWaiting indicates whether the request is presently waiting in a queue + IsWaiting bool + + // descr1 and descr2 are not used in any logic but they appear in + // log messages + descr1, descr2 interface{} } -// Queue is an array of requests with additional metadata required for +// queue is an array of requests with additional metadata required for // the FQScheduler -type Queue struct { - Requests []*Request - VirtualStart float64 +type queue struct { + Requests []*request + + // VirtualStart is the virtual time when the oldest request in the + // queue (if there is any) started virtually executing + VirtualStart float64 + RequestsExecuting int Index int } // Enqueue enqueues a request into the queue -func (q *Queue) Enqueue(request *Request) { - request.Enqueued = true +func (q *queue) Enqueue(request *request) { + request.IsWaiting = true q.Requests = append(q.Requests, request) } // Dequeue dequeues a request from the queue -func (q *Queue) Dequeue() (*Request, bool) { +func (q *queue) Dequeue() (*request, bool) { if len(q.Requests) == 0 { return nil, false } request := q.Requests[0] q.Requests = q.Requests[1:] - request.Enqueued = false + request.IsWaiting = false return request, true } // GetVirtualFinish returns the expected virtual finish time of the request at // index J in the queue with estimated finish time G -func (q *Queue) GetVirtualFinish(J int, G float64) float64 { +func (q *queue) GetVirtualFinish(J int, G float64) float64 { // The virtual finish time of request number J in the queue // (counting from J=1 for the head) is J * G + (virtual start time). diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock/BUILD b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock/BUILD index a857a8ebf64..438a9d8b558 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock/BUILD @@ -9,6 +9,7 @@ go_library( deps = [ "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:go_default_library", + "//vendor/k8s.io/klog:go_default_library", ], ) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock/event_clock.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock/event_clock.go index 089a2df55b8..683a622000b 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock/event_clock.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock/event_clock.go @@ -19,11 +19,14 @@ package clock import ( "container/heap" "math/rand" + "runtime" + "strings" "sync" "time" "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apiserver/pkg/util/flowcontrol/counter" + "k8s.io/klog" ) // EventFunc does some work that needs to be done at or after the @@ -68,12 +71,38 @@ func (r RealEventClock) EventAfterTime(f EventFunc, t time.Time) { // waitGroupCounter is a wait group used for a GoRoutine Counter. This private // type is used to disallow direct waitGroup access -type waitGroupCounter struct{ sync.WaitGroup } +type waitGroupCounter struct { + wg sync.WaitGroup +} // compile time assertion that waitGroupCounter meets requirements // of GoRoutineCounter var _ counter.GoRoutineCounter = (*waitGroupCounter)(nil) +func (wgc *waitGroupCounter) Add(delta int) { + if klog.V(7) { + var pcs [5]uintptr + nCallers := runtime.Callers(2, pcs[:]) + frames := runtime.CallersFrames(pcs[:nCallers]) + frame1, more1 := frames.Next() + fileParts1 := strings.Split(frame1.File, "/") + tail2 := "(none)" + line2 := 0 + if more1 { + frame2, _ := frames.Next() + fileParts2 := strings.Split(frame2.File, "/") + tail2 = fileParts2[len(fileParts2)-1] + line2 = frame2.Line + } + klog.Infof("GRC(%p).Add(%d) from %s:%d from %s:%d", wgc, delta, fileParts1[len(fileParts1)-1], frame1.Line, tail2, line2) + } + wgc.wg.Add(delta) +} + +func (wgc *waitGroupCounter) Wait() { + wgc.wg.Wait() +} + // FakeEventClock is one whose time does not pass implicitly but // rather is explicitly set by invocations of its SetTime method type FakeEventClock struct { diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go index f8aab295b76..5ac48be94d8 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go @@ -44,6 +44,6 @@ func (noRestraint) SetConfiguration(config fq.QueueSetConfig) error { func (noRestraint) Quiesce(fq.EmptyHandler) { } -func (noRestraint) Wait(ctx context.Context, hashValue uint64) (quiescent, execute bool, afterExecution func()) { +func (noRestraint) Wait(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (quiescent, execute bool, afterExecution func()) { return false, true, func() {} } diff --git a/staging/src/k8s.io/apiserver/pkg/util/promise/interface.go b/staging/src/k8s.io/apiserver/pkg/util/promise/interface.go new file mode 100644 index 00000000000..258469043b4 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/promise/interface.go @@ -0,0 +1,42 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package promise + +// Mutable is a variable that is initially not set and can be set one +// or more times (unlike a traditional "promise", which can be written +// only once). +type Mutable interface { + + // Set writes a value into this variable and unblocks every + // goroutine waiting for this variable to have a value + Set(interface{}) + + // Get reads the value of this variable. If this variable is + // not set yet then this call blocks until this variable gets a value. + Get() interface{} +} + +// LockingMutable is a Mutable whose implementation is protected by a lock +type LockingMutable interface { + Mutable + + // SetLocked is like Set but the caller must already hold the lock + SetLocked(interface{}) + + // GetLocked is like Get but the caller must already hold the lock + GetLocked() interface{} +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/promise/lockingpromise/lockingpromise.go b/staging/src/k8s.io/apiserver/pkg/util/promise/lockingpromise/lockingpromise.go new file mode 100644 index 00000000000..c89e0101ec3 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/promise/lockingpromise/lockingpromise.go @@ -0,0 +1,78 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package lockingpromise + +import ( + "sync" + + "k8s.io/apiserver/pkg/util/flowcontrol/counter" + "k8s.io/apiserver/pkg/util/promise" +) + +// lockingPromise implements LockingMutable based on a condition +// variable. This implementation tracks active goroutines: the given +// counter is decremented for a goroutine waiting for this varible to +// be set and incremented when such a goroutine is unblocked. +type lockingPromise struct { + lock sync.Locker + cond sync.Cond + activeCounter counter.GoRoutineCounter // counter of active goroutines + waitingCount int // number of goroutines idle due to this mutable being unset + isSet bool + value interface{} +} + +var _ promise.LockingMutable = &lockingPromise{} + +func NewLockingPromise(lock sync.Locker, activeCounter counter.GoRoutineCounter) promise.LockingMutable { + return &lockingPromise{ + lock: lock, + cond: *sync.NewCond(lock), + activeCounter: activeCounter, + } +} + +func (lp *lockingPromise) Set(value interface{}) { + lp.lock.Lock() + defer lp.lock.Unlock() + lp.SetLocked(value) +} + +func (lp *lockingPromise) Get() interface{} { + lp.lock.Lock() + defer lp.lock.Unlock() + return lp.GetLocked() +} + +func (lp *lockingPromise) SetLocked(value interface{}) { + lp.isSet = true + lp.value = value + if lp.waitingCount > 0 { + lp.activeCounter.Add(lp.waitingCount) + lp.waitingCount = 0 + lp.cond.Broadcast() + } +} + +func (lp *lockingPromise) GetLocked() interface{} { + if !lp.isSet { + lp.waitingCount++ + lp.activeCounter.Add(-1) + lp.cond.Wait() + } + return lp.value +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/promise/lockingpromise/lockingpromise_test.go b/staging/src/k8s.io/apiserver/pkg/util/promise/lockingpromise/lockingpromise_test.go new file mode 100644 index 00000000000..766f8ffed9c --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/promise/lockingpromise/lockingpromise_test.go @@ -0,0 +1,70 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package lockingpromise + +import ( + "sync" + "sync/atomic" + "testing" + "time" + + "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock" +) + +func TestLockingPromise(t *testing.T) { + now := time.Now() + clock, counter := clock.NewFakeEventClock(now, 0, nil) + var lock sync.Mutex + lp := NewLockingPromise(&lock, counter) + var gots int32 + var got atomic.Value + counter.Add(1) + go func() { + got.Store(lp.Get()) + atomic.AddInt32(&gots, 1) + counter.Add(-1) + }() + clock.Run(nil) + time.Sleep(time.Second) + if atomic.LoadInt32(&gots) != 0 { + t.Error("Get returned before Set") + } + var aval = &now + lp.Set(aval) + clock.Run(nil) + time.Sleep(time.Second) + if atomic.LoadInt32(&gots) != 1 { + t.Error("Get did not return after Set") + } + if got.Load() != aval { + t.Error("Get did not return what was Set") + } + counter.Add(1) + go func() { + got.Store(lp.Get()) + atomic.AddInt32(&gots, 1) + counter.Add(-1) + }() + clock.Run(nil) + time.Sleep(time.Second) + if atomic.LoadInt32(&gots) != 2 { + t.Error("Second Get did not return immediately") + } + if got.Load() != aval { + t.Error("Second Get did not return what was Set") + } +} diff --git a/staging/src/k8s.io/component-base/metrics/BUILD b/staging/src/k8s.io/component-base/metrics/BUILD index 0baa5f3ecf2..0d6a7701ae8 100644 --- a/staging/src/k8s.io/component-base/metrics/BUILD +++ b/staging/src/k8s.io/component-base/metrics/BUILD @@ -100,6 +100,7 @@ package_group( "//pkg/scheduler/framework/v1alpha1", "//pkg/volume/util/operationexecutor", "//staging/src/k8s.io/apiserver/pkg/admission/metrics", + "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics", "//staging/src/k8s.io/component-base/metrics/...", "//test/e2e", "//test/e2e/storage",