Merge pull request #100049 from Huang-Wei/basic-check-prior-to-enq

sched: support PreEnqueueChecks prior to moving Pods
Kubernetes Prow Robot 2021-04-08 17:11:04 -07:00 committed by GitHub
commit 7cd73076a2
10 changed files with 298 additions and 64 deletions
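At its core, the PR threads an optional per-Pod gate (PreEnqueueCheck) through MoveAllToActiveOrBackoffQueue: a cluster event now only re-queues unschedulable Pods that could plausibly benefit from it. Below is a minimal, self-contained sketch of that contract, not the PR's code; moveAll is a simplified stand-in for PriorityQueue.MoveAllToActiveOrBackoffQueue and the priority-based check is only an example (the real signatures appear in the diffs that follow).

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/pointer"
)

// PreEnqueueCheck mirrors the type added by this PR: a per-Pod gate that
// decides whether an unschedulable Pod is worth re-queueing for an event.
type PreEnqueueCheck func(pod *v1.Pod) bool

// moveAll is a simplified stand-in for PriorityQueue.MoveAllToActiveOrBackoffQueue:
// a nil check keeps the old behavior (move every Pod); otherwise only Pods
// passing the check are moved.
func moveAll(pods []*v1.Pod, check PreEnqueueCheck) []*v1.Pod {
	var moved []*v1.Pod
	for _, p := range pods {
		if check == nil || check(p) {
			moved = append(moved, p)
		}
	}
	return moved
}

func main() {
	pods := []*v1.Pod{
		{ObjectMeta: metav1.ObjectMeta{Name: "p0"}, Spec: v1.PodSpec{Priority: pointer.Int32Ptr(0)}},
		{ObjectMeta: metav1.ObjectMeta{Name: "p1"}, Spec: v1.PodSpec{Priority: pointer.Int32Ptr(5)}},
	}
	// Example check: only re-queue Pods with priority >= 2.
	highPriority := func(pod *v1.Pod) bool { return *pod.Spec.Priority >= 2 }
	for _, p := range moveAll(pods, highPriority) {
		fmt.Println(p.Name) // prints "p1"
	}
}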

View File

@ -27,6 +27,12 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
v1helper "k8s.io/component-helpers/scheduling/corev1"
"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodename"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeports"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
"k8s.io/kubernetes/pkg/scheduler/internal/queue"
"k8s.io/kubernetes/pkg/scheduler/profile"
)
@ -38,7 +44,7 @@ func (sched *Scheduler) onPvAdd(obj interface{}) {
// provisioning and binding process, will not trigger events to schedule pod
// again. So we need to move pods to active queue on PV add for this
// scenario.
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvAdd)
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvAdd, nil)
}
func (sched *Scheduler) onPvUpdate(old, new interface{}) {
@ -46,15 +52,15 @@ func (sched *Scheduler) onPvUpdate(old, new interface{}) {
// bindings due to conflicts if PVs are updated by PV controller or other
// parties, then scheduler will add pod back to unschedulable queue. We
// need to move pods to active queue on PV update for this scenario.
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvUpdate)
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvUpdate, nil)
}
func (sched *Scheduler) onPvcAdd(obj interface{}) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvcAdd)
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvcAdd, nil)
}
func (sched *Scheduler) onPvcUpdate(old, new interface{}) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvcUpdate)
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvcUpdate, nil)
}
func (sched *Scheduler) onStorageClassAdd(obj interface{}) {
@ -71,20 +77,20 @@ func (sched *Scheduler) onStorageClassAdd(obj interface{}) {
// We don't need to invalidate cached results because results will not be
// cached for a pod that has unbound immediate PVCs.
if sc.VolumeBindingMode != nil && *sc.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.StorageClassAdd)
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.StorageClassAdd, nil)
}
}
func (sched *Scheduler) onServiceAdd(obj interface{}) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.ServiceAdd)
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.ServiceAdd, nil)
}
func (sched *Scheduler) onServiceUpdate(oldObj interface{}, newObj interface{}) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.ServiceUpdate)
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.ServiceUpdate, nil)
}
func (sched *Scheduler) onServiceDelete(obj interface{}) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.ServiceDelete)
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.ServiceDelete, nil)
}
func (sched *Scheduler) addNodeToCache(obj interface{}) {
@ -94,12 +100,9 @@ func (sched *Scheduler) addNodeToCache(obj interface{}) {
return
}
if err := sched.SchedulerCache.AddNode(node); err != nil {
klog.ErrorS(err, "Scheduler cache AddNode failed")
}
nodeInfo := sched.SchedulerCache.AddNode(node)
klog.V(3).InfoS("Add event for node", "node", klog.KObj(node))
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.NodeAdd)
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.NodeAdd, preCheckForNode(nodeInfo))
}
func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) {
@ -114,13 +117,10 @@ func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) {
return
}
if err := sched.SchedulerCache.UpdateNode(oldNode, newNode); err != nil {
klog.ErrorS(err, "Scheduler cache UpdateNode failed")
}
nodeInfo := sched.SchedulerCache.UpdateNode(oldNode, newNode)
// Only requeue unschedulable pods if the node became more schedulable.
if event := nodeSchedulingPropertiesChange(newNode, oldNode); event != "" {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(event)
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(event, preCheckForNode(nodeInfo))
}
}
@ -152,11 +152,11 @@ func (sched *Scheduler) deleteNodeFromCache(obj interface{}) {
}
func (sched *Scheduler) onCSINodeAdd(obj interface{}) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.CSINodeAdd)
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.CSINodeAdd, nil)
}
func (sched *Scheduler) onCSINodeUpdate(oldObj, newObj interface{}) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.CSINodeUpdate)
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.CSINodeUpdate, nil)
}
func (sched *Scheduler) addPodToSchedulingQueue(obj interface{}) {
@ -286,7 +286,7 @@ func (sched *Scheduler) deletePodFromCache(obj interface{}) {
klog.ErrorS(err, "Scheduler cache RemovePod failed", "pod", klog.KObj(pod))
}
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.AssignedPodDelete)
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.AssignedPodDelete, nil)
}
// assignedPod selects pods that are assigned (scheduled and running).
@ -506,3 +506,39 @@ func nodeConditionsChanged(newNode *v1.Node, oldNode *v1.Node) bool {
func nodeSpecUnschedulableChanged(newNode *v1.Node, oldNode *v1.Node) bool {
return newNode.Spec.Unschedulable != oldNode.Spec.Unschedulable && !newNode.Spec.Unschedulable
}
func preCheckForNode(nodeInfo *framework.NodeInfo) queue.PreEnqueueCheck {
// In addition to the checks in kubelet (pkg/kubelet/lifecycle/predicate.go#GeneralPredicates),
// the following logic appends a taint/toleration check.
// TODO: verify if kubelet should also apply the taint/toleration check, and then unify the
// logic with kubelet and move to a shared place.
//
// Note: the following checks don't take preemption into consideration; in very rare
// cases (e.g., node resizing), "pod" may still fail a check but preemption helps. We deliberately
// chose to ignore those cases, as unschedulable pods will be re-queued eventually.
return func(pod *v1.Pod) bool {
if len(noderesources.Fits(pod, nodeInfo)) != 0 {
return false
}
// Ignore parsing errors for backwards compatibility.
matches, _ := nodeaffinity.GetRequiredNodeAffinity(pod).Match(nodeInfo.Node())
if !matches {
return false
}
if !nodename.Fits(pod, nodeInfo) {
return false
}
if !nodeports.Fits(pod, nodeInfo) {
return false
}
_, isUntolerated := v1helper.FindMatchingUntoleratedTaint(nodeInfo.Node().Spec.Taints, pod.Spec.Tolerations, func(t *v1.Taint) bool {
// PodToleratesNodeTaints is only interested in NoSchedule and NoExecute taints.
return t.Effect == v1.TaintEffectNoSchedule || t.Effect == v1.TaintEffectNoExecute
})
return !isUntolerated
}
}

View File

@ -22,12 +22,15 @@ import (
"testing"
"time"
"github.com/google/go-cmp/cmp"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/internal/cache"
fakecache "k8s.io/kubernetes/pkg/scheduler/internal/cache/fake"
"k8s.io/kubernetes/pkg/scheduler/internal/queue"
st "k8s.io/kubernetes/pkg/scheduler/testing"
)
func TestSkipPodUpdate(t *testing.T) {
@ -443,3 +446,107 @@ func withPodName(pod *v1.Pod, name string) *v1.Pod {
pod.Name = name
return pod
}
func TestPreCheckForNode(t *testing.T) {
cpu4 := map[v1.ResourceName]string{v1.ResourceCPU: "4"}
cpu8 := map[v1.ResourceName]string{v1.ResourceCPU: "8"}
cpu16 := map[v1.ResourceName]string{v1.ResourceCPU: "16"}
tests := []struct {
name string
nodeFn func() *v1.Node
existingPods, pods []*v1.Pod
want []bool
}{
{
name: "regular node, pods with a single constraint",
nodeFn: func() *v1.Node {
return st.MakeNode().Name("fake-node").Label("hostname", "fake-node").Capacity(cpu8).Obj()
},
existingPods: []*v1.Pod{
st.MakePod().Name("p").HostPort(80).Obj(),
},
pods: []*v1.Pod{
st.MakePod().Name("p1").Req(cpu4).Obj(),
st.MakePod().Name("p2").Req(cpu16).Obj(),
st.MakePod().Name("p3").Req(cpu4).Req(cpu8).Obj(),
st.MakePod().Name("p4").NodeAffinityIn("hostname", []string{"fake-node"}).Obj(),
st.MakePod().Name("p5").NodeAffinityNotIn("hostname", []string{"fake-node"}).Obj(),
st.MakePod().Name("p6").Obj(),
st.MakePod().Name("p7").Node("invalid-node").Obj(),
st.MakePod().Name("p8").HostPort(8080).Obj(),
st.MakePod().Name("p9").HostPort(80).Obj(),
},
want: []bool{true, false, false, true, false, true, false, true, false},
},
{
name: "tainted node, pods with a single constraint",
nodeFn: func() *v1.Node {
node := st.MakeNode().Name("fake-node").Obj()
node.Spec.Taints = []v1.Taint{
{Key: "foo", Effect: v1.TaintEffectNoSchedule},
{Key: "bar", Effect: v1.TaintEffectPreferNoSchedule},
}
return node
},
pods: []*v1.Pod{
st.MakePod().Name("p1").Obj(),
st.MakePod().Name("p2").Toleration("foo").Obj(),
st.MakePod().Name("p3").Toleration("bar").Obj(),
st.MakePod().Name("p4").Toleration("bar").Toleration("foo").Obj(),
},
want: []bool{false, true, false, true},
},
{
name: "regular node, pods with multiple constraints",
nodeFn: func() *v1.Node {
return st.MakeNode().Name("fake-node").Label("hostname", "fake-node").Capacity(cpu8).Obj()
},
existingPods: []*v1.Pod{
st.MakePod().Name("p").HostPort(80).Obj(),
},
pods: []*v1.Pod{
st.MakePod().Name("p1").Req(cpu4).NodeAffinityNotIn("hostname", []string{"fake-node"}).Obj(),
st.MakePod().Name("p2").Req(cpu16).NodeAffinityIn("hostname", []string{"fake-node"}).Obj(),
st.MakePod().Name("p3").Req(cpu8).NodeAffinityIn("hostname", []string{"fake-node"}).Obj(),
st.MakePod().Name("p4").HostPort(8080).Node("invalid-node").Obj(),
st.MakePod().Name("p5").Req(cpu4).NodeAffinityIn("hostname", []string{"fake-node"}).HostPort(80).Obj(),
},
want: []bool{false, false, true, false, false},
},
{
name: "tainted node, pods with multiple constraints",
nodeFn: func() *v1.Node {
node := st.MakeNode().Name("fake-node").Label("hostname", "fake-node").Capacity(cpu8).Obj()
node.Spec.Taints = []v1.Taint{
{Key: "foo", Effect: v1.TaintEffectNoSchedule},
{Key: "bar", Effect: v1.TaintEffectPreferNoSchedule},
}
return node
},
pods: []*v1.Pod{
st.MakePod().Name("p1").Req(cpu4).Toleration("bar").Obj(),
st.MakePod().Name("p2").Req(cpu4).Toleration("bar").Toleration("foo").Obj(),
st.MakePod().Name("p3").Req(cpu16).Toleration("foo").Obj(),
st.MakePod().Name("p3").Req(cpu16).Toleration("bar").Obj(),
},
want: []bool{false, true, false, false},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
nodeInfo := framework.NewNodeInfo(tt.existingPods...)
nodeInfo.SetNode(tt.nodeFn())
preCheckFn := preCheckForNode(nodeInfo)
var got []bool
for _, pod := range tt.pods {
got = append(got, preCheckFn(pod))
}
if diff := cmp.Diff(tt.want, got); diff != "" {
t.Errorf("Unexpected diff (-want, +got):\n%s", diff)
}
})
}
}

View File

@ -824,12 +824,11 @@ func (n *NodeInfo) updateUsedPorts(pod *v1.Pod, add bool) {
}
// SetNode sets the overall node information.
func (n *NodeInfo) SetNode(node *v1.Node) error {
func (n *NodeInfo) SetNode(node *v1.Node) {
n.node = node
n.Allocatable = NewResource(node.Status.Allocatable)
n.TransientInfo = NewTransientSchedulerInfo()
n.Generation = nextGeneration()
return nil
}
// RemoveNode removes the node object, leaving all other tracking information.

View File

@ -593,7 +593,7 @@ func (cache *schedulerCache) GetPod(pod *v1.Pod) (*v1.Pod, error) {
return podState.pod, nil
}
func (cache *schedulerCache) AddNode(node *v1.Node) error {
func (cache *schedulerCache) AddNode(node *v1.Node) *framework.NodeInfo {
cache.mu.Lock()
defer cache.mu.Unlock()
@ -608,10 +608,11 @@ func (cache *schedulerCache) AddNode(node *v1.Node) error {
cache.nodeTree.addNode(node)
cache.addNodeImageStates(node, n.info)
return n.info.SetNode(node)
n.info.SetNode(node)
return n.info.Clone()
}
func (cache *schedulerCache) UpdateNode(oldNode, newNode *v1.Node) error {
func (cache *schedulerCache) UpdateNode(oldNode, newNode *v1.Node) *framework.NodeInfo {
cache.mu.Lock()
defer cache.mu.Unlock()
@ -627,7 +628,8 @@ func (cache *schedulerCache) UpdateNode(oldNode, newNode *v1.Node) error {
cache.nodeTree.updateNode(oldNode, newNode)
cache.addNodeImageStates(newNode, n.info)
return n.info.SetNode(newNode)
n.info.SetNode(newNode)
return n.info.Clone()
}
// RemoveNode removes a node from the cache's tree.

View File

@ -869,9 +869,7 @@ func TestRemovePod(t *testing.T) {
t.Error(err)
}
for _, n := range tt.nodes {
if err := cache.AddNode(n); err != nil {
t.Error(err)
}
cache.AddNode(n)
}
if err := cache.RemovePod(tt.pod); err != nil {
@ -1080,9 +1078,7 @@ func TestNodeOperators(t *testing.T) {
node := test.node
cache := newSchedulerCache(time.Second, time.Second, nil)
if err := cache.AddNode(node); err != nil {
t.Fatal(err)
}
cache.AddNode(node)
for _, pod := range test.pods {
if err := cache.AddPod(pod); err != nil {
t.Fatal(err)
@ -1126,9 +1122,7 @@ func TestNodeOperators(t *testing.T) {
node.Status.Allocatable[v1.ResourceMemory] = mem50m
expected.Allocatable.Memory = mem50m.Value()
if err := cache.UpdateNode(nil, node); err != nil {
t.Error(err)
}
cache.UpdateNode(nil, node)
got, found = cache.nodes[node.Name]
if !found {
t.Errorf("Failed to find node %v in schedulertypes after UpdateNode.", node.Name)
@ -1265,9 +1259,7 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
addNode := func(i int) operation {
return func(t *testing.T) {
if err := cache.AddNode(nodes[i]); err != nil {
t.Error(err)
}
cache.AddNode(nodes[i])
}
}
removeNode := func(i int) operation {
@ -1279,9 +1271,7 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
}
updateNode := func(i int) operation {
return func(t *testing.T) {
if err := cache.UpdateNode(nodes[i], updatedNodes[i]); err != nil {
t.Error(err)
}
cache.UpdateNode(nodes[i], updatedNodes[i])
}
}
addPod := func(i int) operation {
@ -1589,9 +1579,7 @@ func TestSchedulerCache_updateNodeInfoSnapshotList(t *testing.T) {
var snapshot *Snapshot
addNode := func(t *testing.T, i int) {
if err := cache.AddNode(nodes[i]); err != nil {
t.Error(err)
}
cache.AddNode(nodes[i])
_, ok := snapshot.nodeInfoMap[nodes[i].Name]
if !ok {
snapshot.nodeInfoMap[nodes[i].Name] = cache.nodes[nodes[i].Name].info

View File

@ -18,6 +18,7 @@ package fake
import (
v1 "k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/scheduler/framework"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
)
@ -64,10 +65,10 @@ func (c *Cache) GetPod(pod *v1.Pod) (*v1.Pod, error) {
}
// AddNode is a fake method for testing.
func (c *Cache) AddNode(node *v1.Node) error { return nil }
func (c *Cache) AddNode(node *v1.Node) *framework.NodeInfo { return nil }
// UpdateNode is a fake method for testing.
func (c *Cache) UpdateNode(oldNode, newNode *v1.Node) error { return nil }
func (c *Cache) UpdateNode(oldNode, newNode *v1.Node) *framework.NodeInfo { return nil }
// RemoveNode is a fake method for testing.
func (c *Cache) RemoveNode(node *v1.Node) error { return nil }

View File

@ -94,10 +94,12 @@ type Cache interface {
IsAssumedPod(pod *v1.Pod) (bool, error)
// AddNode adds overall information about node.
AddNode(node *v1.Node) error
// It returns a clone of the added NodeInfo object.
AddNode(node *v1.Node) *framework.NodeInfo
// UpdateNode updates overall information about node.
UpdateNode(oldNode, newNode *v1.Node) error
// It returns a clone of the updated NodeInfo object.
UpdateNode(oldNode, newNode *v1.Node) *framework.NodeInfo
// RemoveNode removes overall information about node.
RemoveNode(node *v1.Node) error
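The returned value is a clone rather than the cache's own NodeInfo, presumably so the event handler can evaluate its PreEnqueueCheck against a snapshot that later cache mutations (made under the cache lock) cannot change. A small illustrative snippet of those clone semantics, assuming it is built inside the kubernetes repo (it is not part of the diff):

package main

import (
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

func main() {
	node := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "fake-node"}}
	info := framework.NewNodeInfo()
	info.SetNode(node)       // after this PR, SetNode no longer returns an error
	snapshot := info.Clone() // the kind of detached copy AddNode/UpdateNode now return
	_ = snapshot             // safe to read without holding the cache lock
}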

View File

@ -66,6 +66,11 @@ const (
DefaultPodMaxBackoffDuration time.Duration = 10 * time.Second
)
// PreEnqueueCheck is a function type. Its instances are run against a Pod,
// and based on the result the caller can choose to enqueue or skip the Pod.
type PreEnqueueCheck func(pod *v1.Pod) bool
// SchedulingQueue is an interface for a queue to store pods waiting to be scheduled.
// The interface follows a pattern similar to cache.FIFO and cache.Heap and
// makes it easy to use those data structures as a SchedulingQueue.
@ -85,7 +90,7 @@ type SchedulingQueue interface {
Pop() (*framework.QueuedPodInfo, error)
Update(oldPod, newPod *v1.Pod) error
Delete(pod *v1.Pod) error
MoveAllToActiveOrBackoffQueue(event string)
MoveAllToActiveOrBackoffQueue(event string, preCheck PreEnqueueCheck)
AssignedPodAdded(pod *v1.Pod)
AssignedPodUpdated(pod *v1.Pod)
PendingPods() []*v1.Pod
@ -524,12 +529,14 @@ func (p *PriorityQueue) AssignedPodUpdated(pod *v1.Pod) {
// This function adds all pods and then signals the condition variable to ensure that
// if Pop() is waiting for an item, it receives it after all the pods are in the
// queue and the head is the highest priority pod.
func (p *PriorityQueue) MoveAllToActiveOrBackoffQueue(event string) {
func (p *PriorityQueue) MoveAllToActiveOrBackoffQueue(event string, preCheck PreEnqueueCheck) {
p.lock.Lock()
defer p.lock.Unlock()
unschedulablePods := make([]*framework.QueuedPodInfo, 0, len(p.unschedulableQ.podInfoMap))
for _, pInfo := range p.unschedulableQ.podInfoMap {
unschedulablePods = append(unschedulablePods, pInfo)
if preCheck == nil || preCheck(pInfo.Pod) {
unschedulablePods = append(unschedulablePods, pInfo)
}
}
p.movePodsToActiveOrBackoffQueue(unschedulablePods, event)
}

View File

@ -25,6 +25,8 @@ import (
"testing"
"time"
"github.com/google/go-cmp/cmp"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
@ -37,6 +39,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
"k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/utils/pointer"
)
const queueMetricMetadata = `
@ -237,7 +240,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) {
}
// move all pods to active queue when we were trying to schedule them
q.MoveAllToActiveOrBackoffQueue("test")
q.MoveAllToActiveOrBackoffQueue("test", nil)
oldCycle := q.SchedulingCycle()
firstPod, _ := q.Pop()
@ -477,10 +480,10 @@ func BenchmarkMoveAllToActiveOrBackoffQueue(b *testing.B) {
b.StartTimer()
if tt.moveEvent != "" {
q.MoveAllToActiveOrBackoffQueue(tt.moveEvent)
q.MoveAllToActiveOrBackoffQueue(tt.moveEvent, nil)
} else {
// Random case.
q.MoveAllToActiveOrBackoffQueue(events[i%len(events)])
q.MoveAllToActiveOrBackoffQueue(events[i%len(events)], nil)
}
}
})
@ -506,7 +509,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
hpp2.Name = "hpp2"
q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(hpp2, "barPlugin"), q.SchedulingCycle())
// Pods are still backing off, so move them into backoffQ.
q.MoveAllToActiveOrBackoffQueue(NodeAdd)
q.MoveAllToActiveOrBackoffQueue(NodeAdd, nil)
if q.activeQ.Len() != 1 {
t.Errorf("Expected 1 item to be in activeQ, but got: %v", q.activeQ.Len())
}
@ -532,7 +535,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
// Move clock by podInitialBackoffDuration, so that pods in the unschedulableQ would pass the backing off,
// and the pods will be moved into activeQ.
c.Step(q.podInitialBackoffDuration)
q.MoveAllToActiveOrBackoffQueue(NodeAdd)
q.MoveAllToActiveOrBackoffQueue(NodeAdd, nil)
// hpp2 won't be moved regardless of its backoff timer.
if q.activeQ.Len() != 4 {
t.Errorf("Expected 4 items to be in activeQ, but got: %v", q.activeQ.Len())
@ -641,7 +644,7 @@ func TestPriorityQueue_PendingPods(t *testing.T) {
t.Error("Unexpected list of pending Pods.")
}
// Move all to active queue. We should still see the same set of pods.
q.MoveAllToActiveOrBackoffQueue("test")
q.MoveAllToActiveOrBackoffQueue("test", nil)
if !reflect.DeepEqual(expectedSet, makeSet(q.PendingPods())) {
t.Error("Unexpected list of pending Pods...")
}
@ -944,7 +947,7 @@ func TestRecentlyTriedPodsGoBack(t *testing.T) {
q.AddUnschedulableIfNotPresent(p1, q.SchedulingCycle())
c.Step(DefaultPodInitialBackoffDuration)
// Move all unschedulable pods to the active queue.
q.MoveAllToActiveOrBackoffQueue(UnschedulableTimeout)
q.MoveAllToActiveOrBackoffQueue(UnschedulableTimeout, nil)
// Simulation is over. Now let's pop all pods. The pod popped first should be
// the last one we pop here.
for i := 0; i < 5; i++ {
@ -996,7 +999,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
// Move clock to make the unschedulable pods complete backoff.
c.Step(DefaultPodInitialBackoffDuration + time.Second)
// Move all unschedulable pods to the active queue.
q.MoveAllToActiveOrBackoffQueue(UnschedulableTimeout)
q.MoveAllToActiveOrBackoffQueue(UnschedulableTimeout, nil)
// Simulate a pod being popped by the scheduler,
// At this time, unschedulable pod should be popped.
@ -1039,7 +1042,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
// Move clock to make the unschedulable pods complete backoff.
c.Step(DefaultPodInitialBackoffDuration + time.Second)
// Move all unschedulable pods to the active queue.
q.MoveAllToActiveOrBackoffQueue(UnschedulableTimeout)
q.MoveAllToActiveOrBackoffQueue(UnschedulableTimeout, nil)
// At this time, newerPod should be popped
// because it is the oldest tried pod.
@ -1104,7 +1107,7 @@ func TestHighPriorityBackoff(t *testing.T) {
// Put in the unschedulable queue.
q.AddUnschedulableIfNotPresent(p, q.SchedulingCycle())
// Move all unschedulable pods to the active queue.
q.MoveAllToActiveOrBackoffQueue("test")
q.MoveAllToActiveOrBackoffQueue("test", nil)
p, err = q.Pop()
if err != nil {
@ -1211,7 +1214,7 @@ var (
queue.podBackoffQ.Add(pInfo)
}
moveAllToActiveOrBackoffQ = func(queue *PriorityQueue, _ *framework.QueuedPodInfo) {
queue.MoveAllToActiveOrBackoffQueue(UnschedulableTimeout)
queue.MoveAllToActiveOrBackoffQueue(UnschedulableTimeout, nil)
}
flushBackoffQ = func(queue *PriorityQueue, _ *framework.QueuedPodInfo) {
queue.clock.(*clock.FakeClock).Step(2 * time.Second)
@ -1691,7 +1694,7 @@ func TestBackOffFlow(t *testing.T) {
}
// An event happens.
q.MoveAllToActiveOrBackoffQueue(UnschedulableTimeout)
q.MoveAllToActiveOrBackoffQueue(UnschedulableTimeout, nil)
if _, ok, _ := q.podBackoffQ.Get(podInfo); !ok {
t.Errorf("pod %v is not in the backoff queue", podID)
@ -1815,6 +1818,76 @@ func TestPodMatchesEvent(t *testing.T) {
}
}
func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) {
var podInfos []*framework.QueuedPodInfo
for i := 0; i < 5; i++ {
pInfo := newQueuedPodInfoForLookup(&v1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("p%d", i)},
Spec: v1.PodSpec{Priority: pointer.Int32Ptr(int32(i))},
})
podInfos = append(podInfos, pInfo)
}
tests := []struct {
name string
preEnqueueCheck PreEnqueueCheck
podInfos []*framework.QueuedPodInfo
want []string
}{
{
name: "nil PreEnqueueCheck",
podInfos: podInfos,
want: []string{"p0", "p1", "p2", "p3", "p4"},
},
{
name: "move Pods with priority greater than 2",
podInfos: podInfos,
preEnqueueCheck: func(pod *v1.Pod) bool { return *pod.Spec.Priority >= 2 },
want: []string{"p2", "p3", "p4"},
},
{
name: "move Pods with even priority and greater than 2",
podInfos: podInfos,
preEnqueueCheck: func(pod *v1.Pod) bool {
return *pod.Spec.Priority%2 == 0 && *pod.Spec.Priority >= 2
},
want: []string{"p2", "p4"},
},
{
name: "move Pods with even and negative priority",
podInfos: podInfos,
preEnqueueCheck: func(pod *v1.Pod) bool {
return *pod.Spec.Priority%2 == 0 && *pod.Spec.Priority < 0
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
q := NewTestQueue(context.Background(), newDefaultQueueSort())
for _, podInfo := range tt.podInfos {
q.AddUnschedulableIfNotPresent(podInfo, q.schedulingCycle)
}
q.MoveAllToActiveOrBackoffQueue("test", tt.preEnqueueCheck)
var got []string
for q.podBackoffQ.Len() != 0 {
obj, err := q.podBackoffQ.Pop()
if err != nil {
t.Fatalf("Fail to pop pod from backoffQ: %v", err)
}
queuedPodInfo, ok := obj.(*framework.QueuedPodInfo)
if !ok {
t.Fatalf("Fail to covert popped obj (type %T) to *framework.QueuedPodInfo", obj)
}
got = append(got, queuedPodInfo.Pod.Name)
}
if diff := cmp.Diff(tt.want, got); diff != "" {
t.Errorf("Unexpected diff (-want, +got):\n%s", diff)
}
})
}
}
func makeQueuedPodInfos(num int, timestamp time.Time) []*framework.QueuedPodInfo {
var pInfos = make([]*framework.QueuedPodInfo, 0, num)
for i := 1; i <= num; i++ {

View File

@ -250,6 +250,25 @@ func (p *PodWrapper) NominatedNodeName(n string) *PodWrapper {
return p
}
// Toleration creates a toleration (with the operator Exists)
// and injects it into the inner pod.
func (p *PodWrapper) Toleration(key string) *PodWrapper {
p.Spec.Tolerations = append(p.Spec.Tolerations, v1.Toleration{
Key: key,
Operator: v1.TolerationOpExists,
})
return p
}
// HostPort creates a container with a hostPort valued `port`,
// and injects it into the inner pod.
func (p *PodWrapper) HostPort(port int32) *PodWrapper {
p.Spec.Containers = append(p.Spec.Containers, v1.Container{
Ports: []v1.ContainerPort{{HostPort: port}},
})
return p
}
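A short usage sketch of the two new wrappers, mirroring how TestPreCheckForNode above chains them (st aliases k8s.io/kubernetes/pkg/scheduler/testing; the helper name is only illustrative and not in the diff):

import (
	v1 "k8s.io/api/core/v1"
	st "k8s.io/kubernetes/pkg/scheduler/testing"
)

// buildTestPod returns a Pod that tolerates any "foo" taint (operator Exists)
// and declares a single container exposing hostPort 8080.
func buildTestPod() *v1.Pod {
	return st.MakePod().Name("p").Toleration("foo").HostPort(8080).Obj()
}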
// PodAffinityKind represents different kinds of PodAffinity.
type PodAffinityKind int