kubernetes/test/integration/apiserver/flowcontrol/fight_test.go
2022-09-21 18:54:20 -04:00

197 lines
7.0 KiB
Go

/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flowcontrol
import (
"context"
"fmt"
"math"
"math/rand"
"sync"
"testing"
"time"
flowcontrol "k8s.io/api/flowcontrol/v1beta3"
genericfeatures "k8s.io/apiserver/pkg/features"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilfc "k8s.io/apiserver/pkg/util/flowcontrol"
fqtesting "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing"
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/utils/clock"
testclocks "k8s.io/utils/clock/testing"
)
/*
fightTest configures a test of how API Priority and Fairness config
controllers fight when they disagree on how to set FlowSchemaStatus.
In particular, they set the condition that indicates integrity of
the reference to the PriorityLevelConfiguration. The scenario tested is
two teams of controllers, where the controllers in one team set the
condition normally and the controllers in the other team set the condition
to the opposite value.
This is a behavioral test: it instantiates these controllers and runs them
almost normally. The test aims to run the controllers for a little under
2 minutes. The test takes clock readings to get upper and lower bounds on
how long each controller ran, and calculates consequent bounds on the number
of writes that should happen to each FlowSchemaStatus. The test creates
an informer to observe the writes. The calculated lower bound on the
number of writes is very lax, assuming only that one write can be done
every 10 seconds.
*/
type fightTest struct {
t *testing.T
ctx context.Context
loopbackConfig *rest.Config
teamSize int
stopCh chan struct{}
now time.Time
clk *testclocks.FakeClock
ctlrs map[bool][]utilfc.Interface
countsMutex sync.Mutex
// writeCounts maps FlowSchema.Name to number of writes
writeCounts map[string]int
}
func newFightTest(t *testing.T, loopbackConfig *rest.Config, teamSize int) *fightTest {
now := time.Now()
ft := &fightTest{
t: t,
ctx: context.Background(),
loopbackConfig: loopbackConfig,
teamSize: teamSize,
stopCh: make(chan struct{}),
now: now,
clk: testclocks.NewFakeClock(now),
ctlrs: map[bool][]utilfc.Interface{
false: make([]utilfc.Interface, teamSize),
true: make([]utilfc.Interface, teamSize)},
writeCounts: map[string]int{},
}
return ft
}
func (ft *fightTest) createMainInformer() {
myConfig := rest.CopyConfig(ft.loopbackConfig)
myConfig = rest.AddUserAgent(myConfig, "audience")
myClientset := clientset.NewForConfigOrDie(myConfig)
informerFactory := informers.NewSharedInformerFactory(myClientset, 0)
inf := informerFactory.Flowcontrol().V1beta3().FlowSchemas().Informer()
inf.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
fs := obj.(*flowcontrol.FlowSchema)
ft.countWrite(fs)
},
UpdateFunc: func(oldObj, newObj interface{}) {
fs := newObj.(*flowcontrol.FlowSchema)
ft.countWrite(fs)
},
})
go inf.Run(ft.stopCh)
if !cache.WaitForCacheSync(ft.stopCh, inf.HasSynced) {
ft.t.Errorf("Failed to sync main informer cache")
}
}
func (ft *fightTest) countWrite(fs *flowcontrol.FlowSchema) {
ft.countsMutex.Lock()
defer ft.countsMutex.Unlock()
ft.writeCounts[fs.Name]++
}
func (ft *fightTest) createController(invert bool, i int) {
fieldMgr := fmt.Sprintf("testController%d%v", i, invert)
myConfig := rest.CopyConfig(ft.loopbackConfig)
myConfig = rest.AddUserAgent(myConfig, fieldMgr)
myClientset := clientset.NewForConfigOrDie(myConfig)
fcIfc := myClientset.FlowcontrolV1beta3()
informerFactory := informers.NewSharedInformerFactory(myClientset, 0)
foundToDangling := func(found bool) bool { return !found }
if invert {
foundToDangling = func(found bool) bool { return found }
}
ctlr := utilfc.NewTestable(utilfc.TestableConfig{
Name: fieldMgr,
FoundToDangling: foundToDangling,
Clock: clock.RealClock{},
AsFieldManager: fieldMgr,
InformerFactory: informerFactory,
FlowcontrolClient: fcIfc,
ServerConcurrencyLimit: 200, // server concurrency limit
RequestWaitLimit: time.Minute / 4, // request wait limit
ReqsGaugeVec: metrics.PriorityLevelConcurrencyGaugeVec,
ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec,
QueueSetFactory: fqtesting.NewNoRestraintFactory(),
})
ft.ctlrs[invert][i] = ctlr
informerFactory.Start(ft.stopCh)
go ctlr.Run(ft.stopCh)
}
func (ft *fightTest) evaluate(tBeforeCreate, tAfterCreate time.Time) {
tBeforeLock := time.Now()
ft.countsMutex.Lock()
defer ft.countsMutex.Unlock()
tAfterLock := time.Now()
minFightSecs := tBeforeLock.Sub(tAfterCreate).Seconds()
maxFightSecs := tAfterLock.Sub(tBeforeCreate).Seconds()
minTotalWrites := int(minFightSecs / 10)
maxWritesPerWriter := 6 * int(math.Ceil(maxFightSecs/60))
maxTotalWrites := (1 + ft.teamSize*2) * maxWritesPerWriter
for flowSchemaName, writeCount := range ft.writeCounts {
if writeCount < minTotalWrites {
ft.t.Errorf("There were a total of %d writes to FlowSchema %s but there should have been at least %d from %s to %s", writeCount, flowSchemaName, minTotalWrites, tAfterCreate, tBeforeLock)
} else if writeCount > maxTotalWrites {
ft.t.Errorf("There were a total of %d writes to FlowSchema %s but there should have been no more than %d from %s to %s", writeCount, flowSchemaName, maxTotalWrites, tBeforeCreate, tAfterLock)
} else {
ft.t.Logf("There were a total of %d writes to FlowSchema %s over %v, %v seconds", writeCount, flowSchemaName, minFightSecs, maxFightSecs)
}
}
}
func TestConfigConsumerFight(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.APIPriorityAndFairness, true)()
kubeConfig, closeFn := setup(t, 100, 100)
defer closeFn()
const teamSize = 3
ft := newFightTest(t, kubeConfig, teamSize)
tBeforeCreate := time.Now()
ft.createMainInformer()
ft.foreach(ft.createController)
tAfterCreate := time.Now()
time.Sleep(110 * time.Second)
ft.evaluate(tBeforeCreate, tAfterCreate)
close(ft.stopCh)
}
func (ft *fightTest) foreach(visit func(invert bool, i int)) {
for i := 0; i < ft.teamSize; i++ {
// The order of the following enumeration is not deterministic,
// and that is good.
invert := rand.Intn(2) == 0
visit(invert, i)
visit(!invert, i)
}
}