mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-09-12 12:48:51 +00:00
Enable, rate limit, and test APF controller fights
Using real time.
This commit is contained in:
committed by
Mike Spreitzer
parent
a91fdfbeb3
commit
80ff06fe84
@@ -49,7 +49,7 @@ const (
|
||||
timeout = time.Second * 10
|
||||
)
|
||||
|
||||
func setup(t testing.TB) (*httptest.Server, *rest.Config, framework.CloseFunc) {
|
||||
func setup(t testing.TB, maxReadonlyRequestsInFlight, MaxMutatingRequestsInFlight int) (*httptest.Server, *rest.Config, framework.CloseFunc) {
|
||||
opts := framework.MasterConfigOptions{EtcdOptions: framework.DefaultEtcdOptions()}
|
||||
opts.EtcdOptions.DefaultStorageMediaType = "application/vnd.kubernetes.protobuf"
|
||||
masterConfig := framework.NewIntegrationTestMasterConfigWithOptions(&opts)
|
||||
@@ -58,8 +58,8 @@ func setup(t testing.TB) (*httptest.Server, *rest.Config, framework.CloseFunc) {
|
||||
Group: "flowcontrol.apiserver.k8s.io",
|
||||
Version: "v1alpha1",
|
||||
})
|
||||
masterConfig.GenericConfig.MaxRequestsInFlight = 1
|
||||
masterConfig.GenericConfig.MaxMutatingRequestsInFlight = 1
|
||||
masterConfig.GenericConfig.MaxRequestsInFlight = maxReadonlyRequestsInFlight
|
||||
masterConfig.GenericConfig.MaxMutatingRequestsInFlight = MaxMutatingRequestsInFlight
|
||||
masterConfig.GenericConfig.OpenAPIConfig = framework.DefaultOpenAPIConfig()
|
||||
masterConfig.ExtraConfig.APIResourceConfigSource = resourceConfig
|
||||
_, s, closeFn := framework.RunAMaster(masterConfig)
|
||||
@@ -70,7 +70,7 @@ func setup(t testing.TB) (*httptest.Server, *rest.Config, framework.CloseFunc) {
|
||||
func TestPriorityLevelIsolation(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.APIPriorityAndFairness, true)()
|
||||
// NOTE: disabling the feature should fail the test
|
||||
_, loopbackConfig, closeFn := setup(t)
|
||||
_, loopbackConfig, closeFn := setup(t, 1, 1)
|
||||
defer closeFn()
|
||||
|
||||
loopbackClient := clientset.NewForConfigOrDie(loopbackConfig)
|
||||
|
192
test/integration/apiserver/flowcontrol/fight_test.go
Normal file
192
test/integration/apiserver/flowcontrol/fight_test.go
Normal file
@@ -0,0 +1,192 @@
|
||||
/*
|
||||
Copyright 2021 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package flowcontrol
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
flowcontrol "k8s.io/api/flowcontrol/v1beta1"
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
genericfeatures "k8s.io/apiserver/pkg/features"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
utilfc "k8s.io/apiserver/pkg/util/flowcontrol"
|
||||
fqtesting "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing"
|
||||
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
|
||||
"k8s.io/client-go/informers"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
)
|
||||
|
||||
/* fightTest configures a test of how API Priority and Fairness config
|
||||
controllers fight when they disagree on how to set FlowSchemaStatus.
|
||||
In particular, they set the condition that indicates integrity of
|
||||
the reference to the PriorityLevelConfiguration. The scenario tested is
|
||||
two teams of controllers, where the controllers in one team set the
|
||||
condition normally and the controllers in the other team set the condition
|
||||
to the opposite value.
|
||||
|
||||
This is a behavioral test: it instantiates these controllers and runs them
|
||||
almost normally. The test aims to run the controllers for a little under
|
||||
2 minutes. The test takes clock readings to get upper and lower bounds on
|
||||
how long each controller ran, and calculates consequent bounds on the number
|
||||
of writes that should happen to each FlowSchemaStatus. The test creates
|
||||
an informer to observe the writes. The calculated lower bound on the
|
||||
number of writes is very lax, assuming only that one write can be done
|
||||
every 10 seconds.
|
||||
*/
|
||||
type fightTest struct {
|
||||
t *testing.T
|
||||
ctx context.Context
|
||||
loopbackConfig *rest.Config
|
||||
teamSize int
|
||||
stopCh chan struct{}
|
||||
now time.Time
|
||||
clk *clock.FakeClock
|
||||
ctlrs map[bool][]utilfc.Interface
|
||||
|
||||
countsMutex sync.Mutex
|
||||
|
||||
// writeCounts maps FlowSchema.Name to number of writes
|
||||
writeCounts map[string]int
|
||||
}
|
||||
|
||||
func newFightTest(t *testing.T, loopbackConfig *rest.Config, teamSize int) *fightTest {
|
||||
now := time.Now()
|
||||
ft := &fightTest{
|
||||
t: t,
|
||||
ctx: context.Background(),
|
||||
loopbackConfig: loopbackConfig,
|
||||
teamSize: teamSize,
|
||||
stopCh: make(chan struct{}),
|
||||
now: now,
|
||||
clk: clock.NewFakeClock(now),
|
||||
ctlrs: map[bool][]utilfc.Interface{
|
||||
false: make([]utilfc.Interface, teamSize),
|
||||
true: make([]utilfc.Interface, teamSize)},
|
||||
writeCounts: map[string]int{},
|
||||
}
|
||||
return ft
|
||||
}
|
||||
|
||||
func (ft *fightTest) createMainInformer() {
|
||||
myConfig := rest.CopyConfig(ft.loopbackConfig)
|
||||
myConfig = rest.AddUserAgent(myConfig, "audience")
|
||||
myClientset := clientset.NewForConfigOrDie(myConfig)
|
||||
informerFactory := informers.NewSharedInformerFactory(myClientset, 0)
|
||||
inf := informerFactory.Flowcontrol().V1beta1().FlowSchemas().Informer()
|
||||
inf.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
fs := obj.(*flowcontrol.FlowSchema)
|
||||
ft.countWrite(fs)
|
||||
},
|
||||
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||
fs := newObj.(*flowcontrol.FlowSchema)
|
||||
ft.countWrite(fs)
|
||||
},
|
||||
})
|
||||
go inf.Run(ft.stopCh)
|
||||
if !cache.WaitForCacheSync(ft.stopCh, inf.HasSynced) {
|
||||
ft.t.Errorf("Failed to sync main informer cache")
|
||||
}
|
||||
}
|
||||
|
||||
func (ft *fightTest) countWrite(fs *flowcontrol.FlowSchema) {
|
||||
ft.countsMutex.Lock()
|
||||
defer ft.countsMutex.Unlock()
|
||||
ft.writeCounts[fs.Name]++
|
||||
}
|
||||
|
||||
func (ft *fightTest) createController(invert bool, i int) {
|
||||
fieldMgr := fmt.Sprintf("testController%d%v", i, invert)
|
||||
myConfig := rest.CopyConfig(ft.loopbackConfig)
|
||||
myConfig = rest.AddUserAgent(myConfig, fieldMgr)
|
||||
myClientset := clientset.NewForConfigOrDie(myConfig)
|
||||
fcIfc := myClientset.FlowcontrolV1beta1()
|
||||
informerFactory := informers.NewSharedInformerFactory(myClientset, 0)
|
||||
foundToDangling := func(found bool) bool { return !found }
|
||||
if invert {
|
||||
foundToDangling = func(found bool) bool { return found }
|
||||
}
|
||||
ctlr := utilfc.NewTestable(utilfc.TestableConfig{
|
||||
Name: fieldMgr,
|
||||
FoundToDangling: foundToDangling,
|
||||
Clock: clock.RealClock{},
|
||||
AsFieldManager: fieldMgr,
|
||||
InformerFactory: informerFactory,
|
||||
FlowcontrolClient: fcIfc,
|
||||
ServerConcurrencyLimit: 200, // server concurrency limit
|
||||
RequestWaitLimit: time.Minute / 4, // request wait limit
|
||||
ObsPairGenerator: metrics.PriorityLevelConcurrencyObserverPairGenerator,
|
||||
QueueSetFactory: fqtesting.NewNoRestraintFactory(),
|
||||
})
|
||||
ft.ctlrs[invert][i] = ctlr
|
||||
informerFactory.Start(ft.stopCh)
|
||||
go ctlr.Run(ft.stopCh)
|
||||
}
|
||||
|
||||
func (ft *fightTest) evaluate(tBeforeCreate, tAfterCreate time.Time) {
|
||||
tBeforeLock := time.Now()
|
||||
ft.countsMutex.Lock()
|
||||
defer ft.countsMutex.Unlock()
|
||||
tAfterLock := time.Now()
|
||||
minFightSecs := tBeforeLock.Sub(tAfterCreate).Seconds()
|
||||
maxFightSecs := tAfterLock.Sub(tBeforeCreate).Seconds()
|
||||
minTotalWrites := int(minFightSecs / 10)
|
||||
maxWritesPerWriter := 6 * int(math.Ceil(maxFightSecs/60))
|
||||
maxTotalWrites := (1 + ft.teamSize*2) * maxWritesPerWriter
|
||||
for flowSchemaName, writeCount := range ft.writeCounts {
|
||||
if writeCount < minTotalWrites {
|
||||
ft.t.Errorf("There were a total of %d writes to FlowSchema %s but there should have been at least %d from %s to %s", writeCount, flowSchemaName, minTotalWrites, tAfterCreate, tBeforeLock)
|
||||
} else if writeCount > maxTotalWrites {
|
||||
ft.t.Errorf("There were a total of %d writes to FlowSchema %s but there should have been no more than %d from %s to %s", writeCount, flowSchemaName, maxTotalWrites, tBeforeCreate, tAfterLock)
|
||||
} else {
|
||||
ft.t.Logf("There were a total of %d writes to FlowSchema %s over %v, %v seconds", writeCount, flowSchemaName, minFightSecs, maxFightSecs)
|
||||
}
|
||||
}
|
||||
}
|
||||
func TestConfigConsumerFight(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.APIPriorityAndFairness, true)()
|
||||
_, loopbackConfig, closeFn := setup(t, 100, 100)
|
||||
defer closeFn()
|
||||
const teamSize = 3
|
||||
ft := newFightTest(t, loopbackConfig, teamSize)
|
||||
tBeforeCreate := time.Now()
|
||||
ft.createMainInformer()
|
||||
ft.foreach(ft.createController)
|
||||
tAfterCreate := time.Now()
|
||||
time.Sleep(110 * time.Second)
|
||||
ft.evaluate(tBeforeCreate, tAfterCreate)
|
||||
close(ft.stopCh)
|
||||
}
|
||||
|
||||
func (ft *fightTest) foreach(visit func(invert bool, i int)) {
|
||||
for i := 0; i < ft.teamSize; i++ {
|
||||
// The order of the following enumeration is not deterministic,
|
||||
// and that is good.
|
||||
invert := rand.Intn(2) == 0
|
||||
visit(invert, i)
|
||||
visit(!invert, i)
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user