Add integration tests for gang queueing

Maciej Skoczeń
2025-10-21 14:00:05 +00:00
parent 8d67173de0
commit c086bdeaa2
2 changed files with 127 additions and 27 deletions

View File

@@ -25,6 +25,7 @@ import (
"github.com/stretchr/testify/mock"
v1 "k8s.io/api/core/v1"
resourceapi "k8s.io/api/resource/v1"
schedulingapi "k8s.io/api/scheduling/v1alpha1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -51,6 +52,13 @@ import (
"k8s.io/utils/ptr"
)
type rejectingPhase string
const (
rejectingPhasePreEnqueue rejectingPhase = "PreEnqueue"
rejectingPhaseSchedulingCycle rejectingPhase = "SchedulingCycle"
)
type CoreResourceEnqueueTestCase struct {
Name string
// InitialNodes is the list of Nodes to be created at first.
@@ -77,9 +85,19 @@ type CoreResourceEnqueueTestCase struct {
InitialVolumeAttachment []*storagev1.VolumeAttachment
// InitialDeviceClasses are the list of DeviceClass to be created at first.
InitialDeviceClasses []*resourceapi.DeviceClass
// InitialWorkloads is the list of Workloads to be created at first.
InitialWorkloads []*schedulingapi.Workload
// Pods are the list of Pods to be created.
// All of them are expected to be unschedulable at first.
Pods []*v1.Pod
// ExpectedInitialRejectingPhase specifies the phase in which the Pods are expected to be initially rejected.
// It can be either "PreEnqueue" or "SchedulingCycle". Depending on the value:
// - "PreEnqueue": the test case waits for the Pods to be observed by the scheduler
// and put in the unschedulable pods pool, blocked at the PreEnqueue extension point.
// - "SchedulingCycle": the test case lets the Pods go through one scheduling cycle,
// in which they are rejected by PreFilter or Filter plugins.
// Defaults to "SchedulingCycle".
// (A condensed sketch combining this field with the other gang-queueing fields follows the struct definition.)
ExpectedInitialRejectingPhase rejectingPhase
// TriggerFn is the function that triggers the event to move Pods.
// It returns the map keyed with ClusterEvents to be triggered by this function,
// and valued with the number of triggering of the event.
@@ -98,6 +116,9 @@ type CoreResourceEnqueueTestCase struct {
EnableDRAExtendedResource bool
// EnableNodeDeclaredFeatures indicates if the test case runs with the NodeDeclaredFeatures feature gate enabled.
EnableNodeDeclaredFeatures bool
// EnableGangScheduling indicates whether the test case should run with the
// GenericWorkload and GangScheduling feature gates enabled.
EnableGangScheduling bool
}
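
For orientation, a gang-queueing case is expected to combine the new fields roughly as follows. This is a condensed, hypothetical sketch of the cases added further down in this file (names such as "w1", "pg1", and "pod1" are illustrative), reusing the builder helpers those cases already rely on; TriggerFn and the surrounding boilerplate are omitted, so this is not an additional test.

{
	// Hypothetical condensed case; not part of this commit's test table.
	Name:          "sketch: gang pod is gated at PreEnqueue until its PodGroup reaches MinCount",
	EnablePlugins: []string{names.GangScheduling},
	InitialNodes:  []*v1.Node{st.MakeNode().Name("fake-node1").Obj()},
	// The Workload declares a PodGroup that needs two pods before any member may be scheduled.
	InitialWorkloads: []*schedulingapi.Workload{
		st.MakeWorkload().Name("w1").PodGroup(st.MakePodGroup().Name("pg1").MinCount(2).Obj()).Obj(),
	},
	// Only one member exists at first, so the GangScheduling plugin keeps it gated at PreEnqueue.
	Pods: []*v1.Pod{
		st.MakePod().Name("pod1").Container("image").WorkloadRef(&v1.WorkloadReference{Name: "w1", PodGroup: "pg1"}).Obj(),
	},
	// TriggerFn (omitted here) would create the missing gang member; pod1 is then expected to be requeued.
	ExpectedInitialRejectingPhase: rejectingPhasePreEnqueue,
	WantRequeuedPods:              sets.New("pod1"),
	EnableSchedulingQueueHint:     sets.New(true),
	EnableGangScheduling:          true,
},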
var (
@@ -133,7 +154,8 @@ var CoreResourceEnqueueTestCases = []*CoreResourceEnqueueTestCase{
}
return map[fwk.ClusterEvent]uint64{{Resource: fwk.Node, ActionType: fwk.UpdateNodeAllocatable}: 1}, nil
},
WantRequeuedPods: sets.New("pod2"),
EnableSchedulingQueueHint: sets.New(true),
},
{
Name: "Pod rejected by the PodAffinity plugin is requeued when a new Node is created and turned to ready",
@@ -2581,6 +2603,53 @@ var CoreResourceEnqueueTestCases = []*CoreResourceEnqueueTestCase{
},
WantRequeuedPods: sets.Set[string]{},
},
{
Name: "Pod rejected by the GangScheduling plugin is requeued when a new pod with matching workload reference is created",
EnablePlugins: []string{names.GangScheduling},
InitialNodes: []*v1.Node{
st.MakeNode().Name("fake-node1").Obj(),
},
InitialWorkloads: []*schedulingapi.Workload{
st.MakeWorkload().Name("w1").PodGroup(st.MakePodGroup().Name("pg1").MinCount(2).Obj()).Obj(),
},
Pods: []*v1.Pod{
st.MakePod().Name("pod1").Container("image").WorkloadRef(&v1.WorkloadReference{Name: "w1", PodGroup: "pg1"}).Obj(),
st.MakePod().Name("pod2").Container("image").WorkloadRef(&v1.WorkloadReference{Name: "w1", PodGroup: "pg2"}).Obj(),
},
TriggerFn: func(testCtx *testutils.TestContext) (map[fwk.ClusterEvent]uint64, error) {
pod := st.MakePod().Name("pod3").Container("image").WorkloadRef(&v1.WorkloadReference{Name: "w1", PodGroup: "pg1"}).Obj()
if _, err := testCtx.ClientSet.CoreV1().Pods(testCtx.NS.Name).Create(testCtx.Ctx, pod, metav1.CreateOptions{}); err != nil {
return nil, fmt.Errorf("failed to create Pod %q: %w", pod.Name, err)
}
return map[fwk.ClusterEvent]uint64{framework.EventUnscheduledPodAdd: 1}, nil
},
ExpectedInitialRejectingPhase: rejectingPhasePreEnqueue,
WantRequeuedPods: sets.New("pod1", "pod3"),
EnableSchedulingQueueHint: sets.New(true),
EnableGangScheduling: true,
},
{
Name: "Pod rejected by the GangScheduling plugin is requeued when a matching workload is created",
EnablePlugins: []string{names.GangScheduling},
InitialNodes: []*v1.Node{
st.MakeNode().Name("fake-node1").Obj(),
},
Pods: []*v1.Pod{
st.MakePod().Name("pod1").Container("image").WorkloadRef(&v1.WorkloadReference{Name: "w1", PodGroup: "pg1"}).Obj(),
st.MakePod().Name("pod2").Container("image").WorkloadRef(&v1.WorkloadReference{Name: "w1", PodGroup: "pg2"}).Obj(),
},
TriggerFn: func(testCtx *testutils.TestContext) (map[fwk.ClusterEvent]uint64, error) {
workload := st.MakeWorkload().Name("w1").PodGroup(st.MakePodGroup().Name("pg1").MinCount(1).Obj()).Obj()
if _, err := testCtx.ClientSet.SchedulingV1alpha1().Workloads(testCtx.NS.Name).Create(testCtx.Ctx, workload, metav1.CreateOptions{}); err != nil {
return nil, fmt.Errorf("failed to create Workload %q: %w", workload.Name, err)
}
return map[fwk.ClusterEvent]uint64{{Resource: fwk.Workload, ActionType: fwk.Add}: 1}, nil
},
ExpectedInitialRejectingPhase: rejectingPhasePreEnqueue,
WantRequeuedPods: sets.New("pod1"),
EnableSchedulingQueueHint: sets.New(true),
EnableGangScheduling: true,
},
}
// TestCoreResourceEnqueue verifies that Pods failed by in-tree default plugins can be
@@ -2608,7 +2677,12 @@ func RunTestCoreResourceEnqueue(t *testing.T, tt *CoreResourceEnqueueTestCase) {
ndffeatures.AllFeatures = originalAllFeatures
}()
}
if tt.EnableGangScheduling {
featuregatetesting.SetFeatureGatesDuringTest(t, utilfeature.DefaultFeatureGate, featuregatetesting.FeatureOverrides{
features.GenericWorkload: true,
features.GangScheduling: true,
})
}
logger, _ := ktesting.NewTestContext(t)
opts := []scheduler.Option{scheduler.WithPodInitialBackoffSeconds(0), scheduler.WithPodMaxBackoffSeconds(0)}
@@ -2709,6 +2783,13 @@ func RunTestCoreResourceEnqueue(t *testing.T, tt *CoreResourceEnqueueTestCase) {
}
}
for _, wl := range tt.InitialWorkloads {
wl.Namespace = ns
if _, err := cs.SchedulingV1alpha1().Workloads(ns).Create(testCtx.Ctx, wl, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create a Workload %q: %v", wl.Name, err)
}
}
for _, pod := range tt.InitialPods {
if _, err := cs.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create an initial Pod %q: %v", pod.Name, err)
@@ -2721,34 +2802,47 @@ func RunTestCoreResourceEnqueue(t *testing.T, tt *CoreResourceEnqueueTestCase) {
}
}
// Wait for the tt.Pods to be present in the scheduling active queue.
if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
pendingPods, _ := testCtx.Scheduler.SchedulingQueue.PendingPods()
return len(pendingPods) == len(tt.Pods) && len(testCtx.Scheduler.SchedulingQueue.PodsInActiveQ()) == len(tt.Pods), nil
}); err != nil {
t.Fatalf("Failed to wait for all pods to be present in the scheduling queue: %v", err)
}
t.Log("Confirmed Pods in the scheduling queue, starting to schedule them")
// Pop all pods out. They should become unschedulable.
for i := 0; i < len(tt.Pods); i++ {
testCtx.Scheduler.ScheduleOne(testCtx.Ctx)
}
// Wait for the tt.Pods to be still present in the scheduling (unschedulable) queue.
if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
activePodsCount := len(testCtx.Scheduler.SchedulingQueue.PodsInActiveQ())
if activePodsCount > 0 {
return false, fmt.Errorf("active queue was expected to be empty, but found %v Pods", activePodsCount)
switch tt.ExpectedInitialRejectingPhase {
case rejectingPhasePreEnqueue:
// Wait for the tt.Pods to be unschedulable in the scheduling queue.
if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
pendingPods, _ := testCtx.Scheduler.SchedulingQueue.PendingPods()
return len(pendingPods) == len(tt.Pods) && len(testCtx.Scheduler.SchedulingQueue.PodsInActiveQ()) == 0, nil
}); err != nil {
t.Fatalf("Failed to wait for all pods to be present in the scheduling queue: %v", err)
}
t.Log("All pods are waiting as unschedulable, will trigger triggerFn")
case rejectingPhaseSchedulingCycle:
fallthrough
default: // Defaults to rejectingPhaseSchedulingCycle.
// Wait for the tt.Pods to be present in the scheduling active queue.
if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
pendingPods, _ := testCtx.Scheduler.SchedulingQueue.PendingPods()
return len(pendingPods) == len(tt.Pods) && len(testCtx.Scheduler.SchedulingQueue.PodsInActiveQ()) == len(tt.Pods), nil
}); err != nil {
t.Fatalf("Failed to wait for all pods to be present in the scheduling queue: %v", err)
}
pendingPods, _ := testCtx.Scheduler.SchedulingQueue.PendingPods()
return len(pendingPods) == len(tt.Pods), nil
}); err != nil {
t.Fatalf("Failed to wait for all pods to remain in the scheduling queue after scheduling attempts: %v", err)
}
t.Log("Confirmed Pods in the scheduling queue, starting to schedule them")
t.Log("finished initial schedulings for all Pods, will trigger triggerFn")
// Pop all pods out. They should become unschedulable.
for i := 0; i < len(tt.Pods); i++ {
testCtx.Scheduler.ScheduleOne(testCtx.Ctx)
}
// Wait for the tt.Pods to be still present in the scheduling (unschedulable) queue.
if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
activePodsCount := len(testCtx.Scheduler.SchedulingQueue.PodsInActiveQ())
if activePodsCount > 0 {
return false, fmt.Errorf("active queue was expected to be empty, but found %v Pods", activePodsCount)
}
pendingPods, _ := testCtx.Scheduler.SchedulingQueue.PendingPods()
return len(pendingPods) == len(tt.Pods), nil
}); err != nil {
t.Fatalf("Failed to wait for all pods to remain in the scheduling queue after scheduling attempts: %v", err)
}
t.Log("finished initial schedulings for all Pods, will trigger triggerFn")
}
legacyregistry.Reset() // reset the metric before triggering
wantTriggeredEvents, err := tt.TriggerFn(testCtx)
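
Pieced together from the added lines of the hunk above (indentation and the enclosing function body are assumed), the new per-phase wait logic reads roughly as follows: the PreEnqueue branch only waits for the Pods to land in the unschedulable pool, while the default SchedulingCycle branch keeps the pre-existing behaviour of scheduling each Pod once and then confirming it stayed pending.

switch tt.ExpectedInitialRejectingPhase {
case rejectingPhasePreEnqueue:
	// Wait for the tt.Pods to be unschedulable in the scheduling queue.
	if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
		pendingPods, _ := testCtx.Scheduler.SchedulingQueue.PendingPods()
		return len(pendingPods) == len(tt.Pods) && len(testCtx.Scheduler.SchedulingQueue.PodsInActiveQ()) == 0, nil
	}); err != nil {
		t.Fatalf("Failed to wait for all pods to be present in the scheduling queue: %v", err)
	}
	t.Log("All pods are waiting as unschedulable, will trigger triggerFn")
case rejectingPhaseSchedulingCycle:
	fallthrough
default: // Defaults to rejectingPhaseSchedulingCycle.
	// Wait for the tt.Pods to be present in the scheduling active queue.
	if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
		pendingPods, _ := testCtx.Scheduler.SchedulingQueue.PendingPods()
		return len(pendingPods) == len(tt.Pods) && len(testCtx.Scheduler.SchedulingQueue.PodsInActiveQ()) == len(tt.Pods), nil
	}); err != nil {
		t.Fatalf("Failed to wait for all pods to be present in the scheduling queue: %v", err)
	}
	t.Log("Confirmed Pods in the scheduling queue, starting to schedule them")
	// Pop all pods out. They should become unschedulable.
	for i := 0; i < len(tt.Pods); i++ {
		testCtx.Scheduler.ScheduleOne(testCtx.Ctx)
	}
	// Wait for the tt.Pods to still be present in the scheduling (unschedulable) queue.
	if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
		activePodsCount := len(testCtx.Scheduler.SchedulingQueue.PodsInActiveQ())
		if activePodsCount > 0 {
			return false, fmt.Errorf("active queue was expected to be empty, but found %v Pods", activePodsCount)
		}
		pendingPods, _ := testCtx.Scheduler.SchedulingQueue.PendingPods()
		return len(pendingPods) == len(tt.Pods), nil
	}); err != nil {
		t.Fatalf("Failed to wait for all pods to remain in the scheduling queue after scheduling attempts: %v", err)
	}
	t.Log("finished initial schedulings for all Pods, will trigger triggerFn")
}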

View File

@@ -29,6 +29,7 @@ import (
v1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1"
resourceapi "k8s.io/api/resource/v1"
schedulingapiv1alpha1 "k8s.io/api/scheduling/v1alpha1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
@@ -520,6 +521,11 @@ func InitTestAPIServer(t *testing.T, nsPrefix string, admission admission.Interf
options.GenericServerRunOptions.RuntimeConfigEmulationForwardCompatible = true
}
}
if utilfeature.DefaultFeatureGate.Enabled(features.GenericWorkload) {
options.APIEnablement.RuntimeConfig = cliflag.ConfigurationMap{
schedulingapiv1alpha1.SchemeGroupVersion.String(): "true",
}
}
},
ModifyServerConfig: func(config *controlplane.Config) {
if admission != nil {