diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go index ea172e7f45a..e4ca45e9aa2 100644 --- a/pkg/scheduler/core/generic_scheduler_test.go +++ b/pkg/scheduler/core/generic_scheduler_test.go @@ -1173,7 +1173,7 @@ func TestFindFitPredicateCallCounts(t *testing.T) { if err := scheduler.cache.UpdateSnapshot(scheduler.nodeInfoSnapshot); err != nil { t.Fatal(err) } - fwk.PreemptHandle().AddNominatedPod(&v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "nominated"}, Spec: v1.PodSpec{Priority: &midPriority}}, "1") + fwk.PreemptHandle().AddNominatedPod(framework.NewPodInfo(&v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "nominated"}, Spec: v1.PodSpec{Priority: &midPriority}}), "1") _, _, err = scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), test.pod) diff --git a/pkg/scheduler/factory.go b/pkg/scheduler/factory.go index 91cdf7d2a9e..5475f1c8bca 100644 --- a/pkg/scheduler/factory.go +++ b/pkg/scheduler/factory.go @@ -352,7 +352,7 @@ func MakeDefaultErrorFunc(client clientset.Interface, podLister corelisters.PodL } // As is from SharedInformer, we need to do a DeepCopy() here. - podInfo.Pod = cachedPod.DeepCopy() + podInfo.PodInfo = framework.NewPodInfo(cachedPod.DeepCopy()) if err := podQueue.AddUnschedulableIfNotPresent(podInfo, podQueue.SchedulingCycle()); err != nil { klog.ErrorS(err, "Error occurred") } diff --git a/pkg/scheduler/factory_test.go b/pkg/scheduler/factory_test.go index 744018c8dd6..45bb4e95160 100644 --- a/pkg/scheduler/factory_test.go +++ b/pkg/scheduler/factory_test.go @@ -467,7 +467,7 @@ func TestDefaultErrorFunc(t *testing.T) { queue.Delete(testPod) } - testPodInfo := &framework.QueuedPodInfo{Pod: testPod} + testPodInfo := &framework.QueuedPodInfo{PodInfo: framework.NewPodInfo(testPod)} errFunc := MakeDefaultErrorFunc(client, podInformer.Lister(), queue, schedulerCache) errFunc(testPodInfo, tt.injectErr) @@ -538,7 +538,7 @@ func TestDefaultErrorFunc_NodeNotFound(t *testing.T) { } } - testPodInfo := &framework.QueuedPodInfo{Pod: testPod} + testPodInfo := &framework.QueuedPodInfo{PodInfo: framework.NewPodInfo(testPod)} errFunc := MakeDefaultErrorFunc(client, podInformer.Lister(), queue, schedulerCache) errFunc(testPodInfo, tt.injectErr) @@ -573,7 +573,7 @@ func TestDefaultErrorFunc_PodAlreadyBound(t *testing.T) { // Add node to schedulerCache no matter it's deleted in API server or not. schedulerCache.AddNode(&nodeFoo) - testPodInfo := &framework.QueuedPodInfo{Pod: testPod} + testPodInfo := &framework.QueuedPodInfo{PodInfo: framework.NewPodInfo(testPod)} errFunc := MakeDefaultErrorFunc(client, podInformer.Lister(), queue, schedulerCache) errFunc(testPodInfo, fmt.Errorf("binding rejected: timeout")) diff --git a/pkg/scheduler/framework/interface.go b/pkg/scheduler/framework/interface.go index 61d52c0713d..f71475eb819 100644 --- a/pkg/scheduler/framework/interface.go +++ b/pkg/scheduler/framework/interface.go @@ -604,13 +604,13 @@ type PreemptHandle interface { type PodNominator interface { // AddNominatedPod adds the given pod to the nominated pod map or // updates it if it already exists. - AddNominatedPod(pod *v1.Pod, nodeName string) + AddNominatedPod(pod *PodInfo, nodeName string) // DeleteNominatedPodIfExists deletes nominatedPod from internal cache. It's a no-op if it doesn't exist. DeleteNominatedPodIfExists(pod *v1.Pod) // UpdateNominatedPod updates the <oldPod> with <newPod>.
- UpdateNominatedPod(oldPod, newPod *v1.Pod) + UpdateNominatedPod(oldPod *v1.Pod, newPodInfo *PodInfo) // NominatedPodsForNode returns nominatedPods on the given node. - NominatedPodsForNode(nodeName string) []*v1.Pod + NominatedPodsForNode(nodeName string) []*PodInfo } // PluginsRunner abstracts operations to run some plugins. diff --git a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go index c7b19d73298..22459823479 100644 --- a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go +++ b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go @@ -733,17 +733,17 @@ func PrepareCandidate(c Candidate, fh framework.Handle, cs kubernetes.Interface, // worth the complexity, especially because we generally expect to have a very // small number of nominated pods per node. func getLowerPriorityNominatedPods(pn framework.PodNominator, pod *v1.Pod, nodeName string) []*v1.Pod { - pods := pn.NominatedPodsForNode(nodeName) + podInfos := pn.NominatedPodsForNode(nodeName) - if len(pods) == 0 { + if len(podInfos) == 0 { return nil } var lowerPriorityPods []*v1.Pod podPriority := corev1helpers.PodPriority(pod) - for _, p := range pods { - if corev1helpers.PodPriority(p) < podPriority { - lowerPriorityPods = append(lowerPriorityPods, p) + for _, pi := range podInfos { + if corev1helpers.PodPriority(pi.Pod) < podPriority { + lowerPriorityPods = append(lowerPriorityPods, pi.Pod) } } return lowerPriorityPods diff --git a/pkg/scheduler/framework/plugins/queuesort/priority_sort_test.go b/pkg/scheduler/framework/plugins/queuesort/priority_sort_test.go index 0e660f99fcf..5982422e5ef 100644 --- a/pkg/scheduler/framework/plugins/queuesort/priority_sort_test.go +++ b/pkg/scheduler/framework/plugins/queuesort/priority_sort_test.go @@ -37,55 +37,55 @@ func TestLess(t *testing.T) { { name: "p1.priority less than p2.priority", p1: &framework.QueuedPodInfo{ - Pod: &v1.Pod{ + PodInfo: framework.NewPodInfo(&v1.Pod{ Spec: v1.PodSpec{ Priority: &lowPriority, }, - }, + }), }, p2: &framework.QueuedPodInfo{ - Pod: &v1.Pod{ + PodInfo: framework.NewPodInfo(&v1.Pod{ Spec: v1.PodSpec{ Priority: &highPriority, }, - }, + }), }, expected: false, // p2 should be ahead of p1 in the queue }, { name: "p1.priority greater than p2.priority", p1: &framework.QueuedPodInfo{ - Pod: &v1.Pod{ + PodInfo: framework.NewPodInfo(&v1.Pod{ Spec: v1.PodSpec{ Priority: &highPriority, }, - }, + }), }, p2: &framework.QueuedPodInfo{ - Pod: &v1.Pod{ + PodInfo: framework.NewPodInfo(&v1.Pod{ Spec: v1.PodSpec{ Priority: &lowPriority, }, - }, + }), }, expected: true, // p1 should be ahead of p2 in the queue }, { name: "equal priority. p1 is added to schedulingQ earlier than p2", p1: &framework.QueuedPodInfo{ - Pod: &v1.Pod{ + PodInfo: framework.NewPodInfo(&v1.Pod{ Spec: v1.PodSpec{ Priority: &highPriority, }, - }, + }), Timestamp: t1, }, p2: &framework.QueuedPodInfo{ - Pod: &v1.Pod{ + PodInfo: framework.NewPodInfo(&v1.Pod{ Spec: v1.PodSpec{ Priority: &highPriority, }, - }, + }), Timestamp: t2, }, expected: true, // p1 should be ahead of p2 in the queue @@ -93,19 +93,19 @@ func TestLess(t *testing.T) { { name: "equal priority. 
p2 is added to schedulingQ earlier than p1", p1: &framework.QueuedPodInfo{ - Pod: &v1.Pod{ + PodInfo: framework.NewPodInfo(&v1.Pod{ Spec: v1.PodSpec{ Priority: &highPriority, }, - }, + }), Timestamp: t2, }, p2: &framework.QueuedPodInfo{ - Pod: &v1.Pod{ + PodInfo: framework.NewPodInfo(&v1.Pod{ Spec: v1.PodSpec{ Priority: &highPriority, }, - }, + }), Timestamp: t1, }, expected: false, // p2 should be ahead of p1 in the queue diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go index f326ec400f2..9d98a2b6612 100644 --- a/pkg/scheduler/framework/runtime/framework.go +++ b/pkg/scheduler/framework/runtime/framework.go @@ -657,18 +657,17 @@ func addNominatedPods(ctx context.Context, ph framework.PreemptHandle, pod *v1.P // This may happen only in tests. return false, state, nodeInfo, nil } - nominatedPods := ph.NominatedPodsForNode(nodeInfo.Node().Name) - if len(nominatedPods) == 0 { + nominatedPodInfos := ph.NominatedPodsForNode(nodeInfo.Node().Name) + if len(nominatedPodInfos) == 0 { return false, state, nodeInfo, nil } nodeInfoOut := nodeInfo.Clone() stateOut := state.Clone() podsAdded := false - for _, p := range nominatedPods { - if corev1.PodPriority(p) >= corev1.PodPriority(pod) && p.UID != pod.UID { - podInfoToAdd := framework.NewPodInfo(p) - nodeInfoOut.AddPodInfo(podInfoToAdd) - status := ph.RunPreFilterExtensionAddPod(ctx, stateOut, pod, podInfoToAdd, nodeInfoOut) + for _, pi := range nominatedPodInfos { + if corev1.PodPriority(pi.Pod) >= corev1.PodPriority(pod) && pi.Pod.UID != pod.UID { + nodeInfoOut.AddPodInfo(pi) + status := ph.RunPreFilterExtensionAddPod(ctx, stateOut, pod, pi, nodeInfoOut) if !status.IsSuccess() { return false, state, nodeInfo, status.AsError() } diff --git a/pkg/scheduler/framework/runtime/framework_test.go b/pkg/scheduler/framework/runtime/framework_test.go index f7f80bc80af..8ef9d1c3d16 100644 --- a/pkg/scheduler/framework/runtime/framework_test.go +++ b/pkg/scheduler/framework/runtime/framework_test.go @@ -1305,7 +1305,7 @@ func TestFilterPluginsWithNominatedPods(t *testing.T) { podNominator := internalqueue.NewPodNominator() if tt.nominatedPod != nil { - podNominator.AddNominatedPod(tt.nominatedPod, nodeName) + podNominator.AddNominatedPod(framework.NewPodInfo(tt.nominatedPod), nodeName) } f, err := newFrameworkWithQueueSortAndBind(registry, cfgPls, emptyArgs, WithPodNominator(podNominator)) if err != nil { diff --git a/pkg/scheduler/framework/types.go b/pkg/scheduler/framework/types.go index a892f6052ff..109d7d350d5 100644 --- a/pkg/scheduler/framework/types.go +++ b/pkg/scheduler/framework/types.go @@ -44,7 +44,7 @@ var generation int64 // the pod's status in the scheduling queue, such as the timestamp when // it's added to the queue. type QueuedPodInfo struct { - Pod *v1.Pod + *PodInfo // The time pod added to the scheduling queue. Timestamp time.Time // Number of schedule attempts before successfully scheduled. @@ -60,7 +60,7 @@ type QueuedPodInfo struct { // DeepCopy returns a deep copy of the QueuedPodInfo object. func (pqi *QueuedPodInfo) DeepCopy() *QueuedPodInfo { return &QueuedPodInfo{ - Pod: pqi.Pod.DeepCopy(), + PodInfo: pqi.PodInfo.DeepCopy(), Timestamp: pqi.Timestamp, Attempts: pqi.Attempts, InitialAttemptTimestamp: pqi.InitialAttemptTimestamp, @@ -79,6 +79,66 @@ type PodInfo struct { ParseError error } +// DeepCopy returns a deep copy of the PodInfo object. 
+func (pi *PodInfo) DeepCopy() *PodInfo { + return &PodInfo{ + Pod: pi.Pod.DeepCopy(), + RequiredAffinityTerms: pi.RequiredAffinityTerms, + RequiredAntiAffinityTerms: pi.RequiredAntiAffinityTerms, + PreferredAffinityTerms: pi.PreferredAffinityTerms, + PreferredAntiAffinityTerms: pi.PreferredAntiAffinityTerms, + ParseError: pi.ParseError, + } +} + +// Update creates a full new PodInfo by default, and only updates the pod in place when the PodInfo +// has already been instantiated and the passed pod is the exact same one as the original pod. +func (pi *PodInfo) Update(pod *v1.Pod) { + if pod != nil && pi.Pod != nil && pi.Pod.UID == pod.UID { + // PodInfo includes immutable information, and so it is safe to update the pod in place if it is + // the exact same pod. + pi.Pod = pod + return + } + var preferredAffinityTerms []v1.WeightedPodAffinityTerm + var preferredAntiAffinityTerms []v1.WeightedPodAffinityTerm + if affinity := pod.Spec.Affinity; affinity != nil { + if a := affinity.PodAffinity; a != nil { + preferredAffinityTerms = a.PreferredDuringSchedulingIgnoredDuringExecution + } + if a := affinity.PodAntiAffinity; a != nil { + preferredAntiAffinityTerms = a.PreferredDuringSchedulingIgnoredDuringExecution + } + } + + // Attempt to parse the affinity terms + var parseErrs []error + requiredAffinityTerms, err := getAffinityTerms(pod, schedutil.GetPodAffinityTerms(pod.Spec.Affinity)) + if err != nil { + parseErrs = append(parseErrs, fmt.Errorf("requiredAffinityTerms: %w", err)) + } + requiredAntiAffinityTerms, err := getAffinityTerms(pod, + schedutil.GetPodAntiAffinityTerms(pod.Spec.Affinity)) + if err != nil { + parseErrs = append(parseErrs, fmt.Errorf("requiredAntiAffinityTerms: %w", err)) + } + weightedAffinityTerms, err := getWeightedAffinityTerms(pod, preferredAffinityTerms) + if err != nil { + parseErrs = append(parseErrs, fmt.Errorf("preferredAffinityTerms: %w", err)) + } + weightedAntiAffinityTerms, err := getWeightedAffinityTerms(pod, preferredAntiAffinityTerms) + if err != nil { + parseErrs = append(parseErrs, fmt.Errorf("preferredAntiAffinityTerms: %w", err)) + } + + pi.Pod = pod + pi.RequiredAffinityTerms = requiredAffinityTerms + pi.RequiredAntiAffinityTerms = requiredAntiAffinityTerms + pi.PreferredAffinityTerms = weightedAffinityTerms + pi.PreferredAntiAffinityTerms = weightedAntiAffinityTerms + pi.ParseError = utilerrors.NewAggregate(parseErrs) +} + // AffinityTerm is a processed version of v1.PodAffinityTerm. type AffinityTerm struct { Namespaces sets.String @@ -177,46 +237,11 @@ func getWeightedAffinityTerms(pod *v1.Pod, v1Terms []v1.WeightedPodAffinityTerm) return terms, nil } -// NewPodInfo return a new PodInfo +// NewPodInfo returns a new PodInfo.
func NewPodInfo(pod *v1.Pod) *PodInfo { - var preferredAffinityTerms []v1.WeightedPodAffinityTerm - var preferredAntiAffinityTerms []v1.WeightedPodAffinityTerm - if affinity := pod.Spec.Affinity; affinity != nil { - if a := affinity.PodAffinity; a != nil { - preferredAffinityTerms = a.PreferredDuringSchedulingIgnoredDuringExecution - } - if a := affinity.PodAntiAffinity; a != nil { - preferredAntiAffinityTerms = a.PreferredDuringSchedulingIgnoredDuringExecution - } - } - - // Attempt to parse the affinity terms - var parseErrs []error - requiredAffinityTerms, err := getAffinityTerms(pod, schedutil.GetPodAffinityTerms(pod.Spec.Affinity)) - if err != nil { - parseErrs = append(parseErrs, fmt.Errorf("requiredAffinityTerms: %w", err)) - } - requiredAntiAffinityTerms, err := getAffinityTerms(pod, schedutil.GetPodAntiAffinityTerms(pod.Spec.Affinity)) - if err != nil { - parseErrs = append(parseErrs, fmt.Errorf("requiredAntiAffinityTerms: %w", err)) - } - weightedAffinityTerms, err := getWeightedAffinityTerms(pod, preferredAffinityTerms) - if err != nil { - parseErrs = append(parseErrs, fmt.Errorf("preferredAffinityTerms: %w", err)) - } - weightedAntiAffinityTerms, err := getWeightedAffinityTerms(pod, preferredAntiAffinityTerms) - if err != nil { - parseErrs = append(parseErrs, fmt.Errorf("preferredAntiAffinityTerms: %w", err)) - } - - return &PodInfo{ - Pod: pod, - RequiredAffinityTerms: requiredAffinityTerms, - RequiredAntiAffinityTerms: requiredAntiAffinityTerms, - PreferredAffinityTerms: weightedAffinityTerms, - PreferredAntiAffinityTerms: weightedAntiAffinityTerms, - ParseError: utilerrors.NewAggregate(parseErrs), - } + pInfo := &PodInfo{} + pInfo.Update(pod) + return pInfo } // ImageStateSummary provides summarized information about the state of an image. diff --git a/pkg/scheduler/internal/cache/debugger/dumper.go b/pkg/scheduler/internal/cache/debugger/dumper.go index 562e4cfc041..6e694b67ce0 100644 --- a/pkg/scheduler/internal/cache/debugger/dumper.go +++ b/pkg/scheduler/internal/cache/debugger/dumper.go @@ -70,11 +70,11 @@ func (d *CacheDumper) printNodeInfo(name string, n *framework.NodeInfo) string { nodeData.WriteString(printPod(p.Pod)) } // Dumping nominated pods info on the node - nominatedPods := d.podQueue.NominatedPodsForNode(name) - if len(nominatedPods) != 0 { - nodeData.WriteString(fmt.Sprintf("Nominated Pods(number: %v):\n", len(nominatedPods))) - for _, p := range nominatedPods { - nodeData.WriteString(printPod(p)) + nominatedPodInfos := d.podQueue.NominatedPodsForNode(name) + if len(nominatedPodInfos) != 0 { + nodeData.WriteString(fmt.Sprintf("Nominated Pods(number: %v):\n", len(nominatedPodInfos))) + for _, pi := range nominatedPodInfos { + nodeData.WriteString(printPod(pi.Pod)) } } return nodeData.String() diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index be925242e66..21aeca0b9a7 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -194,10 +194,12 @@ var defaultPriorityQueueOptions = priorityQueueOptions{ // Making sure that PriorityQueue implements SchedulingQueue. var _ SchedulingQueue = &PriorityQueue{} -// newQueuedPodInfoNoTimestamp builds a QueuedPodInfo object without timestamp. -func newQueuedPodInfoNoTimestamp(pod *v1.Pod) *framework.QueuedPodInfo { +// newQueuedPodInfoForLookup builds a QueuedPodInfo object for a lookup in the queue. 
+func newQueuedPodInfoForLookup(pod *v1.Pod) *framework.QueuedPodInfo { + // Since this is only used for a lookup in the queue, we only need to set the Pod, + // and so we avoid creating a full PodInfo, which is expensive to instantiate frequently. return &framework.QueuedPodInfo{ - Pod: pod, + PodInfo: &framework.PodInfo{Pod: pod}, } } @@ -262,7 +264,7 @@ func (p *PriorityQueue) Add(pod *v1.Pod) error { klog.ErrorS(nil, "Error: pod is already in the podBackoff queue", "pod", klog.KObj(pod)) } metrics.SchedulerQueueIncomingPods.WithLabelValues("active", PodAdd).Inc() - p.PodNominator.AddNominatedPod(pod, "") + p.PodNominator.AddNominatedPod(pInfo.PodInfo, "") p.cond.Broadcast() return nil @@ -316,7 +318,7 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pInfo *framework.QueuedPodI metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", ScheduleAttemptFailure).Inc() } - p.PodNominator.AddNominatedPod(pod, "") + p.PodNominator.AddNominatedPod(pInfo.PodInfo, "") return nil } @@ -412,48 +414,53 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error { defer p.lock.Unlock() if oldPod != nil { - oldPodInfo := newQueuedPodInfoNoTimestamp(oldPod) + oldPodInfo := newQueuedPodInfoForLookup(oldPod) // If the pod is already in the active queue, just update it there. if oldPodInfo, exists, _ := p.activeQ.Get(oldPodInfo); exists { - p.PodNominator.UpdateNominatedPod(oldPod, newPod) - err := p.activeQ.Update(updatePod(oldPodInfo, newPod)) - return err + pInfo := updatePod(oldPodInfo, newPod) + p.PodNominator.UpdateNominatedPod(oldPod, pInfo.PodInfo) + return p.activeQ.Update(pInfo) } // If the pod is in the backoff queue, update it there. if oldPodInfo, exists, _ := p.podBackoffQ.Get(oldPodInfo); exists { - p.PodNominator.UpdateNominatedPod(oldPod, newPod) + pInfo := updatePod(oldPodInfo, newPod) + p.PodNominator.UpdateNominatedPod(oldPod, pInfo.PodInfo) p.podBackoffQ.Delete(oldPodInfo) - err := p.activeQ.Add(updatePod(oldPodInfo, newPod)) - if err == nil { - p.cond.Broadcast() + if err := p.activeQ.Add(pInfo); err != nil { + return err } - return err + p.cond.Broadcast() + return nil } } // If the pod is in the unschedulable queue, updating it may make it schedulable. if usPodInfo := p.unschedulableQ.get(newPod); usPodInfo != nil { - p.PodNominator.UpdateNominatedPod(oldPod, newPod) if isPodUpdated(oldPod, newPod) { p.unschedulableQ.delete(usPodInfo.Pod) - err := p.activeQ.Add(updatePod(usPodInfo, newPod)) - if err == nil { - p.cond.Broadcast() + pInfo := updatePod(usPodInfo, newPod) + p.PodNominator.UpdateNominatedPod(oldPod, pInfo.PodInfo) + if err := p.activeQ.Add(pInfo); err != nil { + return err } - return err + p.cond.Broadcast() + return nil } - // Pod is already in unschedulable queue and hasnt updated, no need to backoff again - p.unschedulableQ.addOrUpdate(updatePod(usPodInfo, newPod)) + pInfo := updatePod(usPodInfo, newPod) + p.PodNominator.UpdateNominatedPod(oldPod, pInfo.PodInfo) + // Pod is already in unschedulable queue and hasn't updated, no need to backoff again + p.unschedulableQ.addOrUpdate(pInfo) return nil } // If pod is not in any of the queues, we put it in the active queue. 
- err := p.activeQ.Add(p.newQueuedPodInfo(newPod)) - if err == nil { - p.PodNominator.AddNominatedPod(newPod, "") - p.cond.Broadcast() + pInfo := p.newQueuedPodInfo(newPod) + if err := p.activeQ.Add(pInfo); err != nil { + return err } - return err + p.PodNominator.AddNominatedPod(pInfo.PodInfo, "") + p.cond.Broadcast() + return nil } // Delete deletes the item from either of the two queues. It assumes the pod is @@ -462,9 +469,9 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) error { p.lock.Lock() defer p.lock.Unlock() p.PodNominator.DeleteNominatedPodIfExists(pod) - err := p.activeQ.Delete(newQueuedPodInfoNoTimestamp(pod)) - if err != nil { // The item was probably not found in the activeQ. - p.podBackoffQ.Delete(newQueuedPodInfoNoTimestamp(pod)) + if err := p.activeQ.Delete(newQueuedPodInfoForLookup(pod)); err != nil { + // The item was probably not found in the activeQ. + p.podBackoffQ.Delete(newQueuedPodInfoForLookup(pod)) p.unschedulableQ.delete(pod) } return nil @@ -586,15 +593,15 @@ func (npm *nominatedPodMap) DeleteNominatedPodIfExists(pod *v1.Pod) { // This is called during the preemption process after a node is nominated to run // the pod. We update the structure before sending a request to update the pod // object to avoid races with the following scheduling cycles. -func (npm *nominatedPodMap) AddNominatedPod(pod *v1.Pod, nodeName string) { +func (npm *nominatedPodMap) AddNominatedPod(pi *framework.PodInfo, nodeName string) { npm.Lock() - npm.add(pod, nodeName) + npm.add(pi, nodeName) npm.Unlock() } // NominatedPodsForNode returns pods that are nominated to run on the given node, // but they are waiting for other pods to be removed from the node. -func (npm *nominatedPodMap) NominatedPodsForNode(nodeName string) []*v1.Pod { +func (npm *nominatedPodMap) NominatedPodsForNode(nodeName string) []*framework.PodInfo { npm.RLock() defer npm.RUnlock() // TODO: we may need to return a copy of []*Pods to avoid modification @@ -621,7 +628,7 @@ func (p *PriorityQueue) NumUnschedulablePods() int { func (p *PriorityQueue) newQueuedPodInfo(pod *v1.Pod) *framework.QueuedPodInfo { now := p.clock.Now() return &framework.QueuedPodInfo{ - Pod: pod, + PodInfo: framework.NewPodInfo(pod), Timestamp: now, InitialAttemptTimestamp: now, } @@ -649,7 +656,7 @@ func (p *PriorityQueue) calculateBackoffDuration(podInfo *framework.QueuedPodInf func updatePod(oldPodInfo interface{}, newPod *v1.Pod) *framework.QueuedPodInfo { pInfo := oldPodInfo.(*framework.QueuedPodInfo) - pInfo.Pod = newPod + pInfo.Update(newPod) return pInfo } @@ -717,7 +724,7 @@ type nominatedPodMap struct { // nominatedPods is a map keyed by a node name and the value is a list of // pods which are nominated to run on the node. These are pods which can be in // the activeQ or unschedulableQ. - nominatedPods map[string][]*v1.Pod + nominatedPods map[string][]*framework.PodInfo // nominatedPodToNode is map keyed by a Pod UID to the node name where it is // nominated. nominatedPodToNode map[ktypes.UID]string @@ -725,26 +732,26 @@ type nominatedPodMap struct { sync.RWMutex } -func (npm *nominatedPodMap) add(p *v1.Pod, nodeName string) { +func (npm *nominatedPodMap) add(pi *framework.PodInfo, nodeName string) { // always delete the pod if it already exist, to ensure we never store more than // one instance of the pod. 
- npm.delete(p) nnn := nodeName if len(nnn) == 0 { - nnn = NominatedNodeName(p) + nnn = NominatedNodeName(pi.Pod) if len(nnn) == 0 { return } } - npm.nominatedPodToNode[p.UID] = nnn - for _, np := range npm.nominatedPods[nnn] { - if np.UID == p.UID { - klog.V(4).InfoS("Pod already exists in the nominated map", "pod", klog.KObj(p)) + npm.nominatedPodToNode[pi.Pod.UID] = nnn + for _, npi := range npm.nominatedPods[nnn] { + if npi.Pod.UID == pi.Pod.UID { + klog.V(4).InfoS("Pod already exists in the nominated map", "pod", klog.KObj(npi.Pod)) return } } - npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn], p) + npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn], pi) } func (npm *nominatedPodMap) delete(p *v1.Pod) { @@ -753,7 +760,7 @@ func (npm *nominatedPodMap) delete(p *v1.Pod) { return } for i, np := range npm.nominatedPods[nnn] { - if np.UID == p.UID { + if np.Pod.UID == p.UID { npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn][:i], npm.nominatedPods[nnn][i+1:]...) if len(npm.nominatedPods[nnn]) == 0 { delete(npm.nominatedPods, nnn) @@ -765,7 +772,7 @@ } // UpdateNominatedPod updates the <oldPod> with <newPod>. -func (npm *nominatedPodMap) UpdateNominatedPod(oldPod, newPod *v1.Pod) { +func (npm *nominatedPodMap) UpdateNominatedPod(oldPod *v1.Pod, newPodInfo *framework.PodInfo) { npm.Lock() defer npm.Unlock() // In some cases, an Update event with no "NominatedNode" present is received right @@ -776,7 +783,7 @@ // (1) NominatedNode info is added // (2) NominatedNode info is updated // (3) NominatedNode info is removed - if NominatedNodeName(oldPod) == "" && NominatedNodeName(newPod) == "" { + if NominatedNodeName(oldPod) == "" && NominatedNodeName(newPodInfo.Pod) == "" { if nnn, ok := npm.nominatedPodToNode[oldPod.UID]; ok { // This is the only case we should continue reserving the NominatedNode nodeName = nnn @@ -785,13 +792,13 @@ // We update irrespective of the nominatedNodeName changed or not, to ensure // that pod pointer is updated. npm.delete(oldPod) - npm.add(newPod, nodeName) + npm.add(newPodInfo, nodeName) } // NewPodNominator creates a nominatedPodMap as a backing of framework.PodNominator. func NewPodNominator() framework.PodNominator { return &nominatedPodMap{ - nominatedPods: make(map[string][]*v1.Pod), + nominatedPods: make(map[string][]*framework.PodInfo), nominatedPodToNode: make(map[ktypes.UID]string), } } diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index 6efadd84000..af09279660e 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -44,7 +44,7 @@ const queueMetricMetadata = ` var lowPriority, midPriority, highPriority = int32(0), int32(100), int32(1000) var mediumPriority = (lowPriority + highPriority) / 2 -var highPriorityPod, highPriNominatedPod, medPriorityPod, unschedulablePod = v1.Pod{ +var highPriorityPodInfo, highPriNominatedPodInfo, medPriorityPodInfo, unschedulablePodInfo = framework.NewPodInfo(&v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "hpp", Namespace: "ns1", @@ -53,8 +53,8 @@ var highPriorityPod, highPriNominatedPod, medPriorityPod, unschedulablePod = v1.
Spec: v1.PodSpec{ Priority: &highPriority, }, -}, - v1.Pod{ +}), + framework.NewPodInfo(&v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "hpp", Namespace: "ns1", @@ -66,8 +66,8 @@ var highPriorityPod, highPriNominatedPod, medPriorityPod, unschedulablePod = v1. Status: v1.PodStatus{ NominatedNodeName: "node1", }, - }, - v1.Pod{ + }), + framework.NewPodInfo(&v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "mpp", Namespace: "ns2", @@ -82,8 +82,8 @@ var highPriorityPod, highPriNominatedPod, medPriorityPod, unschedulablePod = v1. Status: v1.PodStatus{ NominatedNodeName: "node1", }, - }, - v1.Pod{ + }), + framework.NewPodInfo(&v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "up", Namespace: "ns1", @@ -105,7 +105,7 @@ var highPriorityPod, highPriNominatedPod, medPriorityPod, unschedulablePod = v1. }, NominatedNodeName: "node1", }, - } + }) func getUnschedulablePod(p *PriorityQueue, pod *v1.Pod) *v1.Pod { pInfo := p.unschedulableQ.get(pod) @@ -117,38 +117,38 @@ func getUnschedulablePod(p *PriorityQueue, pod *v1.Pod) *v1.Pod { func TestPriorityQueue_Add(t *testing.T) { q := NewPriorityQueue(newDefaultQueueSort()) - if err := q.Add(&medPriorityPod); err != nil { + if err := q.Add(medPriorityPodInfo.Pod); err != nil { t.Errorf("add failed: %v", err) } - if err := q.Add(&unschedulablePod); err != nil { + if err := q.Add(unschedulablePodInfo.Pod); err != nil { t.Errorf("add failed: %v", err) } - if err := q.Add(&highPriorityPod); err != nil { + if err := q.Add(highPriorityPodInfo.Pod); err != nil { t.Errorf("add failed: %v", err) } expectedNominatedPods := &nominatedPodMap{ nominatedPodToNode: map[types.UID]string{ - medPriorityPod.UID: "node1", - unschedulablePod.UID: "node1", + medPriorityPodInfo.Pod.UID: "node1", + unschedulablePodInfo.Pod.UID: "node1", }, - nominatedPods: map[string][]*v1.Pod{ - "node1": {&medPriorityPod, &unschedulablePod}, + nominatedPods: map[string][]*framework.PodInfo{ + "node1": {medPriorityPodInfo, unschedulablePodInfo}, }, } if !reflect.DeepEqual(q.PodNominator, expectedNominatedPods) { t.Errorf("Unexpected nominated map after adding pods. 
Expected: %v, got: %v", expectedNominatedPods, q.PodNominator) } - if p, err := q.Pop(); err != nil || p.Pod != &highPriorityPod { - t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Pod.Name) + if p, err := q.Pop(); err != nil || p.Pod != highPriorityPodInfo.Pod { + t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name) } - if p, err := q.Pop(); err != nil || p.Pod != &medPriorityPod { - t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Pod.Name) + if p, err := q.Pop(); err != nil || p.Pod != medPriorityPodInfo.Pod { + t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, p.Pod.Name) } - if p, err := q.Pop(); err != nil || p.Pod != &unschedulablePod { - t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePod.Name, p.Pod.Name) + if p, err := q.Pop(); err != nil || p.Pod != unschedulablePodInfo.Pod { + t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name) } if len(q.PodNominator.(*nominatedPodMap).nominatedPods["node1"]) != 2 { - t.Errorf("Expected medPriorityPod and unschedulablePod to be still present in nomindatePods: %v", q.PodNominator.(*nominatedPodMap).nominatedPods["node1"]) + t.Errorf("Expected medPriorityPodInfo and unschedulablePodInfo to be still present in nomindatePods: %v", q.PodNominator.(*nominatedPodMap).nominatedPods["node1"]) } } @@ -159,45 +159,45 @@ func newDefaultQueueSort() framework.LessFunc { func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) { q := NewPriorityQueue(newDefaultQueueSort()) - if err := q.Add(&medPriorityPod); err != nil { + if err := q.Add(medPriorityPodInfo.Pod); err != nil { t.Errorf("add failed: %v", err) } - if err := q.Add(&highPriorityPod); err != nil { + if err := q.Add(highPriorityPodInfo.Pod); err != nil { t.Errorf("add failed: %v", err) } - if p, err := q.Pop(); err != nil || p.Pod != &highPriorityPod { - t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Pod.Name) + if p, err := q.Pop(); err != nil || p.Pod != highPriorityPodInfo.Pod { + t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name) } - if p, err := q.Pop(); err != nil || p.Pod != &medPriorityPod { - t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Pod.Name) + if p, err := q.Pop(); err != nil || p.Pod != medPriorityPodInfo.Pod { + t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, p.Pod.Name) } } func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) { q := NewPriorityQueue(newDefaultQueueSort()) - q.Add(&highPriNominatedPod) - q.AddUnschedulableIfNotPresent(newQueuedPodInfoNoTimestamp(&highPriNominatedPod), q.SchedulingCycle()) // Must not add anything. - q.AddUnschedulableIfNotPresent(newQueuedPodInfoNoTimestamp(&unschedulablePod), q.SchedulingCycle()) + q.Add(highPriNominatedPodInfo.Pod) + q.AddUnschedulableIfNotPresent(newQueuedPodInfoForLookup(highPriNominatedPodInfo.Pod), q.SchedulingCycle()) // Must not add anything. 
+ q.AddUnschedulableIfNotPresent(newQueuedPodInfoForLookup(unschedulablePodInfo.Pod), q.SchedulingCycle()) expectedNominatedPods := &nominatedPodMap{ nominatedPodToNode: map[types.UID]string{ - unschedulablePod.UID: "node1", - highPriNominatedPod.UID: "node1", + unschedulablePodInfo.Pod.UID: "node1", + highPriNominatedPodInfo.Pod.UID: "node1", }, - nominatedPods: map[string][]*v1.Pod{ - "node1": {&highPriNominatedPod, &unschedulablePod}, + nominatedPods: map[string][]*framework.PodInfo{ + "node1": {highPriNominatedPodInfo, unschedulablePodInfo}, }, } if !reflect.DeepEqual(q.PodNominator, expectedNominatedPods) { t.Errorf("Unexpected nominated map after adding pods. Expected: %v, got: %v", expectedNominatedPods, q.PodNominator) } - if p, err := q.Pop(); err != nil || p.Pod != &highPriNominatedPod { - t.Errorf("Expected: %v after Pop, but got: %v", highPriNominatedPod.Name, p.Pod.Name) + if p, err := q.Pop(); err != nil || p.Pod != highPriNominatedPodInfo.Pod { + t.Errorf("Expected: %v after Pop, but got: %v", highPriNominatedPodInfo.Pod.Name, p.Pod.Name) } if len(q.PodNominator.(*nominatedPodMap).nominatedPods) != 1 { t.Errorf("Expected nomindatePods to have one element: %v", q.PodNominator) } - if getUnschedulablePod(q, &unschedulablePod) != &unschedulablePod { - t.Errorf("Pod %v was not found in the unschedulableQ.", unschedulablePod.Name) + if getUnschedulablePod(q, unschedulablePodInfo.Pod) != unschedulablePodInfo.Pod { + t.Errorf("Pod %v was not found in the unschedulableQ.", unschedulablePodInfo.Pod.Name) } } @@ -256,7 +256,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) { }, } - if err := q.AddUnschedulableIfNotPresent(newQueuedPodInfoNoTimestamp(unschedulablePod), oldCycle); err != nil { + if err := q.AddUnschedulableIfNotPresent(newQueuedPodInfoForLookup(unschedulablePod), oldCycle); err != nil { t.Errorf("Failed to call AddUnschedulableIfNotPresent(%v): %v", unschedulablePod.Name, err) } } @@ -264,7 +264,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) { // Since there was a move request at the same cycle as "oldCycle", these pods // should be in the backoff queue. 
for i := 1; i < totalNum; i++ { - if _, exists, _ := q.podBackoffQ.Get(newQueuedPodInfoNoTimestamp(&expectedPods[i])); !exists { + if _, exists, _ := q.podBackoffQ.Get(newQueuedPodInfoForLookup(&expectedPods[i])); !exists { t.Errorf("Expected %v to be added to podBackoffQ.", expectedPods[i].Name) } } @@ -276,28 +276,28 @@ func TestPriorityQueue_Pop(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - if p, err := q.Pop(); err != nil || p.Pod != &medPriorityPod { - t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Pod.Name) + if p, err := q.Pop(); err != nil || p.Pod != medPriorityPodInfo.Pod { + t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, p.Pod.Name) } if len(q.PodNominator.(*nominatedPodMap).nominatedPods["node1"]) != 1 { - t.Errorf("Expected medPriorityPod to be present in nomindatePods: %v", q.PodNominator.(*nominatedPodMap).nominatedPods["node1"]) + t.Errorf("Expected medPriorityPodInfo to be present in nomindatePods: %v", q.PodNominator.(*nominatedPodMap).nominatedPods["node1"]) } }() - q.Add(&medPriorityPod) + q.Add(medPriorityPodInfo.Pod) wg.Wait() } func TestPriorityQueue_Update(t *testing.T) { q := NewPriorityQueue(newDefaultQueueSort()) - q.Update(nil, &highPriorityPod) - if _, exists, _ := q.activeQ.Get(newQueuedPodInfoNoTimestamp(&highPriorityPod)); !exists { - t.Errorf("Expected %v to be added to activeQ.", highPriorityPod.Name) + q.Update(nil, highPriorityPodInfo.Pod) + if _, exists, _ := q.activeQ.Get(newQueuedPodInfoForLookup(highPriorityPodInfo.Pod)); !exists { + t.Errorf("Expected %v to be added to activeQ.", highPriorityPodInfo.Pod.Name) } if len(q.PodNominator.(*nominatedPodMap).nominatedPods) != 0 { t.Errorf("Expected nomindatePods to be empty: %v", q.PodNominator) } - // Update highPriorityPod and add a nominatedNodeName to it. - q.Update(&highPriorityPod, &highPriNominatedPod) + // Update highPriorityPodInfo and add a nominatedNodeName to it. + q.Update(highPriorityPodInfo.Pod, highPriNominatedPodInfo.Pod) if q.activeQ.Len() != 1 { t.Error("Expected only one item in activeQ.") } @@ -306,30 +306,30 @@ func TestPriorityQueue_Update(t *testing.T) { } // Updating an unschedulable pod which is not in any of the two queues, should // add the pod to activeQ. - q.Update(&unschedulablePod, &unschedulablePod) - if _, exists, _ := q.activeQ.Get(newQueuedPodInfoNoTimestamp(&unschedulablePod)); !exists { - t.Errorf("Expected %v to be added to activeQ.", unschedulablePod.Name) + q.Update(unschedulablePodInfo.Pod, unschedulablePodInfo.Pod) + if _, exists, _ := q.activeQ.Get(newQueuedPodInfoForLookup(unschedulablePodInfo.Pod)); !exists { + t.Errorf("Expected %v to be added to activeQ.", unschedulablePodInfo.Pod.Name) } // Updating a pod that is already in activeQ, should not change it. 
- q.Update(&unschedulablePod, &unschedulablePod) + q.Update(unschedulablePodInfo.Pod, unschedulablePodInfo.Pod) if len(q.unschedulableQ.podInfoMap) != 0 { t.Error("Expected unschedulableQ to be empty.") } - if _, exists, _ := q.activeQ.Get(newQueuedPodInfoNoTimestamp(&unschedulablePod)); !exists { - t.Errorf("Expected: %v to be added to activeQ.", unschedulablePod.Name) + if _, exists, _ := q.activeQ.Get(newQueuedPodInfoForLookup(unschedulablePodInfo.Pod)); !exists { + t.Errorf("Expected: %v to be added to activeQ.", unschedulablePodInfo.Pod.Name) } - if p, err := q.Pop(); err != nil || p.Pod != &highPriNominatedPod { - t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Pod.Name) + if p, err := q.Pop(); err != nil || p.Pod != highPriNominatedPodInfo.Pod { + t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name) } // Updating a pod that is in unschedulableQ in a way that it may // become schedulable should add the pod to the activeQ. - q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(&medPriorityPod), q.SchedulingCycle()) + q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(medPriorityPodInfo.Pod), q.SchedulingCycle()) if len(q.unschedulableQ.podInfoMap) != 1 { t.Error("Expected unschedulableQ to be 1.") } - updatedPod := medPriorityPod.DeepCopy() + updatedPod := medPriorityPodInfo.Pod.DeepCopy() updatedPod.ClusterName = "test" - q.Update(&medPriorityPod, updatedPod) + q.Update(medPriorityPodInfo.Pod, updatedPod) if p, err := q.Pop(); err != nil || p.Pod != updatedPod { t.Errorf("Expected: %v after Pop, but got: %v", updatedPod.Name, p.Pod.Name) } @@ -337,21 +337,21 @@ func TestPriorityQueue_Update(t *testing.T) { func TestPriorityQueue_Delete(t *testing.T) { q := NewPriorityQueue(newDefaultQueueSort()) - q.Update(&highPriorityPod, &highPriNominatedPod) - q.Add(&unschedulablePod) - if err := q.Delete(&highPriNominatedPod); err != nil { + q.Update(highPriorityPodInfo.Pod, highPriNominatedPodInfo.Pod) + q.Add(unschedulablePodInfo.Pod) + if err := q.Delete(highPriNominatedPodInfo.Pod); err != nil { t.Errorf("delete failed: %v", err) } - if _, exists, _ := q.activeQ.Get(newQueuedPodInfoNoTimestamp(&unschedulablePod)); !exists { - t.Errorf("Expected %v to be in activeQ.", unschedulablePod.Name) + if _, exists, _ := q.activeQ.Get(newQueuedPodInfoForLookup(unschedulablePodInfo.Pod)); !exists { + t.Errorf("Expected %v to be in activeQ.", unschedulablePodInfo.Pod.Name) } - if _, exists, _ := q.activeQ.Get(newQueuedPodInfoNoTimestamp(&highPriNominatedPod)); exists { - t.Errorf("Didn't expect %v to be in activeQ.", highPriorityPod.Name) + if _, exists, _ := q.activeQ.Get(newQueuedPodInfoForLookup(highPriNominatedPodInfo.Pod)); exists { + t.Errorf("Didn't expect %v to be in activeQ.", highPriorityPodInfo.Pod.Name) } if len(q.PodNominator.(*nominatedPodMap).nominatedPods) != 1 { - t.Errorf("Expected nomindatePods to have only 'unschedulablePod': %v", q.PodNominator.(*nominatedPodMap).nominatedPods) + t.Errorf("Expected nomindatePods to have only 'unschedulablePodInfo': %v", q.PodNominator.(*nominatedPodMap).nominatedPods) } - if err := q.Delete(&unschedulablePod); err != nil { + if err := q.Delete(unschedulablePodInfo.Pod); err != nil { t.Errorf("delete failed: %v", err) } if len(q.PodNominator.(*nominatedPodMap).nominatedPods) != 0 { @@ -362,9 +362,9 @@ func TestPriorityQueue_Delete(t *testing.T) { func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { c := clock.NewFakeClock(time.Now()) q := NewPriorityQueue(newDefaultQueueSort(), 
WithClock(c)) - q.Add(&medPriorityPod) - q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(&unschedulablePod), q.SchedulingCycle()) - q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(&highPriorityPod), q.SchedulingCycle()) + q.Add(medPriorityPodInfo.Pod) + q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(unschedulablePodInfo.Pod), q.SchedulingCycle()) + q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(highPriorityPodInfo.Pod), q.SchedulingCycle()) // Pods is still backing off, move the pod into backoffQ. q.MoveAllToActiveOrBackoffQueue("test") if q.activeQ.Len() != 1 { @@ -379,10 +379,10 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { q.podBackoffQ.Pop() q.schedulingCycle++ - q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(&unschedulablePod), q.SchedulingCycle()) - q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(&highPriorityPod), q.SchedulingCycle()) - if q.unschedulableQ.get(&unschedulablePod) == nil || q.unschedulableQ.get(&highPriorityPod) == nil { - t.Errorf("Expected %v and %v in the unschedulableQ", unschedulablePod.Name, highPriorityPod.Name) + q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(unschedulablePodInfo.Pod), q.SchedulingCycle()) + q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(highPriorityPodInfo.Pod), q.SchedulingCycle()) + if q.unschedulableQ.get(unschedulablePodInfo.Pod) == nil || q.unschedulableQ.get(highPriorityPodInfo.Pod) == nil { + t.Errorf("Expected %v and %v in the unschedulableQ", unschedulablePodInfo.Pod.Name, highPriorityPodInfo.Pod.Name) } // Move clock by podInitialBackoffDuration, so that pods in the unschedulableQ would pass the backing off, // and the pods will be moved into activeQ. @@ -401,7 +401,7 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) { // when a pod with pod affinity is in unschedulableQ and another pod with a // matching label is added, the unschedulable pod is moved to activeQ. func TestPriorityQueue_AssignedPodAdded(t *testing.T) { - affinityPod := unschedulablePod.DeepCopy() + affinityPod := unschedulablePodInfo.Pod.DeepCopy() affinityPod.Name = "afp" affinityPod.Spec = v1.PodSpec{ Affinity: &v1.Affinity{ @@ -435,9 +435,9 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) { c := clock.NewFakeClock(time.Now()) q := NewPriorityQueue(newDefaultQueueSort(), WithClock(c)) - q.Add(&medPriorityPod) + q.Add(medPriorityPodInfo.Pod) // Add a couple of pods to the unschedulableQ. - q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(&unschedulablePod), q.SchedulingCycle()) + q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(unschedulablePodInfo.Pod), q.SchedulingCycle()) q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(affinityPod), q.SchedulingCycle()) // Move clock to make the unschedulable pods complete backoff. @@ -448,24 +448,24 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) { if getUnschedulablePod(q, affinityPod) != nil { t.Error("affinityPod is still in the unschedulableQ.") } - if _, exists, _ := q.activeQ.Get(newQueuedPodInfoNoTimestamp(affinityPod)); !exists { + if _, exists, _ := q.activeQ.Get(newQueuedPodInfoForLookup(affinityPod)); !exists { t.Error("affinityPod is not moved to activeQ.") } // Check that the other pod is still in the unschedulableQ. 
- if getUnschedulablePod(q, &unschedulablePod) == nil { - t.Error("unschedulablePod is not in the unschedulableQ.") + if getUnschedulablePod(q, unschedulablePodInfo.Pod) == nil { + t.Error("unschedulablePodInfo is not in the unschedulableQ.") } } func TestPriorityQueue_NominatedPodsForNode(t *testing.T) { q := NewPriorityQueue(newDefaultQueueSort()) - q.Add(&medPriorityPod) - q.Add(&unschedulablePod) - q.Add(&highPriorityPod) - if p, err := q.Pop(); err != nil || p.Pod != &highPriorityPod { - t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Pod.Name) + q.Add(medPriorityPodInfo.Pod) + q.Add(unschedulablePodInfo.Pod) + q.Add(highPriorityPodInfo.Pod) + if p, err := q.Pop(); err != nil || p.Pod != highPriorityPodInfo.Pod { + t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name) } - expectedList := []*v1.Pod{&medPriorityPod, &unschedulablePod} + expectedList := []*framework.PodInfo{medPriorityPodInfo, unschedulablePodInfo} if !reflect.DeepEqual(expectedList, q.NominatedPodsForNode("node1")) { t.Error("Unexpected list of nominated Pods for node.") } @@ -484,11 +484,11 @@ func TestPriorityQueue_PendingPods(t *testing.T) { } q := NewPriorityQueue(newDefaultQueueSort()) - q.Add(&medPriorityPod) - q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(&unschedulablePod), q.SchedulingCycle()) - q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(&highPriorityPod), q.SchedulingCycle()) + q.Add(medPriorityPodInfo.Pod) + q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(unschedulablePodInfo.Pod), q.SchedulingCycle()) + q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(highPriorityPodInfo.Pod), q.SchedulingCycle()) - expectedSet := makeSet([]*v1.Pod{&medPriorityPod, &unschedulablePod, &highPriorityPod}) + expectedSet := makeSet([]*v1.Pod{medPriorityPodInfo.Pod, unschedulablePodInfo.Pod, highPriorityPodInfo.Pod}) if !reflect.DeepEqual(expectedSet, makeSet(q.PendingPods())) { t.Error("Unexpected list of pending Pods.") } @@ -501,31 +501,31 @@ func TestPriorityQueue_PendingPods(t *testing.T) { func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) { q := NewPriorityQueue(newDefaultQueueSort()) - if err := q.Add(&medPriorityPod); err != nil { + if err := q.Add(medPriorityPodInfo.Pod); err != nil { t.Errorf("add failed: %v", err) } - // Update unschedulablePod on a different node than specified in the pod. - q.AddNominatedPod(&unschedulablePod, "node5") + // Update unschedulablePodInfo on a different node than specified in the pod. + q.AddNominatedPod(framework.NewPodInfo(unschedulablePodInfo.Pod), "node5") // Update nominated node name of a pod on a node that is not specified in the pod object. - q.AddNominatedPod(&highPriorityPod, "node2") + q.AddNominatedPod(framework.NewPodInfo(highPriorityPodInfo.Pod), "node2") expectedNominatedPods := &nominatedPodMap{ nominatedPodToNode: map[types.UID]string{ - medPriorityPod.UID: "node1", - highPriorityPod.UID: "node2", - unschedulablePod.UID: "node5", + medPriorityPodInfo.Pod.UID: "node1", + highPriorityPodInfo.Pod.UID: "node2", + unschedulablePodInfo.Pod.UID: "node5", }, - nominatedPods: map[string][]*v1.Pod{ - "node1": {&medPriorityPod}, - "node2": {&highPriorityPod}, - "node5": {&unschedulablePod}, + nominatedPods: map[string][]*framework.PodInfo{ + "node1": {medPriorityPodInfo}, + "node2": {highPriorityPodInfo}, + "node5": {unschedulablePodInfo}, }, } if !reflect.DeepEqual(q.PodNominator, expectedNominatedPods) { t.Errorf("Unexpected nominated map after adding pods. 
Expected: %v, got: %v", expectedNominatedPods, q.PodNominator) } - if p, err := q.Pop(); err != nil || p.Pod != &medPriorityPod { - t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Pod.Name) + if p, err := q.Pop(); err != nil || p.Pod != medPriorityPodInfo.Pod { + t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, p.Pod.Name) } // List of nominated pods shouldn't change after popping them from the queue. if !reflect.DeepEqual(q.PodNominator, expectedNominatedPods) { @@ -533,17 +533,17 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) { } // Update one of the nominated pods that doesn't have nominatedNodeName in the // pod object. It should be updated correctly. - q.AddNominatedPod(&highPriorityPod, "node4") + q.AddNominatedPod(highPriorityPodInfo, "node4") expectedNominatedPods = &nominatedPodMap{ nominatedPodToNode: map[types.UID]string{ - medPriorityPod.UID: "node1", - highPriorityPod.UID: "node4", - unschedulablePod.UID: "node5", + medPriorityPodInfo.Pod.UID: "node1", + highPriorityPodInfo.Pod.UID: "node4", + unschedulablePodInfo.Pod.UID: "node5", }, - nominatedPods: map[string][]*v1.Pod{ - "node1": {&medPriorityPod}, - "node4": {&highPriorityPod}, - "node5": {&unschedulablePod}, + nominatedPods: map[string][]*framework.PodInfo{ + "node1": {medPriorityPodInfo}, + "node4": {highPriorityPodInfo}, + "node5": {unschedulablePodInfo}, }, } if !reflect.DeepEqual(q.PodNominator, expectedNominatedPods) { @@ -552,15 +552,15 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) { // Delete a nominated pod that doesn't have nominatedNodeName in the pod // object. It should be deleted. - q.DeleteNominatedPodIfExists(&highPriorityPod) + q.DeleteNominatedPodIfExists(highPriorityPodInfo.Pod) expectedNominatedPods = &nominatedPodMap{ nominatedPodToNode: map[types.UID]string{ - medPriorityPod.UID: "node1", - unschedulablePod.UID: "node5", + medPriorityPodInfo.Pod.UID: "node1", + unschedulablePodInfo.Pod.UID: "node5", }, - nominatedPods: map[string][]*v1.Pod{ - "node1": {&medPriorityPod}, - "node5": {&unschedulablePod}, + nominatedPods: map[string][]*framework.PodInfo{ + "node1": {medPriorityPodInfo}, + "node5": {unschedulablePodInfo}, }, } if !reflect.DeepEqual(q.PodNominator, expectedNominatedPods) { @@ -647,35 +647,35 @@ func TestUnschedulablePodsMap(t *testing.T) { name: "create, update, delete subset of pods", podsToAdd: []*v1.Pod{pods[0], pods[1], pods[2], pods[3]}, expectedMapAfterAdd: map[string]*framework.QueuedPodInfo{ - util.GetPodFullName(pods[0]): {Pod: pods[0]}, - util.GetPodFullName(pods[1]): {Pod: pods[1]}, - util.GetPodFullName(pods[2]): {Pod: pods[2]}, - util.GetPodFullName(pods[3]): {Pod: pods[3]}, + util.GetPodFullName(pods[0]): {PodInfo: framework.NewPodInfo(pods[0])}, + util.GetPodFullName(pods[1]): {PodInfo: framework.NewPodInfo(pods[1])}, + util.GetPodFullName(pods[2]): {PodInfo: framework.NewPodInfo(pods[2])}, + util.GetPodFullName(pods[3]): {PodInfo: framework.NewPodInfo(pods[3])}, }, podsToUpdate: []*v1.Pod{updatedPods[0]}, expectedMapAfterUpdate: map[string]*framework.QueuedPodInfo{ - util.GetPodFullName(pods[0]): {Pod: updatedPods[0]}, - util.GetPodFullName(pods[1]): {Pod: pods[1]}, - util.GetPodFullName(pods[2]): {Pod: pods[2]}, - util.GetPodFullName(pods[3]): {Pod: pods[3]}, + util.GetPodFullName(pods[0]): {PodInfo: framework.NewPodInfo(updatedPods[0])}, + util.GetPodFullName(pods[1]): {PodInfo: framework.NewPodInfo(pods[1])}, + util.GetPodFullName(pods[2]): {PodInfo: framework.NewPodInfo(pods[2])}, 
+ util.GetPodFullName(pods[3]): {PodInfo: framework.NewPodInfo(pods[3])}, }, podsToDelete: []*v1.Pod{pods[0], pods[1]}, expectedMapAfterDelete: map[string]*framework.QueuedPodInfo{ - util.GetPodFullName(pods[2]): {Pod: pods[2]}, - util.GetPodFullName(pods[3]): {Pod: pods[3]}, + util.GetPodFullName(pods[2]): {PodInfo: framework.NewPodInfo(pods[2])}, + util.GetPodFullName(pods[3]): {PodInfo: framework.NewPodInfo(pods[3])}, }, }, { name: "create, update, delete all", podsToAdd: []*v1.Pod{pods[0], pods[3]}, expectedMapAfterAdd: map[string]*framework.QueuedPodInfo{ - util.GetPodFullName(pods[0]): {Pod: pods[0]}, - util.GetPodFullName(pods[3]): {Pod: pods[3]}, + util.GetPodFullName(pods[0]): {PodInfo: framework.NewPodInfo(pods[0])}, + util.GetPodFullName(pods[3]): {PodInfo: framework.NewPodInfo(pods[3])}, }, podsToUpdate: []*v1.Pod{updatedPods[3]}, expectedMapAfterUpdate: map[string]*framework.QueuedPodInfo{ - util.GetPodFullName(pods[0]): {Pod: pods[0]}, - util.GetPodFullName(pods[3]): {Pod: updatedPods[3]}, + util.GetPodFullName(pods[0]): {PodInfo: framework.NewPodInfo(pods[0])}, + util.GetPodFullName(pods[3]): {PodInfo: framework.NewPodInfo(updatedPods[3])}, }, podsToDelete: []*v1.Pod{pods[0], pods[3]}, expectedMapAfterDelete: map[string]*framework.QueuedPodInfo{}, @@ -684,17 +684,17 @@ func TestUnschedulablePodsMap(t *testing.T) { name: "delete non-existing and existing pods", podsToAdd: []*v1.Pod{pods[1], pods[2]}, expectedMapAfterAdd: map[string]*framework.QueuedPodInfo{ - util.GetPodFullName(pods[1]): {Pod: pods[1]}, - util.GetPodFullName(pods[2]): {Pod: pods[2]}, + util.GetPodFullName(pods[1]): {PodInfo: framework.NewPodInfo(pods[1])}, + util.GetPodFullName(pods[2]): {PodInfo: framework.NewPodInfo(pods[2])}, }, podsToUpdate: []*v1.Pod{updatedPods[1]}, expectedMapAfterUpdate: map[string]*framework.QueuedPodInfo{ - util.GetPodFullName(pods[1]): {Pod: updatedPods[1]}, - util.GetPodFullName(pods[2]): {Pod: pods[2]}, + util.GetPodFullName(pods[1]): {PodInfo: framework.NewPodInfo(updatedPods[1])}, + util.GetPodFullName(pods[2]): {PodInfo: framework.NewPodInfo(pods[2])}, }, podsToDelete: []*v1.Pod{pods[2], pods[3]}, expectedMapAfterDelete: map[string]*framework.QueuedPodInfo{ - util.GetPodFullName(pods[1]): {Pod: updatedPods[1]}, + util.GetPodFullName(pods[1]): {PodInfo: framework.NewPodInfo(updatedPods[1])}, }, }, } @@ -703,7 +703,7 @@ func TestUnschedulablePodsMap(t *testing.T) { t.Run(test.name, func(t *testing.T) { upm := newUnschedulablePodsMap(nil) for _, p := range test.podsToAdd { - upm.addOrUpdate(newQueuedPodInfoNoTimestamp(p)) + upm.addOrUpdate(newQueuedPodInfoForLookup(p)) } if !reflect.DeepEqual(upm.podInfoMap, test.expectedMapAfterAdd) { t.Errorf("Unexpected map after adding pods. Expected: %v, got: %v", @@ -712,7 +712,7 @@ func TestUnschedulablePodsMap(t *testing.T) { if len(test.podsToUpdate) > 0 { for _, p := range test.podsToUpdate { - upm.addOrUpdate(newQueuedPodInfoNoTimestamp(p)) + upm.addOrUpdate(newQueuedPodInfoForLookup(p)) } if !reflect.DeepEqual(upm.podInfoMap, test.expectedMapAfterUpdate) { t.Errorf("Unexpected map after updating pods. Expected: %v, got: %v", @@ -785,7 +785,7 @@ func TestRecentlyTriedPodsGoBack(t *testing.T) { t.Errorf("Error while popping the head of the queue: %v", err) } // Update pod condition to unschedulable. 
- podutil.UpdatePodCondition(&p1.Pod.Status, &v1.PodCondition{ + podutil.UpdatePodCondition(&p1.PodInfo.Pod.Status, &v1.PodCondition{ Type: v1.PodScheduled, Status: v1.ConditionFalse, Reason: v1.PodReasonUnschedulable, @@ -805,7 +805,7 @@ func TestRecentlyTriedPodsGoBack(t *testing.T) { t.Errorf("Error while popping pods from the queue: %v", err) } if (i == 4) != (p1 == p) { - t.Errorf("A pod tried before is not the last pod popped: i: %v, pod name: %v", i, p.Pod.Name) + t.Errorf("A pod tried before is not the last pod popped: i: %v, pod name: %v", i, p.PodInfo.Pod.Name) } } } @@ -844,7 +844,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) { }) // Put in the unschedulable queue - q.AddUnschedulableIfNotPresent(newQueuedPodInfoNoTimestamp(&unschedulablePod), q.SchedulingCycle()) + q.AddUnschedulableIfNotPresent(newQueuedPodInfoForLookup(&unschedulablePod), q.SchedulingCycle()) // Move clock to make the unschedulable pods complete backoff. c.Step(DefaultPodInitialBackoffDuration + time.Second) // Move all unschedulable pods to the active queue. @@ -878,7 +878,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) { } q.Add(&newerPod) - // And then unschedulablePod was determined as unschedulable AGAIN. + // And then unschedulablePodInfo was determined as unschedulable AGAIN. podutil.UpdatePodCondition(&unschedulablePod.Status, &v1.PodCondition{ Type: v1.PodScheduled, Status: v1.ConditionFalse, @@ -887,7 +887,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) { }) // And then, put unschedulable pod to the unschedulable queue - q.AddUnschedulableIfNotPresent(newQueuedPodInfoNoTimestamp(&unschedulablePod), q.SchedulingCycle()) + q.AddUnschedulableIfNotPresent(newQueuedPodInfoForLookup(&unschedulablePod), q.SchedulingCycle()) // Move clock to make the unschedulable pods complete backoff. c.Step(DefaultPodInitialBackoffDuration + time.Second) // Move all unschedulable pods to the active queue. 
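Note: to make the nominator bookkeeping exercised by these tests concrete, here is a minimal, self-contained sketch of the pattern the nominatedPodMap changes above implement: the map now stores precomputed pod info rather than bare pods, entries are grouped by node name, and a pod is always removed by UID before being re-added so at most one instance is ever stored. All names below (pod, podInfo, nominatedMap) are local stand-ins for illustration, not the scheduler's real types.

package main

import "fmt"

type pod struct {
	UID  string
	Name string
}

// podInfo is a stand-in for framework.PodInfo: a pod plus precomputed data
// (in the real type, the parsed affinity terms).
type podInfo struct {
	Pod         *pod
	Precomputed string
}

type nominatedMap struct {
	nominatedPods      map[string][]*podInfo // node name -> nominated pod infos
	nominatedPodToNode map[string]string     // pod UID -> node name
}

func newNominatedMap() *nominatedMap {
	return &nominatedMap{
		nominatedPods:      map[string][]*podInfo{},
		nominatedPodToNode: map[string]string{},
	}
}

func (m *nominatedMap) add(pi *podInfo, nodeName string) {
	// Always delete first, so we never store more than one instance of a pod.
	m.delete(pi.Pod)
	m.nominatedPodToNode[pi.Pod.UID] = nodeName
	m.nominatedPods[nodeName] = append(m.nominatedPods[nodeName], pi)
}

func (m *nominatedMap) delete(p *pod) {
	nnn, ok := m.nominatedPodToNode[p.UID]
	if !ok {
		return
	}
	for i, npi := range m.nominatedPods[nnn] {
		if npi.Pod.UID == p.UID {
			m.nominatedPods[nnn] = append(m.nominatedPods[nnn][:i], m.nominatedPods[nnn][i+1:]...)
			if len(m.nominatedPods[nnn]) == 0 {
				delete(m.nominatedPods, nnn)
			}
			break
		}
	}
	delete(m.nominatedPodToNode, p.UID)
}

func main() {
	m := newNominatedMap()
	m.add(&podInfo{Pod: &pod{UID: "u1", Name: "p1"}, Precomputed: "terms"}, "node1")
	// Re-adding the same UID on another node moves it instead of duplicating it.
	m.add(&podInfo{Pod: &pod{UID: "u1", Name: "p1"}, Precomputed: "terms"}, "node2")
	fmt.Println(len(m.nominatedPods["node1"]), len(m.nominatedPods["node2"])) // 0 1
}

Storing pod-info values keeps the parsed affinity data alongside each nominated pod, so callers such as NominatedPodsForNode and addNominatedPods no longer need to re-derive it.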
@@ -1021,10 +1021,10 @@ func TestHighPriorityFlushUnschedulableQLeftover(t *testing.T) { q.flushUnschedulableQLeftover() if p, err := q.Pop(); err != nil || p.Pod != &highPod { - t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Pod.Name) + t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name) } if p, err := q.Pop(); err != nil || p.Pod != &midPod { - t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Pod.Name) + t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, p.Pod.Name) } } @@ -1097,11 +1097,11 @@ func TestPodTimestamp(t *testing.T) { var timestamp = time.Now() pInfo1 := &framework.QueuedPodInfo{ - Pod: pod1, + PodInfo: framework.NewPodInfo(pod1), Timestamp: timestamp, } pInfo2 := &framework.QueuedPodInfo{ - Pod: pod2, + PodInfo: framework.NewPodInfo(pod2), Timestamp: timestamp.Add(time.Second), } @@ -1399,13 +1399,13 @@ func TestIncomingPodsMetrics(t *testing.T) { var pInfos = make([]*framework.QueuedPodInfo, 0, 3) for i := 1; i <= 3; i++ { p := &framework.QueuedPodInfo{ - Pod: &v1.Pod{ + PodInfo: framework.NewPodInfo(&v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("test-pod-%d", i), Namespace: fmt.Sprintf("ns%d", i), UID: types.UID(fmt.Sprintf("tp-%d", i)), }, - }, + }), Timestamp: timestamp, } pInfos = append(pInfos, p) @@ -1574,13 +1574,13 @@ func makeQueuedPodInfos(num int, timestamp time.Time) []*framework.QueuedPodInfo var pInfos = make([]*framework.QueuedPodInfo, 0, num) for i := 1; i <= num; i++ { p := &framework.QueuedPodInfo{ - Pod: &v1.Pod{ + PodInfo: framework.NewPodInfo(&v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("test-pod-%d", i), Namespace: fmt.Sprintf("ns%d", i), UID: types.UID(fmt.Sprintf("tp-%d", i)), }, - }, + }), Timestamp: timestamp, } pInfos = append(pInfos, p) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index a4d11786c6f..fa95b878a09 100755 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -325,7 +325,7 @@ func (sched *Scheduler) recordSchedulingFailure(fwk framework.Framework, podInfo // and the time the scheduler receives a Pod Update for the nominated pod. // Here we check for nil only for tests. if sched.SchedulingQueue != nil { - sched.SchedulingQueue.AddNominatedPod(podInfo.Pod, nominatedNode) + sched.SchedulingQueue.AddNominatedPod(podInfo.PodInfo, nominatedNode) } pod := podInfo.Pod diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 4cfb7de8b8f..06acf9e4b12 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -357,7 +357,7 @@ func TestSchedulerScheduleOne(t *testing.T) { gotError = err }, NextPod: func() *framework.QueuedPodInfo { - return &framework.QueuedPodInfo{Pod: item.sendPod} + return &framework.QueuedPodInfo{PodInfo: framework.NewPodInfo(item.sendPod)} }, Profiles: profile.Map{ testSchedulerName: fwk, @@ -838,7 +838,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C SchedulerCache: scache, Algorithm: algo, NextPod: func() *framework.QueuedPodInfo { - return &framework.QueuedPodInfo{Pod: clientcache.Pop(queuedPodStore).(*v1.Pod)} + return &framework.QueuedPodInfo{PodInfo: framework.NewPodInfo(clientcache.Pop(queuedPodStore).(*v1.Pod))} }, Error: func(p *framework.QueuedPodInfo, err error) { errChan <- err
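Taken together, the core optimization in this diff is that a pod's affinity terms are parsed once, stored in PodInfo, and then carried through the queue, the nominator, and preemption instead of being recomputed on every lookup. The sketch below mirrors the PodInfo.Update fast path added in types.go, using simplified stand-in types (pod, podInfo, and a string in place of the parsed terms): when the incoming pod has the same UID as the stored one, only the pointer is swapped; otherwise the derived fields are recomputed.

package main

import "fmt"

type pod struct {
	UID      string
	Affinity string // stand-in for the immutable affinity spec
}

type podInfo struct {
	Pod         *pod
	ParsedTerms string // stand-in for the Required/Preferred affinity terms
	parses      int    // counts how often the expensive parse ran
}

// update mirrors PodInfo.Update: a cheap in-place pointer swap for the same
// pod, a full re-parse for a different one. The precomputed fields are derived
// from immutable parts of the spec, so reuse is safe when the UID matches.
func (pi *podInfo) update(p *pod) {
	if p != nil && pi.Pod != nil && pi.Pod.UID == p.UID {
		pi.Pod = p
		return
	}
	pi.parses++
	pi.ParsedTerms = "parsed(" + p.Affinity + ")"
	pi.Pod = p
}

// newPodInfo mirrors the slimmed-down NewPodInfo: construct, then Update.
func newPodInfo(p *pod) *podInfo {
	pi := &podInfo{}
	pi.update(p)
	return pi
}

func main() {
	pi := newPodInfo(&pod{UID: "u1", Affinity: "zone-spread"})
	pi.update(&pod{UID: "u1", Affinity: "zone-spread"}) // same UID: parsed terms reused
	pi.update(&pod{UID: "u2"})                          // different pod: full re-parse
	fmt.Println(pi.parses) // 2 (initial parse plus one re-parse)
}

This is also why newQueuedPodInfoForLookup above only sets the Pod field: a key used purely for queue lookups never reads the derived fields, so paying the parse cost there would be wasted work.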