diff --git a/test/integration/scheduler/queueing/queue.go b/test/integration/scheduler/queueing/queue.go
index 471af5693d8..dfb1ac7067c 100644
--- a/test/integration/scheduler/queueing/queue.go
+++ b/test/integration/scheduler/queueing/queue.go
@@ -25,6 +25,7 @@ import (
 	"github.com/stretchr/testify/mock"
 	v1 "k8s.io/api/core/v1"
 	resourceapi "k8s.io/api/resource/v1"
+	schedulingapi "k8s.io/api/scheduling/v1alpha1"
 	storagev1 "k8s.io/api/storage/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -51,6 +52,13 @@ import (
 	"k8s.io/utils/ptr"
 )

+type rejectingPhase string
+
+const (
+	rejectingPhasePreEnqueue      rejectingPhase = "PreEnqueue"
+	rejectingPhaseSchedulingCycle rejectingPhase = "SchedulingCycle"
+)
+
 type CoreResourceEnqueueTestCase struct {
 	Name string
 	// InitialNodes is the list of Nodes to be created at first.
@@ -77,9 +85,19 @@ type CoreResourceEnqueueTestCase struct {
 	InitialVolumeAttachment []*storagev1.VolumeAttachment
 	// InitialDeviceClasses are the list of DeviceClass to be created at first.
 	InitialDeviceClasses []*resourceapi.DeviceClass
+	// InitialWorkloads is the list of Workloads to be created at first.
+	InitialWorkloads []*schedulingapi.Workload
 	// Pods are the list of Pods to be created.
 	// All of them are expected to be unschedulable at first.
 	Pods []*v1.Pod
+	// ExpectedInitialRejectingPhase specifies where the Pods are expected to be rejected initially.
+	// It is either "PreEnqueue" or "SchedulingCycle". Depending on the value:
+	// - "PreEnqueue": the test case waits for the Pods to be observed by the scheduler
+	//   and kept in the unschedulable pods pool, blocked by a PreEnqueue plugin.
+	// - "SchedulingCycle": the test case lets the Pods go through one scheduling cycle,
+	//   being rejected by PreFilter or Filter plugins.
+	// Defaults to "SchedulingCycle".
+	ExpectedInitialRejectingPhase rejectingPhase
 	// TriggerFn is the function that triggers the event to move Pods.
 	// It returns the map keyed with ClusterEvents to be triggered by this function,
 	// and valued with the number of triggering of the event.
@@ -98,6 +116,9 @@ type CoreResourceEnqueueTestCase struct {
 	EnableDRAExtendedResource bool
 	// EnableNodeDeclaredFeatures indicates if the test case runs with the NodeDeclaredFeatures feature gate enabled.
 	EnableNodeDeclaredFeatures bool
+	// EnableGangScheduling indicates whether the test case should run with the
+	// GenericWorkload and GangScheduling feature gates enabled.
+	EnableGangScheduling bool
 }

 var (
@@ -133,7 +154,8 @@ var CoreResourceEnqueueTestCases = []*CoreResourceEnqueueTestCase{
 			}
 			return map[fwk.ClusterEvent]uint64{{Resource: fwk.Node, ActionType: fwk.UpdateNodeAllocatable}: 1}, nil
 		},
-		WantRequeuedPods: sets.New("pod2"),
+		WantRequeuedPods:          sets.New("pod2"),
+		EnableSchedulingQueueHint: sets.New(true),
 	},
 	{
 		Name: "Pod rejected by the PodAffinity plugin is requeued when a new Node is created and turned to ready",
@@ -2581,6 +2603,53 @@ var CoreResourceEnqueueTestCases = []*CoreResourceEnqueueTestCase{
 		},
 		WantRequeuedPods: sets.Set[string]{},
 	},
+	{
+		Name:          "Pod rejected by the GangScheduling plugin is requeued when a new pod with matching workload reference is created",
+		EnablePlugins: []string{names.GangScheduling},
+		InitialNodes: []*v1.Node{
+			st.MakeNode().Name("fake-node1").Obj(),
+		},
+		InitialWorkloads: []*schedulingapi.Workload{
+			st.MakeWorkload().Name("w1").PodGroup(st.MakePodGroup().Name("pg1").MinCount(2).Obj()).Obj(),
+		},
+		Pods: []*v1.Pod{
+			st.MakePod().Name("pod1").Container("image").WorkloadRef(&v1.WorkloadReference{Name: "w1", PodGroup: "pg1"}).Obj(),
+			st.MakePod().Name("pod2").Container("image").WorkloadRef(&v1.WorkloadReference{Name: "w1", PodGroup: "pg2"}).Obj(),
+		},
+		TriggerFn: func(testCtx *testutils.TestContext) (map[fwk.ClusterEvent]uint64, error) {
+			pod := st.MakePod().Name("pod3").Container("image").WorkloadRef(&v1.WorkloadReference{Name: "w1", PodGroup: "pg1"}).Obj()
+			if _, err := testCtx.ClientSet.CoreV1().Pods(testCtx.NS.Name).Create(testCtx.Ctx, pod, metav1.CreateOptions{}); err != nil {
+				return nil, fmt.Errorf("failed to create Pod %q: %w", pod.Name, err)
+			}
+			return map[fwk.ClusterEvent]uint64{framework.EventUnscheduledPodAdd: 1}, nil
+		},
+		ExpectedInitialRejectingPhase: rejectingPhasePreEnqueue,
+		WantRequeuedPods:              sets.New("pod1", "pod3"),
+		EnableSchedulingQueueHint:     sets.New(true),
+		EnableGangScheduling:          true,
+	},
+	{
+		Name:          "Pod rejected by the GangScheduling plugin is requeued when a matching workload is created",
+		EnablePlugins: []string{names.GangScheduling},
+		InitialNodes: []*v1.Node{
+			st.MakeNode().Name("fake-node1").Obj(),
+		},
+		Pods: []*v1.Pod{
+			st.MakePod().Name("pod1").Container("image").WorkloadRef(&v1.WorkloadReference{Name: "w1", PodGroup: "pg1"}).Obj(),
+			st.MakePod().Name("pod2").Container("image").WorkloadRef(&v1.WorkloadReference{Name: "w1", PodGroup: "pg2"}).Obj(),
+		},
+		TriggerFn: func(testCtx *testutils.TestContext) (map[fwk.ClusterEvent]uint64, error) {
+			workload := st.MakeWorkload().Name("w1").PodGroup(st.MakePodGroup().Name("pg1").MinCount(1).Obj()).Obj()
+			if _, err := testCtx.ClientSet.SchedulingV1alpha1().Workloads(testCtx.NS.Name).Create(testCtx.Ctx, workload, metav1.CreateOptions{}); err != nil {
+				return nil, fmt.Errorf("failed to create Workload %q: %w", workload.Name, err)
+			}
+			return map[fwk.ClusterEvent]uint64{{Resource: fwk.Workload, ActionType: fwk.Add}: 1}, nil
+		},
+		ExpectedInitialRejectingPhase: rejectingPhasePreEnqueue,
+		WantRequeuedPods:              sets.New("pod1"),
+		EnableSchedulingQueueHint:     sets.New(true),
+		EnableGangScheduling:          true,
+	},
 }

 // TestCoreResourceEnqueue verify Pods failed by in-tree default plugins can be
@@ -2608,7 +2677,12 @@ func RunTestCoreResourceEnqueue(t *testing.T, tt *CoreResourceEnqueueTestCase) {
 			ndffeatures.AllFeatures = originalAllFeatures
 		}()
 	}
-
+	if tt.EnableGangScheduling {
+		featuregatetesting.SetFeatureGatesDuringTest(t, utilfeature.DefaultFeatureGate, featuregatetesting.FeatureOverrides{
+			features.GenericWorkload: true,
+			features.GangScheduling:  true,
+		})
+	}
 	logger, _ := ktesting.NewTestContext(t)

 	opts := []scheduler.Option{scheduler.WithPodInitialBackoffSeconds(0), scheduler.WithPodMaxBackoffSeconds(0)}
@@ -2709,6 +2783,13 @@ func RunTestCoreResourceEnqueue(t *testing.T, tt *CoreResourceEnqueueTestCase) {
 		}
 	}

+	for _, wl := range tt.InitialWorkloads {
+		wl.Namespace = ns
+		if _, err := cs.SchedulingV1alpha1().Workloads(ns).Create(testCtx.Ctx, wl, metav1.CreateOptions{}); err != nil {
+			t.Fatalf("Failed to create a Workload %q: %v", wl.Name, err)
+		}
+	}
+
 	for _, pod := range tt.InitialPods {
 		if _, err := cs.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{}); err != nil {
 			t.Fatalf("Failed to create an initial Pod %q: %v", pod.Name, err)
@@ -2721,34 +2802,47 @@ func RunTestCoreResourceEnqueue(t *testing.T, tt *CoreResourceEnqueueTestCase) {
 		}
 	}

-	// Wait for the tt.Pods to be present in the scheduling active queue.
-	if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
-		pendingPods, _ := testCtx.Scheduler.SchedulingQueue.PendingPods()
-		return len(pendingPods) == len(tt.Pods) && len(testCtx.Scheduler.SchedulingQueue.PodsInActiveQ()) == len(tt.Pods), nil
-	}); err != nil {
-		t.Fatalf("Failed to wait for all pods to be present in the scheduling queue: %v", err)
-	}
-
-	t.Log("Confirmed Pods in the scheduling queue, starting to schedule them")
-
-	// Pop all pods out. They should become unschedulable.
-	for i := 0; i < len(tt.Pods); i++ {
-		testCtx.Scheduler.ScheduleOne(testCtx.Ctx)
-	}
-	// Wait for the tt.Pods to be still present in the scheduling (unschedulable) queue.
-	if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
-		activePodsCount := len(testCtx.Scheduler.SchedulingQueue.PodsInActiveQ())
-		if activePodsCount > 0 {
-			return false, fmt.Errorf("active queue was expected to be empty, but found %v Pods", activePodsCount)
+	switch tt.ExpectedInitialRejectingPhase {
+	case rejectingPhasePreEnqueue:
+		// Wait for the tt.Pods to be unschedulable in the scheduling queue.
+		if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
+			pendingPods, _ := testCtx.Scheduler.SchedulingQueue.PendingPods()
+			return len(pendingPods) == len(tt.Pods) && len(testCtx.Scheduler.SchedulingQueue.PodsInActiveQ()) == 0, nil
+		}); err != nil {
+			t.Fatalf("Failed to wait for all pods to be unschedulable in the scheduling queue: %v", err)
+		}
+		t.Log("All pods are waiting as unschedulable, will trigger triggerFn")
+	case rejectingPhaseSchedulingCycle:
+		fallthrough
+	default: // Defaults to rejectingPhaseSchedulingCycle.
+		// Wait for the tt.Pods to be present in the scheduling active queue.
+		if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
+			pendingPods, _ := testCtx.Scheduler.SchedulingQueue.PendingPods()
+			return len(pendingPods) == len(tt.Pods) && len(testCtx.Scheduler.SchedulingQueue.PodsInActiveQ()) == len(tt.Pods), nil
+		}); err != nil {
+			t.Fatalf("Failed to wait for all pods to be present in the scheduling queue: %v", err)
 		}

-		pendingPods, _ := testCtx.Scheduler.SchedulingQueue.PendingPods()
-		return len(pendingPods) == len(tt.Pods), nil
-	}); err != nil {
-		t.Fatalf("Failed to wait for all pods to remain in the scheduling queue after scheduling attempts: %v", err)
-	}
+		t.Log("Confirmed Pods in the scheduling queue, starting to schedule them")

-	t.Log("finished initial schedulings for all Pods, will trigger triggerFn")
+		// Pop all pods out. They should become unschedulable.
+		for i := 0; i < len(tt.Pods); i++ {
+			testCtx.Scheduler.ScheduleOne(testCtx.Ctx)
+		}
+		// Wait for the tt.Pods to be still present in the scheduling (unschedulable) queue.
+		if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
+			activePodsCount := len(testCtx.Scheduler.SchedulingQueue.PodsInActiveQ())
+			if activePodsCount > 0 {
+				return false, fmt.Errorf("active queue was expected to be empty, but found %v Pods", activePodsCount)
+			}
+
+			pendingPods, _ := testCtx.Scheduler.SchedulingQueue.PendingPods()
+			return len(pendingPods) == len(tt.Pods), nil
+		}); err != nil {
+			t.Fatalf("Failed to wait for all pods to remain in the scheduling queue after scheduling attempts: %v", err)
+		}
+		t.Log("finished initial schedulings for all Pods, will trigger triggerFn")
+	}

 	legacyregistry.Reset() // reset the metric before triggering
 	wantTriggeredEvents, err := tt.TriggerFn(testCtx)
diff --git a/test/integration/util/util.go b/test/integration/util/util.go
index de4b2ae6252..8793fd08332 100644
--- a/test/integration/util/util.go
+++ b/test/integration/util/util.go
@@ -29,6 +29,7 @@ import (
 	v1 "k8s.io/api/core/v1"
 	policy "k8s.io/api/policy/v1"
 	resourceapi "k8s.io/api/resource/v1"
+	schedulingapiv1alpha1 "k8s.io/api/scheduling/v1alpha1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
@@ -520,6 +521,11 @@ func InitTestAPIServer(t *testing.T, nsPrefix string, admission admission.Interf
 					options.GenericServerRunOptions.RuntimeConfigEmulationForwardCompatible = true
 				}
 			}
+			if utilfeature.DefaultFeatureGate.Enabled(features.GenericWorkload) {
+				options.APIEnablement.RuntimeConfig = cliflag.ConfigurationMap{
+					schedulingapiv1alpha1.SchemeGroupVersion.String(): "true",
+				}
+			}
 		},
 		ModifyServerConfig: func(config *controlplane.Config) {
 			if admission != nil {
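
For anyone adding further GangScheduling cases against the hooks introduced above (InitialWorkloads, ExpectedInitialRejectingPhase, EnableGangScheduling), a rough sketch of how the fields compose is below. It reuses only builders and constants that appear in this patch; the workload name "w2", MinCount(3), and the expected requeue set are illustrative assumptions, not part of this change.

	{
		Name:          "illustrative: gang of three is requeued once the last member arrives",
		EnablePlugins: []string{names.GangScheduling},
		InitialNodes:  []*v1.Node{st.MakeNode().Name("fake-node1").Obj()},
		InitialWorkloads: []*schedulingapi.Workload{
			// Hypothetical gang that needs three pods before any member may be scheduled.
			st.MakeWorkload().Name("w2").PodGroup(st.MakePodGroup().Name("pg1").MinCount(3).Obj()).Obj(),
		},
		Pods: []*v1.Pod{
			st.MakePod().Name("pod1").Container("image").WorkloadRef(&v1.WorkloadReference{Name: "w2", PodGroup: "pg1"}).Obj(),
			st.MakePod().Name("pod2").Container("image").WorkloadRef(&v1.WorkloadReference{Name: "w2", PodGroup: "pg1"}).Obj(),
		},
		TriggerFn: func(testCtx *testutils.TestContext) (map[fwk.ClusterEvent]uint64, error) {
			// Creating the third member should complete the gang and release the waiting pods.
			pod := st.MakePod().Name("pod3").Container("image").WorkloadRef(&v1.WorkloadReference{Name: "w2", PodGroup: "pg1"}).Obj()
			if _, err := testCtx.ClientSet.CoreV1().Pods(testCtx.NS.Name).Create(testCtx.Ctx, pod, metav1.CreateOptions{}); err != nil {
				return nil, fmt.Errorf("failed to create Pod %q: %w", pod.Name, err)
			}
			return map[fwk.ClusterEvent]uint64{framework.EventUnscheduledPodAdd: 1}, nil
		},
		// An incomplete gang is held back before enqueue, so the initial rejection
		// happens at PreEnqueue rather than in a scheduling cycle.
		ExpectedInitialRejectingPhase: rejectingPhasePreEnqueue,
		WantRequeuedPods:              sets.New("pod1", "pod2", "pod3"),
		EnableSchedulingQueueHint:     sets.New(true),
		EnableGangScheduling:          true,
	},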