mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 03:41:45 +00:00
197 lines
7.0 KiB
Go
197 lines
7.0 KiB
Go
/*
|
|
Copyright 2021 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package flowcontrol
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"math"
|
|
"math/rand"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
flowcontrol "k8s.io/api/flowcontrol/v1beta3"
|
|
genericfeatures "k8s.io/apiserver/pkg/features"
|
|
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
|
utilfc "k8s.io/apiserver/pkg/util/flowcontrol"
|
|
fqtesting "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing"
|
|
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
|
|
"k8s.io/client-go/informers"
|
|
clientset "k8s.io/client-go/kubernetes"
|
|
"k8s.io/client-go/rest"
|
|
"k8s.io/client-go/tools/cache"
|
|
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
|
"k8s.io/utils/clock"
|
|
testclocks "k8s.io/utils/clock/testing"
|
|
)
|
|
|
|
/*
|
|
fightTest configures a test of how API Priority and Fairness config
|
|
|
|
controllers fight when they disagree on how to set FlowSchemaStatus.
|
|
In particular, they set the condition that indicates integrity of
|
|
the reference to the PriorityLevelConfiguration. The scenario tested is
|
|
two teams of controllers, where the controllers in one team set the
|
|
condition normally and the controllers in the other team set the condition
|
|
to the opposite value.
|
|
|
|
This is a behavioral test: it instantiates these controllers and runs them
|
|
almost normally. The test aims to run the controllers for a little under
|
|
2 minutes. The test takes clock readings to get upper and lower bounds on
|
|
how long each controller ran, and calculates consequent bounds on the number
|
|
of writes that should happen to each FlowSchemaStatus. The test creates
|
|
an informer to observe the writes. The calculated lower bound on the
|
|
number of writes is very lax, assuming only that one write can be done
|
|
every 10 seconds.
|
|
*/
|
|
type fightTest struct {
|
|
t *testing.T
|
|
ctx context.Context
|
|
loopbackConfig *rest.Config
|
|
teamSize int
|
|
stopCh chan struct{}
|
|
now time.Time
|
|
clk *testclocks.FakeClock
|
|
ctlrs map[bool][]utilfc.Interface
|
|
|
|
countsMutex sync.Mutex
|
|
|
|
// writeCounts maps FlowSchema.Name to number of writes
|
|
writeCounts map[string]int
|
|
}
|
|
|
|
func newFightTest(t *testing.T, loopbackConfig *rest.Config, teamSize int) *fightTest {
|
|
now := time.Now()
|
|
ft := &fightTest{
|
|
t: t,
|
|
ctx: context.Background(),
|
|
loopbackConfig: loopbackConfig,
|
|
teamSize: teamSize,
|
|
stopCh: make(chan struct{}),
|
|
now: now,
|
|
clk: testclocks.NewFakeClock(now),
|
|
ctlrs: map[bool][]utilfc.Interface{
|
|
false: make([]utilfc.Interface, teamSize),
|
|
true: make([]utilfc.Interface, teamSize)},
|
|
writeCounts: map[string]int{},
|
|
}
|
|
return ft
|
|
}
|
|
|
|
func (ft *fightTest) createMainInformer() {
|
|
myConfig := rest.CopyConfig(ft.loopbackConfig)
|
|
myConfig = rest.AddUserAgent(myConfig, "audience")
|
|
myClientset := clientset.NewForConfigOrDie(myConfig)
|
|
informerFactory := informers.NewSharedInformerFactory(myClientset, 0)
|
|
inf := informerFactory.Flowcontrol().V1beta3().FlowSchemas().Informer()
|
|
inf.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
|
AddFunc: func(obj interface{}) {
|
|
fs := obj.(*flowcontrol.FlowSchema)
|
|
ft.countWrite(fs)
|
|
},
|
|
UpdateFunc: func(oldObj, newObj interface{}) {
|
|
fs := newObj.(*flowcontrol.FlowSchema)
|
|
ft.countWrite(fs)
|
|
},
|
|
})
|
|
go inf.Run(ft.stopCh)
|
|
if !cache.WaitForCacheSync(ft.stopCh, inf.HasSynced) {
|
|
ft.t.Errorf("Failed to sync main informer cache")
|
|
}
|
|
}
|
|
|
|
func (ft *fightTest) countWrite(fs *flowcontrol.FlowSchema) {
|
|
ft.countsMutex.Lock()
|
|
defer ft.countsMutex.Unlock()
|
|
ft.writeCounts[fs.Name]++
|
|
}
|
|
|
|
func (ft *fightTest) createController(invert bool, i int) {
|
|
fieldMgr := fmt.Sprintf("testController%d%v", i, invert)
|
|
myConfig := rest.CopyConfig(ft.loopbackConfig)
|
|
myConfig = rest.AddUserAgent(myConfig, fieldMgr)
|
|
myClientset := clientset.NewForConfigOrDie(myConfig)
|
|
fcIfc := myClientset.FlowcontrolV1beta3()
|
|
informerFactory := informers.NewSharedInformerFactory(myClientset, 0)
|
|
foundToDangling := func(found bool) bool { return !found }
|
|
if invert {
|
|
foundToDangling = func(found bool) bool { return found }
|
|
}
|
|
ctlr := utilfc.NewTestable(utilfc.TestableConfig{
|
|
Name: fieldMgr,
|
|
FoundToDangling: foundToDangling,
|
|
Clock: clock.RealClock{},
|
|
AsFieldManager: fieldMgr,
|
|
InformerFactory: informerFactory,
|
|
FlowcontrolClient: fcIfc,
|
|
ServerConcurrencyLimit: 200, // server concurrency limit
|
|
RequestWaitLimit: time.Minute / 4, // request wait limit
|
|
ReqsGaugeVec: metrics.PriorityLevelConcurrencyGaugeVec,
|
|
ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec,
|
|
QueueSetFactory: fqtesting.NewNoRestraintFactory(),
|
|
})
|
|
ft.ctlrs[invert][i] = ctlr
|
|
informerFactory.Start(ft.stopCh)
|
|
go ctlr.Run(ft.stopCh)
|
|
}
|
|
|
|
func (ft *fightTest) evaluate(tBeforeCreate, tAfterCreate time.Time) {
|
|
tBeforeLock := time.Now()
|
|
ft.countsMutex.Lock()
|
|
defer ft.countsMutex.Unlock()
|
|
tAfterLock := time.Now()
|
|
minFightSecs := tBeforeLock.Sub(tAfterCreate).Seconds()
|
|
maxFightSecs := tAfterLock.Sub(tBeforeCreate).Seconds()
|
|
minTotalWrites := int(minFightSecs / 10)
|
|
maxWritesPerWriter := 6 * int(math.Ceil(maxFightSecs/60))
|
|
maxTotalWrites := (1 + ft.teamSize*2) * maxWritesPerWriter
|
|
for flowSchemaName, writeCount := range ft.writeCounts {
|
|
if writeCount < minTotalWrites {
|
|
ft.t.Errorf("There were a total of %d writes to FlowSchema %s but there should have been at least %d from %s to %s", writeCount, flowSchemaName, minTotalWrites, tAfterCreate, tBeforeLock)
|
|
} else if writeCount > maxTotalWrites {
|
|
ft.t.Errorf("There were a total of %d writes to FlowSchema %s but there should have been no more than %d from %s to %s", writeCount, flowSchemaName, maxTotalWrites, tBeforeCreate, tAfterLock)
|
|
} else {
|
|
ft.t.Logf("There were a total of %d writes to FlowSchema %s over %v, %v seconds", writeCount, flowSchemaName, minFightSecs, maxFightSecs)
|
|
}
|
|
}
|
|
}
|
|
func TestConfigConsumerFight(t *testing.T) {
|
|
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.APIPriorityAndFairness, true)()
|
|
kubeConfig, closeFn := setup(t, 100, 100)
|
|
defer closeFn()
|
|
const teamSize = 3
|
|
ft := newFightTest(t, kubeConfig, teamSize)
|
|
tBeforeCreate := time.Now()
|
|
ft.createMainInformer()
|
|
ft.foreach(ft.createController)
|
|
tAfterCreate := time.Now()
|
|
time.Sleep(110 * time.Second)
|
|
ft.evaluate(tBeforeCreate, tAfterCreate)
|
|
close(ft.stopCh)
|
|
}
|
|
|
|
func (ft *fightTest) foreach(visit func(invert bool, i int)) {
|
|
for i := 0; i < ft.teamSize; i++ {
|
|
// The order of the following enumeration is not deterministic,
|
|
// and that is good.
|
|
invert := rand.Intn(2) == 0
|
|
visit(invert, i)
|
|
visit(!invert, i)
|
|
}
|
|
}
|