diff --git a/test/integration/scheduler_perf/config/performance-config.yaml b/test/integration/scheduler_perf/config/performance-config.yaml index 82a0abc9e87..8f29ab49599 100644 --- a/test/integration/scheduler_perf/config/performance-config.yaml +++ b/test/integration/scheduler_perf/config/performance-config.yaml @@ -1167,7 +1167,9 @@ maxClaimsPerNode: 20 # SchedulingWithResourceClaimTemplateStructured uses a ResourceClaimTemplate -# and dynamically creates ResourceClaim instances for each pod. +# and dynamically creates ResourceClaim instances for each pod. Node, pod and +# device counts are chosen so that the cluster gets filled up completely. +# # The driver uses structured parameters. - name: SchedulingWithResourceClaimTemplateStructured featureGates: @@ -1234,6 +1236,104 @@ measurePods: 2500 maxClaimsPerNode: 10 +# SteadyStateResourceClaimTemplateStructured uses a ResourceClaimTemplate +# and dynamically creates ResourceClaim instances for each pod, but never +# more than 10 at a time. Then it waits for a pod to get scheduled +# before deleting it and creating another one. +# +# The workload determines whether there are other pods in the cluster. +# +# The driver uses structured parameters. +- name: SteadyStateClusterResourceClaimTemplateStructured + featureGates: + DynamicResourceAllocation: true + # SchedulerQueueingHints: true + workloadTemplate: + - opcode: createNodes + countParam: $nodesWithoutDRA + - opcode: createNodes + nodeTemplatePath: config/dra/node-with-dra-test-driver.yaml + countParam: $nodesWithDRA + - opcode: createResourceDriver + driverName: test-driver.cdi.k8s.io + nodes: scheduler-perf-dra-* + maxClaimsPerNodeParam: $maxClaimsPerNode + structuredParameters: true + - opcode: createAny + templatePath: config/dra/deviceclass-structured.yaml + - opcode: createAny + templatePath: config/dra/resourceclaimtemplate-structured.yaml + namespace: init + - opcode: createPods + namespace: init + countParam: $initPods + podTemplatePath: config/dra/pod-with-claim-template.yaml + - opcode: createAny + templatePath: config/dra/resourceclaimtemplate-structured.yaml + namespace: test + - opcode: createPods + namespace: test + count: 10 + steadyState: true + durationParam: $duration + podTemplatePath: config/dra/pod-with-claim-template.yaml + collectMetrics: true + workloads: + - name: fast + labels: [integration-test, fast, short] + params: + # This testcase runs through all code paths without + # taking too long overall. + nodesWithDRA: 1 + nodesWithoutDRA: 1 + initPods: 0 + maxClaimsPerNode: 10 + duration: 2s + - name: empty_100nodes + params: + nodesWithDRA: 100 + nodesWithoutDRA: 0 + initPods: 0 + maxClaimsPerNode: 2 + duration: 10s + - name: empty_200nodes + params: + nodesWithDRA: 200 + nodesWithoutDRA: 0 + initPods: 0 + maxClaimsPerNode: 2 + duration: 10s + - name: empty_500nodes + params: + nodesWithDRA: 500 + nodesWithoutDRA: 0 + initPods: 0 + maxClaimsPerNode: 2 + duration: 10s + # In the "full" scenarios, the cluster can accommodate exactly one additional pod. + # These are slower because scheduling the initial pods takes time. 
+ - name: full_100nodes + params: + nodesWithDRA: 100 + nodesWithoutDRA: 0 + initPods: 199 + maxClaimsPerNode: 2 + duration: 10s + - name: full_200nodes + params: + nodesWithDRA: 200 + nodesWithoutDRA: 0 + initPods: 399 + maxClaimsPerNode: 2 + duration: 10s + - name: full_500nodes + params: + nodesWithDRA: 500 + nodesWithoutDRA: 0 + initPods: 999 + maxClaimsPerNode: 2 + duration: 10s + # SchedulingWithResourceClaimTemplate uses ResourceClaims # with deterministic names that are shared between pods. # There is a fixed ratio of 1:5 between claims and pods. diff --git a/test/integration/scheduler_perf/scheduler_perf.go b/test/integration/scheduler_perf/scheduler_perf.go index 2b88e12ba93..94da182036f 100644 --- a/test/integration/scheduler_perf/scheduler_perf.go +++ b/test/integration/scheduler_perf/scheduler_perf.go @@ -49,6 +49,7 @@ import ( coreinformers "k8s.io/client-go/informers/core/v1" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/restmapper" + "k8s.io/client-go/tools/cache" "k8s.io/component-base/featuregate" featuregatetesting "k8s.io/component-base/featuregate/testing" logsapi "k8s.io/component-base/logs/api/v1" @@ -63,10 +64,12 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" "k8s.io/kubernetes/pkg/scheduler/metrics" + schedutil "k8s.io/kubernetes/pkg/scheduler/util" "k8s.io/kubernetes/test/integration/framework" testutils "k8s.io/kubernetes/test/utils" "k8s.io/kubernetes/test/utils/ktesting" "k8s.io/kubernetes/test/utils/ktesting/initoption" + "k8s.io/utils/ptr" "sigs.k8s.io/yaml" ) @@ -341,7 +344,7 @@ func (ms thresholdMetricSelector) isValid(mcc *metricsCollectorConfig) error { } type params struct { - params map[string]int + params map[string]any // isUsed field records whether params is used or not. isUsed map[string]bool } @@ -358,14 +361,14 @@ type params struct { // to: // // params{ -// params: map[string]int{ +// params: map[string]any{ // "intNodes": 500, // "initPods": 50, // }, // isUsed: map[string]bool{}, // empty map // } func (p *params) UnmarshalJSON(b []byte) error { - aux := map[string]int{} + aux := map[string]any{} if err := json.Unmarshal(b, &aux); err != nil { return err @@ -376,14 +379,31 @@ func (p *params) UnmarshalJSON(b []byte) error { return nil } -// get returns param. +// get retrieves the parameter as an integer func (p params) get(key string) (int, error) { + // JSON unmarshals integer constants in an "any" field as float. + f, err := getParam[float64](p, key) + if err != nil { + return 0, err + } + return int(f), nil +} + +// getParam retrieves the parameter as specific type. There is no conversion, +// so in practice this means that only types that JSON unmarshaling uses +// (float64, string, bool) work. +func getParam[T float64 | string | bool](p params, key string) (T, error) { p.isUsed[key] = true param, ok := p.params[key] - if ok { - return param, nil + var t T + if !ok { + return t, fmt.Errorf("parameter %s is undefined", key) } - return 0, fmt.Errorf("parameter %s is undefined", key) + t, ok = param.(T) + if !ok { + return t, fmt.Errorf("parameter %s has the wrong type %T", key, param) + } + return t, nil } // unusedParams returns the names of unusedParams @@ -576,6 +596,27 @@ type createPodsOp struct { Count int // Template parameter for Count. CountParam string + // If false, Count pods get created rapidly. This can be used to + // measure how quickly the scheduler can fill up a cluster. 
+ // + // If true, Count pods get created, the operation waits for + // a pod to get scheduled, deletes it and then creates another. + // This continues until the configured Duration is over. + // Metrics collection, if enabled, runs in parallel. + // + // This mode can be used to measure how the scheduler behaves + // in a steady state where the cluster is always at roughly the + // same level of utilization. Pods can be created in a separate, + // earlier operation to simulate non-empty clusters. + // + // Note that the operation will delete any scheduled pod in + // the namespace, so use different namespaces for pods that + // are supposed to be kept running. + SteadyState bool + // How long to keep the cluster in a steady state. + Duration metav1.Duration + // Template parameter for Duration. + DurationParam string // Whether or not to enable metrics collection for this createPodsOp. // Optional. Both CollectMetrics and SkipWaitToCompletion cannot be true at // the same time for a particular createPodsOp. @@ -608,6 +649,9 @@ func (cpo *createPodsOp) isValid(allowParameterization bool) error { // use-cases right now. return fmt.Errorf("collectMetrics and skipWaitToCompletion cannot be true at the same time") } + if cpo.SkipWaitToCompletion && cpo.SteadyState { + return errors.New("skipWaitToCompletion and steadyState cannot be true at the same time") + } return nil } @@ -623,6 +667,15 @@ func (cpo createPodsOp) patchParams(w *workload) (realOp, error) { return nil, err } } + if cpo.DurationParam != "" { + durationStr, err := getParam[string](w.Params, cpo.DurationParam[1:]) + if err != nil { + return nil, err + } + if cpo.Duration.Duration, err = time.ParseDuration(durationStr); err != nil { + return nil, fmt.Errorf("parsing duration parameter %s: %w", cpo.DurationParam, err) + } + } return &cpo, (&cpo).isValid(false) } @@ -1298,14 +1351,19 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact collectorCtx, collectors = startCollectingMetrics(tCtx, &collectorWG, podInformer, tc.MetricsCollectorConfig, throughputErrorMargin, opIndex, namespace, []string{namespace}) defer collectorCtx.Cancel("cleaning up") } - if err := createPods(tCtx, namespace, concreteOp); err != nil { + if err := createPodsRapidly(tCtx, namespace, concreteOp); err != nil { tCtx.Fatalf("op %d: %v", opIndex, err) } - if concreteOp.SkipWaitToCompletion { + switch { + case concreteOp.SkipWaitToCompletion: // Only record those namespaces that may potentially require barriers // in the future. numPodsScheduledPerNamespace[namespace] += concreteOp.Count - } else { + case concreteOp.SteadyState: + if err := createPodsSteadily(tCtx, namespace, podInformer, concreteOp); err != nil { + tCtx.Fatalf("op %d: %v", opIndex, err) + } + default: if err := waitUntilPodsScheduledInNamespace(tCtx, podInformer, namespace, concreteOp.Count); err != nil { tCtx.Fatalf("op %d: error in waiting for pods to get scheduled: %v", opIndex, err) } @@ -1588,7 +1646,12 @@ func getNodePreparer(prefix string, cno *createNodesOp, clientset clientset.Inte ), nil } -func createPods(tCtx ktesting.TContext, namespace string, cpo *createPodsOp) error { +// createPodsRapidly implements the "create pods rapidly" mode of [createPodsOp]. +// It's a nop when cpo.SteadyState is true. 
+func createPodsRapidly(tCtx ktesting.TContext, namespace string, cpo *createPodsOp) error { + if cpo.SteadyState { + return nil + } strategy, err := getPodStrategy(cpo) if err != nil { return err @@ -1600,6 +1663,147 @@ func createPods(tCtx ktesting.TContext, namespace string, cpo *createPodsOp) err return podCreator.CreatePods(tCtx) } +// createPodsSteadily implements the "create pods and delete pods" mode of [createPodsOp]. +// It's a nop when cpo.SteadyState is false. +func createPodsSteadily(tCtx ktesting.TContext, namespace string, podInformer coreinformers.PodInformer, cpo *createPodsOp) error { + if !cpo.SteadyState { + return nil + } + strategy, err := getPodStrategy(cpo) + if err != nil { + return err + } + tCtx.Logf("creating pods in namespace %q for %s", namespace, cpo.Duration) + tCtx = ktesting.WithTimeout(tCtx, cpo.Duration.Duration, fmt.Sprintf("the operation ran for the configured %s", cpo.Duration.Duration)) + + // Start watching pods in the namespace. Any pod which is seen as being scheduled + // gets deleted. + scheduledPods := make(chan *v1.Pod, cpo.Count) + scheduledPodsClosed := false + var mutex sync.Mutex + defer func() { + mutex.Lock() + defer mutex.Unlock() + close(scheduledPods) + scheduledPodsClosed = true + }() + + existingPods := 0 + runningPods := 0 + onPodChange := func(oldObj, newObj any) { + oldPod, newPod, err := schedutil.As[*v1.Pod](oldObj, newObj) + if err != nil { + tCtx.Errorf("unexpected pod events: %v", err) + return + } + + mutex.Lock() + defer mutex.Unlock() + if oldPod == nil { + existingPods++ + } + if (oldPod == nil || oldPod.Spec.NodeName == "") && newPod.Spec.NodeName != "" { + // Got scheduled. + runningPods++ + + // Only ask for deletion in our namespace. + if newPod.Namespace != namespace { + return + } + if !scheduledPodsClosed { + select { + case <-tCtx.Done(): + case scheduledPods <- newPod: + } + } + } + } + handle, err := podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj any) { + onPodChange(nil, obj) + }, + UpdateFunc: func(oldObj, newObj any) { + onPodChange(oldObj, newObj) + }, + DeleteFunc: func(obj any) { + pod, _, err := schedutil.As[*v1.Pod](obj, nil) + if err != nil { + tCtx.Errorf("unexpected pod events: %v", err) + return + } + + existingPods-- + if pod.Spec.NodeName != "" { + runningPods-- + } + }, + }) + if err != nil { + return fmt.Errorf("register event handler: %w", err) + } + defer func() { + tCtx.ExpectNoError(podInformer.Informer().RemoveEventHandler(handle), "remove event handler") + }() + + // Seed the namespace with the initial number of pods. + if err := strategy(tCtx, tCtx.Client(), namespace, cpo.Count); err != nil { + return fmt.Errorf("create initial %d pods: %w", cpo.Count, err) + } + + // Now loop until we are done. Report periodically how many pods were scheduled. + countScheduledPods := 0 + lastCountScheduledPods := 0 + logPeriod := time.Second + ticker := time.NewTicker(logPeriod) + defer ticker.Stop() + for { + select { + case <-tCtx.Done(): + tCtx.Logf("Completed after seeing %d scheduled pod: %v", countScheduledPods, context.Cause(tCtx)) + return nil + case <-scheduledPods: + countScheduledPods++ + if countScheduledPods%cpo.Count == 0 { + // All scheduled. Start over with a new batch. + err := tCtx.Client().CoreV1().Pods(namespace).DeleteCollection(tCtx, metav1.DeleteOptions{ + GracePeriodSeconds: ptr.To(int64(0)), + PropagationPolicy: ptr.To(metav1.DeletePropagationBackground), // Foreground will block. 
+ }, metav1.ListOptions{}) + // Ignore errors when the time is up. errors.Is(context.Canceled) would + // be more precise, but doesn't work because client-go doesn't reliably + // propagate it. Instead, this was seen: + // client rate limiter Wait returned an error: rate: Wait(n=1) would exceed context deadline + if tCtx.Err() != nil { + continue + } + if err != nil { + return fmt.Errorf("delete scheduled pods: %w", err) + } + err = strategy(tCtx, tCtx.Client(), namespace, cpo.Count) + if tCtx.Err() != nil { + continue + } + if err != nil { + return fmt.Errorf("create next batch of pods: %w", err) + } + } + case <-ticker.C: + delta := countScheduledPods - lastCountScheduledPods + lastCountScheduledPods = countScheduledPods + func() { + mutex.Lock() + defer mutex.Unlock() + + tCtx.Logf("%d pods got scheduled in total in namespace %q, overall %d out of %d pods scheduled: %f pods/s in last interval", + countScheduledPods, namespace, + runningPods, existingPods, + float64(delta)/logPeriod.Seconds(), + ) + }() + } + } +} + // waitUntilPodsScheduledInNamespace blocks until all pods in the given // namespace are scheduled. Times out after 10 minutes because even at the // lowest observed QPS of ~10 pods/sec, a 5000-node test should complete. diff --git a/test/integration/scheduler_perf/util.go b/test/integration/scheduler_perf/util.go index 2e93d42a62c..f2f95ff10bf 100644 --- a/test/integration/scheduler_perf/util.go +++ b/test/integration/scheduler_perf/util.go @@ -26,6 +26,7 @@ import ( "path" "sort" "strings" + "sync" "time" v1 "k8s.io/api/core/v1" @@ -35,6 +36,7 @@ import ( "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers/core/v1" restclient "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" "k8s.io/component-base/featuregate" "k8s.io/component-base/metrics/legacyregistry" "k8s.io/component-base/metrics/testutil" @@ -45,6 +47,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/apis/config" kubeschedulerscheme "k8s.io/kubernetes/pkg/scheduler/apis/config/scheme" frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" + schedutil "k8s.io/kubernetes/pkg/scheduler/util" "k8s.io/kubernetes/test/integration/framework" "k8s.io/kubernetes/test/integration/util" testutils "k8s.io/kubernetes/test/utils" @@ -378,7 +381,7 @@ type throughputCollector struct { podInformer coreinformers.PodInformer schedulingThroughputs []float64 labels map[string]string - namespaces []string + namespaces sets.Set[string] errorMargin float64 } @@ -386,7 +389,7 @@ func newThroughputCollector(podInformer coreinformers.PodInformer, labels map[st return &throughputCollector{ podInformer: podInformer, labels: labels, - namespaces: namespaces, + namespaces: sets.New(namespaces...), errorMargin: errorMargin, } } @@ -396,11 +399,75 @@ func (tc *throughputCollector) init() error { } func (tc *throughputCollector) run(tCtx ktesting.TContext) { - podsScheduled, _, err := getScheduledPods(tc.podInformer, tc.namespaces...) - if err != nil { - klog.Fatalf("%v", err) + // The collector is based on informer cache events instead of periodically listing pods because: + // - polling causes more overhead + // - it does not work when pods get created, scheduled and deleted quickly + // + // Normally, informers cannot be used to observe state changes reliably. + // They only guarantee that the *some* updates get reported, but not *all*. 
+ // But in scheduler_perf, the scheduler and the test share the same informer, + // therefore we are guaranteed to see a new pod without NodeName (because + // that is what the scheduler needs to see to schedule it) and then the updated + // pod with NodeName (because nothing makes further changes to it). + var mutex sync.Mutex + scheduledPods := 0 + getScheduledPods := func() int { + mutex.Lock() + defer mutex.Unlock() + return scheduledPods } - lastScheduledCount := len(podsScheduled) + onPodChange := func(oldObj, newObj any) { + oldPod, newPod, err := schedutil.As[*v1.Pod](oldObj, newObj) + if err != nil { + tCtx.Errorf("unexpected pod events: %v", err) + return + } + + if !tc.namespaces.Has(newPod.Namespace) { + return + } + + mutex.Lock() + defer mutex.Unlock() + if (oldPod == nil || oldPod.Spec.NodeName == "") && newPod.Spec.NodeName != "" { + // Got scheduled. + scheduledPods++ + } + } + handle, err := tc.podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj any) { + onPodChange(nil, obj) + }, + UpdateFunc: func(oldObj, newObj any) { + onPodChange(oldObj, newObj) + }, + }) + if err != nil { + tCtx.Fatalf("register pod event handler: %v", err) + } + defer func() { + tCtx.ExpectNoError(tc.podInformer.Informer().RemoveEventHandler(handle), "remove event handler") + }() + + // Waiting for the initial sync didn't work, `handle.HasSynced` always returned + // false - perhaps because the event handlers get added to a running informer. + // That's okay(ish), throughput is typically measured within an empty namespace. + // + // syncTicker := time.NewTicker(time.Millisecond) + // defer syncTicker.Stop() + // for { + // select { + // case <-syncTicker.C: + // if handle.HasSynced() { + // break + // } + // case <-tCtx.Done(): + // return + // } + // } + tCtx.Logf("Started pod throughput collector for namespace(s) %s, %d pods scheduled so far", sets.List(tc.namespaces), getScheduledPods()) + + lastScheduledCount := getScheduledPods() ticker := time.NewTicker(throughputSampleInterval) defer ticker.Stop() lastSampleTime := time.Now() @@ -413,12 +480,8 @@ func (tc *throughputCollector) run(tCtx ktesting.TContext) { return case <-ticker.C: now := time.Now() - podsScheduled, _, err := getScheduledPods(tc.podInformer, tc.namespaces...) - if err != nil { - klog.Fatalf("%v", err) - } - scheduled := len(podsScheduled) + scheduled := getScheduledPods() // Only do sampling if number of scheduled pods is greater than zero. if scheduled == 0 { continue
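
Note on the params change above: switching `params` from `map[string]int` to `map[string]any` follows from how encoding/json decodes into an untyped map — every JSON number comes back as a float64, which is why `get` goes through `getParam[float64]` and converts to int afterwards, while durations travel as strings and are parsed with `time.ParseDuration` in `patchParams`. The following is a minimal, self-contained sketch of that accessor shape; it is not part of the patch, and the sample JSON document and parameter names are made up for illustration.

package main

import (
	"encoding/json"
	"fmt"
)

// getParam mirrors the shape of the accessor added in scheduler_perf.go: only the
// types that encoding/json produces for an "any" target (float64, string, bool)
// can be asserted directly.
func getParam[T float64 | string | bool](params map[string]any, key string) (T, error) {
	var zero T
	value, ok := params[key]
	if !ok {
		return zero, fmt.Errorf("parameter %s is undefined", key)
	}
	typed, ok := value.(T)
	if !ok {
		return zero, fmt.Errorf("parameter %s has the wrong type %T", key, value)
	}
	return typed, nil
}

func main() {
	var params map[string]any
	if err := json.Unmarshal([]byte(`{"initPods": 199, "duration": "10s"}`), &params); err != nil {
		panic(err)
	}

	// Integer constants arrive as float64 and need an explicit conversion ...
	initPods, _ := getParam[float64](params, "initPods")
	fmt.Println(int(initPods)) // 199

	// ... while string parameters such as durations keep their type.
	duration, _ := getParam[string](params, "duration")
	fmt.Println(duration) // 10s
}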
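
Both createPodsSteadily and the reworked throughputCollector rely on the same observation trick: register an event handler on the shared pod informer and count a pod as scheduled exactly when spec.nodeName flips from empty to non-empty, instead of periodically listing pods. The standalone sketch below exercises that pattern against a real cluster; it is not part of the patch, and the kubeconfig handling, the 30-second runtime, and the one-second reporting interval are assumptions chosen for illustration.

package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Assumption: a cluster is reachable through the default kubeconfig.
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(config)

	factory := informers.NewSharedInformerFactory(client, 0)
	podInformer := factory.Core().V1().Pods()

	var scheduled atomic.Int64
	onPodChange := func(oldObj, newObj any) {
		oldPod, _ := oldObj.(*v1.Pod)
		newPod, ok := newObj.(*v1.Pod)
		if !ok {
			return
		}
		// Count the empty -> non-empty transition of spec.nodeName exactly once.
		if (oldPod == nil || oldPod.Spec.NodeName == "") && newPod.Spec.NodeName != "" {
			scheduled.Add(1)
		}
	}
	if _, err := podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    func(obj any) { onPodChange(nil, obj) },
		UpdateFunc: onPodChange,
	}); err != nil {
		panic(err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	factory.Start(ctx.Done())
	factory.WaitForCacheSync(ctx.Done())

	// Report the scheduling rate once per second, similar to
	// throughputSampleInterval in util.go.
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	var last int64
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			current := scheduled.Load()
			fmt.Printf("%d pods scheduled in the last second\n", current-last)
			last = current
		}
	}
}

As in util.go above, pods that are already scheduled when the handler is attached are reported through add events during the initial sync; scheduler_perf tolerates that because throughput is typically measured in an initially empty namespace.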