mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-27 05:27:21 +00:00
Merge pull request #85259 from MikeSpreitzer/fq-followup
Brushed up fairqueuing package
This commit is contained in:
commit
83c1d70aca
@ -48,7 +48,6 @@ filegroup(
|
|||||||
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:all-srcs",
|
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:all-srcs",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/flushwriter:all-srcs",
|
"//staging/src/k8s.io/apiserver/pkg/util/flushwriter:all-srcs",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/openapi:all-srcs",
|
"//staging/src/k8s.io/apiserver/pkg/util/openapi:all-srcs",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/promise:all-srcs",
|
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/proxy:all-srcs",
|
"//staging/src/k8s.io/apiserver/pkg/util/proxy:all-srcs",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/shufflesharding:all-srcs",
|
"//staging/src/k8s.io/apiserver/pkg/util/shufflesharding:all-srcs",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/term:all-srcs",
|
"//staging/src/k8s.io/apiserver/pkg/util/term:all-srcs",
|
||||||
|
@ -19,6 +19,7 @@ filegroup(
|
|||||||
name = "all-srcs",
|
name = "all-srcs",
|
||||||
srcs = [
|
srcs = [
|
||||||
":package-srcs",
|
":package-srcs",
|
||||||
|
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise:all-srcs",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset:all-srcs",
|
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset:all-srcs",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing:all-srcs",
|
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing:all-srcs",
|
||||||
],
|
],
|
||||||
|
@ -70,7 +70,9 @@ type QueueSetConfig struct {
|
|||||||
Name string
|
Name string
|
||||||
// ConcurrencyLimit is the maximum number of requests of this QueueSet that may be executing at a time
|
// ConcurrencyLimit is the maximum number of requests of this QueueSet that may be executing at a time
|
||||||
ConcurrencyLimit int
|
ConcurrencyLimit int
|
||||||
// DesiredNumQueues is the number of queues that the API says should exist now
|
// DesiredNumQueues is the number of queues that the API says
|
||||||
|
// should exist now. This may be zero, in which case
|
||||||
|
// QueueLengthLimit, HandSize, and RequestWaitLimit are ignored.
|
||||||
DesiredNumQueues int
|
DesiredNumQueues int
|
||||||
// QueueLengthLimit is the maximum number of requests that may be waiting in a given queue at a time
|
// QueueLengthLimit is the maximum number of requests that may be waiting in a given queue at a time
|
||||||
QueueLengthLimit int
|
QueueLengthLimit int
|
||||||
|
@ -3,8 +3,8 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
|
|||||||
go_library(
|
go_library(
|
||||||
name = "go_default_library",
|
name = "go_default_library",
|
||||||
srcs = ["interface.go"],
|
srcs = ["interface.go"],
|
||||||
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/promise",
|
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise",
|
||||||
importpath = "k8s.io/apiserver/pkg/util/promise",
|
importpath = "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise",
|
||||||
visibility = ["//visibility:public"],
|
visibility = ["//visibility:public"],
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -19,7 +19,7 @@ filegroup(
|
|||||||
name = "all-srcs",
|
name = "all-srcs",
|
||||||
srcs = [
|
srcs = [
|
||||||
":package-srcs",
|
":package-srcs",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/promise/lockingpromise:all-srcs",
|
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise:all-srcs",
|
||||||
],
|
],
|
||||||
tags = ["automanaged"],
|
tags = ["automanaged"],
|
||||||
visibility = ["//visibility:public"],
|
visibility = ["//visibility:public"],
|
@ -3,12 +3,12 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
|
|||||||
go_library(
|
go_library(
|
||||||
name = "go_default_library",
|
name = "go_default_library",
|
||||||
srcs = ["lockingpromise.go"],
|
srcs = ["lockingpromise.go"],
|
||||||
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/promise/lockingpromise",
|
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise",
|
||||||
importpath = "k8s.io/apiserver/pkg/util/promise/lockingpromise",
|
importpath = "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise",
|
||||||
visibility = ["//visibility:public"],
|
visibility = ["//visibility:public"],
|
||||||
deps = [
|
deps = [
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/promise:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise:go_default_library",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
@ -20,7 +20,7 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
|
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
|
||||||
"k8s.io/apiserver/pkg/util/promise"
|
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise"
|
||||||
)
|
)
|
||||||
|
|
||||||
// lockingPromise implements LockingMutable based on a condition
|
// lockingPromise implements LockingMutable based on a condition
|
@ -15,9 +15,9 @@ go_library(
|
|||||||
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/promise:go_default_library",
|
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/promise/lockingpromise:go_default_library",
|
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/shufflesharding:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/util/shufflesharding:go_default_library",
|
||||||
"//vendor/github.com/pkg/errors:go_default_library",
|
"//vendor/github.com/pkg/errors:go_default_library",
|
||||||
"//vendor/k8s.io/klog:go_default_library",
|
"//vendor/k8s.io/klog:go_default_library",
|
||||||
|
@ -14,33 +14,31 @@ See the License for the specific language governing permissions and
|
|||||||
limitations under the License.
|
limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package queueset
|
// Package queueset implements a technique called "fair queuing for
|
||||||
|
// server requests". One QueueSet is a set of queues operating
|
||||||
// This package implements a technique called "fair queuing for server
|
// according to this technique.
|
||||||
// requests". One QueueSet is a set of queues operating according to
|
//
|
||||||
// this technique.
|
|
||||||
|
|
||||||
// Fair queuing for server requests is inspired by the fair queuing
|
// Fair queuing for server requests is inspired by the fair queuing
|
||||||
// technique from the world of networking. You can find a good paper
|
// technique from the world of networking. You can find a good paper
|
||||||
// on that at https://dl.acm.org/citation.cfm?doid=75247.75248 or
|
// on that at https://dl.acm.org/citation.cfm?doid=75247.75248 or
|
||||||
// http://people.csail.mit.edu/imcgraw/links/research/pubs/networks/WFQ.pdf
|
// http://people.csail.mit.edu/imcgraw/links/research/pubs/networks/WFQ.pdf
|
||||||
// and there is an implementation outline in the Wikipedia article at
|
// and there is an implementation outline in the Wikipedia article at
|
||||||
// https://en.wikipedia.org/wiki/Fair_queuing .
|
// https://en.wikipedia.org/wiki/Fair_queuing .
|
||||||
|
//
|
||||||
// Fair queuing for server requests differs from traditional fair
|
// Fair queuing for server requests differs from traditional fair
|
||||||
// queuing in three ways: (1) we are dispatching requests to be
|
// queuing in three ways: (1) we are dispatching application layer
|
||||||
// executed within a process rather than transmitting packets on a
|
// requests to a server rather than transmitting packets on a network
|
||||||
// network link, (2) multiple requests can be executing at once, and
|
// link, (2) multiple requests can be executing at once, and (3) the
|
||||||
// (3) the service time (execution duration) is not known until the
|
// service time (execution duration) is not known until the execution
|
||||||
// execution completes.
|
// completes.
|
||||||
|
//
|
||||||
// The first two differences can easily be handled by straightforward
|
// The first two differences can easily be handled by straightforward
|
||||||
// adaptation of the concept called "R(t)" in the original paper and
|
// adaptation of the concept called "R(t)" in the original paper and
|
||||||
// "virtual time" in the implementation outline. In that
|
// "virtual time" in the implementation outline. In that
|
||||||
// implementation outline, the notation now() is used to mean reading
|
// implementation outline, the notation now() is used to mean reading
|
||||||
// the virtual clock. In the original paper’s terms, "R(t)" is the
|
// the virtual clock. In the original paper’s terms, "R(t)" is the
|
||||||
// number of "rounds" that have been completed at real time t, where a
|
// number of "rounds" that have been completed at real time t ---
|
||||||
// round consists of virtually transmitting one bit from every
|
// where a round consists of virtually transmitting one bit from every
|
||||||
// non-empty queue in the router (regardless of which queue holds the
|
// non-empty queue in the router (regardless of which queue holds the
|
||||||
// packet that is really being transmitted at the moment); in this
|
// packet that is really being transmitted at the moment); in this
|
||||||
// conception, a packet is considered to be "in" its queue until the
|
// conception, a packet is considered to be "in" its queue until the
|
||||||
@ -55,12 +53,12 @@ package queueset
|
|||||||
// respect to t is
|
// respect to t is
|
||||||
//
|
//
|
||||||
// 1 / NEQ(t) .
|
// 1 / NEQ(t) .
|
||||||
|
//
|
||||||
// To generalize from transmitting one packet at a time to executing C
|
// To generalize from transmitting one packet at a time to executing C
|
||||||
// requests at a time, that derivative becomes
|
// requests at a time, that derivative becomes
|
||||||
//
|
//
|
||||||
// C / NEQ(t) .
|
// C / NEQ(t) .
|
||||||
|
//
|
||||||
// However, sometimes there are fewer than C requests available to
|
// However, sometimes there are fewer than C requests available to
|
||||||
// execute. For a given queue "q", let us also write "reqs(q, t)" for
|
// execute. For a given queue "q", let us also write "reqs(q, t)" for
|
||||||
// the number of requests of that queue that are executing at that
|
// the number of requests of that queue that are executing at that
|
||||||
@ -79,25 +77,25 @@ package queueset
|
|||||||
// real nanosecond). Where the networking implementation outline adds
|
// real nanosecond). Where the networking implementation outline adds
|
||||||
// packet size to a virtual time, in our version this corresponds to
|
// packet size to a virtual time, in our version this corresponds to
|
||||||
// adding a service time (i.e., duration) to virtual time.
|
// adding a service time (i.e., duration) to virtual time.
|
||||||
|
//
|
||||||
// The third difference is handled by modifying the algorithm to
|
// The third difference is handled by modifying the algorithm to
|
||||||
// dispatch based on an initial guess at the request’s service time
|
// dispatch based on an initial guess at the request’s service time
|
||||||
// (duration) and then make the corresponding adjustments once the
|
// (duration) and then make the corresponding adjustments once the
|
||||||
// request’s actual service time is known. This is similar, although
|
// request’s actual service time is known. This is similar, although
|
||||||
// not exactly isomorphic, to the original paper’s adjustment by
|
// not exactly isomorphic, to the original paper’s adjustment by
|
||||||
// `$delta` for the sake of promptness.
|
// `$\delta$` for the sake of promptness.
|
||||||
|
//
|
||||||
// For implementation simplicity (see below), let us use the same
|
// For implementation simplicity (see below), let us use the same
|
||||||
// initial service time guess for every request; call that duration
|
// initial service time guess for every request; call that duration
|
||||||
// G. A good choice might be the service time limit (1
|
// G. A good choice might be the service time limit (1
|
||||||
// minute). Different guesses will give slightly different dynamics,
|
// minute). Different guesses will give slightly different dynamics,
|
||||||
// but any positive number can be used for G without ruining the
|
// but any positive number can be used for G without ruining the
|
||||||
// long-term behavior.
|
// long-term behavior.
|
||||||
|
//
|
||||||
// As in ordinary fair queuing, there is a bound on divergence from
|
// As in ordinary fair queuing, there is a bound on divergence from
|
||||||
// the ideal. In plain fair queuing the bound is one packet; in our
|
// the ideal. In plain fair queuing the bound is one packet; in our
|
||||||
// version it is C requests.
|
// version it is C requests.
|
||||||
|
//
|
||||||
// To support efficiently making the necessary adjustments once a
|
// To support efficiently making the necessary adjustments once a
|
||||||
// request’s actual service time is known, the virtual finish time of
|
// request’s actual service time is known, the virtual finish time of
|
||||||
// a request and the last virtual finish time of a queue are not
|
// a request and the last virtual finish time of a queue are not
|
||||||
@ -118,3 +116,5 @@ package queueset
|
|||||||
// queue’s virtual start time is advanced by G. When a request
|
// queue’s virtual start time is advanced by G. When a request
|
||||||
// finishes being served, and the actual service time was S, the
|
// finishes being served, and the actual service time was S, the
|
||||||
// queue’s virtual start time is decremented by G - S.
|
// queue’s virtual start time is decremented by G - S.
|
||||||
|
//
|
||||||
|
package queueset
|
||||||
|
@ -28,8 +28,8 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/util/clock"
|
"k8s.io/apimachinery/pkg/util/clock"
|
||||||
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
|
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
|
||||||
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
|
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
|
||||||
|
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise"
|
||||||
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
|
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
|
||||||
"k8s.io/apiserver/pkg/util/promise/lockingpromise"
|
|
||||||
"k8s.io/apiserver/pkg/util/shufflesharding"
|
"k8s.io/apiserver/pkg/util/shufflesharding"
|
||||||
"k8s.io/klog"
|
"k8s.io/klog"
|
||||||
)
|
)
|
||||||
@ -56,48 +56,62 @@ func NewQueueSetFactory(c clock.PassiveClock, counter counter.GoRoutineCounter)
|
|||||||
// the QueueSet interface. The clock, GoRoutineCounter, and estimated
|
// the QueueSet interface. The clock, GoRoutineCounter, and estimated
|
||||||
// service time should not be changed; the fields listed after the
|
// service time should not be changed; the fields listed after the
|
||||||
// lock must be accessed only while holding the lock.
|
// lock must be accessed only while holding the lock.
|
||||||
|
// This is not yet designed to support limiting concurrency without
|
||||||
|
// queuing (this will need to be added soon).
|
||||||
type queueSet struct {
|
type queueSet struct {
|
||||||
clock clock.PassiveClock
|
clock clock.PassiveClock
|
||||||
counter counter.GoRoutineCounter
|
counter counter.GoRoutineCounter
|
||||||
estimatedServiceTime float64
|
estimatedServiceTime float64
|
||||||
|
|
||||||
lock sync.Mutex
|
lock sync.Mutex
|
||||||
|
|
||||||
|
// config holds the current configuration. Its DesiredNumQueues
|
||||||
|
// may be less than the current number of queues. If its
|
||||||
|
// DesiredNumQueues is zero then its other queuing parameters
|
||||||
|
// retain the settings they had when DesiredNumQueues was last
|
||||||
|
// non-zero (if ever).
|
||||||
config fq.QueueSetConfig
|
config fq.QueueSetConfig
|
||||||
|
|
||||||
// queues may be longer than the desired number, while the excess
|
// queues may be longer than the desired number, while the excess
|
||||||
// queues are still draining.
|
// queues are still draining.
|
||||||
queues []*queue
|
queues []*queue
|
||||||
virtualTime float64
|
|
||||||
|
// virtualTime is the number of virtual seconds since process startup
|
||||||
|
virtualTime float64
|
||||||
|
|
||||||
|
// lastRealTime is what `clock.Now()` yielded when `virtualTime` was last updated
|
||||||
lastRealTime time.Time
|
lastRealTime time.Time
|
||||||
|
|
||||||
// robinIndex is the index of the last queue dispatched
|
// robinIndex is the index of the last queue dispatched
|
||||||
robinIndex int
|
robinIndex int
|
||||||
|
|
||||||
// numRequestsEnqueued is the number of requests currently waiting
|
// totRequestsWaiting is the sum, over all the queues, of the
|
||||||
// in a queue (eg: incremeneted on Enqueue, decremented on Dequue)
|
// number of requests waiting in that queue
|
||||||
numRequestsEnqueued int
|
totRequestsWaiting int
|
||||||
|
|
||||||
|
// totRequestsExecuting is the total number of requests of this
|
||||||
|
// queueSet that are currently executing. That is the same as the
|
||||||
|
// sum, over all the queues, of the number of requests executing
|
||||||
|
// from that queue.
|
||||||
|
totRequestsExecuting int
|
||||||
|
|
||||||
emptyHandler fq.EmptyHandler
|
emptyHandler fq.EmptyHandler
|
||||||
dealer *shufflesharding.Dealer
|
dealer *shufflesharding.Dealer
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewQueueSet creates a new QueueSet object
|
// NewQueueSet creates a new QueueSet object.
|
||||||
// There is a new QueueSet created for each priority level.
|
// There is a new QueueSet created for each priority level.
|
||||||
func (qsf queueSetFactory) NewQueueSet(config fq.QueueSetConfig) (fq.QueueSet, error) {
|
func (qsf queueSetFactory) NewQueueSet(config fq.QueueSetConfig) (fq.QueueSet, error) {
|
||||||
dealer, err := shufflesharding.NewDealer(config.DesiredNumQueues, config.HandSize)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrap(err, "shuffle sharding dealer creation failed")
|
|
||||||
}
|
|
||||||
|
|
||||||
fq := &queueSet{
|
fq := &queueSet{
|
||||||
config: config,
|
|
||||||
counter: qsf.counter,
|
|
||||||
queues: createQueues(config.DesiredNumQueues, 0),
|
|
||||||
clock: qsf.clock,
|
clock: qsf.clock,
|
||||||
virtualTime: 0,
|
counter: qsf.counter,
|
||||||
estimatedServiceTime: 60,
|
estimatedServiceTime: 60,
|
||||||
|
config: config,
|
||||||
lastRealTime: qsf.clock.Now(),
|
lastRealTime: qsf.clock.Now(),
|
||||||
dealer: dealer,
|
}
|
||||||
|
err := fq.SetConfiguration(config)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
return fq, nil
|
return fq, nil
|
||||||
}
|
}
|
||||||
@ -106,7 +120,7 @@ func (qsf queueSetFactory) NewQueueSet(config fq.QueueSetConfig) (fq.QueueSet, e
|
|||||||
func createQueues(n, baseIndex int) []*queue {
|
func createQueues(n, baseIndex int) []*queue {
|
||||||
fqqueues := make([]*queue, n)
|
fqqueues := make([]*queue, n)
|
||||||
for i := 0; i < n; i++ {
|
for i := 0; i < n; i++ {
|
||||||
fqqueues[i] = &queue{Index: baseIndex + i, Requests: make([]*request, 0)}
|
fqqueues[i] = &queue{index: baseIndex + i, requests: make([]*request, 0)}
|
||||||
}
|
}
|
||||||
return fqqueues
|
return fqqueues
|
||||||
}
|
}
|
||||||
@ -118,19 +132,26 @@ func createQueues(n, baseIndex int) []*queue {
|
|||||||
func (qs *queueSet) SetConfiguration(config fq.QueueSetConfig) error {
|
func (qs *queueSet) SetConfiguration(config fq.QueueSetConfig) error {
|
||||||
qs.lockAndSyncTime()
|
qs.lockAndSyncTime()
|
||||||
defer qs.lock.Unlock()
|
defer qs.lock.Unlock()
|
||||||
|
var dealer *shufflesharding.Dealer
|
||||||
|
|
||||||
dealer, err := shufflesharding.NewDealer(config.DesiredNumQueues, config.HandSize)
|
if config.DesiredNumQueues > 0 {
|
||||||
if err != nil {
|
var err error
|
||||||
return errors.Wrap(err, "shuffle sharding dealer creation failed")
|
dealer, err = shufflesharding.NewDealer(config.DesiredNumQueues, config.HandSize)
|
||||||
}
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "shuffle sharding dealer creation failed")
|
||||||
// Adding queues is the only thing that requires immediate action
|
}
|
||||||
// Removing queues is handled by omitting indexes >DesiredNum from
|
// Adding queues is the only thing that requires immediate action
|
||||||
// chooseQueueIndexLocked
|
// Removing queues is handled by omitting indexes >DesiredNum from
|
||||||
numQueues := len(qs.queues)
|
// chooseQueueIndexLocked
|
||||||
if config.DesiredNumQueues > numQueues {
|
numQueues := len(qs.queues)
|
||||||
qs.queues = append(qs.queues,
|
if config.DesiredNumQueues > numQueues {
|
||||||
createQueues(config.DesiredNumQueues-numQueues, len(qs.queues))...)
|
qs.queues = append(qs.queues,
|
||||||
|
createQueues(config.DesiredNumQueues-numQueues, len(qs.queues))...)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
config.QueueLengthLimit = qs.config.QueueLengthLimit
|
||||||
|
config.HandSize = qs.config.HandSize
|
||||||
|
config.RequestWaitLimit = qs.config.RequestWaitLimit
|
||||||
}
|
}
|
||||||
|
|
||||||
qs.config = config
|
qs.config = config
|
||||||
@ -162,12 +183,15 @@ func (qs *queueSet) Quiesce(eh fq.EmptyHandler) {
|
|||||||
qs.maybeForkEmptyHandlerLocked()
|
qs.maybeForkEmptyHandlerLocked()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Values passed through a request's Decision
|
// A decision about a request
|
||||||
|
type requestDecision int
|
||||||
|
|
||||||
|
// Values passed through a request's decision
|
||||||
const (
|
const (
|
||||||
DecisionExecute = "execute"
|
decisionExecute requestDecision = iota
|
||||||
DecisionReject = "reject"
|
decisionReject
|
||||||
DecisionCancel = "cancel"
|
decisionCancel
|
||||||
DecisionTryAnother = "tryAnother"
|
decisionTryAnother
|
||||||
)
|
)
|
||||||
|
|
||||||
// Wait uses the given hashValue as the source of entropy as it
|
// Wait uses the given hashValue as the source of entropy as it
|
||||||
@ -186,14 +210,26 @@ const (
|
|||||||
// irrelevant.
|
// irrelevant.
|
||||||
func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (tryAnother, execute bool, afterExecution func()) {
|
func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (tryAnother, execute bool, afterExecution func()) {
|
||||||
var req *request
|
var req *request
|
||||||
decision := func() string {
|
decision := func() requestDecision {
|
||||||
qs.lockAndSyncTime()
|
qs.lockAndSyncTime()
|
||||||
defer qs.lock.Unlock()
|
defer qs.lock.Unlock()
|
||||||
// A call to Wait while the system is quiescing will be rebuffed by
|
// A call to Wait while the system is quiescing will be rebuffed by
|
||||||
// returning `tryAnother=true`.
|
// returning `tryAnother=true`.
|
||||||
if qs.emptyHandler != nil {
|
if qs.emptyHandler != nil {
|
||||||
klog.V(5).Infof("QS(%s): rebuffing request %#+v %#+v with TryAnother", qs.config.Name, descr1, descr2)
|
klog.V(5).Infof("QS(%s): rebuffing request %#+v %#+v with TryAnother", qs.config.Name, descr1, descr2)
|
||||||
return DecisionTryAnother
|
return decisionTryAnother
|
||||||
|
}
|
||||||
|
|
||||||
|
// ========================================================================
|
||||||
|
// Step 0:
|
||||||
|
// Apply only concurrency limit, if zero queues desired
|
||||||
|
if qs.config.DesiredNumQueues < 1 {
|
||||||
|
if qs.totRequestsExecuting >= qs.config.ConcurrencyLimit {
|
||||||
|
klog.V(5).Infof("QS(%s): rejecting request %#+v %#+v because %d are executing and the limit is %d", qs.config.Name, descr1, descr2, qs.totRequestsExecuting, qs.config.ConcurrencyLimit)
|
||||||
|
return decisionReject
|
||||||
|
}
|
||||||
|
req = qs.dispatchSansQueue(descr1, descr2)
|
||||||
|
return decisionExecute
|
||||||
}
|
}
|
||||||
|
|
||||||
// ========================================================================
|
// ========================================================================
|
||||||
@ -209,7 +245,7 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 i
|
|||||||
if req == nil {
|
if req == nil {
|
||||||
klog.V(5).Infof("QS(%s): rejecting request %#+v %#+v due to queue full", qs.config.Name, descr1, descr2)
|
klog.V(5).Infof("QS(%s): rejecting request %#+v %#+v due to queue full", qs.config.Name, descr1, descr2)
|
||||||
metrics.AddReject(qs.config.Name, "queue-full")
|
metrics.AddReject(qs.config.Name, "queue-full")
|
||||||
return DecisionReject
|
return decisionReject
|
||||||
}
|
}
|
||||||
|
|
||||||
// ========================================================================
|
// ========================================================================
|
||||||
@ -239,7 +275,7 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 i
|
|||||||
select {
|
select {
|
||||||
case <-doneCh:
|
case <-doneCh:
|
||||||
klog.V(6).Infof("QS(%s): Context of request %#+v %#+v is Done", qs.config.Name, descr1, descr2)
|
klog.V(6).Infof("QS(%s): Context of request %#+v %#+v is Done", qs.config.Name, descr1, descr2)
|
||||||
req.Decision.Set(DecisionCancel)
|
req.decision.Set(decisionCancel)
|
||||||
}
|
}
|
||||||
qs.goroutineDoneOrBlocked()
|
qs.goroutineDoneOrBlocked()
|
||||||
}()
|
}()
|
||||||
@ -249,30 +285,30 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 i
|
|||||||
// Step 4:
|
// Step 4:
|
||||||
// The final step in Wait is to wait on a decision from
|
// The final step in Wait is to wait on a decision from
|
||||||
// somewhere and then act on it.
|
// somewhere and then act on it.
|
||||||
decisionAny := req.Decision.GetLocked()
|
decisionAny := req.decision.GetLocked()
|
||||||
var decisionStr string
|
var decision requestDecision
|
||||||
switch d := decisionAny.(type) {
|
switch dec := decisionAny.(type) {
|
||||||
case string:
|
case requestDecision:
|
||||||
decisionStr = d
|
decision = dec
|
||||||
default:
|
default:
|
||||||
klog.Errorf("QS(%s): Impossible decision %#+v (of type %T) for request %#+v %#+v", qs.config.Name, decisionAny, decisionAny, descr1, descr2)
|
klog.Errorf("QS(%s): Impossible decision %#+v (of type %T) for request %#+v %#+v", qs.config.Name, decisionAny, decisionAny, descr1, descr2)
|
||||||
decisionStr = DecisionExecute
|
decision = decisionExecute
|
||||||
}
|
}
|
||||||
switch decisionStr {
|
switch decision {
|
||||||
case DecisionReject:
|
case decisionReject:
|
||||||
klog.V(5).Infof("QS(%s): request %#+v %#+v timed out after being enqueued\n", qs.config.Name, descr1, descr2)
|
klog.V(5).Infof("QS(%s): request %#+v %#+v timed out after being enqueued\n", qs.config.Name, descr1, descr2)
|
||||||
metrics.AddReject(qs.config.Name, "time-out")
|
metrics.AddReject(qs.config.Name, "time-out")
|
||||||
case DecisionCancel:
|
case decisionCancel:
|
||||||
qs.syncTimeLocked()
|
qs.syncTimeLocked()
|
||||||
// TODO(aaron-prindle) add metrics to these two cases
|
// TODO(aaron-prindle) add metrics to these two cases
|
||||||
if req.IsWaiting {
|
if req.isWaiting {
|
||||||
klog.V(5).Infof("QS(%s): Ejecting request %#+v %#+v from its queue", qs.config.Name, descr1, descr2)
|
klog.V(5).Infof("QS(%s): Ejecting request %#+v %#+v from its queue", qs.config.Name, descr1, descr2)
|
||||||
// remove the request from the queue as it has timed out
|
// remove the request from the queue as it has timed out
|
||||||
for i := range req.Queue.Requests {
|
for i := range req.queue.requests {
|
||||||
if req == req.Queue.Requests[i] {
|
if req == req.queue.requests[i] {
|
||||||
// remove the request
|
// remove the request
|
||||||
req.Queue.Requests = append(req.Queue.Requests[:i],
|
req.queue.requests = append(req.queue.requests[:i],
|
||||||
req.Queue.Requests[i+1:]...)
|
req.queue.requests[i+1:]...)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -284,17 +320,15 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 i
|
|||||||
klog.V(5).Infof("QS(%s): request %#+v %#+v canceled shortly after dispatch", qs.config.Name, descr1, descr2)
|
klog.V(5).Infof("QS(%s): request %#+v %#+v canceled shortly after dispatch", qs.config.Name, descr1, descr2)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return decisionStr
|
return decision
|
||||||
}()
|
}()
|
||||||
switch decision {
|
switch decision {
|
||||||
case DecisionTryAnother:
|
case decisionTryAnother:
|
||||||
return true, false, func() {}
|
return true, false, func() {}
|
||||||
case DecisionReject:
|
case decisionReject, decisionCancel:
|
||||||
return false, false, func() {}
|
|
||||||
case DecisionCancel:
|
|
||||||
return false, false, func() {}
|
return false, false, func() {}
|
||||||
default:
|
default:
|
||||||
if decision != DecisionExecute {
|
if decision != decisionExecute {
|
||||||
klog.Errorf("Impossible decision %q", decision)
|
klog.Errorf("Impossible decision %q", decision)
|
||||||
}
|
}
|
||||||
return false, true, func() {
|
return false, true, func() {
|
||||||
@ -317,9 +351,9 @@ func (qs *queueSet) lockAndSyncTime() {
|
|||||||
// lock and before modifying the state of any queue.
|
// lock and before modifying the state of any queue.
|
||||||
func (qs *queueSet) syncTimeLocked() {
|
func (qs *queueSet) syncTimeLocked() {
|
||||||
realNow := qs.clock.Now()
|
realNow := qs.clock.Now()
|
||||||
timesincelast := realNow.Sub(qs.lastRealTime).Seconds()
|
timeSinceLast := realNow.Sub(qs.lastRealTime).Seconds()
|
||||||
qs.lastRealTime = realNow
|
qs.lastRealTime = realNow
|
||||||
qs.virtualTime += timesincelast * qs.getVirtualTimeRatio()
|
qs.virtualTime += timeSinceLast * qs.getVirtualTimeRatio()
|
||||||
}
|
}
|
||||||
|
|
||||||
// getVirtualTimeRatio calculates the rate at which virtual time has
|
// getVirtualTimeRatio calculates the rate at which virtual time has
|
||||||
@ -328,8 +362,8 @@ func (qs *queueSet) getVirtualTimeRatio() float64 {
|
|||||||
activeQueues := 0
|
activeQueues := 0
|
||||||
reqs := 0
|
reqs := 0
|
||||||
for _, queue := range qs.queues {
|
for _, queue := range qs.queues {
|
||||||
reqs += queue.RequestsExecuting
|
reqs += queue.requestsExecuting
|
||||||
if len(queue.Requests) > 0 || queue.RequestsExecuting > 0 {
|
if len(queue.requests) > 0 || queue.requestsExecuting > 0 {
|
||||||
activeQueues++
|
activeQueues++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -361,16 +395,16 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(hashValue uint64,
|
|||||||
|
|
||||||
// Create a request and enqueue
|
// Create a request and enqueue
|
||||||
req := &request{
|
req := &request{
|
||||||
Decision: lockingpromise.NewLockingPromise(&qs.lock, qs.counter),
|
decision: lockingpromise.NewLockingPromise(&qs.lock, qs.counter),
|
||||||
ArrivalTime: qs.clock.Now(),
|
arrivalTime: qs.clock.Now(),
|
||||||
Queue: queue,
|
queue: queue,
|
||||||
descr1: descr1,
|
descr1: descr1,
|
||||||
descr2: descr2,
|
descr2: descr2,
|
||||||
}
|
}
|
||||||
if ok := qs.rejectOrEnqueueLocked(req); !ok {
|
if ok := qs.rejectOrEnqueueLocked(req); !ok {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
metrics.ObserveQueueLength(qs.config.Name, len(queue.Requests))
|
metrics.ObserveQueueLength(qs.config.Name, len(queue.requests))
|
||||||
return req
|
return req
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -381,13 +415,13 @@ func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64, descr1, descr2 inte
|
|||||||
bestQueueLen := int(math.MaxInt32)
|
bestQueueLen := int(math.MaxInt32)
|
||||||
// the dealer uses the current desired number of queues, which is no larger than the number in `qs.queues`.
|
// the dealer uses the current desired number of queues, which is no larger than the number in `qs.queues`.
|
||||||
qs.dealer.Deal(hashValue, func(queueIdx int) {
|
qs.dealer.Deal(hashValue, func(queueIdx int) {
|
||||||
thisLen := len(qs.queues[queueIdx].Requests)
|
thisLen := len(qs.queues[queueIdx].requests)
|
||||||
klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of length %d", qs.config.Name, descr1, descr2, queueIdx, thisLen)
|
klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of length %d", qs.config.Name, descr1, descr2, queueIdx, thisLen)
|
||||||
if thisLen < bestQueueLen {
|
if thisLen < bestQueueLen {
|
||||||
bestQueueIdx, bestQueueLen = queueIdx, thisLen
|
bestQueueIdx, bestQueueLen = queueIdx, thisLen
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
klog.V(6).Infof("QS(%s): For request %#+v %#+v chose queue %d, had %d waiting & %d executing", qs.config.Name, descr1, descr2, bestQueueIdx, bestQueueLen, qs.queues[bestQueueIdx].RequestsExecuting)
|
klog.V(6).Infof("QS(%s): For request %#+v %#+v chose queue %d, had %d waiting & %d executing", qs.config.Name, descr1, descr2, bestQueueIdx, bestQueueLen, qs.queues[bestQueueIdx].requestsExecuting)
|
||||||
return bestQueueIdx
|
return bestQueueIdx
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -396,7 +430,7 @@ func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64, descr1, descr2 inte
|
|||||||
func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue) {
|
func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue) {
|
||||||
timeoutIdx := -1
|
timeoutIdx := -1
|
||||||
now := qs.clock.Now()
|
now := qs.clock.Now()
|
||||||
reqs := queue.Requests
|
reqs := queue.requests
|
||||||
// reqs are sorted oldest -> newest
|
// reqs are sorted oldest -> newest
|
||||||
// can short circuit loop (break) if oldest requests are not timing out
|
// can short circuit loop (break) if oldest requests are not timing out
|
||||||
// as newer requests also will not have timed out
|
// as newer requests also will not have timed out
|
||||||
@ -404,8 +438,8 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue) {
|
|||||||
// now - requestWaitLimit = waitLimit
|
// now - requestWaitLimit = waitLimit
|
||||||
waitLimit := now.Add(-qs.config.RequestWaitLimit)
|
waitLimit := now.Add(-qs.config.RequestWaitLimit)
|
||||||
for i, req := range reqs {
|
for i, req := range reqs {
|
||||||
if waitLimit.After(req.ArrivalTime) {
|
if waitLimit.After(req.arrivalTime) {
|
||||||
req.Decision.SetLocked(DecisionReject)
|
req.decision.SetLocked(decisionReject)
|
||||||
// get index for timed out requests
|
// get index for timed out requests
|
||||||
timeoutIdx = i
|
timeoutIdx = i
|
||||||
} else {
|
} else {
|
||||||
@ -417,19 +451,19 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue) {
|
|||||||
// timeoutIdx + 1 to remove the last timeout req
|
// timeoutIdx + 1 to remove the last timeout req
|
||||||
removeIdx := timeoutIdx + 1
|
removeIdx := timeoutIdx + 1
|
||||||
// remove all the timeout requests
|
// remove all the timeout requests
|
||||||
queue.Requests = reqs[removeIdx:]
|
queue.requests = reqs[removeIdx:]
|
||||||
// decrement the # of requestsEnqueued
|
// decrement the # of requestsEnqueued
|
||||||
qs.numRequestsEnqueued -= removeIdx
|
qs.totRequestsWaiting -= removeIdx
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// rejectOrEnqueueLocked rejects or enqueues the newly arrived request if
|
// rejectOrEnqueueLocked rejects or enqueues the newly arrived request if
|
||||||
// resource criteria isn't met
|
// resource criteria isn't met
|
||||||
func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool {
|
func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool {
|
||||||
queue := request.Queue
|
queue := request.queue
|
||||||
curQueueLength := len(queue.Requests)
|
curQueueLength := len(queue.requests)
|
||||||
// rejects the newly arrived request if resource criteria not met
|
// rejects the newly arrived request if resource criteria not met
|
||||||
if qs.getRequestsExecutingLocked() >= qs.config.ConcurrencyLimit &&
|
if qs.totRequestsExecuting >= qs.config.ConcurrencyLimit &&
|
||||||
curQueueLength >= qs.config.QueueLengthLimit {
|
curQueueLength >= qs.config.QueueLengthLimit {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
@ -440,28 +474,17 @@ func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool {
|
|||||||
|
|
||||||
// enqueues a request into an queueSet
|
// enqueues a request into an queueSet
|
||||||
func (qs *queueSet) enqueueLocked(request *request) {
|
func (qs *queueSet) enqueueLocked(request *request) {
|
||||||
queue := request.Queue
|
queue := request.queue
|
||||||
if len(queue.Requests) == 0 && queue.RequestsExecuting == 0 {
|
if len(queue.requests) == 0 && queue.requestsExecuting == 0 {
|
||||||
// the queue’s virtual start time is set to the virtual time.
|
// the queue’s virtual start time is set to the virtual time.
|
||||||
queue.VirtualStart = qs.virtualTime
|
queue.virtualStart = qs.virtualTime
|
||||||
if klog.V(6) {
|
if klog.V(6) {
|
||||||
klog.Infof("QS(%s) at r=%s v=%.9fs: initialized queue %d virtual start time due to request %#+v %#+v", qs.config.Name, qs.clock.Now().Format(nsTimeFmt), queue.VirtualStart, queue.Index, request.descr1, request.descr2)
|
klog.Infof("QS(%s) at r=%s v=%.9fs: initialized queue %d virtual start time due to request %#+v %#+v", qs.config.Name, qs.clock.Now().Format(nsTimeFmt), queue.virtualStart, queue.index, request.descr1, request.descr2)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
queue.Enqueue(request)
|
queue.Enqueue(request)
|
||||||
qs.numRequestsEnqueued++
|
qs.totRequestsWaiting++
|
||||||
metrics.UpdateFlowControlRequestsInQueue(qs.config.Name, qs.numRequestsEnqueued)
|
metrics.UpdateFlowControlRequestsInQueue(qs.config.Name, qs.totRequestsWaiting)
|
||||||
}
|
|
||||||
|
|
||||||
// getRequestsExecutingLocked gets the # of requests which are "executing":
|
|
||||||
// this is the # of requests which have been dispatched but have not
|
|
||||||
// finished (via the finishRequestLocked method invoked after service)
|
|
||||||
func (qs *queueSet) getRequestsExecutingLocked() int {
|
|
||||||
total := 0
|
|
||||||
for _, queue := range qs.queues {
|
|
||||||
total += queue.RequestsExecuting
|
|
||||||
}
|
|
||||||
return total
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// dispatchAsMuchAsPossibleLocked runs a loop, as long as there
|
// dispatchAsMuchAsPossibleLocked runs a loop, as long as there
|
||||||
@ -471,50 +494,70 @@ func (qs *queueSet) getRequestsExecutingLocked() int {
|
|||||||
// queue, increment the count of the number executing, and send true
|
// queue, increment the count of the number executing, and send true
|
||||||
// to the request's channel.
|
// to the request's channel.
|
||||||
func (qs *queueSet) dispatchAsMuchAsPossibleLocked() {
|
func (qs *queueSet) dispatchAsMuchAsPossibleLocked() {
|
||||||
for qs.numRequestsEnqueued != 0 && qs.getRequestsExecutingLocked() < qs.config.ConcurrencyLimit {
|
for qs.totRequestsWaiting != 0 && qs.totRequestsExecuting < qs.config.ConcurrencyLimit {
|
||||||
_, ok := qs.dispatchLocked()
|
ok := qs.dispatchLocked()
|
||||||
if !ok {
|
if !ok {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// dispatchLocked is a convenience method for dequeueing requests that
|
func (qs *queueSet) dispatchSansQueue(descr1, descr2 interface{}) *request {
|
||||||
// require a message to be sent through the requests channel
|
now := qs.clock.Now()
|
||||||
// this is a required pattern for the QueueSet the queueSet supports
|
req := &request{
|
||||||
func (qs *queueSet) dispatchLocked() (*request, bool) {
|
startTime: now,
|
||||||
queue := qs.selectQueueLocked()
|
arrivalTime: now,
|
||||||
if queue == nil {
|
descr1: descr1,
|
||||||
return nil, false
|
descr2: descr2,
|
||||||
}
|
}
|
||||||
request, ok := queue.Dequeue()
|
qs.totRequestsExecuting++
|
||||||
if !ok {
|
if klog.V(5) {
|
||||||
return nil, false
|
klog.Infof("QS(%s) at r=%s v=%.9fs: immediate dispatch of request %#+v %#+v, qs will have %d executing", qs.config.Name, now.Format(nsTimeFmt), qs.virtualTime, descr1, descr2, qs.totRequestsExecuting)
|
||||||
}
|
}
|
||||||
request.StartTime = qs.clock.Now()
|
metrics.UpdateFlowControlRequestsExecuting(qs.config.Name, qs.totRequestsExecuting)
|
||||||
// request dequeued, service has started
|
return req
|
||||||
queue.RequestsExecuting++
|
|
||||||
qs.numRequestsEnqueued--
|
|
||||||
if klog.V(6) {
|
|
||||||
klog.Infof("QS(%s) at r=%s v=%.9fs: dispatching request %#+v %#+v from queue %d with virtual start time %.9fs, queue will have %d waiting & %d executing", qs.config.Name, request.StartTime.Format(nsTimeFmt), qs.virtualTime, request.descr1, request.descr2, queue.Index, queue.VirtualStart, len(queue.Requests), queue.RequestsExecuting)
|
|
||||||
}
|
|
||||||
// When a request is dequeued for service -> qs.VirtualStart += G
|
|
||||||
queue.VirtualStart += qs.estimatedServiceTime
|
|
||||||
metrics.UpdateFlowControlRequestsExecuting(qs.config.Name, queue.RequestsExecuting)
|
|
||||||
request.Decision.SetLocked(DecisionExecute)
|
|
||||||
return request, ok
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// selectQueueLocked selects the minimum virtualFinish time from the set of queues
|
// dispatchLocked uses the Fair Queuing for Server Requests method to
|
||||||
// the starting queue is selected via roundrobin
|
// select a queue and dispatch the oldest request in that queue. The
|
||||||
|
// return value indicates whether a request was dispatched; this will
|
||||||
|
// be false when there are no requests waiting in any queue.
|
||||||
|
func (qs *queueSet) dispatchLocked() bool {
|
||||||
|
queue := qs.selectQueueLocked()
|
||||||
|
if queue == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
request, ok := queue.Dequeue()
|
||||||
|
if !ok { // This should never happen. But if it does...
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
request.startTime = qs.clock.Now()
|
||||||
|
// request dequeued, service has started
|
||||||
|
qs.totRequestsWaiting--
|
||||||
|
qs.totRequestsExecuting++
|
||||||
|
queue.requestsExecuting++
|
||||||
|
if klog.V(6) {
|
||||||
|
klog.Infof("QS(%s) at r=%s v=%.9fs: dispatching request %#+v %#+v from queue %d with virtual start time %.9fs, queue will have %d waiting & %d executing", qs.config.Name, request.startTime.Format(nsTimeFmt), qs.virtualTime, request.descr1, request.descr2, queue.index, queue.virtualStart, len(queue.requests), queue.requestsExecuting)
|
||||||
|
}
|
||||||
|
// When a request is dequeued for service -> qs.virtualStart += G
|
||||||
|
queue.virtualStart += qs.estimatedServiceTime
|
||||||
|
metrics.UpdateFlowControlRequestsExecuting(qs.config.Name, qs.totRequestsExecuting)
|
||||||
|
request.decision.SetLocked(decisionExecute)
|
||||||
|
return ok
|
||||||
|
}
|
||||||
|
|
||||||
|
// selectQueueLocked examines the queues in round robin order and
|
||||||
|
// returns the first one of those for which the virtual finish time of
|
||||||
|
// the oldest waiting request is minimal.
|
||||||
func (qs *queueSet) selectQueueLocked() *queue {
|
func (qs *queueSet) selectQueueLocked() *queue {
|
||||||
minVirtualFinish := math.Inf(1)
|
minVirtualFinish := math.Inf(1)
|
||||||
var minQueue *queue
|
var minQueue *queue
|
||||||
var minIndex int
|
var minIndex int
|
||||||
|
nq := len(qs.queues)
|
||||||
for range qs.queues {
|
for range qs.queues {
|
||||||
qs.robinIndex = (qs.robinIndex + 1) % len(qs.queues)
|
qs.robinIndex = (qs.robinIndex + 1) % nq
|
||||||
queue := qs.queues[qs.robinIndex]
|
queue := qs.queues[qs.robinIndex]
|
||||||
if len(queue.Requests) != 0 {
|
if len(queue.requests) != 0 {
|
||||||
currentVirtualFinish := queue.GetVirtualFinish(0, qs.estimatedServiceTime)
|
currentVirtualFinish := queue.GetVirtualFinish(0, qs.estimatedServiceTime)
|
||||||
if currentVirtualFinish < minVirtualFinish {
|
if currentVirtualFinish < minVirtualFinish {
|
||||||
minVirtualFinish = currentVirtualFinish
|
minVirtualFinish = currentVirtualFinish
|
||||||
@ -546,29 +589,39 @@ func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) {
|
|||||||
// previously dispatched request has completed it's service. This
|
// previously dispatched request has completed it's service. This
|
||||||
// callback updates important state in the queueSet
|
// callback updates important state in the queueSet
|
||||||
func (qs *queueSet) finishRequestLocked(r *request) {
|
func (qs *queueSet) finishRequestLocked(r *request) {
|
||||||
S := qs.clock.Since(r.StartTime).Seconds()
|
qs.totRequestsExecuting--
|
||||||
|
metrics.UpdateFlowControlRequestsExecuting(qs.config.Name, qs.totRequestsExecuting)
|
||||||
|
|
||||||
|
if r.queue == nil {
|
||||||
|
if klog.V(6) {
|
||||||
|
klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, qs will have %d executing", qs.config.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, qs.totRequestsExecuting)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
S := qs.clock.Since(r.startTime).Seconds()
|
||||||
|
|
||||||
// When a request finishes being served, and the actual service time was S,
|
// When a request finishes being served, and the actual service time was S,
|
||||||
// the queue’s virtual start time is decremented by G - S.
|
// the queue’s virtual start time is decremented by G - S.
|
||||||
r.Queue.VirtualStart -= qs.estimatedServiceTime - S
|
r.queue.virtualStart -= qs.estimatedServiceTime - S
|
||||||
|
|
||||||
// request has finished, remove from requests executing
|
// request has finished, remove from requests executing
|
||||||
r.Queue.RequestsExecuting--
|
r.queue.requestsExecuting--
|
||||||
|
|
||||||
if klog.V(6) {
|
if klog.V(6) {
|
||||||
klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing", qs.config.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.Queue.Index, r.Queue.VirtualStart, S, len(r.Queue.Requests), r.Queue.RequestsExecuting)
|
klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing", qs.config.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.queue.index, r.queue.virtualStart, S, len(r.queue.requests), r.queue.requestsExecuting)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Logic to remove quiesced queues
|
// If there are more queues than desired and this one has no
|
||||||
// >= as Index=25 is out of bounds for DesiredNum=25 [0...24]
|
// requests then remove it
|
||||||
if r.Queue.Index >= qs.config.DesiredNumQueues &&
|
if len(qs.queues) > qs.config.DesiredNumQueues &&
|
||||||
len(r.Queue.Requests) == 0 &&
|
len(r.queue.requests) == 0 &&
|
||||||
r.Queue.RequestsExecuting == 0 {
|
r.queue.requestsExecuting == 0 {
|
||||||
qs.queues = removeQueueAndUpdateIndexes(qs.queues, r.Queue.Index)
|
qs.queues = removeQueueAndUpdateIndexes(qs.queues, r.queue.index)
|
||||||
|
|
||||||
// decrement here to maintain the invariant that (qs.robinIndex+1) % numQueues
|
// decrement here to maintain the invariant that (qs.robinIndex+1) % numQueues
|
||||||
// is the index of the next queue after the one last dispatched from
|
// is the index of the next queue after the one last dispatched from
|
||||||
if qs.robinIndex >= r.Queue.Index {
|
if qs.robinIndex >= r.queue.index {
|
||||||
qs.robinIndex--
|
qs.robinIndex--
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -580,18 +633,18 @@ func (qs *queueSet) finishRequestLocked(r *request) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// removeQueueAndUpdateIndexes uses reslicing to remove an index from a slice
|
// removeQueueAndUpdateIndexes uses reslicing to remove an index from a slice
|
||||||
// and then updates the 'Index' field of the queues to be correct
|
// and then updates the 'index' field of the queues to be correct
|
||||||
func removeQueueAndUpdateIndexes(queues []*queue, index int) []*queue {
|
func removeQueueAndUpdateIndexes(queues []*queue, index int) []*queue {
|
||||||
keptQueues := append(queues[:index], queues[index+1:]...)
|
keptQueues := append(queues[:index], queues[index+1:]...)
|
||||||
for i := index; i < len(keptQueues); i++ {
|
for i := index; i < len(keptQueues); i++ {
|
||||||
keptQueues[i].Index--
|
keptQueues[i].index--
|
||||||
}
|
}
|
||||||
return keptQueues
|
return keptQueues
|
||||||
}
|
}
|
||||||
|
|
||||||
func (qs *queueSet) maybeForkEmptyHandlerLocked() {
|
func (qs *queueSet) maybeForkEmptyHandlerLocked() {
|
||||||
if qs.emptyHandler != nil && qs.numRequestsEnqueued == 0 &&
|
if qs.emptyHandler != nil && qs.totRequestsWaiting == 0 &&
|
||||||
qs.getRequestsExecutingLocked() == 0 {
|
qs.totRequestsExecuting == 0 {
|
||||||
qs.preCreateOrUnblockGoroutine()
|
qs.preCreateOrUnblockGoroutine()
|
||||||
go func(eh fq.EmptyHandler) {
|
go func(eh fq.EmptyHandler) {
|
||||||
defer runtime.HandleCrash()
|
defer runtime.HandleCrash()
|
||||||
|
@ -197,10 +197,34 @@ func TestDifferentFlows(t *testing.T) {
|
|||||||
|
|
||||||
exerciseQueueSetUniformScenario(t, "DifferentFlows", qs, []uniformClient{
|
exerciseQueueSetUniformScenario(t, "DifferentFlows", qs, []uniformClient{
|
||||||
{1001001001, 6, 10, time.Second, time.Second},
|
{1001001001, 6, 10, time.Second, time.Second},
|
||||||
{2002002002, 4, 15, time.Second, time.Second / 2},
|
{2002002002, 5, 15, time.Second, time.Second / 2},
|
||||||
}, time.Second*20, true, true, clk, counter)
|
}, time.Second*20, true, true, clk, counter)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestDifferentFlowsWithoutQueuing(t *testing.T) {
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
|
clk, counter := clock.NewFakeEventClock(now, 0, nil)
|
||||||
|
qsf := NewQueueSetFactory(clk, counter)
|
||||||
|
config := fq.QueueSetConfig{
|
||||||
|
Name: "TestDifferentFlowsWithoutQueuing",
|
||||||
|
ConcurrencyLimit: 4,
|
||||||
|
DesiredNumQueues: 0,
|
||||||
|
QueueLengthLimit: 6,
|
||||||
|
HandSize: 3,
|
||||||
|
RequestWaitLimit: 10 * time.Minute,
|
||||||
|
}
|
||||||
|
qs, err := qsf.NewQueueSet(config)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("QueueSet creation failed with %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
exerciseQueueSetUniformScenario(t, "DifferentFlowsWithoutQueuing", qs, []uniformClient{
|
||||||
|
{1001001001, 6, 10, time.Second, 57 * time.Millisecond},
|
||||||
|
{2002002002, 4, 15, time.Second, 750 * time.Millisecond},
|
||||||
|
}, time.Second*13, false, false, clk, counter)
|
||||||
|
}
|
||||||
|
|
||||||
func TestTimeout(t *testing.T) {
|
func TestTimeout(t *testing.T) {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
/*
|
/*
|
||||||
Copyright 2016 The Kubernetes Authors.
|
Copyright 2019 The Kubernetes Authors.
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
you may not use this file except in compliance with the License.
|
you may not use this file except in compliance with the License.
|
||||||
@ -19,25 +19,25 @@ package queueset
|
|||||||
import (
|
import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"k8s.io/apiserver/pkg/util/promise"
|
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise"
|
||||||
)
|
)
|
||||||
|
|
||||||
// request is a temporary container for "requests" with additional tracking fields
|
// request is a temporary container for "requests" with additional
|
||||||
// required for the functionality FQScheduler
|
// tracking fields required for the functionality FQScheduler
|
||||||
type request struct {
|
type request struct {
|
||||||
Queue *queue
|
queue *queue
|
||||||
|
|
||||||
// StartTime is the clock time when the request began executing
|
// startTime is the real time when the request began executing
|
||||||
StartTime time.Time
|
startTime time.Time
|
||||||
|
|
||||||
// Decision gets set to the decision about what to do with this request
|
// decision gets set to the decision about what to do with this request
|
||||||
Decision promise.LockingMutable
|
decision promise.LockingMutable
|
||||||
|
|
||||||
// ArrivalTime is when the request entered this system
|
// arrivalTime is the real time when the request entered this system
|
||||||
ArrivalTime time.Time
|
arrivalTime time.Time
|
||||||
|
|
||||||
// IsWaiting indicates whether the request is presently waiting in a queue
|
// isWaiting indicates whether the request is presently waiting in a queue
|
||||||
IsWaiting bool
|
isWaiting bool
|
||||||
|
|
||||||
// descr1 and descr2 are not used in any logic but they appear in
|
// descr1 and descr2 are not used in any logic but they appear in
|
||||||
// log messages
|
// log messages
|
||||||
@ -47,31 +47,32 @@ type request struct {
|
|||||||
// queue is an array of requests with additional metadata required for
|
// queue is an array of requests with additional metadata required for
|
||||||
// the FQScheduler
|
// the FQScheduler
|
||||||
type queue struct {
|
type queue struct {
|
||||||
Requests []*request
|
requests []*request
|
||||||
|
|
||||||
// VirtualStart is the virtual time when the oldest request in the
|
// virtualStart is the virtual time (virtual seconds since process
|
||||||
// queue (if there is any) started virtually executing
|
// startup) when the oldest request in the queue (if there is any)
|
||||||
VirtualStart float64
|
// started virtually executing
|
||||||
|
virtualStart float64
|
||||||
|
|
||||||
RequestsExecuting int
|
requestsExecuting int
|
||||||
Index int
|
index int
|
||||||
}
|
}
|
||||||
|
|
||||||
// Enqueue enqueues a request into the queue
|
// Enqueue enqueues a request into the queue
|
||||||
func (q *queue) Enqueue(request *request) {
|
func (q *queue) Enqueue(request *request) {
|
||||||
request.IsWaiting = true
|
request.isWaiting = true
|
||||||
q.Requests = append(q.Requests, request)
|
q.requests = append(q.requests, request)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Dequeue dequeues a request from the queue
|
// Dequeue dequeues a request from the queue
|
||||||
func (q *queue) Dequeue() (*request, bool) {
|
func (q *queue) Dequeue() (*request, bool) {
|
||||||
if len(q.Requests) == 0 {
|
if len(q.requests) == 0 {
|
||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
request := q.Requests[0]
|
request := q.requests[0]
|
||||||
q.Requests = q.Requests[1:]
|
q.requests = q.requests[1:]
|
||||||
|
|
||||||
request.IsWaiting = false
|
request.isWaiting = false
|
||||||
return request, true
|
return request, true
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -81,7 +82,7 @@ func (q *queue) GetVirtualFinish(J int, G float64) float64 {
|
|||||||
// The virtual finish time of request number J in the queue
|
// The virtual finish time of request number J in the queue
|
||||||
// (counting from J=1 for the head) is J * G + (virtual start time).
|
// (counting from J=1 for the head) is J * G + (virtual start time).
|
||||||
|
|
||||||
// counting from J=1 for the head (eg: queue.Requests[0] -> J=1) - J+1
|
// counting from J=1 for the head (eg: queue.requests[0] -> J=1) - J+1
|
||||||
jg := float64(J+1) * float64(G)
|
jg := float64(J+1) * float64(G)
|
||||||
return jg + q.VirtualStart
|
return jg + q.virtualStart
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user