Merge pull request #118202 from pohly/scheduler-perf-unit-test

scheduler-perf: run as integration tests
Kubernetes Prow Robot 2023-06-28 06:24:31 -07:00 committed by GitHub
commit c78204dc06
6 changed files with 372 additions and 136 deletions

View File

@@ -41,27 +41,22 @@ import (
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/apiserver/pkg/util/feature"
-	cacheddiscovery "k8s.io/client-go/discovery/cached/memory"
 	"k8s.io/client-go/informers"
 	clientset "k8s.io/client-go/kubernetes"
 	typedv1 "k8s.io/client-go/kubernetes/typed/batch/v1"
-	"k8s.io/client-go/metadata"
-	"k8s.io/client-go/metadata/metadatainformer"
 	restclient "k8s.io/client-go/rest"
-	"k8s.io/client-go/restmapper"
 	"k8s.io/client-go/util/retry"
 	featuregatetesting "k8s.io/component-base/featuregate/testing"
 	basemetrics "k8s.io/component-base/metrics"
 	"k8s.io/component-base/metrics/testutil"
-	"k8s.io/controller-manager/pkg/informerfactory"
 	"k8s.io/klog/v2"
 	kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
 	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
-	"k8s.io/kubernetes/pkg/controller/garbagecollector"
 	jobcontroller "k8s.io/kubernetes/pkg/controller/job"
 	"k8s.io/kubernetes/pkg/controller/job/metrics"
 	"k8s.io/kubernetes/pkg/features"
 	"k8s.io/kubernetes/test/integration/framework"
+	"k8s.io/kubernetes/test/integration/util"
 	"k8s.io/utils/pointer"
 )
@@ -1313,7 +1308,7 @@ func TestOrphanPodsFinalizersClearedWithGC(t *testing.T) {
 	defer cancel()
 	restConfig.QPS = 200
 	restConfig.Burst = 200
-	runGC := createGC(ctx, t, restConfig, informerSet)
+	runGC := util.CreateGCController(ctx, t, *restConfig, informerSet)
 	informerSet.Start(ctx.Done())
 	go jc.Run(ctx, 1)
 	runGC()
@@ -2092,40 +2087,6 @@ func createJobControllerWithSharedInformers(restConfig *restclient.Config, infor
 	return jc, ctx, cancel
 }
 
-func createGC(ctx context.Context, t *testing.T, restConfig *restclient.Config, informerSet informers.SharedInformerFactory) func() {
-	restConfig = restclient.AddUserAgent(restConfig, "gc-controller")
-	clientSet := clientset.NewForConfigOrDie(restConfig)
-	metadataClient, err := metadata.NewForConfig(restConfig)
-	if err != nil {
-		t.Fatalf("Failed to create metadataClient: %v", err)
-	}
-	restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheddiscovery.NewMemCacheClient(clientSet.Discovery()))
-	restMapper.Reset()
-	metadataInformers := metadatainformer.NewSharedInformerFactory(metadataClient, 0)
-	alwaysStarted := make(chan struct{})
-	close(alwaysStarted)
-	gc, err := garbagecollector.NewGarbageCollector(
-		clientSet,
-		metadataClient,
-		restMapper,
-		garbagecollector.DefaultIgnoredResources(),
-		informerfactory.NewInformerFactory(informerSet, metadataInformers),
-		alwaysStarted,
-	)
-	if err != nil {
-		t.Fatalf("Failed creating garbage collector")
-	}
-	startGC := func() {
-		syncPeriod := 5 * time.Second
-		go wait.Until(func() {
-			restMapper.Reset()
-		}, syncPeriod, ctx.Done())
-		go gc.Run(ctx, 1)
-		go gc.Sync(ctx, clientSet.Discovery(), syncPeriod)
-	}
-	return startGC
-}
-
 func hasJobTrackingFinalizer(obj metav1.Object) bool {
 	for _, fin := range obj.GetFinalizers() {
 		if fin == batchv1.JobTrackingFinalizer {

View File

@@ -100,3 +100,15 @@ performance.
 
 During interactive debugging sessions it is possible to enable per-test output
 via -use-testing-log.
+
+## Integration tests
+
+To run integration tests, use:
+
+```
+make test-integration WHAT=./test/integration/scheduler_perf KUBE_TEST_ARGS=-use-testing-log
+```
+
+Integration testing uses the same `config/performance-config.yaml` as
+benchmarking. By default, workloads labeled as `integration-test` are executed
+as part of integration testing. `-test-scheduling-label-filter` can be used to
+change that.
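
For example, to restrict an integration run to workloads carrying the `fast` label, the label-filter flag described above can be combined with the same `make` invocation. This is only an illustrative sketch built from the flags introduced in this change; the chosen label value is an example:

```
make test-integration WHAT=./test/integration/scheduler_perf \
    KUBE_TEST_ARGS="-use-testing-log -test-scheduling-label-filter=fast"
```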

View File

@@ -1,3 +1,17 @@
+# The following labels are used in this file:
+# - fast: short execution time, ideally less than 30 seconds
+# - integration-test: used to select workloads that
+#   run in pull-kubernetes-integration. Choosing those tests
+#   is a tradeoff between code coverage and overall runtime.
+# - performance: used to select workloads that run
+#   in ci-benchmark-scheduler-perf. Such workloads
+#   must run long enough (ideally, longer than 10 seconds)
+#   to provide meaningful samples for the pod scheduling
+#   rate.
+#
+# Combining "performance" and "fast" selects suitable workloads for a local
+# before/after comparisons with benchstat.
+
 - name: SchedulingBasic
   defaultPodTemplatePath: config/pod-default.yaml
   workloadTemplate:
@@ -10,7 +24,7 @@
     collectMetrics: true
   workloads:
   - name: 500Nodes
-    labels: [fast]
+    labels: [integration-test, fast]
     params:
       initNodes: 500
       initPods: 500
@@ -39,7 +53,7 @@
     namespace: sched-1
   workloads:
   - name: 500Nodes
-    labels: [fast]
+    labels: [integration-test, fast]
    params:
       initNodes: 500
       initPods: 100
@@ -161,7 +175,7 @@
     collectMetrics: true
   workloads:
   - name: 500Nodes
-    labels: [fast]
+    labels: [integration-test, fast]
     params:
       initNodes: 500
       initPods: 500
@@ -223,7 +237,7 @@
     collectMetrics: true
   workloads:
   - name: 500Nodes
-    labels: [fast]
+    labels: [integration-test, fast]
     params:
       initNodes: 500
       initPods: 500
@@ -308,7 +322,7 @@
     collectMetrics: true
   workloads:
   - name: 500Nodes
-    labels: [fast]
+    labels: [integration-test, fast]
     params:
       initNodes: 500
       initPods: 1000
@@ -386,7 +400,7 @@
     collectMetrics: true
   workloads:
   - name: 500Nodes
-    labels: [fast]
+    labels: [integration-test, fast]
     params:
       initNodes: 500
       initPods: 200
@@ -504,7 +518,7 @@
     collectMetrics: true
   workloads:
   - name: 1000Nodes
-    labels: [fast]
+    labels: [integration-test, fast]
     params:
       initNodes: 1000
       measurePods: 1000
@@ -734,6 +748,7 @@
     collectMetrics: true
   workloads:
   - name: fast
+    labels: [integration-test, fast]
     params:
       # This testcase runs through all code paths without
       # taking too long overall.
@@ -743,7 +758,7 @@
       measurePods: 10
       maxClaimsPerNode: 10
   - name: 2000pods_100nodes
-    labels: [performance,fast]
+    labels: [performance, fast]
    params:
       # In this testcase, the number of nodes is smaller
       # than the limit for the PodScheduling slices.
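
The comment block added at the top of this file suggests combining the `performance` and `fast` labels for local before/after comparisons with benchstat. A minimal sketch of that workflow could look like the following, assuming an `etcd` binary is in `PATH` (the integration framework needs one) and `benchstat` (golang.org/x/perf/cmd/benchstat) is installed; the file names and the exact invocation are illustrative, not taken from this change:

```
cd test/integration/scheduler_perf
# Baseline run: only benchmark workloads labeled "performance" and "fast".
go test -run=^$ -bench=BenchmarkPerfScheduling \
    -perf-scheduling-label-filter=performance,fast | tee before.txt
# ...apply the scheduler change under test, then repeat...
go test -run=^$ -bench=BenchmarkPerfScheduling \
    -perf-scheduling-label-filter=performance,fast | tee after.txt
benchstat before.txt after.txt
```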

View File

@@ -30,6 +30,8 @@ import (
 	"testing"
 	"time"
 
+	"github.com/google/go-cmp/cmp"
 	v1 "k8s.io/api/core/v1"
 	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -41,6 +43,7 @@ import (
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	cacheddiscovery "k8s.io/client-go/discovery/cached"
 	"k8s.io/client-go/dynamic"
+	"k8s.io/client-go/informers"
 	coreinformers "k8s.io/client-go/informers/core/v1"
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/restmapper"
@@ -125,7 +128,7 @@ type testCase struct {
 	Workloads []*workload
 	// SchedulerConfigPath is the path of scheduler configuration
 	// Optional
-	SchedulerConfigPath *string
+	SchedulerConfigPath string
 	// Default path to spec file describing the pods to create.
 	// This path can be overridden in createPodsOp by setting PodTemplatePath .
 	// Optional
@@ -637,7 +640,7 @@ func initTestOutput(tb testing.TB) io.Writer {
 	return output
 }
 
-var perfSchedulingLabelFilter = flag.String("perf-scheduling-label-filter", "performance", "comma-separated list of labels which a testcase must have (no prefix or +) or must not have (-)")
+var perfSchedulingLabelFilter = flag.String("perf-scheduling-label-filter", "performance", "comma-separated list of labels which a testcase must have (no prefix or +) or must not have (-), used by BenchmarkPerfScheduling")
 
 func BenchmarkPerfScheduling(b *testing.B) {
 	testCases, err := getTestCases(configFile)
@@ -696,7 +699,8 @@ func BenchmarkPerfScheduling(b *testing.B) {
 					for feature, flag := range tc.FeatureGates {
 						defer featuregatetesting.SetFeatureGateDuringTest(b, utilfeature.DefaultFeatureGate, feature, flag)()
 					}
-					results := runWorkload(ctx, b, tc, w)
+					informerFactory, client, dyncClient := setupClusterForWorkload(ctx, b, tc.SchedulerConfigPath, tc.FeatureGates)
+					results := runWorkload(ctx, b, tc, w, informerFactory, client, dyncClient, false)
 					dataItems.DataItems = append(dataItems.DataItems, results...)
 
 					if len(results) > 0 {
@@ -734,6 +738,95 @@ func BenchmarkPerfScheduling(b *testing.B) {
 	}
 }
 
+var testSchedulingLabelFilter = flag.String("test-scheduling-label-filter", "integration-test", "comma-separated list of labels which a testcase must have (no prefix or +) or must not have (-), used by TestScheduling")
+
+func TestScheduling(t *testing.T) {
+	testCases, err := getTestCases(configFile)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if err = validateTestCases(testCases); err != nil {
+		t.Fatal(err)
+	}
+
+	// Check for leaks at the very end.
+	framework.GoleakCheck(t)
+
+	// All integration test cases share the same etcd, similar to
+	// https://github.com/kubernetes/kubernetes/blob/18d05b646d09b2971dc5400bc288062b0414e8cf/test/integration/framework/etcd.go#L186-L222.
+	framework.StartEtcd(t, nil)
+
+	// Workloads with the same configuration share the same apiserver. For that
+	// we first need to determine what those different configs are.
+	var configs []schedulerConfig
+	for _, tc := range testCases {
+		tcEnabled := false
+		for _, w := range tc.Workloads {
+			if enabled(*testSchedulingLabelFilter, append(tc.Labels, w.Labels...)...) {
+				tcEnabled = true
+				break
+			}
+		}
+		if !tcEnabled {
+			continue
+		}
+		exists := false
+		for _, config := range configs {
+			if config.equals(tc) {
+				exists = true
+				break
+			}
+		}
+		if !exists {
+			configs = append(configs, schedulerConfig{schedulerConfigPath: tc.SchedulerConfigPath, featureGates: tc.FeatureGates})
+		}
+	}
+	for _, config := range configs {
+		// Not a sub test because we don't have a good name for it.
+		func() {
+			_, ctx := ktesting.NewTestContext(t)
+			// No timeout here because the `go test -timeout` will ensure that
+			// the test doesn't get stuck forever.
+			ctx, cancel := context.WithCancel(ctx)
+			defer cancel()
+			for feature, flag := range config.featureGates {
+				defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, feature, flag)()
+			}
+			informerFactory, client, dynClient := setupClusterForWorkload(ctx, t, config.schedulerConfigPath, config.featureGates)
+
+			for _, tc := range testCases {
+				if !config.equals(tc) {
+					// Runs with some other config.
+					continue
+				}
+
+				t.Run(tc.Name, func(t *testing.T) {
+					for _, w := range tc.Workloads {
+						t.Run(w.Name, func(t *testing.T) {
+							if !enabled(*testSchedulingLabelFilter, append(tc.Labels, w.Labels...)...) {
+								t.Skipf("disabled by label filter %q", *testSchedulingLabelFilter)
+							}
+							_, ctx := ktesting.NewTestContext(t)
+							runWorkload(ctx, t, tc, w, informerFactory, client, dynClient, true)
+						})
+					}
+				})
+			}
+		}()
+	}
+}
+
+type schedulerConfig struct {
+	schedulerConfigPath string
+	featureGates        map[featuregate.Feature]bool
+}
+
+func (c schedulerConfig) equals(tc *testCase) bool {
+	return c.schedulerConfigPath == tc.SchedulerConfigPath &&
+		cmp.Equal(c.featureGates, tc.FeatureGates)
+}
+
 func loadSchedulerConfig(file string) (*config.KubeSchedulerConfiguration, error) {
 	data, err := os.ReadFile(file)
 	if err != nil {
@@ -750,16 +843,16 @@ func loadSchedulerConfig(file string) (*config.KubeSchedulerConfiguration, error
 	return nil, fmt.Errorf("couldn't decode as KubeSchedulerConfiguration, got %s: ", gvk)
 }
 
-func unrollWorkloadTemplate(b *testing.B, wt []op, w *workload) []op {
+func unrollWorkloadTemplate(tb testing.TB, wt []op, w *workload) []op {
 	var unrolled []op
 	for opIndex, o := range wt {
 		realOp, err := o.realOp.patchParams(w)
 		if err != nil {
-			b.Fatalf("op %d: %v", opIndex, err)
+			tb.Fatalf("op %d: %v", opIndex, err)
 		}
 		switch concreteOp := realOp.(type) {
 		case *createPodSetsOp:
-			b.Logf("Creating %d pod sets %s", concreteOp.Count, concreteOp.CountParam)
+			tb.Logf("Creating %d pod sets %s", concreteOp.Count, concreteOp.CountParam)
 			for i := 0; i < concreteOp.Count; i++ {
 				copy := concreteOp.CreatePodsOp
 				ns := fmt.Sprintf("%s-%d", concreteOp.NamespacePrefix, i)
@@ -773,35 +866,56 @@ func unrollWorkloadTemplate(b *testing.B, wt []op, w *workload) []op {
 	return unrolled
 }
 
-func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload) []DataItem {
-	start := time.Now()
-	b.Cleanup(func() {
-		duration := time.Now().Sub(start)
-		// This includes startup and shutdown time and thus does not
-		// reflect scheduling performance. It's useful to get a feeling
-		// for how long each workload runs overall.
-		b.ReportMetric(duration.Seconds(), "runtime_seconds")
-	})
-
+func setupClusterForWorkload(ctx context.Context, tb testing.TB, configPath string, featureGates map[featuregate.Feature]bool) (informers.SharedInformerFactory, clientset.Interface, dynamic.Interface) {
 	var cfg *config.KubeSchedulerConfiguration
 	var err error
-	if tc.SchedulerConfigPath != nil {
-		cfg, err = loadSchedulerConfig(*tc.SchedulerConfigPath)
+	if configPath != "" {
+		cfg, err = loadSchedulerConfig(configPath)
 		if err != nil {
-			b.Fatalf("error loading scheduler config file: %v", err)
+			tb.Fatalf("error loading scheduler config file: %v", err)
 		}
 		if err = validation.ValidateKubeSchedulerConfiguration(cfg); err != nil {
-			b.Fatalf("validate scheduler config file failed: %v", err)
+			tb.Fatalf("validate scheduler config file failed: %v", err)
 		}
 	}
-	informerFactory, client, dynClient := mustSetupScheduler(ctx, b, cfg, tc.FeatureGates)
+	return mustSetupCluster(ctx, tb, cfg, featureGates)
+}
+
+func runWorkload(ctx context.Context, tb testing.TB, tc *testCase, w *workload, informerFactory informers.SharedInformerFactory, client clientset.Interface, dynClient dynamic.Interface, cleanup bool) []DataItem {
+	b, benchmarking := tb.(*testing.B)
+	if benchmarking {
+		start := time.Now()
+		b.Cleanup(func() {
+			duration := time.Since(start)
+			// This includes startup and shutdown time and thus does not
+			// reflect scheduling performance. It's useful to get a feeling
+			// for how long each workload runs overall.
+			b.ReportMetric(duration.Seconds(), "runtime_seconds")
+		})
+	}
+
+	// Disable error checking of the sampling interval length in the
+	// throughput collector by default. When running benchmarks, report
+	// it as test failure when samples are not taken regularly.
+	var throughputErrorMargin float64
+	if benchmarking {
+		// TODO: To prevent the perf-test failure, we increased the error margin, if still not enough
+		// one day, we should think of another approach to avoid this trick.
+		throughputErrorMargin = 30
+	}
 
 	// Additional informers needed for testing. The pod informer was
 	// already created before (scheduler.NewInformerFactory) and the
-	// factory was started for it (mustSetupScheduler), therefore we don't
+	// factory was started for it (mustSetupCluster), therefore we don't
 	// need to start again.
 	podInformer := informerFactory.Core().V1().Pods()
 
+	// Everything else started by this function gets stopped before it returns.
+	ctx, cancel := context.WithCancel(ctx)
+	var wg sync.WaitGroup
+	defer wg.Wait()
+	defer cancel()
+
 	var mu sync.Mutex
 	var dataItems []DataItem
 	nextNodeIndex := 0
@@ -809,48 +923,47 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload) [
 	// All namespaces listed in numPodsScheduledPerNamespace will be cleaned up.
 	numPodsScheduledPerNamespace := make(map[string]int)
-	b.Cleanup(func() {
-		for namespace := range numPodsScheduledPerNamespace {
-			if err := client.CoreV1().Namespaces().Delete(ctx, namespace, metav1.DeleteOptions{}); err != nil {
-				b.Errorf("Deleting Namespace in numPodsScheduledPerNamespace: %v", err)
-			}
-		}
-	})
+	if cleanup {
+		// This must run before controllers get shut down.
+		defer cleanupWorkload(ctx, tb, tc, client, numPodsScheduledPerNamespace)
+	}
 
-	for opIndex, op := range unrollWorkloadTemplate(b, tc.WorkloadTemplate, w) {
+	for opIndex, op := range unrollWorkloadTemplate(tb, tc.WorkloadTemplate, w) {
 		realOp, err := op.realOp.patchParams(w)
 		if err != nil {
-			b.Fatalf("op %d: %v", opIndex, err)
+			tb.Fatalf("op %d: %v", opIndex, err)
 		}
 		select {
 		case <-ctx.Done():
-			b.Fatalf("op %d: %v", opIndex, ctx.Err())
+			tb.Fatalf("op %d: %v", opIndex, ctx.Err())
 		default:
 		}
 		switch concreteOp := realOp.(type) {
 		case *createNodesOp:
 			nodePreparer, err := getNodePreparer(fmt.Sprintf("node-%d-", opIndex), concreteOp, client)
 			if err != nil {
-				b.Fatalf("op %d: %v", opIndex, err)
+				tb.Fatalf("op %d: %v", opIndex, err)
 			}
 			if err := nodePreparer.PrepareNodes(ctx, nextNodeIndex); err != nil {
-				b.Fatalf("op %d: %v", opIndex, err)
+				tb.Fatalf("op %d: %v", opIndex, err)
 			}
-			b.Cleanup(func() {
-				if err := nodePreparer.CleanupNodes(ctx); err != nil {
-					b.Fatalf("failed to clean up nodes, error: %v", err)
-				}
-			})
+			if cleanup {
+				defer func() {
+					if err := nodePreparer.CleanupNodes(ctx); err != nil {
+						tb.Fatalf("failed to clean up nodes, error: %v", err)
+					}
+				}()
+			}
 			nextNodeIndex += concreteOp.Count
 		case *createNamespacesOp:
-			nsPreparer, err := newNamespacePreparer(concreteOp, client, b)
+			nsPreparer, err := newNamespacePreparer(concreteOp, client, tb)
 			if err != nil {
-				b.Fatalf("op %d: %v", opIndex, err)
+				tb.Fatalf("op %d: %v", opIndex, err)
 			}
 			if err := nsPreparer.prepare(ctx); err != nil {
 				nsPreparer.cleanup(ctx)
-				b.Fatalf("op %d: %v", opIndex, err)
+				tb.Fatalf("op %d: %v", opIndex, err)
 			}
 			for _, n := range nsPreparer.namespaces() {
 				if _, ok := numPodsScheduledPerNamespace[n]; ok {
@@ -867,18 +980,23 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload) [
 			if concreteOp.Namespace != nil {
 				namespace = *concreteOp.Namespace
 			}
-			createNamespaceIfNotPresent(ctx, b, client, namespace, &numPodsScheduledPerNamespace)
+			createNamespaceIfNotPresent(ctx, tb, client, namespace, &numPodsScheduledPerNamespace)
 			if concreteOp.PodTemplatePath == nil {
 				concreteOp.PodTemplatePath = tc.DefaultPodTemplatePath
 			}
 			var collectors []testDataCollector
+			// This needs a separate context and wait group because
+			// the code below needs to be sure that the goroutines
+			// are stopped.
 			var collectorCtx context.Context
 			var collectorCancel func()
 			var collectorWG sync.WaitGroup
+			defer collectorWG.Wait()
+
 			if concreteOp.CollectMetrics {
 				collectorCtx, collectorCancel = context.WithCancel(ctx)
 				defer collectorCancel()
-				collectors = getTestDataCollectors(b, podInformer, fmt.Sprintf("%s/%s", b.Name(), namespace), namespace, tc.MetricsCollectorConfig)
+				collectors = getTestDataCollectors(tb, podInformer, fmt.Sprintf("%s/%s", tb.Name(), namespace), namespace, tc.MetricsCollectorConfig, throughputErrorMargin)
 				for _, collector := range collectors {
 					// Need loop-local variable for function below.
 					collector := collector
@@ -889,8 +1007,8 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload) [
 					}()
 				}
 			}
-			if err := createPods(ctx, b, namespace, concreteOp, client); err != nil {
-				b.Fatalf("op %d: %v", opIndex, err)
+			if err := createPods(ctx, tb, namespace, concreteOp, client); err != nil {
+				tb.Fatalf("op %d: %v", opIndex, err)
 			}
 			if concreteOp.SkipWaitToCompletion {
 				// Only record those namespaces that may potentially require barriers
@@ -901,8 +1019,8 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload) [
 					numPodsScheduledPerNamespace[namespace] = concreteOp.Count
 				}
 			} else {
-				if err := waitUntilPodsScheduledInNamespace(ctx, b, podInformer, namespace, concreteOp.Count); err != nil {
-					b.Fatalf("op %d: error in waiting for pods to get scheduled: %v", opIndex, err)
+				if err := waitUntilPodsScheduledInNamespace(ctx, tb, podInformer, namespace, concreteOp.Count); err != nil {
+					tb.Fatalf("op %d: error in waiting for pods to get scheduled: %v", opIndex, err)
 				}
 			}
 			if concreteOp.CollectMetrics {
@@ -936,7 +1054,7 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload) [
 			// Ensure the namespace exists.
 			nsObj := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}
 			if _, err := client.CoreV1().Namespaces().Create(ctx, nsObj, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) {
-				b.Fatalf("op %d: unable to create namespace %v: %v", opIndex, namespace, err)
+				tb.Fatalf("op %d: unable to create namespace %v: %v", opIndex, namespace, err)
 			}
 
 			var churnFns []func(name string) string
@@ -944,12 +1062,12 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload) [
 			for i, path := range concreteOp.TemplatePaths {
 				unstructuredObj, gvk, err := getUnstructuredFromFile(path)
 				if err != nil {
-					b.Fatalf("op %d: unable to parse the %v-th template path: %v", opIndex, i, err)
+					tb.Fatalf("op %d: unable to parse the %v-th template path: %v", opIndex, i, err)
 				}
 				// Obtain GVR.
 				mapping, err := restMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
 				if err != nil {
-					b.Fatalf("op %d: unable to find GVR for %v: %v", opIndex, gvk, err)
+					tb.Fatalf("op %d: unable to find GVR for %v: %v", opIndex, gvk, err)
 				}
 				gvr := mapping.Resource
 				// Distinguish cluster-scoped with namespaced API objects.
@@ -983,7 +1101,9 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload) [
 			switch concreteOp.Mode {
 			case Create:
+				wg.Add(1)
 				go func() {
+					defer wg.Done()
 					count, threshold := 0, concreteOp.Number
 					if threshold == 0 {
 						threshold = math.MaxInt32
@@ -1001,7 +1121,9 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload) [
 					}
 				}()
 			case Recreate:
+				wg.Add(1)
 				go func() {
+					defer wg.Done()
 					retVals := make([][]string, len(churnFns))
 					// For each churn function, instantiate a slice of strings with length "concreteOp.Number".
 					for i := range retVals {
@@ -1026,11 +1148,11 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload) [
 		case *barrierOp:
 			for _, namespace := range concreteOp.Namespaces {
 				if _, ok := numPodsScheduledPerNamespace[namespace]; !ok {
-					b.Fatalf("op %d: unknown namespace %s", opIndex, namespace)
+					tb.Fatalf("op %d: unknown namespace %s", opIndex, namespace)
 				}
 			}
-			if err := waitUntilPodsScheduled(ctx, b, podInformer, concreteOp.Namespaces, numPodsScheduledPerNamespace); err != nil {
-				b.Fatalf("op %d: %v", opIndex, err)
+			if err := waitUntilPodsScheduled(ctx, tb, podInformer, concreteOp.Namespaces, numPodsScheduledPerNamespace); err != nil {
+				tb.Fatalf("op %d: %v", opIndex, err)
 			}
 			// At the end of the barrier, we can be sure that there are no pods
 			// pending scheduling in the namespaces that we just blocked on.
@@ -1050,19 +1172,19 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload) [
 		default:
 			runable, ok := concreteOp.(runnableOp)
 			if !ok {
-				b.Fatalf("op %d: invalid op %v", opIndex, concreteOp)
+				tb.Fatalf("op %d: invalid op %v", opIndex, concreteOp)
 			}
 			for _, namespace := range runable.requiredNamespaces() {
-				createNamespaceIfNotPresent(ctx, b, client, namespace, &numPodsScheduledPerNamespace)
+				createNamespaceIfNotPresent(ctx, tb, client, namespace, &numPodsScheduledPerNamespace)
 			}
-			runable.run(ctx, b, client)
+			runable.run(ctx, tb, client)
 		}
 	}
 
 	// check unused params and inform users
 	unusedParams := w.unusedParams()
 	if len(unusedParams) != 0 {
-		b.Fatalf("the parameters %v are defined on workload %s, but unused.\nPlease make sure there are no typos.", unusedParams, w.Name)
+		tb.Fatalf("the parameters %v are defined on workload %s, but unused.\nPlease make sure there are no typos.", unusedParams, w.Name)
 	}
 
 	// Some tests have unschedulable pods. Do not add an implicit barrier at the
@@ -1070,13 +1192,58 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload) [
 	return dataItems
 }
 
-func createNamespaceIfNotPresent(ctx context.Context, b *testing.B, client clientset.Interface, namespace string, podsPerNamespace *map[string]int) {
+// cleanupWorkload ensures that everything is removed from the API server that
+// might have been created by runWorkload. This must be done before starting
+// the next workload because otherwise it might stumble over previously created
+// objects. For example, the namespaces are the same in different workloads, so
+// not deleting them would cause the next one to fail with "cannot create
+// namespace: already exists".
+//
+// Calling cleanupWorkload can be skipped if it is known that the next workload
+// will run with a fresh etcd instance.
+func cleanupWorkload(ctx context.Context, tb testing.TB, tc *testCase, client clientset.Interface, numPodsScheduledPerNamespace map[string]int) {
+	deleteNow := *metav1.NewDeleteOptions(0)
+	for namespace := range numPodsScheduledPerNamespace {
+		// Pods have to be deleted explicitly, with no grace period. Normally
+		// kubelet will set the DeletionGracePeriodSeconds to zero when it's okay
+		// to remove a deleted pod, but we don't run kubelet...
+		if err := client.CoreV1().Pods(namespace).DeleteCollection(ctx, deleteNow, metav1.ListOptions{}); err != nil {
+			tb.Fatalf("failed to delete pods in namespace %q: %v", namespace, err)
+		}
+		if err := client.CoreV1().Namespaces().Delete(ctx, namespace, deleteNow); err != nil {
+			tb.Fatalf("Deleting Namespace %q in numPodsScheduledPerNamespace: %v", namespace, err)
+		}
+	}
+
+	// We need to wait here because even with deletion timestamp set,
+	// actually removing a namespace can take some time (garbage collecting
+	// other generated object like secrets, etc.) and we don't want to
+	// start the next workloads while that cleanup is still going on.
+	if err := wait.PollUntilContextTimeout(ctx, time.Second, 5*time.Minute, false, func(ctx context.Context) (bool, error) {
+		namespaces, err := client.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
+		if err != nil {
+			return false, err
+		}
+		for _, namespace := range namespaces.Items {
+			if _, ok := numPodsScheduledPerNamespace[namespace.Name]; ok {
+				// A namespace created by the workload, need to wait.
+				return false, nil
+			}
+		}
+		// All namespaces gone.
+		return true, nil
+	}); err != nil {
+		tb.Fatalf("failed while waiting for namespace removal: %v", err)
+	}
+}
+
+func createNamespaceIfNotPresent(ctx context.Context, tb testing.TB, client clientset.Interface, namespace string, podsPerNamespace *map[string]int) {
 	if _, ok := (*podsPerNamespace)[namespace]; !ok {
 		// The namespace has not created yet.
 		// So, create that and register it.
 		_, err := client.CoreV1().Namespaces().Create(ctx, &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}, metav1.CreateOptions{})
 		if err != nil {
-			b.Fatalf("failed to create namespace for Pod: %v", namespace)
+			tb.Fatalf("failed to create namespace for Pod: %v", namespace)
 		}
 		(*podsPerNamespace)[namespace] = 0
 	}
@@ -1087,12 +1254,12 @@ type testDataCollector interface {
 	collect() []DataItem
 }
 
-func getTestDataCollectors(tb testing.TB, podInformer coreinformers.PodInformer, name, namespace string, mcc *metricsCollectorConfig) []testDataCollector {
+func getTestDataCollectors(tb testing.TB, podInformer coreinformers.PodInformer, name, namespace string, mcc *metricsCollectorConfig, throughputErrorMargin float64) []testDataCollector {
 	if mcc == nil {
 		mcc = &defaultMetricsCollectorConfig
 	}
 	return []testDataCollector{
-		newThroughputCollector(tb, podInformer, map[string]string{"Name": name}, []string{namespace}),
+		newThroughputCollector(tb, podInformer, map[string]string{"Name": name}, []string{namespace}, throughputErrorMargin),
 		newMetricsCollector(mcc, map[string]string{"Name": name}),
 	}
 }
@@ -1125,12 +1292,12 @@ func getNodePreparer(prefix string, cno *createNodesOp, clientset clientset.Inte
 	), nil
 }
 
-func createPods(ctx context.Context, b *testing.B, namespace string, cpo *createPodsOp, clientset clientset.Interface) error {
+func createPods(ctx context.Context, tb testing.TB, namespace string, cpo *createPodsOp, clientset clientset.Interface) error {
 	strategy, err := getPodStrategy(cpo)
 	if err != nil {
 		return err
 	}
-	b.Logf("creating %d pods in namespace %q", cpo.Count, namespace)
+	tb.Logf("creating %d pods in namespace %q", cpo.Count, namespace)
 	config := testutils.NewTestPodCreatorConfig()
 	config.AddStrategy(namespace, cpo.Count, strategy)
 	podCreator := testutils.NewTestPodCreator(clientset, config)
@@ -1140,7 +1307,7 @@ func createPods(ctx context.Context, b *testing.B, namespace string, cpo *create
 // waitUntilPodsScheduledInNamespace blocks until all pods in the given
 // namespace are scheduled. Times out after 10 minutes because even at the
 // lowest observed QPS of ~10 pods/sec, a 5000-node test should complete.
-func waitUntilPodsScheduledInNamespace(ctx context.Context, b *testing.B, podInformer coreinformers.PodInformer, namespace string, wantCount int) error {
+func waitUntilPodsScheduledInNamespace(ctx context.Context, tb testing.TB, podInformer coreinformers.PodInformer, namespace string, wantCount int) error {
 	return wait.PollImmediate(1*time.Second, 10*time.Minute, func() (bool, error) {
 		select {
 		case <-ctx.Done():
@@ -1152,17 +1319,17 @@ func waitUntilPodsScheduledInNamespace(ctx context.Context, b *testing.B, podInf
 			return false, err
 		}
 		if len(scheduled) >= wantCount {
-			b.Logf("scheduling succeed")
+			tb.Logf("scheduling succeed")
 			return true, nil
 		}
-		b.Logf("namespace: %s, pods: want %d, got %d", namespace, wantCount, len(scheduled))
+		tb.Logf("namespace: %s, pods: want %d, got %d", namespace, wantCount, len(scheduled))
 		return false, nil
 	})
 }
 
 // waitUntilPodsScheduled blocks until the all pods in the given namespaces are
 // scheduled.
-func waitUntilPodsScheduled(ctx context.Context, b *testing.B, podInformer coreinformers.PodInformer, namespaces []string, numPodsScheduledPerNamespace map[string]int) error {
+func waitUntilPodsScheduled(ctx context.Context, tb testing.TB, podInformer coreinformers.PodInformer, namespaces []string, numPodsScheduledPerNamespace map[string]int) error {
 	// If unspecified, default to all known namespaces.
 	if len(namespaces) == 0 {
 		for namespace := range numPodsScheduledPerNamespace {
@@ -1179,7 +1346,7 @@ func waitUntilPodsScheduled(ctx context.Context, b *testing.B, podInformer corei
 		if !ok {
 			return fmt.Errorf("unknown namespace %s", namespace)
 		}
-		if err := waitUntilPodsScheduledInNamespace(ctx, b, podInformer, namespace, wantCount); err != nil {
+		if err := waitUntilPodsScheduledInNamespace(ctx, tb, podInformer, namespace, wantCount); err != nil {
 			return fmt.Errorf("error waiting for pods in namespace %q: %w", namespace, err)
 		}
 	}
@@ -1333,10 +1500,10 @@ type namespacePreparer struct {
 	count  int
 	prefix string
 	spec   *v1.Namespace
-	t      testing.TB
+	tb     testing.TB
 }
 
-func newNamespacePreparer(cno *createNamespacesOp, clientset clientset.Interface, b *testing.B) (*namespacePreparer, error) {
+func newNamespacePreparer(cno *createNamespacesOp, clientset clientset.Interface, tb testing.TB) (*namespacePreparer, error) {
 	ns := &v1.Namespace{}
 	if cno.NamespaceTemplatePath != nil {
 		if err := getSpecFromFile(cno.NamespaceTemplatePath, ns); err != nil {
@@ -1349,7 +1516,7 @@ func newNamespacePreparer(cno *createNamespacesOp, clientset clientset.Interface
 		count:  cno.Count,
 		prefix: cno.Prefix,
 		spec:   ns,
-		t:      b,
+		tb:     tb,
 	}, nil
 }
@@ -1368,7 +1535,7 @@ func (p *namespacePreparer) prepare(ctx context.Context) error {
 	if p.spec != nil {
 		base = p.spec
 	}
-	p.t.Logf("Making %d namespaces with prefix %q and template %v", p.count, p.prefix, *base)
+	p.tb.Logf("Making %d namespaces with prefix %q and template %v", p.count, p.prefix, *base)
 	for i := 0; i < p.count; i++ {
 		n := base.DeepCopy()
 		n.Name = fmt.Sprintf("%s-%d", p.prefix, i)
@@ -1388,7 +1555,7 @@ func (p *namespacePreparer) cleanup(ctx context.Context) error {
 	for i := 0; i < p.count; i++ {
 		n := fmt.Sprintf("%s-%d", p.prefix, i)
 		if err := p.client.CoreV1().Namespaces().Delete(ctx, n, metav1.DeleteOptions{}); err != nil {
-			p.t.Errorf("Deleting Namespace: %v", err)
+			p.tb.Errorf("Deleting Namespace: %v", err)
 			errRet = err
 		}
 	}

View File

@@ -73,18 +73,20 @@ func newDefaultComponentConfig() (*config.KubeSchedulerConfiguration, error) {
 	return &cfg, nil
 }
 
-// mustSetupScheduler starts the following components:
+// mustSetupCluster starts the following components:
 // - k8s api server
 // - scheduler
+// - some of the kube-controller-manager controllers
+//
 // It returns regular and dynamic clients, and destroyFunc which should be used to
 // remove resources after finished.
 // Notes on rate limiter:
 // - client rate limit is set to 5000.
-func mustSetupScheduler(ctx context.Context, b *testing.B, config *config.KubeSchedulerConfiguration, enabledFeatures map[featuregate.Feature]bool) (informers.SharedInformerFactory, clientset.Interface, dynamic.Interface) {
+func mustSetupCluster(ctx context.Context, tb testing.TB, config *config.KubeSchedulerConfiguration, enabledFeatures map[featuregate.Feature]bool) (informers.SharedInformerFactory, clientset.Interface, dynamic.Interface) {
 	// Run API server with minimimal logging by default. Can be raised with -v.
 	framework.MinVerbosity = 0
-	_, kubeConfig, tearDownFn := framework.StartTestServer(ctx, b, framework.TestServerSetup{
+	_, kubeConfig, tearDownFn := framework.StartTestServer(ctx, tb, framework.TestServerSetup{
 		ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
 			// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
 			opts.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount", "TaintNodesByCondition", "Priority"}
@@ -97,12 +99,12 @@ func mustSetupScheduler(ctx context.Context, b *testing.B, config *config.KubeSc
 			}
 		},
 	})
-	b.Cleanup(tearDownFn)
+	tb.Cleanup(tearDownFn)
 
 	// Cleanup will be in reverse order: first the clients get cancelled,
 	// then the apiserver is torn down.
 	ctx, cancel := context.WithCancel(ctx)
-	b.Cleanup(cancel)
+	tb.Cleanup(cancel)
 
 	// TODO: client connection configuration, such as QPS or Burst is configurable in theory, this could be derived from the `config`, need to
 	// support this when there is any testcase that depends on such configuration.
@@ -115,7 +117,7 @@ func mustSetupScheduler(ctx context.Context, b *testing.B, config *config.KubeSc
 		var err error
 		config, err = newDefaultComponentConfig()
 		if err != nil {
-			b.Fatalf("Error creating default component config: %v", err)
+			tb.Fatalf("Error creating default component config: %v", err)
 		}
 	}
@@ -126,16 +128,20 @@ func mustSetupScheduler(ctx context.Context, b *testing.B, config *config.KubeSc
 	// be applied to start a scheduler, most of them are defined in `scheduler.schedulerOptions`.
 	_, informerFactory := util.StartScheduler(ctx, client, cfg, config)
 	util.StartFakePVController(ctx, client, informerFactory)
+	runGC := util.CreateGCController(ctx, tb, *cfg, informerFactory)
+	runNS := util.CreateNamespaceController(ctx, tb, *cfg, informerFactory)
 
 	runResourceClaimController := func() {}
 	if enabledFeatures[features.DynamicResourceAllocation] {
 		// Testing of DRA with inline resource claims depends on this
 		// controller for creating and removing ResourceClaims.
-		runResourceClaimController = util.CreateResourceClaimController(ctx, b, client, informerFactory)
+		runResourceClaimController = util.CreateResourceClaimController(ctx, tb, client, informerFactory)
 	}
 
 	informerFactory.Start(ctx.Done())
 	informerFactory.WaitForCacheSync(ctx.Done())
+	go runGC()
+	go runNS()
 	go runResourceClaimController()
 
 	return informerFactory, client, dynClient
@@ -314,14 +320,16 @@ type throughputCollector struct {
 	schedulingThroughputs []float64
 	labels                map[string]string
 	namespaces            []string
+	errorMargin           float64
 }
 
-func newThroughputCollector(tb testing.TB, podInformer coreinformers.PodInformer, labels map[string]string, namespaces []string) *throughputCollector {
+func newThroughputCollector(tb testing.TB, podInformer coreinformers.PodInformer, labels map[string]string, namespaces []string, errorMargin float64) *throughputCollector {
 	return &throughputCollector{
 		tb:          tb,
 		podInformer: podInformer,
 		labels:      labels,
 		namespaces:  namespaces,
+		errorMargin: errorMargin,
 	}
 }
@@ -382,9 +390,7 @@ func (tc *throughputCollector) run(ctx context.Context) {
 			throughput := float64(newScheduled) / durationInSeconds
 			expectedDuration := throughputSampleInterval * time.Duration(skipped+1)
 			errorMargin := (duration - expectedDuration).Seconds() / expectedDuration.Seconds() * 100
-			// TODO: To prevent the perf-test failure, we increased the error margin, if still not enough
-			// one day, we should think of another approach to avoid this trick.
-			if math.Abs(errorMargin) > 30 {
+			if tc.errorMargin > 0 && math.Abs(errorMargin) > tc.errorMargin {
 				// This might affect the result, report it.
 				tc.tb.Errorf("ERROR: Expected throuput collector to sample at regular time intervals. The %d most recent intervals took %s instead of %s, a difference of %0.1f%%.", skipped+1, duration, expectedDuration, errorMargin)
 			}

View File

@@ -38,17 +38,22 @@ import (
 	"k8s.io/client-go/informers"
 	clientset "k8s.io/client-go/kubernetes"
 	corelisters "k8s.io/client-go/listers/core/v1"
+	"k8s.io/client-go/metadata"
+	"k8s.io/client-go/metadata/metadatainformer"
 	restclient "k8s.io/client-go/rest"
 	"k8s.io/client-go/restmapper"
 	"k8s.io/client-go/scale"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/events"
 	pvutil "k8s.io/component-helpers/storage/volume"
+	"k8s.io/controller-manager/pkg/informerfactory"
 	"k8s.io/klog/v2"
 	"k8s.io/kube-scheduler/config/v1beta3"
 	"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
 	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
 	"k8s.io/kubernetes/pkg/controller/disruption"
+	"k8s.io/kubernetes/pkg/controller/garbagecollector"
+	"k8s.io/kubernetes/pkg/controller/namespace"
 	"k8s.io/kubernetes/pkg/controller/resourceclaim"
 	"k8s.io/kubernetes/pkg/controlplane"
 	"k8s.io/kubernetes/pkg/scheduler"
@@ -137,7 +142,13 @@ func StartFakePVController(ctx context.Context, clientSet clientset.Interface, i
 			claimRef := obj.Spec.ClaimRef
 			pvc, err := clientSet.CoreV1().PersistentVolumeClaims(claimRef.Namespace).Get(ctx, claimRef.Name, metav1.GetOptions{})
 			if err != nil {
-				klog.Errorf("error while getting %v/%v: %v", claimRef.Namespace, claimRef.Name, err)
+				// Note that the error can be anything, because components like
+				// apiserver are also shutting down at the same time, but this
+				// check is conservative and only ignores the "context canceled"
+				// error while shutting down.
+				if ctx.Err() == nil || !errors.Is(err, context.Canceled) {
+					klog.Errorf("error while getting %v/%v: %v", claimRef.Namespace, claimRef.Name, err)
+				}
 				return
 			}
@@ -146,7 +157,10 @@ func StartFakePVController(ctx context.Context, clientSet clientset.Interface, i
 				metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, pvutil.AnnBindCompleted, "yes")
 				_, err := clientSet.CoreV1().PersistentVolumeClaims(claimRef.Namespace).Update(ctx, pvc, metav1.UpdateOptions{})
 				if err != nil {
-					klog.Errorf("error while updating %v/%v: %v", claimRef.Namespace, claimRef.Name, err)
+					if ctx.Err() == nil || !errors.Is(err, context.Canceled) {
+						// Shutting down, no need to record this.
+						klog.Errorf("error while updating %v/%v: %v", claimRef.Namespace, claimRef.Name, err)
+					}
 					return
 				}
 			}
@@ -163,6 +177,67 @@ func StartFakePVController(ctx context.Context, clientSet clientset.Interface, i
 	})
 }
 
+// CreateGCController creates a garbage controller and returns a run function
+// for it. The informer factory needs to be started before invoking that
+// function.
+func CreateGCController(ctx context.Context, tb testing.TB, restConfig restclient.Config, informerSet informers.SharedInformerFactory) func() {
+	restclient.AddUserAgent(&restConfig, "gc-controller")
+	clientSet := clientset.NewForConfigOrDie(&restConfig)
+	metadataClient, err := metadata.NewForConfig(&restConfig)
+	if err != nil {
+		tb.Fatalf("Failed to create metadataClient: %v", err)
+	}
+	restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheddiscovery.NewMemCacheClient(clientSet.Discovery()))
+	restMapper.Reset()
+	metadataInformers := metadatainformer.NewSharedInformerFactory(metadataClient, 0)
+	alwaysStarted := make(chan struct{})
+	close(alwaysStarted)
+	gc, err := garbagecollector.NewGarbageCollector(
+		clientSet,
+		metadataClient,
+		restMapper,
+		garbagecollector.DefaultIgnoredResources(),
+		informerfactory.NewInformerFactory(informerSet, metadataInformers),
+		alwaysStarted,
+	)
+	if err != nil {
+		tb.Fatalf("Failed creating garbage collector")
+	}
+	startGC := func() {
+		syncPeriod := 5 * time.Second
+		go wait.Until(func() {
+			restMapper.Reset()
+		}, syncPeriod, ctx.Done())
+		go gc.Run(ctx, 1)
+		go gc.Sync(ctx, clientSet.Discovery(), syncPeriod)
+	}
+	return startGC
+}
+
+// CreateNamespaceController creates a namespace controller and returns a run
+// function for it. The informer factory needs to be started before invoking
+// that function.
+func CreateNamespaceController(ctx context.Context, tb testing.TB, restConfig restclient.Config, informerSet informers.SharedInformerFactory) func() {
+	restclient.AddUserAgent(&restConfig, "namespace-controller")
+	clientSet := clientset.NewForConfigOrDie(&restConfig)
+	metadataClient, err := metadata.NewForConfig(&restConfig)
+	if err != nil {
+		tb.Fatalf("Failed to create metadataClient: %v", err)
+	}
+	discoverResourcesFn := clientSet.Discovery().ServerPreferredNamespacedResources
+	controller := namespace.NewNamespaceController(
+		ctx,
+		clientSet,
+		metadataClient,
+		discoverResourcesFn,
+		informerSet.Core().V1().Namespaces(),
+		10*time.Hour,
+		v1.FinalizerKubernetes)
+	return func() {
+		go controller.Run(ctx, 5)
+	}
+}
+
 // TestContext store necessary context info
 type TestContext struct {
 	NS *v1.Namespace