mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-04 18:00:08 +00:00
fairqueuing implementation with unit tests
This commit is contained in:
parent
0968636760
commit
24065cf5be
2
api/openapi-spec/swagger.json
generated
2
api/openapi-spec/swagger.json
generated
@ -20742,7 +20742,7 @@
|
|||||||
},
|
},
|
||||||
"info": {
|
"info": {
|
||||||
"title": "Kubernetes",
|
"title": "Kubernetes",
|
||||||
"version": "v1.18.0"
|
"version": "v1.17.0"
|
||||||
},
|
},
|
||||||
"paths": {
|
"paths": {
|
||||||
"/api/": {
|
"/api/": {
|
||||||
|
@ -42,6 +42,9 @@ filegroup(
|
|||||||
"//staging/src/k8s.io/apiserver/pkg/util/apihelpers:all-srcs",
|
"//staging/src/k8s.io/apiserver/pkg/util/apihelpers:all-srcs",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/dryrun:all-srcs",
|
"//staging/src/k8s.io/apiserver/pkg/util/dryrun:all-srcs",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/feature:all-srcs",
|
"//staging/src/k8s.io/apiserver/pkg/util/feature:all-srcs",
|
||||||
|
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:all-srcs",
|
||||||
|
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:all-srcs",
|
||||||
|
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:all-srcs",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/flushwriter:all-srcs",
|
"//staging/src/k8s.io/apiserver/pkg/util/flushwriter:all-srcs",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/openapi:all-srcs",
|
"//staging/src/k8s.io/apiserver/pkg/util/openapi:all-srcs",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/proxy:all-srcs",
|
"//staging/src/k8s.io/apiserver/pkg/util/proxy:all-srcs",
|
||||||
|
@ -26,8 +26,9 @@ require (
|
|||||||
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
|
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
|
||||||
github.com/hashicorp/golang-lru v0.5.1
|
github.com/hashicorp/golang-lru v0.5.1
|
||||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822
|
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822
|
||||||
github.com/pkg/errors v0.8.1 // indirect
|
github.com/pkg/errors v0.8.1
|
||||||
github.com/pquerna/cachecontrol v0.0.0-20171018203845-0dec1b30a021 // indirect
|
github.com/pquerna/cachecontrol v0.0.0-20171018203845-0dec1b30a021 // indirect
|
||||||
|
github.com/prometheus/client_golang v1.0.0
|
||||||
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90
|
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90
|
||||||
github.com/sirupsen/logrus v1.4.2 // indirect
|
github.com/sirupsen/logrus v1.4.2 // indirect
|
||||||
github.com/spf13/pflag v1.0.5
|
github.com/spf13/pflag v1.0.5
|
||||||
|
@ -0,0 +1,23 @@
|
|||||||
|
load("@io_bazel_rules_go//go:def.bzl", "go_library")
|
||||||
|
|
||||||
|
go_library(
|
||||||
|
name = "go_default_library",
|
||||||
|
srcs = ["interface.go"],
|
||||||
|
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/counter",
|
||||||
|
importpath = "k8s.io/apiserver/pkg/util/flowcontrol/counter",
|
||||||
|
visibility = ["//visibility:public"],
|
||||||
|
)
|
||||||
|
|
||||||
|
filegroup(
|
||||||
|
name = "package-srcs",
|
||||||
|
srcs = glob(["**"]),
|
||||||
|
tags = ["automanaged"],
|
||||||
|
visibility = ["//visibility:private"],
|
||||||
|
)
|
||||||
|
|
||||||
|
filegroup(
|
||||||
|
name = "all-srcs",
|
||||||
|
srcs = [":package-srcs"],
|
||||||
|
tags = ["automanaged"],
|
||||||
|
visibility = ["//visibility:public"],
|
||||||
|
)
|
@ -0,0 +1,33 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2019 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package counter
|
||||||
|
|
||||||
|
// GoRoutineCounter keeps track of the number of active goroutines
|
||||||
|
// working on/for something. This is a utility that makes such code more
|
||||||
|
// testable. The code uses this utility to report the number of active
|
||||||
|
// goroutines to the test code, so that the test code can advance a fake
|
||||||
|
// clock when and only when the code being tested has finished all
|
||||||
|
// the work that is ready to do at the present time.
|
||||||
|
type GoRoutineCounter interface {
|
||||||
|
// Add adds the given delta to the count of active goroutines.
|
||||||
|
// Call Add(1) before forking a goroutine, Add(-1) at the end of that goroutine.
|
||||||
|
// Call Add(-1) just before waiting on something from another goroutine (e.g.,
|
||||||
|
// just before a `select`).
|
||||||
|
// Call Add(1) just before doing something that unblocks a goroutine that is
|
||||||
|
// waiting on that something.
|
||||||
|
Add(delta int)
|
||||||
|
}
|
@ -0,0 +1,30 @@
|
|||||||
|
load("@io_bazel_rules_go//go:def.bzl", "go_library")
|
||||||
|
|
||||||
|
go_library(
|
||||||
|
name = "go_default_library",
|
||||||
|
srcs = [
|
||||||
|
"interface.go",
|
||||||
|
"types.go",
|
||||||
|
],
|
||||||
|
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing",
|
||||||
|
importpath = "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing",
|
||||||
|
visibility = ["//visibility:public"],
|
||||||
|
)
|
||||||
|
|
||||||
|
filegroup(
|
||||||
|
name = "package-srcs",
|
||||||
|
srcs = glob(["**"]),
|
||||||
|
tags = ["automanaged"],
|
||||||
|
visibility = ["//visibility:private"],
|
||||||
|
)
|
||||||
|
|
||||||
|
filegroup(
|
||||||
|
name = "all-srcs",
|
||||||
|
srcs = [
|
||||||
|
":package-srcs",
|
||||||
|
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset:all-srcs",
|
||||||
|
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing:all-srcs",
|
||||||
|
],
|
||||||
|
tags = ["automanaged"],
|
||||||
|
visibility = ["//visibility:public"],
|
||||||
|
)
|
@ -0,0 +1,88 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2019 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package fairqueuing
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// QueueSetFactory is used to create QueueSet objects.
|
||||||
|
type QueueSetFactory interface {
|
||||||
|
NewQueueSet(config QueueSetConfig) (QueueSet, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// QueueSet is the abstraction for the queuing and dispatching
|
||||||
|
// functionality of one non-exempt priority level. It covers the
|
||||||
|
// functionality described in the "Assignment to a Queue", "Queuing",
|
||||||
|
// and "Dispatching" sections of
|
||||||
|
// https://github.com/kubernetes/enhancements/blob/master/keps/sig-api-machinery/20190228-priority-and-fairness.md
|
||||||
|
// . Some day we may have connections between priority levels, but
|
||||||
|
// today is not that day.
|
||||||
|
type QueueSet interface {
|
||||||
|
// SetConfiguration updates the configuration
|
||||||
|
SetConfiguration(QueueSetConfig) error
|
||||||
|
|
||||||
|
// Quiesce controls whether the QueueSet is operating normally or is quiescing.
|
||||||
|
// A quiescing QueueSet drains as normal but does not admit any
|
||||||
|
// new requests. Passing a non-nil handler means the system should
|
||||||
|
// be quiescing, a nil handler means the system should operate
|
||||||
|
// normally. A call to Wait while the system is quiescing
|
||||||
|
// will be rebuffed by returning tryAnother=true. If all the
|
||||||
|
// queues have no requests waiting nor executing while the system
|
||||||
|
// is quiescing then the handler will eventually be called with no
|
||||||
|
// locks held (even if the system becomes non-quiescing between the
|
||||||
|
// triggering state and the required call).
|
||||||
|
Quiesce(EmptyHandler)
|
||||||
|
|
||||||
|
// Wait uses the given hashValue as the source of entropy
|
||||||
|
// as it shuffle-shards a request into a queue and waits for
|
||||||
|
// a decision on what to do with that request. If tryAnother==true
|
||||||
|
// at return then the QueueSet has become undesirable and the client
|
||||||
|
// should try to find a different QueueSet to use; execute and
|
||||||
|
// afterExecution are irrelevant in this case. Otherwise, if execute
|
||||||
|
// then the client should start executing the request and, once the
|
||||||
|
// request finishes execution or is canceled, call afterExecution().
|
||||||
|
// Otherwise the client should not execute the
|
||||||
|
// request and afterExecution is irrelevant.
|
||||||
|
Wait(ctx context.Context, hashValue uint64) (tryAnother, execute bool, afterExecution func())
|
||||||
|
}
|
||||||
|
|
||||||
|
// QueueSetConfig defines the configuration of a QueueSet.
|
||||||
|
type QueueSetConfig struct {
|
||||||
|
// Name is used to identify a queue set, allowing for descriptive information about its intended use
|
||||||
|
Name string
|
||||||
|
// ConcurrencyLimit is the maximum number of requests of this QueueSet that may be executing at a time
|
||||||
|
ConcurrencyLimit int
|
||||||
|
// DesiredNumQueues is the number of queues that the API says should exist now
|
||||||
|
DesiredNumQueues int
|
||||||
|
// QueueLengthLimit is the maximum number of requests that may be waiting in a given queue at a time
|
||||||
|
QueueLengthLimit int
|
||||||
|
// HandSize is a parameter of shuffle sharding. Upon arrival of a request, a queue is chosen by randomly
|
||||||
|
// dealing a "hand" of this many queues and then picking one of minimum length.
|
||||||
|
HandSize int
|
||||||
|
// RequestWaitLimit is the maximum amount of time that a request may wait in a queue.
|
||||||
|
// If, by the end of that time, the request has not been dispatched then it is rejected.
|
||||||
|
RequestWaitLimit time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
// EmptyHandler is used to notify the callee when all the queues
|
||||||
|
// of a QueueSet have been drained.
|
||||||
|
type EmptyHandler interface {
|
||||||
|
// HandleEmpty is called to deliver the notification
|
||||||
|
HandleEmpty()
|
||||||
|
}
|
@ -0,0 +1,45 @@
|
|||||||
|
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
|
||||||
|
|
||||||
|
go_library(
|
||||||
|
name = "go_default_library",
|
||||||
|
srcs = ["queueset.go"],
|
||||||
|
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset",
|
||||||
|
importpath = "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset",
|
||||||
|
visibility = ["//visibility:public"],
|
||||||
|
deps = [
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apiserver/pkg/util/shufflesharding:go_default_library",
|
||||||
|
"//vendor/github.com/pkg/errors:go_default_library",
|
||||||
|
"//vendor/k8s.io/klog:go_default_library",
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
go_test(
|
||||||
|
name = "go_default_test",
|
||||||
|
srcs = ["queueset_test.go"],
|
||||||
|
embed = [":go_default_library"],
|
||||||
|
deps = [
|
||||||
|
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock:go_default_library",
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
filegroup(
|
||||||
|
name = "package-srcs",
|
||||||
|
srcs = glob(["**"]),
|
||||||
|
tags = ["automanaged"],
|
||||||
|
visibility = ["//visibility:private"],
|
||||||
|
)
|
||||||
|
|
||||||
|
filegroup(
|
||||||
|
name = "all-srcs",
|
||||||
|
srcs = [":package-srcs"],
|
||||||
|
tags = ["automanaged"],
|
||||||
|
visibility = ["//visibility:public"],
|
||||||
|
)
|
@ -0,0 +1,569 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2019 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package queueset
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"math"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"k8s.io/apimachinery/pkg/util/runtime"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
"k8s.io/apimachinery/pkg/util/clock"
|
||||||
|
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
|
||||||
|
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
|
||||||
|
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
|
||||||
|
"k8s.io/apiserver/pkg/util/shufflesharding"
|
||||||
|
"k8s.io/klog"
|
||||||
|
)
|
||||||
|
|
||||||
|
// queueSetFactory implements the QueueSetFactory interface
|
||||||
|
// queueSetFactory makes QueueSet objects.
|
||||||
|
type queueSetFactory struct {
|
||||||
|
counter counter.GoRoutineCounter
|
||||||
|
clock clock.PassiveClock
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewQueueSetFactory creates a new QueueSetFactory object
|
||||||
|
func NewQueueSetFactory(c clock.PassiveClock, counter counter.GoRoutineCounter) fq.QueueSetFactory {
|
||||||
|
return &queueSetFactory{
|
||||||
|
counter: counter,
|
||||||
|
clock: c,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewQueueSet creates a new QueueSet object
|
||||||
|
// There is a new QueueSet created for each priority level.
|
||||||
|
func (qsf queueSetFactory) NewQueueSet(config fq.QueueSetConfig) (fq.QueueSet, error) {
|
||||||
|
return newQueueSet(config, qsf.clock, qsf.counter)
|
||||||
|
}
|
||||||
|
|
||||||
|
// queueSet is a fair queuing implementation designed with three major differences:
|
||||||
|
// 1) dispatches requests to be served rather than requests to be transmitted
|
||||||
|
// 2) serves multiple requests at once
|
||||||
|
// 3) a request's service time is not known until it finishes
|
||||||
|
// implementation of:
|
||||||
|
// https://github.com/kubernetes/enhancements/blob/master/keps/sig-api-machinery/20190228-priority-and-fairness.md
|
||||||
|
type queueSet struct {
|
||||||
|
lock sync.Mutex
|
||||||
|
config fq.QueueSetConfig
|
||||||
|
counter counter.GoRoutineCounter
|
||||||
|
clock clock.PassiveClock
|
||||||
|
queues []*fq.Queue
|
||||||
|
virtualTime float64
|
||||||
|
estimatedServiceTime float64
|
||||||
|
lastRealTime time.Time
|
||||||
|
robinIndex int
|
||||||
|
// numRequestsEnqueued is the number of requests currently enqueued
|
||||||
|
// (eg: incremeneted on Enqueue, decremented on Dequue)
|
||||||
|
numRequestsEnqueued int
|
||||||
|
emptyHandler fq.EmptyHandler
|
||||||
|
dealer *shufflesharding.Dealer
|
||||||
|
}
|
||||||
|
|
||||||
|
// initQueues is a helper method for initializing an array of n queues
|
||||||
|
func initQueues(n, baseIndex int) []*fq.Queue {
|
||||||
|
fqqueues := make([]*fq.Queue, n)
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
fqqueues[i] = &fq.Queue{Index: baseIndex + i, Requests: make([]*fq.Request, 0)}
|
||||||
|
}
|
||||||
|
return fqqueues
|
||||||
|
}
|
||||||
|
|
||||||
|
// newQueueSet creates a new queueSet from passed in parameters
|
||||||
|
func newQueueSet(config fq.QueueSetConfig, c clock.PassiveClock, counter counter.GoRoutineCounter) (*queueSet, error) {
|
||||||
|
dealer, err := shufflesharding.NewDealer(config.DesiredNumQueues, config.HandSize)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "shuffle sharding dealer creation failed")
|
||||||
|
}
|
||||||
|
|
||||||
|
fq := &queueSet{
|
||||||
|
config: config,
|
||||||
|
counter: counter,
|
||||||
|
queues: initQueues(config.DesiredNumQueues, 0),
|
||||||
|
clock: c,
|
||||||
|
virtualTime: 0,
|
||||||
|
lastRealTime: c.Now(),
|
||||||
|
dealer: dealer,
|
||||||
|
}
|
||||||
|
return fq, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetConfiguration is used to set the configuration for a queueSet
|
||||||
|
// update handling for when fields are updated is handled here as well -
|
||||||
|
// eg: if DesiredNum is increased, SetConfiguration reconciles by
|
||||||
|
// adding more queues.
|
||||||
|
func (qs *queueSet) SetConfiguration(config fq.QueueSetConfig) error {
|
||||||
|
qs.lockAndSyncTime()
|
||||||
|
defer qs.lock.Unlock()
|
||||||
|
|
||||||
|
dealer, err := shufflesharding.NewDealer(config.DesiredNumQueues, config.HandSize)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "shuffle sharding dealer creation failed")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Adding queues is the only thing that requires immediate action
|
||||||
|
// Removing queues is handled by omitting indexes >DesiredNum from
|
||||||
|
// chooseQueueIndexLocked
|
||||||
|
numQueues := len(qs.queues)
|
||||||
|
if config.DesiredNumQueues > numQueues {
|
||||||
|
qs.queues = append(qs.queues,
|
||||||
|
initQueues(config.DesiredNumQueues-numQueues, len(qs.queues))...)
|
||||||
|
}
|
||||||
|
|
||||||
|
qs.config = config
|
||||||
|
qs.dealer = dealer
|
||||||
|
|
||||||
|
qs.dequeueWithChannelAsMuchAsPossible()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Quiesce controls whether the QueueSet is operating normally or is quiescing.
|
||||||
|
// A quiescing QueueSet drains as normal but does not admit any
|
||||||
|
// new requests. Passing a non-nil handler means the system should
|
||||||
|
// be quiescing, a nil handler means the system should operate
|
||||||
|
// normally. A call to Wait while the system is quiescing
|
||||||
|
// will be rebuffed by returning tryAnother=true. If all the
|
||||||
|
// queues have no requests waiting nor executing while the system
|
||||||
|
// is quiescing then the handler will eventually be called with no
|
||||||
|
// locks held (even if the system becomes non-quiescing between the
|
||||||
|
// triggering state and the required call).
|
||||||
|
func (qs *queueSet) Quiesce(eh fq.EmptyHandler) {
|
||||||
|
qs.lock.Lock()
|
||||||
|
defer qs.lock.Unlock()
|
||||||
|
if eh == nil {
|
||||||
|
qs.emptyHandler = eh
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Here we check whether there are any requests queued or executing and
|
||||||
|
// if not then fork an invocation of the EmptyHandler.
|
||||||
|
qs.maybeForkEmptyHandlerLocked()
|
||||||
|
|
||||||
|
qs.emptyHandler = eh
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait uses the given hashValue as the source of entropy
|
||||||
|
// as it shuffle-shards a request into a queue and waits for
|
||||||
|
// a decision on what to do with that request. If tryAnother==true
|
||||||
|
// at return then the QueueSet has become undesirable and the client
|
||||||
|
// should try to find a different QueueSet to use; execute and
|
||||||
|
// afterExecution are irrelevant in this case. Otherwise, if execute
|
||||||
|
// then the client should start executing the request and, once the
|
||||||
|
// request finishes execution or is canceled, call afterExecution().
|
||||||
|
// Otherwise the client should not execute the
|
||||||
|
// request and afterExecution is irrelevant.
|
||||||
|
func (qs *queueSet) Wait(ctx context.Context, hashValue uint64) (tryAnother, execute bool, afterExecution func()) {
|
||||||
|
var req *fq.Request
|
||||||
|
shouldReturn, tryAnother, execute, afterExecution := func() (
|
||||||
|
shouldReturn, tryAnother, execute bool, afterExecution func()) {
|
||||||
|
|
||||||
|
qs.lockAndSyncTime()
|
||||||
|
defer qs.lock.Unlock()
|
||||||
|
// A call to Wait while the system is quiescing will be rebuffed by
|
||||||
|
// returning `tryAnother=true`.
|
||||||
|
if qs.emptyHandler != nil {
|
||||||
|
return true, true, false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ========================================================================
|
||||||
|
// Step 1:
|
||||||
|
// 1) Start with shuffle sharding, to pick a queue.
|
||||||
|
// 2) Reject old requests that have been waiting too long
|
||||||
|
// 3) Reject current request if there is not enough concurrency shares and
|
||||||
|
// we are at max queue length
|
||||||
|
// 4) If not rejected, create a request and enqueue
|
||||||
|
req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(hashValue)
|
||||||
|
// req == nil means that the request was rejected - no remaining
|
||||||
|
// concurrency shares and at max queue length already
|
||||||
|
if req == nil {
|
||||||
|
metrics.AddReject(qs.config.Name, "queue-full")
|
||||||
|
return true, false, false, func() {}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ========================================================================
|
||||||
|
// Step 2:
|
||||||
|
// 1) The next step is to invoke the method that dequeues as much as possible.
|
||||||
|
|
||||||
|
// This method runs a loop, as long as there
|
||||||
|
// are non-empty queues and the number currently executing is less than the
|
||||||
|
// assured concurrency value. The body of the loop uses the fair queuing
|
||||||
|
// technique to pick a queue, dequeue the request at the head of that
|
||||||
|
// queue, increment the count of the number executing, and send true to
|
||||||
|
// the request's channel.
|
||||||
|
qs.dequeueWithChannelAsMuchAsPossible()
|
||||||
|
return false, false, false, func() {}
|
||||||
|
}()
|
||||||
|
if shouldReturn {
|
||||||
|
return tryAnother, execute, afterExecution
|
||||||
|
}
|
||||||
|
|
||||||
|
// ========================================================================
|
||||||
|
// Step 3:
|
||||||
|
// After that method finishes its loop and returns, the final step in Wait
|
||||||
|
// is to `select` (wait) on a message from the enqueud request's channel
|
||||||
|
// and return appropriately. While waiting this thread does no additional
|
||||||
|
// work so we decrement the go routine counter
|
||||||
|
qs.counter.Add(-1)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case execute := <-req.DequeueChannel:
|
||||||
|
if execute {
|
||||||
|
// execute the request
|
||||||
|
return false, true, func() {
|
||||||
|
qs.finishRequestAndDequeueWithChannelAsMuchAsPossible(req)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
klog.V(5).Infof("request timed out after being enqueued\n")
|
||||||
|
metrics.AddReject(qs.config.Name, "time-out")
|
||||||
|
return false, false, func() {}
|
||||||
|
case <-ctx.Done():
|
||||||
|
klog.V(5).Infof("request cancelled\n")
|
||||||
|
func() {
|
||||||
|
qs.lockAndSyncTime()
|
||||||
|
defer qs.lock.Unlock()
|
||||||
|
|
||||||
|
// TODO(aaron-prindle) add metrics to these two cases
|
||||||
|
if req.Enqueued {
|
||||||
|
// remove the request from the queue as it has timed out
|
||||||
|
for i := range req.Queue.Requests {
|
||||||
|
if req == req.Queue.Requests[i] {
|
||||||
|
// remove the request
|
||||||
|
req.Queue.Requests = append(req.Queue.Requests[:i],
|
||||||
|
req.Queue.Requests[i+1:]...)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// At this point, if the qs is quiescing,
|
||||||
|
// has zero requests executing, and has zero requests enqueued
|
||||||
|
// then a call to the EmptyHandler should be forked.
|
||||||
|
qs.maybeForkEmptyHandlerLocked()
|
||||||
|
} else {
|
||||||
|
// At this point we know that req was in its queue earlier and another
|
||||||
|
// goroutine has removed req from its queue and called qs.counter.Add(1)
|
||||||
|
// in anticipation of unblocking this goroutine through the other arm of this
|
||||||
|
// select. In this case we need to decrement the counter because this goroutine
|
||||||
|
// was actually unblocked through a different code path.
|
||||||
|
qs.counter.Add(-1)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return false, false, func() {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// syncTimeLocked is used to sync the time of the queueSet by looking at the elapsed
|
||||||
|
// time since the last sync and this value based on the 'virtualtime ratio'
|
||||||
|
// which scales inversely to the # of active flows
|
||||||
|
func (qs *queueSet) syncTimeLocked() {
|
||||||
|
realNow := qs.clock.Now()
|
||||||
|
timesincelast := realNow.Sub(qs.lastRealTime).Seconds()
|
||||||
|
qs.lastRealTime = realNow
|
||||||
|
var virtualTimeRatio float64
|
||||||
|
|
||||||
|
activeQueues := 0
|
||||||
|
reqs := 0
|
||||||
|
for _, queue := range qs.queues {
|
||||||
|
reqs += queue.RequestsExecuting
|
||||||
|
|
||||||
|
if len(queue.Requests) > 0 || queue.RequestsExecuting > 0 {
|
||||||
|
activeQueues++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if activeQueues != 0 {
|
||||||
|
// TODO(aaron-prindle) document the math.Min usage
|
||||||
|
virtualTimeRatio = math.Min(float64(reqs), float64(qs.config.ConcurrencyLimit)) / float64(activeQueues)
|
||||||
|
}
|
||||||
|
|
||||||
|
qs.virtualTime += timesincelast * virtualTimeRatio
|
||||||
|
}
|
||||||
|
|
||||||
|
func (qs *queueSet) lockAndSyncTime() {
|
||||||
|
qs.lock.Lock()
|
||||||
|
qs.syncTimeLocked()
|
||||||
|
}
|
||||||
|
|
||||||
|
// timeoutOldRequestsAndRejectOrEnqueueLocked encapsulates the logic required
|
||||||
|
// to validate and enqueue a request for the queueSet/QueueSet:
|
||||||
|
// 1) Start with shuffle sharding, to pick a queue.
|
||||||
|
// 2) Reject old requests that have been waiting too long
|
||||||
|
// 3) Reject current request if there is not enough concurrency shares and
|
||||||
|
// we are at max queue length
|
||||||
|
// 4) If not rejected, create a request and enqueue
|
||||||
|
// returns the enqueud request on a successful enqueue
|
||||||
|
// returns nil in the case that there is no available concurrency or
|
||||||
|
// the queuelengthlimit has been reached
|
||||||
|
func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(hashValue uint64) *fq.Request {
|
||||||
|
// Start with the shuffle sharding, to pick a queue.
|
||||||
|
queueIdx := qs.chooseQueueIndexLocked(hashValue)
|
||||||
|
queue := qs.queues[queueIdx]
|
||||||
|
// The next step is the logic to reject requests that have been waiting too long
|
||||||
|
qs.removeTimedOutRequestsFromQueueLocked(queue)
|
||||||
|
// NOTE: currently timeout is only checked for each new request. This means that there can be
|
||||||
|
// requests that are in the queue longer than the timeout if there are no new requests
|
||||||
|
// We prefer the simplicity over the promptness, at least for now.
|
||||||
|
|
||||||
|
// Create a request and enqueue
|
||||||
|
req := &fq.Request{
|
||||||
|
DequeueChannel: make(chan bool, 1),
|
||||||
|
RealEnqueueTime: qs.clock.Now(),
|
||||||
|
Queue: queue,
|
||||||
|
}
|
||||||
|
if ok := qs.rejectOrEnqueueLocked(req); !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
metrics.ObserveQueueLength(qs.config.Name, len(queue.Requests))
|
||||||
|
return req
|
||||||
|
}
|
||||||
|
|
||||||
|
// removeTimedOutRequestsFromQueueLocked rejects old requests that have been enqueued
|
||||||
|
// past the requestWaitLimit
|
||||||
|
func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *fq.Queue) {
|
||||||
|
timeoutIdx := -1
|
||||||
|
now := qs.clock.Now()
|
||||||
|
reqs := queue.Requests
|
||||||
|
// reqs are sorted oldest -> newest
|
||||||
|
// can short circuit loop (break) if oldest requests are not timing out
|
||||||
|
// as newer requests also will not have timed out
|
||||||
|
|
||||||
|
// now - requestWaitLimit = waitLimit
|
||||||
|
waitLimit := now.Add(-qs.config.RequestWaitLimit)
|
||||||
|
for i, req := range reqs {
|
||||||
|
if waitLimit.After(req.RealEnqueueTime) {
|
||||||
|
qs.counter.Add(1)
|
||||||
|
req.DequeueChannel <- false
|
||||||
|
close(req.DequeueChannel)
|
||||||
|
// get index for timed out requests
|
||||||
|
timeoutIdx = i
|
||||||
|
} else {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// remove timed out requests from queue
|
||||||
|
if timeoutIdx != -1 {
|
||||||
|
// timeoutIdx + 1 to remove the last timeout req
|
||||||
|
removeIdx := timeoutIdx + 1
|
||||||
|
// remove all the timeout requests
|
||||||
|
queue.Requests = reqs[removeIdx:]
|
||||||
|
// decrement the # of requestsEnqueued
|
||||||
|
qs.numRequestsEnqueued -= removeIdx
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// getRequestsExecutingLocked gets the # of requests which are "executing":
|
||||||
|
// this is the# of requests/requests which have been dequeued but have not had
|
||||||
|
// finished (via the FinishRequest method invoked after service)
|
||||||
|
func (qs *queueSet) getRequestsExecutingLocked() int {
|
||||||
|
total := 0
|
||||||
|
for _, queue := range qs.queues {
|
||||||
|
total += queue.RequestsExecuting
|
||||||
|
}
|
||||||
|
return total
|
||||||
|
}
|
||||||
|
|
||||||
|
// chooseQueueIndexLocked uses shuffle sharding to select a queue index
|
||||||
|
// using the given hashValue and the shuffle sharding parameters of the queueSet.
|
||||||
|
func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64) int {
|
||||||
|
bestQueueIdx := -1
|
||||||
|
bestQueueLen := int(math.MaxInt32)
|
||||||
|
// DesiredNum is used here instead of numQueues to omit quiescing queues
|
||||||
|
qs.dealer.Deal(hashValue, func(queueIdx int) {
|
||||||
|
thisLen := len(qs.queues[queueIdx].Requests)
|
||||||
|
if thisLen < bestQueueLen {
|
||||||
|
bestQueueIdx, bestQueueLen = queueIdx, thisLen
|
||||||
|
}
|
||||||
|
})
|
||||||
|
return bestQueueIdx
|
||||||
|
}
|
||||||
|
|
||||||
|
// updateQueueVirtualStartTime updates the virtual start time for a queue
|
||||||
|
// this is done when a new request is enqueued. For more info see:
|
||||||
|
// https://github.com/kubernetes/enhancements/blob/master/keps/sig-api-machinery/20190228-priority-and-fairness.md#dispatching
|
||||||
|
func (qs *queueSet) updateQueueVirtualStartTime(request *fq.Request, queue *fq.Queue) {
|
||||||
|
// When a request arrives to an empty queue with no requests executing:
|
||||||
|
// len(queue.Requests) == 1 as enqueue has just happened prior (vs == 0)
|
||||||
|
if len(queue.Requests) == 1 && queue.RequestsExecuting == 0 {
|
||||||
|
// the queue’s virtual start time is set to the virtual time.
|
||||||
|
queue.VirtualStart = qs.virtualTime
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// enqueues a request into an queueSet
|
||||||
|
func (qs *queueSet) enqueue(request *fq.Request) {
|
||||||
|
queue := request.Queue
|
||||||
|
queue.Enqueue(request)
|
||||||
|
qs.updateQueueVirtualStartTime(request, queue)
|
||||||
|
qs.numRequestsEnqueued++
|
||||||
|
|
||||||
|
metrics.UpdateFlowControlRequestsInQueue(qs.config.Name, qs.numRequestsEnqueued)
|
||||||
|
}
|
||||||
|
|
||||||
|
// rejectOrEnqueueLocked rejects or enqueues the newly arrived request if
|
||||||
|
// resource criteria isn't met
|
||||||
|
func (qs *queueSet) rejectOrEnqueueLocked(request *fq.Request) bool {
|
||||||
|
queue := request.Queue
|
||||||
|
curQueueLength := len(queue.Requests)
|
||||||
|
// rejects the newly arrived request if resource criteria not met
|
||||||
|
if qs.getRequestsExecutingLocked() >= qs.config.ConcurrencyLimit &&
|
||||||
|
curQueueLength >= qs.config.QueueLengthLimit {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
qs.enqueue(request)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// selectQueue selects the minimum virtualFinish time from the set of queues
|
||||||
|
// the starting queue is selected via roundrobin
|
||||||
|
func (qs *queueSet) selectQueue() *fq.Queue {
|
||||||
|
minVirtualFinish := math.Inf(1)
|
||||||
|
var minQueue *fq.Queue
|
||||||
|
var minIndex int
|
||||||
|
for range qs.queues {
|
||||||
|
queue := qs.queues[qs.robinIndex]
|
||||||
|
if len(queue.Requests) != 0 {
|
||||||
|
currentVirtualFinish := queue.GetVirtualFinish(0, qs.estimatedServiceTime)
|
||||||
|
if currentVirtualFinish < minVirtualFinish {
|
||||||
|
minVirtualFinish = currentVirtualFinish
|
||||||
|
minQueue = queue
|
||||||
|
minIndex = qs.robinIndex
|
||||||
|
}
|
||||||
|
}
|
||||||
|
qs.robinIndex = (qs.robinIndex + 1) % len(qs.queues)
|
||||||
|
}
|
||||||
|
// we set the round robin indexing to start at the chose queue
|
||||||
|
// for the next round. This way the non-selected queues
|
||||||
|
// win in the case that the virtual finish times are the same
|
||||||
|
qs.robinIndex = minIndex
|
||||||
|
return minQueue
|
||||||
|
}
|
||||||
|
|
||||||
|
// dequeue dequeues a request from the queueSet
|
||||||
|
func (qs *queueSet) dequeue() (*fq.Request, bool) {
|
||||||
|
queue := qs.selectQueue()
|
||||||
|
if queue == nil {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
request, ok := queue.Dequeue()
|
||||||
|
if !ok {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
// When a request is dequeued for service -> qs.VirtualStart += G
|
||||||
|
queue.VirtualStart += qs.estimatedServiceTime
|
||||||
|
request.StartTime = qs.clock.Now()
|
||||||
|
// request dequeued, service has started
|
||||||
|
queue.RequestsExecuting++
|
||||||
|
metrics.UpdateFlowControlRequestsExecuting(qs.config.Name, queue.RequestsExecuting)
|
||||||
|
qs.numRequestsEnqueued--
|
||||||
|
return request, ok
|
||||||
|
}
|
||||||
|
|
||||||
|
// dequeueWithChannelAsMuchAsPossible runs a loop, as long as there
|
||||||
|
// are non-empty queues and the number currently executing is less than the
|
||||||
|
// assured concurrency value. The body of the loop uses the fair queuing
|
||||||
|
// technique to pick a queue, dequeue the request at the head of that
|
||||||
|
// queue, increment the count of the number executing, and send true
|
||||||
|
// to the request's channel.
|
||||||
|
func (qs *queueSet) dequeueWithChannelAsMuchAsPossible() {
|
||||||
|
for qs.numRequestsEnqueued != 0 && qs.getRequestsExecutingLocked() < qs.config.ConcurrencyLimit {
|
||||||
|
_, ok := qs.dequeueWithChannel()
|
||||||
|
if !ok {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// dequeueWithChannel is a convenience method for dequeueing requests that
|
||||||
|
// require a message to be sent through the requests channel
|
||||||
|
// this is a required pattern for the QueueSet the queueSet supports
|
||||||
|
func (qs *queueSet) dequeueWithChannel() (*fq.Request, bool) {
|
||||||
|
req, ok := qs.dequeue()
|
||||||
|
if !ok {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
qs.counter.Add(1)
|
||||||
|
req.DequeueChannel <- true
|
||||||
|
close(req.DequeueChannel)
|
||||||
|
return req, ok
|
||||||
|
}
|
||||||
|
|
||||||
|
// removeQueueAndUpdateIndexes uses reslicing to remove an index from a slice
|
||||||
|
// and then updates the 'Index' field of the queues to be correct
|
||||||
|
func removeQueueAndUpdateIndexes(queues []*fq.Queue, index int) []*fq.Queue {
|
||||||
|
keptQueues := append(queues[:index], queues[index+1:]...)
|
||||||
|
for i := index; i < len(keptQueues); i++ {
|
||||||
|
keptQueues[i].Index--
|
||||||
|
}
|
||||||
|
return keptQueues
|
||||||
|
}
|
||||||
|
|
||||||
|
// finishRequestLocked is a callback that should be used when a previously dequeued request
|
||||||
|
// has completed it's service. This callback updates important state in the
|
||||||
|
// queueSet
|
||||||
|
func (qs *queueSet) finishRequestLocked(r *fq.Request) {
|
||||||
|
S := qs.clock.Since(r.StartTime).Seconds()
|
||||||
|
|
||||||
|
// When a request finishes being served, and the actual service time was S,
|
||||||
|
// the queue’s virtual start time is decremented by G - S.
|
||||||
|
r.Queue.VirtualStart -= qs.estimatedServiceTime - S
|
||||||
|
|
||||||
|
// request has finished, remove from requests executing
|
||||||
|
r.Queue.RequestsExecuting--
|
||||||
|
|
||||||
|
// Logic to remove quiesced queues
|
||||||
|
// >= as QueueIdx=25 is out of bounds for DesiredNum=25 [0...24]
|
||||||
|
if r.Queue.Index >= qs.config.DesiredNumQueues &&
|
||||||
|
len(r.Queue.Requests) == 0 &&
|
||||||
|
r.Queue.RequestsExecuting == 0 {
|
||||||
|
qs.queues = removeQueueAndUpdateIndexes(qs.queues, r.Queue.Index)
|
||||||
|
|
||||||
|
// decrement here to maintain the invariant that (qs.robinIndex+1) % numQueues
|
||||||
|
// is the index of the next queue after the one last dispatched from
|
||||||
|
if qs.robinIndex >= -r.Queue.Index {
|
||||||
|
qs.robinIndex--
|
||||||
|
}
|
||||||
|
|
||||||
|
// At this point, if the qs is quiescing,
|
||||||
|
// has zero requests executing, and has zero requests enqueued
|
||||||
|
// then a call to the EmptyHandler should be forked.
|
||||||
|
qs.maybeForkEmptyHandlerLocked()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (qs *queueSet) maybeForkEmptyHandlerLocked() {
|
||||||
|
if qs.emptyHandler != nil && qs.numRequestsEnqueued == 0 &&
|
||||||
|
qs.getRequestsExecutingLocked() == 0 {
|
||||||
|
qs.counter.Add(1)
|
||||||
|
go func(eh fq.EmptyHandler) {
|
||||||
|
defer runtime.HandleCrash()
|
||||||
|
defer qs.counter.Add(-1)
|
||||||
|
eh.HandleEmpty()
|
||||||
|
}(qs.emptyHandler)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// finishRequestAndDequeueWithChannelAsMuchAsPossible is a convenience method which calls finishRequest
|
||||||
|
// for a given request and then dequeues as many requests as possible
|
||||||
|
// and updates that request's channel signifying it is is dequeued
|
||||||
|
// this is a callback used for the filter that the queueSet supports
|
||||||
|
func (qs *queueSet) finishRequestAndDequeueWithChannelAsMuchAsPossible(req *fq.Request) {
|
||||||
|
qs.lockAndSyncTime()
|
||||||
|
defer qs.lock.Unlock()
|
||||||
|
|
||||||
|
qs.finishRequestLocked(req)
|
||||||
|
qs.dequeueWithChannelAsMuchAsPossible()
|
||||||
|
}
|
@ -0,0 +1,212 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2019 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package queueset
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"math"
|
||||||
|
"sync/atomic"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
|
||||||
|
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
|
||||||
|
test "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing"
|
||||||
|
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock"
|
||||||
|
)
|
||||||
|
|
||||||
|
type uniformScenario []uniformClient
|
||||||
|
|
||||||
|
type uniformClient struct {
|
||||||
|
hash uint64
|
||||||
|
nThreads int
|
||||||
|
nCalls int
|
||||||
|
execDuration time.Duration
|
||||||
|
thinkDuration time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
// exerciseQueueSetUniformScenario. Simple logic, only works if each
|
||||||
|
// client's offered load is at least as large as its fair share of
|
||||||
|
// capacity.
|
||||||
|
func exerciseQueueSetUniformScenario(t *testing.T, qs fq.QueueSet, sc uniformScenario,
|
||||||
|
totalDuration time.Duration, expectPass bool, expectedAllRequests bool,
|
||||||
|
clk *clock.FakeEventClock, counter counter.GoRoutineCounter) {
|
||||||
|
|
||||||
|
now := time.Now()
|
||||||
|
t.Logf("%s: Start", clk.Now().Format("2006-01-02 15:04:05.000000000"))
|
||||||
|
integrators := make([]test.Integrator, len(sc))
|
||||||
|
var failedCount uint64
|
||||||
|
for i, uc := range sc {
|
||||||
|
integrators[i] = test.NewIntegrator(clk)
|
||||||
|
for j := 0; j < uc.nThreads; j++ {
|
||||||
|
counter.Add(1)
|
||||||
|
go func(i, j int, uc uniformClient, igr test.Integrator) {
|
||||||
|
for k := 0; k < uc.nCalls; k++ {
|
||||||
|
ClockWait(clk, counter, uc.thinkDuration)
|
||||||
|
for {
|
||||||
|
tryAnother, execute, afterExecute := qs.Wait(context.Background(), uc.hash)
|
||||||
|
t.Logf("%s: %d, %d, %d got q=%v, e=%v", clk.Now().Format("2006-01-02 15:04:05.000000000"), i, j, k, tryAnother, execute)
|
||||||
|
if tryAnother {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if !execute {
|
||||||
|
atomic.AddUint64(&failedCount, 1)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
igr.Add(1)
|
||||||
|
ClockWait(clk, counter, uc.execDuration)
|
||||||
|
afterExecute()
|
||||||
|
igr.Add(-1)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
counter.Add(-1)
|
||||||
|
}(i, j, uc, integrators[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
lim := now.Add(totalDuration)
|
||||||
|
clk.Run(&lim)
|
||||||
|
clk.SetTime(lim)
|
||||||
|
t.Logf("%s: End", clk.Now().Format("2006-01-02 15:04:05.000000000"))
|
||||||
|
results := make([]test.IntegratorResults, len(sc))
|
||||||
|
var sumOfAvg float64
|
||||||
|
for i := range sc {
|
||||||
|
results[i] = integrators[i].GetResults()
|
||||||
|
sumOfAvg += results[i].Average
|
||||||
|
}
|
||||||
|
idealAverage := sumOfAvg / float64(len(sc))
|
||||||
|
passes := make([]bool, len(sc))
|
||||||
|
allPass := true
|
||||||
|
for i := range sc {
|
||||||
|
relDiff := (results[i].Average - idealAverage) / idealAverage
|
||||||
|
passes[i] = math.Abs(relDiff) <= 0.1
|
||||||
|
allPass = allPass && passes[i]
|
||||||
|
}
|
||||||
|
for i := range sc {
|
||||||
|
if allPass != expectPass {
|
||||||
|
t.Errorf("Class %d got an Average of %v but the ideal was %v", i, results[i].Average, idealAverage)
|
||||||
|
} else {
|
||||||
|
t.Logf("Class %d got an Average of %v and the ideal was %v", i, results[i].Average, idealAverage)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
clk.Run(nil)
|
||||||
|
if expectedAllRequests && failedCount > 0 {
|
||||||
|
t.Errorf("Expected all requests to be successful but got %v failed requests", failedCount)
|
||||||
|
} else if !expectedAllRequests && failedCount == 0 {
|
||||||
|
t.Errorf("Expected failed requests but all requests succeeded")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestNoRestraint should fail because the dummy QueueSet exercises no control
|
||||||
|
func TestNoRestraint(t *testing.T) {
|
||||||
|
now := time.Now()
|
||||||
|
clk, counter := clock.NewFakeEventClock(now, 0, nil)
|
||||||
|
nrf := test.NewNoRestraintFactory()
|
||||||
|
config := fq.QueueSetConfig{}
|
||||||
|
nr, err := nrf.NewQueueSet(config)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("QueueSet creation failed with %v", err)
|
||||||
|
}
|
||||||
|
exerciseQueueSetUniformScenario(t, nr, []uniformClient{
|
||||||
|
{1001001001, 5, 10, time.Second, time.Second},
|
||||||
|
{2002002002, 2, 10, time.Second, time.Second / 2},
|
||||||
|
}, time.Second*10, false, true, clk, counter)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestUniformFlows(t *testing.T) {
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
|
clk, counter := clock.NewFakeEventClock(now, 0, nil)
|
||||||
|
qsf := NewQueueSetFactory(clk, counter)
|
||||||
|
config := fq.QueueSetConfig{
|
||||||
|
Name: "TestUniformFlows",
|
||||||
|
ConcurrencyLimit: 100,
|
||||||
|
DesiredNumQueues: 128,
|
||||||
|
QueueLengthLimit: 128,
|
||||||
|
HandSize: 1,
|
||||||
|
RequestWaitLimit: 10 * time.Minute,
|
||||||
|
}
|
||||||
|
qs, err := qsf.NewQueueSet(config)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("QueueSet creation failed with %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
exerciseQueueSetUniformScenario(t, qs, []uniformClient{
|
||||||
|
{1001001001, 5, 10, time.Second, time.Second},
|
||||||
|
{2002002002, 5, 10, time.Second, time.Second},
|
||||||
|
}, time.Second*10, true, true, clk, counter)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDifferentFlows(t *testing.T) {
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
|
clk, counter := clock.NewFakeEventClock(now, 0, nil)
|
||||||
|
qsf := NewQueueSetFactory(clk, counter)
|
||||||
|
config := fq.QueueSetConfig{
|
||||||
|
Name: "TestDifferentFlows",
|
||||||
|
ConcurrencyLimit: 1,
|
||||||
|
DesiredNumQueues: 128,
|
||||||
|
QueueLengthLimit: 128,
|
||||||
|
HandSize: 1,
|
||||||
|
RequestWaitLimit: 10 * time.Minute,
|
||||||
|
}
|
||||||
|
qs, err := qsf.NewQueueSet(config)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("QueueSet creation failed with %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
exerciseQueueSetUniformScenario(t, qs, []uniformClient{
|
||||||
|
{1001001001, 5, 10, time.Second, time.Second},
|
||||||
|
{2002002002, 2, 5, time.Second, time.Second / 2},
|
||||||
|
}, time.Second*10, true, true, clk, counter)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTimeout(t *testing.T) {
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
|
clk, counter := clock.NewFakeEventClock(now, 0, nil)
|
||||||
|
qsf := NewQueueSetFactory(clk, counter)
|
||||||
|
config := fq.QueueSetConfig{
|
||||||
|
Name: "TestTimeout",
|
||||||
|
ConcurrencyLimit: 1,
|
||||||
|
DesiredNumQueues: 128,
|
||||||
|
QueueLengthLimit: 128,
|
||||||
|
HandSize: 1,
|
||||||
|
RequestWaitLimit: 0,
|
||||||
|
}
|
||||||
|
qs, err := qsf.NewQueueSet(config)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("QueueSet creation failed with %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
exerciseQueueSetUniformScenario(t, qs, []uniformClient{
|
||||||
|
{1001001001, 5, 100, time.Second, time.Second},
|
||||||
|
}, time.Second*10, true, false, clk, counter)
|
||||||
|
}
|
||||||
|
|
||||||
|
func ClockWait(clk *clock.FakeEventClock, counter counter.GoRoutineCounter, duration time.Duration) {
|
||||||
|
dunch := make(chan struct{})
|
||||||
|
clk.EventAfterDuration(func(time.Time) {
|
||||||
|
counter.Add(1)
|
||||||
|
close(dunch)
|
||||||
|
}, duration)
|
||||||
|
counter.Add(-1)
|
||||||
|
select {
|
||||||
|
case <-dunch:
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,33 @@
|
|||||||
|
load("@io_bazel_rules_go//go:def.bzl", "go_library")
|
||||||
|
|
||||||
|
go_library(
|
||||||
|
name = "go_default_library",
|
||||||
|
srcs = [
|
||||||
|
"integrator.go",
|
||||||
|
"no-restraint.go",
|
||||||
|
],
|
||||||
|
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing",
|
||||||
|
importpath = "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing",
|
||||||
|
visibility = ["//visibility:public"],
|
||||||
|
deps = [
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library",
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
filegroup(
|
||||||
|
name = "package-srcs",
|
||||||
|
srcs = glob(["**"]),
|
||||||
|
tags = ["automanaged"],
|
||||||
|
visibility = ["//visibility:private"],
|
||||||
|
)
|
||||||
|
|
||||||
|
filegroup(
|
||||||
|
name = "all-srcs",
|
||||||
|
srcs = [
|
||||||
|
":package-srcs",
|
||||||
|
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock:all-srcs",
|
||||||
|
],
|
||||||
|
tags = ["automanaged"],
|
||||||
|
visibility = ["//visibility:public"],
|
||||||
|
)
|
@ -0,0 +1,34 @@
|
|||||||
|
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
|
||||||
|
|
||||||
|
go_library(
|
||||||
|
name = "go_default_library",
|
||||||
|
srcs = ["event_clock.go"],
|
||||||
|
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock",
|
||||||
|
importpath = "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock",
|
||||||
|
visibility = ["//visibility:public"],
|
||||||
|
deps = [
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:go_default_library",
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
go_test(
|
||||||
|
name = "go_default_test",
|
||||||
|
srcs = ["event_clock_test.go"],
|
||||||
|
embed = [":go_default_library"],
|
||||||
|
deps = ["//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library"],
|
||||||
|
)
|
||||||
|
|
||||||
|
filegroup(
|
||||||
|
name = "package-srcs",
|
||||||
|
srcs = glob(["**"]),
|
||||||
|
tags = ["automanaged"],
|
||||||
|
visibility = ["//visibility:private"],
|
||||||
|
)
|
||||||
|
|
||||||
|
filegroup(
|
||||||
|
name = "all-srcs",
|
||||||
|
srcs = [":package-srcs"],
|
||||||
|
tags = ["automanaged"],
|
||||||
|
visibility = ["//visibility:public"],
|
||||||
|
)
|
@ -0,0 +1,222 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2019 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package clock
|
||||||
|
|
||||||
|
import (
|
||||||
|
"container/heap"
|
||||||
|
"math/rand"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"k8s.io/apimachinery/pkg/util/clock"
|
||||||
|
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
|
||||||
|
)
|
||||||
|
|
||||||
|
// EventFunc does some work that needs to be done at or after the
|
||||||
|
// given time. After this function returns, associated work may continue
|
||||||
|
// on other goroutines only if they are counted by the GoRoutineCounter
|
||||||
|
// of the FakeEventClock handling this EventFunc.
|
||||||
|
type EventFunc func(time.Time)
|
||||||
|
|
||||||
|
// EventClock fires event on time
|
||||||
|
type EventClock interface {
|
||||||
|
clock.PassiveClock
|
||||||
|
EventAfterDuration(f EventFunc, d time.Duration)
|
||||||
|
EventAfterTime(f EventFunc, t time.Time)
|
||||||
|
}
|
||||||
|
|
||||||
|
// RealEventClock fires event on real world time
|
||||||
|
type RealEventClock struct {
|
||||||
|
clock.RealClock
|
||||||
|
}
|
||||||
|
|
||||||
|
// EventAfterDuration schedules an EventFunc
|
||||||
|
func (RealEventClock) EventAfterDuration(f EventFunc, d time.Duration) {
|
||||||
|
ch := time.After(d)
|
||||||
|
go func() {
|
||||||
|
select {
|
||||||
|
case t := <-ch:
|
||||||
|
f(t)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// EventAfterTime schedules an EventFunc
|
||||||
|
func (r RealEventClock) EventAfterTime(f EventFunc, t time.Time) {
|
||||||
|
now := time.Now()
|
||||||
|
d := t.Sub(now)
|
||||||
|
if d <= 0 {
|
||||||
|
go f(now)
|
||||||
|
} else {
|
||||||
|
r.EventAfterDuration(f, d)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// waitGroupCounter is a wait group used for a GoRoutine Counter. This private
|
||||||
|
// type is used to disallow direct waitGroup access
|
||||||
|
type waitGroupCounter struct{ sync.WaitGroup }
|
||||||
|
|
||||||
|
// compile time assertion that waitGroupCounter meets requirements
|
||||||
|
// of GoRoutineCounter
|
||||||
|
var _ counter.GoRoutineCounter = (*waitGroupCounter)(nil)
|
||||||
|
|
||||||
|
// FakeEventClock is one whose time does not pass implicitly but
|
||||||
|
// rather is explicitly set by invocations of its SetTime method
|
||||||
|
type FakeEventClock struct {
|
||||||
|
clock.FakePassiveClock
|
||||||
|
|
||||||
|
// waiters is a heap of waiting work, sorted by time
|
||||||
|
waiters eventWaiterHeap
|
||||||
|
waitersLock sync.RWMutex
|
||||||
|
|
||||||
|
// clientWG may be nil and if not supplies constraints on time
|
||||||
|
// passing in Run. The Run method will not pick a new time until
|
||||||
|
// this is nil or its counter is zero.
|
||||||
|
clientWG *waitGroupCounter
|
||||||
|
|
||||||
|
// fuzz is the amount of noise to add to scheduling. An event
|
||||||
|
// requested to run at time T will run at some time chosen
|
||||||
|
// uniformly at random from the interval [T, T+fuzz]; the upper
|
||||||
|
// bound is exclusive iff fuzz is non-zero.
|
||||||
|
fuzz time.Duration
|
||||||
|
|
||||||
|
// rand is the random number generator to use in fuzzing
|
||||||
|
rand *rand.Rand
|
||||||
|
}
|
||||||
|
|
||||||
|
type eventWaiterHeap []eventWaiter
|
||||||
|
|
||||||
|
var _ heap.Interface = (*eventWaiterHeap)(nil)
|
||||||
|
|
||||||
|
type eventWaiter struct {
|
||||||
|
targetTime time.Time
|
||||||
|
f EventFunc
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewFakeEventClock constructor. The given `r *rand.Rand` must
|
||||||
|
// henceforth not be used for any other purpose. If `r` is nil then a
|
||||||
|
// fresh one will be constructed, seeded with the current real time.
|
||||||
|
// The clientWG can be `nil` and if not is used to let Run know about
|
||||||
|
// additional work that has to complete before time can advance.
|
||||||
|
func NewFakeEventClock(t time.Time, fuzz time.Duration, r *rand.Rand) (*FakeEventClock, counter.GoRoutineCounter) {
|
||||||
|
grc := &waitGroupCounter{}
|
||||||
|
|
||||||
|
if r == nil {
|
||||||
|
r = rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||||
|
r.Uint64()
|
||||||
|
r.Uint64()
|
||||||
|
r.Uint64()
|
||||||
|
}
|
||||||
|
return &FakeEventClock{
|
||||||
|
FakePassiveClock: *clock.NewFakePassiveClock(t),
|
||||||
|
clientWG: grc,
|
||||||
|
fuzz: fuzz,
|
||||||
|
rand: r,
|
||||||
|
}, grc
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetNextTime returns the next time at which there is work scheduled,
|
||||||
|
// and a bool indicating whether there is any such time
|
||||||
|
func (fec *FakeEventClock) GetNextTime() (time.Time, bool) {
|
||||||
|
fec.waitersLock.RLock()
|
||||||
|
defer fec.waitersLock.RUnlock()
|
||||||
|
if len(fec.waiters) > 0 {
|
||||||
|
return fec.waiters[0].targetTime, true
|
||||||
|
}
|
||||||
|
return time.Time{}, false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run runs all the events scheduled, and all the events they
|
||||||
|
// schedule, and so on, until there are none scheduled or the limit is not
|
||||||
|
// nil and the next time would exceed the limit. The clientWG given in
|
||||||
|
// the constructor gates each advance of time.
|
||||||
|
func (fec *FakeEventClock) Run(limit *time.Time) {
|
||||||
|
for {
|
||||||
|
fec.clientWG.Wait()
|
||||||
|
t, ok := fec.GetNextTime()
|
||||||
|
if !ok || limit != nil && t.After(*limit) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
fec.SetTime(t)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetTime sets the time and runs to completion all events that should
|
||||||
|
// be started by the given time --- including any further events they
|
||||||
|
// schedule
|
||||||
|
func (fec *FakeEventClock) SetTime(t time.Time) {
|
||||||
|
fec.FakePassiveClock.SetTime(t)
|
||||||
|
for {
|
||||||
|
foundSome := false
|
||||||
|
func() {
|
||||||
|
fec.waitersLock.Lock()
|
||||||
|
defer fec.waitersLock.Unlock()
|
||||||
|
// This loop is because events run at a given time may schedule more
|
||||||
|
// events to run at that or an earlier time.
|
||||||
|
// Events should not advance the clock. But just in case they do...
|
||||||
|
now := fec.Now()
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
for len(fec.waiters) > 0 && !now.Before(fec.waiters[0].targetTime) {
|
||||||
|
ew := heap.Pop(&fec.waiters).(eventWaiter)
|
||||||
|
wg.Add(1)
|
||||||
|
go func(f EventFunc) { f(now); wg.Done() }(ew.f)
|
||||||
|
foundSome = true
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
}()
|
||||||
|
if !foundSome {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// EventAfterDuration schedules the given function to be invoked once
|
||||||
|
// the given duration has passed.
|
||||||
|
func (fec *FakeEventClock) EventAfterDuration(f EventFunc, d time.Duration) {
|
||||||
|
fec.waitersLock.Lock()
|
||||||
|
defer fec.waitersLock.Unlock()
|
||||||
|
now := fec.Now()
|
||||||
|
fd := time.Duration(float32(fec.fuzz) * fec.rand.Float32())
|
||||||
|
heap.Push(&fec.waiters, eventWaiter{targetTime: now.Add(d + fd), f: f})
|
||||||
|
}
|
||||||
|
|
||||||
|
// EventAfterTime schedules the given function to be invoked once
|
||||||
|
// the given time has arrived.
|
||||||
|
func (fec *FakeEventClock) EventAfterTime(f EventFunc, t time.Time) {
|
||||||
|
fec.waitersLock.Lock()
|
||||||
|
defer fec.waitersLock.Unlock()
|
||||||
|
fd := time.Duration(float32(fec.fuzz) * fec.rand.Float32())
|
||||||
|
heap.Push(&fec.waiters, eventWaiter{targetTime: t.Add(fd), f: f})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ewh eventWaiterHeap) Len() int { return len(ewh) }
|
||||||
|
|
||||||
|
func (ewh eventWaiterHeap) Less(i, j int) bool { return ewh[i].targetTime.Before(ewh[j].targetTime) }
|
||||||
|
|
||||||
|
func (ewh eventWaiterHeap) Swap(i, j int) { ewh[i], ewh[j] = ewh[j], ewh[i] }
|
||||||
|
|
||||||
|
func (ewh *eventWaiterHeap) Push(x interface{}) {
|
||||||
|
*ewh = append(*ewh, x.(eventWaiter))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ewh *eventWaiterHeap) Pop() interface{} {
|
||||||
|
old := *ewh
|
||||||
|
n := len(old)
|
||||||
|
x := old[n-1]
|
||||||
|
*ewh = old[:n-1]
|
||||||
|
return x
|
||||||
|
}
|
@ -0,0 +1,183 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2019 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package clock
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math/rand"
|
||||||
|
"sync/atomic"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"k8s.io/apimachinery/pkg/util/clock"
|
||||||
|
)
|
||||||
|
|
||||||
|
type TestableEventClock interface {
|
||||||
|
EventClock
|
||||||
|
SetTime(time.Time)
|
||||||
|
Run(*time.Time)
|
||||||
|
}
|
||||||
|
|
||||||
|
// settablePassiveClock allows setting current time of a passive clock
|
||||||
|
type settablePassiveClock interface {
|
||||||
|
clock.PassiveClock
|
||||||
|
SetTime(time.Time)
|
||||||
|
}
|
||||||
|
|
||||||
|
func exerciseTestableEventClock(t *testing.T, ec TestableEventClock, fuzz time.Duration) {
|
||||||
|
exercisePassiveClock(t, ec)
|
||||||
|
var numDone int32
|
||||||
|
now := ec.Now()
|
||||||
|
strictable := true
|
||||||
|
const batchSize = 100
|
||||||
|
times := make(chan time.Time, batchSize+1)
|
||||||
|
try := func(abs, strict bool, d time.Duration) {
|
||||||
|
f := func(u time.Time) {
|
||||||
|
realD := ec.Since(now)
|
||||||
|
atomic.AddInt32(&numDone, 1)
|
||||||
|
times <- u
|
||||||
|
if realD < d || strict && strictable && realD > d+fuzz {
|
||||||
|
t.Errorf("Asked for %v, got %v", d, realD)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if abs {
|
||||||
|
ec.EventAfterTime(f, now.Add(d))
|
||||||
|
} else {
|
||||||
|
ec.EventAfterDuration(f, d)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
try(true, true, time.Minute)
|
||||||
|
for i := 0; i < batchSize; i++ {
|
||||||
|
d := time.Duration(rand.Intn(30)-3) * time.Second
|
||||||
|
try(i%2 == 0, d >= 0, d)
|
||||||
|
}
|
||||||
|
ec.Run(nil)
|
||||||
|
if numDone != batchSize+1 {
|
||||||
|
t.Errorf("Got only %v events", numDone)
|
||||||
|
}
|
||||||
|
lastTime := now.Add(-3 * time.Second)
|
||||||
|
for i := 0; i <= batchSize; i++ {
|
||||||
|
nextTime := <-times
|
||||||
|
if nextTime.Before(lastTime) {
|
||||||
|
t.Errorf("Got %s after %s", nextTime, lastTime)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
endTime := ec.Now()
|
||||||
|
dx := endTime.Sub(now)
|
||||||
|
if dx > time.Minute+fuzz {
|
||||||
|
t.Errorf("Run started at %#+v, ended at %#+v, dx=%d", now, endTime, dx)
|
||||||
|
}
|
||||||
|
now = endTime
|
||||||
|
var shouldRun int32
|
||||||
|
strictable = false
|
||||||
|
for i := 0; i < batchSize; i++ {
|
||||||
|
d := time.Duration(rand.Intn(30)-3) * time.Second
|
||||||
|
try(i%2 == 0, d >= 0, d)
|
||||||
|
if d <= 12*time.Second {
|
||||||
|
shouldRun++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ec.SetTime(now.Add(13*time.Second - 1))
|
||||||
|
if numDone != batchSize+1+shouldRun {
|
||||||
|
t.Errorf("Expected %v, but %v ran", shouldRun, numDone-batchSize-1)
|
||||||
|
}
|
||||||
|
lastTime = now.Add(-3 * time.Second)
|
||||||
|
for i := int32(0); i < shouldRun; i++ {
|
||||||
|
nextTime := <-times
|
||||||
|
if nextTime.Before(lastTime) {
|
||||||
|
t.Errorf("Got %s after %s", nextTime, lastTime)
|
||||||
|
}
|
||||||
|
lastTime = nextTime
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func exercisePassiveClock(t *testing.T, pc settablePassiveClock) {
|
||||||
|
t1 := time.Now()
|
||||||
|
t2 := t1.Add(time.Hour)
|
||||||
|
pc.SetTime(t1)
|
||||||
|
tx := pc.Now()
|
||||||
|
if tx != t1 {
|
||||||
|
t.Errorf("SetTime(%#+v); Now() => %#+v", t1, tx)
|
||||||
|
}
|
||||||
|
dx := pc.Since(t1)
|
||||||
|
if dx != 0 {
|
||||||
|
t.Errorf("Since() => %v", dx)
|
||||||
|
}
|
||||||
|
pc.SetTime(t2)
|
||||||
|
dx = pc.Since(t1)
|
||||||
|
if dx != time.Hour {
|
||||||
|
t.Errorf("Since() => %v", dx)
|
||||||
|
}
|
||||||
|
tx = pc.Now()
|
||||||
|
if tx != t2 {
|
||||||
|
t.Errorf("Now() => %#+v", tx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFakeEventClock(t *testing.T) {
|
||||||
|
startTime := time.Now()
|
||||||
|
fec, _ := NewFakeEventClock(startTime, 0, nil)
|
||||||
|
exerciseTestableEventClock(t, fec, 0)
|
||||||
|
fec, _ = NewFakeEventClock(startTime, time.Second, nil)
|
||||||
|
exerciseTestableEventClock(t, fec, time.Second)
|
||||||
|
}
|
||||||
|
|
||||||
|
func exerciseEventClock(t *testing.T, ec EventClock, relax func(time.Duration)) {
|
||||||
|
var numDone int32
|
||||||
|
now := ec.Now()
|
||||||
|
const batchSize = 100
|
||||||
|
times := make(chan time.Time, batchSize+1)
|
||||||
|
try := func(abs bool, d time.Duration) {
|
||||||
|
f := func(u time.Time) {
|
||||||
|
realD := ec.Since(now)
|
||||||
|
atomic.AddInt32(&numDone, 1)
|
||||||
|
times <- u
|
||||||
|
if realD < d {
|
||||||
|
t.Errorf("Asked for %v, got %v", d, realD)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if abs {
|
||||||
|
ec.EventAfterTime(f, now.Add(d))
|
||||||
|
} else {
|
||||||
|
ec.EventAfterDuration(f, d)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
try(true, time.Millisecond*3300)
|
||||||
|
for i := 0; i < batchSize; i++ {
|
||||||
|
d := time.Duration(rand.Intn(30)-3) * time.Millisecond * 100
|
||||||
|
try(i%2 == 0, d)
|
||||||
|
}
|
||||||
|
relax(time.Second * 4)
|
||||||
|
if atomic.LoadInt32(&numDone) != batchSize+1 {
|
||||||
|
t.Errorf("Got only %v events", numDone)
|
||||||
|
}
|
||||||
|
lastTime := now
|
||||||
|
for i := 0; i <= batchSize; i++ {
|
||||||
|
nextTime := <-times
|
||||||
|
if nextTime.Before(now) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
dt := nextTime.Sub(lastTime) / (50 * time.Millisecond)
|
||||||
|
if dt < 0 {
|
||||||
|
t.Errorf("Got %s after %s", nextTime, lastTime)
|
||||||
|
}
|
||||||
|
lastTime = nextTime
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRealEventClock(t *testing.T) {
|
||||||
|
exerciseEventClock(t, RealEventClock{}, func(d time.Duration) { time.Sleep(d) })
|
||||||
|
}
|
@ -0,0 +1,103 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2019 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package testing
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"k8s.io/apimachinery/pkg/util/clock"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Integrator computes the integral of some variable X over time as
|
||||||
|
// read from a particular clock. The integral starts when the
|
||||||
|
// Integrator is created, and ends at the latest operation on the
|
||||||
|
// Integrator.
|
||||||
|
type Integrator interface {
|
||||||
|
Set(float64) // set the value of X
|
||||||
|
Add(float64) // add the given quantity to X
|
||||||
|
GetResults() IntegratorResults
|
||||||
|
}
|
||||||
|
|
||||||
|
// IntegratorResults holds statistical abstracts of the integration
|
||||||
|
type IntegratorResults struct {
|
||||||
|
Duration float64 //seconds
|
||||||
|
Average float64
|
||||||
|
Deviation float64 //sqrt(avg((value-avg)^2))
|
||||||
|
}
|
||||||
|
|
||||||
|
type integrator struct {
|
||||||
|
sync.Mutex
|
||||||
|
clk clock.PassiveClock
|
||||||
|
lastTime time.Time
|
||||||
|
x float64
|
||||||
|
integrals [3]float64 // integral of x^0, x^1, and x^2
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewIntegrator makes one that uses the given clock
|
||||||
|
func NewIntegrator(clk clock.PassiveClock) Integrator {
|
||||||
|
return &integrator{
|
||||||
|
clk: clk,
|
||||||
|
lastTime: clk.Now(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (igr *integrator) Set(x float64) {
|
||||||
|
igr.Lock()
|
||||||
|
igr.updateLocked()
|
||||||
|
igr.x = x
|
||||||
|
igr.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (igr *integrator) Add(deltaX float64) {
|
||||||
|
igr.Lock()
|
||||||
|
igr.updateLocked()
|
||||||
|
igr.x += deltaX
|
||||||
|
igr.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (igr *integrator) updateLocked() {
|
||||||
|
now := igr.clk.Now()
|
||||||
|
dt := now.Sub(igr.lastTime).Seconds()
|
||||||
|
igr.lastTime = now
|
||||||
|
igr.integrals[0] += dt
|
||||||
|
igr.integrals[1] += dt * igr.x
|
||||||
|
igr.integrals[2] += dt * igr.x * igr.x
|
||||||
|
}
|
||||||
|
|
||||||
|
func (igr *integrator) GetResults() (results IntegratorResults) {
|
||||||
|
igr.Lock()
|
||||||
|
defer func() { igr.Unlock() }()
|
||||||
|
igr.updateLocked()
|
||||||
|
results.Duration = igr.integrals[0]
|
||||||
|
if results.Duration <= 0 {
|
||||||
|
results.Average = math.NaN()
|
||||||
|
results.Deviation = math.NaN()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
results.Average = igr.integrals[1] / igr.integrals[0]
|
||||||
|
// Deviation is sqrt( Integral( (x - xbar)^2 dt) / Duration )
|
||||||
|
// = sqrt( Integral( x^2 + xbar^2 -2*x*xbar dt ) / Duration )
|
||||||
|
// = sqrt( ( Integral( x^2 dt ) + Duration * xbar^2 - 2*xbar*Integral(x dt) ) / Duration)
|
||||||
|
// = sqrt( Integral(x^2 dt)/Duration - xbar^2 )
|
||||||
|
variance := igr.integrals[2]/igr.integrals[0] - results.Average*results.Average
|
||||||
|
if variance > 0 {
|
||||||
|
results.Deviation = math.Sqrt(variance)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
@ -0,0 +1,49 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2019 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package testing
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
|
||||||
|
)
|
||||||
|
|
||||||
|
// NewNoRestraintFactory makes a QueueSetFactory that produces
|
||||||
|
// QueueSets that exert no restraint --- every request is dispatched
|
||||||
|
// for execution as soon as it arrives.
|
||||||
|
func NewNoRestraintFactory() fq.QueueSetFactory {
|
||||||
|
return noRestraintFactory{}
|
||||||
|
}
|
||||||
|
|
||||||
|
type noRestraintFactory struct{}
|
||||||
|
|
||||||
|
func (noRestraintFactory) NewQueueSet(config fq.QueueSetConfig) (fq.QueueSet, error) {
|
||||||
|
return noRestraint{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type noRestraint struct{}
|
||||||
|
|
||||||
|
func (noRestraint) SetConfiguration(config fq.QueueSetConfig) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (noRestraint) Quiesce(fq.EmptyHandler) {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (noRestraint) Wait(ctx context.Context, hashValue uint64) (quiescent, execute bool, afterExecution func()) {
|
||||||
|
return false, true, func() {}
|
||||||
|
}
|
@ -0,0 +1,73 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2016 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package fairqueuing
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Request is a temporary container for "requests" with additional tracking fields
|
||||||
|
// required for the functionality FQScheduler
|
||||||
|
type Request struct {
|
||||||
|
//TODO(aaron-prindle) seq is only used for testing, this was abstracted
|
||||||
|
// via an interface before, keeping this for now
|
||||||
|
QueueIdx int
|
||||||
|
|
||||||
|
Queue *Queue
|
||||||
|
StartTime time.Time
|
||||||
|
DequeueChannel chan bool
|
||||||
|
RealEnqueueTime time.Time
|
||||||
|
Enqueued bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// Queue is an array of requests with additional metadata required for
|
||||||
|
// the FQScheduler
|
||||||
|
type Queue struct {
|
||||||
|
Requests []*Request
|
||||||
|
VirtualStart float64
|
||||||
|
RequestsExecuting int
|
||||||
|
Index int
|
||||||
|
}
|
||||||
|
|
||||||
|
// Enqueue enqueues a request into the queue
|
||||||
|
func (q *Queue) Enqueue(request *Request) {
|
||||||
|
request.Enqueued = true
|
||||||
|
q.Requests = append(q.Requests, request)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Dequeue dequeues a request from the queue
|
||||||
|
func (q *Queue) Dequeue() (*Request, bool) {
|
||||||
|
if len(q.Requests) == 0 {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
request := q.Requests[0]
|
||||||
|
q.Requests = q.Requests[1:]
|
||||||
|
|
||||||
|
request.Enqueued = false
|
||||||
|
return request, true
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetVirtualFinish returns the expected virtual finish time of the request at
|
||||||
|
// index J in the queue with estimated finish time G
|
||||||
|
func (q *Queue) GetVirtualFinish(J int, G float64) float64 {
|
||||||
|
// The virtual finish time of request number J in the queue
|
||||||
|
// (counting from J=1 for the head) is J * G + (virtual start time).
|
||||||
|
|
||||||
|
// counting from J=1 for the head (eg: queue.Requests[0] -> J=1) - J+1
|
||||||
|
jg := float64(J+1) * float64(G)
|
||||||
|
return jg + q.VirtualStart
|
||||||
|
}
|
@ -0,0 +1,24 @@
|
|||||||
|
load("@io_bazel_rules_go//go:def.bzl", "go_library")
|
||||||
|
|
||||||
|
go_library(
|
||||||
|
name = "go_default_library",
|
||||||
|
srcs = ["metrics.go"],
|
||||||
|
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/metrics",
|
||||||
|
importpath = "k8s.io/apiserver/pkg/util/flowcontrol/metrics",
|
||||||
|
visibility = ["//visibility:public"],
|
||||||
|
deps = ["//vendor/github.com/prometheus/client_golang/prometheus:go_default_library"],
|
||||||
|
)
|
||||||
|
|
||||||
|
filegroup(
|
||||||
|
name = "package-srcs",
|
||||||
|
srcs = glob(["**"]),
|
||||||
|
tags = ["automanaged"],
|
||||||
|
visibility = ["//visibility:private"],
|
||||||
|
)
|
||||||
|
|
||||||
|
filegroup(
|
||||||
|
name = "all-srcs",
|
||||||
|
srcs = [":package-srcs"],
|
||||||
|
tags = ["automanaged"],
|
||||||
|
visibility = ["//visibility:public"],
|
||||||
|
)
|
@ -0,0 +1,152 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2019 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package metrics
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
namespace = "apiserver"
|
||||||
|
subsystem = "flowcontrol"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
priorityLevel = "priorityLevel"
|
||||||
|
flowSchema = "flowSchema"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
queueLengthBuckets = []float64{0, 10, 25, 50, 100, 250, 500, 1000}
|
||||||
|
requestDurationSecondsBuckets = []float64{0, 0.005, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2, 5, 10, 30}
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
prometheus.MustRegister(apiserverRejectedRequests)
|
||||||
|
prometheus.MustRegister(apiserverCurrentInqueueRequests)
|
||||||
|
prometheus.MustRegister(apiserverRequestQueueLength)
|
||||||
|
prometheus.MustRegister(apiserverRequestConcurrencyLimit)
|
||||||
|
prometheus.MustRegister(apiserverCurrentExecutingRequests)
|
||||||
|
prometheus.MustRegister(apiserverRequestWaitingSeconds)
|
||||||
|
prometheus.MustRegister(apiserverRequestExecutionSeconds)
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
apiserverRejectedRequests = prometheus.NewCounterVec(
|
||||||
|
prometheus.CounterOpts{
|
||||||
|
Namespace: namespace,
|
||||||
|
Subsystem: subsystem,
|
||||||
|
Name: "rejectedRequests",
|
||||||
|
Help: "Number of rejected requests by api priority and fairness system",
|
||||||
|
},
|
||||||
|
[]string{priorityLevel, "reason"},
|
||||||
|
)
|
||||||
|
apiserverCurrentInqueueRequests = prometheus.NewGaugeVec(
|
||||||
|
prometheus.GaugeOpts{
|
||||||
|
Namespace: namespace,
|
||||||
|
Subsystem: subsystem,
|
||||||
|
Name: "currentInqueueRequests",
|
||||||
|
Help: "Number of requests currently pending in the queue by the api priority and fairness system",
|
||||||
|
},
|
||||||
|
[]string{priorityLevel},
|
||||||
|
)
|
||||||
|
apiserverRequestQueueLength = prometheus.NewHistogramVec(
|
||||||
|
prometheus.HistogramOpts{
|
||||||
|
Namespace: namespace,
|
||||||
|
Subsystem: subsystem,
|
||||||
|
Name: "requestQueueLength",
|
||||||
|
Help: "Length of queue in the api priority and fairness system",
|
||||||
|
Buckets: queueLengthBuckets,
|
||||||
|
},
|
||||||
|
[]string{priorityLevel},
|
||||||
|
)
|
||||||
|
apiserverRequestConcurrencyLimit = prometheus.NewGaugeVec(
|
||||||
|
prometheus.GaugeOpts{
|
||||||
|
Namespace: namespace,
|
||||||
|
Subsystem: subsystem,
|
||||||
|
Name: "requestConcurrencyLimit",
|
||||||
|
Help: "Shared concurrency limit in the api priority and fairness system",
|
||||||
|
},
|
||||||
|
[]string{priorityLevel},
|
||||||
|
)
|
||||||
|
apiserverCurrentExecutingRequests = prometheus.NewGaugeVec(
|
||||||
|
prometheus.GaugeOpts{
|
||||||
|
Namespace: namespace,
|
||||||
|
Subsystem: subsystem,
|
||||||
|
Name: "currentExecutingRequests",
|
||||||
|
Help: "Number of requests currently executing in the api priority and fairness system",
|
||||||
|
},
|
||||||
|
[]string{priorityLevel},
|
||||||
|
)
|
||||||
|
apiserverRequestWaitingSeconds = prometheus.NewHistogramVec(
|
||||||
|
prometheus.HistogramOpts{
|
||||||
|
Namespace: namespace,
|
||||||
|
Subsystem: subsystem,
|
||||||
|
Name: "request_wait_durationSeconds",
|
||||||
|
Help: "Length of time a request spent waiting in its queue",
|
||||||
|
Buckets: requestDurationSecondsBuckets,
|
||||||
|
},
|
||||||
|
[]string{priorityLevel, flowSchema, "execute"},
|
||||||
|
)
|
||||||
|
apiserverRequestExecutionSeconds = prometheus.NewHistogramVec(
|
||||||
|
prometheus.HistogramOpts{
|
||||||
|
Namespace: namespace,
|
||||||
|
Subsystem: subsystem,
|
||||||
|
Name: "requestExecutionSeconds",
|
||||||
|
Help: "Time of request executing in the api priority and fairness system",
|
||||||
|
Buckets: requestDurationSecondsBuckets,
|
||||||
|
},
|
||||||
|
[]string{priorityLevel, flowSchema},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
// UpdateFlowControlRequestsInQueue updates the value for the # of requests in the specified queues in flow control
|
||||||
|
func UpdateFlowControlRequestsInQueue(priorityLevel string, inqueue int) {
|
||||||
|
apiserverCurrentInqueueRequests.WithLabelValues(priorityLevel).Set(float64(inqueue))
|
||||||
|
}
|
||||||
|
|
||||||
|
// UpdateFlowControlRequestsExecuting updates the value for the # of requests executing in flow control
|
||||||
|
func UpdateFlowControlRequestsExecuting(priorityLevel string, executing int) {
|
||||||
|
apiserverCurrentExecutingRequests.WithLabelValues(priorityLevel).Set(float64(executing))
|
||||||
|
}
|
||||||
|
|
||||||
|
// UpdateSharedConcurrencyLimit updates the value for the concurrency limit in flow control
|
||||||
|
func UpdateSharedConcurrencyLimit(priorityLevel string, limit int) {
|
||||||
|
apiserverRequestConcurrencyLimit.WithLabelValues(priorityLevel).Set(float64(limit))
|
||||||
|
}
|
||||||
|
|
||||||
|
// AddReject increments the # of rejected requests for flow control
|
||||||
|
func AddReject(priorityLevel string, reason string) {
|
||||||
|
apiserverRejectedRequests.WithLabelValues(priorityLevel, reason).Add(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ObserveQueueLength observes the queue length for flow control
|
||||||
|
func ObserveQueueLength(priorityLevel string, length int) {
|
||||||
|
apiserverRequestQueueLength.WithLabelValues(priorityLevel).Observe(float64(length))
|
||||||
|
}
|
||||||
|
|
||||||
|
// ObserveWaitingDuration observes the queue length for flow control
|
||||||
|
func ObserveWaitingDuration(priorityLevel, flowSchema, execute string, waitTime time.Duration) {
|
||||||
|
apiserverRequestWaitingSeconds.WithLabelValues(priorityLevel, flowSchema, execute).Observe(waitTime.Seconds())
|
||||||
|
}
|
||||||
|
|
||||||
|
// ObserveExecutionDuration observes the execution duration for flow control
|
||||||
|
func ObserveExecutionDuration(priorityLevel, flowSchema string, executionTime time.Duration) {
|
||||||
|
apiserverRequestExecutionSeconds.WithLabelValues(priorityLevel, flowSchema).Observe(executionTime.Seconds())
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user