Fix failure (context canceled) in scheduler_perf benchmark (#114843)

* Fix failure in scheduler_perf benchmark

Signed-off-by: Kante Yin <kerthcet@gmail.com>

* Fatal on errors while cleaning up nodes in scheduler_perf tests

Signed-off-by: Kante Yin <kerthcet@gmail.com>

* Use a derived context to better organize the code

Signed-off-by: Kante Yin <kerthcet@gmail.com>

* Change log level to 2 in scheduler_perf tests

Signed-off-by: Kante Yin <kerthcet@gmail.com>

---------

Signed-off-by: Kante Yin <kerthcet@gmail.com>
Kante Yin, 2023-01-31 08:21:00 +08:00, committed by GitHub
commit 3d0894fabf (parent 2eb2c88b1d)
4 changed files with 33 additions and 33 deletions


@@ -17,11 +17,18 @@ limitations under the License.
 package benchmark
 
 import (
+    "flag"
     "testing"
 
+    "k8s.io/klog/v2/ktesting"
+
     "k8s.io/kubernetes/test/integration/framework"
 )
 
 func TestMain(m *testing.M) {
+    // Run with -v=2, this is the default log level in production.
+    ktesting.DefaultConfig = ktesting.NewConfig(ktesting.Verbosity(2))
+    ktesting.DefaultConfig.AddFlags(flag.CommandLine)
+    flag.Parse()
     framework.EtcdMain(m.Run)
 }
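The TestMain change above only sets a default; individual tests pick it up when they build their per-test logger. As a rough sketch of how that default is consumed, assuming ktesting.NewTestContext derives its logger from ktesting.DefaultConfig (which is why TestMain configures it) — illustrative code, not part of this commit:

package benchmark

import (
    "testing"

    "k8s.io/klog/v2/ktesting"
)

func TestVerbositySketch(t *testing.T) {
    // NewTestContext builds a logger from ktesting.DefaultConfig, so the
    // Verbosity(2) configured in TestMain applies unless overridden with -v.
    logger, _ := ktesting.NewTestContext(t)
    logger.V(2).Info("visible at the default verbosity")
    logger.V(5).Info("only visible when run with a higher -v")
}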


@@ -584,10 +584,14 @@ func BenchmarkPerfScheduling(b *testing.B) {
         b.Run(tc.Name, func(b *testing.B) {
             for _, w := range tc.Workloads {
                 b.Run(w.Name, func(b *testing.B) {
+                    // 30 minutes should be plenty enough even for the 5000-node tests.
+                    ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Minute)
+                    b.Cleanup(cancel)
+
                     for feature, flag := range tc.FeatureGates {
                         defer featuregatetesting.SetFeatureGateDuringTest(b, utilfeature.DefaultFeatureGate, feature, flag)()
                     }
-                    dataItems.DataItems = append(dataItems.DataItems, runWorkload(b, tc, w)...)
+                    dataItems.DataItems = append(dataItems.DataItems, runWorkload(ctx, b, tc, w)...)
                     // Reset metrics to prevent metrics generated in current workload gets
                     // carried over to the next workload.
                     legacyregistry.Reset()
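Moving the context up to this level matters because of b.Cleanup ordering: cleanup functions run in last-registered-first order, so registering cancel before runWorkload guarantees that the namespace and node cleanups registered inside runWorkload still see a live context. A minimal, self-contained sketch of that ordering (illustrative names, not the scheduler_perf code):

package benchmark

import (
    "context"
    "testing"
    "time"
)

func BenchmarkCleanupOrder(b *testing.B) {
    // Registered first, therefore executed last: the context outlives all
    // cleanups registered afterwards.
    ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Minute)
    b.Cleanup(cancel)

    // Registered second, therefore executed first, while ctx is still valid.
    // Before this commit the context was created (and canceled via defer)
    // inside the workload function, so cleanups like this one observed
    // "context canceled".
    b.Cleanup(func() {
        if err := ctx.Err(); err != nil {
            b.Errorf("context already canceled: %v", err)
        }
    })

    for i := 0; i < b.N; i++ {
    }
}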
@@ -639,10 +643,7 @@ func unrollWorkloadTemplate(b *testing.B, wt []op, w *workload) []op {
     return unrolled
 }
 
-func runWorkload(b *testing.B, tc *testCase, w *workload) []DataItem {
-    // 30 minutes should be plenty enough even for the 5000-node tests.
-    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
-    defer cancel()
+func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload) []DataItem {
     var cfg *config.KubeSchedulerConfiguration
     var err error
     if tc.SchedulerConfigPath != nil {
@@ -654,7 +655,7 @@ func runWorkload(b *testing.B, tc *testCase, w *workload) []DataItem {
             b.Fatalf("validate scheduler config file failed: %v", err)
         }
     }
-    finalFunc, podInformer, client, dynClient := mustSetupScheduler(b, cfg)
+    finalFunc, podInformer, client, dynClient := mustSetupScheduler(ctx, b, cfg)
     b.Cleanup(finalFunc)
 
     var mu sync.Mutex
@@ -665,7 +666,7 @@ func runWorkload(b *testing.B, tc *testCase, w *workload) []DataItem {
     numPodsScheduledPerNamespace := make(map[string]int)
     b.Cleanup(func() {
         for namespace := range numPodsScheduledPerNamespace {
-            if err := client.CoreV1().Namespaces().Delete(context.Background(), namespace, metav1.DeleteOptions{}); err != nil {
+            if err := client.CoreV1().Namespaces().Delete(ctx, namespace, metav1.DeleteOptions{}); err != nil {
                 b.Errorf("Deleting Namespace in numPodsScheduledPerNamespace: %v", err)
             }
         }
@@ -691,7 +692,9 @@ func runWorkload(b *testing.B, tc *testCase, w *workload) []DataItem {
             b.Fatalf("op %d: %v", opIndex, err)
         }
         b.Cleanup(func() {
-            _ = nodePreparer.CleanupNodes(ctx)
+            if err := nodePreparer.CleanupNodes(ctx); err != nil {
+                b.Fatalf("failed to clean up nodes, error: %v", err)
+            }
         })
 
         nextNodeIndex += concreteOp.Count
@@ -700,8 +703,8 @@ func runWorkload(b *testing.B, tc *testCase, w *workload) []DataItem {
         if err != nil {
             b.Fatalf("op %d: %v", opIndex, err)
         }
-        if err := nsPreparer.prepare(); err != nil {
-            nsPreparer.cleanup()
+        if err := nsPreparer.prepare(ctx); err != nil {
+            nsPreparer.cleanup(ctx)
             b.Fatalf("op %d: %v", opIndex, err)
         }
         for _, n := range nsPreparer.namespaces() {
@@ -985,6 +988,7 @@ func waitUntilPodsScheduledInNamespace(ctx context.Context, b *testing.B, podInf
             return false, err
         }
         if len(scheduled) >= wantCount {
+            b.Logf("scheduling succeed")
             return true, nil
         }
         b.Logf("namespace: %s, pods: want %d, got %d", namespace, wantCount, len(scheduled))
@@ -1195,7 +1199,7 @@ func (p *namespacePreparer) namespaces() []string {
 }
 
 // prepare creates the namespaces.
-func (p *namespacePreparer) prepare() error {
+func (p *namespacePreparer) prepare(ctx context.Context) error {
     base := &v1.Namespace{}
     if p.spec != nil {
         base = p.spec
@@ -1205,7 +1209,7 @@ func (p *namespacePreparer) prepare() error {
         n := base.DeepCopy()
         n.Name = fmt.Sprintf("%s-%d", p.prefix, i)
         if err := testutils.RetryWithExponentialBackOff(func() (bool, error) {
-            _, err := p.client.CoreV1().Namespaces().Create(context.Background(), n, metav1.CreateOptions{})
+            _, err := p.client.CoreV1().Namespaces().Create(ctx, n, metav1.CreateOptions{})
             return err == nil || apierrors.IsAlreadyExists(err), nil
         }); err != nil {
             return err
@@ -1215,11 +1219,11 @@ func (p *namespacePreparer) prepare() error {
 }
 
 // cleanup deletes existing test namespaces.
-func (p *namespacePreparer) cleanup() error {
+func (p *namespacePreparer) cleanup(ctx context.Context) error {
     var errRet error
     for i := 0; i < p.count; i++ {
         n := fmt.Sprintf("%s-%d", p.prefix, i)
-        if err := p.client.CoreV1().Namespaces().Delete(context.Background(), n, metav1.DeleteOptions{}); err != nil {
+        if err := p.client.CoreV1().Namespaces().Delete(ctx, n, metav1.DeleteOptions{}); err != nil {
             p.t.Errorf("Deleting Namespace: %v", err)
             errRet = err
         }
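The namespacePreparer changes above are pure context threading: the helper no longer reaches for context.Background() but uses whatever context the workload hands it, and namespace creation keeps treating AlreadyExists as success. A simplified, self-contained sketch of that shape, using a fake clientset and skipping the exponential backoff (names are illustrative, not the real helper):

package main

import (
    "context"
    "fmt"

    v1 "k8s.io/api/core/v1"
    apierrors "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/kubernetes/fake"
)

// prepareNamespaces mirrors the post-commit prepare(ctx): it uses the caller's
// context for every API call and tolerates namespaces that already exist.
func prepareNamespaces(ctx context.Context, client kubernetes.Interface, prefix string, count int) error {
    for i := 0; i < count; i++ {
        ns := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("%s-%d", prefix, i)}}
        if _, err := client.CoreV1().Namespaces().Create(ctx, ns, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) {
            return err
        }
    }
    return nil
}

func main() {
    if err := prepareNamespaces(context.Background(), fake.NewSimpleClientset(), "sched-perf", 3); err != nil {
        panic(err)
    }
}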


@@ -75,7 +75,8 @@ func newDefaultComponentConfig() (*config.KubeSchedulerConfiguration, error) {
 // remove resources after finished.
 // Notes on rate limiter:
 // - client rate limit is set to 5000.
-func mustSetupScheduler(b *testing.B, config *config.KubeSchedulerConfiguration) (util.ShutdownFunc, coreinformers.PodInformer, clientset.Interface, dynamic.Interface) {
+func mustSetupScheduler(ctx context.Context, b *testing.B, config *config.KubeSchedulerConfiguration) (util.ShutdownFunc, coreinformers.PodInformer, clientset.Interface, dynamic.Interface) {
+    ctx, cancel := context.WithCancel(ctx)
 
     // Run API server with minimimal logging by default. Can be raised with -v.
     framework.MinVerbosity = 0
@@ -106,12 +107,11 @@ func mustSetupScheduler(b *testing.B, config *config.KubeSchedulerConfiguration)
     // Not all config options will be effective but only those mostly related with scheduler performance will
     // be applied to start a scheduler, most of them are defined in `scheduler.schedulerOptions`.
-    _, podInformer, schedulerShutdown := util.StartScheduler(client, cfg, config)
-    fakePVControllerShutdown := util.StartFakePVController(client)
+    _, podInformer := util.StartScheduler(ctx, client, cfg, config)
+    util.StartFakePVController(ctx, client)
 
     shutdownFn := func() {
-        fakePVControllerShutdown()
-        schedulerShutdown()
+        cancel()
         tearDownFn()
     }
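The shutdown wiring above boils down to one pattern: derive a cancellable context from the caller's context, start everything with it, and fold cancel into the single shutdown function handed back. A generic, self-contained sketch of that pattern under illustrative names (not the actual mustSetupScheduler code):

package main

import (
    "context"
    "fmt"
)

// setup derives a cancellable context from the caller's ctx, starts a
// component with it, and returns one shutdown function that cancels the
// context and then runs the remaining teardown.
func setup(ctx context.Context, tearDownFn func()) (shutdown func()) {
    ctx, cancel := context.WithCancel(ctx)
    stopped := make(chan struct{})

    go func() { // stand-in for util.StartScheduler / util.StartFakePVController
        <-ctx.Done()
        close(stopped)
    }()

    return func() {
        cancel()     // stops everything started with the derived context
        <-stopped    // wait for the component to observe the cancellation
        tearDownFn() // e.g. the API-server teardown in mustSetupScheduler
    }
}

func main() {
    shutdown := setup(context.Background(), func() { fmt.Println("torn down") })
    shutdown()
}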


@@ -67,9 +67,7 @@ type ShutdownFunc func()
 // StartScheduler configures and starts a scheduler given a handle to the clientSet interface
 // and event broadcaster. It returns the running scheduler, podInformer and the shutdown function to stop it.
-func StartScheduler(clientSet clientset.Interface, kubeConfig *restclient.Config, cfg *kubeschedulerconfig.KubeSchedulerConfiguration) (*scheduler.Scheduler, coreinformers.PodInformer, ShutdownFunc) {
-    ctx, cancel := context.WithCancel(context.Background())
-
+func StartScheduler(ctx context.Context, clientSet clientset.Interface, kubeConfig *restclient.Config, cfg *kubeschedulerconfig.KubeSchedulerConfiguration) (*scheduler.Scheduler, coreinformers.PodInformer) {
     informerFactory := scheduler.NewInformerFactory(clientSet, 0)
     evtBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{
         Interface: clientSet.EventsV1()})
@@ -97,24 +95,16 @@ func StartScheduler(clientSet clientset.Interface, kubeConfig *restclient.Config
     informerFactory.WaitForCacheSync(ctx.Done())
 
     go sched.Run(ctx)
-    shutdownFunc := func() {
-        klog.Infof("destroying scheduler")
-        cancel()
-        klog.Infof("destroyed scheduler")
-    }
-    return sched, informerFactory.Core().V1().Pods(), shutdownFunc
+    return sched, informerFactory.Core().V1().Pods()
 }
 
 // StartFakePVController is a simplified pv controller logic that sets PVC VolumeName and annotation for each PV binding.
 // TODO(mborsz): Use a real PV controller here.
-func StartFakePVController(clientSet clientset.Interface) ShutdownFunc {
-    ctx, cancel := context.WithCancel(context.Background())
-
+func StartFakePVController(ctx context.Context, clientSet clientset.Interface) {
     informerFactory := informers.NewSharedInformerFactory(clientSet, 0)
     pvInformer := informerFactory.Core().V1().PersistentVolumes()
 
     syncPV := func(obj *v1.PersistentVolume) {
-        ctx := context.Background()
         if obj.Spec.ClaimRef != nil {
             claimRef := obj.Spec.ClaimRef
             pvc, err := clientSet.CoreV1().PersistentVolumeClaims(claimRef.Namespace).Get(ctx, claimRef.Name, metav1.GetOptions{})
@@ -145,7 +135,6 @@ func StartFakePVController(clientSet clientset.Interface) ShutdownFunc {
     })
 
     informerFactory.Start(ctx.Done())
-    return ShutdownFunc(cancel)
 }
 
 // TestContext store necessary context info
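With StartScheduler and StartFakePVController now accepting a context instead of returning a ShutdownFunc, the lifetime is owned by the caller. A hedged sketch of what a caller looks like in that style, with generic names rather than the real integration-test helpers:

package example

import (
    "context"
    "testing"
)

// startComponent stands in for a ctx-accepting start function such as the new
// StartScheduler/StartFakePVController signatures: it runs until ctx is done.
func startComponent(ctx context.Context) {
    go func() {
        <-ctx.Done()
        // the scheduler loop and informers would stop here
    }()
}

func TestCallerOwnedShutdown(t *testing.T) {
    ctx, cancel := context.WithCancel(context.Background())
    t.Cleanup(cancel) // replaces invoking a returned ShutdownFunc

    startComponent(ctx)
    // ... exercise the component ...
}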