diff --git a/test/integration/scheduler_perf/scheduler_perf.go b/test/integration/scheduler_perf/scheduler_perf.go index 3911c9c5619..f24a6060eb4 100644 --- a/test/integration/scheduler_perf/scheduler_perf.go +++ b/test/integration/scheduler_perf/scheduler_perf.go @@ -807,6 +807,13 @@ func (co churnOp) patchParams(w *workload) (realOp, error) { return &co, nil } +type SchedulingStage string + +const ( + Scheduled SchedulingStage = "Scheduled" + Attempted SchedulingStage = "Attempted" +) + // barrierOp defines an op that can be used to wait until all scheduled pods of // one or many namespaces have been bound to nodes. This is useful when pods // were scheduled with SkipWaitToCompletion set to true. @@ -816,9 +823,16 @@ type barrierOp struct { // Namespaces to block on. Empty array or not specifying this field signifies // that the barrier should block on all namespaces. Namespaces []string + // Determines what stage of pods scheduling the barrier should wait for. + // If empty, it is interpreted as "Scheduled". 
+ // Optional + StageRequirement SchedulingStage } func (bo *barrierOp) isValid(allowParameterization bool) error { + if bo.StageRequirement != "" && bo.StageRequirement != Scheduled && bo.StageRequirement != Attempted { + return fmt.Errorf("invalid StageRequirement %s", bo.StageRequirement) + } return nil } @@ -827,6 +841,9 @@ func (*barrierOp) collectsMetrics() bool { } func (bo barrierOp) patchParams(w *workload) (realOp, error) { + if bo.StageRequirement == "" { + bo.StageRequirement = Scheduled + } return &bo, nil } @@ -1565,16 +1582,26 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact tCtx.Fatalf("op %d: unknown namespace %s", opIndex, namespace) } } - if err := waitUntilPodsScheduled(tCtx, podInformer, concreteOp.Namespaces, numPodsScheduledPerNamespace); err != nil { - tCtx.Fatalf("op %d: %v", opIndex, err) - } - // At the end of the barrier, we can be sure that there are no pods - // pending scheduling in the namespaces that we just blocked on. - if len(concreteOp.Namespaces) == 0 { - numPodsScheduledPerNamespace = make(map[string]int) - } else { - for _, namespace := range concreteOp.Namespaces { - delete(numPodsScheduledPerNamespace, namespace) + switch concreteOp.StageRequirement { + case Attempted: + if err := waitUntilPodsAttempted(tCtx, podInformer, concreteOp.Namespaces, numPodsScheduledPerNamespace); err != nil { + tCtx.Fatalf("op %d: %v", opIndex, err) + } + case Scheduled: + // Default should be treated like "Scheduled", so handling both in the same way. + fallthrough + default: + if err := waitUntilPodsScheduled(tCtx, podInformer, concreteOp.Namespaces, numPodsScheduledPerNamespace); err != nil { + tCtx.Fatalf("op %d: %v", opIndex, err) + } + // At the end of the barrier, we can be sure that there are no pods + // pending scheduling in the namespaces that we just blocked on. 
+ if len(concreteOp.Namespaces) == 0 { + numPodsScheduledPerNamespace = make(map[string]int) + } else { + for _, namespace := range concreteOp.Namespaces { + delete(numPodsScheduledPerNamespace, namespace) + } } } @@ -1845,7 +1872,7 @@ func waitUntilPodsScheduledInNamespace(tCtx ktesting.TContext, podInformer corei return true, ctx.Err() default: } - scheduled, unscheduled, err := getScheduledPods(podInformer, namespace) + scheduled, attempted, unattempted, err := getScheduledPods(podInformer, namespace) if err != nil { return false, err } @@ -1854,8 +1881,10 @@ func waitUntilPodsScheduledInNamespace(tCtx ktesting.TContext, podInformer corei return true, nil } tCtx.Logf("namespace: %s, pods: want %d, got %d", namespace, wantCount, len(scheduled)) - if len(unscheduled) > 0 { - pendingPod = unscheduled[0] + if len(attempted) > 0 { + pendingPod = attempted[0] + } else if len(unattempted) > 0 { + pendingPod = unattempted[0] } else { pendingPod = nil } @@ -1863,7 +1892,42 @@ func waitUntilPodsScheduledInNamespace(tCtx ktesting.TContext, podInformer corei }) if err != nil && pendingPod != nil { - err = fmt.Errorf("at least pod %s is not scheduled: %v", klog.KObj(pendingPod), err) + err = fmt.Errorf("at least pod %s is not scheduled: %w", klog.KObj(pendingPod), err) + } + return err +} + +// waitUntilPodsAttemptedInNamespace blocks until all pods in the given +// namespace have gone through a scheduling cycle at least once. +// Times out after 10 minutes similarly to waitUntilPodsScheduledInNamespace. 
+func waitUntilPodsAttemptedInNamespace(tCtx ktesting.TContext, podInformer coreinformers.PodInformer, namespace string, wantCount int) error { + var pendingPod *v1.Pod + + err := wait.PollUntilContextTimeout(tCtx, 1*time.Second, 10*time.Minute, true, func(ctx context.Context) (bool, error) { + select { + case <-ctx.Done(): + return true, ctx.Err() + default: + } + scheduled, attempted, unattempted, err := getScheduledPods(podInformer, namespace) + if err != nil { + return false, err + } + if len(scheduled)+len(attempted) >= wantCount { + tCtx.Logf("all pods attempted to be scheduled") + return true, nil + } + tCtx.Logf("namespace: %s, attempted pods: want %d, got %d", namespace, wantCount, len(scheduled)+len(attempted)) + if len(unattempted) > 0 { + pendingPod = unattempted[0] + } else { + pendingPod = nil + } + return false, nil + }) + + if err != nil && pendingPod != nil { + err = fmt.Errorf("at least pod %s is not attempted: %w", klog.KObj(pendingPod), err) } return err } @@ -1894,6 +1958,32 @@ func waitUntilPodsScheduled(tCtx ktesting.TContext, podInformer coreinformers.Po return nil } +// waitUntilPodsAttempted blocks until all the pods in the given namespaces are +// attempted (have gone through a scheduling cycle at least once). +func waitUntilPodsAttempted(tCtx ktesting.TContext, podInformer coreinformers.PodInformer, namespaces []string, numPodsScheduledPerNamespace map[string]int) error { + // If unspecified, default to all known namespaces. 
+ if len(namespaces) == 0 { + for namespace := range numPodsScheduledPerNamespace { + namespaces = append(namespaces, namespace) + } + } + for _, namespace := range namespaces { + select { + case <-tCtx.Done(): + return context.Cause(tCtx) + default: + } + wantCount, ok := numPodsScheduledPerNamespace[namespace] + if !ok { + return fmt.Errorf("unknown namespace %s", namespace) + } + if err := waitUntilPodsAttemptedInNamespace(tCtx, podInformer, namespace, wantCount); err != nil { + return fmt.Errorf("error waiting for pods in namespace %q: %w", namespace, err) + } + } + return nil +} + func getSpecFromFile(path *string, spec interface{}) error { bytes, err := os.ReadFile(*path) if err != nil { diff --git a/test/integration/scheduler_perf/util.go b/test/integration/scheduler_perf/util.go index 551fd287d71..884287646a6 100644 --- a/test/integration/scheduler_perf/util.go +++ b/test/integration/scheduler_perf/util.go @@ -148,28 +148,41 @@ func mustSetupCluster(tCtx ktesting.TContext, config *config.KubeSchedulerConfig return informerFactory, tCtx } -// Returns the list of scheduled and unscheduled pods in the specified namespaces. +func isAttempted(pod *v1.Pod) bool { + for _, cond := range pod.Status.Conditions { + if cond.Type == v1.PodScheduled { + return true + } + } + return false +} + +// getScheduledPods returns the list of scheduled, attempted but unschedulable +// and unattempted pods in the specified namespaces. // Note that no namespaces specified matches all namespaces. -func getScheduledPods(podInformer coreinformers.PodInformer, namespaces ...string) ([]*v1.Pod, []*v1.Pod, error) { +func getScheduledPods(podInformer coreinformers.PodInformer, namespaces ...string) ([]*v1.Pod, []*v1.Pod, []*v1.Pod, error) { pods, err := podInformer.Lister().List(labels.Everything()) if err != nil { - return nil, nil, err + return nil, nil, nil, err } s := sets.New(namespaces...) 
scheduled := make([]*v1.Pod, 0, len(pods)) - unscheduled := make([]*v1.Pod, 0, len(pods)) + attempted := make([]*v1.Pod, 0, len(pods)) + unattempted := make([]*v1.Pod, 0, len(pods)) for i := range pods { pod := pods[i] if len(s) == 0 || s.Has(pod.Namespace) { if len(pod.Spec.NodeName) > 0 { scheduled = append(scheduled, pod) + } else if isAttempted(pod) { + attempted = append(attempted, pod) } else { - unscheduled = append(unscheduled, pod) + unattempted = append(unattempted, pod) } } } - return scheduled, unscheduled, nil + return scheduled, attempted, unattempted, nil } // DataItem is the data point.