chore: ensure the scheduler handles events before checking the pod position

Kensei Nakada 2024-09-25 11:49:20 +09:00
parent 6df3b28f22
commit a2b3a4f4dc
3 changed files with 164 additions and 137 deletions

View File

@ -920,9 +920,9 @@ func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) {
events := framework.PodSchedulingPropertiesChange(newPod, oldPod)
for _, evt := range events {
hint := p.isPodWorthRequeuing(logger, pInfo, evt, oldPod, newPod)
queue := p.requeuePodViaQueueingHint(logger, pInfo, hint, framework.UnscheduledPodUpdate.Label)
queue := p.requeuePodViaQueueingHint(logger, pInfo, hint, evt.Label)
if queue != unschedulablePods {
logger.V(5).Info("Pod moved to an internal scheduling queue because the Pod is updated", "pod", klog.KObj(newPod), "event", framework.PodUpdate, "queue", queue)
logger.V(5).Info("Pod moved to an internal scheduling queue because the Pod is updated", "pod", klog.KObj(newPod), "event", evt.Label, "queue", queue)
p.unschedulablePods.delete(pInfo.Pod, gated)
}
if queue == activeQ {
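
The hunk above is the core of the scheduling-queue change: a Pod update is decomposed into fine-grained cluster events, and each event gets its own requeueing decision, log line, and metric label instead of the generic UnscheduledPodUpdate label. Below is a minimal, self-contained sketch of that decompose-and-requeue pattern; the types, property checks, and fallback are illustrative stand-ins and not the framework's actual PodSchedulingPropertiesChange implementation (only the label strings appear in this diff).

package main

import (
	"fmt"
	"reflect"
)

// Pod is a tiny stand-in for *v1.Pod, holding only the properties this sketch compares.
type Pod struct {
	Labels      map[string]string
	Tolerations []string
	CPURequest  int64
}

// ClusterEvent is a stand-in for framework.ClusterEvent.
type ClusterEvent struct{ Label string }

// podSchedulingPropertiesChange emits one fine-grained event per changed scheduling property,
// roughly mirroring what the queue iterates over in the hunk above.
func podSchedulingPropertiesChange(newPod, oldPod Pod) []ClusterEvent {
	var events []ClusterEvent
	if !reflect.DeepEqual(newPod.Labels, oldPod.Labels) {
		events = append(events, ClusterEvent{Label: "PodLabelChange"})
	}
	if !reflect.DeepEqual(newPod.Tolerations, oldPod.Tolerations) {
		events = append(events, ClusterEvent{Label: "PodTolerationChange"})
	}
	if newPod.CPURequest < oldPod.CPURequest {
		events = append(events, ClusterEvent{Label: "PodRequestScaledDown"})
	}
	if len(events) == 0 {
		// Fall back to a generic update event when no specific property changed.
		events = append(events, ClusterEvent{Label: "PodUpdate"})
	}
	return events
}

func main() {
	oldPod := Pod{Labels: map[string]string{"k": "v1"}, CPURequest: 4}
	newPod := Pod{Labels: map[string]string{"k": "v2"}, CPURequest: 2}
	for _, evt := range podSchedulingPropertiesChange(newPod, oldPod) {
		// Each event drives its own requeueing hint and its own metric/log label.
		fmt.Println("requeue decision for event:", evt.Label)
	}
}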

View File

@ -134,6 +134,7 @@ func (sched *Scheduler) addPodToSchedulingQueue(obj interface{}) {
func (sched *Scheduler) updatePodInSchedulingQueue(oldObj, newObj interface{}) {
start := time.Now()
defer metrics.EventHandlingLatency.WithLabelValues(framework.UnscheduledPodUpdate.Label).Observe(metrics.SinceInSeconds(start))
logger := sched.logger
oldPod, newPod := oldObj.(*v1.Pod), newObj.(*v1.Pod)
// Bypass update event that carries identical objects; otherwise, a duplicated
@ -142,6 +143,10 @@ func (sched *Scheduler) updatePodInSchedulingQueue(oldObj, newObj interface{}) {
return
}
for _, evt := range framework.PodSchedulingPropertiesChange(newPod, oldPod) {
defer metrics.EventHandlingLatency.WithLabelValues(evt.Label).Observe(metrics.SinceInSeconds(start))
}
isAssumed, err := sched.Cache.IsAssumedPod(newPod)
if err != nil {
utilruntime.HandleError(fmt.Errorf("failed to check whether pod %s/%s is assumed: %v", newPod.Namespace, newPod.Name, err))
@ -246,7 +251,12 @@ func (sched *Scheduler) updatePodInCache(oldObj, newObj interface{}) {
}
events := framework.PodSchedulingPropertiesChange(newPod, oldPod)
// Save the time it takes to update the pod in the cache.
updatingDuration := metrics.SinceInSeconds(start)
for _, evt := range events {
startMoving := time.Now()
// SchedulingQueue.AssignedPodUpdated has a problem:
// It internally pre-filters Pods to move to activeQ,
// while taking only in-tree plugins into consideration.
@ -257,10 +267,12 @@ func (sched *Scheduler) updatePodInCache(oldObj, newObj interface{}) {
// Here we use MoveAllToActiveOrBackoffQueue only when QueueingHint is enabled.
// (We cannot switch to MoveAllToActiveOrBackoffQueue right away because of throughput concerns.)
if utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, framework.AssignedPodUpdate, oldPod, newPod, nil)
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, evt, oldPod, newPod, nil)
} else {
sched.SchedulingQueue.AssignedPodUpdated(logger, oldPod, newPod, evt)
}
movingDuration := metrics.SinceInSeconds(startMoving)
metrics.EventHandlingLatency.WithLabelValues(evt.Label).Observe(updatingDuration + movingDuration)
}
}
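
In the event handlers above, the cache-update time is measured once and combined with the per-event queue-moving time, so EventHandlingLatency records one sample per fine-grained event label; the new integration test later asserts on exactly these samples. Here is a rough, self-contained sketch of that record-then-assert pattern using the Prometheus client directly rather than the scheduler's metrics and component-base testutil packages; the metric name and the updating/moving split come from this diff, while the registry wiring and label values are illustrative.

package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Histogram labelled by event, standing in for the scheduler's
	// EventHandlingLatency / scheduler_event_handling_duration_seconds metric.
	eventHandlingLatency := prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Name: "scheduler_event_handling_duration_seconds",
		Help: "Time taken to handle one cluster event.",
	}, []string{"event"})

	reg := prometheus.NewRegistry()
	reg.MustRegister(eventHandlingLatency)

	// Handling an assigned-Pod update: the cache update is timed once, then each
	// fine-grained event adds its own queue-moving time, and one sample is
	// recorded per event label (updatingDuration + movingDuration).
	start := time.Now()
	updatingDuration := time.Since(start).Seconds() // time spent updating the cache
	for _, label := range []string{"AssignedPodUpdate", "PodLabelChange"} {
		startMoving := time.Now()
		// ... move unschedulable Pods for this event ...
		movingDuration := time.Since(startMoving).Seconds()
		eventHandlingLatency.WithLabelValues(label).Observe(updatingDuration + movingDuration)
	}

	// A test can gather the registry and check per-label sample counts, which is
	// roughly what testutil.GetHistogramVecFromGatherer plus GetAggregatedSampleCount
	// do against legacyregistry in the new integration test below.
	mfs, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range mfs {
		for _, m := range mf.GetMetric() {
			for _, lp := range m.GetLabel() {
				fmt.Printf("%s{%s=%q}: %d samples\n",
					mf.GetName(), lp.GetName(), lp.GetValue(), m.GetHistogram().GetSampleCount())
			}
		}
	}
}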

View File

@ -40,6 +40,8 @@ import (
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/component-base/metrics/testutil"
"k8s.io/component-helpers/storage/volume"
"k8s.io/klog/v2"
configv1 "k8s.io/kube-scheduler/config/v1"
@ -207,7 +209,9 @@ func TestCoreResourceEnqueue(t *testing.T) {
// All of them are expected to be unschedulable at first.
pods []*v1.Pod
// triggerFn is the function that triggers the event to move Pods.
triggerFn func(testCtx *testutils.TestContext) error
// It returns a map keyed by the ClusterEvents that this function triggers,
// with the number of times each event is triggered as the value.
triggerFn func(testCtx *testutils.TestContext) (map[framework.ClusterEvent]uint64, error)
// wantRequeuedPods is the set of Pods that are expected to be requeued after triggerFn.
wantRequeuedPods sets.Set[string]
// enableSchedulingQueueHint indicates which feature gate value(s) the test case should run with.
@ -224,14 +228,14 @@ func TestCoreResourceEnqueue(t *testing.T) {
st.MakePod().Name("pod1").Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Container("image").Obj(),
st.MakePod().Name("pod2").Toleration(v1.TaintNodeNotReady).Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Container("image").Obj(),
},
triggerFn: func(testCtx *testutils.TestContext) error {
triggerFn: func(testCtx *testutils.TestContext) (map[framework.ClusterEvent]uint64, error) {
// Trigger a NodeChange event by increasing CPU capacity.
// It makes Pod2 schedulable.
// Pod1 is not requeued because the Node is still unready and it doesn't have the required toleration.
if _, err := testCtx.ClientSet.CoreV1().Nodes().UpdateStatus(testCtx.Ctx, st.MakeNode().Name("fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Taints([]v1.Taint{{Key: v1.TaintNodeNotReady, Effect: v1.TaintEffectNoSchedule}}).Obj(), metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("failed to update the node: %w", err)
return nil, fmt.Errorf("failed to update the node: %w", err)
}
return nil
return map[framework.ClusterEvent]uint64{framework.NodeAllocatableChange: 1}, nil
},
wantRequeuedPods: sets.New("pod2"),
},
@ -245,7 +249,7 @@ func TestCoreResourceEnqueue(t *testing.T) {
// - Pod2 will be rejected by the PodAffinity plugin.
st.MakePod().Label("anti", "anti").Name("pod2").PodAntiAffinityExists("anti", "node", st.PodAntiAffinityWithRequiredReq).Container("image").Obj(),
},
triggerFn: func(testCtx *testutils.TestContext) error {
triggerFn: func(testCtx *testutils.TestContext) (map[framework.ClusterEvent]uint64, error) {
// Trigger a NodeCreated event.
// Note that this Node has a not-ready taint, and ideally pod2 should be requeued because the only unschedulable plugin registered for pod2 is PodAffinity.
// However, due to preCheck, pod2 is not requeued to activeQ.
@ -253,21 +257,23 @@ func TestCoreResourceEnqueue(t *testing.T) {
// https://github.com/kubernetes/kubernetes/issues/110175
node := st.MakeNode().Name("fake-node2").Label("node", "fake-node2").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Taints([]v1.Taint{{Key: v1.TaintNodeNotReady, Effect: v1.TaintEffectNoSchedule}}).Obj()
if _, err := testCtx.ClientSet.CoreV1().Nodes().Create(testCtx.Ctx, node, metav1.CreateOptions{}); err != nil {
return fmt.Errorf("failed to create a new node: %w", err)
return nil, fmt.Errorf("failed to create a new node: %w", err)
}
// As a mitigation for the issue described above, all plugins that subscribe to the Node/Add event also register for UpdateNodeTaint.
// So, removing the taint moves pod2 to activeQ.
node.Spec.Taints = nil
if _, err := testCtx.ClientSet.CoreV1().Nodes().Update(testCtx.Ctx, node, metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("failed to remove taints off the node: %w", err)
return nil, fmt.Errorf("failed to remove taints off the node: %w", err)
}
return nil
return map[framework.ClusterEvent]uint64{
framework.NodeAdd: 1,
framework.NodeTaintChange: 1}, nil
},
wantRequeuedPods: sets.New("pod2"),
},
{
name: "Pod rejected by the NodeAffinity plugin is requeued when a Node is updated",
name: "Pod rejected by the NodeAffinity plugin is requeued when a Node's label is updated",
initialNodes: []*v1.Node{st.MakeNode().Name("fake-node1").Label("group", "a").Obj()},
pods: []*v1.Pod{
// - Pod1 will be rejected by the NodeAffinity plugin.
@ -275,14 +281,14 @@ func TestCoreResourceEnqueue(t *testing.T) {
// - Pod2 will be rejected by the NodeAffinity plugin.
st.MakePod().Name("pod2").NodeAffinityIn("group", []string{"c"}, st.NodeSelectorTypeMatchExpressions).Container("image").Obj(),
},
triggerFn: func(testCtx *testutils.TestContext) error {
triggerFn: func(testCtx *testutils.TestContext) (map[framework.ClusterEvent]uint64, error) {
// Trigger a NodeUpdate event that changes the Node's label.
// It causes pod1, but not pod2, to be requeued.
if _, err := testCtx.ClientSet.CoreV1().Nodes().Update(testCtx.Ctx, st.MakeNode().Name("fake-node1").Label("group", "b").Obj(), metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("failed to update the pod: %w", err)
return nil, fmt.Errorf("failed to update the node: %w", err)
}
return nil
return map[framework.ClusterEvent]uint64{framework.NodeLabelChange: 1}, nil
},
wantRequeuedPods: sets.New("pod1"),
},
@ -296,13 +302,13 @@ func TestCoreResourceEnqueue(t *testing.T) {
// - The pod will be blocked by the NodeAffinity plugin for node2, therefore we know NodeAffinity will be queried for qhint for both testing nodes.
st.MakePod().Name("pod1").NodeAffinityIn("group", []string{"a"}, st.NodeSelectorTypeMatchExpressions).Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Container("image").Obj(),
},
triggerFn: func(testCtx *testutils.TestContext) error {
triggerFn: func(testCtx *testutils.TestContext) (map[framework.ClusterEvent]uint64, error) {
// Trigger a NodeUpdate event that adds a new label.
// It won't cause the pod to be requeued, because the label already matched before the update, meaning the NodeAffinity plugin wasn't blocking the scheduling.
if _, err := testCtx.ClientSet.CoreV1().Nodes().Update(testCtx.Ctx, st.MakeNode().Name("node1").Label("group", "a").Label("node", "fake-node").Obj(), metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("failed to update the pod: %w", err)
return nil, fmt.Errorf("failed to update the node: %w", err)
}
return nil
return map[framework.ClusterEvent]uint64{framework.NodeLabelChange: 1}, nil
},
wantRequeuedPods: sets.Set[string]{},
enableSchedulingQueueHint: []bool{true},
@ -316,14 +322,14 @@ func TestCoreResourceEnqueue(t *testing.T) {
// - Pod2 will be rejected by the NodeAffinity plugin.
st.MakePod().Name("pod2").NodeAffinityIn("group", []string{"c"}, st.NodeSelectorTypeMatchExpressions).Container("image").Obj(),
},
triggerFn: func(testCtx *testutils.TestContext) error {
triggerFn: func(testCtx *testutils.TestContext) (map[framework.ClusterEvent]uint64, error) {
// Trigger a NodeAdd event for a Node with the label pod1 is waiting for.
// It causes pod1, but not pod2, to be requeued.
if _, err := testCtx.ClientSet.CoreV1().Nodes().Create(testCtx.Ctx, st.MakeNode().Name("fake-node2").Label("group", "b").Obj(), metav1.CreateOptions{}); err != nil {
return fmt.Errorf("failed to update the pod: %w", err)
return nil, fmt.Errorf("failed to update the node: %w", err)
}
return nil
return map[framework.ClusterEvent]uint64{framework.NodeAdd: 1}, nil
},
wantRequeuedPods: sets.New("pod1"),
},
@ -334,13 +340,13 @@ func TestCoreResourceEnqueue(t *testing.T) {
// - Pod1 doesn't have the required toleration and will be rejected by the TaintToleration plugin.
st.MakePod().Name("pod1").Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Container("image").Obj(),
},
triggerFn: func(testCtx *testutils.TestContext) error {
triggerFn: func(testCtx *testutils.TestContext) (map[framework.ClusterEvent]uint64, error) {
// Trigger a PodUpdate event by adding a toleration to Pod1.
// It makes Pod1 schedulable.
if _, err := testCtx.ClientSet.CoreV1().Pods(testCtx.NS.Name).Update(testCtx.Ctx, st.MakePod().Name("pod1").Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Container("image").Toleration("taint-key").Obj(), metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("failed to update the pod: %w", err)
return nil, fmt.Errorf("failed to update the pod: %w", err)
}
return nil
return map[framework.ClusterEvent]uint64{framework.PodTolerationChange: 1}, nil
},
wantRequeuedPods: sets.New("pod1"),
},
@ -351,13 +357,13 @@ func TestCoreResourceEnqueue(t *testing.T) {
// - Pod1 requests a large amount of CPU and will be rejected by the NodeResourcesFit plugin.
st.MakePod().Name("pod1").Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Container("image").Obj(),
},
triggerFn: func(testCtx *testutils.TestContext) error {
triggerFn: func(testCtx *testutils.TestContext) (map[framework.ClusterEvent]uint64, error) {
// Trigger a PodUpdate event by reducing the CPU requested by pod1.
// It makes Pod1 schedulable.
if _, err := testCtx.ClientSet.CoreV1().Pods(testCtx.NS.Name).Update(testCtx.Ctx, st.MakePod().Name("pod1").Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Container("image").Obj(), metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("failed to update the pod: %w", err)
return nil, fmt.Errorf("failed to update the pod: %w", err)
}
return nil
return map[framework.ClusterEvent]uint64{framework.PodRequestScaledDown: 1}, nil
},
wantRequeuedPods: sets.New("pod1"),
},
@ -371,13 +377,13 @@ func TestCoreResourceEnqueue(t *testing.T) {
// - Pod2's request will be rejected because there are not enough free resources on the Node
st.MakePod().Name("pod2").Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Container("image").Obj(),
},
triggerFn: func(testCtx *testutils.TestContext) error {
triggerFn: func(testCtx *testutils.TestContext) (map[framework.ClusterEvent]uint64, error) {
// Trigger an assigned Pod1 delete event.
// It makes Pod2 schedulable.
if err := testCtx.ClientSet.CoreV1().Pods(testCtx.NS.Name).Delete(testCtx.Ctx, "pod1", metav1.DeleteOptions{GracePeriodSeconds: new(int64)}); err != nil {
return fmt.Errorf("failed to delete pod1: %w", err)
return nil, fmt.Errorf("failed to delete pod1: %w", err)
}
return nil
return map[framework.ClusterEvent]uint64{framework.AssignedPodDelete: 1}, nil
},
wantRequeuedPods: sets.New("pod2"),
},
@ -390,14 +396,14 @@ func TestCoreResourceEnqueue(t *testing.T) {
// - Pod2 requests a large amount of CPU and will be rejected by the NodeResourcesFit plugin.
st.MakePod().Name("pod2").Req(map[v1.ResourceName]string{v1.ResourceCPU: "5"}).Container("image").Obj(),
},
triggerFn: func(testCtx *testutils.TestContext) error {
triggerFn: func(testCtx *testutils.TestContext) (map[framework.ClusterEvent]uint64, error) {
// Trigger a NodeCreated event.
// It makes Pod1 schedulable. Pod2 is not requeued because its requests are too high.
node := st.MakeNode().Name("fake-node2").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Obj()
if _, err := testCtx.ClientSet.CoreV1().Nodes().Create(testCtx.Ctx, node, metav1.CreateOptions{}); err != nil {
return fmt.Errorf("failed to create a new node: %w", err)
return nil, fmt.Errorf("failed to create a new node: %w", err)
}
return nil
return map[framework.ClusterEvent]uint64{framework.NodeAdd: 1}, nil
},
wantRequeuedPods: sets.New("pod1"),
},
@ -410,13 +416,13 @@ func TestCoreResourceEnqueue(t *testing.T) {
// - Pod2 requests a large amount of CPU and will be rejected by the NodeResourcesFit plugin.
st.MakePod().Name("pod2").Req(map[v1.ResourceName]string{v1.ResourceCPU: "5"}).Container("image").Obj(),
},
triggerFn: func(testCtx *testutils.TestContext) error {
triggerFn: func(testCtx *testutils.TestContext) (map[framework.ClusterEvent]uint64, error) {
// Trigger a NodeUpdate event that increases the CPU capacity of fake-node.
// It makes Pod1 schedulable. Pod2 is not requeued because its requests are too high.
if _, err := testCtx.ClientSet.CoreV1().Nodes().UpdateStatus(testCtx.Ctx, st.MakeNode().Name("fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Obj(), metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("failed to update fake-node: %w", err)
return nil, fmt.Errorf("failed to update fake-node: %w", err)
}
return nil
return map[framework.ClusterEvent]uint64{framework.NodeAllocatableChange: 1}, nil
},
wantRequeuedPods: sets.New("pod1"),
},
@ -430,12 +436,12 @@ func TestCoreResourceEnqueue(t *testing.T) {
// - Pod1 requests an amount of CPU that is available on fake-node1, but will be rejected by the NodeAffinity plugin. Note that the NodeResourcesFit plugin will register for QHints because it rejected fake-node2.
st.MakePod().Name("pod1").Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).NodeAffinityIn("group", []string{"b"}, st.NodeSelectorTypeMatchExpressions).Container("image").Obj(),
},
triggerFn: func(testCtx *testutils.TestContext) error {
triggerFn: func(testCtx *testutils.TestContext) (map[framework.ClusterEvent]uint64, error) {
// Trigger a NodeUpdate event that increases the unrelated (not requested) memory capacity of fake-node1, which should not requeue Pod1.
if _, err := testCtx.ClientSet.CoreV1().Nodes().UpdateStatus(testCtx.Ctx, st.MakeNode().Name("fake-node1").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "4", v1.ResourceMemory: "4000"}).Obj(), metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("failed to update fake-node: %w", err)
return nil, fmt.Errorf("failed to update fake-node: %w", err)
}
return nil
return map[framework.ClusterEvent]uint64{framework.NodeAllocatableChange: 1}, nil
},
wantRequeuedPods: sets.Set[string]{},
enableSchedulingQueueHint: []bool{true},
@ -450,12 +456,12 @@ func TestCoreResourceEnqueue(t *testing.T) {
// - Pod1 requests an amount of CPU that is available on fake-node1, but will be rejected by the NodeAffinity plugin. Note that the NodeResourcesFit plugin will register for QHints because it rejected fake-node2.
st.MakePod().Name("pod1").Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).NodeAffinityIn("group", []string{"b"}, st.NodeSelectorTypeMatchExpressions).Container("image").Obj(),
},
triggerFn: func(testCtx *testutils.TestContext) error {
triggerFn: func(testCtx *testutils.TestContext) (map[framework.ClusterEvent]uint64, error) {
// Trigger a NodeUpdate event that increases the requested CPU capacity of fake-node1, which should requeue Pod1.
if _, err := testCtx.ClientSet.CoreV1().Nodes().UpdateStatus(testCtx.Ctx, st.MakeNode().Name("fake-node1").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "5"}).Obj(), metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("failed to update fake-node: %w", err)
return nil, fmt.Errorf("failed to update fake-node: %w", err)
}
return nil
return map[framework.ClusterEvent]uint64{framework.NodeAllocatableChange: 1}, nil
},
wantRequeuedPods: sets.New("pod1"),
enableSchedulingQueueHint: []bool{true},
@ -474,42 +480,28 @@ func TestCoreResourceEnqueue(t *testing.T) {
// - Pod2 is unschedulable because Pod1 saturated the ResourcePods limit on fake-node2. Note that the NodeResourcesFit plugin will register for QHints because it rejected fake-node2.
st.MakePod().Name("pod2").NodeAffinityIn("group", []string{"b"}, st.NodeSelectorTypeMatchExpressions).Container("image").Obj(),
},
triggerFn: func(testCtx *testutils.TestContext) error {
triggerFn: func(testCtx *testutils.TestContext) (map[framework.ClusterEvent]uint64, error) {
// Trigger a NodeUpdate event that increases the allowed number of Pods on fake-node1, which should requeue Pod2.
if _, err := testCtx.ClientSet.CoreV1().Nodes().UpdateStatus(testCtx.Ctx, st.MakeNode().Name("fake-node1").Capacity(map[v1.ResourceName]string{v1.ResourcePods: "3"}).Obj(), metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("failed to update fake-node: %w", err)
return nil, fmt.Errorf("failed to update fake-node: %w", err)
}
return nil
return map[framework.ClusterEvent]uint64{framework.NodeAllocatableChange: 1}, nil
},
wantRequeuedPods: sets.New("pod2"),
enableSchedulingQueueHint: []bool{true},
},
{
name: "Updating pod condition doesn't retry scheduling if the Pod was rejected by TaintToleration",
name: "Updating pod label doesn't retry scheduling if the Pod was rejected by TaintToleration",
initialNodes: []*v1.Node{st.MakeNode().Name("fake-node").Taints([]v1.Taint{{Key: v1.TaintNodeNotReady, Effect: v1.TaintEffectNoSchedule}}).Obj()},
pods: []*v1.Pod{
// - Pod1 doesn't have the required toleration and will be rejected by the TaintToleration plugin.
st.MakePod().Name("pod1").Container("image").Obj(),
},
// Simulate a Pod update by directly calling `SchedulingQueue.Update` instead of actually updating a Pod
// because we don't have a way to confirm the scheduler has handled a Pod update event at the moment.
// TODO: actually update a Pod update and confirm the scheduler has handled a Pod update event with a metric.
// https://github.com/kubernetes/kubernetes/pull/122234#discussion_r1597456808
triggerFn: func(testCtx *testutils.TestContext) (err error) {
// Trigger a Pod Condition update event.
// It will not make pod1 schedulable
var (
oldPod *v1.Pod
newPod *v1.Pod
)
if oldPod, err = testCtx.ClientSet.CoreV1().Pods(testCtx.NS.Name).Get(testCtx.Ctx, "pod1", metav1.GetOptions{}); err != nil {
return fmt.Errorf("failed to get the pod: %w", err)
triggerFn: func(testCtx *testutils.TestContext) (map[framework.ClusterEvent]uint64, error) {
if _, err := testCtx.ClientSet.CoreV1().Pods(testCtx.NS.Name).Update(testCtx.Ctx, st.MakePod().Name("pod1").Label("key", "val").Container("image").Obj(), metav1.UpdateOptions{}); err != nil {
return nil, fmt.Errorf("failed to update the pod: %w", err)
}
newPod = oldPod.DeepCopy()
newPod.Status.Conditions[0].Message = "injected message"
testCtx.Scheduler.SchedulingQueue.Update(klog.FromContext(testCtx.Ctx), oldPod, newPod)
return nil
return map[framework.ClusterEvent]uint64{framework.PodLabelChange: 1}, nil
},
wantRequeuedPods: sets.Set[string]{},
// This behaviour is only true when enabling QHint
@ -529,7 +521,7 @@ func TestCoreResourceEnqueue(t *testing.T) {
// - Pod1 will be rejected by the NodeAffinity plugin and NodeResourcesFit plugin.
st.MakePod().Label("unscheduled", "plugins").Name("pod1").NodeAffinityIn("metadata.name", []string{"fake-node"}, st.NodeSelectorTypeMatchFields).Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Obj(),
},
triggerFn: func(testCtx *testutils.TestContext) error {
triggerFn: func(testCtx *testutils.TestContext) (map[framework.ClusterEvent]uint64, error) {
// Because of preCheck, we cannot determine which unschedulable plugins are registered for pod1.
// So, although this is not ideal for an integration test,
// here we check the unschedulable plugins by directly using the SchedulingQueue function for now.
@ -537,18 +529,18 @@ func TestCoreResourceEnqueue(t *testing.T) {
// after QHint is graduated and preCheck is removed.
pInfo, ok := testCtx.Scheduler.SchedulingQueue.GetPod("pod1", testCtx.NS.Name)
if !ok || pInfo == nil {
return fmt.Errorf("pod1 is not found in the scheduling queue")
return nil, fmt.Errorf("pod1 is not found in the scheduling queue")
}
if pInfo.Pod.Name != "pod1" {
return fmt.Errorf("unexpected pod info: %#v", pInfo)
return nil, fmt.Errorf("unexpected pod info: %#v", pInfo)
}
if pInfo.UnschedulablePlugins.Difference(sets.New(names.NodeAffinity, names.NodeResourcesFit)).Len() != 0 {
return fmt.Errorf("unexpected unschedulable plugin(s) is registered in pod1: %v", pInfo.UnschedulablePlugins.UnsortedList())
return nil, fmt.Errorf("unexpected unschedulable plugin(s) is registered in pod1: %v", pInfo.UnschedulablePlugins.UnsortedList())
}
return nil
return nil, nil
},
},
{
@ -563,12 +555,12 @@ func TestCoreResourceEnqueue(t *testing.T) {
st.MakePod().Name("pod3").Label("anti2", "anti2").PodAntiAffinityExists("anti2", "node", st.PodAntiAffinityWithRequiredReq).Container("image").Obj(),
},
triggerFn: func(testCtx *testutils.TestContext) error {
triggerFn: func(testCtx *testutils.TestContext) (map[framework.ClusterEvent]uint64, error) {
// Delete the pod's label 'anti1' which will make it match pod2's antiAffinity.
if _, err := testCtx.ClientSet.CoreV1().Pods(testCtx.NS.Name).Update(testCtx.Ctx, st.MakePod().Name("pod1").Label("anti2", "anti2").Container("image").Node("fake-node").Obj(), metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("failed to update pod1: %w", err)
return nil, fmt.Errorf("failed to update pod1: %w", err)
}
return nil
return map[framework.ClusterEvent]uint64{framework.PodLabelChange: 1}, nil
},
wantRequeuedPods: sets.New("pod2"),
enableSchedulingQueueHint: []bool{true},
@ -585,12 +577,12 @@ func TestCoreResourceEnqueue(t *testing.T) {
st.MakePod().Name("pod3").PodAffinityExists("bbb", "node", st.PodAffinityWithRequiredReq).Container("image").Obj(),
},
triggerFn: func(testCtx *testutils.TestContext) error {
triggerFn: func(testCtx *testutils.TestContext) (map[framework.ClusterEvent]uint64, error) {
// Add a label to the pod, which will make it match pod2's podAffinity.
if _, err := testCtx.ClientSet.CoreV1().Pods(testCtx.NS.Name).Update(testCtx.Ctx, st.MakePod().Name("pod1").Label("aaa", "bbb").Container("image").Node("fake-node").Obj(), metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("failed to update pod1: %w", err)
return nil, fmt.Errorf("failed to update pod1: %w", err)
}
return nil
return map[framework.ClusterEvent]uint64{framework.PodLabelChange: 1}, nil
},
wantRequeuedPods: sets.New("pod2"),
enableSchedulingQueueHint: []bool{true},
@ -605,12 +597,12 @@ func TestCoreResourceEnqueue(t *testing.T) {
st.MakePod().Name("pod2").PodAffinityExists("ccc", "region", st.PodAffinityWithRequiredReq).Container("image").Obj(),
},
triggerFn: func(testCtx *testutils.TestContext) error {
triggerFn: func(testCtx *testutils.TestContext) (map[framework.ClusterEvent]uint64, error) {
// Add a label to the node, which will make it match pod1's podAffinity.
if _, err := testCtx.ClientSet.CoreV1().Nodes().Update(testCtx.Ctx, st.MakeNode().Name("fake-node").Label("zone", "zone1").Obj(), metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("failed to update pod1: %w", err)
return nil, fmt.Errorf("failed to update pod1: %w", err)
}
return nil
return map[framework.ClusterEvent]uint64{framework.NodeLabelChange: 1}, nil
},
wantRequeuedPods: sets.New("pod1"),
enableSchedulingQueueHint: []bool{true},
@ -626,14 +618,14 @@ func TestCoreResourceEnqueue(t *testing.T) {
st.MakePod().Name("pod3").Container("image").ContainerPort([]v1.ContainerPort{{ContainerPort: 8080, HostPort: 8080}}).Obj(),
st.MakePod().Name("pod4").Container("image").ContainerPort([]v1.ContainerPort{{ContainerPort: 8080, HostPort: 8081}}).Obj(),
},
triggerFn: func(testCtx *testutils.TestContext) error {
triggerFn: func(testCtx *testutils.TestContext) (map[framework.ClusterEvent]uint64, error) {
// Trigger an assigned Pod delete event.
// Because Pod1 and Pod3 use a common host port, deleting Pod1 makes Pod3 schedulable.
// GracePeriodSeconds is set to 0 so that Pod3 can be requeued immediately after Pod1 is deleted.
if err := testCtx.ClientSet.CoreV1().Pods(testCtx.NS.Name).Delete(testCtx.Ctx, "pod1", metav1.DeleteOptions{GracePeriodSeconds: new(int64)}); err != nil {
return fmt.Errorf("failed to delete Pod: %w", err)
return nil, fmt.Errorf("failed to delete Pod: %w", err)
}
return nil
return map[framework.ClusterEvent]uint64{framework.AssignedPodDelete: 1}, nil
},
wantRequeuedPods: sets.New("pod3"),
enableSchedulingQueueHint: []bool{true},
@ -647,14 +639,14 @@ func TestCoreResourceEnqueue(t *testing.T) {
pods: []*v1.Pod{
st.MakePod().Name("pod2").Container("image").ContainerPort([]v1.ContainerPort{{ContainerPort: 8080, HostPort: 8080}}).Obj(),
},
triggerFn: func(testCtx *testutils.TestContext) error {
triggerFn: func(testCtx *testutils.TestContext) (map[framework.ClusterEvent]uint64, error) {
// Trigger a NodeCreated event.
// It makes Pod2 schedulable.
node := st.MakeNode().Name("fake-node2").Label("node", "fake-node2").Obj()
if _, err := testCtx.ClientSet.CoreV1().Nodes().Create(testCtx.Ctx, node, metav1.CreateOptions{}); err != nil {
return fmt.Errorf("failed to create a new node: %w", err)
return nil, fmt.Errorf("failed to create a new node: %w", err)
}
return nil
return map[framework.ClusterEvent]uint64{framework.NodeAdd: 1}, nil
},
wantRequeuedPods: sets.New("pod2"),
enableSchedulingQueueHint: []bool{true},
@ -665,13 +657,13 @@ func TestCoreResourceEnqueue(t *testing.T) {
pods: []*v1.Pod{
st.MakePod().Name("pod1").Container("image").Obj(),
},
triggerFn: func(testCtx *testutils.TestContext) error {
triggerFn: func(testCtx *testutils.TestContext) (map[framework.ClusterEvent]uint64, error) {
// Trigger a NodeUpdate event to make the Node schedulable.
// It makes Pod1 schedulable.
if _, err := testCtx.ClientSet.CoreV1().Nodes().Update(testCtx.Ctx, st.MakeNode().Name("fake-node").Unschedulable(false).Obj(), metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("failed to update the node: %w", err)
return nil, fmt.Errorf("failed to update the node: %w", err)
}
return nil
return map[framework.ClusterEvent]uint64{framework.NodeSpecUnschedulableChange: 1}, nil
},
wantRequeuedPods: sets.New("pod1"),
},
@ -681,14 +673,14 @@ func TestCoreResourceEnqueue(t *testing.T) {
pods: []*v1.Pod{
st.MakePod().Name("pod1").Container("image").Obj(),
},
triggerFn: func(testCtx *testutils.TestContext) error {
triggerFn: func(testCtx *testutils.TestContext) (map[framework.ClusterEvent]uint64, error) {
// Trigger a NodeCreated event.
// It makes Pod1 schedulable.
node := st.MakeNode().Name("fake-node2").Obj()
if _, err := testCtx.ClientSet.CoreV1().Nodes().Create(testCtx.Ctx, node, metav1.CreateOptions{}); err != nil {
return fmt.Errorf("failed to create a new node: %w", err)
return nil, fmt.Errorf("failed to create a new node: %w", err)
}
return nil
return map[framework.ClusterEvent]uint64{framework.NodeAdd: 1}, nil
},
wantRequeuedPods: sets.New("pod1"),
},
@ -698,13 +690,13 @@ func TestCoreResourceEnqueue(t *testing.T) {
pods: []*v1.Pod{
st.MakePod().Name("pod1").Container("image").Obj(),
},
triggerFn: func(testCtx *testutils.TestContext) error {
triggerFn: func(testCtx *testutils.TestContext) (map[framework.ClusterEvent]uint64, error) {
// Trigger a NodeCreated event with an unschedulable Node, which doesn't make Pod1 schedulable.
node := st.MakeNode().Name("fake-node2").Unschedulable(true).Obj()
if _, err := testCtx.ClientSet.CoreV1().Nodes().Create(testCtx.Ctx, node, metav1.CreateOptions{}); err != nil {
return fmt.Errorf("failed to create a new node: %w", err)
return nil, fmt.Errorf("failed to create a new node: %w", err)
}
return nil
return map[framework.ClusterEvent]uint64{framework.NodeAdd: 1}, nil
},
wantRequeuedPods: sets.Set[string]{},
// This test case is valid only when QHint is enabled
@ -723,15 +715,15 @@ func TestCoreResourceEnqueue(t *testing.T) {
st.MakePod().Name("pod3").Label("key1", "val").SpreadConstraint(1, "node", v1.DoNotSchedule, st.MakeLabelSelector().Exists("key1").Obj(), ptr.To(int32(3)), nil, nil, nil).Container("image").Obj(),
st.MakePod().Name("pod4").Label("key2", "val").SpreadConstraint(1, "node", v1.DoNotSchedule, st.MakeLabelSelector().Exists("key2").Obj(), ptr.To(int32(3)), nil, nil, nil).Container("image").Obj(),
},
triggerFn: func(testCtx *testutils.TestContext) error {
triggerFn: func(testCtx *testutils.TestContext) (map[framework.ClusterEvent]uint64, error) {
// Trigger an assigned Pod add event.
// It should requeue only pod3, because the new pod has only the key1 label, which pod3's topology spread constraint selects, and doesn't have the key2 label that pod4's constraint selects.
pod := st.MakePod().Name("pod5").Label("key1", "val").Node("fake-node").Container("image").Obj()
if _, err := testCtx.ClientSet.CoreV1().Pods(testCtx.NS.Name).Create(testCtx.Ctx, pod, metav1.CreateOptions{}); err != nil {
return fmt.Errorf("failed to create Pod %q: %w", pod.Name, err)
return nil, fmt.Errorf("failed to create Pod %q: %w", pod.Name, err)
}
return nil
return map[framework.ClusterEvent]uint64{framework.AssignedPodAdd: 1}, nil
},
wantRequeuedPods: sets.New("pod3"),
enableSchedulingQueueHint: []bool{true},
@ -748,14 +740,14 @@ func TestCoreResourceEnqueue(t *testing.T) {
st.MakePod().Name("pod3").Label("key1", "val").SpreadConstraint(1, "node", v1.DoNotSchedule, st.MakeLabelSelector().Exists("key1").Obj(), ptr.To(int32(3)), nil, nil, nil).Container("image").Obj(),
st.MakePod().Name("pod4").Label("key2", "val").SpreadConstraint(1, "node", v1.DoNotSchedule, st.MakeLabelSelector().Exists("key2").Obj(), ptr.To(int32(3)), nil, nil, nil).Container("image").Obj(),
},
triggerFn: func(testCtx *testutils.TestContext) error {
triggerFn: func(testCtx *testutils.TestContext) (map[framework.ClusterEvent]uint64, error) {
// Trigger an assigned Pod update event.
// It should requeue only pod3, because the updated pod had the key1 label,
// which is relevant only to the label selector of pod3's topology spread constraint.
if _, err := testCtx.ClientSet.CoreV1().Pods(testCtx.NS.Name).Update(testCtx.Ctx, st.MakePod().Name("pod1").Label("key3", "val").Container("image").Node("fake-node").Obj(), metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("failed to update the pod: %w", err)
return nil, fmt.Errorf("failed to update the pod: %w", err)
}
return nil
return map[framework.ClusterEvent]uint64{framework.AssignedPodUpdate: 1}, nil
},
wantRequeuedPods: sets.New("pod3"),
enableSchedulingQueueHint: []bool{true},
@ -772,13 +764,13 @@ func TestCoreResourceEnqueue(t *testing.T) {
st.MakePod().Name("pod3").Label("key1", "val").SpreadConstraint(1, "node", v1.DoNotSchedule, st.MakeLabelSelector().Exists("key1").Obj(), ptr.To(int32(2)), nil, nil, nil).Container("image").Obj(),
st.MakePod().Name("pod4").Label("key2", "val").SpreadConstraint(1, "node", v1.DoNotSchedule, st.MakeLabelSelector().Exists("key2").Obj(), ptr.To(int32(3)), nil, nil, nil).Container("image").Obj(),
},
triggerFn: func(testCtx *testutils.TestContext) error {
triggerFn: func(testCtx *testutils.TestContext) (map[framework.ClusterEvent]uint64, error) {
// Trigger an assigned Pod delete event.
// It should requeue only pod3, because the deleted pod has only the key1 label, which pod3's topology spread constraint selects, and doesn't have the key2 label that pod4's constraint selects.
if err := testCtx.ClientSet.CoreV1().Pods(testCtx.NS.Name).Delete(testCtx.Ctx, "pod1", metav1.DeleteOptions{GracePeriodSeconds: new(int64)}); err != nil {
return fmt.Errorf("failed to delete Pod: %w", err)
return nil, fmt.Errorf("failed to delete Pod: %w", err)
}
return nil
return map[framework.ClusterEvent]uint64{framework.AssignedPodDelete: 1}, nil
},
wantRequeuedPods: sets.New("pod3"),
enableSchedulingQueueHint: []bool{true},
@ -798,14 +790,14 @@ func TestCoreResourceEnqueue(t *testing.T) {
st.MakePod().Name("pod3").Label("key1", "val").SpreadConstraint(1, "node", v1.DoNotSchedule, st.MakeLabelSelector().Exists("key1").Obj(), ptr.To(int32(2)), nil, nil, nil).Container("image").Obj(),
st.MakePod().Name("pod4").Label("key1", "val").SpreadConstraint(1, "zone", v1.DoNotSchedule, st.MakeLabelSelector().Exists("key1").Obj(), ptr.To(int32(2)), nil, nil, nil).Container("image").Obj(),
},
triggerFn: func(testCtx *testutils.TestContext) error {
triggerFn: func(testCtx *testutils.TestContext) (map[framework.ClusterEvent]uint64, error) {
// Trigger a Node add event.
// It should requeue only pod3, because the new Node has only the node label and doesn't have the zone label that pod4's topology spread constraint requires.
node := st.MakeNode().Name("fake-node3").Label("node", "fake-node").Obj()
if _, err := testCtx.ClientSet.CoreV1().Nodes().Create(testCtx.Ctx, node, metav1.CreateOptions{}); err != nil {
return fmt.Errorf("failed to create a new node: %w", err)
return nil, fmt.Errorf("failed to create a new node: %w", err)
}
return nil
return map[framework.ClusterEvent]uint64{framework.NodeAdd: 1}, nil
},
wantRequeuedPods: sets.New("pod3"),
enableSchedulingQueueHint: []bool{true},
@ -825,14 +817,14 @@ func TestCoreResourceEnqueue(t *testing.T) {
st.MakePod().Name("pod3").Label("key1", "val").SpreadConstraint(1, "node", v1.DoNotSchedule, st.MakeLabelSelector().Exists("key1").Obj(), ptr.To(int32(3)), nil, nil, nil).Container("image").Obj(),
st.MakePod().Name("pod4").Label("key1", "val").SpreadConstraint(1, "zone", v1.DoNotSchedule, st.MakeLabelSelector().Exists("key1").Obj(), ptr.To(int32(3)), nil, nil, nil).Container("image").Obj(),
},
triggerFn: func(testCtx *testutils.TestContext) error {
triggerFn: func(testCtx *testutils.TestContext) (map[framework.ClusterEvent]uint64, error) {
// Trigger a Node update event.
// It should requeue only pod4, because the updated Node has only the zone label and doesn't have the node label that pod3 requires.
node := st.MakeNode().Name("fake-node2").Label("zone", "fake-node").Obj()
if _, err := testCtx.ClientSet.CoreV1().Nodes().Update(testCtx.Ctx, node, metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("failed to update node: %w", err)
return nil, fmt.Errorf("failed to update node: %w", err)
}
return nil
return map[framework.ClusterEvent]uint64{framework.NodeLabelChange: 1}, nil
},
wantRequeuedPods: sets.New("pod4"),
enableSchedulingQueueHint: []bool{true},
@ -852,13 +844,13 @@ func TestCoreResourceEnqueue(t *testing.T) {
st.MakePod().Name("pod3").Label("key1", "val").SpreadConstraint(1, "node", v1.DoNotSchedule, st.MakeLabelSelector().Exists("key1").Obj(), ptr.To(int32(3)), nil, nil, nil).Container("image").Obj(),
st.MakePod().Name("pod4").Label("key1", "val").SpreadConstraint(1, "zone", v1.DoNotSchedule, st.MakeLabelSelector().Exists("key1").Obj(), ptr.To(int32(3)), nil, nil, nil).Container("image").Obj(),
},
triggerFn: func(testCtx *testutils.TestContext) error {
triggerFn: func(testCtx *testutils.TestContext) (map[framework.ClusterEvent]uint64, error) {
// Trigger a Node delete event.
// It should requeue only pod4, because the deleted Node has only the zone label and doesn't have the node label that pod3 requires.
if err := testCtx.ClientSet.CoreV1().Nodes().Delete(testCtx.Ctx, "fake-node2", metav1.DeleteOptions{}); err != nil {
return fmt.Errorf("failed to update node: %w", err)
return nil, fmt.Errorf("failed to delete the node: %w", err)
}
return nil
return map[framework.ClusterEvent]uint64{framework.NodeDelete: 1}, nil
},
wantRequeuedPods: sets.New("pod4"),
enableSchedulingQueueHint: []bool{true},
@ -878,13 +870,13 @@ func TestCoreResourceEnqueue(t *testing.T) {
st.MakePod().Name("pod3").Label("key1", "val").SpreadConstraint(1, "node", v1.DoNotSchedule, st.MakeLabelSelector().Exists("key1").Obj(), ptr.To(int32(3)), nil, nil, nil).Container("image").Obj(),
st.MakePod().Name("pod4").Label("key1", "val").SpreadConstraint(1, "zone", v1.DoNotSchedule, st.MakeLabelSelector().Exists("key1").Obj(), ptr.To(int32(3)), nil, nil, nil).Container("image").Obj(),
},
triggerFn: func(testCtx *testutils.TestContext) error {
triggerFn: func(testCtx *testutils.TestContext) (map[framework.ClusterEvent]uint64, error) {
// Trigger a Node delete event.
// It should requeue both pod3 and pod4, because PodTopologySpread subscribes to Node/delete events.
if err := testCtx.ClientSet.CoreV1().Nodes().Delete(testCtx.Ctx, "fake-node2", metav1.DeleteOptions{}); err != nil {
return fmt.Errorf("failed to update node: %w", err)
return nil, fmt.Errorf("failed to delete the node: %w", err)
}
return nil
return map[framework.ClusterEvent]uint64{framework.NodeDelete: 1}, nil
},
wantRequeuedPods: sets.New("pod3", "pod4"),
enableSchedulingQueueHint: []bool{false},
@ -905,14 +897,14 @@ func TestCoreResourceEnqueue(t *testing.T) {
st.MakePod().Name("pod3").Label("key1", "val").SpreadConstraint(1, "node", v1.DoNotSchedule, st.MakeLabelSelector().Exists("key1").Obj(), ptr.To(int32(3)), nil, nil, nil).Container("image").Obj(),
st.MakePod().Name("pod4").Label("key1", "val").SpreadConstraint(1, "zone", v1.DoNotSchedule, st.MakeLabelSelector().Exists("key1").Obj(), ptr.To(int32(3)), nil, nil, nil).Container("image").Toleration("aaa").Obj(),
},
triggerFn: func(testCtx *testutils.TestContext) error {
triggerFn: func(testCtx *testutils.TestContext) (map[framework.ClusterEvent]uint64, error) {
// Trigger a NodeTaint update event.
// It should requeue only pod4, because this Node has only the zone label and doesn't have the node label that pod3 requires.
node := st.MakeNode().Name("fake-node3").Label("zone", "fake-node").Taints([]v1.Taint{{Key: "aaa", Value: "bbb", Effect: v1.TaintEffectNoSchedule}}).Obj()
if _, err := testCtx.ClientSet.CoreV1().Nodes().Update(testCtx.Ctx, node, metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("failed to update node: %w", err)
return nil, fmt.Errorf("failed to update node: %w", err)
}
return nil
return map[framework.ClusterEvent]uint64{framework.NodeTaintChange: 1}, nil
},
wantRequeuedPods: sets.New("pod4"),
},
@ -950,16 +942,16 @@ func TestCoreResourceEnqueue(t *testing.T) {
pods: []*v1.Pod{
st.MakePod().Name("pod2").Container("image").PVC("pvc2").Obj(),
},
triggerFn: func(testCtx *testutils.TestContext) error {
triggerFn: func(testCtx *testutils.TestContext) (map[framework.ClusterEvent]uint64, error) {
pv2 := st.MakePersistentVolume().Name("pv2").Label(v1.LabelTopologyZone, "us-west1-a").
AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadOnlyMany}).
Capacity(v1.ResourceList{v1.ResourceStorage: resource.MustParse("1Mi")}).
HostPathVolumeSource(&v1.HostPathVolumeSource{Path: "/tmp", Type: ptr.To(v1.HostPathDirectoryOrCreate)}).
Obj()
if _, err := testCtx.ClientSet.CoreV1().PersistentVolumes().Create(testCtx.Ctx, pv2, metav1.CreateOptions{}); err != nil {
return fmt.Errorf("failed to create pv2: %w", err)
return nil, fmt.Errorf("failed to create pv2: %w", err)
}
return nil
return map[framework.ClusterEvent]uint64{framework.PvAdd: 1}, nil
},
wantRequeuedPods: sets.New("pod2"),
enableSchedulingQueueHint: []bool{true},
@ -1005,16 +997,16 @@ func TestCoreResourceEnqueue(t *testing.T) {
pods: []*v1.Pod{
st.MakePod().Name("pod2").Container("image").PVC("pvc2").Obj(),
},
triggerFn: func(testCtx *testutils.TestContext) error {
triggerFn: func(testCtx *testutils.TestContext) (map[framework.ClusterEvent]uint64, error) {
pv2 := st.MakePersistentVolume().Name("pv2").Label(v1.LabelTopologyZone, "us-west1-a").
AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadOnlyMany}).
Capacity(v1.ResourceList{v1.ResourceStorage: resource.MustParse("1Mi")}).
HostPathVolumeSource(&v1.HostPathVolumeSource{Path: "/tmp", Type: ptr.To(v1.HostPathDirectoryOrCreate)}).
Obj()
if _, err := testCtx.ClientSet.CoreV1().PersistentVolumes().Update(testCtx.Ctx, pv2, metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("failed to update pv2: %w", err)
return nil, fmt.Errorf("failed to update pv2: %w", err)
}
return nil
return map[framework.ClusterEvent]uint64{framework.PvUpdate: 1}, nil
},
wantRequeuedPods: sets.New("pod2"),
enableSchedulingQueueHint: []bool{true},
@ -1054,7 +1046,7 @@ func TestCoreResourceEnqueue(t *testing.T) {
st.MakePod().Name("pod2").Container("image").PVC("pvc2").Obj(),
st.MakePod().Name("pod3").Container("image").PVC("pvc3").Obj(),
},
triggerFn: func(testCtx *testutils.TestContext) error {
triggerFn: func(testCtx *testutils.TestContext) (map[framework.ClusterEvent]uint64, error) {
pvc2 := st.MakePersistentVolumeClaim().
Name("pvc2").
Annotation(volume.AnnBindCompleted, "true").
@ -1063,9 +1055,9 @@ func TestCoreResourceEnqueue(t *testing.T) {
Resources(v1.VolumeResourceRequirements{Requests: v1.ResourceList{v1.ResourceStorage: resource.MustParse("1Mi")}}).
Obj()
if _, err := testCtx.ClientSet.CoreV1().PersistentVolumeClaims(testCtx.NS.Name).Create(testCtx.Ctx, pvc2, metav1.CreateOptions{}); err != nil {
return fmt.Errorf("failed to add pvc2: %w", err)
return nil, fmt.Errorf("failed to add pvc2: %w", err)
}
return nil
return map[framework.ClusterEvent]uint64{framework.PvcAdd: 1}, nil
},
wantRequeuedPods: sets.New("pod2"),
enableSchedulingQueueHint: []bool{true},
@ -1111,7 +1103,7 @@ func TestCoreResourceEnqueue(t *testing.T) {
st.MakePod().Name("pod2").Container("image").PVC("pvc2").Obj(),
st.MakePod().Name("pod3").Container("image").PVC("pvc3").Obj(),
},
triggerFn: func(testCtx *testutils.TestContext) error {
triggerFn: func(testCtx *testutils.TestContext) (map[framework.ClusterEvent]uint64, error) {
pvc2 := st.MakePersistentVolumeClaim().
Name("pvc2").
Annotation(volume.AnnBindCompleted, "true").
@ -1120,9 +1112,9 @@ func TestCoreResourceEnqueue(t *testing.T) {
Resources(v1.VolumeResourceRequirements{Requests: v1.ResourceList{v1.ResourceStorage: resource.MustParse("1Mi")}}).
Obj()
if _, err := testCtx.ClientSet.CoreV1().PersistentVolumeClaims(testCtx.NS.Name).Update(testCtx.Ctx, pvc2, metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("failed to update pvc2: %w", err)
return nil, fmt.Errorf("failed to update pvc2: %w", err)
}
return nil
return map[framework.ClusterEvent]uint64{framework.PvcUpdate: 1}, nil
},
wantRequeuedPods: sets.New("pod2"),
enableSchedulingQueueHint: []bool{true},
@ -1161,16 +1153,16 @@ func TestCoreResourceEnqueue(t *testing.T) {
pods: []*v1.Pod{
st.MakePod().Name("pod2").Container("image").PVC("pvc2").Obj(),
},
triggerFn: func(testCtx *testutils.TestContext) error {
triggerFn: func(testCtx *testutils.TestContext) (map[framework.ClusterEvent]uint64, error) {
sc1 := st.MakeStorageClass().
Name("sc1").
VolumeBindingMode(storagev1.VolumeBindingWaitForFirstConsumer).
Provisioner("p").
Obj()
if _, err := testCtx.ClientSet.StorageV1().StorageClasses().Create(testCtx.Ctx, sc1, metav1.CreateOptions{}); err != nil {
return fmt.Errorf("failed to create sc1: %w", err)
return nil, fmt.Errorf("failed to create sc1: %w", err)
}
return nil
return map[framework.ClusterEvent]uint64{framework.StorageClassAdd: 1}, nil
},
wantRequeuedPods: sets.New("pod2"),
enableSchedulingQueueHint: []bool{true},
@ -1216,7 +1208,7 @@ func TestCoreResourceEnqueue(t *testing.T) {
pods: []*v1.Pod{
st.MakePod().Name("pod2").Container("image").PVC("pvc2").Obj(),
},
triggerFn: func(testCtx *testutils.TestContext) error {
triggerFn: func(testCtx *testutils.TestContext) (map[framework.ClusterEvent]uint64, error) {
pv2 := st.MakePersistentVolume().Name("pv2").
Labels(map[string]string{v1.LabelTopologyZone: "us-east1", "unrelated": "unrelated"}).
AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadOnlyMany}).
@ -1224,9 +1216,9 @@ func TestCoreResourceEnqueue(t *testing.T) {
HostPathVolumeSource(&v1.HostPathVolumeSource{Path: "/tmp", Type: ptr.To(v1.HostPathDirectoryOrCreate)}).
Obj()
if _, err := testCtx.ClientSet.CoreV1().PersistentVolumes().Update(testCtx.Ctx, pv2, metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("failed to update pv2: %w", err)
return nil, fmt.Errorf("failed to update pv2: %w", err)
}
return nil
return map[framework.ClusterEvent]uint64{framework.PvUpdate: 1}, nil
},
wantRequeuedPods: sets.Set[string]{},
enableSchedulingQueueHint: []bool{true},
@ -1319,12 +1311,35 @@ func TestCoreResourceEnqueue(t *testing.T) {
t.Log("finished initial schedulings for all Pods, will trigger triggerFn")
err := tt.triggerFn(testCtx)
legacyregistry.Reset() // reset the metric before triggering
wantTriggeredEvents, err := tt.triggerFn(testCtx)
if err != nil {
t.Fatalf("Failed to trigger the event: %v", err)
}
t.Log("triggered tt.triggerFn, will check if tt.requeuedPods are requeued")
t.Log("finished triggering triggerFn, waiting for the scheduler to handle events")
if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
for e, count := range wantTriggeredEvents {
vec, err := testutil.GetHistogramVecFromGatherer(legacyregistry.DefaultGatherer, "scheduler_event_handling_duration_seconds", map[string]string{
"event": string(e.Label),
})
if err != nil {
return false, err
}
if vec.GetAggregatedSampleCount() != count {
t.Logf("Expected %d sample for event %s, got %d", count, e.Label, vec.GetAggregatedSampleCount())
return false, nil
}
}
return true, nil
}); err != nil {
t.Fatalf("Failed to wait for the scheduler to handle the event: %v", err)
}
t.Log("all events are handled by the scheduler, will check if tt.requeuedPods are requeued")
// Wait for the tt.pods to still be present in the scheduling queue.
var requeuedPods sets.Set[string]