Merge pull request #99935 from MikeSpreitzer/ratelimit-realtest

Enable, rate limit, and test APF controller fights
Kubernetes Prow Robot 2021-03-09 09:19:36 -08:00 committed by GitHub
commit 8a599510d3
6 changed files with 386 additions and 53 deletions

View File

@@ -23,6 +23,7 @@ import (
"encoding/json"
"fmt"
"math"
"math/rand"
"sort"
"sync"
"time"
@@ -34,8 +35,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
apitypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/clock"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
fcboot "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
"k8s.io/apiserver/pkg/authentication/user"
@@ -53,6 +56,8 @@ import (
flowcontrollister "k8s.io/client-go/listers/flowcontrol/v1beta1"
)
const timeFmt = "2006-01-02T15:04:05.999"
// This file contains a simple local (to the apiserver) controller
// that digests API Priority and Fairness config objects (FlowSchema
// and PriorityLevelConfiguration) into the data structure that the
@@ -85,9 +90,19 @@ type RequestDigest struct {
// this type and cfgMeal follow the convention that the suffix
// "Locked" means that the caller must hold the configController lock.
type configController struct {
name string // varies in tests of fighting controllers
clock clock.PassiveClock
queueSetFactory fq.QueueSetFactory
obsPairGenerator metrics.TimedObserverPairGenerator
// How this controller appears in an ObjectMeta ManagedFieldsEntry.Manager
asFieldManager string
// Given a boolean indicating whether a FlowSchema's referenced
// PriorityLevelConfig exists, return a boolean indicating whether
// the reference is dangling
foundToDangling func(bool) bool
// configQueue holds `(interface{})(0)` when the configuration
// objects need to be reprocessed.
configQueue workqueue.RateLimitingInterface
@@ -122,6 +137,18 @@ type configController struct {
// name to the state for that level. Every name referenced from a
// member of `flowSchemas` has an entry here.
priorityLevelStates map[string]*priorityLevelState
// the most recent update attempts, ordered by increasing age.
// Consumer trims to keep only the last minute's worth of entries.
// The controller uses this to limit itself to at most six updates
// to a given FlowSchema in any minute.
// This may only be accessed from the one and only worker goroutine.
mostRecentUpdates []updateAttempt
}
type updateAttempt struct {
timeUpdated time.Time
updatedItems sets.String // FlowSchema names
}
// priorityLevelState holds the state specific to a priority level.
@@ -154,14 +181,18 @@ type priorityLevelState struct {
// NewTestableController is extra flexible to facilitate testing
func newTestableController(config TestableConfig) *configController {
cfgCtlr := &configController{
name: config.Name,
clock: config.Clock,
queueSetFactory: config.QueueSetFactory,
obsPairGenerator: config.ObsPairGenerator,
asFieldManager: config.AsFieldManager,
foundToDangling: config.FoundToDangling,
serverConcurrencyLimit: config.ServerConcurrencyLimit,
requestWaitLimit: config.RequestWaitLimit,
flowcontrolClient: config.FlowcontrolClient,
priorityLevelStates: make(map[string]*priorityLevelState),
}
klog.V(2).Infof("NewTestableController with serverConcurrencyLimit=%d, requestWaitLimit=%s", cfgCtlr.serverConcurrencyLimit, cfgCtlr.requestWaitLimit)
klog.V(2).Infof("NewTestableController %q with serverConcurrencyLimit=%d, requestWaitLimit=%s, name=%s, asFieldManager=%q", cfgCtlr.name, cfgCtlr.serverConcurrencyLimit, cfgCtlr.requestWaitLimit, cfgCtlr.name, cfgCtlr.asFieldManager)
// Start with longish delay because conflicts will be between
// different processes, so take some time to go away.
cfgCtlr.configQueue = workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(200*time.Millisecond, 8*time.Hour), "priority_and_fairness_config_queue")
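As background on the backoff chosen here (an illustration, not part of the diff): ItemExponentialFailureRateLimiter doubles the per-item delay on each consecutive failure, starting from the base delay and capping at the maximum, so retries of the config item back off 200ms, 400ms, 800ms, ... up to 8 hours. A minimal standalone sketch with the same parameters; the item key "cfg" is invented for the example:

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Same parameters as the priority_and_fairness_config_queue above.
	rl := workqueue.NewItemExponentialFailureRateLimiter(200*time.Millisecond, 8*time.Hour)
	for i := 0; i < 6; i++ {
		fmt.Println(rl.When("cfg")) // 200ms, 400ms, 800ms, 1.6s, 3.2s, 6.4s, ...
	}
}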
@@ -177,40 +208,60 @@ func newTestableController(config TestableConfig) *configController {
pli.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pl := obj.(*flowcontrol.PriorityLevelConfiguration)
klog.V(7).Infof("Triggered API priority and fairness config reloading due to creation of PLC %s", pl.Name)
klog.V(7).Infof("Triggered API priority and fairness config reloading in %s due to creation of PLC %s", cfgCtlr.name, pl.Name)
cfgCtlr.configQueue.Add(0)
},
UpdateFunc: func(oldObj, newObj interface{}) {
newPL := newObj.(*flowcontrol.PriorityLevelConfiguration)
oldPL := oldObj.(*flowcontrol.PriorityLevelConfiguration)
if !apiequality.Semantic.DeepEqual(oldPL.Spec, newPL.Spec) {
klog.V(7).Infof("Triggered API priority and fairness config reloading due to spec update of PLC %s", newPL.Name)
klog.V(7).Infof("Triggered API priority and fairness config reloading in %s due to spec update of PLC %s", cfgCtlr.name, newPL.Name)
cfgCtlr.configQueue.Add(0)
} else {
klog.V(7).Infof("No trigger API priority and fairness config reloading in %s due to spec non-change of PLC %s", cfgCtlr.name, newPL.Name)
}
},
DeleteFunc: func(obj interface{}) {
name, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
klog.V(7).Infof("Triggered API priority and fairness config reloading due to deletion of PLC %s", name)
klog.V(7).Infof("Triggered API priority and fairness config reloading in %s due to deletion of PLC %s", cfgCtlr.name, name)
cfgCtlr.configQueue.Add(0)
}})
fsi.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
fs := obj.(*flowcontrol.FlowSchema)
klog.V(7).Infof("Triggered API priority and fairness config reloading due to creation of FS %s", fs.Name)
klog.V(7).Infof("Triggered API priority and fairness config reloading in %s due to creation of FS %s", cfgCtlr.name, fs.Name)
cfgCtlr.configQueue.Add(0)
},
UpdateFunc: func(oldObj, newObj interface{}) {
newFS := newObj.(*flowcontrol.FlowSchema)
oldFS := oldObj.(*flowcontrol.FlowSchema)
if !apiequality.Semantic.DeepEqual(oldFS.Spec, newFS.Spec) {
klog.V(7).Infof("Triggered API priority and fairness config reloading due to spec update of FS %s", newFS.Name)
// Changes to either Spec or Status are relevant. The
// concern is that we might, in some future release, want
// different behavior than is implemented now. One of the
// hardest questions is how an operator rolls out the new
// release in a cluster with multiple kube-apiservers, in a
// way that works no matter which servers crash and restart,
// and when. If this handler reacts only to
// changes in Spec then we have a scenario in which the
// rollout leaves the old Status in place. The scenario
// ends with this subsequence: deploy the last new server
// before deleting the last old server, and in between
// those two operations the last old server crashes and
// recovers. The chosen solution is making this controller
// insist on maintaining the particular state that it
// establishes.
if !(apiequality.Semantic.DeepEqual(oldFS.Spec, newFS.Spec) &&
apiequality.Semantic.DeepEqual(oldFS.Status, newFS.Status)) {
klog.V(7).Infof("Triggered API priority and fairness config reloading in %s due to spec and/or status update of FS %s", cfgCtlr.name, newFS.Name)
cfgCtlr.configQueue.Add(0)
} else {
klog.V(7).Infof("No trigger of API priority and fairness config reloading in %s due to spec and status non-change of FS %s", cfgCtlr.name, newFS.Name)
}
},
DeleteFunc: func(obj interface{}) {
name, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
klog.V(7).Infof("Triggered API priority and fairness config reloading due to deletion of FS %s", name)
klog.V(7).Infof("Triggered API priority and fairness config reloading in %s due to deletion of FS %s", cfgCtlr.name, name)
cfgCtlr.configQueue.Add(0)
}})
@@ -253,11 +304,16 @@ func (cfgCtlr *configController) Run(stopCh <-chan struct{}) error {
return nil
}
// runWorker is the logic of the one and only worker goroutine. We
// limit the number to one in order to obviate explicit
// synchronization around access to `cfgCtlr.mostRecentUpdates`.
func (cfgCtlr *configController) runWorker() {
for cfgCtlr.processNextWorkItem() {
}
}
// processNextWorkItem works on one entry from the work queue.
// Only invoke this in the one and only worker goroutine.
func (cfgCtlr *configController) processNextWorkItem() bool {
obj, shutdown := cfgCtlr.configQueue.Get()
if shutdown {
@@ -266,9 +322,14 @@ func (cfgCtlr *configController) processNextWorkItem() bool {
func(obj interface{}) {
defer cfgCtlr.configQueue.Done(obj)
if !cfgCtlr.syncOne() {
specificDelay, err := cfgCtlr.syncOne(map[string]string{})
switch {
case err != nil:
klog.Error(err)
cfgCtlr.configQueue.AddRateLimited(obj)
} else {
case specificDelay > 0:
cfgCtlr.configQueue.AddAfter(obj, specificDelay)
default:
cfgCtlr.configQueue.Forget(obj)
}
}(obj)
@@ -276,27 +337,22 @@ func (cfgCtlr *configController) processNextWorkItem() bool {
return true
}
// syncOne attempts to sync all the API Priority and Fairness config
// objects. It either succeeds and returns `true` or logs an error
// and returns `false`.
func (cfgCtlr *configController) syncOne() bool {
// syncOne does one full synchronization. It reads all the API
// objects that configure API Priority and Fairness and updates the
// local configController accordingly.
// Only invoke this in the one and only worker goroutine
func (cfgCtlr *configController) syncOne(flowSchemaRVs map[string]string) (specificDelay time.Duration, err error) {
klog.V(5).Infof("%s syncOne at %s", cfgCtlr.name, cfgCtlr.clock.Now().Format(timeFmt))
all := labels.Everything()
newPLs, err := cfgCtlr.plLister.List(all)
if err != nil {
klog.Errorf("Unable to list PriorityLevelConfiguration objects: %s", err.Error())
return false
return 0, fmt.Errorf("unable to list PriorityLevelConfiguration objects: %w", err)
}
newFSs, err := cfgCtlr.fsLister.List(all)
if err != nil {
klog.Errorf("Unable to list FlowSchema objects: %s", err.Error())
return false
return 0, fmt.Errorf("unable to list FlowSchema objects: %w", err)
}
err = cfgCtlr.digestConfigObjects(newPLs, newFSs)
if err == nil {
return true
}
klog.Error(err)
return false
return cfgCtlr.digestConfigObjects(newPLs, newFSs, flowSchemaRVs)
}
// cfgMeal is the data involved in the process of digesting the API
@@ -336,35 +392,79 @@ type fsStatusUpdate struct {
// digestConfigObjects is given all the API objects that configure
// cfgCtlr and writes its consequent new configState.
func (cfgCtlr *configController) digestConfigObjects(newPLs []*flowcontrol.PriorityLevelConfiguration, newFSs []*flowcontrol.FlowSchema) error {
// Only invoke this in the one and only worker goroutine
func (cfgCtlr *configController) digestConfigObjects(newPLs []*flowcontrol.PriorityLevelConfiguration, newFSs []*flowcontrol.FlowSchema, flowSchemaRVs map[string]string) (time.Duration, error) {
fsStatusUpdates := cfgCtlr.lockAndDigestConfigObjects(newPLs, newFSs)
var errs []error
currResult := updateAttempt{
timeUpdated: cfgCtlr.clock.Now(),
updatedItems: sets.String{},
}
var suggestedDelay time.Duration
for _, fsu := range fsStatusUpdates {
// if we should skip this name, indicate we will need a delay, but continue with other entries
if cfgCtlr.shouldDelayUpdate(fsu.flowSchema.Name) {
if suggestedDelay == 0 {
suggestedDelay = time.Duration(30+rand.Intn(45)) * time.Second
}
continue
}
// if we are going to issue an update, be sure we track every name we update so we know if we update it too often.
currResult.updatedItems.Insert(fsu.flowSchema.Name)
enc, err := json.Marshal(fsu.condition)
if err != nil {
// should never happen because these conditions are created here and well-formed
panic(fmt.Sprintf("Failed to json.Marshal(%#+v): %s", fsu.condition, err.Error()))
}
klog.V(4).Infof("Writing Condition %s to FlowSchema %s because its previous value was %s", string(enc), fsu.flowSchema.Name, fcfmt.Fmt(fsu.oldValue))
klog.V(4).Infof("%s writing Condition %s to FlowSchema %s, which had ResourceVersion=%s, because its previous value was %s", cfgCtlr.name, string(enc), fsu.flowSchema.Name, fsu.flowSchema.ResourceVersion, fcfmt.Fmt(fsu.oldValue))
fsIfc := cfgCtlr.flowcontrolClient.FlowSchemas()
patchBytes := []byte(fmt.Sprintf(`{"status": {"conditions": [ %s ] } }`, string(enc)))
patchOptions := metav1.PatchOptions{FieldManager: ConfigConsumerAsFieldManager}
_, err = fsIfc.Patch(context.TODO(), fsu.flowSchema.Name, apitypes.StrategicMergePatchType, patchBytes, patchOptions, "status")
patchOptions := metav1.PatchOptions{FieldManager: cfgCtlr.asFieldManager}
patchedFlowSchema, err := fsIfc.Patch(context.TODO(), fsu.flowSchema.Name, apitypes.StrategicMergePatchType, patchBytes, patchOptions, "status")
if err == nil {
continue
}
if apierrors.IsNotFound(err) {
key, _ := cache.MetaNamespaceKeyFunc(patchedFlowSchema)
flowSchemaRVs[key] = patchedFlowSchema.ResourceVersion
} else if apierrors.IsNotFound(err) {
// This object has been deleted. A notification is coming
// and nothing more needs to be done here.
klog.V(5).Infof("Attempted update of concurrently deleted FlowSchema %s; nothing more needs to be done", fsu.flowSchema.Name)
klog.V(5).Infof("%s at %s: attempted update of concurrently deleted FlowSchema %s; nothing more needs to be done", cfgCtlr.name, cfgCtlr.clock.Now().Format(timeFmt), fsu.flowSchema.Name)
} else {
errs = append(errs, errors.Wrap(err, fmt.Sprintf("failed to set a status.condition for FlowSchema %s", fsu.flowSchema.Name)))
}
}
if len(errs) == 0 {
return nil
cfgCtlr.addUpdateResult(currResult)
return suggestedDelay, utilerrors.NewAggregate(errs)
}
// shouldDelayUpdate checks to see if a flowschema has been updated too often and returns true if a delay is needed.
// Only invoke this in the one and only worker goroutine
func (cfgCtlr *configController) shouldDelayUpdate(flowSchemaName string) bool {
numUpdatesInPastMinute := 0
oneMinuteAgo := cfgCtlr.clock.Now().Add(-1 * time.Minute)
for idx, update := range cfgCtlr.mostRecentUpdates {
if oneMinuteAgo.After(update.timeUpdated) {
// this and the remaining items are no longer relevant
cfgCtlr.mostRecentUpdates = cfgCtlr.mostRecentUpdates[:idx]
return false
}
if update.updatedItems.Has(flowSchemaName) {
numUpdatesInPastMinute++
if numUpdatesInPastMinute > 5 {
return true
}
}
}
return utilerrors.NewAggregate(errs)
return false
}
// addUpdateResult records the given attempt. A plain slice, rather
// than a ring buffer, suffices because rate limiting keeps the list small.
// Only invoke this in the one and only worker goroutine
func (cfgCtlr *configController) addUpdateResult(result updateAttempt) {
cfgCtlr.mostRecentUpdates = append([]updateAttempt{result}, cfgCtlr.mostRecentUpdates...)
}
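To make the six-updates-per-minute window concrete, here is a minimal same-package test sketch (an illustration, not part of this change) that drives shouldDelayUpdate and addUpdateResult with the fake clock from k8s.io/apimachinery/pkg/util/clock; the test name and the FlowSchema name "demo-fs" are invented, and the usual testing, time, clock, and sets imports are assumed:

func TestShouldDelayUpdateWindow(t *testing.T) {
	clk := clock.NewFakeClock(time.Now())
	cfgCtlr := &configController{clock: clk}
	// Record six update attempts to the same FlowSchema within one minute.
	for i := 0; i < 6; i++ {
		cfgCtlr.addUpdateResult(updateAttempt{
			timeUpdated:  clk.Now(),
			updatedItems: sets.NewString("demo-fs"),
		})
		clk.Step(5 * time.Second)
	}
	// A seventh write inside the same minute should be held back.
	if !cfgCtlr.shouldDelayUpdate("demo-fs") {
		t.Error("expected a delay after six updates in the past minute")
	}
	// Once the earlier attempts age past one minute, writes may proceed again.
	clk.Step(time.Minute)
	if cfgCtlr.shouldDelayUpdate("demo-fs") {
		t.Error("expected no delay after the window expired")
	}
}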
func (cfgCtlr *configController) lockAndDigestConfigObjects(newPLs []*flowcontrol.PriorityLevelConfiguration, newFSs []*flowcontrol.FlowSchema) []fsStatusUpdate {
@@ -448,7 +548,7 @@ func (meal *cfgMeal) digestFlowSchemasLocked(newFSs []*flowcontrol.FlowSchema) {
//
// TODO: consider not even trying if server is not handling
// requests yet.
meal.presyncFlowSchemaStatus(fs, !goodPriorityRef, fs.Spec.PriorityLevelConfiguration.Name)
meal.presyncFlowSchemaStatus(fs, meal.cfgCtlr.foundToDangling(goodPriorityRef), fs.Spec.PriorityLevelConfiguration.Name)
if !goodPriorityRef {
klog.V(6).Infof("Ignoring FlowSchema %s because of bad priority level reference %q", fs.Name, fs.Spec.PriorityLevelConfiguration.Name)
@@ -612,12 +712,13 @@ func (meal *cfgMeal) presyncFlowSchemaStatus(fs *flowcontrol.FlowSchema, isDangl
if danglingCondition.Status == desiredStatus && danglingCondition.Reason == desiredReason && danglingCondition.Message == desiredMessage {
return
}
now := meal.cfgCtlr.clock.Now()
meal.fsStatusUpdates = append(meal.fsStatusUpdates, fsStatusUpdate{
flowSchema: fs,
condition: flowcontrol.FlowSchemaCondition{
Type: flowcontrol.FlowSchemaConditionDangling,
Status: desiredStatus,
LastTransitionTime: metav1.Now(),
LastTransitionTime: metav1.NewTime(now),
Reason: desiredReason,
Message: desiredMessage,
},
@@ -710,7 +811,9 @@ func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDig
return selectedFlowSchema, plState.pl, false, req, startWaitingTime
}
// Call this after getting a clue that the given priority level is undesired and idle
// maybeReap will remove the last internal traces of the named
// priority level if it has no more use. Call this after getting a
// clue that the given priority level is undesired and idle.
func (cfgCtlr *configController) maybeReap(plName string) {
cfgCtlr.lock.Lock()
defer cfgCtlr.lock.Unlock()
@@ -730,8 +833,11 @@ func (cfgCtlr *configController) maybeReap(plName string) {
cfgCtlr.configQueue.Add(0)
}
// Call this if both (1) plState.queues is non-nil and reported being
// idle, and (2) cfgCtlr's lock has not been released since then.
// maybeReapLocked requires the cfgCtlr's lock to already be held and
// will remove the last internal traces of the named priority level if
// it has no more use. Call this if both (1) plState.queues is
// non-nil and reported being idle, and (2) cfgCtlr's lock has not
// been released since then.
func (cfgCtlr *configController) maybeReapLocked(plName string, plState *priorityLevelState) {
if !(plState.quiescing && plState.numPending == 0) {
return

View File

@@ -78,18 +78,43 @@ func New(
requestWaitLimit time.Duration,
) Interface {
grc := counter.NoOp{}
clk := clock.RealClock{}
return NewTestable(TestableConfig{
Name: "Controller",
Clock: clk,
AsFieldManager: ConfigConsumerAsFieldManager,
FoundToDangling: func(found bool) bool { return !found },
InformerFactory: informerFactory,
FlowcontrolClient: flowcontrolClient,
ServerConcurrencyLimit: serverConcurrencyLimit,
RequestWaitLimit: requestWaitLimit,
ObsPairGenerator: metrics.PriorityLevelConcurrencyObserverPairGenerator,
QueueSetFactory: fqs.NewQueueSetFactory(&clock.RealClock{}, grc),
QueueSetFactory: fqs.NewQueueSetFactory(clk, grc),
})
}
// TestableConfig carries the parameters to an implementation that is testable
type TestableConfig struct {
// Name of the controller
Name string
// Clock to use in timing deliberate delays
Clock clock.PassiveClock
// AsFieldManager is the string to use in the metadata for
// server-side apply. Normally this is
// `ConfigConsumerAsFieldManager`. This is exposed as a parameter
// so that a test of competing controllers can supply different
// values.
AsFieldManager string
// FoundToDangling maps the boolean indicating whether a
// FlowSchema's referenced PLC exists to the boolean indicating
// that FlowSchema's status should indicate a dangling reference.
// This is a parameter so that we can write tests of what happens
// when servers disagree on that bit of Status.
FoundToDangling func(bool) bool
// InformerFactory to use in building the controller
InformerFactory kubeinformers.SharedInformerFactory

View File

@@ -28,6 +28,7 @@ import (
flowcontrol "k8s.io/api/flowcontrol/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
fcboot "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
@@ -251,10 +252,14 @@ func TestConfigConsumer(t *testing.T) {
queues: map[string]*ctlrTestQueueSet{},
}
ctlr := newTestableController(TestableConfig{
Name: "Controller",
Clock: clock.RealClock{},
AsFieldManager: ConfigConsumerAsFieldManager,
FoundToDangling: func(found bool) bool { return !found },
InformerFactory: informerFactory,
FlowcontrolClient: flowcontrolClient,
ServerConcurrencyLimit: 100,
RequestWaitLimit: time.Minute,
ServerConcurrencyLimit: 100, // server concurrency limit
RequestWaitLimit: time.Minute, // request wait limit
ObsPairGenerator: metrics.PriorityLevelConcurrencyObserverPairGenerator,
QueueSetFactory: cts,
})
@@ -378,12 +383,16 @@ func TestAPFControllerWithGracefulShutdown(t *testing.T) {
queues: map[string]*ctlrTestQueueSet{},
}
controller := newTestableController(TestableConfig{
informerFactory,
flowcontrolClient,
100,
time.Minute,
metrics.PriorityLevelConcurrencyObserverPairGenerator,
cts,
Name: "Controller",
Clock: clock.RealClock{},
AsFieldManager: ConfigConsumerAsFieldManager,
FoundToDangling: func(found bool) bool { return !found },
InformerFactory: informerFactory,
FlowcontrolClient: flowcontrolClient,
ServerConcurrencyLimit: 100,
RequestWaitLimit: time.Minute,
ObsPairGenerator: metrics.PriorityLevelConcurrencyObserverPairGenerator,
QueueSetFactory: cts,
})
stopCh, controllerCompletedCh := make(chan struct{}), make(chan struct{})

View File

@@ -49,7 +49,7 @@ const (
timeout = time.Second * 10
)
func setup(t testing.TB) (*httptest.Server, *rest.Config, framework.CloseFunc) {
func setup(t testing.TB, maxReadonlyRequestsInFlight, maxMutatingRequestsInFlight int) (*httptest.Server, *rest.Config, framework.CloseFunc) {
opts := framework.MasterConfigOptions{EtcdOptions: framework.DefaultEtcdOptions()}
opts.EtcdOptions.DefaultStorageMediaType = "application/vnd.kubernetes.protobuf"
masterConfig := framework.NewIntegrationTestMasterConfigWithOptions(&opts)
@@ -58,8 +58,8 @@ func setup(t testing.TB) (*httptest.Server, *rest.Config, framework.CloseFunc) {
Group: "flowcontrol.apiserver.k8s.io",
Version: "v1alpha1",
})
masterConfig.GenericConfig.MaxRequestsInFlight = 1
masterConfig.GenericConfig.MaxMutatingRequestsInFlight = 1
masterConfig.GenericConfig.MaxRequestsInFlight = maxReadonlyRequestsInFlight
masterConfig.GenericConfig.MaxMutatingRequestsInFlight = maxMutatingRequestsInFlight
masterConfig.GenericConfig.OpenAPIConfig = framework.DefaultOpenAPIConfig()
masterConfig.ExtraConfig.APIResourceConfigSource = resourceConfig
_, s, closeFn := framework.RunAMaster(masterConfig)
@@ -70,7 +70,7 @@ func TestPriorityLevelIsolation(t *testing.T) {
func TestPriorityLevelIsolation(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.APIPriorityAndFairness, true)()
// NOTE: disabling the feature should fail the test
_, loopbackConfig, closeFn := setup(t)
_, loopbackConfig, closeFn := setup(t, 1, 1)
defer closeFn()
loopbackClient := clientset.NewForConfigOrDie(loopbackConfig)

View File

@@ -0,0 +1,192 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flowcontrol
import (
"context"
"fmt"
"math"
"math/rand"
"sync"
"testing"
"time"
flowcontrol "k8s.io/api/flowcontrol/v1beta1"
"k8s.io/apimachinery/pkg/util/clock"
genericfeatures "k8s.io/apiserver/pkg/features"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilfc "k8s.io/apiserver/pkg/util/flowcontrol"
fqtesting "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing"
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
featuregatetesting "k8s.io/component-base/featuregate/testing"
)
/* fightTest configures a test of how API Priority and Fairness config
controllers fight when they disagree on how to set FlowSchemaStatus.
In particular, they set the condition that indicates integrity of
the reference to the PriorityLevelConfiguration. The scenario tested is
two teams of controllers, where the controllers in one team set the
condition normally and the controllers in the other team set the condition
to the opposite value.
This is a behavioral test: it instantiates these controllers and runs them
almost normally. The test aims to run the controllers for a little under
2 minutes. The test takes clock readings to get upper and lower bounds on
how long each controller ran, and calculates consequent bounds on the number
of writes that should happen to each FlowSchemaStatus. The test creates
an informer to observe the writes. The calculated lower bound on the
number of writes is very lax, assuming only that one write can be done
every 10 seconds.
*/
type fightTest struct {
t *testing.T
ctx context.Context
loopbackConfig *rest.Config
teamSize int
stopCh chan struct{}
now time.Time
clk *clock.FakeClock
ctlrs map[bool][]utilfc.Interface
countsMutex sync.Mutex
// writeCounts maps FlowSchema.Name to number of writes
writeCounts map[string]int
}
func newFightTest(t *testing.T, loopbackConfig *rest.Config, teamSize int) *fightTest {
now := time.Now()
ft := &fightTest{
t: t,
ctx: context.Background(),
loopbackConfig: loopbackConfig,
teamSize: teamSize,
stopCh: make(chan struct{}),
now: now,
clk: clock.NewFakeClock(now),
ctlrs: map[bool][]utilfc.Interface{
false: make([]utilfc.Interface, teamSize),
true: make([]utilfc.Interface, teamSize)},
writeCounts: map[string]int{},
}
return ft
}
func (ft *fightTest) createMainInformer() {
myConfig := rest.CopyConfig(ft.loopbackConfig)
myConfig = rest.AddUserAgent(myConfig, "audience")
myClientset := clientset.NewForConfigOrDie(myConfig)
informerFactory := informers.NewSharedInformerFactory(myClientset, 0)
inf := informerFactory.Flowcontrol().V1beta1().FlowSchemas().Informer()
inf.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
fs := obj.(*flowcontrol.FlowSchema)
ft.countWrite(fs)
},
UpdateFunc: func(oldObj, newObj interface{}) {
fs := newObj.(*flowcontrol.FlowSchema)
ft.countWrite(fs)
},
})
go inf.Run(ft.stopCh)
if !cache.WaitForCacheSync(ft.stopCh, inf.HasSynced) {
ft.t.Errorf("Failed to sync main informer cache")
}
}
func (ft *fightTest) countWrite(fs *flowcontrol.FlowSchema) {
ft.countsMutex.Lock()
defer ft.countsMutex.Unlock()
ft.writeCounts[fs.Name]++
}
func (ft *fightTest) createController(invert bool, i int) {
fieldMgr := fmt.Sprintf("testController%d%v", i, invert)
myConfig := rest.CopyConfig(ft.loopbackConfig)
myConfig = rest.AddUserAgent(myConfig, fieldMgr)
myClientset := clientset.NewForConfigOrDie(myConfig)
fcIfc := myClientset.FlowcontrolV1beta1()
informerFactory := informers.NewSharedInformerFactory(myClientset, 0)
foundToDangling := func(found bool) bool { return !found }
if invert {
foundToDangling = func(found bool) bool { return found }
}
ctlr := utilfc.NewTestable(utilfc.TestableConfig{
Name: fieldMgr,
FoundToDangling: foundToDangling,
Clock: clock.RealClock{},
AsFieldManager: fieldMgr,
InformerFactory: informerFactory,
FlowcontrolClient: fcIfc,
ServerConcurrencyLimit: 200, // server concurrency limit
RequestWaitLimit: time.Minute / 4, // request wait limit
ObsPairGenerator: metrics.PriorityLevelConcurrencyObserverPairGenerator,
QueueSetFactory: fqtesting.NewNoRestraintFactory(),
})
ft.ctlrs[invert][i] = ctlr
informerFactory.Start(ft.stopCh)
go ctlr.Run(ft.stopCh)
}
func (ft *fightTest) evaluate(tBeforeCreate, tAfterCreate time.Time) {
tBeforeLock := time.Now()
ft.countsMutex.Lock()
defer ft.countsMutex.Unlock()
tAfterLock := time.Now()
minFightSecs := tBeforeLock.Sub(tAfterCreate).Seconds()
maxFightSecs := tAfterLock.Sub(tBeforeCreate).Seconds()
minTotalWrites := int(minFightSecs / 10)
maxWritesPerWriter := 6 * int(math.Ceil(maxFightSecs/60))
maxTotalWrites := (1 + ft.teamSize*2) * maxWritesPerWriter
for flowSchemaName, writeCount := range ft.writeCounts {
if writeCount < minTotalWrites {
ft.t.Errorf("There were a total of %d writes to FlowSchema %s but there should have been at least %d from %s to %s", writeCount, flowSchemaName, minTotalWrites, tAfterCreate, tBeforeLock)
} else if writeCount > maxTotalWrites {
ft.t.Errorf("There were a total of %d writes to FlowSchema %s but there should have been no more than %d from %s to %s", writeCount, flowSchemaName, maxTotalWrites, tBeforeCreate, tAfterLock)
} else {
ft.t.Logf("There were a total of %d writes to FlowSchema %s over %v, %v seconds", writeCount, flowSchemaName, minFightSecs, maxFightSecs)
}
}
}
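For concreteness, here is the arithmetic those bounds produce, as an illustrative standalone sketch rather than part of the change; it assumes teamSize = 3 and a fight of roughly 110 seconds (the test below sleeps 110 seconds between creating the controllers and evaluating), and the extra "+1" writer presumably accounts for the apiserver's own built-in APF controller, which also maintains FlowSchema status:

package main

import (
	"fmt"
	"math"
)

func main() {
	const teamSize = 3
	fightSecs := 110.0 // assumed duration; the real test measures lower and upper bounds

	minTotalWrites := int(fightSecs / 10)                   // lax lower bound: one write per 10 seconds
	maxWritesPerWriter := 6 * int(math.Ceil(fightSecs/60))  // at most six status writes per writer per minute
	maxTotalWrites := (1 + teamSize*2) * maxWritesPerWriter // 2*teamSize test controllers plus one more writer

	fmt.Println(minTotalWrites, maxWritesPerWriter, maxTotalWrites) // prints: 11 12 84
}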
func TestConfigConsumerFight(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.APIPriorityAndFairness, true)()
_, loopbackConfig, closeFn := setup(t, 100, 100)
defer closeFn()
const teamSize = 3
ft := newFightTest(t, loopbackConfig, teamSize)
tBeforeCreate := time.Now()
ft.createMainInformer()
ft.foreach(ft.createController)
tAfterCreate := time.Now()
time.Sleep(110 * time.Second)
ft.evaluate(tBeforeCreate, tAfterCreate)
close(ft.stopCh)
}
func (ft *fightTest) foreach(visit func(invert bool, i int)) {
for i := 0; i < ft.teamSize; i++ {
// The order of the following enumeration is not deterministic,
// and that is good.
invert := rand.Intn(2) == 0
visit(invert, i)
visit(!invert, i)
}
}

vendor/modules.txt vendored
View File

@@ -1873,6 +1873,7 @@ k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing
k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise
k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise
k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset
k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing
k8s.io/apiserver/pkg/util/flowcontrol/format
k8s.io/apiserver/pkg/util/flowcontrol/metrics
k8s.io/apiserver/pkg/util/flushwriter