Brushing up queueset

(1) Replaced random-looking assortment of counter increments and
decrements with something hopefully more principalled-looking.  Most
importantly, introduced the MutablePromise abstraction to neatly wrap
up the complicated business of unioning multiple sources of
unblocking.

(2) Improved debug logging.

(3) Somewhat more interesting test cases, and a bug fix wrt round
robin index.
This commit is contained in:
Mike Spreitzer 2019-11-13 01:52:05 -05:00 committed by MikeSpreitzer
parent 6619df1798
commit 1c31b2bdc6
16 changed files with 741 additions and 326 deletions

View File

@ -20742,7 +20742,7 @@
},
"info": {
"title": "Kubernetes",
"version": "v1.17.0"
"version": "v1.18.0"
},
"paths": {
"/api/": {

View File

@ -47,6 +47,7 @@ filegroup(
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:all-srcs",
"//staging/src/k8s.io/apiserver/pkg/util/flushwriter:all-srcs",
"//staging/src/k8s.io/apiserver/pkg/util/openapi:all-srcs",
"//staging/src/k8s.io/apiserver/pkg/util/promise:all-srcs",
"//staging/src/k8s.io/apiserver/pkg/util/proxy:all-srcs",
"//staging/src/k8s.io/apiserver/pkg/util/shufflesharding:all-srcs",
"//staging/src/k8s.io/apiserver/pkg/util/term:all-srcs",

View File

@ -2,10 +2,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = [
"interface.go",
"types.go",
],
srcs = ["interface.go"],
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing",
importpath = "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing",
visibility = ["//visibility:public"],

View File

@ -49,17 +49,19 @@ type QueueSet interface {
// triggering state and the required call).
Quiesce(EmptyHandler)
// Wait uses the given hashValue as the source of entropy
// as it shuffle-shards a request into a queue and waits for
// a decision on what to do with that request. If tryAnother==true
// at return then the QueueSet has become undesirable and the client
// should try to find a different QueueSet to use; execute and
// afterExecution are irrelevant in this case. Otherwise, if execute
// then the client should start executing the request and, once the
// request finishes execution or is canceled, call afterExecution().
// Otherwise the client should not execute the
// request and afterExecution is irrelevant.
Wait(ctx context.Context, hashValue uint64) (tryAnother, execute bool, afterExecution func())
// Wait uses the given hashValue as the source of entropy as it
// shuffle-shards a request into a queue and waits for a decision
// on what to do with that request. The descr1 and descr2 values
// play no role in the logic but appear in log messages. If
// tryAnother==true at return then the QueueSet has become
// undesirable and the client should try to find a different
// QueueSet to use; execute and afterExecution are irrelevant in
// this case. Otherwise, if execute then the client should start
// executing the request and, once the request finishes execution
// or is canceled, call afterExecution(). Otherwise the client
// should not execute the request and afterExecution is
// irrelevant.
Wait(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (tryAnother, execute bool, afterExecution func())
}
// QueueSetConfig defines the configuration of a QueueSet.

View File

@ -2,7 +2,11 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["queueset.go"],
srcs = [
"doc.go",
"queueset.go",
"types.go",
],
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset",
importpath = "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset",
visibility = ["//visibility:public"],
@ -12,6 +16,8 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/promise:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/promise/lockingpromise:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/shufflesharding:go_default_library",
"//vendor/github.com/pkg/errors:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
@ -27,6 +33,7 @@ go_test(
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)

View File

@ -0,0 +1,120 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package queueset
// This package implements a technique called "fair queuing for server
// requests". One QueueSet is a set of queues operating according to
// this technique.
// Fair queuing for server requests is inspired by the fair queuing
// technique from the world of networking. You can find a good paper
// on that at https://dl.acm.org/citation.cfm?doid=75247.75248 or
// http://people.csail.mit.edu/imcgraw/links/research/pubs/networks/WFQ.pdf
// and there is an implementation outline in the Wikipedia article at
// https://en.wikipedia.org/wiki/Fair_queuing .
// Fair queuing for server requests differs from traditional fair
// queuing in three ways: (1) we are dispatching requests to be
// executed within a process rather than transmitting packets on a
// network link, (2) multiple requests can be executing at once, and
// (3) the service time (execution duration) is not known until the
// execution completes.
// The first two differences can easily be handled by straightforward
// adaptation of the concept called "R(t)" in the original paper and
// "virtual time" in the implementation outline. In that
// implementation outline, the notation now() is used to mean reading
// the virtual clock. In the original papers terms, "R(t)" is the
// number of "rounds" that have been completed at real time t, where a
// round consists of virtually transmitting one bit from every
// non-empty queue in the router (regardless of which queue holds the
// packet that is really being transmitted at the moment); in this
// conception, a packet is considered to be "in" its queue until the
// packets transmission is finished. For our problem, we can define a
// round to be giving one nanosecond of CPU to every non-empty queue
// in the apiserver (where emptiness is judged based on both queued
// and executing requests from that queue), and define R(t) = (server
// start time) + (1 ns) * (number of rounds since server start). Let
// us write NEQ(t) for that number of non-empty queues in the
// apiserver at time t. Let us also write C for the concurrency
// limit. In the original paper, the partial derivative of R(t) with
// respect to t is
//
// 1 / NEQ(t) .
// To generalize from transmitting one packet at a time to executing C
// requests at a time, that derivative becomes
//
// C / NEQ(t) .
// However, sometimes there are fewer than C requests available to
// execute. For a given queue "q", let us also write "reqs(q, t)" for
// the number of requests of that queue that are executing at that
// time. The total number of requests executing is sum[over q]
// reqs(q, t) and if that is less than C then virtual time is not
// advancing as fast as it would if all C seats were occupied; in this
// case the numerator of the quotient in that derivative should be
// adjusted proportionally. Putting it all together for fair queing
// for server requests: at a particular time t, the partial derivative
// of R(t) with respect to t is
//
// min( C, sum[over q] reqs(q, t) ) / NEQ(t) .
//
// In terms of the implementation outline, this is the rate at which
// virtual time is advancing at time t (in virtual nanoseconds per
// real nanosecond). Where the networking implementation outline adds
// packet size to a virtual time, in our version this corresponds to
// adding a service time (i.e., duration) to virtual time.
// The third difference is handled by modifying the algorithm to
// dispatch based on an initial guess at the requests service time
// (duration) and then make the corresponding adjustments once the
// requests actual service time is known. This is similar, although
// not exactly isomorphic, to the original papers adjustment by
// `$delta` for the sake of promptness.
// For implementation simplicity (see below), let us use the same
// initial service time guess for every request; call that duration
// G. A good choice might be the service time limit (1
// minute). Different guesses will give slightly different dynamics,
// but any positive number can be used for G without ruining the
// long-term behavior.
// As in ordinary fair queuing, there is a bound on divergence from
// the ideal. In plain fair queuing the bound is one packet; in our
// version it is C requests.
// To support efficiently making the necessary adjustments once a
// requests actual service time is known, the virtual finish time of
// a request and the last virtual finish time of a queue are not
// represented directly but instead computed from queue length,
// request position in the queue, and an alternate state variable that
// holds the queues virtual start time. While the queue is empty and
// has no requests executing: the value of its virtual start time
// variable is ignored and its last virtual finish time is considered
// to be in the virtual past. When a request arrives to an empty queue
// with no requests executing, the queues virtual start time is set
// to the current virtual time. The virtual finish time of request
// number J in the queue (counting from J=1 for the head) is J * G +
// (queue's virtual start time). While the queue is non-empty: the
// last virtual finish time of the queue is the virtual finish time of
// the last request in the queue. While the queue is empty and has a
// request executing: the last virtual finish time is the queues
// virtual start time. When a request is dequeued for service the
// queues virtual start time is advanced by G. When a request
// finishes being served, and the actual service time was S, the
// queues virtual start time is decremented by G - S.

View File

@ -29,6 +29,7 @@ import (
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
"k8s.io/apiserver/pkg/util/promise/lockingpromise"
"k8s.io/apiserver/pkg/util/shufflesharding"
"k8s.io/klog"
)
@ -48,63 +49,66 @@ func NewQueueSetFactory(c clock.PassiveClock, counter counter.GoRoutineCounter)
}
}
// queueSet implements the Fair Queuing for Server Requests technique
// described in this package's doc, and a pointer to one implements
// the QueueSet interface. The clock, GoRoutineCounter, and estimated
// service time should not be changed; the fields listed after the
// lock must be accessed only while holding the lock.
type queueSet struct {
clock clock.PassiveClock
counter counter.GoRoutineCounter
estimatedServiceTime float64
lock sync.Mutex
config fq.QueueSetConfig
// queues may be longer than the desired number, while the excess
// queues are still draining.
queues []*queue
virtualTime float64
lastRealTime time.Time
// robinIndex is the index of the last queue dispatched
robinIndex int
// numRequestsEnqueued is the number of requests currently waiting
// in a queue (eg: incremeneted on Enqueue, decremented on Dequue)
numRequestsEnqueued int
emptyHandler fq.EmptyHandler
dealer *shufflesharding.Dealer
}
// NewQueueSet creates a new QueueSet object
// There is a new QueueSet created for each priority level.
func (qsf queueSetFactory) NewQueueSet(config fq.QueueSetConfig) (fq.QueueSet, error) {
return newQueueSet(config, qsf.clock, qsf.counter)
}
// queueSet is a fair queuing implementation designed with three major differences:
// 1) dispatches requests to be served rather than requests to be transmitted
// 2) serves multiple requests at once
// 3) a request's service time is not known until it finishes
// implementation of:
// https://github.com/kubernetes/enhancements/blob/master/keps/sig-api-machinery/20190228-priority-and-fairness.md
type queueSet struct {
lock sync.Mutex
config fq.QueueSetConfig
counter counter.GoRoutineCounter
clock clock.PassiveClock
queues []*fq.Queue
virtualTime float64
estimatedServiceTime float64
lastRealTime time.Time
robinIndex int
// numRequestsEnqueued is the number of requests currently enqueued
// (eg: incremeneted on Enqueue, decremented on Dequue)
numRequestsEnqueued int
emptyHandler fq.EmptyHandler
dealer *shufflesharding.Dealer
}
// initQueues is a helper method for initializing an array of n queues
func initQueues(n, baseIndex int) []*fq.Queue {
fqqueues := make([]*fq.Queue, n)
for i := 0; i < n; i++ {
fqqueues[i] = &fq.Queue{Index: baseIndex + i, Requests: make([]*fq.Request, 0)}
}
return fqqueues
}
// newQueueSet creates a new queueSet from passed in parameters
func newQueueSet(config fq.QueueSetConfig, c clock.PassiveClock, counter counter.GoRoutineCounter) (*queueSet, error) {
dealer, err := shufflesharding.NewDealer(config.DesiredNumQueues, config.HandSize)
if err != nil {
return nil, errors.Wrap(err, "shuffle sharding dealer creation failed")
}
fq := &queueSet{
config: config,
counter: counter,
queues: initQueues(config.DesiredNumQueues, 0),
clock: c,
virtualTime: 0,
lastRealTime: c.Now(),
dealer: dealer,
config: config,
counter: qsf.counter,
queues: createQueues(config.DesiredNumQueues, 0),
clock: qsf.clock,
virtualTime: 0,
estimatedServiceTime: 60,
lastRealTime: qsf.clock.Now(),
dealer: dealer,
}
return fq, nil
}
// createQueues is a helper method for initializing an array of n queues
func createQueues(n, baseIndex int) []*queue {
fqqueues := make([]*queue, n)
for i := 0; i < n; i++ {
fqqueues[i] = &queue{Index: baseIndex + i, Requests: make([]*request, 0)}
}
return fqqueues
}
// SetConfiguration is used to set the configuration for a queueSet
// update handling for when fields are updated is handled here as well -
// eg: if DesiredNum is increased, SetConfiguration reconciles by
@ -124,13 +128,13 @@ func (qs *queueSet) SetConfiguration(config fq.QueueSetConfig) error {
numQueues := len(qs.queues)
if config.DesiredNumQueues > numQueues {
qs.queues = append(qs.queues,
initQueues(config.DesiredNumQueues-numQueues, len(qs.queues))...)
createQueues(config.DesiredNumQueues-numQueues, len(qs.queues))...)
}
qs.config = config
qs.dealer = dealer
qs.dequeueWithChannelLockedAsMuchAsPossibleLocked()
qs.dispatchAsMuchAsPossibleLocked()
return nil
}
@ -147,38 +151,47 @@ func (qs *queueSet) SetConfiguration(config fq.QueueSetConfig) error {
func (qs *queueSet) Quiesce(eh fq.EmptyHandler) {
qs.lock.Lock()
defer qs.lock.Unlock()
qs.emptyHandler = eh
if eh == nil {
qs.emptyHandler = eh
return
}
// Here we check whether there are any requests queued or executing and
// if not then fork an invocation of the EmptyHandler.
qs.maybeForkEmptyHandlerLocked()
qs.emptyHandler = eh
}
// Wait uses the given hashValue as the source of entropy
// as it shuffle-shards a request into a queue and waits for
// a decision on what to do with that request. If tryAnother==true
// at return then the QueueSet has become undesirable and the client
// should try to find a different QueueSet to use; execute and
// afterExecution are irrelevant in this case. Otherwise, if execute
// then the client should start executing the request and, once the
// request finishes execution or is canceled, call afterExecution().
// Otherwise the client should not execute the
// request and afterExecution is irrelevant.
func (qs *queueSet) Wait(ctx context.Context, hashValue uint64) (tryAnother, execute bool, afterExecution func()) {
var req *fq.Request
shouldReturn, tryAnother, execute, afterExecution := func() (
shouldReturn, tryAnother, execute bool, afterExecution func()) {
// Values passed through a request's Decision
const (
DecisionExecute = "execute"
DecisionReject = "reject"
DecisionCancel = "cancel"
DecisionTryAnother = "tryAnother"
)
// Wait uses the given hashValue as the source of entropy as it
// shuffle-shards a request into a queue and waits for a decision on
// what to do with that request. The descr1 and descr2 values play no
// role in the logic but appear in log messages; we use two because
// the main client characterizes a request by two items that, if
// bundled together in a larger data structure, would lose interesting
// details when formatted. If tryAnother==true at return then the
// QueueSet has become undesirable and the client should try to find a
// different QueueSet to use; execute and afterExecution are
// irrelevant in this case. Otherwise, if execute then the client
// should start executing the request and, once the request finishes
// execution or is canceled, call afterExecution(). Otherwise the
// client should not execute the request and afterExecution is
// irrelevant.
func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (tryAnother, execute bool, afterExecution func()) {
var req *request
decision := func() string {
qs.lockAndSyncTime()
defer qs.lock.Unlock()
// A call to Wait while the system is quiescing will be rebuffed by
// returning `tryAnother=true`.
if qs.emptyHandler != nil {
return true, true, false, nil
klog.V(5).Infof("QS(%s): rebuffing request %#+v %#+v with TryAnother", qs.config.Name, descr1, descr2)
return DecisionTryAnother
}
// ========================================================================
@ -188,58 +201,70 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64) (tryAnother, exe
// 3) Reject current request if there is not enough concurrency shares and
// we are at max queue length
// 4) If not rejected, create a request and enqueue
req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(hashValue)
req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(hashValue, descr1, descr2)
// req == nil means that the request was rejected - no remaining
// concurrency shares and at max queue length already
if req == nil {
klog.V(5).Infof("QS(%s): rejecting request %#+v %#+v due to queue full", qs.config.Name, descr1, descr2)
metrics.AddReject(qs.config.Name, "queue-full")
return true, false, false, func() {}
return DecisionReject
}
// ========================================================================
// Step 2:
// 1) The next step is to invoke the method that dequeues as much as possible.
// The next step is to invoke the method that dequeues as much
// as possible.
// This method runs a loop, as long as there are non-empty
// queues and the number currently executing is less than the
// assured concurrency value. The body of the loop uses the
// fair queuing technique to pick a queue and dispatch a
// request from that queue.
qs.dispatchAsMuchAsPossibleLocked()
// This method runs a loop, as long as there
// are non-empty queues and the number currently executing is less than the
// assured concurrency value. The body of the loop uses the fair queuing
// technique to pick a queue, dequeue the request at the head of that
// queue, increment the count of the number executing, and send true to
// the request's channel.
qs.dequeueWithChannelLockedAsMuchAsPossibleLocked()
return false, false, false, func() {}
}()
if shouldReturn {
return tryAnother, execute, afterExecution
}
// ========================================================================
// Step 3:
// ========================================================================
// Step 3:
// After that method finishes its loop and returns, the final step in Wait
// is to `select` (wait) on a message from the enqueud request's channel
// and return appropriately. While waiting this thread does no additional
// work so we decrement the go routine counter
qs.counter.Add(-1)
select {
case execute := <-req.DequeueChannel:
if execute {
// execute the request
return false, true, func() {
qs.finishRequestAndDequeueWithChannelAsMuchAsPossible(req)
}
// Set up a relay from the context's Done channel to the world
// of well-counted goroutines. We Are Told that every
// request's context's Done channel gets closed by the time
// the request is done being processed.
doneCh := ctx.Done()
if doneCh != nil {
qs.preCreateOrUnblockGoroutine()
go func() {
defer runtime.HandleCrash()
qs.goroutineDoneOrBlocked()
select {
case <-doneCh:
klog.V(6).Infof("QS(%s): Context of request %#+v %#+v is Done", qs.config.Name, descr1, descr2)
req.Decision.Set(DecisionCancel)
}
qs.goroutineDoneOrBlocked()
}()
}
klog.V(5).Infof("request timed out after being enqueued\n")
metrics.AddReject(qs.config.Name, "time-out")
return false, false, func() {}
case <-ctx.Done():
klog.V(5).Infof("request cancelled\n")
func() {
qs.lockAndSyncTime()
defer qs.lock.Unlock()
// ========================================================================
// Step 4:
// The final step in Wait is to wait on a decision from
// somewhere and then act on it.
decisionAny := req.Decision.GetLocked()
var decisionStr string
switch d := decisionAny.(type) {
case string:
decisionStr = d
default:
klog.Errorf("QS(%s): Impossible decision %#+v (of type %T) for request %#+v %#+v", qs.config.Name, decisionAny, decisionAny, descr1, descr2)
decisionStr = DecisionExecute
}
switch decisionStr {
case DecisionReject:
klog.V(5).Infof("QS(%s): request %#+v %#+v timed out after being enqueued\n", qs.config.Name, descr1, descr2)
metrics.AddReject(qs.config.Name, "time-out")
case DecisionCancel:
qs.syncTimeLocked()
// TODO(aaron-prindle) add metrics to these two cases
if req.Enqueued {
if req.IsWaiting {
klog.V(5).Infof("QS(%s): Ejecting request %#+v %#+v from its queue", qs.config.Name, descr1, descr2)
// remove the request from the queue as it has timed out
for i := range req.Queue.Requests {
if req == req.Queue.Requests[i] {
@ -254,47 +279,62 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64) (tryAnother, exe
// then a call to the EmptyHandler should be forked.
qs.maybeForkEmptyHandlerLocked()
} else {
// At this point we know that req was in its queue earlier and another
// goroutine has removed req from its queue and called qs.counter.Add(1)
// in anticipation of unblocking this goroutine through the other arm of this
// select. In this case we need to decrement the counter because this goroutine
// was actually unblocked through a different code path.
qs.counter.Add(-1)
klog.V(5).Infof("QS(%s): request %#+v %#+v canceled shortly after dispatch", qs.config.Name, descr1, descr2)
}
}()
}
return decisionStr
}()
switch decision {
case DecisionTryAnother:
return true, false, func() {}
case DecisionReject:
return false, false, func() {}
case DecisionCancel:
return false, false, func() {}
default:
if decision != DecisionExecute {
klog.Errorf("Impossible decision %q", decision)
}
return false, true, func() {
qs.finishRequestAndDispatchAsMuchAsPossible(req)
}
}
}
// syncTimeLocked is used to sync the time of the queueSet by looking at the elapsed
// time since the last sync and this value based on the 'virtualtime ratio'
// which scales inversely to the # of active flows
// lockAndSyncTime acquires the lock and updates the virtual time.
// Doing them together avoids the mistake of modify some queue state
// before calling syncTimeLocked.
func (qs *queueSet) lockAndSyncTime() {
qs.lock.Lock()
qs.syncTimeLocked()
}
// syncTimeLocked updates the virtual time based on the assumption
// that the current state of the queues has been in effect since
// `qs.lastRealTime`. Thus, it should be invoked after acquiring the
// lock and before modifying the state of any queue.
func (qs *queueSet) syncTimeLocked() {
realNow := qs.clock.Now()
timesincelast := realNow.Sub(qs.lastRealTime).Seconds()
qs.lastRealTime = realNow
var virtualTimeRatio float64
qs.virtualTime += timesincelast * qs.getVirtualTimeRatio()
}
// getVirtualTimeRatio calculates the rate at which virtual time has
// been advancing, according to the logic in `doc.go`.
func (qs *queueSet) getVirtualTimeRatio() float64 {
activeQueues := 0
reqs := 0
for _, queue := range qs.queues {
reqs += queue.RequestsExecuting
if len(queue.Requests) > 0 || queue.RequestsExecuting > 0 {
activeQueues++
}
}
if activeQueues != 0 {
// TODO(aaron-prindle) document the math.Min usage
virtualTimeRatio = math.Min(float64(reqs), float64(qs.config.ConcurrencyLimit)) / float64(activeQueues)
if activeQueues == 0 {
return 0
}
qs.virtualTime += timesincelast * virtualTimeRatio
}
func (qs *queueSet) lockAndSyncTime() {
qs.lock.Lock()
qs.syncTimeLocked()
return math.Min(float64(reqs), float64(qs.config.ConcurrencyLimit)) / float64(activeQueues)
}
// timeoutOldRequestsAndRejectOrEnqueueLocked encapsulates the logic required
@ -307,9 +347,9 @@ func (qs *queueSet) lockAndSyncTime() {
// returns the enqueud request on a successful enqueue
// returns nil in the case that there is no available concurrency or
// the queuelengthlimit has been reached
func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(hashValue uint64) *fq.Request {
func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(hashValue uint64, descr1, descr2 interface{}) *request {
// Start with the shuffle sharding, to pick a queue.
queueIdx := qs.chooseQueueIndexLocked(hashValue)
queueIdx := qs.chooseQueueIndexLocked(hashValue, descr1, descr2)
queue := qs.queues[queueIdx]
// The next step is the logic to reject requests that have been waiting too long
qs.removeTimedOutRequestsFromQueueLocked(queue)
@ -318,10 +358,12 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(hashValue uint64)
// We prefer the simplicity over the promptness, at least for now.
// Create a request and enqueue
req := &fq.Request{
DequeueChannel: make(chan bool, 1),
RealEnqueueTime: qs.clock.Now(),
Queue: queue,
req := &request{
Decision: lockingpromise.NewLockingPromise(&qs.lock, qs.counter),
ArrivalTime: qs.clock.Now(),
Queue: queue,
descr1: descr1,
descr2: descr2,
}
if ok := qs.rejectOrEnqueueLocked(req); !ok {
return nil
@ -330,9 +372,26 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(hashValue uint64)
return req
}
// chooseQueueIndexLocked uses shuffle sharding to select a queue index
// using the given hashValue and the shuffle sharding parameters of the queueSet.
func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64, descr1, descr2 interface{}) int {
bestQueueIdx := -1
bestQueueLen := int(math.MaxInt32)
// the dealer uses the current desired number of queues, which is no larger than the number in `qs.queues`.
qs.dealer.Deal(hashValue, func(queueIdx int) {
thisLen := len(qs.queues[queueIdx].Requests)
klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of length %d", qs.config.Name, descr1, descr2, queueIdx, thisLen)
if thisLen < bestQueueLen {
bestQueueIdx, bestQueueLen = queueIdx, thisLen
}
})
klog.V(6).Infof("QS(%s): For request %#+v %#+v chose queue %d, had %d waiting & %d executing", qs.config.Name, descr1, descr2, bestQueueIdx, bestQueueLen, qs.queues[bestQueueIdx].RequestsExecuting)
return bestQueueIdx
}
// removeTimedOutRequestsFromQueueLocked rejects old requests that have been enqueued
// past the requestWaitLimit
func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *fq.Queue) {
func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue) {
timeoutIdx := -1
now := qs.clock.Now()
reqs := queue.Requests
@ -343,10 +402,8 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *fq.Queue) {
// now - requestWaitLimit = waitLimit
waitLimit := now.Add(-qs.config.RequestWaitLimit)
for i, req := range reqs {
if waitLimit.After(req.RealEnqueueTime) {
qs.counter.Add(1)
req.DequeueChannel <- false
close(req.DequeueChannel)
if waitLimit.After(req.ArrivalTime) {
req.Decision.SetLocked(DecisionReject)
// get index for timed out requests
timeoutIdx = i
} else {
@ -364,57 +421,9 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *fq.Queue) {
}
}
// getRequestsExecutingLocked gets the # of requests which are "executing":
// this is the# of requests/requests which have been dequeued but have not had
// finished (via the FinishRequest method invoked after service)
func (qs *queueSet) getRequestsExecutingLocked() int {
total := 0
for _, queue := range qs.queues {
total += queue.RequestsExecuting
}
return total
}
// chooseQueueIndexLocked uses shuffle sharding to select a queue index
// using the given hashValue and the shuffle sharding parameters of the queueSet.
func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64) int {
bestQueueIdx := -1
bestQueueLen := int(math.MaxInt32)
// DesiredNum is used here instead of numQueues to omit quiescing queues
qs.dealer.Deal(hashValue, func(queueIdx int) {
thisLen := len(qs.queues[queueIdx].Requests)
if thisLen < bestQueueLen {
bestQueueIdx, bestQueueLen = queueIdx, thisLen
}
})
return bestQueueIdx
}
// updateQueueVirtualStartTimeLocked updates the virtual start time for a queue
// this is done when a new request is enqueued. For more info see:
// https://github.com/kubernetes/enhancements/blob/master/keps/sig-api-machinery/20190228-priority-and-fairness.md#dispatching
func (qs *queueSet) updateQueueVirtualStartTimeLocked(request *fq.Request, queue *fq.Queue) {
// When a request arrives to an empty queue with no requests executing:
// len(queue.Requests) == 1 as enqueue has just happened prior (vs == 0)
if len(queue.Requests) == 1 && queue.RequestsExecuting == 0 {
// the queues virtual start time is set to the virtual time.
queue.VirtualStart = qs.virtualTime
}
}
// enqueues a request into an queueSet
func (qs *queueSet) enqueueLocked(request *fq.Request) {
queue := request.Queue
queue.Enqueue(request)
qs.updateQueueVirtualStartTimeLocked(request, queue)
qs.numRequestsEnqueued++
metrics.UpdateFlowControlRequestsInQueue(qs.config.Name, qs.numRequestsEnqueued)
}
// rejectOrEnqueueLocked rejects or enqueues the newly arrived request if
// resource criteria isn't met
func (qs *queueSet) rejectOrEnqueueLocked(request *fq.Request) bool {
func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool {
queue := request.Queue
curQueueLength := len(queue.Requests)
// rejects the newly arrived request if resource criteria not met
@ -427,13 +436,81 @@ func (qs *queueSet) rejectOrEnqueueLocked(request *fq.Request) bool {
return true
}
// selectQueueLocked selects the minimum virtualFinish time from the set of queues
// enqueues a request into an queueSet
func (qs *queueSet) enqueueLocked(request *request) {
queue := request.Queue
if len(queue.Requests) == 0 && queue.RequestsExecuting == 0 {
// the queues virtual start time is set to the virtual time.
queue.VirtualStart = qs.virtualTime
if klog.V(6) {
klog.Infof("QS(%s) at r=%s v=%.9fs: initialized queue %d virtual start time due to request %#+v %#+v", qs.config.Name, qs.clock.Now().Format(nsTimeFmt), queue.VirtualStart, queue.Index, request.descr1, request.descr2)
}
}
queue.Enqueue(request)
qs.numRequestsEnqueued++
metrics.UpdateFlowControlRequestsInQueue(qs.config.Name, qs.numRequestsEnqueued)
}
// getRequestsExecutingLocked gets the # of requests which are "executing":
// this is the # of requests which have been dispatched but have not
// finished (via the finishRequestLocked method invoked after service)
func (qs *queueSet) getRequestsExecutingLocked() int {
total := 0
for _, queue := range qs.queues {
total += queue.RequestsExecuting
}
return total
}
// dispatchAsMuchAsPossibleLocked runs a loop, as long as there
// are non-empty queues and the number currently executing is less than the
// assured concurrency value. The body of the loop uses the fair queuing
// technique to pick a queue, dequeue the request at the head of that
// queue, increment the count of the number executing, and send true
// to the request's channel.
func (qs *queueSet) dispatchAsMuchAsPossibleLocked() {
for qs.numRequestsEnqueued != 0 && qs.getRequestsExecutingLocked() < qs.config.ConcurrencyLimit {
_, ok := qs.dispatchLocked()
if !ok {
break
}
}
}
// dispatchLocked is a convenience method for dequeueing requests that
// require a message to be sent through the requests channel
// this is a required pattern for the QueueSet the queueSet supports
func (qs *queueSet) dispatchLocked() (*request, bool) {
queue := qs.selectQueueLocked()
if queue == nil {
return nil, false
}
request, ok := queue.Dequeue()
if !ok {
return nil, false
}
request.StartTime = qs.clock.Now()
// request dequeued, service has started
queue.RequestsExecuting++
qs.numRequestsEnqueued--
if klog.V(6) {
klog.Infof("QS(%s) at r=%s v=%.9fs: dispatching request %#+v %#+v from queue %d with virtual start time %.9fs, queue will have %d waiting & %d executing", qs.config.Name, request.StartTime.Format(nsTimeFmt), qs.virtualTime, request.descr1, request.descr2, queue.Index, queue.VirtualStart, len(queue.Requests), queue.RequestsExecuting)
}
// When a request is dequeued for service -> qs.VirtualStart += G
queue.VirtualStart += qs.estimatedServiceTime
metrics.UpdateFlowControlRequestsExecuting(qs.config.Name, queue.RequestsExecuting)
request.Decision.SetLocked(DecisionExecute)
return request, ok
}
/// selectQueueLocked selects the minimum virtualFinish time from the set of queues
// the starting queue is selected via roundrobin
func (qs *queueSet) selectQueueLocked() *fq.Queue {
func (qs *queueSet) selectQueueLocked() *queue {
minVirtualFinish := math.Inf(1)
var minQueue *fq.Queue
var minQueue *queue
var minIndex int
for range qs.queues {
qs.robinIndex = (qs.robinIndex + 1) % len(qs.queues)
queue := qs.queues[qs.robinIndex]
if len(queue.Requests) != 0 {
currentVirtualFinish := queue.GetVirtualFinish(0, qs.estimatedServiceTime)
@ -443,7 +520,6 @@ func (qs *queueSet) selectQueueLocked() *fq.Queue {
minIndex = qs.robinIndex
}
}
qs.robinIndex = (qs.robinIndex + 1) % len(qs.queues)
}
// we set the round robin indexing to start at the chose queue
// for the next round. This way the non-selected queues
@ -452,69 +528,22 @@ func (qs *queueSet) selectQueueLocked() *fq.Queue {
return minQueue
}
// dequeue dequeues a request from the queueSet
func (qs *queueSet) dequeueLocked() (*fq.Request, bool) {
queue := qs.selectQueueLocked()
if queue == nil {
return nil, false
}
request, ok := queue.Dequeue()
if !ok {
return nil, false
}
// When a request is dequeued for service -> qs.VirtualStart += G
queue.VirtualStart += qs.estimatedServiceTime
request.StartTime = qs.clock.Now()
// request dequeued, service has started
queue.RequestsExecuting++
metrics.UpdateFlowControlRequestsExecuting(qs.config.Name, queue.RequestsExecuting)
qs.numRequestsEnqueued--
return request, ok
// finishRequestAndDispatchAsMuchAsPossible is a convenience method
// which calls finishRequest for a given request and then dispatches
// as many requests as possible. This is all of what needs to be done
// once a request finishes execution or is canceled.
func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) {
qs.lockAndSyncTime()
defer qs.lock.Unlock()
qs.finishRequestLocked(req)
qs.dispatchAsMuchAsPossibleLocked()
}
// dequeueWithChannelLockedAsMuchAsPossibleLocked runs a loop, as long as there
// are non-empty queues and the number currently executing is less than the
// assured concurrency value. The body of the loop uses the fair queuing
// technique to pick a queue, dequeue the request at the head of that
// queue, increment the count of the number executing, and send true
// to the request's channel.
func (qs *queueSet) dequeueWithChannelLockedAsMuchAsPossibleLocked() {
for qs.numRequestsEnqueued != 0 && qs.getRequestsExecutingLocked() < qs.config.ConcurrencyLimit {
_, ok := qs.dequeueWithChannelLocked()
if !ok {
break
}
}
}
// dequeueWithChannelLocked is a convenience method for dequeueing requests that
// require a message to be sent through the requests channel
// this is a required pattern for the QueueSet the queueSet supports
func (qs *queueSet) dequeueWithChannelLocked() (*fq.Request, bool) {
req, ok := qs.dequeueLocked()
if !ok {
return nil, false
}
qs.counter.Add(1)
req.DequeueChannel <- true
close(req.DequeueChannel)
return req, ok
}
// removeQueueAndUpdateIndexes uses reslicing to remove an index from a slice
// and then updates the 'Index' field of the queues to be correct
func removeQueueAndUpdateIndexes(queues []*fq.Queue, index int) []*fq.Queue {
keptQueues := append(queues[:index], queues[index+1:]...)
for i := index; i < len(keptQueues); i++ {
keptQueues[i].Index--
}
return keptQueues
}
// finishRequestLocked is a callback that should be used when a previously dequeued request
// has completed it's service. This callback updates important state in the
// queueSet
func (qs *queueSet) finishRequestLocked(r *fq.Request) {
// finishRequestLocked is a callback that should be used when a
// previously dispatched request has completed it's service. This
// callback updates important state in the queueSet
func (qs *queueSet) finishRequestLocked(r *request) {
S := qs.clock.Since(r.StartTime).Seconds()
// When a request finishes being served, and the actual service time was S,
@ -524,8 +553,12 @@ func (qs *queueSet) finishRequestLocked(r *fq.Request) {
// request has finished, remove from requests executing
r.Queue.RequestsExecuting--
if klog.V(6) {
klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing", qs.config.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.Queue.Index, r.Queue.VirtualStart, S, len(r.Queue.Requests), r.Queue.RequestsExecuting)
}
// Logic to remove quiesced queues
// >= as QueueIdx=25 is out of bounds for DesiredNum=25 [0...24]
// >= as Index=25 is out of bounds for DesiredNum=25 [0...24]
if r.Queue.Index >= qs.config.DesiredNumQueues &&
len(r.Queue.Requests) == 0 &&
r.Queue.RequestsExecuting == 0 {
@ -544,26 +577,39 @@ func (qs *queueSet) finishRequestLocked(r *fq.Request) {
}
}
// removeQueueAndUpdateIndexes uses reslicing to remove an index from a slice
// and then updates the 'Index' field of the queues to be correct
func removeQueueAndUpdateIndexes(queues []*queue, index int) []*queue {
keptQueues := append(queues[:index], queues[index+1:]...)
for i := index; i < len(keptQueues); i++ {
keptQueues[i].Index--
}
return keptQueues
}
func (qs *queueSet) maybeForkEmptyHandlerLocked() {
if qs.emptyHandler != nil && qs.numRequestsEnqueued == 0 &&
qs.getRequestsExecutingLocked() == 0 {
qs.counter.Add(1)
qs.preCreateOrUnblockGoroutine()
go func(eh fq.EmptyHandler) {
defer runtime.HandleCrash()
defer qs.counter.Add(-1)
defer qs.goroutineDoneOrBlocked()
eh.HandleEmpty()
}(qs.emptyHandler)
}
}
// finishRequestAndDequeueWithChannelAsMuchAsPossible is a convenience method which calls finishRequest
// for a given request and then dequeues as many requests as possible
// and updates that request's channel signifying it is is dequeued
// this is a callback used for the filter that the queueSet supports
func (qs *queueSet) finishRequestAndDequeueWithChannelAsMuchAsPossible(req *fq.Request) {
qs.lockAndSyncTime()
defer qs.lock.Unlock()
qs.finishRequestLocked(req)
qs.dequeueWithChannelLockedAsMuchAsPossibleLocked()
// preCreateOrUnblockGoroutine needs to be called before creating a
// goroutine associated with this queueSet or unblocking a blocked
// one, to properly update the accounting used in testing.
func (qs *queueSet) preCreateOrUnblockGoroutine() {
qs.counter.Add(1)
}
// goroutineDoneOrBlocked needs to be called at the end of every
// goroutine associated with this queueSet or when such a goroutine is
// about to wait on some other goroutine to do something; this is to
// properly update the accounting used in testing.
func (qs *queueSet) goroutineDoneOrBlocked() {
qs.counter.Add(-1)
}

View File

@ -27,6 +27,7 @@ import (
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
test "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing"
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock"
"k8s.io/klog"
)
type uniformScenario []uniformClient
@ -41,21 +42,23 @@ type uniformClient struct {
thinkDuration time.Duration
}
const nsTimeFmt = "2006-01-02 15:04:05.000000000"
// exerciseQueueSetUniformScenario runs a scenario based on the given set of uniform clients.
// Each uniform client specifies a number of threads, each of which alternates between thinking
// and making a synchronous request through the QueueSet.
// This function measures how much concurrency each client got, on average, over
// the intial totalDuration and tests to see whether they all got about the same amount.
// the initial evalDuration and tests to see whether they all got about the same amount.
// Each client needs to be demanding enough to use this amount, otherwise the fair result
// is not equal amounts and the simple test in this function would not accurately test fairness.
// expectPass indicates whether the QueueSet is expected to be fair.
// expectedAllRequests indicates whether all requests are expected to get dispatched.
func exerciseQueueSetUniformScenario(t *testing.T, qs fq.QueueSet, sc uniformScenario,
totalDuration time.Duration, expectPass bool, expectedAllRequests bool,
func exerciseQueueSetUniformScenario(t *testing.T, name string, qs fq.QueueSet, sc uniformScenario,
evalDuration time.Duration, expectPass bool, expectedAllRequests bool,
clk *clock.FakeEventClock, counter counter.GoRoutineCounter) {
now := time.Now()
t.Logf("%s: Start", clk.Now().Format("2006-01-02 15:04:05.000000000"))
t.Logf("%s: Start %s, clk=%p, grc=%p", clk.Now().Format(nsTimeFmt), name, clk, counter)
integrators := make([]test.Integrator, len(sc))
var failedCount uint64
for i, uc := range sc {
@ -66,8 +69,8 @@ func exerciseQueueSetUniformScenario(t *testing.T, qs fq.QueueSet, sc uniformSce
for k := 0; k < uc.nCalls; k++ {
ClockWait(clk, counter, uc.thinkDuration)
for {
tryAnother, execute, afterExecute := qs.Wait(context.Background(), uc.hash)
t.Logf("%s: %d, %d, %d got q=%v, e=%v", clk.Now().Format("2006-01-02 15:04:05.000000000"), i, j, k, tryAnother, execute)
tryAnother, execute, afterExecute := qs.Wait(context.Background(), uc.hash, name, []int{i, j, k})
t.Logf("%s: %d, %d, %d got a=%v, e=%v", clk.Now().Format(nsTimeFmt), i, j, k, tryAnother, execute)
if tryAnother {
continue
}
@ -86,10 +89,10 @@ func exerciseQueueSetUniformScenario(t *testing.T, qs fq.QueueSet, sc uniformSce
}(i, j, uc, integrators[i])
}
}
lim := now.Add(totalDuration)
lim := now.Add(evalDuration)
clk.Run(&lim)
clk.SetTime(lim)
t.Logf("%s: End", clk.Now().Format("2006-01-02 15:04:05.000000000"))
t.Logf("%s: End", clk.Now().Format(nsTimeFmt))
results := make([]test.IntegratorResults, len(sc))
var sumOfAvg float64
for i := range sc {
@ -132,6 +135,10 @@ func ClockWait(clk *clock.FakeEventClock, counter counter.GoRoutineCounter, dura
}
}
func init() {
klog.InitFlags(nil)
}
// TestNoRestraint should fail because the dummy QueueSet exercises no control
func TestNoRestraint(t *testing.T) {
now := time.Now()
@ -142,7 +149,7 @@ func TestNoRestraint(t *testing.T) {
if err != nil {
t.Fatalf("QueueSet creation failed with %v", err)
}
exerciseQueueSetUniformScenario(t, nr, []uniformClient{
exerciseQueueSetUniformScenario(t, "NoRestraint", nr, []uniformClient{
{1001001001, 5, 10, time.Second, time.Second},
{2002002002, 2, 10, time.Second, time.Second / 2},
}, time.Second*10, false, true, clk, counter)
@ -155,10 +162,10 @@ func TestUniformFlows(t *testing.T) {
qsf := NewQueueSetFactory(clk, counter)
config := fq.QueueSetConfig{
Name: "TestUniformFlows",
ConcurrencyLimit: 100,
DesiredNumQueues: 128,
QueueLengthLimit: 128,
HandSize: 1,
ConcurrencyLimit: 4,
DesiredNumQueues: 8,
QueueLengthLimit: 6,
HandSize: 3,
RequestWaitLimit: 10 * time.Minute,
}
qs, err := qsf.NewQueueSet(config)
@ -166,10 +173,10 @@ func TestUniformFlows(t *testing.T) {
t.Fatalf("QueueSet creation failed with %v", err)
}
exerciseQueueSetUniformScenario(t, qs, []uniformClient{
exerciseQueueSetUniformScenario(t, "UniformFlows", qs, []uniformClient{
{1001001001, 5, 10, time.Second, time.Second},
{2002002002, 5, 10, time.Second, time.Second},
}, time.Second*10, true, true, clk, counter)
}, time.Second*20, true, true, clk, counter)
}
func TestDifferentFlows(t *testing.T) {
@ -179,10 +186,10 @@ func TestDifferentFlows(t *testing.T) {
qsf := NewQueueSetFactory(clk, counter)
config := fq.QueueSetConfig{
Name: "TestDifferentFlows",
ConcurrencyLimit: 1,
DesiredNumQueues: 128,
QueueLengthLimit: 128,
HandSize: 1,
ConcurrencyLimit: 4,
DesiredNumQueues: 8,
QueueLengthLimit: 6,
HandSize: 3,
RequestWaitLimit: 10 * time.Minute,
}
qs, err := qsf.NewQueueSet(config)
@ -190,10 +197,10 @@ func TestDifferentFlows(t *testing.T) {
t.Fatalf("QueueSet creation failed with %v", err)
}
exerciseQueueSetUniformScenario(t, qs, []uniformClient{
{1001001001, 5, 10, time.Second, time.Second},
{2002002002, 2, 5, time.Second, time.Second / 2},
}, time.Second*10, true, true, clk, counter)
exerciseQueueSetUniformScenario(t, "DifferentFlows", qs, []uniformClient{
{1001001001, 6, 10, time.Second, time.Second},
{2002002002, 4, 15, time.Second, time.Second / 2},
}, time.Second*20, true, true, clk, counter)
}
func TestTimeout(t *testing.T) {
@ -214,7 +221,7 @@ func TestTimeout(t *testing.T) {
t.Fatalf("QueueSet creation failed with %v", err)
}
exerciseQueueSetUniformScenario(t, qs, []uniformClient{
exerciseQueueSetUniformScenario(t, "Timeout", qs, []uniformClient{
{1001001001, 5, 100, time.Second, time.Second},
}, time.Second*10, true, false, clk, counter)
}

View File

@ -14,56 +14,70 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package fairqueuing
package queueset
import (
"time"
"k8s.io/apiserver/pkg/util/promise"
)
// Request is a temporary container for "requests" with additional tracking fields
// request is a temporary container for "requests" with additional tracking fields
// required for the functionality FQScheduler
type Request struct {
//TODO(aaron-prindle) seq is only used for testing, this was abstracted
// via an interface before, keeping this for now
QueueIdx int
type request struct {
Queue *queue
Queue *Queue
StartTime time.Time
DequeueChannel chan bool
RealEnqueueTime time.Time
Enqueued bool
// StartTime is the clock time when the request began executing
StartTime time.Time
// Decision gets set to the decision about what to do with this request
Decision promise.LockingMutable
// ArrivalTime is when the request entered this system
ArrivalTime time.Time
// IsWaiting indicates whether the request is presently waiting in a queue
IsWaiting bool
// descr1 and descr2 are not used in any logic but they appear in
// log messages
descr1, descr2 interface{}
}
// Queue is an array of requests with additional metadata required for
// queue is an array of requests with additional metadata required for
// the FQScheduler
type Queue struct {
Requests []*Request
VirtualStart float64
type queue struct {
Requests []*request
// VirtualStart is the virtual time when the oldest request in the
// queue (if there is any) started virtually executing
VirtualStart float64
RequestsExecuting int
Index int
}
// Enqueue enqueues a request into the queue
func (q *Queue) Enqueue(request *Request) {
request.Enqueued = true
func (q *queue) Enqueue(request *request) {
request.IsWaiting = true
q.Requests = append(q.Requests, request)
}
// Dequeue dequeues a request from the queue
func (q *Queue) Dequeue() (*Request, bool) {
func (q *queue) Dequeue() (*request, bool) {
if len(q.Requests) == 0 {
return nil, false
}
request := q.Requests[0]
q.Requests = q.Requests[1:]
request.Enqueued = false
request.IsWaiting = false
return request, true
}
// GetVirtualFinish returns the expected virtual finish time of the request at
// index J in the queue with estimated finish time G
func (q *Queue) GetVirtualFinish(J int, G float64) float64 {
func (q *queue) GetVirtualFinish(J int, G float64) float64 {
// The virtual finish time of request number J in the queue
// (counting from J=1 for the head) is J * G + (virtual start time).

View File

@ -9,6 +9,7 @@ go_library(
deps = [
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)

View File

@ -19,11 +19,14 @@ package clock
import (
"container/heap"
"math/rand"
"runtime"
"strings"
"sync"
"time"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
"k8s.io/klog"
)
// EventFunc does some work that needs to be done at or after the
@ -68,12 +71,38 @@ func (r RealEventClock) EventAfterTime(f EventFunc, t time.Time) {
// waitGroupCounter is a wait group used for a GoRoutine Counter. This private
// type is used to disallow direct waitGroup access
type waitGroupCounter struct{ sync.WaitGroup }
type waitGroupCounter struct {
wg sync.WaitGroup
}
// compile time assertion that waitGroupCounter meets requirements
// of GoRoutineCounter
var _ counter.GoRoutineCounter = (*waitGroupCounter)(nil)
func (wgc *waitGroupCounter) Add(delta int) {
if klog.V(7) {
var pcs [5]uintptr
nCallers := runtime.Callers(2, pcs[:])
frames := runtime.CallersFrames(pcs[:nCallers])
frame1, more1 := frames.Next()
fileParts1 := strings.Split(frame1.File, "/")
tail2 := "(none)"
line2 := 0
if more1 {
frame2, _ := frames.Next()
fileParts2 := strings.Split(frame2.File, "/")
tail2 = fileParts2[len(fileParts2)-1]
line2 = frame2.Line
}
klog.Infof("GRC(%p).Add(%d) from %s:%d from %s:%d", wgc, delta, fileParts1[len(fileParts1)-1], frame1.Line, tail2, line2)
}
wgc.wg.Add(delta)
}
func (wgc *waitGroupCounter) Wait() {
wgc.wg.Wait()
}
// FakeEventClock is one whose time does not pass implicitly but
// rather is explicitly set by invocations of its SetTime method
type FakeEventClock struct {

View File

@ -44,6 +44,6 @@ func (noRestraint) SetConfiguration(config fq.QueueSetConfig) error {
func (noRestraint) Quiesce(fq.EmptyHandler) {
}
func (noRestraint) Wait(ctx context.Context, hashValue uint64) (quiescent, execute bool, afterExecution func()) {
func (noRestraint) Wait(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (quiescent, execute bool, afterExecution func()) {
return false, true, func() {}
}

View File

@ -0,0 +1,42 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package promise
// Mutable is a variable that is initially not set and can be set one
// or more times (unlike a traditional "promise", which can be written
// only once).
type Mutable interface {
// Set writes a value into this variable and unblocks every
// goroutine waiting for this variable to have a value
Set(interface{})
// Get reads the value of this variable. If this variable is
// not set yet then this call blocks until this variable gets a value.
Get() interface{}
}
// LockingMutable is a Mutable whose implementation is protected by a lock
type LockingMutable interface {
Mutable
// SetLocked is like Set but the caller must already hold the lock
SetLocked(interface{})
// GetLocked is like Get but the caller must already hold the lock
GetLocked() interface{}
}

View File

@ -0,0 +1,78 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package lockingpromise
import (
"sync"
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
"k8s.io/apiserver/pkg/util/promise"
)
// lockingPromise implements LockingMutable based on a condition
// variable. This implementation tracks active goroutines: the given
// counter is decremented for a goroutine waiting for this varible to
// be set and incremented when such a goroutine is unblocked.
type lockingPromise struct {
lock sync.Locker
cond sync.Cond
activeCounter counter.GoRoutineCounter // counter of active goroutines
waitingCount int // number of goroutines idle due to this mutable being unset
isSet bool
value interface{}
}
var _ promise.LockingMutable = &lockingPromise{}
func NewLockingPromise(lock sync.Locker, activeCounter counter.GoRoutineCounter) promise.LockingMutable {
return &lockingPromise{
lock: lock,
cond: *sync.NewCond(lock),
activeCounter: activeCounter,
}
}
func (lp *lockingPromise) Set(value interface{}) {
lp.lock.Lock()
defer lp.lock.Unlock()
lp.SetLocked(value)
}
func (lp *lockingPromise) Get() interface{} {
lp.lock.Lock()
defer lp.lock.Unlock()
return lp.GetLocked()
}
func (lp *lockingPromise) SetLocked(value interface{}) {
lp.isSet = true
lp.value = value
if lp.waitingCount > 0 {
lp.activeCounter.Add(lp.waitingCount)
lp.waitingCount = 0
lp.cond.Broadcast()
}
}
func (lp *lockingPromise) GetLocked() interface{} {
if !lp.isSet {
lp.waitingCount++
lp.activeCounter.Add(-1)
lp.cond.Wait()
}
return lp.value
}

View File

@ -0,0 +1,70 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package lockingpromise
import (
"sync"
"sync/atomic"
"testing"
"time"
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock"
)
func TestLockingPromise(t *testing.T) {
now := time.Now()
clock, counter := clock.NewFakeEventClock(now, 0, nil)
var lock sync.Mutex
lp := NewLockingPromise(&lock, counter)
var gots int32
var got atomic.Value
counter.Add(1)
go func() {
got.Store(lp.Get())
atomic.AddInt32(&gots, 1)
counter.Add(-1)
}()
clock.Run(nil)
time.Sleep(time.Second)
if atomic.LoadInt32(&gots) != 0 {
t.Error("Get returned before Set")
}
var aval = &now
lp.Set(aval)
clock.Run(nil)
time.Sleep(time.Second)
if atomic.LoadInt32(&gots) != 1 {
t.Error("Get did not return after Set")
}
if got.Load() != aval {
t.Error("Get did not return what was Set")
}
counter.Add(1)
go func() {
got.Store(lp.Get())
atomic.AddInt32(&gots, 1)
counter.Add(-1)
}()
clock.Run(nil)
time.Sleep(time.Second)
if atomic.LoadInt32(&gots) != 2 {
t.Error("Second Get did not return immediately")
}
if got.Load() != aval {
t.Error("Second Get did not return what was Set")
}
}

View File

@ -100,6 +100,7 @@ package_group(
"//pkg/scheduler/framework/v1alpha1",
"//pkg/volume/util/operationexecutor",
"//staging/src/k8s.io/apiserver/pkg/admission/metrics",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics",
"//staging/src/k8s.io/component-base/metrics/...",
"//test/e2e",
"//test/e2e/storage",