Merge pull request #126282 from macsko/fix_scheduler_perf_tests_taking_too_long

Init etcd and apiserver per test case in scheduler_perf integration tests

Commit b95f9c32d6

The diff below introduces a setupTestCase helper that starts a fresh etcd and apiserver (plus goroutine-leak checking, feature-gate setup, and a per-test timeout) for every test case, and makes both RunBenchmarkPerfScheduling and TestScheduling call it per workload. Because every case now runs against a clean cluster, the machinery that compensated for shared state is deleted: the shouldCleanup/withCleanup context flag, the cleanupWorkload function, and the schedulerConfig bookkeeping that grouped test cases so they could share one apiserver.

@@ -24,7 +24,6 @@ import (
 	"os"
 	"time"
 
-	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -142,18 +141,6 @@ func (c *createAny) create(tCtx ktesting.TContext, env map[string]any) {
 		}
 		_, err = resourceClient.Create(tCtx, obj, options)
 	}
-	if err == nil && shouldCleanup(tCtx) {
-		tCtx.CleanupCtx(func(tCtx ktesting.TContext) {
-			del := resourceClient.Delete
-			if mapping.Scope.Name() != meta.RESTScopeNameNamespace {
-				del = resourceClient.Namespace(c.Namespace).Delete
-			}
-			err := del(tCtx, obj.GetName(), metav1.DeleteOptions{})
-			if !apierrors.IsNotFound(err) {
-				tCtx.ExpectNoError(err, fmt.Sprintf("deleting %s.%s %s", obj.GetKind(), obj.GetAPIVersion(), klog.KObj(obj)))
-			}
-		})
-	}
 	return err
 }
 // Retry, some errors (like CRD just created and type not ready for use yet) are temporary.
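
The shouldCleanup guard deleted here reads a flag carried on the context; the helpers implementing it are removed from the main file in a later hunk. As a minimal standalone sketch of that context-flag pattern, restated over plain context.Context (the real code uses ktesting.TContext):

	package main

	import (
		"context"
		"fmt"
	)

	// An unexported key type avoids collisions with context values set elsewhere.
	type cleanupKeyType struct{}

	var cleanupKey = cleanupKeyType{}

	// shouldCleanup defaults to true when the flag was never set.
	func shouldCleanup(ctx context.Context) bool {
		if enabled, ok := ctx.Value(cleanupKey).(bool); ok {
			return enabled
		}
		return true
	}

	// withCleanup returns a derived context carrying the flag.
	func withCleanup(ctx context.Context, enabled bool) context.Context {
		return context.WithValue(ctx, cleanupKey, enabled)
	}

	func main() {
		ctx := context.Background()
		fmt.Println(shouldCleanup(ctx))                     // true
		fmt.Println(shouldCleanup(withCleanup(ctx, false))) // false
	}
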
@@ -31,8 +31,6 @@ import (
 	"testing"
 	"time"
 
-	"github.com/google/go-cmp/cmp"
-
 	v1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/meta"
@@ -764,32 +762,35 @@ func initTestOutput(tb testing.TB) io.Writer {
 	return output
 }
 
-type cleanupKeyType struct{}
-
-var cleanupKey = cleanupKeyType{}
-
-// shouldCleanup returns true if a function should clean up resources in the
-// apiserver when the test is done. This is true for unit tests (etcd and
-// apiserver get reused) and false for benchmarks (each benchmark starts with a
-// clean state, so cleaning up just wastes time).
-//
-// The default if not explicitly set in the context is true.
-func shouldCleanup(ctx context.Context) bool {
-	val := ctx.Value(cleanupKey)
-	if enabled, ok := val.(bool); ok {
-		return enabled
-	}
-	return true
-}
-
-// withCleanup sets whether cleaning up resources in the apiserver
-// should be done. The default is true.
-func withCleanup(tCtx ktesting.TContext, enabled bool) ktesting.TContext {
-	return ktesting.WithValue(tCtx, cleanupKey, enabled)
-}
-
 var perfSchedulingLabelFilter = flag.String("perf-scheduling-label-filter", "performance", "comma-separated list of labels which a testcase must have (no prefix or +) or must not have (-), used by BenchmarkPerfScheduling")
 
+func setupTestCase(t testing.TB, tc *testCase, output io.Writer, outOfTreePluginRegistry frameworkruntime.Registry) (informers.SharedInformerFactory, ktesting.TContext) {
+	tCtx := ktesting.Init(t, initoption.PerTestOutput(*useTestingLog))
+
+	// Ensure that there are no leaked
+	// goroutines. They could influence
+	// performance of the next benchmark.
+	// This must run *after* RedirectKlog
+	// because then during cleanup, the
+	// test will wait for goroutines to
+	// quit *before* restoring klog settings.
+	framework.GoleakCheck(t)
+
+	// Now that we are ready to run, start
+	// etcd.
+	framework.StartEtcd(t, output)
+
+	for feature, flag := range tc.FeatureGates {
+		featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, feature, flag)
+	}
+
+	// 30 minutes should be plenty enough even for the 5000-node tests.
+	timeout := 30 * time.Minute
+	tCtx = ktesting.WithTimeout(tCtx, timeout, fmt.Sprintf("timed out after the %s per-test timeout", timeout))
+
+	return setupClusterForWorkload(tCtx, tc.SchedulerConfigPath, tc.FeatureGates, outOfTreePluginRegistry)
+}
+
 // RunBenchmarkPerfScheduling runs the scheduler performance tests.
 //
 // You can pass your own scheduler plugins via outOfTreePluginRegistry.
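
The new setupTestCase accepts testing.TB, the interface implemented by both *testing.T and *testing.B, which is what lets TestScheduling and RunBenchmarkPerfScheduling share one fixture path. A hedged sketch of that shape (names here are illustrative stand-ins, not the real scheduler_perf helpers):

	package example

	import "testing"

	// startFixture stands in for setupTestCase: accepting testing.TB
	// makes the helper callable from tests and benchmarks alike.
	func startFixture(tb testing.TB) {
		tb.Helper()
		tb.Log("start etcd + apiserver for this test case")
		// tb.Cleanup runs when the (sub)test or benchmark finishes,
		// so each case tears down its own cluster.
		tb.Cleanup(func() { tb.Log("stopped") })
	}

	func TestExample(t *testing.T)      { startFixture(t) }
	func BenchmarkExample(b *testing.B) { startFixture(b) }
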
@@ -821,33 +822,8 @@ func RunBenchmarkPerfScheduling(b *testing.B, outOfTreePluginRegistry frameworkruntime.Registry) {
 			if !enabled(*perfSchedulingLabelFilter, append(tc.Labels, w.Labels...)...) {
 				b.Skipf("disabled by label filter %q", *perfSchedulingLabelFilter)
 			}
-			tCtx := ktesting.Init(b, initoption.PerTestOutput(*useTestingLog))
-
-			// Ensure that there are no leaked
-			// goroutines. They could influence
-			// performance of the next benchmark.
-			// This must run *after* RedirectKlog
-			// because then during cleanup, the
-			// test will wait for goroutines to
-			// quit *before* restoring klog settings.
-			framework.GoleakCheck(b)
-
-			// Now that we are ready to run, start
-			// etcd.
-			framework.StartEtcd(b, output)
-
-			// 30 minutes should be plenty enough even for the 5000-node tests.
-			timeout := 30 * time.Minute
-			tCtx = ktesting.WithTimeout(tCtx, timeout, fmt.Sprintf("timed out after the %s per-test timeout", timeout))
-
-			for feature, flag := range tc.FeatureGates {
-				featuregatetesting.SetFeatureGateDuringTest(b, utilfeature.DefaultFeatureGate, feature, flag)
-			}
-			informerFactory, tCtx := setupClusterForWorkload(tCtx, tc.SchedulerConfigPath, tc.FeatureGates, outOfTreePluginRegistry)
-
-			// No need to clean up, each benchmark testcase starts with an empty
-			// etcd database.
-			tCtx = withCleanup(tCtx, false)
-
+
+			informerFactory, tCtx := setupTestCase(b, tc, output, outOfTreePluginRegistry)
+
 			results := runWorkload(tCtx, tc, w, informerFactory)
 			dataItems.DataItems = append(dataItems.DataItems, results...)
@@ -889,16 +865,6 @@ func RunBenchmarkPerfScheduling(b *testing.B, outOfTreePluginRegistry frameworkruntime.Registry) {
 
 var testSchedulingLabelFilter = flag.String("test-scheduling-label-filter", "integration-test", "comma-separated list of labels which a testcase must have (no prefix or +) or must not have (-), used by TestScheduling")
 
-type schedulerConfig struct {
-	schedulerConfigPath string
-	featureGates        map[featuregate.Feature]bool
-}
-
-func (c schedulerConfig) equals(tc *testCase) bool {
-	return c.schedulerConfigPath == tc.SchedulerConfigPath &&
-		cmp.Equal(c.featureGates, tc.FeatureGates)
-}
-
 func loadSchedulerConfig(file string) (*config.KubeSchedulerConfiguration, error) {
 	data, err := os.ReadFile(file)
 	if err != nil {
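
schedulerConfig existed only to deduplicate apiserver configurations across test cases; with a fresh cluster per case it becomes dead code, which is also why the go-cmp import disappears in the earlier import hunk. Its equals method compared feature-gate maps with cmp.Equal; for flat maps of comparable types the standard library can do the same, as in this sketch:

	package main

	import (
		"fmt"
		"maps"
	)

	func main() {
		a := map[string]bool{"SomeGate": true} // placeholder gate names
		b := map[string]bool{"SomeGate": true}
		// The deleted equals() used cmp.Equal from go-cmp; maps.Equal
		// (Go 1.21+) suffices when keys and values are plain comparable types.
		fmt.Println(maps.Equal(a, b)) // true
	}
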
@@ -997,7 +963,6 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFactory informers.SharedInformerFactory) []DataItem {
 			b.ReportMetric(duration.Seconds(), "runtime_seconds")
 		})
 	}
-	cleanup := shouldCleanup(tCtx)
 
 	// Disable error checking of the sampling interval length in the
 	// throughput collector by default. When running benchmarks, report
@@ -1028,11 +993,6 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFactory informers.SharedInformerFactory) []DataItem {
 	// All namespaces listed in numPodsScheduledPerNamespace will be cleaned up.
 	numPodsScheduledPerNamespace := make(map[string]int)
-
-	if cleanup {
-		// This must run before controllers get shut down.
-		defer cleanupWorkload(tCtx, tc, numPodsScheduledPerNamespace)
-	}
 
 	for opIndex, op := range unrollWorkloadTemplate(tCtx, tc.WorkloadTemplate, w) {
 		realOp, err := op.realOp.patchParams(w)
 		if err != nil {
@@ -1052,13 +1012,6 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFactory informers.SharedInformerFactory) []DataItem {
 			if err := nodePreparer.PrepareNodes(tCtx, nextNodeIndex); err != nil {
 				tCtx.Fatalf("op %d: %v", opIndex, err)
 			}
-			if cleanup {
-				defer func() {
-					if err := nodePreparer.CleanupNodes(tCtx); err != nil {
-						tCtx.Fatalf("failed to clean up nodes, error: %v", err)
-					}
-				}()
-			}
 			nextNodeIndex += concreteOp.Count
 
 		case *createNamespacesOp:
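
The deleted branch registered node cleanup with defer inside the operation loop. Deferred calls run when the surrounding function returns, not when the loop iteration ends, so all node cleanups executed together at the end of runWorkload, in LIFO order. A small demonstration of that behavior:

	package main

	import "fmt"

	func run() {
		for i := 0; i < 3; i++ {
			i := i // capture the per-iteration value (pre-Go 1.22 habit)
			defer fmt.Println("cleanup nodes from op", i)
		}
		fmt.Println("workload done")
	}

	func main() {
		run()
		// Prints "workload done", then the cleanups for ops 2, 1, 0.
	}
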
@@ -1333,51 +1286,6 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFactory informers.SharedInformerFactory) []DataItem {
 	return dataItems
 }
 
-// cleanupWorkload ensures that everything is removed from the API server that
-// might have been created by runWorkload. This must be done before starting
-// the next workload because otherwise it might stumble over previously created
-// objects. For example, the namespaces are the same in different workloads, so
-// not deleting them would cause the next one to fail with "cannot create
-// namespace: already exists".
-//
-// Calling cleanupWorkload can be skipped if it is known that the next workload
-// will run with a fresh etcd instance.
-func cleanupWorkload(tCtx ktesting.TContext, tc *testCase, numPodsScheduledPerNamespace map[string]int) {
-	deleteNow := *metav1.NewDeleteOptions(0)
-	for namespace := range numPodsScheduledPerNamespace {
-		// Pods have to be deleted explicitly, with no grace period. Normally
-		// kubelet will set the DeletionGracePeriodSeconds to zero when it's okay
-		// to remove a deleted pod, but we don't run kubelet...
-		if err := tCtx.Client().CoreV1().Pods(namespace).DeleteCollection(tCtx, deleteNow, metav1.ListOptions{}); err != nil {
-			tCtx.Fatalf("failed to delete pods in namespace %q: %v", namespace, err)
-		}
-		if err := tCtx.Client().CoreV1().Namespaces().Delete(tCtx, namespace, deleteNow); err != nil {
-			tCtx.Fatalf("Deleting Namespace %q in numPodsScheduledPerNamespace: %v", namespace, err)
-		}
-	}
-
-	// We need to wait here because even with deletion timestamp set,
-	// actually removing a namespace can take some time (garbage collecting
-	// other generated objects like secrets, etc.) and we don't want to
-	// start the next workloads while that cleanup is still going on.
-	if err := wait.PollUntilContextTimeout(tCtx, time.Second, 5*time.Minute, false, func(ctx context.Context) (bool, error) {
-		namespaces, err := tCtx.Client().CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
-		if err != nil {
-			return false, err
-		}
-		for _, namespace := range namespaces.Items {
-			if _, ok := numPodsScheduledPerNamespace[namespace.Name]; ok {
-				// A namespace created by the workload, need to wait.
-				return false, nil
-			}
-		}
-		// All namespaces gone.
-		return true, nil
-	}); err != nil {
-		tCtx.Fatalf("failed while waiting for namespace removal: %v", err)
-	}
-}
-
 func createNamespaceIfNotPresent(tCtx ktesting.TContext, namespace string, podsPerNamespace *map[string]int) {
 	if _, ok := (*podsPerNamespace)[namespace]; !ok {
 		// The namespace has not been created yet.
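
The namespace wait in the deleted cleanupWorkload is a standard wait.PollUntilContextTimeout loop: check every second, give up after five minutes, and immediate=false means the first probe happens one interval after the call. A condensed sketch of that polling pattern, with isGone standing in for the namespace list check:

	package example

	import (
		"context"
		"time"

		"k8s.io/apimachinery/pkg/util/wait"
	)

	func waitUntilGone(ctx context.Context, isGone func(context.Context) (bool, error)) error {
		// Poll every second for up to five minutes; returning
		// (true, nil) from the condition stops the loop.
		return wait.PollUntilContextTimeout(ctx, time.Second, 5*time.Minute, false, isGone)
	}
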
@@ -18,11 +18,6 @@ package benchmark
 
 import (
 	"testing"
-
-	utilfeature "k8s.io/apiserver/pkg/util/feature"
-	featuregatetesting "k8s.io/component-base/featuregate/testing"
-	"k8s.io/kubernetes/test/integration/framework"
-	"k8s.io/kubernetes/test/utils/ktesting"
 )
 
 func TestScheduling(t *testing.T) {
@@ -34,69 +29,18 @@ func TestScheduling(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	// Check for leaks at the very end.
-	framework.GoleakCheck(t)
-
-	// All integration test cases share the same etcd, similar to
-	// https://github.com/kubernetes/kubernetes/blob/18d05b646d09b2971dc5400bc288062b0414e8cf/test/integration/framework/etcd.go#L186-L222.
-	framework.StartEtcd(t, nil)
-
-	// Workloads with the same configuration share the same apiserver. For that
-	// we first need to determine what those different configs are.
-	var configs []schedulerConfig
-	for _, tc := range testCases {
-		tcEnabled := false
-		for _, w := range tc.Workloads {
-			if enabled(*testSchedulingLabelFilter, append(tc.Labels, w.Labels...)...) {
-				tcEnabled = true
-				break
-			}
-		}
-		if !tcEnabled {
-			continue
-		}
-		exists := false
-		for _, config := range configs {
-			if config.equals(tc) {
-				exists = true
-				break
-			}
-		}
-		if !exists {
-			configs = append(configs, schedulerConfig{schedulerConfigPath: tc.SchedulerConfigPath, featureGates: tc.FeatureGates})
-		}
-	}
-	for _, config := range configs {
-		// Not a sub test because we don't have a good name for it.
-		func() {
-			tCtx := ktesting.Init(t)
-
-			// No timeout here because the `go test -timeout` will ensure that
-			// the test doesn't get stuck forever.
-
-			for feature, flag := range config.featureGates {
-				featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, feature, flag)
-			}
-			informerFactory, tCtx := setupClusterForWorkload(tCtx, config.schedulerConfigPath, config.featureGates, nil)
-
-			for _, tc := range testCases {
-				if !config.equals(tc) {
-					// Runs with some other config.
-					continue
-				}
-
-				t.Run(tc.Name, func(t *testing.T) {
-					for _, w := range tc.Workloads {
-						t.Run(w.Name, func(t *testing.T) {
-							if !enabled(*testSchedulingLabelFilter, append(tc.Labels, w.Labels...)...) {
-								t.Skipf("disabled by label filter %q", *testSchedulingLabelFilter)
-							}
-							tCtx := ktesting.WithTB(tCtx, t)
-							runWorkload(tCtx, tc, w, informerFactory)
-						})
-					}
-				})
-			}
-		}()
-	}
+	for _, tc := range testCases {
+		t.Run(tc.Name, func(t *testing.T) {
+			for _, w := range tc.Workloads {
+				t.Run(w.Name, func(t *testing.T) {
+					if !enabled(*testSchedulingLabelFilter, append(tc.Labels, w.Labels...)...) {
+						t.Skipf("disabled by label filter %q", *testSchedulingLabelFilter)
+					}
+					informerFactory, tCtx := setupTestCase(t, tc, nil, nil)
+
+					runWorkload(tCtx, tc, w, informerFactory)
+				})
+			}
+		})
+	}
 }
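
The net effect on TestScheduling: the old two-pass structure (pre-scan label filters, group test cases by schedulerConfig, share one apiserver per group) collapses into plain nested subtests where each workload starts its own cluster via setupTestCase. The skip check runs before any expensive setup, so filtered-out workloads cost almost nothing. The shape, reduced to a hedged standalone sketch with illustrative names:

	package example

	import "testing"

	func TestShape(t *testing.T) {
		testCases := map[string][]string{"SchedulingBasic": {"5000Nodes"}}
		for name, workloads := range testCases {
			t.Run(name, func(t *testing.T) {
				for _, w := range workloads {
					t.Run(w, func(t *testing.T) {
						// 1. cheap skip checks first (label filter)
						// 2. per-workload fixture: etcd + apiserver
						// 3. run the workload against the fresh cluster
					})
				}
			})
		}
	}
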