diff --git a/pkg/scheduler/eventhandlers.go b/pkg/scheduler/eventhandlers.go index f90efd2969b..f14d59c4f6e 100755 --- a/pkg/scheduler/eventhandlers.go +++ b/pkg/scheduler/eventhandlers.go @@ -27,6 +27,12 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/informers" "k8s.io/client-go/tools/cache" + v1helper "k8s.io/component-helpers/scheduling/corev1" + "k8s.io/component-helpers/scheduling/corev1/nodeaffinity" + "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodename" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeports" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources" "k8s.io/kubernetes/pkg/scheduler/internal/queue" "k8s.io/kubernetes/pkg/scheduler/profile" ) @@ -38,7 +44,7 @@ func (sched *Scheduler) onPvAdd(obj interface{}) { // provisioning and binding process, will not trigger events to schedule pod // again. So we need to move pods to active queue on PV add for this // scenario. - sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvAdd) + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvAdd, nil) } func (sched *Scheduler) onPvUpdate(old, new interface{}) { @@ -46,15 +52,15 @@ func (sched *Scheduler) onPvUpdate(old, new interface{}) { // bindings due to conflicts if PVs are updated by PV controller or other // parties, then scheduler will add pod back to unschedulable queue. We // need to move pods to active queue on PV update for this scenario. - sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvUpdate) + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvUpdate, nil) } func (sched *Scheduler) onPvcAdd(obj interface{}) { - sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvcAdd) + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvcAdd, nil) } func (sched *Scheduler) onPvcUpdate(old, new interface{}) { - sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvcUpdate) + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvcUpdate, nil) } func (sched *Scheduler) onStorageClassAdd(obj interface{}) { @@ -71,20 +77,20 @@ func (sched *Scheduler) onStorageClassAdd(obj interface{}) { // We don't need to invalidate cached results because results will not be // cached for pod that has unbound immediate PVCs. 
if sc.VolumeBindingMode != nil && *sc.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer { - sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.StorageClassAdd) + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.StorageClassAdd, nil) } } func (sched *Scheduler) onServiceAdd(obj interface{}) { - sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.ServiceAdd) + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.ServiceAdd, nil) } func (sched *Scheduler) onServiceUpdate(oldObj interface{}, newObj interface{}) { - sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.ServiceUpdate) + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.ServiceUpdate, nil) } func (sched *Scheduler) onServiceDelete(obj interface{}) { - sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.ServiceDelete) + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.ServiceDelete, nil) } func (sched *Scheduler) addNodeToCache(obj interface{}) { @@ -94,12 +100,9 @@ func (sched *Scheduler) addNodeToCache(obj interface{}) { return } - if err := sched.SchedulerCache.AddNode(node); err != nil { - klog.ErrorS(err, "Scheduler cache AddNode failed") - } - + nodeInfo := sched.SchedulerCache.AddNode(node) klog.V(3).InfoS("Add event for node", "node", klog.KObj(node)) - sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.NodeAdd) + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.NodeAdd, preCheckForNode(nodeInfo)) } func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) { @@ -114,13 +117,10 @@ func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) { return } - if err := sched.SchedulerCache.UpdateNode(oldNode, newNode); err != nil { - klog.ErrorS(err, "Scheduler cache UpdateNode failed") - } - + nodeInfo := sched.SchedulerCache.UpdateNode(oldNode, newNode) // Only requeue unschedulable pods if the node became more schedulable. if event := nodeSchedulingPropertiesChange(newNode, oldNode); event != "" { - sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(event) + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(event, preCheckForNode(nodeInfo)) } } @@ -152,11 +152,11 @@ func (sched *Scheduler) deleteNodeFromCache(obj interface{}) { } func (sched *Scheduler) onCSINodeAdd(obj interface{}) { - sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.CSINodeAdd) + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.CSINodeAdd, nil) } func (sched *Scheduler) onCSINodeUpdate(oldObj, newObj interface{}) { - sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.CSINodeUpdate) + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.CSINodeUpdate, nil) } func (sched *Scheduler) addPodToSchedulingQueue(obj interface{}) { @@ -286,7 +286,7 @@ func (sched *Scheduler) deletePodFromCache(obj interface{}) { klog.ErrorS(err, "Scheduler cache RemovePod failed", "pod", klog.KObj(pod)) } - sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.AssignedPodDelete) + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.AssignedPodDelete, nil) } // assignedPod selects pods that are assigned (scheduled and running). 
@@ -506,3 +506,39 @@ func nodeConditionsChanged(newNode *v1.Node, oldNode *v1.Node) bool { func nodeSpecUnschedulableChanged(newNode *v1.Node, oldNode *v1.Node) bool { return newNode.Spec.Unschedulable != oldNode.Spec.Unschedulable && !newNode.Spec.Unschedulable } + +func preCheckForNode(nodeInfo *framework.NodeInfo) queue.PreEnqueueCheck { + // In addition to the checks in kubelet (pkg/kubelet/lifecycle/predicate.go#GeneralPredicates), + // the following logic appends a taint/toleration check. + // TODO: verify if kubelet should also apply the taint/toleration check, and then unify the + // logic with kubelet and move to a shared place. + // + // Note: the following checks don't take preemption into consideration; in very rare + // cases (e.g., node resizing), "pod" may still fail a check but preemption helps. We deliberately + // chose to ignore those cases as unschedulable pods will be re-queued eventually. + return func(pod *v1.Pod) bool { + if len(noderesources.Fits(pod, nodeInfo)) != 0 { + return false + } + + // Ignore parsing errors for backwards compatibility. + matches, _ := nodeaffinity.GetRequiredNodeAffinity(pod).Match(nodeInfo.Node()) + if !matches { + return false + } + + if !nodename.Fits(pod, nodeInfo) { + return false + } + + if !nodeports.Fits(pod, nodeInfo) { + return false + } + + _, isUntolerated := v1helper.FindMatchingUntoleratedTaint(nodeInfo.Node().Spec.Taints, pod.Spec.Tolerations, func(t *v1.Taint) bool { + // PodToleratesNodeTaints is only interested in NoSchedule and NoExecute taints. + return t.Effect == v1.TaintEffectNoSchedule || t.Effect == v1.TaintEffectNoExecute + }) + return !isUntolerated + } +} diff --git a/pkg/scheduler/eventhandlers_test.go b/pkg/scheduler/eventhandlers_test.go index b0bc31844e2..39196ca8e68 100644 --- a/pkg/scheduler/eventhandlers_test.go +++ b/pkg/scheduler/eventhandlers_test.go @@ -22,12 +22,15 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/internal/cache" fakecache "k8s.io/kubernetes/pkg/scheduler/internal/cache/fake" "k8s.io/kubernetes/pkg/scheduler/internal/queue" + st "k8s.io/kubernetes/pkg/scheduler/testing" ) func TestSkipPodUpdate(t *testing.T) { @@ -443,3 +446,107 @@ func withPodName(pod *v1.Pod, name string) *v1.Pod { pod.Name = name return pod } + +func TestPreCheckForNode(t *testing.T) { + cpu4 := map[v1.ResourceName]string{v1.ResourceCPU: "4"} + cpu8 := map[v1.ResourceName]string{v1.ResourceCPU: "8"} + cpu16 := map[v1.ResourceName]string{v1.ResourceCPU: "16"} + tests := []struct { + name string + nodeFn func() *v1.Node + existingPods, pods []*v1.Pod + want []bool + }{ + { + name: "regular node, pods with a single constraint", + nodeFn: func() *v1.Node { + return st.MakeNode().Name("fake-node").Label("hostname", "fake-node").Capacity(cpu8).Obj() + }, + existingPods: []*v1.Pod{ + st.MakePod().Name("p").HostPort(80).Obj(), + }, + pods: []*v1.Pod{ + st.MakePod().Name("p1").Req(cpu4).Obj(), + st.MakePod().Name("p2").Req(cpu16).Obj(), + st.MakePod().Name("p3").Req(cpu4).Req(cpu8).Obj(), + st.MakePod().Name("p4").NodeAffinityIn("hostname", []string{"fake-node"}).Obj(), + st.MakePod().Name("p5").NodeAffinityNotIn("hostname", []string{"fake-node"}).Obj(), + st.MakePod().Name("p6").Obj(), + st.MakePod().Name("p7").Node("invalid-node").Obj(), + st.MakePod().Name("p8").HostPort(8080).Obj(), +
st.MakePod().Name("p9").HostPort(80).Obj(), + }, + want: []bool{true, false, false, true, false, true, false, true, false}, + }, + { + name: "tainted node, pods with a single constraint", + nodeFn: func() *v1.Node { + node := st.MakeNode().Name("fake-node").Obj() + node.Spec.Taints = []v1.Taint{ + {Key: "foo", Effect: v1.TaintEffectNoSchedule}, + {Key: "bar", Effect: v1.TaintEffectPreferNoSchedule}, + } + return node + }, + pods: []*v1.Pod{ + st.MakePod().Name("p1").Obj(), + st.MakePod().Name("p2").Toleration("foo").Obj(), + st.MakePod().Name("p3").Toleration("bar").Obj(), + st.MakePod().Name("p4").Toleration("bar").Toleration("foo").Obj(), + }, + want: []bool{false, true, false, true}, + }, + { + name: "regular node, pods with multiple constraints", + nodeFn: func() *v1.Node { + return st.MakeNode().Name("fake-node").Label("hostname", "fake-node").Capacity(cpu8).Obj() + }, + existingPods: []*v1.Pod{ + st.MakePod().Name("p").HostPort(80).Obj(), + }, + pods: []*v1.Pod{ + st.MakePod().Name("p1").Req(cpu4).NodeAffinityNotIn("hostname", []string{"fake-node"}).Obj(), + st.MakePod().Name("p2").Req(cpu16).NodeAffinityIn("hostname", []string{"fake-node"}).Obj(), + st.MakePod().Name("p3").Req(cpu8).NodeAffinityIn("hostname", []string{"fake-node"}).Obj(), + st.MakePod().Name("p4").HostPort(8080).Node("invalid-node").Obj(), + st.MakePod().Name("p5").Req(cpu4).NodeAffinityIn("hostname", []string{"fake-node"}).HostPort(80).Obj(), + }, + want: []bool{false, false, true, false, false}, + }, + { + name: "tainted node, pods with multiple constraints", + nodeFn: func() *v1.Node { + node := st.MakeNode().Name("fake-node").Label("hostname", "fake-node").Capacity(cpu8).Obj() + node.Spec.Taints = []v1.Taint{ + {Key: "foo", Effect: v1.TaintEffectNoSchedule}, + {Key: "bar", Effect: v1.TaintEffectPreferNoSchedule}, + } + return node + }, + pods: []*v1.Pod{ + st.MakePod().Name("p1").Req(cpu4).Toleration("bar").Obj(), + st.MakePod().Name("p2").Req(cpu4).Toleration("bar").Toleration("foo").Obj(), + st.MakePod().Name("p3").Req(cpu16).Toleration("foo").Obj(), + st.MakePod().Name("p4").Req(cpu16).Toleration("bar").Obj(), + }, + want: []bool{false, true, false, false}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + nodeInfo := framework.NewNodeInfo(tt.existingPods...) + nodeInfo.SetNode(tt.nodeFn()) + preCheckFn := preCheckForNode(nodeInfo) + + var got []bool + for _, pod := range tt.pods { + got = append(got, preCheckFn(pod)) + } + + if diff := cmp.Diff(tt.want, got); diff != "" { + t.Errorf("Unexpected diff (-want, +got):\n%s", diff) + } + }) + } +} diff --git a/pkg/scheduler/framework/types.go b/pkg/scheduler/framework/types.go index a29c0791510..28d3ca6ed3b 100644 --- a/pkg/scheduler/framework/types.go +++ b/pkg/scheduler/framework/types.go @@ -824,12 +824,11 @@ func (n *NodeInfo) updateUsedPorts(pod *v1.Pod, add bool) { } // SetNode sets the overall node information. -func (n *NodeInfo) SetNode(node *v1.Node) error { +func (n *NodeInfo) SetNode(node *v1.Node) { n.node = node n.Allocatable = NewResource(node.Status.Allocatable) n.TransientInfo = NewTransientSchedulerInfo() n.Generation = nextGeneration() - return nil } // RemoveNode removes the node object, leaving all other tracking information.
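The pre-enqueue flow added above can also be exercised directly, outside a table-driven test. The following is a minimal sketch (not part of the patch) that mirrors TestPreCheckForNode; it assumes the st wrappers, framework.NewNodeInfo, the error-free SetNode, and preCheckForNode from this diff are in scope, and the node name, capacity, and pod names are hypothetical.

// Sketch only: evaluate the PreEnqueueCheck returned by preCheckForNode
// against two hypothetical pods on a hypothetical 8-CPU node.
package scheduler

import (
	v1 "k8s.io/api/core/v1"

	"k8s.io/kubernetes/pkg/scheduler/framework"
	st "k8s.io/kubernetes/pkg/scheduler/testing"
)

func sketchPreCheckForNode() (bool, bool) {
	cpu := func(q string) map[v1.ResourceName]string {
		return map[v1.ResourceName]string{v1.ResourceCPU: q}
	}

	nodeInfo := framework.NewNodeInfo()
	// SetNode no longer returns an error after this change.
	nodeInfo.SetNode(st.MakeNode().Name("fake-node").Capacity(cpu("8")).Obj())

	check := preCheckForNode(nodeInfo)
	small := check(st.MakePod().Name("small").Req(cpu("4")).Obj())  // true: the request fits and no other check fails
	large := check(st.MakePod().Name("large").Req(cpu("16")).Obj()) // false: the request exceeds allocatable CPU
	return small, large
}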
diff --git a/pkg/scheduler/internal/cache/cache.go b/pkg/scheduler/internal/cache/cache.go index c1f07742b0a..a24276aaadd 100644 --- a/pkg/scheduler/internal/cache/cache.go +++ b/pkg/scheduler/internal/cache/cache.go @@ -593,7 +593,7 @@ func (cache *schedulerCache) GetPod(pod *v1.Pod) (*v1.Pod, error) { return podState.pod, nil } -func (cache *schedulerCache) AddNode(node *v1.Node) error { +func (cache *schedulerCache) AddNode(node *v1.Node) *framework.NodeInfo { cache.mu.Lock() defer cache.mu.Unlock() @@ -608,10 +608,11 @@ func (cache *schedulerCache) AddNode(node *v1.Node) error { cache.nodeTree.addNode(node) cache.addNodeImageStates(node, n.info) - return n.info.SetNode(node) + n.info.SetNode(node) + return n.info.Clone() } -func (cache *schedulerCache) UpdateNode(oldNode, newNode *v1.Node) error { +func (cache *schedulerCache) UpdateNode(oldNode, newNode *v1.Node) *framework.NodeInfo { cache.mu.Lock() defer cache.mu.Unlock() @@ -627,7 +628,8 @@ func (cache *schedulerCache) UpdateNode(oldNode, newNode *v1.Node) error { cache.nodeTree.updateNode(oldNode, newNode) cache.addNodeImageStates(newNode, n.info) - return n.info.SetNode(newNode) + n.info.SetNode(newNode) + return n.info.Clone() } // RemoveNode removes a node from the cache's tree. diff --git a/pkg/scheduler/internal/cache/cache_test.go b/pkg/scheduler/internal/cache/cache_test.go index c4d279221a4..074032c4fb2 100644 --- a/pkg/scheduler/internal/cache/cache_test.go +++ b/pkg/scheduler/internal/cache/cache_test.go @@ -869,9 +869,7 @@ func TestRemovePod(t *testing.T) { t.Error(err) } for _, n := range tt.nodes { - if err := cache.AddNode(n); err != nil { - t.Error(err) - } + cache.AddNode(n) } if err := cache.RemovePod(tt.pod); err != nil { @@ -1080,9 +1078,7 @@ func TestNodeOperators(t *testing.T) { node := test.node cache := newSchedulerCache(time.Second, time.Second, nil) - if err := cache.AddNode(node); err != nil { - t.Fatal(err) - } + cache.AddNode(node) for _, pod := range test.pods { if err := cache.AddPod(pod); err != nil { t.Fatal(err) @@ -1126,9 +1122,7 @@ func TestNodeOperators(t *testing.T) { node.Status.Allocatable[v1.ResourceMemory] = mem50m expected.Allocatable.Memory = mem50m.Value() - if err := cache.UpdateNode(nil, node); err != nil { - t.Error(err) - } + cache.UpdateNode(nil, node) got, found = cache.nodes[node.Name] if !found { t.Errorf("Failed to find node %v in schedulertypes after UpdateNode.", node.Name) @@ -1265,9 +1259,7 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { addNode := func(i int) operation { return func(t *testing.T) { - if err := cache.AddNode(nodes[i]); err != nil { - t.Error(err) - } + cache.AddNode(nodes[i]) } } removeNode := func(i int) operation { @@ -1279,9 +1271,7 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { } updateNode := func(i int) operation { return func(t *testing.T) { - if err := cache.UpdateNode(nodes[i], updatedNodes[i]); err != nil { - t.Error(err) - } + cache.UpdateNode(nodes[i], updatedNodes[i]) } } addPod := func(i int) operation { @@ -1589,9 +1579,7 @@ func TestSchedulerCache_updateNodeInfoSnapshotList(t *testing.T) { var snapshot *Snapshot addNode := func(t *testing.T, i int) { - if err := cache.AddNode(nodes[i]); err != nil { - t.Error(err) - } + cache.AddNode(nodes[i]) _, ok := snapshot.nodeInfoMap[nodes[i].Name] if !ok { snapshot.nodeInfoMap[nodes[i].Name] = cache.nodes[nodes[i].Name].info diff --git a/pkg/scheduler/internal/cache/fake/fake_cache.go b/pkg/scheduler/internal/cache/fake/fake_cache.go index 0b1d0714045..0bcf7e13ae6 100644 --- 
a/pkg/scheduler/internal/cache/fake/fake_cache.go +++ b/pkg/scheduler/internal/cache/fake/fake_cache.go @@ -18,6 +18,7 @@ package fake import ( v1 "k8s.io/api/core/v1" + "k8s.io/kubernetes/pkg/scheduler/framework" internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" ) @@ -64,10 +65,10 @@ func (c *Cache) GetPod(pod *v1.Pod) (*v1.Pod, error) { } // AddNode is a fake method for testing. -func (c *Cache) AddNode(node *v1.Node) error { return nil } +func (c *Cache) AddNode(node *v1.Node) *framework.NodeInfo { return nil } // UpdateNode is a fake method for testing. -func (c *Cache) UpdateNode(oldNode, newNode *v1.Node) error { return nil } +func (c *Cache) UpdateNode(oldNode, newNode *v1.Node) *framework.NodeInfo { return nil } // RemoveNode is a fake method for testing. func (c *Cache) RemoveNode(node *v1.Node) error { return nil } diff --git a/pkg/scheduler/internal/cache/interface.go b/pkg/scheduler/internal/cache/interface.go index f1baf4b525c..4cd6d8f8869 100644 --- a/pkg/scheduler/internal/cache/interface.go +++ b/pkg/scheduler/internal/cache/interface.go @@ -94,10 +94,12 @@ type Cache interface { IsAssumedPod(pod *v1.Pod) (bool, error) // AddNode adds overall information about node. - AddNode(node *v1.Node) error + // It returns a clone of the added NodeInfo object. + AddNode(node *v1.Node) *framework.NodeInfo // UpdateNode updates overall information about node. - UpdateNode(oldNode, newNode *v1.Node) error + // It returns a clone of the updated NodeInfo object. + UpdateNode(oldNode, newNode *v1.Node) *framework.NodeInfo // RemoveNode removes overall information about node. RemoveNode(node *v1.Node) error diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index 393d11ff392..1a58c849210 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -66,6 +66,11 @@ const ( DefaultPodMaxBackoffDuration time.Duration = 10 * time.Second ) +// PreEnqueueCheck is a function type. It's used to build functions that +// run against a Pod; based on the result, the caller can decide whether +// to enqueue or skip the Pod. +type PreEnqueueCheck func(pod *v1.Pod) bool + // SchedulingQueue is an interface for a queue to store pods waiting to be scheduled. // The interface follows a pattern similar to cache.FIFO and cache.Heap and // makes it easy to use those data structures as a SchedulingQueue. @@ -85,7 +90,7 @@ type SchedulingQueue interface { Pop() (*framework.QueuedPodInfo, error) Update(oldPod, newPod *v1.Pod) error Delete(pod *v1.Pod) error - MoveAllToActiveOrBackoffQueue(event string) + MoveAllToActiveOrBackoffQueue(event string, preCheck PreEnqueueCheck) AssignedPodAdded(pod *v1.Pod) AssignedPodUpdated(pod *v1.Pod) PendingPods() []*v1.Pod @@ -524,12 +529,14 @@ func (p *PriorityQueue) AssignedPodUpdated(pod *v1.Pod) { // This function adds all pods and then signals the condition variable to ensure that // if Pop() is waiting for an item, it receives it after all the pods are in the // queue and the head is the highest priority pod.
-func (p *PriorityQueue) MoveAllToActiveOrBackoffQueue(event string) { +func (p *PriorityQueue) MoveAllToActiveOrBackoffQueue(event string, preCheck PreEnqueueCheck) { p.lock.Lock() defer p.lock.Unlock() unschedulablePods := make([]*framework.QueuedPodInfo, 0, len(p.unschedulableQ.podInfoMap)) for _, pInfo := range p.unschedulableQ.podInfoMap { - unschedulablePods = append(unschedulablePods, pInfo) + if preCheck == nil || preCheck(pInfo.Pod) { + unschedulablePods = append(unschedulablePods, pInfo) + } } p.movePodsToActiveOrBackoffQueue(unschedulablePods, event) } diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index ab1adb80533..1c14b872671 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -25,6 +25,8 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp" + "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -37,6 +39,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort" "k8s.io/kubernetes/pkg/scheduler/metrics" "k8s.io/kubernetes/pkg/scheduler/util" + "k8s.io/utils/pointer" ) const queueMetricMetadata = ` @@ -237,7 +240,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) { } // move all pods to active queue when we were trying to schedule them - q.MoveAllToActiveOrBackoffQueue("test") + q.MoveAllToActiveOrBackoffQueue("test", nil) oldCycle := q.SchedulingCycle() firstPod, _ := q.Pop() @@ -477,10 +480,10 @@ func BenchmarkMoveAllToActiveOrBackoffQueue(b *testing.B) { b.StartTimer() if tt.moveEvent != "" { - q.MoveAllToActiveOrBackoffQueue(tt.moveEvent) + q.MoveAllToActiveOrBackoffQueue(tt.moveEvent, nil) } else { // Random case. - q.MoveAllToActiveOrBackoffQueue(events[i%len(events)]) + q.MoveAllToActiveOrBackoffQueue(events[i%len(events)], nil) } } }) @@ -506,7 +509,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { hpp2.Name = "hpp2" q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(hpp2, "barPlugin"), q.SchedulingCycle()) // Pods is still backing off, move the pod into backoffQ. - q.MoveAllToActiveOrBackoffQueue(NodeAdd) + q.MoveAllToActiveOrBackoffQueue(NodeAdd, nil) if q.activeQ.Len() != 1 { t.Errorf("Expected 1 item to be in activeQ, but got: %v", q.activeQ.Len()) } @@ -532,7 +535,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { // Move clock by podInitialBackoffDuration, so that pods in the unschedulableQ would pass the backing off, // and the pods will be moved into activeQ. c.Step(q.podInitialBackoffDuration) - q.MoveAllToActiveOrBackoffQueue(NodeAdd) + q.MoveAllToActiveOrBackoffQueue(NodeAdd, nil) // hpp2 won't be moved regardless of its backoff timer. if q.activeQ.Len() != 4 { t.Errorf("Expected 4 items to be in activeQ, but got: %v", q.activeQ.Len()) @@ -641,7 +644,7 @@ func TestPriorityQueue_PendingPods(t *testing.T) { t.Error("Unexpected list of pending Pods.") } // Move all to active queue. We should still see the same set of pods. - q.MoveAllToActiveOrBackoffQueue("test") + q.MoveAllToActiveOrBackoffQueue("test", nil) if !reflect.DeepEqual(expectedSet, makeSet(q.PendingPods())) { t.Error("Unexpected list of pending Pods...") } @@ -944,7 +947,7 @@ func TestRecentlyTriedPodsGoBack(t *testing.T) { q.AddUnschedulableIfNotPresent(p1, q.SchedulingCycle()) c.Step(DefaultPodInitialBackoffDuration) // Move all unschedulable pods to the active queue. 
- q.MoveAllToActiveOrBackoffQueue(UnschedulableTimeout) + q.MoveAllToActiveOrBackoffQueue(UnschedulableTimeout, nil) // Simulation is over. Now let's pop all pods. The pod popped first should be // the last one we pop here. for i := 0; i < 5; i++ { @@ -996,7 +999,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) { // Move clock to make the unschedulable pods complete backoff. c.Step(DefaultPodInitialBackoffDuration + time.Second) // Move all unschedulable pods to the active queue. - q.MoveAllToActiveOrBackoffQueue(UnschedulableTimeout) + q.MoveAllToActiveOrBackoffQueue(UnschedulableTimeout, nil) // Simulate a pod being popped by the scheduler, // At this time, unschedulable pod should be popped. @@ -1039,7 +1042,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) { // Move clock to make the unschedulable pods complete backoff. c.Step(DefaultPodInitialBackoffDuration + time.Second) // Move all unschedulable pods to the active queue. - q.MoveAllToActiveOrBackoffQueue(UnschedulableTimeout) + q.MoveAllToActiveOrBackoffQueue(UnschedulableTimeout, nil) // At this time, newerPod should be popped // because it is the oldest tried pod. @@ -1104,7 +1107,7 @@ func TestHighPriorityBackoff(t *testing.T) { // Put in the unschedulable queue. q.AddUnschedulableIfNotPresent(p, q.SchedulingCycle()) // Move all unschedulable pods to the active queue. - q.MoveAllToActiveOrBackoffQueue("test") + q.MoveAllToActiveOrBackoffQueue("test", nil) p, err = q.Pop() if err != nil { @@ -1211,7 +1214,7 @@ var ( queue.podBackoffQ.Add(pInfo) } moveAllToActiveOrBackoffQ = func(queue *PriorityQueue, _ *framework.QueuedPodInfo) { - queue.MoveAllToActiveOrBackoffQueue(UnschedulableTimeout) + queue.MoveAllToActiveOrBackoffQueue(UnschedulableTimeout, nil) } flushBackoffQ = func(queue *PriorityQueue, _ *framework.QueuedPodInfo) { queue.clock.(*clock.FakeClock).Step(2 * time.Second) @@ -1691,7 +1694,7 @@ func TestBackOffFlow(t *testing.T) { } // An event happens. 
- q.MoveAllToActiveOrBackoffQueue(UnschedulableTimeout) + q.MoveAllToActiveOrBackoffQueue(UnschedulableTimeout, nil) if _, ok, _ := q.podBackoffQ.Get(podInfo); !ok { t.Errorf("pod %v is not in the backoff queue", podID) @@ -1815,6 +1818,76 @@ func TestPodMatchesEvent(t *testing.T) { } } +func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) { + var podInfos []*framework.QueuedPodInfo + for i := 0; i < 5; i++ { + pInfo := newQueuedPodInfoForLookup(&v1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("p%d", i)}, + Spec: v1.PodSpec{Priority: pointer.Int32Ptr(int32(i))}, + }) + podInfos = append(podInfos, pInfo) + } + + tests := []struct { + name string + preEnqueueCheck PreEnqueueCheck + podInfos []*framework.QueuedPodInfo + want []string + }{ + { + name: "nil PreEnqueueCheck", + podInfos: podInfos, + want: []string{"p0", "p1", "p2", "p3", "p4"}, + }, + { + name: "move Pods with priority greater than or equal to 2", + podInfos: podInfos, + preEnqueueCheck: func(pod *v1.Pod) bool { return *pod.Spec.Priority >= 2 }, + want: []string{"p2", "p3", "p4"}, + }, + { + name: "move Pods with even priority greater than or equal to 2", + podInfos: podInfos, + preEnqueueCheck: func(pod *v1.Pod) bool { + return *pod.Spec.Priority%2 == 0 && *pod.Spec.Priority >= 2 + }, + want: []string{"p2", "p4"}, + }, + { + name: "move Pods with even and negative priority", + podInfos: podInfos, + preEnqueueCheck: func(pod *v1.Pod) bool { + return *pod.Spec.Priority%2 == 0 && *pod.Spec.Priority < 0 + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + q := NewTestQueue(context.Background(), newDefaultQueueSort()) + for _, podInfo := range tt.podInfos { + q.AddUnschedulableIfNotPresent(podInfo, q.schedulingCycle) + } + q.MoveAllToActiveOrBackoffQueue("test", tt.preEnqueueCheck) + var got []string + for q.podBackoffQ.Len() != 0 { + obj, err := q.podBackoffQ.Pop() + if err != nil { + t.Fatalf("Failed to pop pod from backoffQ: %v", err) + } + queuedPodInfo, ok := obj.(*framework.QueuedPodInfo) + if !ok { + t.Fatalf("Failed to convert popped obj (type %T) to *framework.QueuedPodInfo", obj) + } + got = append(got, queuedPodInfo.Pod.Name) + } + if diff := cmp.Diff(tt.want, got); diff != "" { + t.Errorf("Unexpected diff (-want, +got):\n%s", diff) + } + }) + } +} + func makeQueuedPodInfos(num int, timestamp time.Time) []*framework.QueuedPodInfo { var pInfos = make([]*framework.QueuedPodInfo, 0, num) for i := 1; i <= num; i++ { diff --git a/pkg/scheduler/testing/wrappers.go b/pkg/scheduler/testing/wrappers.go index 72d8644d7d6..695bb268cdb 100644 --- a/pkg/scheduler/testing/wrappers.go +++ b/pkg/scheduler/testing/wrappers.go @@ -250,6 +250,25 @@ func (p *PodWrapper) NominatedNodeName(n string) *PodWrapper { return p } +// Toleration creates a toleration (with the operator Exists) +// and injects it into the inner pod. +func (p *PodWrapper) Toleration(key string) *PodWrapper { + p.Spec.Tolerations = append(p.Spec.Tolerations, v1.Toleration{ + Key: key, + Operator: v1.TolerationOpExists, + }) + return p +} + +// HostPort creates a container with a hostPort valued `port`, +// and injects it into the inner pod. +func (p *PodWrapper) HostPort(port int32) *PodWrapper { + p.Spec.Containers = append(p.Spec.Containers, v1.Container{ + Ports: []v1.ContainerPort{{HostPort: port}}, + }) + return p +} + // PodAffinityKind represents different kinds of PodAffinity. type PodAffinityKind int
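For callers, the new signature keeps the previous semantics whenever nil is passed. Below is a hypothetical caller-side sketch (not part of the patch); the event name, the namespace filter, and the helper function are illustrative only.

// Sketch only: a caller-side view of the two-argument MoveAllToActiveOrBackoffQueue.
package queue

import v1 "k8s.io/api/core/v1"

func requeueForCustomEvent(q SchedulingQueue) {
	// Hypothetical filter: only re-queue unschedulable pods from the "ml" namespace.
	onlyMLNamespace := PreEnqueueCheck(func(pod *v1.Pod) bool {
		return pod.Namespace == "ml"
	})
	q.MoveAllToActiveOrBackoffQueue("custom-event", onlyMLNamespace)

	// Passing nil preserves the pre-change behavior: every unschedulable pod is moved.
	q.MoveAllToActiveOrBackoffQueue("custom-event", nil)
}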