Merge pull request #127700 from macsko/add_option_waitforpodsprocessed

Add option to wait for pods to be attempted in barrierOp in scheduler_perf
This commit is contained in:
Kubernetes Prow Robot
2024-10-01 05:17:49 +01:00
committed by GitHub
2 changed files with 123 additions and 20 deletions

View File

@@ -807,6 +807,13 @@ func (co churnOp) patchParams(w *workload) (realOp, error) {
return &co, nil
}
type SchedulingStage string
const (
Scheduled SchedulingStage = "Scheduled"
Attempted SchedulingStage = "Attempted"
)
// barrierOp defines an op that can be used to wait until all scheduled pods of
// one or many namespaces have been bound to nodes. This is useful when pods
// were scheduled with SkipWaitToCompletion set to true.
@@ -816,9 +823,16 @@ type barrierOp struct {
// Namespaces to block on. Empty array or not specifying this field signifies
// that the barrier should block on all namespaces.
Namespaces []string
// Determines what stage of pods scheduling the barrier should wait for.
// If empty, it is interpreted as "Scheduled".
// Optional
StageRequirement SchedulingStage
}
func (bo *barrierOp) isValid(allowParameterization bool) error {
if bo.StageRequirement != "" && bo.StageRequirement != Scheduled && bo.StageRequirement != Attempted {
return fmt.Errorf("invalid StageRequirement %s", bo.StageRequirement)
}
return nil
}
@@ -827,6 +841,9 @@ func (*barrierOp) collectsMetrics() bool {
}
func (bo barrierOp) patchParams(w *workload) (realOp, error) {
if bo.StageRequirement == "" {
bo.StageRequirement = Scheduled
}
return &bo, nil
}
@@ -1565,16 +1582,26 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
tCtx.Fatalf("op %d: unknown namespace %s", opIndex, namespace)
}
}
if err := waitUntilPodsScheduled(tCtx, podInformer, concreteOp.Namespaces, numPodsScheduledPerNamespace); err != nil {
tCtx.Fatalf("op %d: %v", opIndex, err)
}
// At the end of the barrier, we can be sure that there are no pods
// pending scheduling in the namespaces that we just blocked on.
if len(concreteOp.Namespaces) == 0 {
numPodsScheduledPerNamespace = make(map[string]int)
} else {
for _, namespace := range concreteOp.Namespaces {
delete(numPodsScheduledPerNamespace, namespace)
switch concreteOp.StageRequirement {
case Attempted:
if err := waitUntilPodsAttempted(tCtx, podInformer, concreteOp.Namespaces, numPodsScheduledPerNamespace); err != nil {
tCtx.Fatalf("op %d: %v", opIndex, err)
}
case Scheduled:
// Default should be treated like "Scheduled", so handling both in the same way.
fallthrough
default:
if err := waitUntilPodsScheduled(tCtx, podInformer, concreteOp.Namespaces, numPodsScheduledPerNamespace); err != nil {
tCtx.Fatalf("op %d: %v", opIndex, err)
}
// At the end of the barrier, we can be sure that there are no pods
// pending scheduling in the namespaces that we just blocked on.
if len(concreteOp.Namespaces) == 0 {
numPodsScheduledPerNamespace = make(map[string]int)
} else {
for _, namespace := range concreteOp.Namespaces {
delete(numPodsScheduledPerNamespace, namespace)
}
}
}
@@ -1845,7 +1872,7 @@ func waitUntilPodsScheduledInNamespace(tCtx ktesting.TContext, podInformer corei
return true, ctx.Err()
default:
}
scheduled, unscheduled, err := getScheduledPods(podInformer, namespace)
scheduled, attempted, unattempted, err := getScheduledPods(podInformer, namespace)
if err != nil {
return false, err
}
@@ -1854,8 +1881,10 @@ func waitUntilPodsScheduledInNamespace(tCtx ktesting.TContext, podInformer corei
return true, nil
}
tCtx.Logf("namespace: %s, pods: want %d, got %d", namespace, wantCount, len(scheduled))
if len(unscheduled) > 0 {
pendingPod = unscheduled[0]
if len(attempted) > 0 {
pendingPod = attempted[0]
} else if len(unattempted) > 0 {
pendingPod = unattempted[0]
} else {
pendingPod = nil
}
@@ -1863,7 +1892,42 @@ func waitUntilPodsScheduledInNamespace(tCtx ktesting.TContext, podInformer corei
})
if err != nil && pendingPod != nil {
err = fmt.Errorf("at least pod %s is not scheduled: %v", klog.KObj(pendingPod), err)
err = fmt.Errorf("at least pod %s is not scheduled: %w", klog.KObj(pendingPod), err)
}
return err
}
// waitUntilPodsAttemptedInNamespace blocks until all pods in the given
// namespace at least once went through a schedyling cycle.
// Times out after 10 minutes similarly to waitUntilPodsScheduledInNamespace.
func waitUntilPodsAttemptedInNamespace(tCtx ktesting.TContext, podInformer coreinformers.PodInformer, namespace string, wantCount int) error {
var pendingPod *v1.Pod
err := wait.PollUntilContextTimeout(tCtx, 1*time.Second, 10*time.Minute, true, func(ctx context.Context) (bool, error) {
select {
case <-ctx.Done():
return true, ctx.Err()
default:
}
scheduled, attempted, unattempted, err := getScheduledPods(podInformer, namespace)
if err != nil {
return false, err
}
if len(scheduled)+len(attempted) >= wantCount {
tCtx.Logf("all pods attempted to be scheduled")
return true, nil
}
tCtx.Logf("namespace: %s, attempted pods: want %d, got %d", namespace, wantCount, len(scheduled)+len(attempted))
if len(unattempted) > 0 {
pendingPod = unattempted[0]
} else {
pendingPod = nil
}
return false, nil
})
if err != nil && pendingPod != nil {
err = fmt.Errorf("at least pod %s is not attempted: %w", klog.KObj(pendingPod), err)
}
return err
}
@@ -1894,6 +1958,32 @@ func waitUntilPodsScheduled(tCtx ktesting.TContext, podInformer coreinformers.Po
return nil
}
// waitUntilPodsAttempted blocks until the all pods in the given namespaces are
// attempted (at least once went through a schedyling cycle).
func waitUntilPodsAttempted(tCtx ktesting.TContext, podInformer coreinformers.PodInformer, namespaces []string, numPodsScheduledPerNamespace map[string]int) error {
// If unspecified, default to all known namespaces.
if len(namespaces) == 0 {
for namespace := range numPodsScheduledPerNamespace {
namespaces = append(namespaces, namespace)
}
}
for _, namespace := range namespaces {
select {
case <-tCtx.Done():
return context.Cause(tCtx)
default:
}
wantCount, ok := numPodsScheduledPerNamespace[namespace]
if !ok {
return fmt.Errorf("unknown namespace %s", namespace)
}
if err := waitUntilPodsAttemptedInNamespace(tCtx, podInformer, namespace, wantCount); err != nil {
return fmt.Errorf("error waiting for pods in namespace %q: %w", namespace, err)
}
}
return nil
}
func getSpecFromFile(path *string, spec interface{}) error {
bytes, err := os.ReadFile(*path)
if err != nil {