diff --git a/staging/src/k8s.io/apiserver/BUILD b/staging/src/k8s.io/apiserver/BUILD index e04d7b37398..a6b511f8dd9 100644 --- a/staging/src/k8s.io/apiserver/BUILD +++ b/staging/src/k8s.io/apiserver/BUILD @@ -42,8 +42,12 @@ filegroup( "//staging/src/k8s.io/apiserver/pkg/util/apihelpers:all-srcs", "//staging/src/k8s.io/apiserver/pkg/util/dryrun:all-srcs", "//staging/src/k8s.io/apiserver/pkg/util/feature:all-srcs", + "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:all-srcs", + "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:all-srcs", + "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:all-srcs", "//staging/src/k8s.io/apiserver/pkg/util/flushwriter:all-srcs", "//staging/src/k8s.io/apiserver/pkg/util/openapi:all-srcs", + "//staging/src/k8s.io/apiserver/pkg/util/promise:all-srcs", "//staging/src/k8s.io/apiserver/pkg/util/proxy:all-srcs", "//staging/src/k8s.io/apiserver/pkg/util/shufflesharding:all-srcs", "//staging/src/k8s.io/apiserver/pkg/util/term:all-srcs", diff --git a/staging/src/k8s.io/apiserver/go.mod b/staging/src/k8s.io/apiserver/go.mod index 6c8e3d04e57..c6a38dd4074 100644 --- a/staging/src/k8s.io/apiserver/go.mod +++ b/staging/src/k8s.io/apiserver/go.mod @@ -26,8 +26,9 @@ require ( github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/hashicorp/golang-lru v0.5.1 github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 - github.com/pkg/errors v0.8.1 // indirect + github.com/pkg/errors v0.8.1 github.com/pquerna/cachecontrol v0.0.0-20171018203845-0dec1b30a021 // indirect + github.com/prometheus/client_golang v1.0.0 github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 github.com/sirupsen/logrus v1.4.2 // indirect github.com/spf13/pflag v1.0.5 diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter/BUILD b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter/BUILD new file mode 100644 index 00000000000..9722df05adc --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter/BUILD @@ -0,0 +1,23 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = ["interface.go"], + importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/counter", + importpath = "k8s.io/apiserver/pkg/util/flowcontrol/counter", + visibility = ["//visibility:public"], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter/interface.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter/interface.go new file mode 100644 index 00000000000..0418e1217a8 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter/interface.go @@ -0,0 +1,33 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package counter + +// GoRoutineCounter keeps track of the number of active goroutines +// working on/for something. This is a utility that makes such code more +// testable. The code uses this utility to report the number of active +// goroutines to the test code, so that the test code can advance a fake +// clock when and only when the code being tested has finished all +// the work that is ready to do at the present time. +type GoRoutineCounter interface { + // Add adds the given delta to the count of active goroutines. + // Call Add(1) before forking a goroutine, Add(-1) at the end of that goroutine. + // Call Add(-1) just before waiting on something from another goroutine (e.g., + // just before a `select`). + // Call Add(1) just before doing something that unblocks a goroutine that is + // waiting on that something. + Add(delta int) +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/BUILD b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/BUILD new file mode 100644 index 00000000000..6707c197d4b --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/BUILD @@ -0,0 +1,27 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = ["interface.go"], + importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing", + importpath = "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing", + visibility = ["//visibility:public"], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [ + ":package-srcs", + "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset:all-srcs", + "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing:all-srcs", + ], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go new file mode 100644 index 00000000000..137907ecc45 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go @@ -0,0 +1,90 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fairqueuing + +import ( + "context" + "time" +) + +// QueueSetFactory is used to create QueueSet objects. +type QueueSetFactory interface { + NewQueueSet(config QueueSetConfig) (QueueSet, error) +} + +// QueueSet is the abstraction for the queuing and dispatching +// functionality of one non-exempt priority level. It covers the +// functionality described in the "Assignment to a Queue", "Queuing", +// and "Dispatching" sections of +// https://github.com/kubernetes/enhancements/blob/master/keps/sig-api-machinery/20190228-priority-and-fairness.md +// . Some day we may have connections between priority levels, but +// today is not that day. +type QueueSet interface { + // SetConfiguration updates the configuration + SetConfiguration(QueueSetConfig) error + + // Quiesce controls whether the QueueSet is operating normally or is quiescing. + // A quiescing QueueSet drains as normal but does not admit any + // new requests. Passing a non-nil handler means the system should + // be quiescing, a nil handler means the system should operate + // normally. A call to Wait while the system is quiescing + // will be rebuffed by returning tryAnother=true. If all the + // queues have no requests waiting nor executing while the system + // is quiescing then the handler will eventually be called with no + // locks held (even if the system becomes non-quiescing between the + // triggering state and the required call). + Quiesce(EmptyHandler) + + // Wait uses the given hashValue as the source of entropy as it + // shuffle-shards a request into a queue and waits for a decision + // on what to do with that request. The descr1 and descr2 values + // play no role in the logic but appear in log messages. If + // tryAnother==true at return then the QueueSet has become + // undesirable and the client should try to find a different + // QueueSet to use; execute and afterExecution are irrelevant in + // this case. Otherwise, if execute then the client should start + // executing the request and, once the request finishes execution + // or is canceled, call afterExecution(). Otherwise the client + // should not execute the request and afterExecution is + // irrelevant. + Wait(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (tryAnother, execute bool, afterExecution func()) +} + +// QueueSetConfig defines the configuration of a QueueSet. +type QueueSetConfig struct { + // Name is used to identify a queue set, allowing for descriptive information about its intended use + Name string + // ConcurrencyLimit is the maximum number of requests of this QueueSet that may be executing at a time + ConcurrencyLimit int + // DesiredNumQueues is the number of queues that the API says should exist now + DesiredNumQueues int + // QueueLengthLimit is the maximum number of requests that may be waiting in a given queue at a time + QueueLengthLimit int + // HandSize is a parameter of shuffle sharding. Upon arrival of a request, a queue is chosen by randomly + // dealing a "hand" of this many queues and then picking one of minimum length. + HandSize int + // RequestWaitLimit is the maximum amount of time that a request may wait in a queue. + // If, by the end of that time, the request has not been dispatched then it is rejected. + RequestWaitLimit time.Duration +} + +// EmptyHandler is used to notify the callee when all the queues +// of a QueueSet have been drained. +type EmptyHandler interface { + // HandleEmpty is called to deliver the notification + HandleEmpty() +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/BUILD b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/BUILD new file mode 100644 index 00000000000..e29bab63f34 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/BUILD @@ -0,0 +1,52 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = [ + "doc.go", + "queueset.go", + "types.go", + ], + importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset", + importpath = "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset", + visibility = ["//visibility:public"], + deps = [ + "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/promise:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/promise/lockingpromise:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/shufflesharding:go_default_library", + "//vendor/github.com/pkg/errors:go_default_library", + "//vendor/k8s.io/klog:go_default_library", + ], +) + +go_test( + name = "go_default_test", + srcs = ["queueset_test.go"], + embed = [":go_default_library"], + deps = [ + "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock:go_default_library", + "//vendor/k8s.io/klog:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/doc.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/doc.go new file mode 100644 index 00000000000..d9431378ad3 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/doc.go @@ -0,0 +1,120 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queueset + +// This package implements a technique called "fair queuing for server +// requests". One QueueSet is a set of queues operating according to +// this technique. + +// Fair queuing for server requests is inspired by the fair queuing +// technique from the world of networking. You can find a good paper +// on that at https://dl.acm.org/citation.cfm?doid=75247.75248 or +// http://people.csail.mit.edu/imcgraw/links/research/pubs/networks/WFQ.pdf +// and there is an implementation outline in the Wikipedia article at +// https://en.wikipedia.org/wiki/Fair_queuing . + +// Fair queuing for server requests differs from traditional fair +// queuing in three ways: (1) we are dispatching requests to be +// executed within a process rather than transmitting packets on a +// network link, (2) multiple requests can be executing at once, and +// (3) the service time (execution duration) is not known until the +// execution completes. + +// The first two differences can easily be handled by straightforward +// adaptation of the concept called "R(t)" in the original paper and +// "virtual time" in the implementation outline. In that +// implementation outline, the notation now() is used to mean reading +// the virtual clock. In the original paper’s terms, "R(t)" is the +// number of "rounds" that have been completed at real time t, where a +// round consists of virtually transmitting one bit from every +// non-empty queue in the router (regardless of which queue holds the +// packet that is really being transmitted at the moment); in this +// conception, a packet is considered to be "in" its queue until the +// packet’s transmission is finished. For our problem, we can define a +// round to be giving one nanosecond of CPU to every non-empty queue +// in the apiserver (where emptiness is judged based on both queued +// and executing requests from that queue), and define R(t) = (server +// start time) + (1 ns) * (number of rounds since server start). Let +// us write NEQ(t) for that number of non-empty queues in the +// apiserver at time t. Let us also write C for the concurrency +// limit. In the original paper, the partial derivative of R(t) with +// respect to t is +// +// 1 / NEQ(t) . + +// To generalize from transmitting one packet at a time to executing C +// requests at a time, that derivative becomes +// +// C / NEQ(t) . + +// However, sometimes there are fewer than C requests available to +// execute. For a given queue "q", let us also write "reqs(q, t)" for +// the number of requests of that queue that are executing at that +// time. The total number of requests executing is sum[over q] +// reqs(q, t) and if that is less than C then virtual time is not +// advancing as fast as it would if all C seats were occupied; in this +// case the numerator of the quotient in that derivative should be +// adjusted proportionally. Putting it all together for fair queing +// for server requests: at a particular time t, the partial derivative +// of R(t) with respect to t is +// +// min( C, sum[over q] reqs(q, t) ) / NEQ(t) . +// +// In terms of the implementation outline, this is the rate at which +// virtual time is advancing at time t (in virtual nanoseconds per +// real nanosecond). Where the networking implementation outline adds +// packet size to a virtual time, in our version this corresponds to +// adding a service time (i.e., duration) to virtual time. + +// The third difference is handled by modifying the algorithm to +// dispatch based on an initial guess at the request’s service time +// (duration) and then make the corresponding adjustments once the +// request’s actual service time is known. This is similar, although +// not exactly isomorphic, to the original paper’s adjustment by +// `$delta` for the sake of promptness. + +// For implementation simplicity (see below), let us use the same +// initial service time guess for every request; call that duration +// G. A good choice might be the service time limit (1 +// minute). Different guesses will give slightly different dynamics, +// but any positive number can be used for G without ruining the +// long-term behavior. + +// As in ordinary fair queuing, there is a bound on divergence from +// the ideal. In plain fair queuing the bound is one packet; in our +// version it is C requests. + +// To support efficiently making the necessary adjustments once a +// request’s actual service time is known, the virtual finish time of +// a request and the last virtual finish time of a queue are not +// represented directly but instead computed from queue length, +// request position in the queue, and an alternate state variable that +// holds the queue’s virtual start time. While the queue is empty and +// has no requests executing: the value of its virtual start time +// variable is ignored and its last virtual finish time is considered +// to be in the virtual past. When a request arrives to an empty queue +// with no requests executing, the queue’s virtual start time is set +// to the current virtual time. The virtual finish time of request +// number J in the queue (counting from J=1 for the head) is J * G + +// (queue's virtual start time). While the queue is non-empty: the +// last virtual finish time of the queue is the virtual finish time of +// the last request in the queue. While the queue is empty and has a +// request executing: the last virtual finish time is the queue’s +// virtual start time. When a request is dequeued for service the +// queue’s virtual start time is advanced by G. When a request +// finishes being served, and the actual service time was S, the +// queue’s virtual start time is decremented by G - S. diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go new file mode 100644 index 00000000000..fe311a9e2c1 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -0,0 +1,617 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queueset + +import ( + "context" + "math" + "sync" + "time" + + "k8s.io/apimachinery/pkg/util/runtime" + + "github.com/pkg/errors" + "k8s.io/apimachinery/pkg/util/clock" + "k8s.io/apiserver/pkg/util/flowcontrol/counter" + fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" + "k8s.io/apiserver/pkg/util/flowcontrol/metrics" + "k8s.io/apiserver/pkg/util/promise/lockingpromise" + "k8s.io/apiserver/pkg/util/shufflesharding" + "k8s.io/klog" +) + +const nsTimeFmt = "2006-01-02 15:04:05.000000000" + +// queueSetFactory implements the QueueSetFactory interface +// queueSetFactory makes QueueSet objects. +type queueSetFactory struct { + counter counter.GoRoutineCounter + clock clock.PassiveClock +} + +// NewQueueSetFactory creates a new QueueSetFactory object +func NewQueueSetFactory(c clock.PassiveClock, counter counter.GoRoutineCounter) fq.QueueSetFactory { + return &queueSetFactory{ + counter: counter, + clock: c, + } +} + +// queueSet implements the Fair Queuing for Server Requests technique +// described in this package's doc, and a pointer to one implements +// the QueueSet interface. The clock, GoRoutineCounter, and estimated +// service time should not be changed; the fields listed after the +// lock must be accessed only while holding the lock. +type queueSet struct { + clock clock.PassiveClock + counter counter.GoRoutineCounter + estimatedServiceTime float64 + + lock sync.Mutex + config fq.QueueSetConfig + + // queues may be longer than the desired number, while the excess + // queues are still draining. + queues []*queue + virtualTime float64 + lastRealTime time.Time + + // robinIndex is the index of the last queue dispatched + robinIndex int + + // numRequestsEnqueued is the number of requests currently waiting + // in a queue (eg: incremeneted on Enqueue, decremented on Dequue) + numRequestsEnqueued int + + emptyHandler fq.EmptyHandler + dealer *shufflesharding.Dealer +} + +// NewQueueSet creates a new QueueSet object +// There is a new QueueSet created for each priority level. +func (qsf queueSetFactory) NewQueueSet(config fq.QueueSetConfig) (fq.QueueSet, error) { + dealer, err := shufflesharding.NewDealer(config.DesiredNumQueues, config.HandSize) + if err != nil { + return nil, errors.Wrap(err, "shuffle sharding dealer creation failed") + } + + fq := &queueSet{ + config: config, + counter: qsf.counter, + queues: createQueues(config.DesiredNumQueues, 0), + clock: qsf.clock, + virtualTime: 0, + estimatedServiceTime: 60, + lastRealTime: qsf.clock.Now(), + dealer: dealer, + } + return fq, nil +} + +// createQueues is a helper method for initializing an array of n queues +func createQueues(n, baseIndex int) []*queue { + fqqueues := make([]*queue, n) + for i := 0; i < n; i++ { + fqqueues[i] = &queue{Index: baseIndex + i, Requests: make([]*request, 0)} + } + return fqqueues +} + +// SetConfiguration is used to set the configuration for a queueSet +// update handling for when fields are updated is handled here as well - +// eg: if DesiredNum is increased, SetConfiguration reconciles by +// adding more queues. +func (qs *queueSet) SetConfiguration(config fq.QueueSetConfig) error { + qs.lockAndSyncTime() + defer qs.lock.Unlock() + + dealer, err := shufflesharding.NewDealer(config.DesiredNumQueues, config.HandSize) + if err != nil { + return errors.Wrap(err, "shuffle sharding dealer creation failed") + } + + // Adding queues is the only thing that requires immediate action + // Removing queues is handled by omitting indexes >DesiredNum from + // chooseQueueIndexLocked + numQueues := len(qs.queues) + if config.DesiredNumQueues > numQueues { + qs.queues = append(qs.queues, + createQueues(config.DesiredNumQueues-numQueues, len(qs.queues))...) + } + + qs.config = config + qs.dealer = dealer + + qs.dispatchAsMuchAsPossibleLocked() + return nil +} + +// Quiesce controls whether the QueueSet is operating normally or is quiescing. +// A quiescing QueueSet drains as normal but does not admit any +// new requests. Passing a non-nil handler means the system should +// be quiescing, a nil handler means the system should operate +// normally. A call to Wait while the system is quiescing +// will be rebuffed by returning tryAnother=true. If all the +// queues have no requests waiting nor executing while the system +// is quiescing then the handler will eventually be called with no +// locks held (even if the system becomes non-quiescing between the +// triggering state and the required call). +func (qs *queueSet) Quiesce(eh fq.EmptyHandler) { + qs.lock.Lock() + defer qs.lock.Unlock() + qs.emptyHandler = eh + if eh == nil { + return + } + // Here we check whether there are any requests queued or executing and + // if not then fork an invocation of the EmptyHandler. + qs.maybeForkEmptyHandlerLocked() +} + +// Values passed through a request's Decision +const ( + DecisionExecute = "execute" + DecisionReject = "reject" + DecisionCancel = "cancel" + DecisionTryAnother = "tryAnother" +) + +// Wait uses the given hashValue as the source of entropy as it +// shuffle-shards a request into a queue and waits for a decision on +// what to do with that request. The descr1 and descr2 values play no +// role in the logic but appear in log messages; we use two because +// the main client characterizes a request by two items that, if +// bundled together in a larger data structure, would lose interesting +// details when formatted. If tryAnother==true at return then the +// QueueSet has become undesirable and the client should try to find a +// different QueueSet to use; execute and afterExecution are +// irrelevant in this case. Otherwise, if execute then the client +// should start executing the request and, once the request finishes +// execution or is canceled, call afterExecution(). Otherwise the +// client should not execute the request and afterExecution is +// irrelevant. +func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (tryAnother, execute bool, afterExecution func()) { + var req *request + decision := func() string { + qs.lockAndSyncTime() + defer qs.lock.Unlock() + // A call to Wait while the system is quiescing will be rebuffed by + // returning `tryAnother=true`. + if qs.emptyHandler != nil { + klog.V(5).Infof("QS(%s): rebuffing request %#+v %#+v with TryAnother", qs.config.Name, descr1, descr2) + return DecisionTryAnother + } + + // ======================================================================== + // Step 1: + // 1) Start with shuffle sharding, to pick a queue. + // 2) Reject old requests that have been waiting too long + // 3) Reject current request if there is not enough concurrency shares and + // we are at max queue length + // 4) If not rejected, create a request and enqueue + req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(hashValue, descr1, descr2) + // req == nil means that the request was rejected - no remaining + // concurrency shares and at max queue length already + if req == nil { + klog.V(5).Infof("QS(%s): rejecting request %#+v %#+v due to queue full", qs.config.Name, descr1, descr2) + metrics.AddReject(qs.config.Name, "queue-full") + return DecisionReject + } + + // ======================================================================== + // Step 2: + // The next step is to invoke the method that dequeues as much + // as possible. + // This method runs a loop, as long as there are non-empty + // queues and the number currently executing is less than the + // assured concurrency value. The body of the loop uses the + // fair queuing technique to pick a queue and dispatch a + // request from that queue. + qs.dispatchAsMuchAsPossibleLocked() + + // ======================================================================== + // Step 3: + + // Set up a relay from the context's Done channel to the world + // of well-counted goroutines. We Are Told that every + // request's context's Done channel gets closed by the time + // the request is done being processed. + doneCh := ctx.Done() + if doneCh != nil { + qs.preCreateOrUnblockGoroutine() + go func() { + defer runtime.HandleCrash() + qs.goroutineDoneOrBlocked() + select { + case <-doneCh: + klog.V(6).Infof("QS(%s): Context of request %#+v %#+v is Done", qs.config.Name, descr1, descr2) + req.Decision.Set(DecisionCancel) + } + qs.goroutineDoneOrBlocked() + }() + } + + // ======================================================================== + // Step 4: + // The final step in Wait is to wait on a decision from + // somewhere and then act on it. + decisionAny := req.Decision.GetLocked() + var decisionStr string + switch d := decisionAny.(type) { + case string: + decisionStr = d + default: + klog.Errorf("QS(%s): Impossible decision %#+v (of type %T) for request %#+v %#+v", qs.config.Name, decisionAny, decisionAny, descr1, descr2) + decisionStr = DecisionExecute + } + switch decisionStr { + case DecisionReject: + klog.V(5).Infof("QS(%s): request %#+v %#+v timed out after being enqueued\n", qs.config.Name, descr1, descr2) + metrics.AddReject(qs.config.Name, "time-out") + case DecisionCancel: + qs.syncTimeLocked() + // TODO(aaron-prindle) add metrics to these two cases + if req.IsWaiting { + klog.V(5).Infof("QS(%s): Ejecting request %#+v %#+v from its queue", qs.config.Name, descr1, descr2) + // remove the request from the queue as it has timed out + for i := range req.Queue.Requests { + if req == req.Queue.Requests[i] { + // remove the request + req.Queue.Requests = append(req.Queue.Requests[:i], + req.Queue.Requests[i+1:]...) + break + } + } + // At this point, if the qs is quiescing, + // has zero requests executing, and has zero requests enqueued + // then a call to the EmptyHandler should be forked. + qs.maybeForkEmptyHandlerLocked() + } else { + klog.V(5).Infof("QS(%s): request %#+v %#+v canceled shortly after dispatch", qs.config.Name, descr1, descr2) + } + } + return decisionStr + }() + switch decision { + case DecisionTryAnother: + return true, false, func() {} + case DecisionReject: + return false, false, func() {} + case DecisionCancel: + return false, false, func() {} + default: + if decision != DecisionExecute { + klog.Errorf("Impossible decision %q", decision) + } + return false, true, func() { + qs.finishRequestAndDispatchAsMuchAsPossible(req) + } + } +} + +// lockAndSyncTime acquires the lock and updates the virtual time. +// Doing them together avoids the mistake of modify some queue state +// before calling syncTimeLocked. +func (qs *queueSet) lockAndSyncTime() { + qs.lock.Lock() + qs.syncTimeLocked() +} + +// syncTimeLocked updates the virtual time based on the assumption +// that the current state of the queues has been in effect since +// `qs.lastRealTime`. Thus, it should be invoked after acquiring the +// lock and before modifying the state of any queue. +func (qs *queueSet) syncTimeLocked() { + realNow := qs.clock.Now() + timesincelast := realNow.Sub(qs.lastRealTime).Seconds() + qs.lastRealTime = realNow + qs.virtualTime += timesincelast * qs.getVirtualTimeRatio() +} + +// getVirtualTimeRatio calculates the rate at which virtual time has +// been advancing, according to the logic in `doc.go`. +func (qs *queueSet) getVirtualTimeRatio() float64 { + activeQueues := 0 + reqs := 0 + for _, queue := range qs.queues { + reqs += queue.RequestsExecuting + if len(queue.Requests) > 0 || queue.RequestsExecuting > 0 { + activeQueues++ + } + } + if activeQueues == 0 { + return 0 + } + return math.Min(float64(reqs), float64(qs.config.ConcurrencyLimit)) / float64(activeQueues) +} + +// timeoutOldRequestsAndRejectOrEnqueueLocked encapsulates the logic required +// to validate and enqueue a request for the queueSet/QueueSet: +// 1) Start with shuffle sharding, to pick a queue. +// 2) Reject old requests that have been waiting too long +// 3) Reject current request if there is not enough concurrency shares and +// we are at max queue length +// 4) If not rejected, create a request and enqueue +// returns the enqueud request on a successful enqueue +// returns nil in the case that there is no available concurrency or +// the queuelengthlimit has been reached +func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(hashValue uint64, descr1, descr2 interface{}) *request { + // Start with the shuffle sharding, to pick a queue. + queueIdx := qs.chooseQueueIndexLocked(hashValue, descr1, descr2) + queue := qs.queues[queueIdx] + // The next step is the logic to reject requests that have been waiting too long + qs.removeTimedOutRequestsFromQueueLocked(queue) + // NOTE: currently timeout is only checked for each new request. This means that there can be + // requests that are in the queue longer than the timeout if there are no new requests + // We prefer the simplicity over the promptness, at least for now. + + // Create a request and enqueue + req := &request{ + Decision: lockingpromise.NewLockingPromise(&qs.lock, qs.counter), + ArrivalTime: qs.clock.Now(), + Queue: queue, + descr1: descr1, + descr2: descr2, + } + if ok := qs.rejectOrEnqueueLocked(req); !ok { + return nil + } + metrics.ObserveQueueLength(qs.config.Name, len(queue.Requests)) + return req +} + +// chooseQueueIndexLocked uses shuffle sharding to select a queue index +// using the given hashValue and the shuffle sharding parameters of the queueSet. +func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64, descr1, descr2 interface{}) int { + bestQueueIdx := -1 + bestQueueLen := int(math.MaxInt32) + // the dealer uses the current desired number of queues, which is no larger than the number in `qs.queues`. + qs.dealer.Deal(hashValue, func(queueIdx int) { + thisLen := len(qs.queues[queueIdx].Requests) + klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of length %d", qs.config.Name, descr1, descr2, queueIdx, thisLen) + if thisLen < bestQueueLen { + bestQueueIdx, bestQueueLen = queueIdx, thisLen + } + }) + klog.V(6).Infof("QS(%s): For request %#+v %#+v chose queue %d, had %d waiting & %d executing", qs.config.Name, descr1, descr2, bestQueueIdx, bestQueueLen, qs.queues[bestQueueIdx].RequestsExecuting) + return bestQueueIdx +} + +// removeTimedOutRequestsFromQueueLocked rejects old requests that have been enqueued +// past the requestWaitLimit +func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue) { + timeoutIdx := -1 + now := qs.clock.Now() + reqs := queue.Requests + // reqs are sorted oldest -> newest + // can short circuit loop (break) if oldest requests are not timing out + // as newer requests also will not have timed out + + // now - requestWaitLimit = waitLimit + waitLimit := now.Add(-qs.config.RequestWaitLimit) + for i, req := range reqs { + if waitLimit.After(req.ArrivalTime) { + req.Decision.SetLocked(DecisionReject) + // get index for timed out requests + timeoutIdx = i + } else { + break + } + } + // remove timed out requests from queue + if timeoutIdx != -1 { + // timeoutIdx + 1 to remove the last timeout req + removeIdx := timeoutIdx + 1 + // remove all the timeout requests + queue.Requests = reqs[removeIdx:] + // decrement the # of requestsEnqueued + qs.numRequestsEnqueued -= removeIdx + } +} + +// rejectOrEnqueueLocked rejects or enqueues the newly arrived request if +// resource criteria isn't met +func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool { + queue := request.Queue + curQueueLength := len(queue.Requests) + // rejects the newly arrived request if resource criteria not met + if qs.getRequestsExecutingLocked() >= qs.config.ConcurrencyLimit && + curQueueLength >= qs.config.QueueLengthLimit { + return false + } + + qs.enqueueLocked(request) + return true +} + +// enqueues a request into an queueSet +func (qs *queueSet) enqueueLocked(request *request) { + queue := request.Queue + if len(queue.Requests) == 0 && queue.RequestsExecuting == 0 { + // the queue’s virtual start time is set to the virtual time. + queue.VirtualStart = qs.virtualTime + if klog.V(6) { + klog.Infof("QS(%s) at r=%s v=%.9fs: initialized queue %d virtual start time due to request %#+v %#+v", qs.config.Name, qs.clock.Now().Format(nsTimeFmt), queue.VirtualStart, queue.Index, request.descr1, request.descr2) + } + } + queue.Enqueue(request) + qs.numRequestsEnqueued++ + metrics.UpdateFlowControlRequestsInQueue(qs.config.Name, qs.numRequestsEnqueued) +} + +// getRequestsExecutingLocked gets the # of requests which are "executing": +// this is the # of requests which have been dispatched but have not +// finished (via the finishRequestLocked method invoked after service) +func (qs *queueSet) getRequestsExecutingLocked() int { + total := 0 + for _, queue := range qs.queues { + total += queue.RequestsExecuting + } + return total +} + +// dispatchAsMuchAsPossibleLocked runs a loop, as long as there +// are non-empty queues and the number currently executing is less than the +// assured concurrency value. The body of the loop uses the fair queuing +// technique to pick a queue, dequeue the request at the head of that +// queue, increment the count of the number executing, and send true +// to the request's channel. +func (qs *queueSet) dispatchAsMuchAsPossibleLocked() { + for qs.numRequestsEnqueued != 0 && qs.getRequestsExecutingLocked() < qs.config.ConcurrencyLimit { + _, ok := qs.dispatchLocked() + if !ok { + break + } + } +} + +// dispatchLocked is a convenience method for dequeueing requests that +// require a message to be sent through the requests channel +// this is a required pattern for the QueueSet the queueSet supports +func (qs *queueSet) dispatchLocked() (*request, bool) { + queue := qs.selectQueueLocked() + if queue == nil { + return nil, false + } + request, ok := queue.Dequeue() + if !ok { + return nil, false + } + request.StartTime = qs.clock.Now() + // request dequeued, service has started + queue.RequestsExecuting++ + qs.numRequestsEnqueued-- + if klog.V(6) { + klog.Infof("QS(%s) at r=%s v=%.9fs: dispatching request %#+v %#+v from queue %d with virtual start time %.9fs, queue will have %d waiting & %d executing", qs.config.Name, request.StartTime.Format(nsTimeFmt), qs.virtualTime, request.descr1, request.descr2, queue.Index, queue.VirtualStart, len(queue.Requests), queue.RequestsExecuting) + } + // When a request is dequeued for service -> qs.VirtualStart += G + queue.VirtualStart += qs.estimatedServiceTime + metrics.UpdateFlowControlRequestsExecuting(qs.config.Name, queue.RequestsExecuting) + request.Decision.SetLocked(DecisionExecute) + return request, ok +} + +/// selectQueueLocked selects the minimum virtualFinish time from the set of queues +// the starting queue is selected via roundrobin +func (qs *queueSet) selectQueueLocked() *queue { + minVirtualFinish := math.Inf(1) + var minQueue *queue + var minIndex int + for range qs.queues { + qs.robinIndex = (qs.robinIndex + 1) % len(qs.queues) + queue := qs.queues[qs.robinIndex] + if len(queue.Requests) != 0 { + currentVirtualFinish := queue.GetVirtualFinish(0, qs.estimatedServiceTime) + if currentVirtualFinish < minVirtualFinish { + minVirtualFinish = currentVirtualFinish + minQueue = queue + minIndex = qs.robinIndex + } + } + } + // we set the round robin indexing to start at the chose queue + // for the next round. This way the non-selected queues + // win in the case that the virtual finish times are the same + qs.robinIndex = minIndex + return minQueue +} + +// finishRequestAndDispatchAsMuchAsPossible is a convenience method +// which calls finishRequest for a given request and then dispatches +// as many requests as possible. This is all of what needs to be done +// once a request finishes execution or is canceled. +func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) { + qs.lockAndSyncTime() + defer qs.lock.Unlock() + + qs.finishRequestLocked(req) + qs.dispatchAsMuchAsPossibleLocked() +} + +// finishRequestLocked is a callback that should be used when a +// previously dispatched request has completed it's service. This +// callback updates important state in the queueSet +func (qs *queueSet) finishRequestLocked(r *request) { + S := qs.clock.Since(r.StartTime).Seconds() + + // When a request finishes being served, and the actual service time was S, + // the queue’s virtual start time is decremented by G - S. + r.Queue.VirtualStart -= qs.estimatedServiceTime - S + + // request has finished, remove from requests executing + r.Queue.RequestsExecuting-- + + if klog.V(6) { + klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing", qs.config.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.Queue.Index, r.Queue.VirtualStart, S, len(r.Queue.Requests), r.Queue.RequestsExecuting) + } + + // Logic to remove quiesced queues + // >= as Index=25 is out of bounds for DesiredNum=25 [0...24] + if r.Queue.Index >= qs.config.DesiredNumQueues && + len(r.Queue.Requests) == 0 && + r.Queue.RequestsExecuting == 0 { + qs.queues = removeQueueAndUpdateIndexes(qs.queues, r.Queue.Index) + + // decrement here to maintain the invariant that (qs.robinIndex+1) % numQueues + // is the index of the next queue after the one last dispatched from + if qs.robinIndex >= -r.Queue.Index { + qs.robinIndex-- + } + + // At this point, if the qs is quiescing, + // has zero requests executing, and has zero requests enqueued + // then a call to the EmptyHandler should be forked. + qs.maybeForkEmptyHandlerLocked() + } +} + +// removeQueueAndUpdateIndexes uses reslicing to remove an index from a slice +// and then updates the 'Index' field of the queues to be correct +func removeQueueAndUpdateIndexes(queues []*queue, index int) []*queue { + keptQueues := append(queues[:index], queues[index+1:]...) + for i := index; i < len(keptQueues); i++ { + keptQueues[i].Index-- + } + return keptQueues +} + +func (qs *queueSet) maybeForkEmptyHandlerLocked() { + if qs.emptyHandler != nil && qs.numRequestsEnqueued == 0 && + qs.getRequestsExecutingLocked() == 0 { + qs.preCreateOrUnblockGoroutine() + go func(eh fq.EmptyHandler) { + defer runtime.HandleCrash() + defer qs.goroutineDoneOrBlocked() + eh.HandleEmpty() + }(qs.emptyHandler) + } +} + +// preCreateOrUnblockGoroutine needs to be called before creating a +// goroutine associated with this queueSet or unblocking a blocked +// one, to properly update the accounting used in testing. +func (qs *queueSet) preCreateOrUnblockGoroutine() { + qs.counter.Add(1) +} + +// goroutineDoneOrBlocked needs to be called at the end of every +// goroutine associated with this queueSet or when such a goroutine is +// about to wait on some other goroutine to do something; this is to +// properly update the accounting used in testing. +func (qs *queueSet) goroutineDoneOrBlocked() { + qs.counter.Add(-1) +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go new file mode 100644 index 00000000000..f8d0735ed6a --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go @@ -0,0 +1,225 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queueset + +import ( + "context" + "math" + "sync/atomic" + "testing" + "time" + + "k8s.io/apiserver/pkg/util/flowcontrol/counter" + fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" + test "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing" + "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock" + "k8s.io/klog" +) + +type uniformScenario []uniformClient + +type uniformClient struct { + hash uint64 + nThreads int + nCalls int + // duration for a simulated synchronous call + execDuration time.Duration + // duration for simulated "other work" + thinkDuration time.Duration +} + +// exerciseQueueSetUniformScenario runs a scenario based on the given set of uniform clients. +// Each uniform client specifies a number of threads, each of which alternates between thinking +// and making a synchronous request through the QueueSet. +// This function measures how much concurrency each client got, on average, over +// the initial evalDuration and tests to see whether they all got about the same amount. +// Each client needs to be demanding enough to use this amount, otherwise the fair result +// is not equal amounts and the simple test in this function would not accurately test fairness. +// expectPass indicates whether the QueueSet is expected to be fair. +// expectedAllRequests indicates whether all requests are expected to get dispatched. +func exerciseQueueSetUniformScenario(t *testing.T, name string, qs fq.QueueSet, sc uniformScenario, + evalDuration time.Duration, expectPass bool, expectedAllRequests bool, + clk *clock.FakeEventClock, counter counter.GoRoutineCounter) { + + now := time.Now() + t.Logf("%s: Start %s, clk=%p, grc=%p", clk.Now().Format(nsTimeFmt), name, clk, counter) + integrators := make([]test.Integrator, len(sc)) + var failedCount uint64 + for i, uc := range sc { + integrators[i] = test.NewIntegrator(clk) + for j := 0; j < uc.nThreads; j++ { + counter.Add(1) + go func(i, j int, uc uniformClient, igr test.Integrator) { + for k := 0; k < uc.nCalls; k++ { + ClockWait(clk, counter, uc.thinkDuration) + for { + tryAnother, execute, afterExecute := qs.Wait(context.Background(), uc.hash, name, []int{i, j, k}) + t.Logf("%s: %d, %d, %d got a=%v, e=%v", clk.Now().Format(nsTimeFmt), i, j, k, tryAnother, execute) + if tryAnother { + continue + } + if !execute { + atomic.AddUint64(&failedCount, 1) + break + } + igr.Add(1) + ClockWait(clk, counter, uc.execDuration) + afterExecute() + igr.Add(-1) + break + } + } + counter.Add(-1) + }(i, j, uc, integrators[i]) + } + } + lim := now.Add(evalDuration) + clk.Run(&lim) + clk.SetTime(lim) + t.Logf("%s: End", clk.Now().Format(nsTimeFmt)) + results := make([]test.IntegratorResults, len(sc)) + var sumOfAvg float64 + for i := range sc { + results[i] = integrators[i].GetResults() + sumOfAvg += results[i].Average + } + idealAverage := sumOfAvg / float64(len(sc)) + passes := make([]bool, len(sc)) + allPass := true + for i := range sc { + relDiff := (results[i].Average - idealAverage) / idealAverage + passes[i] = math.Abs(relDiff) <= 0.1 + allPass = allPass && passes[i] + } + for i := range sc { + if allPass != expectPass { + t.Errorf("Class %d got an Average of %v but the ideal was %v", i, results[i].Average, idealAverage) + } else { + t.Logf("Class %d got an Average of %v and the ideal was %v", i, results[i].Average, idealAverage) + } + } + + clk.Run(nil) + if expectedAllRequests && failedCount > 0 { + t.Errorf("Expected all requests to be successful but got %v failed requests", failedCount) + } else if !expectedAllRequests && failedCount == 0 { + t.Errorf("Expected failed requests but all requests succeeded") + } +} + +func ClockWait(clk *clock.FakeEventClock, counter counter.GoRoutineCounter, duration time.Duration) { + dunch := make(chan struct{}) + clk.EventAfterDuration(func(time.Time) { + counter.Add(1) + close(dunch) + }, duration) + counter.Add(-1) + select { + case <-dunch: + } +} + +func init() { + klog.InitFlags(nil) +} + +// TestNoRestraint should fail because the dummy QueueSet exercises no control +func TestNoRestraint(t *testing.T) { + now := time.Now() + clk, counter := clock.NewFakeEventClock(now, 0, nil) + nrf := test.NewNoRestraintFactory() + config := fq.QueueSetConfig{} + nr, err := nrf.NewQueueSet(config) + if err != nil { + t.Fatalf("QueueSet creation failed with %v", err) + } + exerciseQueueSetUniformScenario(t, "NoRestraint", nr, []uniformClient{ + {1001001001, 5, 10, time.Second, time.Second}, + {2002002002, 2, 10, time.Second, time.Second / 2}, + }, time.Second*10, false, true, clk, counter) +} + +func TestUniformFlows(t *testing.T) { + now := time.Now() + + clk, counter := clock.NewFakeEventClock(now, 0, nil) + qsf := NewQueueSetFactory(clk, counter) + config := fq.QueueSetConfig{ + Name: "TestUniformFlows", + ConcurrencyLimit: 4, + DesiredNumQueues: 8, + QueueLengthLimit: 6, + HandSize: 3, + RequestWaitLimit: 10 * time.Minute, + } + qs, err := qsf.NewQueueSet(config) + if err != nil { + t.Fatalf("QueueSet creation failed with %v", err) + } + + exerciseQueueSetUniformScenario(t, "UniformFlows", qs, []uniformClient{ + {1001001001, 5, 10, time.Second, time.Second}, + {2002002002, 5, 10, time.Second, time.Second}, + }, time.Second*20, true, true, clk, counter) +} + +func TestDifferentFlows(t *testing.T) { + now := time.Now() + + clk, counter := clock.NewFakeEventClock(now, 0, nil) + qsf := NewQueueSetFactory(clk, counter) + config := fq.QueueSetConfig{ + Name: "TestDifferentFlows", + ConcurrencyLimit: 4, + DesiredNumQueues: 8, + QueueLengthLimit: 6, + HandSize: 3, + RequestWaitLimit: 10 * time.Minute, + } + qs, err := qsf.NewQueueSet(config) + if err != nil { + t.Fatalf("QueueSet creation failed with %v", err) + } + + exerciseQueueSetUniformScenario(t, "DifferentFlows", qs, []uniformClient{ + {1001001001, 6, 10, time.Second, time.Second}, + {2002002002, 4, 15, time.Second, time.Second / 2}, + }, time.Second*20, true, true, clk, counter) +} + +func TestTimeout(t *testing.T) { + now := time.Now() + + clk, counter := clock.NewFakeEventClock(now, 0, nil) + qsf := NewQueueSetFactory(clk, counter) + config := fq.QueueSetConfig{ + Name: "TestTimeout", + ConcurrencyLimit: 1, + DesiredNumQueues: 128, + QueueLengthLimit: 128, + HandSize: 1, + RequestWaitLimit: 0, + } + qs, err := qsf.NewQueueSet(config) + if err != nil { + t.Fatalf("QueueSet creation failed with %v", err) + } + + exerciseQueueSetUniformScenario(t, "Timeout", qs, []uniformClient{ + {1001001001, 5, 100, time.Second, time.Second}, + }, time.Second*10, true, false, clk, counter) +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go new file mode 100644 index 00000000000..581954985cf --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/types.go @@ -0,0 +1,87 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queueset + +import ( + "time" + + "k8s.io/apiserver/pkg/util/promise" +) + +// request is a temporary container for "requests" with additional tracking fields +// required for the functionality FQScheduler +type request struct { + Queue *queue + + // StartTime is the clock time when the request began executing + StartTime time.Time + + // Decision gets set to the decision about what to do with this request + Decision promise.LockingMutable + + // ArrivalTime is when the request entered this system + ArrivalTime time.Time + + // IsWaiting indicates whether the request is presently waiting in a queue + IsWaiting bool + + // descr1 and descr2 are not used in any logic but they appear in + // log messages + descr1, descr2 interface{} +} + +// queue is an array of requests with additional metadata required for +// the FQScheduler +type queue struct { + Requests []*request + + // VirtualStart is the virtual time when the oldest request in the + // queue (if there is any) started virtually executing + VirtualStart float64 + + RequestsExecuting int + Index int +} + +// Enqueue enqueues a request into the queue +func (q *queue) Enqueue(request *request) { + request.IsWaiting = true + q.Requests = append(q.Requests, request) +} + +// Dequeue dequeues a request from the queue +func (q *queue) Dequeue() (*request, bool) { + if len(q.Requests) == 0 { + return nil, false + } + request := q.Requests[0] + q.Requests = q.Requests[1:] + + request.IsWaiting = false + return request, true +} + +// GetVirtualFinish returns the expected virtual finish time of the request at +// index J in the queue with estimated finish time G +func (q *queue) GetVirtualFinish(J int, G float64) float64 { + // The virtual finish time of request number J in the queue + // (counting from J=1 for the head) is J * G + (virtual start time). + + // counting from J=1 for the head (eg: queue.Requests[0] -> J=1) - J+1 + jg := float64(J+1) * float64(G) + return jg + q.VirtualStart +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/BUILD b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/BUILD new file mode 100644 index 00000000000..4fdeb893837 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/BUILD @@ -0,0 +1,33 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = [ + "integrator.go", + "no-restraint.go", + ], + importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing", + importpath = "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing", + visibility = ["//visibility:public"], + deps = [ + "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [ + ":package-srcs", + "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock:all-srcs", + ], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock/BUILD b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock/BUILD new file mode 100644 index 00000000000..438a9d8b558 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock/BUILD @@ -0,0 +1,35 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = ["event_clock.go"], + importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock", + importpath = "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock", + visibility = ["//visibility:public"], + deps = [ + "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:go_default_library", + "//vendor/k8s.io/klog:go_default_library", + ], +) + +go_test( + name = "go_default_test", + srcs = ["event_clock_test.go"], + embed = [":go_default_library"], + deps = ["//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library"], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock/event_clock.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock/event_clock.go new file mode 100644 index 00000000000..683a622000b --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock/event_clock.go @@ -0,0 +1,251 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package clock + +import ( + "container/heap" + "math/rand" + "runtime" + "strings" + "sync" + "time" + + "k8s.io/apimachinery/pkg/util/clock" + "k8s.io/apiserver/pkg/util/flowcontrol/counter" + "k8s.io/klog" +) + +// EventFunc does some work that needs to be done at or after the +// given time. After this function returns, associated work may continue +// on other goroutines only if they are counted by the GoRoutineCounter +// of the FakeEventClock handling this EventFunc. +type EventFunc func(time.Time) + +// EventClock fires event on time +type EventClock interface { + clock.PassiveClock + EventAfterDuration(f EventFunc, d time.Duration) + EventAfterTime(f EventFunc, t time.Time) +} + +// RealEventClock fires event on real world time +type RealEventClock struct { + clock.RealClock +} + +// EventAfterDuration schedules an EventFunc +func (RealEventClock) EventAfterDuration(f EventFunc, d time.Duration) { + ch := time.After(d) + go func() { + select { + case t := <-ch: + f(t) + } + }() +} + +// EventAfterTime schedules an EventFunc +func (r RealEventClock) EventAfterTime(f EventFunc, t time.Time) { + now := time.Now() + d := t.Sub(now) + if d <= 0 { + go f(now) + } else { + r.EventAfterDuration(f, d) + } +} + +// waitGroupCounter is a wait group used for a GoRoutine Counter. This private +// type is used to disallow direct waitGroup access +type waitGroupCounter struct { + wg sync.WaitGroup +} + +// compile time assertion that waitGroupCounter meets requirements +// of GoRoutineCounter +var _ counter.GoRoutineCounter = (*waitGroupCounter)(nil) + +func (wgc *waitGroupCounter) Add(delta int) { + if klog.V(7) { + var pcs [5]uintptr + nCallers := runtime.Callers(2, pcs[:]) + frames := runtime.CallersFrames(pcs[:nCallers]) + frame1, more1 := frames.Next() + fileParts1 := strings.Split(frame1.File, "/") + tail2 := "(none)" + line2 := 0 + if more1 { + frame2, _ := frames.Next() + fileParts2 := strings.Split(frame2.File, "/") + tail2 = fileParts2[len(fileParts2)-1] + line2 = frame2.Line + } + klog.Infof("GRC(%p).Add(%d) from %s:%d from %s:%d", wgc, delta, fileParts1[len(fileParts1)-1], frame1.Line, tail2, line2) + } + wgc.wg.Add(delta) +} + +func (wgc *waitGroupCounter) Wait() { + wgc.wg.Wait() +} + +// FakeEventClock is one whose time does not pass implicitly but +// rather is explicitly set by invocations of its SetTime method +type FakeEventClock struct { + clock.FakePassiveClock + + // waiters is a heap of waiting work, sorted by time + waiters eventWaiterHeap + waitersLock sync.RWMutex + + // clientWG may be nil and if not supplies constraints on time + // passing in Run. The Run method will not pick a new time until + // this is nil or its counter is zero. + clientWG *waitGroupCounter + + // fuzz is the amount of noise to add to scheduling. An event + // requested to run at time T will run at some time chosen + // uniformly at random from the interval [T, T+fuzz]; the upper + // bound is exclusive iff fuzz is non-zero. + fuzz time.Duration + + // rand is the random number generator to use in fuzzing + rand *rand.Rand +} + +type eventWaiterHeap []eventWaiter + +var _ heap.Interface = (*eventWaiterHeap)(nil) + +type eventWaiter struct { + targetTime time.Time + f EventFunc +} + +// NewFakeEventClock constructor. The given `r *rand.Rand` must +// henceforth not be used for any other purpose. If `r` is nil then a +// fresh one will be constructed, seeded with the current real time. +// The clientWG can be `nil` and if not is used to let Run know about +// additional work that has to complete before time can advance. +func NewFakeEventClock(t time.Time, fuzz time.Duration, r *rand.Rand) (*FakeEventClock, counter.GoRoutineCounter) { + grc := &waitGroupCounter{} + + if r == nil { + r = rand.New(rand.NewSource(time.Now().UnixNano())) + r.Uint64() + r.Uint64() + r.Uint64() + } + return &FakeEventClock{ + FakePassiveClock: *clock.NewFakePassiveClock(t), + clientWG: grc, + fuzz: fuzz, + rand: r, + }, grc +} + +// GetNextTime returns the next time at which there is work scheduled, +// and a bool indicating whether there is any such time +func (fec *FakeEventClock) GetNextTime() (time.Time, bool) { + fec.waitersLock.RLock() + defer fec.waitersLock.RUnlock() + if len(fec.waiters) > 0 { + return fec.waiters[0].targetTime, true + } + return time.Time{}, false +} + +// Run runs all the events scheduled, and all the events they +// schedule, and so on, until there are none scheduled or the limit is not +// nil and the next time would exceed the limit. The clientWG given in +// the constructor gates each advance of time. +func (fec *FakeEventClock) Run(limit *time.Time) { + for { + fec.clientWG.Wait() + t, ok := fec.GetNextTime() + if !ok || limit != nil && t.After(*limit) { + break + } + fec.SetTime(t) + } +} + +// SetTime sets the time and runs to completion all events that should +// be started by the given time --- including any further events they +// schedule +func (fec *FakeEventClock) SetTime(t time.Time) { + fec.FakePassiveClock.SetTime(t) + for { + foundSome := false + func() { + fec.waitersLock.Lock() + defer fec.waitersLock.Unlock() + // This loop is because events run at a given time may schedule more + // events to run at that or an earlier time. + // Events should not advance the clock. But just in case they do... + now := fec.Now() + var wg sync.WaitGroup + for len(fec.waiters) > 0 && !now.Before(fec.waiters[0].targetTime) { + ew := heap.Pop(&fec.waiters).(eventWaiter) + wg.Add(1) + go func(f EventFunc) { f(now); wg.Done() }(ew.f) + foundSome = true + } + wg.Wait() + }() + if !foundSome { + break + } + } +} + +// EventAfterDuration schedules the given function to be invoked once +// the given duration has passed. +func (fec *FakeEventClock) EventAfterDuration(f EventFunc, d time.Duration) { + fec.waitersLock.Lock() + defer fec.waitersLock.Unlock() + now := fec.Now() + fd := time.Duration(float32(fec.fuzz) * fec.rand.Float32()) + heap.Push(&fec.waiters, eventWaiter{targetTime: now.Add(d + fd), f: f}) +} + +// EventAfterTime schedules the given function to be invoked once +// the given time has arrived. +func (fec *FakeEventClock) EventAfterTime(f EventFunc, t time.Time) { + fec.waitersLock.Lock() + defer fec.waitersLock.Unlock() + fd := time.Duration(float32(fec.fuzz) * fec.rand.Float32()) + heap.Push(&fec.waiters, eventWaiter{targetTime: t.Add(fd), f: f}) +} + +func (ewh eventWaiterHeap) Len() int { return len(ewh) } + +func (ewh eventWaiterHeap) Less(i, j int) bool { return ewh[i].targetTime.Before(ewh[j].targetTime) } + +func (ewh eventWaiterHeap) Swap(i, j int) { ewh[i], ewh[j] = ewh[j], ewh[i] } + +func (ewh *eventWaiterHeap) Push(x interface{}) { + *ewh = append(*ewh, x.(eventWaiter)) +} + +func (ewh *eventWaiterHeap) Pop() interface{} { + old := *ewh + n := len(old) + x := old[n-1] + *ewh = old[:n-1] + return x +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock/event_clock_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock/event_clock_test.go new file mode 100644 index 00000000000..2cd58ef574c --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock/event_clock_test.go @@ -0,0 +1,183 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package clock + +import ( + "math/rand" + "sync/atomic" + "testing" + "time" + + "k8s.io/apimachinery/pkg/util/clock" +) + +type TestableEventClock interface { + EventClock + SetTime(time.Time) + Run(*time.Time) +} + +// settablePassiveClock allows setting current time of a passive clock +type settablePassiveClock interface { + clock.PassiveClock + SetTime(time.Time) +} + +func exerciseTestableEventClock(t *testing.T, ec TestableEventClock, fuzz time.Duration) { + exercisePassiveClock(t, ec) + var numDone int32 + now := ec.Now() + strictable := true + const batchSize = 100 + times := make(chan time.Time, batchSize+1) + try := func(abs, strict bool, d time.Duration) { + f := func(u time.Time) { + realD := ec.Since(now) + atomic.AddInt32(&numDone, 1) + times <- u + if realD < d || strict && strictable && realD > d+fuzz { + t.Errorf("Asked for %v, got %v", d, realD) + } + } + if abs { + ec.EventAfterTime(f, now.Add(d)) + } else { + ec.EventAfterDuration(f, d) + } + } + try(true, true, time.Minute) + for i := 0; i < batchSize; i++ { + d := time.Duration(rand.Intn(30)-3) * time.Second + try(i%2 == 0, d >= 0, d) + } + ec.Run(nil) + if numDone != batchSize+1 { + t.Errorf("Got only %v events", numDone) + } + lastTime := now.Add(-3 * time.Second) + for i := 0; i <= batchSize; i++ { + nextTime := <-times + if nextTime.Before(lastTime) { + t.Errorf("Got %s after %s", nextTime, lastTime) + } + } + endTime := ec.Now() + dx := endTime.Sub(now) + if dx > time.Minute+fuzz { + t.Errorf("Run started at %#+v, ended at %#+v, dx=%d", now, endTime, dx) + } + now = endTime + var shouldRun int32 + strictable = false + for i := 0; i < batchSize; i++ { + d := time.Duration(rand.Intn(30)-3) * time.Second + try(i%2 == 0, d >= 0, d) + if d <= 12*time.Second { + shouldRun++ + } + } + ec.SetTime(now.Add(13*time.Second - 1)) + if numDone != batchSize+1+shouldRun { + t.Errorf("Expected %v, but %v ran", shouldRun, numDone-batchSize-1) + } + lastTime = now.Add(-3 * time.Second) + for i := int32(0); i < shouldRun; i++ { + nextTime := <-times + if nextTime.Before(lastTime) { + t.Errorf("Got %s after %s", nextTime, lastTime) + } + lastTime = nextTime + } +} + +func exercisePassiveClock(t *testing.T, pc settablePassiveClock) { + t1 := time.Now() + t2 := t1.Add(time.Hour) + pc.SetTime(t1) + tx := pc.Now() + if tx != t1 { + t.Errorf("SetTime(%#+v); Now() => %#+v", t1, tx) + } + dx := pc.Since(t1) + if dx != 0 { + t.Errorf("Since() => %v", dx) + } + pc.SetTime(t2) + dx = pc.Since(t1) + if dx != time.Hour { + t.Errorf("Since() => %v", dx) + } + tx = pc.Now() + if tx != t2 { + t.Errorf("Now() => %#+v", tx) + } +} + +func TestFakeEventClock(t *testing.T) { + startTime := time.Now() + fec, _ := NewFakeEventClock(startTime, 0, nil) + exerciseTestableEventClock(t, fec, 0) + fec, _ = NewFakeEventClock(startTime, time.Second, nil) + exerciseTestableEventClock(t, fec, time.Second) +} + +func exerciseEventClock(t *testing.T, ec EventClock, relax func(time.Duration)) { + var numDone int32 + now := ec.Now() + const batchSize = 100 + times := make(chan time.Time, batchSize+1) + try := func(abs bool, d time.Duration) { + f := func(u time.Time) { + realD := ec.Since(now) + atomic.AddInt32(&numDone, 1) + times <- u + if realD < d { + t.Errorf("Asked for %v, got %v", d, realD) + } + } + if abs { + ec.EventAfterTime(f, now.Add(d)) + } else { + ec.EventAfterDuration(f, d) + } + } + try(true, time.Millisecond*3300) + for i := 0; i < batchSize; i++ { + d := time.Duration(rand.Intn(30)-3) * time.Millisecond * 100 + try(i%2 == 0, d) + } + relax(time.Second * 4) + if atomic.LoadInt32(&numDone) != batchSize+1 { + t.Errorf("Got only %v events", numDone) + } + lastTime := now + for i := 0; i <= batchSize; i++ { + nextTime := <-times + if nextTime.Before(now) { + continue + } + dt := nextTime.Sub(lastTime) / (50 * time.Millisecond) + if dt < 0 { + t.Errorf("Got %s after %s", nextTime, lastTime) + } + lastTime = nextTime + } +} + +func TestRealEventClock(t *testing.T) { + exerciseEventClock(t, RealEventClock{}, func(d time.Duration) { time.Sleep(d) }) +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/integrator.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/integrator.go new file mode 100644 index 00000000000..22c75b7f253 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/integrator.go @@ -0,0 +1,103 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + "math" + "sync" + "time" + + "k8s.io/apimachinery/pkg/util/clock" +) + +// Integrator computes the integral of some variable X over time as +// read from a particular clock. The integral starts when the +// Integrator is created, and ends at the latest operation on the +// Integrator. +type Integrator interface { + Set(float64) // set the value of X + Add(float64) // add the given quantity to X + GetResults() IntegratorResults +} + +// IntegratorResults holds statistical abstracts of the integration +type IntegratorResults struct { + Duration float64 //seconds + Average float64 + Deviation float64 //sqrt(avg((value-avg)^2)) +} + +type integrator struct { + sync.Mutex + clk clock.PassiveClock + lastTime time.Time + x float64 + integrals [3]float64 // integral of x^0, x^1, and x^2 +} + +// NewIntegrator makes one that uses the given clock +func NewIntegrator(clk clock.PassiveClock) Integrator { + return &integrator{ + clk: clk, + lastTime: clk.Now(), + } +} + +func (igr *integrator) Set(x float64) { + igr.Lock() + igr.updateLocked() + igr.x = x + igr.Unlock() +} + +func (igr *integrator) Add(deltaX float64) { + igr.Lock() + igr.updateLocked() + igr.x += deltaX + igr.Unlock() +} + +func (igr *integrator) updateLocked() { + now := igr.clk.Now() + dt := now.Sub(igr.lastTime).Seconds() + igr.lastTime = now + igr.integrals[0] += dt + igr.integrals[1] += dt * igr.x + igr.integrals[2] += dt * igr.x * igr.x +} + +func (igr *integrator) GetResults() (results IntegratorResults) { + igr.Lock() + defer func() { igr.Unlock() }() + igr.updateLocked() + results.Duration = igr.integrals[0] + if results.Duration <= 0 { + results.Average = math.NaN() + results.Deviation = math.NaN() + return + } + results.Average = igr.integrals[1] / igr.integrals[0] + // Deviation is sqrt( Integral( (x - xbar)^2 dt) / Duration ) + // = sqrt( Integral( x^2 + xbar^2 -2*x*xbar dt ) / Duration ) + // = sqrt( ( Integral( x^2 dt ) + Duration * xbar^2 - 2*xbar*Integral(x dt) ) / Duration) + // = sqrt( Integral(x^2 dt)/Duration - xbar^2 ) + variance := igr.integrals[2]/igr.integrals[0] - results.Average*results.Average + if variance > 0 { + results.Deviation = math.Sqrt(variance) + } + return +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go new file mode 100644 index 00000000000..5ac48be94d8 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/no-restraint.go @@ -0,0 +1,49 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + "context" + + fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" +) + +// NewNoRestraintFactory makes a QueueSetFactory that produces +// QueueSets that exert no restraint --- every request is dispatched +// for execution as soon as it arrives. +func NewNoRestraintFactory() fq.QueueSetFactory { + return noRestraintFactory{} +} + +type noRestraintFactory struct{} + +func (noRestraintFactory) NewQueueSet(config fq.QueueSetConfig) (fq.QueueSet, error) { + return noRestraint{}, nil +} + +type noRestraint struct{} + +func (noRestraint) SetConfiguration(config fq.QueueSetConfig) error { + return nil +} + +func (noRestraint) Quiesce(fq.EmptyHandler) { +} + +func (noRestraint) Wait(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (quiescent, execute bool, afterExecution func()) { + return false, true, func() {} +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/BUILD b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/BUILD new file mode 100644 index 00000000000..115224dc108 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/BUILD @@ -0,0 +1,24 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = ["metrics.go"], + importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/metrics", + importpath = "k8s.io/apiserver/pkg/util/flowcontrol/metrics", + visibility = ["//visibility:public"], + deps = ["//vendor/github.com/prometheus/client_golang/prometheus:go_default_library"], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go new file mode 100644 index 00000000000..2e026b9c36b --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go @@ -0,0 +1,152 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "time" + + "github.com/prometheus/client_golang/prometheus" +) + +const ( + namespace = "apiserver" + subsystem = "flowcontrol" +) + +const ( + priorityLevel = "priorityLevel" + flowSchema = "flowSchema" +) + +var ( + queueLengthBuckets = []float64{0, 10, 25, 50, 100, 250, 500, 1000} + requestDurationSecondsBuckets = []float64{0, 0.005, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2, 5, 10, 30} +) + +func init() { + prometheus.MustRegister(apiserverRejectedRequests) + prometheus.MustRegister(apiserverCurrentInqueueRequests) + prometheus.MustRegister(apiserverRequestQueueLength) + prometheus.MustRegister(apiserverRequestConcurrencyLimit) + prometheus.MustRegister(apiserverCurrentExecutingRequests) + prometheus.MustRegister(apiserverRequestWaitingSeconds) + prometheus.MustRegister(apiserverRequestExecutionSeconds) +} + +var ( + apiserverRejectedRequests = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "rejectedRequests", + Help: "Number of rejected requests by api priority and fairness system", + }, + []string{priorityLevel, "reason"}, + ) + apiserverCurrentInqueueRequests = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "currentInqueueRequests", + Help: "Number of requests currently pending in the queue by the api priority and fairness system", + }, + []string{priorityLevel}, + ) + apiserverRequestQueueLength = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "requestQueueLength", + Help: "Length of queue in the api priority and fairness system", + Buckets: queueLengthBuckets, + }, + []string{priorityLevel}, + ) + apiserverRequestConcurrencyLimit = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "requestConcurrencyLimit", + Help: "Shared concurrency limit in the api priority and fairness system", + }, + []string{priorityLevel}, + ) + apiserverCurrentExecutingRequests = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "currentExecutingRequests", + Help: "Number of requests currently executing in the api priority and fairness system", + }, + []string{priorityLevel}, + ) + apiserverRequestWaitingSeconds = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "request_wait_durationSeconds", + Help: "Length of time a request spent waiting in its queue", + Buckets: requestDurationSecondsBuckets, + }, + []string{priorityLevel, flowSchema, "execute"}, + ) + apiserverRequestExecutionSeconds = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "requestExecutionSeconds", + Help: "Time of request executing in the api priority and fairness system", + Buckets: requestDurationSecondsBuckets, + }, + []string{priorityLevel, flowSchema}, + ) +) + +// UpdateFlowControlRequestsInQueue updates the value for the # of requests in the specified queues in flow control +func UpdateFlowControlRequestsInQueue(priorityLevel string, inqueue int) { + apiserverCurrentInqueueRequests.WithLabelValues(priorityLevel).Set(float64(inqueue)) +} + +// UpdateFlowControlRequestsExecuting updates the value for the # of requests executing in flow control +func UpdateFlowControlRequestsExecuting(priorityLevel string, executing int) { + apiserverCurrentExecutingRequests.WithLabelValues(priorityLevel).Set(float64(executing)) +} + +// UpdateSharedConcurrencyLimit updates the value for the concurrency limit in flow control +func UpdateSharedConcurrencyLimit(priorityLevel string, limit int) { + apiserverRequestConcurrencyLimit.WithLabelValues(priorityLevel).Set(float64(limit)) +} + +// AddReject increments the # of rejected requests for flow control +func AddReject(priorityLevel string, reason string) { + apiserverRejectedRequests.WithLabelValues(priorityLevel, reason).Add(1) +} + +// ObserveQueueLength observes the queue length for flow control +func ObserveQueueLength(priorityLevel string, length int) { + apiserverRequestQueueLength.WithLabelValues(priorityLevel).Observe(float64(length)) +} + +// ObserveWaitingDuration observes the queue length for flow control +func ObserveWaitingDuration(priorityLevel, flowSchema, execute string, waitTime time.Duration) { + apiserverRequestWaitingSeconds.WithLabelValues(priorityLevel, flowSchema, execute).Observe(waitTime.Seconds()) +} + +// ObserveExecutionDuration observes the execution duration for flow control +func ObserveExecutionDuration(priorityLevel, flowSchema string, executionTime time.Duration) { + apiserverRequestExecutionSeconds.WithLabelValues(priorityLevel, flowSchema).Observe(executionTime.Seconds()) +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/promise/BUILD b/staging/src/k8s.io/apiserver/pkg/util/promise/BUILD new file mode 100644 index 00000000000..dc7c60f537e --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/promise/BUILD @@ -0,0 +1,26 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = ["interface.go"], + importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/promise", + importpath = "k8s.io/apiserver/pkg/util/promise", + visibility = ["//visibility:public"], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [ + ":package-srcs", + "//staging/src/k8s.io/apiserver/pkg/util/promise/lockingpromise:all-srcs", + ], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/staging/src/k8s.io/apiserver/pkg/util/promise/interface.go b/staging/src/k8s.io/apiserver/pkg/util/promise/interface.go new file mode 100644 index 00000000000..258469043b4 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/promise/interface.go @@ -0,0 +1,42 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package promise + +// Mutable is a variable that is initially not set and can be set one +// or more times (unlike a traditional "promise", which can be written +// only once). +type Mutable interface { + + // Set writes a value into this variable and unblocks every + // goroutine waiting for this variable to have a value + Set(interface{}) + + // Get reads the value of this variable. If this variable is + // not set yet then this call blocks until this variable gets a value. + Get() interface{} +} + +// LockingMutable is a Mutable whose implementation is protected by a lock +type LockingMutable interface { + Mutable + + // SetLocked is like Set but the caller must already hold the lock + SetLocked(interface{}) + + // GetLocked is like Get but the caller must already hold the lock + GetLocked() interface{} +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/promise/lockingpromise/BUILD b/staging/src/k8s.io/apiserver/pkg/util/promise/lockingpromise/BUILD new file mode 100644 index 00000000000..b05144f2507 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/promise/lockingpromise/BUILD @@ -0,0 +1,34 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = ["lockingpromise.go"], + importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/promise/lockingpromise", + importpath = "k8s.io/apiserver/pkg/util/promise/lockingpromise", + visibility = ["//visibility:public"], + deps = [ + "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/promise:go_default_library", + ], +) + +go_test( + name = "go_default_test", + srcs = ["lockingpromise_test.go"], + embed = [":go_default_library"], + deps = ["//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock:go_default_library"], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/staging/src/k8s.io/apiserver/pkg/util/promise/lockingpromise/lockingpromise.go b/staging/src/k8s.io/apiserver/pkg/util/promise/lockingpromise/lockingpromise.go new file mode 100644 index 00000000000..a449df2c788 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/promise/lockingpromise/lockingpromise.go @@ -0,0 +1,79 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package lockingpromise + +import ( + "sync" + + "k8s.io/apiserver/pkg/util/flowcontrol/counter" + "k8s.io/apiserver/pkg/util/promise" +) + +// lockingPromise implements LockingMutable based on a condition +// variable. This implementation tracks active goroutines: the given +// counter is decremented for a goroutine waiting for this varible to +// be set and incremented when such a goroutine is unblocked. +type lockingPromise struct { + lock sync.Locker + cond sync.Cond + activeCounter counter.GoRoutineCounter // counter of active goroutines + waitingCount int // number of goroutines idle due to this mutable being unset + isSet bool + value interface{} +} + +var _ promise.LockingMutable = &lockingPromise{} + +// NewLockingPromise makes a new promise.LockingMutable +func NewLockingPromise(lock sync.Locker, activeCounter counter.GoRoutineCounter) promise.LockingMutable { + return &lockingPromise{ + lock: lock, + cond: *sync.NewCond(lock), + activeCounter: activeCounter, + } +} + +func (lp *lockingPromise) Set(value interface{}) { + lp.lock.Lock() + defer lp.lock.Unlock() + lp.SetLocked(value) +} + +func (lp *lockingPromise) Get() interface{} { + lp.lock.Lock() + defer lp.lock.Unlock() + return lp.GetLocked() +} + +func (lp *lockingPromise) SetLocked(value interface{}) { + lp.isSet = true + lp.value = value + if lp.waitingCount > 0 { + lp.activeCounter.Add(lp.waitingCount) + lp.waitingCount = 0 + lp.cond.Broadcast() + } +} + +func (lp *lockingPromise) GetLocked() interface{} { + if !lp.isSet { + lp.waitingCount++ + lp.activeCounter.Add(-1) + lp.cond.Wait() + } + return lp.value +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/promise/lockingpromise/lockingpromise_test.go b/staging/src/k8s.io/apiserver/pkg/util/promise/lockingpromise/lockingpromise_test.go new file mode 100644 index 00000000000..766f8ffed9c --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/promise/lockingpromise/lockingpromise_test.go @@ -0,0 +1,70 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package lockingpromise + +import ( + "sync" + "sync/atomic" + "testing" + "time" + + "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock" +) + +func TestLockingPromise(t *testing.T) { + now := time.Now() + clock, counter := clock.NewFakeEventClock(now, 0, nil) + var lock sync.Mutex + lp := NewLockingPromise(&lock, counter) + var gots int32 + var got atomic.Value + counter.Add(1) + go func() { + got.Store(lp.Get()) + atomic.AddInt32(&gots, 1) + counter.Add(-1) + }() + clock.Run(nil) + time.Sleep(time.Second) + if atomic.LoadInt32(&gots) != 0 { + t.Error("Get returned before Set") + } + var aval = &now + lp.Set(aval) + clock.Run(nil) + time.Sleep(time.Second) + if atomic.LoadInt32(&gots) != 1 { + t.Error("Get did not return after Set") + } + if got.Load() != aval { + t.Error("Get did not return what was Set") + } + counter.Add(1) + go func() { + got.Store(lp.Get()) + atomic.AddInt32(&gots, 1) + counter.Add(-1) + }() + clock.Run(nil) + time.Sleep(time.Second) + if atomic.LoadInt32(&gots) != 2 { + t.Error("Second Get did not return immediately") + } + if got.Load() != aval { + t.Error("Second Get did not return what was Set") + } +} diff --git a/staging/src/k8s.io/component-base/metrics/BUILD b/staging/src/k8s.io/component-base/metrics/BUILD index 0baa5f3ecf2..0d6a7701ae8 100644 --- a/staging/src/k8s.io/component-base/metrics/BUILD +++ b/staging/src/k8s.io/component-base/metrics/BUILD @@ -100,6 +100,7 @@ package_group( "//pkg/scheduler/framework/v1alpha1", "//pkg/volume/util/operationexecutor", "//staging/src/k8s.io/apiserver/pkg/admission/metrics", + "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics", "//staging/src/k8s.io/component-base/metrics/...", "//test/e2e", "//test/e2e/storage",