Merge pull request #85192 from MikeSpreitzer/fq-impl

Added fair queuing for server requests
Kubernetes Prow Robot authored this merge commit on 2019-11-13 20:02:12 -08:00; committed by GitHub.
25 changed files with 2362 additions and 1 deletion


@@ -42,8 +42,12 @@ filegroup(
"//staging/src/k8s.io/apiserver/pkg/util/apihelpers:all-srcs",
"//staging/src/k8s.io/apiserver/pkg/util/dryrun:all-srcs",
"//staging/src/k8s.io/apiserver/pkg/util/feature:all-srcs",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:all-srcs",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:all-srcs",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:all-srcs",
"//staging/src/k8s.io/apiserver/pkg/util/flushwriter:all-srcs",
"//staging/src/k8s.io/apiserver/pkg/util/openapi:all-srcs",
"//staging/src/k8s.io/apiserver/pkg/util/promise:all-srcs",
"//staging/src/k8s.io/apiserver/pkg/util/proxy:all-srcs",
"//staging/src/k8s.io/apiserver/pkg/util/shufflesharding:all-srcs",
"//staging/src/k8s.io/apiserver/pkg/util/term:all-srcs",


@@ -26,8 +26,9 @@ require (
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/hashicorp/golang-lru v0.5.1
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822
github.com/pkg/errors v0.8.1 // indirect
github.com/pkg/errors v0.8.1
github.com/pquerna/cachecontrol v0.0.0-20171018203845-0dec1b30a021 // indirect
github.com/prometheus/client_golang v1.0.0
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90
github.com/sirupsen/logrus v1.4.2 // indirect
github.com/spf13/pflag v1.0.5


@@ -0,0 +1,23 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["interface.go"],
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/counter",
importpath = "k8s.io/apiserver/pkg/util/flowcontrol/counter",
visibility = ["//visibility:public"],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)


@@ -0,0 +1,33 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package counter
// GoRoutineCounter keeps track of the number of active goroutines
// working on/for something. This is a utility that makes such code more
// testable. The code uses this utility to report the number of active
// goroutines to the test code, so that the test code can advance a fake
// clock when and only when the code being tested has finished all
// the work that is ready to do at the present time.
type GoRoutineCounter interface {
// Add adds the given delta to the count of active goroutines.
// Call Add(1) before forking a goroutine, Add(-1) at the end of that goroutine.
// Call Add(-1) just before waiting on something from another goroutine (e.g.,
// just before a `select`).
// Call Add(1) just before doing something that unblocks a goroutine that is
// waiting on that something.
Add(delta int)
}
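// The following is a hypothetical sketch (not part of this commit) of the
// call pattern described above, for some implementation grc of
// GoRoutineCounter; doWork and resultCh are invented placeholders.
func exampleCounterUsage(grc GoRoutineCounter, doWork func() int, resultCh chan int) {
	grc.Add(1) // about to fork a goroutine that does counted work
	go func() {
		defer grc.Add(-1) // this goroutine is ending
		grc.Add(1)        // about to unblock the receiver below
		resultCh <- doWork()
	}()
	grc.Add(-1) // about to block waiting on the other goroutine
	<-resultCh  // once this unblocks, this goroutine is again counted
}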


@@ -0,0 +1,27 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["interface.go"],
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing",
importpath = "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing",
visibility = ["//visibility:public"],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset:all-srcs",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing:all-srcs",
],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)


@@ -0,0 +1,90 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fairqueuing
import (
"context"
"time"
)
// QueueSetFactory is used to create QueueSet objects.
type QueueSetFactory interface {
NewQueueSet(config QueueSetConfig) (QueueSet, error)
}
// QueueSet is the abstraction for the queuing and dispatching
// functionality of one non-exempt priority level. It covers the
// functionality described in the "Assignment to a Queue", "Queuing",
// and "Dispatching" sections of
// https://github.com/kubernetes/enhancements/blob/master/keps/sig-api-machinery/20190228-priority-and-fairness.md
// . Some day we may have connections between priority levels, but
// today is not that day.
type QueueSet interface {
// SetConfiguration updates the configuration
SetConfiguration(QueueSetConfig) error
// Quiesce controls whether the QueueSet is operating normally or is quiescing.
// A quiescing QueueSet drains as normal but does not admit any
// new requests. Passing a non-nil handler means the system should
// be quiescing, a nil handler means the system should operate
// normally. A call to Wait while the system is quiescing
// will be rebuffed by returning tryAnother=true. If all the
// queues have no requests waiting nor executing while the system
// is quiescing then the handler will eventually be called with no
// locks held (even if the system becomes non-quiescing between the
// triggering state and the required call).
Quiesce(EmptyHandler)
// Wait uses the given hashValue as the source of entropy as it
// shuffle-shards a request into a queue and waits for a decision
// on what to do with that request. The descr1 and descr2 values
// play no role in the logic but appear in log messages. If
// tryAnother==true at return then the QueueSet has become
// undesirable and the client should try to find a different
// QueueSet to use; execute and afterExecution are irrelevant in
// this case. Otherwise, if execute then the client should start
// executing the request and, once the request finishes execution
// or is canceled, call afterExecution(). Otherwise the client
// should not execute the request and afterExecution is
// irrelevant.
Wait(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (tryAnother, execute bool, afterExecution func())
}
// QueueSetConfig defines the configuration of a QueueSet.
type QueueSetConfig struct {
// Name is used to identify a queue set, allowing for descriptive information about its intended use
Name string
// ConcurrencyLimit is the maximum number of requests of this QueueSet that may be executing at a time
ConcurrencyLimit int
// DesiredNumQueues is the number of queues that the API says should exist now
DesiredNumQueues int
// QueueLengthLimit is the maximum number of requests that may be waiting in a given queue at a time
QueueLengthLimit int
// HandSize is a parameter of shuffle sharding. Upon arrival of a request, a queue is chosen by randomly
// dealing a "hand" of this many queues and then picking one of minimum length.
HandSize int
// RequestWaitLimit is the maximum amount of time that a request may wait in a queue.
// If, by the end of that time, the request has not been dispatched then it is rejected.
RequestWaitLimit time.Duration
}
// EmptyHandler is used to notify the callee when all the queues
// of a QueueSet have been drained.
type EmptyHandler interface {
// HandleEmpty is called to deliver the notification
HandleEmpty()
}
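// handleWithQueueSet is a hypothetical sketch (not part of this commit) of
// how a caller is expected to use QueueSet.Wait; serve and tryElsewhere are
// invented placeholders for caller-supplied behavior.
func handleWithQueueSet(ctx context.Context, qs QueueSet, hashValue uint64, serve, tryElsewhere func()) {
	tryAnother, execute, afterExecution := qs.Wait(ctx, hashValue, "descr1", "descr2")
	if tryAnother {
		tryElsewhere() // this QueueSet is quiescing; find a different one
		return
	}
	if !execute {
		return // rejected: queue full or the request timed out while queued
	}
	defer afterExecution() // must be called once execution finishes or is canceled
	serve()
}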


@@ -0,0 +1,52 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"doc.go",
"queueset.go",
"types.go",
],
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset",
importpath = "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset",
visibility = ["//visibility:public"],
deps = [
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/promise:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/promise/lockingpromise:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/shufflesharding:go_default_library",
"//vendor/github.com/pkg/errors:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["queueset_test.go"],
embed = [":go_default_library"],
deps = [
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)


@@ -0,0 +1,120 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package queueset
// This package implements a technique called "fair queuing for server
// requests". One QueueSet is a set of queues operating according to
// this technique.
// Fair queuing for server requests is inspired by the fair queuing
// technique from the world of networking. You can find a good paper
// on that at https://dl.acm.org/citation.cfm?doid=75247.75248 or
// http://people.csail.mit.edu/imcgraw/links/research/pubs/networks/WFQ.pdf
// and there is an implementation outline in the Wikipedia article at
// https://en.wikipedia.org/wiki/Fair_queuing .
// Fair queuing for server requests differs from traditional fair
// queuing in three ways: (1) we are dispatching requests to be
// executed within a process rather than transmitting packets on a
// network link, (2) multiple requests can be executing at once, and
// (3) the service time (execution duration) is not known until the
// execution completes.
// The first two differences can easily be handled by straightforward
// adaptation of the concept called "R(t)" in the original paper and
// "virtual time" in the implementation outline. In that
// implementation outline, the notation now() is used to mean reading
// the virtual clock. In the original paper's terms, "R(t)" is the
// number of "rounds" that have been completed at real time t, where a
// round consists of virtually transmitting one bit from every
// non-empty queue in the router (regardless of which queue holds the
// packet that is really being transmitted at the moment); in this
// conception, a packet is considered to be "in" its queue until the
// packet's transmission is finished. For our problem, we can define a
// round to be giving one nanosecond of CPU to every non-empty queue
// in the apiserver (where emptiness is judged based on both queued
// and executing requests from that queue), and define R(t) = (server
// start time) + (1 ns) * (number of rounds since server start). Let
// us write NEQ(t) for that number of non-empty queues in the
// apiserver at time t. Let us also write C for the concurrency
// limit. In the original paper, the partial derivative of R(t) with
// respect to t is
//
// 1 / NEQ(t) .
// To generalize from transmitting one packet at a time to executing C
// requests at a time, that derivative becomes
//
// C / NEQ(t) .
// However, sometimes there are fewer than C requests available to
// execute. For a given queue "q", let us also write "reqs(q, t)" for
// the number of requests of that queue that are executing at that
// time. The total number of requests executing is sum[over q]
// reqs(q, t) and if that is less than C then virtual time is not
// advancing as fast as it would if all C seats were occupied; in this
// case the numerator of the quotient in that derivative should be
// adjusted proportionally. Putting it all together for fair queuing
// for server requests: at a particular time t, the partial derivative
// of R(t) with respect to t is
//
// min( C, sum[over q] reqs(q, t) ) / NEQ(t) .
//
// In terms of the implementation outline, this is the rate at which
// virtual time is advancing at time t (in virtual nanoseconds per
// real nanosecond). Where the networking implementation outline adds
// packet size to a virtual time, in our version this corresponds to
// adding a service time (i.e., duration) to virtual time.
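// As a purely illustrative example (numbers invented here, not part of
// the design): suppose C = 4 and at some time t there are three
// non-empty queues, two of which have one request executing each while
// the third has only waiting requests. Then NEQ(t) = 3 and
// sum[over q] reqs(q, t) = 2, so virtual time advances at
// min(4, 2) / 3 = 2/3 virtual nanoseconds per real nanosecond.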
// The third difference is handled by modifying the algorithm to
// dispatch based on an initial guess at the request's service time
// (duration) and then make the corresponding adjustments once the
// request's actual service time is known. This is similar, although
// not exactly isomorphic, to the original paper's adjustment by
// `$delta` for the sake of promptness.
// For implementation simplicity (see below), let us use the same
// initial service time guess for every request; call that duration
// G. A good choice might be the service time limit (1
// minute). Different guesses will give slightly different dynamics,
// but any positive number can be used for G without ruining the
// long-term behavior.
// As in ordinary fair queuing, there is a bound on divergence from
// the ideal. In plain fair queuing the bound is one packet; in our
// version it is C requests.
// To support efficiently making the necessary adjustments once a
// request's actual service time is known, the virtual finish time of
// a request and the last virtual finish time of a queue are not
// represented directly but instead computed from queue length,
// request position in the queue, and an alternate state variable that
// holds the queue's virtual start time. While the queue is empty and
// has no requests executing: the value of its virtual start time
// variable is ignored and its last virtual finish time is considered
// to be in the virtual past. When a request arrives to an empty queue
// with no requests executing, the queue's virtual start time is set
// to the current virtual time. The virtual finish time of request
// number J in the queue (counting from J=1 for the head) is J * G +
// (queue's virtual start time). While the queue is non-empty: the
// last virtual finish time of the queue is the virtual finish time of
// the last request in the queue. While the queue is empty and has a
// request executing: the last virtual finish time is the queue's
// virtual start time. When a request is dequeued for service the
// queue's virtual start time is advanced by G. When a request
// finishes being served, and the actual service time was S, the
// queue's virtual start time is decremented by G - S.
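// To illustrate that bookkeeping with invented numbers: take G = 60
// (seconds) and a queue whose virtual start time is V. Dequeuing a
// request for service advances V to V + 60; if that request then
// finishes with actual service time S = 2, V is decremented by
// G - S = 58, so the net effect of the request on V is exactly its
// actual service time, 2.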


@@ -0,0 +1,617 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package queueset
import (
"context"
"math"
"sync"
"time"
"k8s.io/apimachinery/pkg/util/runtime"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
"k8s.io/apiserver/pkg/util/promise/lockingpromise"
"k8s.io/apiserver/pkg/util/shufflesharding"
"k8s.io/klog"
)
const nsTimeFmt = "2006-01-02 15:04:05.000000000"
// queueSetFactory implements the QueueSetFactory interface; it makes
// QueueSet objects.
type queueSetFactory struct {
counter counter.GoRoutineCounter
clock clock.PassiveClock
}
// NewQueueSetFactory creates a new QueueSetFactory object
func NewQueueSetFactory(c clock.PassiveClock, counter counter.GoRoutineCounter) fq.QueueSetFactory {
return &queueSetFactory{
counter: counter,
clock: c,
}
}
// queueSet implements the Fair Queuing for Server Requests technique
// described in this package's doc, and a pointer to one implements
// the QueueSet interface. The clock, GoRoutineCounter, and estimated
// service time should not be changed; the fields listed after the
// lock must be accessed only while holding the lock.
type queueSet struct {
clock clock.PassiveClock
counter counter.GoRoutineCounter
estimatedServiceTime float64
lock sync.Mutex
config fq.QueueSetConfig
// queues may be longer than the desired number, while the excess
// queues are still draining.
queues []*queue
virtualTime float64
lastRealTime time.Time
// robinIndex is the index of the last queue dispatched
robinIndex int
// numRequestsEnqueued is the number of requests currently waiting
// in a queue (e.g., incremented on Enqueue, decremented on Dequeue)
numRequestsEnqueued int
emptyHandler fq.EmptyHandler
dealer *shufflesharding.Dealer
}
// NewQueueSet creates a new QueueSet object
// There is a new QueueSet created for each priority level.
func (qsf queueSetFactory) NewQueueSet(config fq.QueueSetConfig) (fq.QueueSet, error) {
dealer, err := shufflesharding.NewDealer(config.DesiredNumQueues, config.HandSize)
if err != nil {
return nil, errors.Wrap(err, "shuffle sharding dealer creation failed")
}
fq := &queueSet{
config: config,
counter: qsf.counter,
queues: createQueues(config.DesiredNumQueues, 0),
clock: qsf.clock,
virtualTime: 0,
estimatedServiceTime: 60,
lastRealTime: qsf.clock.Now(),
dealer: dealer,
}
return fq, nil
}
// createQueues is a helper for initializing a slice of n queues
func createQueues(n, baseIndex int) []*queue {
fqqueues := make([]*queue, n)
for i := 0; i < n; i++ {
fqqueues[i] = &queue{Index: baseIndex + i, Requests: make([]*request, 0)}
}
return fqqueues
}
// SetConfiguration is used to set the configuration for a queueSet.
// It also reconciles the queueSet with the updated fields; e.g., if
// DesiredNumQueues is increased, SetConfiguration reconciles by
// adding more queues.
func (qs *queueSet) SetConfiguration(config fq.QueueSetConfig) error {
qs.lockAndSyncTime()
defer qs.lock.Unlock()
dealer, err := shufflesharding.NewDealer(config.DesiredNumQueues, config.HandSize)
if err != nil {
return errors.Wrap(err, "shuffle sharding dealer creation failed")
}
// Adding queues is the only thing that requires immediate action;
// removing queues is handled by omitting indexes >= DesiredNumQueues
// from chooseQueueIndexLocked
numQueues := len(qs.queues)
if config.DesiredNumQueues > numQueues {
qs.queues = append(qs.queues,
createQueues(config.DesiredNumQueues-numQueues, len(qs.queues))...)
}
qs.config = config
qs.dealer = dealer
qs.dispatchAsMuchAsPossibleLocked()
return nil
}
// Quiesce controls whether the QueueSet is operating normally or is quiescing.
// A quiescing QueueSet drains as normal but does not admit any
// new requests. Passing a non-nil handler means the system should
// be quiescing, a nil handler means the system should operate
// normally. A call to Wait while the system is quiescing
// will be rebuffed by returning tryAnother=true. If all the
// queues have no requests waiting nor executing while the system
// is quiescing then the handler will eventually be called with no
// locks held (even if the system becomes non-quiescing between the
// triggering state and the required call).
func (qs *queueSet) Quiesce(eh fq.EmptyHandler) {
qs.lock.Lock()
defer qs.lock.Unlock()
qs.emptyHandler = eh
if eh == nil {
return
}
// Here we check whether there are any requests queued or executing and
// if not then fork an invocation of the EmptyHandler.
qs.maybeForkEmptyHandlerLocked()
}
// Values passed through a request's Decision
const (
DecisionExecute = "execute"
DecisionReject = "reject"
DecisionCancel = "cancel"
DecisionTryAnother = "tryAnother"
)
// Wait uses the given hashValue as the source of entropy as it
// shuffle-shards a request into a queue and waits for a decision on
// what to do with that request. The descr1 and descr2 values play no
// role in the logic but appear in log messages; we use two because
// the main client characterizes a request by two items that, if
// bundled together in a larger data structure, would lose interesting
// details when formatted. If tryAnother==true at return then the
// QueueSet has become undesirable and the client should try to find a
// different QueueSet to use; execute and afterExecution are
// irrelevant in this case. Otherwise, if execute then the client
// should start executing the request and, once the request finishes
// execution or is canceled, call afterExecution(). Otherwise the
// client should not execute the request and afterExecution is
// irrelevant.
func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (tryAnother, execute bool, afterExecution func()) {
var req *request
decision := func() string {
qs.lockAndSyncTime()
defer qs.lock.Unlock()
// A call to Wait while the system is quiescing will be rebuffed by
// returning `tryAnother=true`.
if qs.emptyHandler != nil {
klog.V(5).Infof("QS(%s): rebuffing request %#+v %#+v with TryAnother", qs.config.Name, descr1, descr2)
return DecisionTryAnother
}
// ========================================================================
// Step 1:
// 1) Start with shuffle sharding, to pick a queue.
// 2) Reject old requests that have been waiting too long
// 3) Reject current request if there are not enough concurrency shares and
// we are at max queue length
// 4) If not rejected, create a request and enqueue
req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(hashValue, descr1, descr2)
// req == nil means that the request was rejected - no remaining
// concurrency shares and at max queue length already
if req == nil {
klog.V(5).Infof("QS(%s): rejecting request %#+v %#+v due to queue full", qs.config.Name, descr1, descr2)
metrics.AddReject(qs.config.Name, "queue-full")
return DecisionReject
}
// ========================================================================
// Step 2:
// The next step is to invoke the method that dequeues as much
// as possible.
// This method runs a loop, as long as there are non-empty
// queues and the number currently executing is less than the
// assured concurrency value. The body of the loop uses the
// fair queuing technique to pick a queue and dispatch a
// request from that queue.
qs.dispatchAsMuchAsPossibleLocked()
// ========================================================================
// Step 3:
// Set up a relay from the context's Done channel to the world
// of well-counted goroutines. We Are Told that every
// request's context's Done channel gets closed by the time
// the request is done being processed.
doneCh := ctx.Done()
if doneCh != nil {
qs.preCreateOrUnblockGoroutine()
go func() {
defer runtime.HandleCrash()
qs.goroutineDoneOrBlocked()
select {
case <-doneCh:
klog.V(6).Infof("QS(%s): Context of request %#+v %#+v is Done", qs.config.Name, descr1, descr2)
req.Decision.Set(DecisionCancel)
}
qs.goroutineDoneOrBlocked()
}()
}
// ========================================================================
// Step 4:
// The final step in Wait is to wait on a decision from
// somewhere and then act on it.
decisionAny := req.Decision.GetLocked()
var decisionStr string
switch d := decisionAny.(type) {
case string:
decisionStr = d
default:
klog.Errorf("QS(%s): Impossible decision %#+v (of type %T) for request %#+v %#+v", qs.config.Name, decisionAny, decisionAny, descr1, descr2)
decisionStr = DecisionExecute
}
switch decisionStr {
case DecisionReject:
klog.V(5).Infof("QS(%s): request %#+v %#+v timed out after being enqueued\n", qs.config.Name, descr1, descr2)
metrics.AddReject(qs.config.Name, "time-out")
case DecisionCancel:
qs.syncTimeLocked()
// TODO(aaron-prindle) add metrics to these two cases
if req.IsWaiting {
klog.V(5).Infof("QS(%s): Ejecting request %#+v %#+v from its queue", qs.config.Name, descr1, descr2)
// remove the request from the queue as it has timed out
for i := range req.Queue.Requests {
if req == req.Queue.Requests[i] {
// remove the request
req.Queue.Requests = append(req.Queue.Requests[:i],
req.Queue.Requests[i+1:]...)
break
}
}
// At this point, if the qs is quiescing,
// has zero requests executing, and has zero requests enqueued
// then a call to the EmptyHandler should be forked.
qs.maybeForkEmptyHandlerLocked()
} else {
klog.V(5).Infof("QS(%s): request %#+v %#+v canceled shortly after dispatch", qs.config.Name, descr1, descr2)
}
}
return decisionStr
}()
switch decision {
case DecisionTryAnother:
return true, false, func() {}
case DecisionReject:
return false, false, func() {}
case DecisionCancel:
return false, false, func() {}
default:
if decision != DecisionExecute {
klog.Errorf("Impossible decision %q", decision)
}
return false, true, func() {
qs.finishRequestAndDispatchAsMuchAsPossible(req)
}
}
}
// lockAndSyncTime acquires the lock and updates the virtual time.
// Doing them together avoids the mistake of modifying some queue state
// before calling syncTimeLocked.
func (qs *queueSet) lockAndSyncTime() {
qs.lock.Lock()
qs.syncTimeLocked()
}
// syncTimeLocked updates the virtual time based on the assumption
// that the current state of the queues has been in effect since
// `qs.lastRealTime`. Thus, it should be invoked after acquiring the
// lock and before modifying the state of any queue.
func (qs *queueSet) syncTimeLocked() {
realNow := qs.clock.Now()
timesincelast := realNow.Sub(qs.lastRealTime).Seconds()
qs.lastRealTime = realNow
qs.virtualTime += timesincelast * qs.getVirtualTimeRatio()
}
// getVirtualTimeRatio calculates the rate at which virtual time has
// been advancing, according to the logic in `doc.go`.
func (qs *queueSet) getVirtualTimeRatio() float64 {
activeQueues := 0
reqs := 0
for _, queue := range qs.queues {
reqs += queue.RequestsExecuting
if len(queue.Requests) > 0 || queue.RequestsExecuting > 0 {
activeQueues++
}
}
if activeQueues == 0 {
return 0
}
return math.Min(float64(reqs), float64(qs.config.ConcurrencyLimit)) / float64(activeQueues)
}
// timeoutOldRequestsAndRejectOrEnqueueLocked encapsulates the logic required
// to validate and enqueue a request for the queueSet/QueueSet:
// 1) Start with shuffle sharding, to pick a queue.
// 2) Reject old requests that have been waiting too long
// 3) Reject current request if there are not enough concurrency shares and
// we are at max queue length
// 4) If not rejected, create a request and enqueue
// returns the enqueued request on a successful enqueue
// returns nil in the case that there is no available concurrency or
// the queuelengthlimit has been reached
func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(hashValue uint64, descr1, descr2 interface{}) *request {
// Start with the shuffle sharding, to pick a queue.
queueIdx := qs.chooseQueueIndexLocked(hashValue, descr1, descr2)
queue := qs.queues[queueIdx]
// The next step is the logic to reject requests that have been waiting too long
qs.removeTimedOutRequestsFromQueueLocked(queue)
// NOTE: currently timeout is only checked for each new request. This means that there can be
// requests that are in the queue longer than the timeout if there are no new requests.
// We prefer the simplicity over the promptness, at least for now.
// Create a request and enqueue
req := &request{
Decision: lockingpromise.NewLockingPromise(&qs.lock, qs.counter),
ArrivalTime: qs.clock.Now(),
Queue: queue,
descr1: descr1,
descr2: descr2,
}
if ok := qs.rejectOrEnqueueLocked(req); !ok {
return nil
}
metrics.ObserveQueueLength(qs.config.Name, len(queue.Requests))
return req
}
// chooseQueueIndexLocked uses shuffle sharding to select a queue index
// using the given hashValue and the shuffle sharding parameters of the queueSet.
func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64, descr1, descr2 interface{}) int {
bestQueueIdx := -1
bestQueueLen := int(math.MaxInt32)
// the dealer uses the current desired number of queues, which is no larger than the number in `qs.queues`.
qs.dealer.Deal(hashValue, func(queueIdx int) {
thisLen := len(qs.queues[queueIdx].Requests)
klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of length %d", qs.config.Name, descr1, descr2, queueIdx, thisLen)
if thisLen < bestQueueLen {
bestQueueIdx, bestQueueLen = queueIdx, thisLen
}
})
klog.V(6).Infof("QS(%s): For request %#+v %#+v chose queue %d, had %d waiting & %d executing", qs.config.Name, descr1, descr2, bestQueueIdx, bestQueueLen, qs.queues[bestQueueIdx].RequestsExecuting)
return bestQueueIdx
}
// removeTimedOutRequestsFromQueueLocked rejects old requests that have been enqueued
// past the requestWaitLimit
func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue) {
timeoutIdx := -1
now := qs.clock.Now()
reqs := queue.Requests
// reqs are sorted oldest -> newest
// can short circuit loop (break) if oldest requests are not timing out
// as newer requests also will not have timed out
// now - requestWaitLimit = waitLimit
waitLimit := now.Add(-qs.config.RequestWaitLimit)
for i, req := range reqs {
if waitLimit.After(req.ArrivalTime) {
req.Decision.SetLocked(DecisionReject)
// get index for timed out requests
timeoutIdx = i
} else {
break
}
}
// remove timed out requests from queue
if timeoutIdx != -1 {
// timeoutIdx + 1 to remove the last timeout req
removeIdx := timeoutIdx + 1
// remove all the timeout requests
queue.Requests = reqs[removeIdx:]
// decrement the # of requestsEnqueued
qs.numRequestsEnqueued -= removeIdx
}
}
// rejectOrEnqueueLocked rejects the newly arrived request if the
// resource criteria are not met, and otherwise enqueues it
func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool {
queue := request.Queue
curQueueLength := len(queue.Requests)
// reject the newly arrived request if the resource criteria are not met
if qs.getRequestsExecutingLocked() >= qs.config.ConcurrencyLimit &&
curQueueLength >= qs.config.QueueLengthLimit {
return false
}
qs.enqueueLocked(request)
return true
}
// enqueueLocked enqueues a request into a queueSet
func (qs *queueSet) enqueueLocked(request *request) {
queue := request.Queue
if len(queue.Requests) == 0 && queue.RequestsExecuting == 0 {
// the queue's virtual start time is set to the virtual time.
queue.VirtualStart = qs.virtualTime
if klog.V(6) {
klog.Infof("QS(%s) at r=%s v=%.9fs: initialized queue %d virtual start time due to request %#+v %#+v", qs.config.Name, qs.clock.Now().Format(nsTimeFmt), queue.VirtualStart, queue.Index, request.descr1, request.descr2)
}
}
queue.Enqueue(request)
qs.numRequestsEnqueued++
metrics.UpdateFlowControlRequestsInQueue(qs.config.Name, qs.numRequestsEnqueued)
}
// getRequestsExecutingLocked gets the # of requests which are "executing":
// this is the # of requests which have been dispatched but have not
// finished (via the finishRequestLocked method invoked after service)
func (qs *queueSet) getRequestsExecutingLocked() int {
total := 0
for _, queue := range qs.queues {
total += queue.RequestsExecuting
}
return total
}
// dispatchAsMuchAsPossibleLocked runs a loop, as long as there
// are non-empty queues and the number currently executing is less than the
// assured concurrency value. The body of the loop uses the fair queuing
// technique to pick a queue, dequeue the request at the head of that
// queue, increment the count of the number executing, and send true
// to the request's channel.
func (qs *queueSet) dispatchAsMuchAsPossibleLocked() {
for qs.numRequestsEnqueued != 0 && qs.getRequestsExecutingLocked() < qs.config.ConcurrencyLimit {
_, ok := qs.dispatchLocked()
if !ok {
break
}
}
}
// dispatchLocked is a convenience method for dequeueing a request and
// delivering the execute decision to it through its Decision promise;
// this is the pattern required by the QueueSet that this queueSet implements
func (qs *queueSet) dispatchLocked() (*request, bool) {
queue := qs.selectQueueLocked()
if queue == nil {
return nil, false
}
request, ok := queue.Dequeue()
if !ok {
return nil, false
}
request.StartTime = qs.clock.Now()
// request dequeued, service has started
queue.RequestsExecuting++
qs.numRequestsEnqueued--
if klog.V(6) {
klog.Infof("QS(%s) at r=%s v=%.9fs: dispatching request %#+v %#+v from queue %d with virtual start time %.9fs, queue will have %d waiting & %d executing", qs.config.Name, request.StartTime.Format(nsTimeFmt), qs.virtualTime, request.descr1, request.descr2, queue.Index, queue.VirtualStart, len(queue.Requests), queue.RequestsExecuting)
}
// When a request is dequeued for service -> queue.VirtualStart += G
queue.VirtualStart += qs.estimatedServiceTime
metrics.UpdateFlowControlRequestsExecuting(qs.config.Name, queue.RequestsExecuting)
request.Decision.SetLocked(DecisionExecute)
return request, ok
}
// selectQueueLocked selects the queue whose next request has the minimum
// virtual finish time; the queue at which the scan starts is chosen via round robin
func (qs *queueSet) selectQueueLocked() *queue {
minVirtualFinish := math.Inf(1)
var minQueue *queue
var minIndex int
for range qs.queues {
qs.robinIndex = (qs.robinIndex + 1) % len(qs.queues)
queue := qs.queues[qs.robinIndex]
if len(queue.Requests) != 0 {
currentVirtualFinish := queue.GetVirtualFinish(0, qs.estimatedServiceTime)
if currentVirtualFinish < minVirtualFinish {
minVirtualFinish = currentVirtualFinish
minQueue = queue
minIndex = qs.robinIndex
}
}
}
// we set the round robin indexing to start at the chosen queue
// for the next round. This way the non-selected queues
// win in the case that the virtual finish times are the same
qs.robinIndex = minIndex
return minQueue
}
// finishRequestAndDispatchAsMuchAsPossible is a convenience method
// which calls finishRequest for a given request and then dispatches
// as many requests as possible. This is all of what needs to be done
// once a request finishes execution or is canceled.
func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) {
qs.lockAndSyncTime()
defer qs.lock.Unlock()
qs.finishRequestLocked(req)
qs.dispatchAsMuchAsPossibleLocked()
}
// finishRequestLocked is a callback that should be used when a
// previously dispatched request has completed its service. This
// callback updates important state in the queueSet
func (qs *queueSet) finishRequestLocked(r *request) {
S := qs.clock.Since(r.StartTime).Seconds()
// When a request finishes being served, and the actual service time was S,
// the queue's virtual start time is decremented by G - S.
r.Queue.VirtualStart -= qs.estimatedServiceTime - S
// request has finished, remove from requests executing
r.Queue.RequestsExecuting--
if klog.V(6) {
klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing", qs.config.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.Queue.Index, r.Queue.VirtualStart, S, len(r.Queue.Requests), r.Queue.RequestsExecuting)
}
// Logic to remove quiesced queues
// >= as Index=25 is out of bounds for DesiredNumQueues=25 [0...24]
if r.Queue.Index >= qs.config.DesiredNumQueues &&
len(r.Queue.Requests) == 0 &&
r.Queue.RequestsExecuting == 0 {
qs.queues = removeQueueAndUpdateIndexes(qs.queues, r.Queue.Index)
// decrement here to maintain the invariant that (qs.robinIndex+1) % numQueues
// is the index of the next queue after the one last dispatched from
if qs.robinIndex >= r.Queue.Index {
qs.robinIndex--
}
// At this point, if the qs is quiescing,
// has zero requests executing, and has zero requests enqueued
// then a call to the EmptyHandler should be forked.
qs.maybeForkEmptyHandlerLocked()
}
}
// removeQueueAndUpdateIndexes uses reslicing to remove an index from a slice
// and then updates the 'Index' field of the queues to be correct
func removeQueueAndUpdateIndexes(queues []*queue, index int) []*queue {
keptQueues := append(queues[:index], queues[index+1:]...)
for i := index; i < len(keptQueues); i++ {
keptQueues[i].Index--
}
return keptQueues
}
func (qs *queueSet) maybeForkEmptyHandlerLocked() {
if qs.emptyHandler != nil && qs.numRequestsEnqueued == 0 &&
qs.getRequestsExecutingLocked() == 0 {
qs.preCreateOrUnblockGoroutine()
go func(eh fq.EmptyHandler) {
defer runtime.HandleCrash()
defer qs.goroutineDoneOrBlocked()
eh.HandleEmpty()
}(qs.emptyHandler)
}
}
// preCreateOrUnblockGoroutine needs to be called before creating a
// goroutine associated with this queueSet or unblocking a blocked
// one, to properly update the accounting used in testing.
func (qs *queueSet) preCreateOrUnblockGoroutine() {
qs.counter.Add(1)
}
// goroutineDoneOrBlocked needs to be called at the end of every
// goroutine associated with this queueSet or when such a goroutine is
// about to wait on some other goroutine to do something; this is to
// properly update the accounting used in testing.
func (qs *queueSet) goroutineDoneOrBlocked() {
qs.counter.Add(-1)
}


@@ -0,0 +1,225 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package queueset
import (
"context"
"math"
"sync/atomic"
"testing"
"time"
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
test "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing"
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock"
"k8s.io/klog"
)
type uniformScenario []uniformClient
type uniformClient struct {
hash uint64
nThreads int
nCalls int
// duration for a simulated synchronous call
execDuration time.Duration
// duration for simulated "other work"
thinkDuration time.Duration
}
// exerciseQueueSetUniformScenario runs a scenario based on the given set of uniform clients.
// Each uniform client specifies a number of threads, each of which alternates between thinking
// and making a synchronous request through the QueueSet.
// This function measures how much concurrency each client got, on average, over
// the initial evalDuration and tests to see whether they all got about the same amount.
// Each client needs to be demanding enough to use this amount, otherwise the fair result
// is not equal amounts and the simple test in this function would not accurately test fairness.
// expectPass indicates whether the QueueSet is expected to be fair.
// expectedAllRequests indicates whether all requests are expected to get dispatched.
func exerciseQueueSetUniformScenario(t *testing.T, name string, qs fq.QueueSet, sc uniformScenario,
evalDuration time.Duration, expectPass bool, expectedAllRequests bool,
clk *clock.FakeEventClock, counter counter.GoRoutineCounter) {
now := time.Now()
t.Logf("%s: Start %s, clk=%p, grc=%p", clk.Now().Format(nsTimeFmt), name, clk, counter)
integrators := make([]test.Integrator, len(sc))
var failedCount uint64
for i, uc := range sc {
integrators[i] = test.NewIntegrator(clk)
for j := 0; j < uc.nThreads; j++ {
counter.Add(1)
go func(i, j int, uc uniformClient, igr test.Integrator) {
for k := 0; k < uc.nCalls; k++ {
ClockWait(clk, counter, uc.thinkDuration)
for {
tryAnother, execute, afterExecute := qs.Wait(context.Background(), uc.hash, name, []int{i, j, k})
t.Logf("%s: %d, %d, %d got a=%v, e=%v", clk.Now().Format(nsTimeFmt), i, j, k, tryAnother, execute)
if tryAnother {
continue
}
if !execute {
atomic.AddUint64(&failedCount, 1)
break
}
igr.Add(1)
ClockWait(clk, counter, uc.execDuration)
afterExecute()
igr.Add(-1)
break
}
}
counter.Add(-1)
}(i, j, uc, integrators[i])
}
}
lim := now.Add(evalDuration)
clk.Run(&lim)
clk.SetTime(lim)
t.Logf("%s: End", clk.Now().Format(nsTimeFmt))
results := make([]test.IntegratorResults, len(sc))
var sumOfAvg float64
for i := range sc {
results[i] = integrators[i].GetResults()
sumOfAvg += results[i].Average
}
idealAverage := sumOfAvg / float64(len(sc))
passes := make([]bool, len(sc))
allPass := true
for i := range sc {
relDiff := (results[i].Average - idealAverage) / idealAverage
passes[i] = math.Abs(relDiff) <= 0.1
allPass = allPass && passes[i]
}
for i := range sc {
if allPass != expectPass {
t.Errorf("Class %d got an Average of %v but the ideal was %v", i, results[i].Average, idealAverage)
} else {
t.Logf("Class %d got an Average of %v and the ideal was %v", i, results[i].Average, idealAverage)
}
}
clk.Run(nil)
if expectedAllRequests && failedCount > 0 {
t.Errorf("Expected all requests to be successful but got %v failed requests", failedCount)
} else if !expectedAllRequests && failedCount == 0 {
t.Errorf("Expected failed requests but all requests succeeded")
}
}
func ClockWait(clk *clock.FakeEventClock, counter counter.GoRoutineCounter, duration time.Duration) {
dunch := make(chan struct{})
clk.EventAfterDuration(func(time.Time) {
counter.Add(1)
close(dunch)
}, duration)
counter.Add(-1)
select {
case <-dunch:
}
}
func init() {
klog.InitFlags(nil)
}
// TestNoRestraint expects the fairness check to fail because the dummy QueueSet exercises no control
func TestNoRestraint(t *testing.T) {
now := time.Now()
clk, counter := clock.NewFakeEventClock(now, 0, nil)
nrf := test.NewNoRestraintFactory()
config := fq.QueueSetConfig{}
nr, err := nrf.NewQueueSet(config)
if err != nil {
t.Fatalf("QueueSet creation failed with %v", err)
}
exerciseQueueSetUniformScenario(t, "NoRestraint", nr, []uniformClient{
{1001001001, 5, 10, time.Second, time.Second},
{2002002002, 2, 10, time.Second, time.Second / 2},
}, time.Second*10, false, true, clk, counter)
}
func TestUniformFlows(t *testing.T) {
now := time.Now()
clk, counter := clock.NewFakeEventClock(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter)
config := fq.QueueSetConfig{
Name: "TestUniformFlows",
ConcurrencyLimit: 4,
DesiredNumQueues: 8,
QueueLengthLimit: 6,
HandSize: 3,
RequestWaitLimit: 10 * time.Minute,
}
qs, err := qsf.NewQueueSet(config)
if err != nil {
t.Fatalf("QueueSet creation failed with %v", err)
}
exerciseQueueSetUniformScenario(t, "UniformFlows", qs, []uniformClient{
{1001001001, 5, 10, time.Second, time.Second},
{2002002002, 5, 10, time.Second, time.Second},
}, time.Second*20, true, true, clk, counter)
}
func TestDifferentFlows(t *testing.T) {
now := time.Now()
clk, counter := clock.NewFakeEventClock(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter)
config := fq.QueueSetConfig{
Name: "TestDifferentFlows",
ConcurrencyLimit: 4,
DesiredNumQueues: 8,
QueueLengthLimit: 6,
HandSize: 3,
RequestWaitLimit: 10 * time.Minute,
}
qs, err := qsf.NewQueueSet(config)
if err != nil {
t.Fatalf("QueueSet creation failed with %v", err)
}
exerciseQueueSetUniformScenario(t, "DifferentFlows", qs, []uniformClient{
{1001001001, 6, 10, time.Second, time.Second},
{2002002002, 4, 15, time.Second, time.Second / 2},
}, time.Second*20, true, true, clk, counter)
}
func TestTimeout(t *testing.T) {
now := time.Now()
clk, counter := clock.NewFakeEventClock(now, 0, nil)
qsf := NewQueueSetFactory(clk, counter)
config := fq.QueueSetConfig{
Name: "TestTimeout",
ConcurrencyLimit: 1,
DesiredNumQueues: 128,
QueueLengthLimit: 128,
HandSize: 1,
RequestWaitLimit: 0,
}
qs, err := qsf.NewQueueSet(config)
if err != nil {
t.Fatalf("QueueSet creation failed with %v", err)
}
exerciseQueueSetUniformScenario(t, "Timeout", qs, []uniformClient{
{1001001001, 5, 100, time.Second, time.Second},
}, time.Second*10, true, false, clk, counter)
}


@@ -0,0 +1,87 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package queueset
import (
"time"
"k8s.io/apiserver/pkg/util/promise"
)
// request is a temporary container for "requests" with additional
// tracking fields required by the FQScheduler functionality
type request struct {
Queue *queue
// StartTime is the clock time when the request began executing
StartTime time.Time
// Decision gets set to the decision about what to do with this request
Decision promise.LockingMutable
// ArrivalTime is when the request entered this system
ArrivalTime time.Time
// IsWaiting indicates whether the request is presently waiting in a queue
IsWaiting bool
// descr1 and descr2 are not used in any logic but they appear in
// log messages
descr1, descr2 interface{}
}
// queue is an array of requests with additional metadata required for
// the FQScheduler
type queue struct {
Requests []*request
// VirtualStart is the virtual time when the oldest request in the
// queue (if there is any) started virtually executing
VirtualStart float64
RequestsExecuting int
Index int
}
// Enqueue enqueues a request into the queue
func (q *queue) Enqueue(request *request) {
request.IsWaiting = true
q.Requests = append(q.Requests, request)
}
// Dequeue dequeues a request from the queue
func (q *queue) Dequeue() (*request, bool) {
if len(q.Requests) == 0 {
return nil, false
}
request := q.Requests[0]
q.Requests = q.Requests[1:]
request.IsWaiting = false
return request, true
}
// GetVirtualFinish returns the expected virtual finish time of the request at
// index J in the queue with estimated service time G
func (q *queue) GetVirtualFinish(J int, G float64) float64 {
// The virtual finish time of request number J in the queue
// (counting from J=1 for the head) is J * G + (virtual start time).
// the parameter J here is the 0-based position in queue.Requests
// (e.g., queue.Requests[0] corresponds to J=1 above), hence the J+1 below
jg := float64(J+1) * float64(G)
return jg + q.VirtualStart
}
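// As a purely illustrative example (numbers invented here): if a
// queue's VirtualStart is 100 and G is 60, then GetVirtualFinish(0, 60)
// for the head request returns 1*60 + 100 = 160 and GetVirtualFinish(1, 60)
// for the request behind it returns 2*60 + 100 = 220.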


@@ -0,0 +1,33 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = [
"integrator.go",
"no-restraint.go",
],
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing",
importpath = "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing",
visibility = ["//visibility:public"],
deps = [
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock:all-srcs",
],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)


@@ -0,0 +1,35 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["event_clock.go"],
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock",
importpath = "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock",
visibility = ["//visibility:public"],
deps = [
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["event_clock_test.go"],
embed = [":go_default_library"],
deps = ["//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library"],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)


@@ -0,0 +1,251 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package clock
import (
"container/heap"
"math/rand"
"runtime"
"strings"
"sync"
"time"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
"k8s.io/klog"
)
// EventFunc does some work that needs to be done at or after the
// given time. After this function returns, associated work may continue
// on other goroutines only if they are counted by the GoRoutineCounter
// of the FakeEventClock handling this EventFunc.
type EventFunc func(time.Time)
// EventClock fires events on time
type EventClock interface {
clock.PassiveClock
EventAfterDuration(f EventFunc, d time.Duration)
EventAfterTime(f EventFunc, t time.Time)
}
// RealEventClock fires events based on real world time
type RealEventClock struct {
clock.RealClock
}
// EventAfterDuration schedules an EventFunc
func (RealEventClock) EventAfterDuration(f EventFunc, d time.Duration) {
ch := time.After(d)
go func() {
select {
case t := <-ch:
f(t)
}
}()
}
// EventAfterTime schedules an EventFunc
func (r RealEventClock) EventAfterTime(f EventFunc, t time.Time) {
now := time.Now()
d := t.Sub(now)
if d <= 0 {
go f(now)
} else {
r.EventAfterDuration(f, d)
}
}
// waitGroupCounter is a wait group used as a GoRoutineCounter. This private
// type is used to disallow direct WaitGroup access
type waitGroupCounter struct {
wg sync.WaitGroup
}
// compile time assertion that waitGroupCounter meets requirements
// of GoRoutineCounter
var _ counter.GoRoutineCounter = (*waitGroupCounter)(nil)
func (wgc *waitGroupCounter) Add(delta int) {
if klog.V(7) {
var pcs [5]uintptr
nCallers := runtime.Callers(2, pcs[:])
frames := runtime.CallersFrames(pcs[:nCallers])
frame1, more1 := frames.Next()
fileParts1 := strings.Split(frame1.File, "/")
tail2 := "(none)"
line2 := 0
if more1 {
frame2, _ := frames.Next()
fileParts2 := strings.Split(frame2.File, "/")
tail2 = fileParts2[len(fileParts2)-1]
line2 = frame2.Line
}
klog.Infof("GRC(%p).Add(%d) from %s:%d from %s:%d", wgc, delta, fileParts1[len(fileParts1)-1], frame1.Line, tail2, line2)
}
wgc.wg.Add(delta)
}
func (wgc *waitGroupCounter) Wait() {
wgc.wg.Wait()
}
// FakeEventClock is one whose time does not pass implicitly but
// rather is explicitly set by invocations of its SetTime method
type FakeEventClock struct {
clock.FakePassiveClock
// waiters is a heap of waiting work, sorted by time
waiters eventWaiterHeap
waitersLock sync.RWMutex
// clientWG supplies constraints on time passing in Run: the Run
// method will not pick a new time until this counter is zero.
clientWG *waitGroupCounter
// fuzz is the amount of noise to add to scheduling. An event
// requested to run at time T will run at some time chosen
// uniformly at random from the interval [T, T+fuzz]; the upper
// bound is exclusive iff fuzz is non-zero.
fuzz time.Duration
// rand is the random number generator to use in fuzzing
rand *rand.Rand
}
type eventWaiterHeap []eventWaiter
var _ heap.Interface = (*eventWaiterHeap)(nil)
type eventWaiter struct {
targetTime time.Time
f EventFunc
}
// NewFakeEventClock constructor. The given `r *rand.Rand` must
// henceforth not be used for any other purpose. If `r` is nil then a
// fresh one will be constructed, seeded with the current real time.
// The returned GoRoutineCounter is the one that Run consults to learn about
// additional work that has to complete before time can advance.
func NewFakeEventClock(t time.Time, fuzz time.Duration, r *rand.Rand) (*FakeEventClock, counter.GoRoutineCounter) {
grc := &waitGroupCounter{}
if r == nil {
r = rand.New(rand.NewSource(time.Now().UnixNano()))
r.Uint64()
r.Uint64()
r.Uint64()
}
return &FakeEventClock{
FakePassiveClock: *clock.NewFakePassiveClock(t),
clientWG: grc,
fuzz: fuzz,
rand: r,
}, grc
}
// GetNextTime returns the next time at which there is work scheduled,
// and a bool indicating whether there is any such time
func (fec *FakeEventClock) GetNextTime() (time.Time, bool) {
fec.waitersLock.RLock()
defer fec.waitersLock.RUnlock()
if len(fec.waiters) > 0 {
return fec.waiters[0].targetTime, true
}
return time.Time{}, false
}
// Run runs all the events scheduled, and all the events they
// schedule, and so on, until there are none scheduled or the limit is not
// nil and the next time would exceed the limit. The clientWG given in
// the constructor gates each advance of time.
func (fec *FakeEventClock) Run(limit *time.Time) {
for {
fec.clientWG.Wait()
t, ok := fec.GetNextTime()
if !ok || limit != nil && t.After(*limit) {
break
}
fec.SetTime(t)
}
}
// SetTime sets the time and runs to completion all events that should
// be started by the given time --- including any further events they
// schedule
func (fec *FakeEventClock) SetTime(t time.Time) {
fec.FakePassiveClock.SetTime(t)
for {
foundSome := false
func() {
fec.waitersLock.Lock()
defer fec.waitersLock.Unlock()
// This loop is because events run at a given time may schedule more
// events to run at that or an earlier time.
// Events should not advance the clock. But just in case they do...
now := fec.Now()
var wg sync.WaitGroup
for len(fec.waiters) > 0 && !now.Before(fec.waiters[0].targetTime) {
ew := heap.Pop(&fec.waiters).(eventWaiter)
wg.Add(1)
go func(f EventFunc) { f(now); wg.Done() }(ew.f)
foundSome = true
}
wg.Wait()
}()
if !foundSome {
break
}
}
}
// EventAfterDuration schedules the given function to be invoked once
// the given duration has passed.
func (fec *FakeEventClock) EventAfterDuration(f EventFunc, d time.Duration) {
fec.waitersLock.Lock()
defer fec.waitersLock.Unlock()
now := fec.Now()
fd := time.Duration(float32(fec.fuzz) * fec.rand.Float32())
heap.Push(&fec.waiters, eventWaiter{targetTime: now.Add(d + fd), f: f})
}
// EventAfterTime schedules the given function to be invoked once
// the given time has arrived.
func (fec *FakeEventClock) EventAfterTime(f EventFunc, t time.Time) {
fec.waitersLock.Lock()
defer fec.waitersLock.Unlock()
fd := time.Duration(float32(fec.fuzz) * fec.rand.Float32())
heap.Push(&fec.waiters, eventWaiter{targetTime: t.Add(fd), f: f})
}
func (ewh eventWaiterHeap) Len() int { return len(ewh) }
func (ewh eventWaiterHeap) Less(i, j int) bool { return ewh[i].targetTime.Before(ewh[j].targetTime) }
func (ewh eventWaiterHeap) Swap(i, j int) { ewh[i], ewh[j] = ewh[j], ewh[i] }
func (ewh *eventWaiterHeap) Push(x interface{}) {
*ewh = append(*ewh, x.(eventWaiter))
}
func (ewh *eventWaiterHeap) Pop() interface{} {
old := *ewh
n := len(old)
x := old[n-1]
*ewh = old[:n-1]
return x
}
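// exampleFakeEventClockUse is a hypothetical sketch (not part of this
// commit) of the intended usage: schedule an event, then advance the
// fake clock explicitly with Run.
func exampleFakeEventClockUse() {
	fec, _ := NewFakeEventClock(time.Now(), 0, nil)
	var fired bool
	fec.EventAfterDuration(func(time.Time) { fired = true }, time.Second)
	fec.Run(nil) // advances the fake time until no scheduled events remain
	_ = fired    // true here: Run invoked the event one (virtual) second later
}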


@@ -0,0 +1,183 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package clock
import (
"math/rand"
"sync/atomic"
"testing"
"time"
"k8s.io/apimachinery/pkg/util/clock"
)
// TestableEventClock is an EventClock whose time can be set explicitly
// and that can be run to execute the events scheduled so far.
type TestableEventClock interface {
EventClock
SetTime(time.Time)
Run(*time.Time)
}
// settablePassiveClock allows setting the current time of a passive clock
type settablePassiveClock interface {
clock.PassiveClock
SetTime(time.Time)
}
func exerciseTestableEventClock(t *testing.T, ec TestableEventClock, fuzz time.Duration) {
exercisePassiveClock(t, ec)
var numDone int32
now := ec.Now()
strictable := true
const batchSize = 100
times := make(chan time.Time, batchSize+1)
try := func(abs, strict bool, d time.Duration) {
f := func(u time.Time) {
realD := ec.Since(now)
atomic.AddInt32(&numDone, 1)
times <- u
if realD < d || strict && strictable && realD > d+fuzz {
t.Errorf("Asked for %v, got %v", d, realD)
}
}
if abs {
ec.EventAfterTime(f, now.Add(d))
} else {
ec.EventAfterDuration(f, d)
}
}
try(true, true, time.Minute)
for i := 0; i < batchSize; i++ {
d := time.Duration(rand.Intn(30)-3) * time.Second
try(i%2 == 0, d >= 0, d)
}
ec.Run(nil)
if numDone != batchSize+1 {
t.Errorf("Got only %v events", numDone)
}
lastTime := now.Add(-3 * time.Second)
for i := 0; i <= batchSize; i++ {
nextTime := <-times
if nextTime.Before(lastTime) {
t.Errorf("Got %s after %s", nextTime, lastTime)
}
lastTime = nextTime
}
endTime := ec.Now()
dx := endTime.Sub(now)
if dx > time.Minute+fuzz {
t.Errorf("Run started at %#+v, ended at %#+v, dx=%d", now, endTime, dx)
}
now = endTime
var shouldRun int32
strictable = false
for i := 0; i < batchSize; i++ {
d := time.Duration(rand.Intn(30)-3) * time.Second
try(i%2 == 0, d >= 0, d)
if d <= 12*time.Second {
shouldRun++
}
}
ec.SetTime(now.Add(13*time.Second - 1))
if numDone != batchSize+1+shouldRun {
t.Errorf("Expected %v, but %v ran", shouldRun, numDone-batchSize-1)
}
lastTime = now.Add(-3 * time.Second)
for i := int32(0); i < shouldRun; i++ {
nextTime := <-times
if nextTime.Before(lastTime) {
t.Errorf("Got %s after %s", nextTime, lastTime)
}
lastTime = nextTime
}
}
func exercisePassiveClock(t *testing.T, pc settablePassiveClock) {
t1 := time.Now()
t2 := t1.Add(time.Hour)
pc.SetTime(t1)
tx := pc.Now()
if tx != t1 {
t.Errorf("SetTime(%#+v); Now() => %#+v", t1, tx)
}
dx := pc.Since(t1)
if dx != 0 {
t.Errorf("Since() => %v", dx)
}
pc.SetTime(t2)
dx = pc.Since(t1)
if dx != time.Hour {
t.Errorf("Since() => %v", dx)
}
tx = pc.Now()
if tx != t2 {
t.Errorf("Now() => %#+v", tx)
}
}
func TestFakeEventClock(t *testing.T) {
startTime := time.Now()
fec, _ := NewFakeEventClock(startTime, 0, nil)
exerciseTestableEventClock(t, fec, 0)
fec, _ = NewFakeEventClock(startTime, time.Second, nil)
exerciseTestableEventClock(t, fec, time.Second)
}
func exerciseEventClock(t *testing.T, ec EventClock, relax func(time.Duration)) {
var numDone int32
now := ec.Now()
const batchSize = 100
times := make(chan time.Time, batchSize+1)
try := func(abs bool, d time.Duration) {
f := func(u time.Time) {
realD := ec.Since(now)
atomic.AddInt32(&numDone, 1)
times <- u
if realD < d {
t.Errorf("Asked for %v, got %v", d, realD)
}
}
if abs {
ec.EventAfterTime(f, now.Add(d))
} else {
ec.EventAfterDuration(f, d)
}
}
try(true, time.Millisecond*3300)
for i := 0; i < batchSize; i++ {
d := time.Duration(rand.Intn(30)-3) * time.Millisecond * 100
try(i%2 == 0, d)
}
relax(time.Second * 4)
if atomic.LoadInt32(&numDone) != batchSize+1 {
t.Errorf("Got only %v events", numDone)
}
lastTime := now
for i := 0; i <= batchSize; i++ {
nextTime := <-times
if nextTime.Before(now) {
continue
}
// Tolerate modest reordering among concurrently delivered events:
// only complain if this time precedes the previous one by 50ms or more.
dt := nextTime.Sub(lastTime) / (50 * time.Millisecond)
if dt < 0 {
t.Errorf("Got %s after %s", nextTime, lastTime)
}
lastTime = nextTime
}
}
func TestRealEventClock(t *testing.T) {
exerciseEventClock(t, RealEventClock{}, func(d time.Duration) { time.Sleep(d) })
}

View File

@@ -0,0 +1,103 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package testing
import (
"math"
"sync"
"time"
"k8s.io/apimachinery/pkg/util/clock"
)
// Integrator computes the integral of some variable X over time as
// read from a particular clock. The integral starts when the
// Integrator is created, and ends at the latest operation on the
// Integrator.
type Integrator interface {
Set(float64) // set the value of X
Add(float64) // add the given quantity to X
GetResults() IntegratorResults
}
// IntegratorResults holds statistical abstracts of the integration
type IntegratorResults struct {
Duration float64 //seconds
Average float64
Deviation float64 //sqrt(avg((value-avg)^2))
}
type integrator struct {
sync.Mutex
clk clock.PassiveClock
lastTime time.Time
x float64
integrals [3]float64 // integral of x^0, x^1, and x^2
}
// NewIntegrator makes one that uses the given clock
func NewIntegrator(clk clock.PassiveClock) Integrator {
return &integrator{
clk: clk,
lastTime: clk.Now(),
}
}
func (igr *integrator) Set(x float64) {
igr.Lock()
igr.updateLocked()
igr.x = x
igr.Unlock()
}
func (igr *integrator) Add(deltaX float64) {
igr.Lock()
igr.updateLocked()
igr.x += deltaX
igr.Unlock()
}
func (igr *integrator) updateLocked() {
now := igr.clk.Now()
dt := now.Sub(igr.lastTime).Seconds()
igr.lastTime = now
igr.integrals[0] += dt
igr.integrals[1] += dt * igr.x
igr.integrals[2] += dt * igr.x * igr.x
}
func (igr *integrator) GetResults() (results IntegratorResults) {
igr.Lock()
defer igr.Unlock()
igr.updateLocked()
results.Duration = igr.integrals[0]
if results.Duration <= 0 {
results.Average = math.NaN()
results.Deviation = math.NaN()
return
}
results.Average = igr.integrals[1] / igr.integrals[0]
// Deviation is sqrt( Integral( (x - xbar)^2 dt) / Duration )
// = sqrt( Integral( x^2 + xbar^2 -2*x*xbar dt ) / Duration )
// = sqrt( ( Integral( x^2 dt ) + Duration * xbar^2 - 2*xbar*Integral(x dt) ) / Duration)
// = sqrt( Integral(x^2 dt)/Duration - xbar^2 )
variance := igr.integrals[2]/igr.integrals[0] - results.Average*results.Average
if variance > 0 {
results.Deviation = math.Sqrt(variance)
}
return
}
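
To make the statistics above concrete, here is a small in-package sketch; the function name and constants are illustrative, not part of this PR. X is 0 for one second and then 2 for three seconds, so the time-weighted average is 1.5 and the deviation is sqrt(0.75), about 0.866.

package testing // illustrative snippet, not part of this PR

import (
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/clock"
)

func sketchIntegratorUsage() {
    clk := clock.NewFakePassiveClock(time.Now())
    igr := NewIntegrator(clk)
    clk.SetTime(clk.Now().Add(time.Second)) // X was 0 for 1 second
    igr.Set(2)
    clk.SetTime(clk.Now().Add(3 * time.Second)) // X was 2 for 3 seconds
    r := igr.GetResults()
    fmt.Printf("duration=%.0fs average=%.2f deviation=%.3f\n", r.Duration, r.Average, r.Deviation)
    // With the values above: duration=4s average=1.50 deviation=0.866
}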

View File

@@ -0,0 +1,49 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package testing
import (
"context"
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
)
// NewNoRestraintFactory makes a QueueSetFactory that produces
// QueueSets that exert no restraint --- every request is dispatched
// for execution as soon as it arrives.
func NewNoRestraintFactory() fq.QueueSetFactory {
return noRestraintFactory{}
}
type noRestraintFactory struct{}
func (noRestraintFactory) NewQueueSet(config fq.QueueSetConfig) (fq.QueueSet, error) {
return noRestraint{}, nil
}
type noRestraint struct{}
func (noRestraint) SetConfiguration(config fq.QueueSetConfig) error {
return nil
}
func (noRestraint) Quiesce(fq.EmptyHandler) {
}
func (noRestraint) Wait(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (quiescent, execute bool, afterExecution func()) {
return false, true, func() {}
}
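
For context, a hedged sketch of how a caller might exercise this factory through the interfaces used above. The descriptor values are invented, and it assumes the zero value of fq.QueueSetConfig is a valid struct literal (this factory ignores its config in any case).

package testing // illustrative snippet, not part of this PR

import (
    "context"

    fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
)

func sketchNoRestraintUsage(ctx context.Context, serve func()) {
    qs, err := NewNoRestraintFactory().NewQueueSet(fq.QueueSetConfig{}) // config is ignored here
    if err != nil {
        return
    }
    quiescent, execute, afterExecution := qs.Wait(ctx, 42, "some-flow", "some-request")
    if !quiescent && execute {
        serve()          // with no restraint, every request executes immediately
        afterExecution() // report completion of execution
    }
}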

View File

@@ -0,0 +1,24 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["metrics.go"],
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/metrics",
importpath = "k8s.io/apiserver/pkg/util/flowcontrol/metrics",
visibility = ["//visibility:public"],
deps = ["//vendor/github.com/prometheus/client_golang/prometheus:go_default_library"],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@@ -0,0 +1,152 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"time"
"github.com/prometheus/client_golang/prometheus"
)
const (
namespace = "apiserver"
subsystem = "flowcontrol"
)
const (
priorityLevel = "priorityLevel"
flowSchema = "flowSchema"
)
var (
queueLengthBuckets = []float64{0, 10, 25, 50, 100, 250, 500, 1000}
requestDurationSecondsBuckets = []float64{0, 0.005, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2, 5, 10, 30}
)
func init() {
prometheus.MustRegister(apiserverRejectedRequests)
prometheus.MustRegister(apiserverCurrentInqueueRequests)
prometheus.MustRegister(apiserverRequestQueueLength)
prometheus.MustRegister(apiserverRequestConcurrencyLimit)
prometheus.MustRegister(apiserverCurrentExecutingRequests)
prometheus.MustRegister(apiserverRequestWaitingSeconds)
prometheus.MustRegister(apiserverRequestExecutionSeconds)
}
var (
apiserverRejectedRequests = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "rejectedRequests",
Help: "Number of requests rejected by the api priority and fairness system",
},
[]string{priorityLevel, "reason"},
)
apiserverCurrentInqueueRequests = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "currentInqueueRequests",
Help: "Number of requests currently pending in queues of the api priority and fairness system",
},
[]string{priorityLevel},
)
apiserverRequestQueueLength = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "requestQueueLength",
Help: "Length of queue in the api priority and fairness system",
Buckets: queueLengthBuckets,
},
[]string{priorityLevel},
)
apiserverRequestConcurrencyLimit = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "requestConcurrencyLimit",
Help: "Shared concurrency limit in the api priority and fairness system",
},
[]string{priorityLevel},
)
apiserverCurrentExecutingRequests = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "currentExecutingRequests",
Help: "Number of requests currently executing in the api priority and fairness system",
},
[]string{priorityLevel},
)
apiserverRequestWaitingSeconds = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "requestWaitingSeconds",
Help: "Length of time a request spent waiting in its queue",
Buckets: requestDurationSecondsBuckets,
},
[]string{priorityLevel, flowSchema, "execute"},
)
apiserverRequestExecutionSeconds = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "requestExecutionSeconds",
Help: "Duration of request execution in the api priority and fairness system",
Buckets: requestDurationSecondsBuckets,
},
[]string{priorityLevel, flowSchema},
)
)
// UpdateFlowControlRequestsInQueue updates the value for the # of requests in the specified queues in flow control
func UpdateFlowControlRequestsInQueue(priorityLevel string, inqueue int) {
apiserverCurrentInqueueRequests.WithLabelValues(priorityLevel).Set(float64(inqueue))
}
// UpdateFlowControlRequestsExecuting updates the value for the # of requests executing in flow control
func UpdateFlowControlRequestsExecuting(priorityLevel string, executing int) {
apiserverCurrentExecutingRequests.WithLabelValues(priorityLevel).Set(float64(executing))
}
// UpdateSharedConcurrencyLimit updates the value for the concurrency limit in flow control
func UpdateSharedConcurrencyLimit(priorityLevel string, limit int) {
apiserverRequestConcurrencyLimit.WithLabelValues(priorityLevel).Set(float64(limit))
}
// AddReject increments the # of rejected requests for flow control
func AddReject(priorityLevel string, reason string) {
apiserverRejectedRequests.WithLabelValues(priorityLevel, reason).Add(1)
}
// ObserveQueueLength observes the queue length for flow control
func ObserveQueueLength(priorityLevel string, length int) {
apiserverRequestQueueLength.WithLabelValues(priorityLevel).Observe(float64(length))
}
// ObserveWaitingDuration observes the queue wait duration for flow control
func ObserveWaitingDuration(priorityLevel, flowSchema, execute string, waitTime time.Duration) {
apiserverRequestWaitingSeconds.WithLabelValues(priorityLevel, flowSchema, execute).Observe(waitTime.Seconds())
}
// ObserveExecutionDuration observes the execution duration for flow control
func ObserveExecutionDuration(priorityLevel, flowSchema string, executionTime time.Duration) {
apiserverRequestExecutionSeconds.WithLabelValues(priorityLevel, flowSchema).Observe(executionTime.Seconds())
}
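
As a quick illustration of the helper API above, a queue implementation might record one request's life cycle roughly as follows; the label values here are invented.

package metrics // illustrative snippet, not part of this PR

import "time"

func sketchRecordOneRequest() {
    const pl, fs = "workload-low", "service-accounts" // hypothetical label values
    UpdateSharedConcurrencyLimit(pl, 10)
    ObserveQueueLength(pl, 3)
    UpdateFlowControlRequestsInQueue(pl, 3)
    ObserveWaitingDuration(pl, fs, "true", 40*time.Millisecond) // execute="true": the request went on to run
    UpdateFlowControlRequestsExecuting(pl, 1)
    ObserveExecutionDuration(pl, fs, 250*time.Millisecond)
    UpdateFlowControlRequestsExecuting(pl, 0)
    AddReject(pl, "timeout") // a different request that was rejected instead
}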

View File

@@ -0,0 +1,26 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["interface.go"],
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/promise",
importpath = "k8s.io/apiserver/pkg/util/promise",
visibility = ["//visibility:public"],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//staging/src/k8s.io/apiserver/pkg/util/promise/lockingpromise:all-srcs",
],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@@ -0,0 +1,42 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package promise
// Mutable is a variable that is initially not set and can be set one
// or more times (unlike a traditional "promise", which can be written
// only once).
type Mutable interface {
// Set writes a value into this variable and unblocks every
// goroutine waiting for this variable to have a value
Set(interface{})
// Get reads the value of this variable. If this variable is
// not set yet then this call blocks until this variable gets a value.
Get() interface{}
}
// LockingMutable is a Mutable whose implementation is protected by a lock
type LockingMutable interface {
Mutable
// SetLocked is like Set but the caller must already hold the lock
SetLocked(interface{})
// GetLocked is like Get but the caller must already hold the lock
GetLocked() interface{}
}
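
A hedged sketch of the intended usage: one goroutine blocks in Get until another calls Set. It leans on the lockingpromise implementation and the fake event clock added elsewhere in this PR; the package and function names here are invented.

package promise_test // hypothetical file, not part of this PR

import (
    "sync"
    "time"

    testclock "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock"
    "k8s.io/apiserver/pkg/util/promise/lockingpromise"
)

func sketchMutableUsage() interface{} {
    _, grc := testclock.NewFakeEventClock(time.Now(), 0, nil) // only the goroutine counter is needed
    var lock sync.Mutex
    m := lockingpromise.NewLockingPromise(&lock, grc) // a promise.LockingMutable
    grc.Add(1)
    go func() {
        defer grc.Add(-1)
        m.Set("ready") // unblocks every pending Get
    }()
    return m.Get() // blocks until Set has been called at least once
}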

View File

@@ -0,0 +1,34 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["lockingpromise.go"],
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/promise/lockingpromise",
importpath = "k8s.io/apiserver/pkg/util/promise/lockingpromise",
visibility = ["//visibility:public"],
deps = [
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/promise:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["lockingpromise_test.go"],
embed = [":go_default_library"],
deps = ["//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock:go_default_library"],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@@ -0,0 +1,79 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package lockingpromise
import (
"sync"
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
"k8s.io/apiserver/pkg/util/promise"
)
// lockingPromise implements LockingMutable based on a condition
// variable. This implementation tracks active goroutines: the given
// counter is decremented when a goroutine blocks waiting for this
// variable to be set, and incremented when such a goroutine is unblocked.
type lockingPromise struct {
lock sync.Locker
cond sync.Cond
activeCounter counter.GoRoutineCounter // counter of active goroutines
waitingCount int // number of goroutines idle due to this mutable being unset
isSet bool
value interface{}
}
var _ promise.LockingMutable = &lockingPromise{}
// NewLockingPromise makes a new promise.LockingMutable
func NewLockingPromise(lock sync.Locker, activeCounter counter.GoRoutineCounter) promise.LockingMutable {
return &lockingPromise{
lock: lock,
cond: *sync.NewCond(lock),
activeCounter: activeCounter,
}
}
func (lp *lockingPromise) Set(value interface{}) {
lp.lock.Lock()
defer lp.lock.Unlock()
lp.SetLocked(value)
}
func (lp *lockingPromise) Get() interface{} {
lp.lock.Lock()
defer lp.lock.Unlock()
return lp.GetLocked()
}
func (lp *lockingPromise) SetLocked(value interface{}) {
lp.isSet = true
lp.value = value
if lp.waitingCount > 0 {
lp.activeCounter.Add(lp.waitingCount)
lp.waitingCount = 0
lp.cond.Broadcast()
}
}
func (lp *lockingPromise) GetLocked() interface{} {
if !lp.isSet {
// The Broadcast in SetLocked is the only thing that wakes this Wait,
// and isSet never goes back to false, so a single wait suffices.
lp.waitingCount++
lp.activeCounter.Add(-1)
lp.cond.Wait()
}
return lp.value
}

View File

@@ -0,0 +1,70 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package lockingpromise
import (
"sync"
"sync/atomic"
"testing"
"time"
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock"
)
func TestLockingPromise(t *testing.T) {
now := time.Now()
clock, counter := clock.NewFakeEventClock(now, 0, nil)
var lock sync.Mutex
lp := NewLockingPromise(&lock, counter)
var gots int32
var got atomic.Value
counter.Add(1)
go func() {
got.Store(lp.Get())
atomic.AddInt32(&gots, 1)
counter.Add(-1)
}()
clock.Run(nil)
time.Sleep(time.Second)
if atomic.LoadInt32(&gots) != 0 {
t.Error("Get returned before Set")
}
var aval = &now
lp.Set(aval)
clock.Run(nil)
time.Sleep(time.Second)
if atomic.LoadInt32(&gots) != 1 {
t.Error("Get did not return after Set")
}
if got.Load() != aval {
t.Error("Get did not return what was Set")
}
counter.Add(1)
go func() {
got.Store(lp.Get())
atomic.AddInt32(&gots, 1)
counter.Add(-1)
}()
clock.Run(nil)
time.Sleep(time.Second)
if atomic.LoadInt32(&gots) != 2 {
t.Error("Second Get did not return immediately")
}
if got.Load() != aval {
t.Error("Second Get did not return what was Set")
}
}

View File

@@ -100,6 +100,7 @@ package_group(
"//pkg/scheduler/framework/v1alpha1",
"//pkg/volume/util/operationexecutor",
"//staging/src/k8s.io/apiserver/pkg/admission/metrics",
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics",
"//staging/src/k8s.io/component-base/metrics/...",
"//test/e2e",
"//test/e2e/storage",