Merge pull request #116395 from alculquicondor/fix-podinfo-race

One lock among PodNominator and SchedulingQueue
This commit is contained in:
Kubernetes Prow Robot 2023-03-09 22:44:17 -08:00 committed by GitHub
commit 8b3d529523
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 83 additions and 63 deletions

View File

@ -584,6 +584,9 @@ type Framework interface {
// PercentageOfNodesToScore returns percentageOfNodesToScore associated to a profile. // PercentageOfNodesToScore returns percentageOfNodesToScore associated to a profile.
PercentageOfNodesToScore() *int32 PercentageOfNodesToScore() *int32
// SetPodNominator sets the PodNominator
SetPodNominator(nominator PodNominator)
} }
// Handle provides data and some tools that plugins can use. It is // Handle provides data and some tools that plugins can use. It is

View File

@ -368,6 +368,10 @@ func NewFramework(r Registry, profile *config.KubeSchedulerProfile, stopCh <-cha
return f, nil return f, nil
} }
func (f *frameworkImpl) SetPodNominator(n framework.PodNominator) {
f.PodNominator = n
}
// getScoreWeights makes sure that, between MultiPoint-Score plugin weights and individual Score // getScoreWeights makes sure that, between MultiPoint-Score plugin weights and individual Score
// plugin weights there is not an overflow of MaxTotalScore. // plugin weights there is not an overflow of MaxTotalScore.
func getScoreWeights(f *frameworkImpl, pluginsMap map[string]framework.Plugin, plugins []config.Plugin) error { func getScoreWeights(f *frameworkImpl, pluginsMap map[string]framework.Plugin, plugins []config.Plugin) error {

View File

@ -34,7 +34,6 @@ import (
"time" "time"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
@ -143,8 +142,7 @@ func NominatedNodeName(pod *v1.Pod) string {
// - unschedulablePods holds pods that were already attempted for scheduling and // - unschedulablePods holds pods that were already attempted for scheduling and
// are currently determined to be unschedulable. // are currently determined to be unschedulable.
type PriorityQueue struct { type PriorityQueue struct {
// PodNominator abstracts the operations to maintain nominated Pods. *nominator
framework.PodNominator
stop chan struct{} stop chan struct{}
clock clock.Clock clock clock.Clock
@ -156,7 +154,6 @@ type PriorityQueue struct {
// the maximum time a pod can stay in the unschedulablePods. // the maximum time a pod can stay in the unschedulablePods.
podMaxInUnschedulablePodsDuration time.Duration podMaxInUnschedulablePodsDuration time.Duration
lock sync.RWMutex
cond sync.Cond cond sync.Cond
// activeQ is heap structure that scheduler actively looks at to find pods to // activeQ is heap structure that scheduler actively looks at to find pods to
@ -192,7 +189,7 @@ type priorityQueueOptions struct {
podInitialBackoffDuration time.Duration podInitialBackoffDuration time.Duration
podMaxBackoffDuration time.Duration podMaxBackoffDuration time.Duration
podMaxInUnschedulablePodsDuration time.Duration podMaxInUnschedulablePodsDuration time.Duration
podNominator framework.PodNominator podLister listersv1.PodLister
clusterEventMap map[framework.ClusterEvent]sets.String clusterEventMap map[framework.ClusterEvent]sets.String
preEnqueuePluginMap map[string][]framework.PreEnqueuePlugin preEnqueuePluginMap map[string][]framework.PreEnqueuePlugin
} }
@ -221,10 +218,10 @@ func WithPodMaxBackoffDuration(duration time.Duration) Option {
} }
} }
// WithPodNominator sets pod nominator for PriorityQueue. // WithPodLister sets pod lister for PriorityQueue.
func WithPodNominator(pn framework.PodNominator) Option { func WithPodLister(pl listersv1.PodLister) Option {
return func(o *priorityQueueOptions) { return func(o *priorityQueueOptions) {
o.podNominator = pn o.podLister = pl
} }
} }
@ -276,6 +273,9 @@ func NewPriorityQueue(
opts ...Option, opts ...Option,
) *PriorityQueue { ) *PriorityQueue {
options := defaultPriorityQueueOptions options := defaultPriorityQueueOptions
if options.podLister == nil {
options.podLister = informerFactory.Core().V1().Pods().Lister()
}
for _, opt := range opts { for _, opt := range opts {
opt(&options) opt(&options)
} }
@ -286,12 +286,8 @@ func NewPriorityQueue(
return lessFn(pInfo1, pInfo2) return lessFn(pInfo1, pInfo2)
} }
if options.podNominator == nil {
options.podNominator = NewPodNominator(informerFactory.Core().V1().Pods().Lister())
}
pq := &PriorityQueue{ pq := &PriorityQueue{
PodNominator: options.podNominator, nominator: newPodNominator(options.podLister),
clock: options.clock, clock: options.clock,
stop: make(chan struct{}), stop: make(chan struct{}),
podInitialBackoffDuration: options.podInitialBackoffDuration, podInitialBackoffDuration: options.podInitialBackoffDuration,
@ -384,7 +380,7 @@ func (p *PriorityQueue) Add(pod *v1.Pod) error {
} }
klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", PodAdd, "queue", activeQName) klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", PodAdd, "queue", activeQName)
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", PodAdd).Inc() metrics.SchedulerQueueIncomingPods.WithLabelValues("active", PodAdd).Inc()
p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil) p.addNominatedPodUnlocked(pInfo.PodInfo, nil)
p.cond.Broadcast() p.cond.Broadcast()
return nil return nil
@ -438,7 +434,7 @@ func (p *PriorityQueue) activate(pod *v1.Pod) bool {
p.unschedulablePods.delete(pInfo.Pod, gated) p.unschedulablePods.delete(pInfo.Pod, gated)
p.podBackoffQ.Delete(pInfo) p.podBackoffQ.Delete(pInfo)
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", ForceActivate).Inc() metrics.SchedulerQueueIncomingPods.WithLabelValues("active", ForceActivate).Inc()
p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil) p.addNominatedPodUnlocked(pInfo.PodInfo, nil)
return true return true
} }
@ -499,7 +495,7 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pInfo *framework.QueuedPodI
} }
p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil) p.addNominatedPodUnlocked(pInfo.PodInfo, nil)
return nil return nil
} }
@ -610,14 +606,14 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
// If the pod is already in the active queue, just update it there. // If the pod is already in the active queue, just update it there.
if oldPodInfo, exists, _ := p.activeQ.Get(oldPodInfo); exists { if oldPodInfo, exists, _ := p.activeQ.Get(oldPodInfo); exists {
pInfo := updatePod(oldPodInfo, newPod) pInfo := updatePod(oldPodInfo, newPod)
p.PodNominator.UpdateNominatedPod(oldPod, pInfo.PodInfo) p.updateNominatedPodUnlocked(oldPod, pInfo.PodInfo)
return p.activeQ.Update(pInfo) return p.activeQ.Update(pInfo)
} }
// If the pod is in the backoff queue, update it there. // If the pod is in the backoff queue, update it there.
if oldPodInfo, exists, _ := p.podBackoffQ.Get(oldPodInfo); exists { if oldPodInfo, exists, _ := p.podBackoffQ.Get(oldPodInfo); exists {
pInfo := updatePod(oldPodInfo, newPod) pInfo := updatePod(oldPodInfo, newPod)
p.PodNominator.UpdateNominatedPod(oldPod, pInfo.PodInfo) p.updateNominatedPodUnlocked(oldPod, pInfo.PodInfo)
return p.podBackoffQ.Update(pInfo) return p.podBackoffQ.Update(pInfo)
} }
} }
@ -625,7 +621,7 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
// If the pod is in the unschedulable queue, updating it may make it schedulable. // If the pod is in the unschedulable queue, updating it may make it schedulable.
if usPodInfo := p.unschedulablePods.get(newPod); usPodInfo != nil { if usPodInfo := p.unschedulablePods.get(newPod); usPodInfo != nil {
pInfo := updatePod(usPodInfo, newPod) pInfo := updatePod(usPodInfo, newPod)
p.PodNominator.UpdateNominatedPod(oldPod, pInfo.PodInfo) p.updateNominatedPodUnlocked(oldPod, pInfo.PodInfo)
if isPodUpdated(oldPod, newPod) { if isPodUpdated(oldPod, newPod) {
gated := usPodInfo.Gated gated := usPodInfo.Gated
if p.isPodBackingoff(usPodInfo) { if p.isPodBackingoff(usPodInfo) {
@ -654,7 +650,7 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
if added, err := p.addToActiveQ(pInfo); !added { if added, err := p.addToActiveQ(pInfo); !added {
return err return err
} }
p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil) p.addNominatedPodUnlocked(pInfo.PodInfo, nil)
klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", PodUpdate, "queue", activeQName) klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", PodUpdate, "queue", activeQName)
p.cond.Broadcast() p.cond.Broadcast()
return nil return nil
@ -665,7 +661,7 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
func (p *PriorityQueue) Delete(pod *v1.Pod) error { func (p *PriorityQueue) Delete(pod *v1.Pod) error {
p.lock.Lock() p.lock.Lock()
defer p.lock.Unlock() defer p.lock.Unlock()
p.PodNominator.DeleteNominatedPodIfExists(pod) p.deleteNominatedPodIfExistsUnlocked(pod)
pInfo := newQueuedPodInfoForLookup(pod) pInfo := newQueuedPodInfoForLookup(pod)
if err := p.activeQ.Delete(pInfo); err != nil { if err := p.activeQ.Delete(pInfo); err != nil {
// The item was probably not found in the activeQ. // The item was probably not found in the activeQ.
@ -776,8 +772,7 @@ func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework.
// any affinity term that matches "pod". // any affinity term that matches "pod".
// NOTE: this function assumes lock has been acquired in caller. // NOTE: this function assumes lock has been acquired in caller.
func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod) []*framework.QueuedPodInfo { func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod) []*framework.QueuedPodInfo {
var nsLabels labels.Set nsLabels := interpodaffinity.GetNamespaceLabelsSnapshot(pod.Namespace, p.nsLister)
nsLabels = interpodaffinity.GetNamespaceLabelsSnapshot(pod.Namespace, p.nsLister)
var podsToMove []*framework.QueuedPodInfo var podsToMove []*framework.QueuedPodInfo
for _, pInfo := range p.unschedulablePods.podInfoMap { for _, pInfo := range p.unschedulablePods.podInfoMap {
@ -824,9 +819,13 @@ func (p *PriorityQueue) Close() {
// DeleteNominatedPodIfExists deletes <pod> from nominatedPods. // DeleteNominatedPodIfExists deletes <pod> from nominatedPods.
func (npm *nominator) DeleteNominatedPodIfExists(pod *v1.Pod) { func (npm *nominator) DeleteNominatedPodIfExists(pod *v1.Pod) {
npm.Lock() npm.lock.Lock()
npm.deleteNominatedPodIfExistsUnlocked(pod)
npm.lock.Unlock()
}
func (npm *nominator) deleteNominatedPodIfExistsUnlocked(pod *v1.Pod) {
npm.delete(pod) npm.delete(pod)
npm.Unlock()
} }
// AddNominatedPod adds a pod to the nominated pods of the given node. // AddNominatedPod adds a pod to the nominated pods of the given node.
@ -834,16 +833,16 @@ func (npm *nominator) DeleteNominatedPodIfExists(pod *v1.Pod) {
// the pod. We update the structure before sending a request to update the pod // the pod. We update the structure before sending a request to update the pod
// object to avoid races with the following scheduling cycles. // object to avoid races with the following scheduling cycles.
func (npm *nominator) AddNominatedPod(pi *framework.PodInfo, nominatingInfo *framework.NominatingInfo) { func (npm *nominator) AddNominatedPod(pi *framework.PodInfo, nominatingInfo *framework.NominatingInfo) {
npm.Lock() npm.lock.Lock()
npm.add(pi, nominatingInfo) npm.addNominatedPodUnlocked(pi, nominatingInfo)
npm.Unlock() npm.lock.Unlock()
} }
// NominatedPodsForNode returns a copy of pods that are nominated to run on the given node, // NominatedPodsForNode returns a copy of pods that are nominated to run on the given node,
// but they are waiting for other pods to be removed from the node. // but they are waiting for other pods to be removed from the node.
func (npm *nominator) NominatedPodsForNode(nodeName string) []*framework.PodInfo { func (npm *nominator) NominatedPodsForNode(nodeName string) []*framework.PodInfo {
npm.RLock() npm.lock.RLock()
defer npm.RUnlock() defer npm.lock.RUnlock()
// Make a copy of the nominated Pods so the caller can mutate safely. // Make a copy of the nominated Pods so the caller can mutate safely.
pods := make([]*framework.PodInfo, len(npm.nominatedPods[nodeName])) pods := make([]*framework.PodInfo, len(npm.nominatedPods[nodeName]))
for i := 0; i < len(pods); i++ { for i := 0; i < len(pods); i++ {
@ -985,10 +984,10 @@ type nominator struct {
// nominated. // nominated.
nominatedPodToNode map[types.UID]string nominatedPodToNode map[types.UID]string
sync.RWMutex lock sync.RWMutex
} }
func (npm *nominator) add(pi *framework.PodInfo, nominatingInfo *framework.NominatingInfo) { func (npm *nominator) addNominatedPodUnlocked(pi *framework.PodInfo, nominatingInfo *framework.NominatingInfo) {
// Always delete the pod if it already exists, to ensure we never store more than // Always delete the pod if it already exists, to ensure we never store more than
// one instance of the pod. // one instance of the pod.
npm.delete(pi.Pod) npm.delete(pi.Pod)
@ -1045,8 +1044,12 @@ func (npm *nominator) delete(p *v1.Pod) {
// UpdateNominatedPod updates the <oldPod> with <newPod>. // UpdateNominatedPod updates the <oldPod> with <newPod>.
func (npm *nominator) UpdateNominatedPod(oldPod *v1.Pod, newPodInfo *framework.PodInfo) { func (npm *nominator) UpdateNominatedPod(oldPod *v1.Pod, newPodInfo *framework.PodInfo) {
npm.Lock() npm.lock.Lock()
defer npm.Unlock() defer npm.lock.Unlock()
npm.updateNominatedPodUnlocked(oldPod, newPodInfo)
}
func (npm *nominator) updateNominatedPodUnlocked(oldPod *v1.Pod, newPodInfo *framework.PodInfo) {
// In some cases, an Update event with no "NominatedNode" present is received right // In some cases, an Update event with no "NominatedNode" present is received right
// after a node("NominatedNode") is reserved for this pod in memory. // after a node("NominatedNode") is reserved for this pod in memory.
// In this case, we need to keep reserving the NominatedNode when updating the pod pointer. // In this case, we need to keep reserving the NominatedNode when updating the pod pointer.
@ -1067,13 +1070,17 @@ func (npm *nominator) UpdateNominatedPod(oldPod *v1.Pod, newPodInfo *framework.P
// We update irrespective of the nominatedNodeName changed or not, to ensure // We update irrespective of the nominatedNodeName changed or not, to ensure
// that pod pointer is updated. // that pod pointer is updated.
npm.delete(oldPod) npm.delete(oldPod)
npm.add(newPodInfo, nominatingInfo) npm.addNominatedPodUnlocked(newPodInfo, nominatingInfo)
} }
// NewPodNominator creates a nominator as a backing of framework.PodNominator. // NewPodNominator creates a nominator as a backing of framework.PodNominator.
// A podLister is passed in so as to check if the pod exists // A podLister is passed in so as to check if the pod exists
// before adding its nominatedNode info. // before adding its nominatedNode info.
func NewPodNominator(podLister listersv1.PodLister) framework.PodNominator { func NewPodNominator(podLister listersv1.PodLister) framework.PodNominator {
return newPodNominator(podLister)
}
func newPodNominator(podLister listersv1.PodLister) *nominator {
return &nominator{ return &nominator{
podLister: podLister, podLister: podLister,
nominatedPods: make(map[string][]*framework.PodInfo), nominatedPods: make(map[string][]*framework.PodInfo),

View File

@ -76,6 +76,11 @@ var (
scheduledPodInfo = mustNewPodInfo( scheduledPodInfo = mustNewPodInfo(
st.MakePod().Name("sp").Namespace("ns1").UID("spns1").Node("foo").Obj(), st.MakePod().Name("sp").Namespace("ns1").UID("spns1").Node("foo").Obj(),
) )
nominatorCmpOpts = []cmp.Option{
cmp.AllowUnexported(nominator{}),
cmpopts.IgnoreFields(nominator{}, "podLister", "lock"),
}
) )
func getUnschedulablePod(p *PriorityQueue, pod *v1.Pod) *v1.Pod { func getUnschedulablePod(p *PriorityQueue, pod *v1.Pod) *v1.Pod {
@ -109,7 +114,7 @@ func TestPriorityQueue_Add(t *testing.T) {
"node1": {medPriorityPodInfo, unschedulablePodInfo}, "node1": {medPriorityPodInfo, unschedulablePodInfo},
}, },
} }
if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominator{}), cmpopts.IgnoreFields(nominator{}, "podLister", "RWMutex")); diff != "" { if diff := cmp.Diff(q.nominator, expectedNominatedPods, nominatorCmpOpts...); diff != "" {
t.Errorf("Unexpected diff after adding pods (-want, +got):\n%s", diff) t.Errorf("Unexpected diff after adding pods (-want, +got):\n%s", diff)
} }
if p, err := q.Pop(); err != nil || p.Pod != highPriorityPodInfo.Pod { if p, err := q.Pop(); err != nil || p.Pod != highPriorityPodInfo.Pod {
@ -121,8 +126,8 @@ func TestPriorityQueue_Add(t *testing.T) {
if p, err := q.Pop(); err != nil || p.Pod != unschedulablePodInfo.Pod { if p, err := q.Pop(); err != nil || p.Pod != unschedulablePodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name) t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name)
} }
if len(q.PodNominator.(*nominator).nominatedPods["node1"]) != 2 { if len(q.nominator.nominatedPods["node1"]) != 2 {
t.Errorf("Expected medPriorityPodInfo and unschedulablePodInfo to be still present in nomindatePods: %v", q.PodNominator.(*nominator).nominatedPods["node1"]) t.Errorf("Expected medPriorityPodInfo and unschedulablePodInfo to be still present in nomindatePods: %v", q.nominator.nominatedPods["node1"])
} }
} }
@ -167,14 +172,14 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
"node1": {highPriNominatedPodInfo, unschedulablePodInfo}, "node1": {highPriNominatedPodInfo, unschedulablePodInfo},
}, },
} }
if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominator{}), cmpopts.IgnoreFields(nominator{}, "podLister", "RWMutex")); diff != "" { if diff := cmp.Diff(q.nominator, expectedNominatedPods, nominatorCmpOpts...); diff != "" {
t.Errorf("Unexpected diff after adding pods (-want, +got):\n%s", diff) t.Errorf("Unexpected diff after adding pods (-want, +got):\n%s", diff)
} }
if p, err := q.Pop(); err != nil || p.Pod != highPriNominatedPodInfo.Pod { if p, err := q.Pop(); err != nil || p.Pod != highPriNominatedPodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", highPriNominatedPodInfo.Pod.Name, p.Pod.Name) t.Errorf("Expected: %v after Pop, but got: %v", highPriNominatedPodInfo.Pod.Name, p.Pod.Name)
} }
if len(q.PodNominator.(*nominator).nominatedPods) != 1 { if len(q.nominator.nominatedPods) != 1 {
t.Errorf("Expected nomindatePods to have one element: %v", q.PodNominator) t.Errorf("Expected nomindatePods to have one element: %v", q.nominator)
} }
if getUnschedulablePod(q, unschedulablePodInfo.Pod) != unschedulablePodInfo.Pod { if getUnschedulablePod(q, unschedulablePodInfo.Pod) != unschedulablePodInfo.Pod {
t.Errorf("Pod %v was not found in the unschedulablePods.", unschedulablePodInfo.Pod.Name) t.Errorf("Pod %v was not found in the unschedulablePods.", unschedulablePodInfo.Pod.Name)
@ -255,8 +260,8 @@ func TestPriorityQueue_Pop(t *testing.T) {
if p, err := q.Pop(); err != nil || p.Pod != medPriorityPodInfo.Pod { if p, err := q.Pop(); err != nil || p.Pod != medPriorityPodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, p.Pod.Name) t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, p.Pod.Name)
} }
if len(q.PodNominator.(*nominator).nominatedPods["node1"]) != 1 { if len(q.nominator.nominatedPods["node1"]) != 1 {
t.Errorf("Expected medPriorityPodInfo to be present in nomindatePods: %v", q.PodNominator.(*nominator).nominatedPods["node1"]) t.Errorf("Expected medPriorityPodInfo to be present in nomindatePods: %v", q.nominator.nominatedPods["node1"])
} }
}() }()
q.Add(medPriorityPodInfo.Pod) q.Add(medPriorityPodInfo.Pod)
@ -273,16 +278,16 @@ func TestPriorityQueue_Update(t *testing.T) {
if _, exists, _ := q.activeQ.Get(newQueuedPodInfoForLookup(highPriorityPodInfo.Pod)); !exists { if _, exists, _ := q.activeQ.Get(newQueuedPodInfoForLookup(highPriorityPodInfo.Pod)); !exists {
t.Errorf("Expected %v to be added to activeQ.", highPriorityPodInfo.Pod.Name) t.Errorf("Expected %v to be added to activeQ.", highPriorityPodInfo.Pod.Name)
} }
if len(q.PodNominator.(*nominator).nominatedPods) != 0 { if len(q.nominator.nominatedPods) != 0 {
t.Errorf("Expected nomindatePods to be empty: %v", q.PodNominator) t.Errorf("Expected nomindatePods to be empty: %v", q.nominator)
} }
// Update highPriorityPodInfo and add a nominatedNodeName to it. // Update highPriorityPodInfo and add a nominatedNodeName to it.
q.Update(highPriorityPodInfo.Pod, highPriNominatedPodInfo.Pod) q.Update(highPriorityPodInfo.Pod, highPriNominatedPodInfo.Pod)
if q.activeQ.Len() != 1 { if q.activeQ.Len() != 1 {
t.Error("Expected only one item in activeQ.") t.Error("Expected only one item in activeQ.")
} }
if len(q.PodNominator.(*nominator).nominatedPods) != 1 { if len(q.nominator.nominatedPods) != 1 {
t.Errorf("Expected one item in nomindatePods map: %v", q.PodNominator) t.Errorf("Expected one item in nomindatePods map: %v", q.nominator)
} }
// Updating an unschedulable pod which is not in any of the two queues, should // Updating an unschedulable pod which is not in any of the two queues, should
// add the pod to activeQ. // add the pod to activeQ.
@ -363,14 +368,14 @@ func TestPriorityQueue_Delete(t *testing.T) {
if _, exists, _ := q.activeQ.Get(newQueuedPodInfoForLookup(highPriNominatedPodInfo.Pod)); exists { if _, exists, _ := q.activeQ.Get(newQueuedPodInfoForLookup(highPriNominatedPodInfo.Pod)); exists {
t.Errorf("Didn't expect %v to be in activeQ.", highPriorityPodInfo.Pod.Name) t.Errorf("Didn't expect %v to be in activeQ.", highPriorityPodInfo.Pod.Name)
} }
if len(q.PodNominator.(*nominator).nominatedPods) != 1 { if len(q.nominator.nominatedPods) != 1 {
t.Errorf("Expected nomindatePods to have only 'unschedulablePodInfo': %v", q.PodNominator.(*nominator).nominatedPods) t.Errorf("Expected nomindatePods to have only 'unschedulablePodInfo': %v", q.nominator.nominatedPods)
} }
if err := q.Delete(unschedulablePodInfo.Pod); err != nil { if err := q.Delete(unschedulablePodInfo.Pod); err != nil {
t.Errorf("delete failed: %v", err) t.Errorf("delete failed: %v", err)
} }
if len(q.PodNominator.(*nominator).nominatedPods) != 0 { if len(q.nominator.nominatedPods) != 0 {
t.Errorf("Expected nomindatePods to be empty: %v", q.PodNominator) t.Errorf("Expected nomindatePods to be empty: %v", q.nominator)
} }
} }
@ -842,7 +847,7 @@ func TestPriorityQueue_NominatedPodDeleted(t *testing.T) {
podLister := informerFactory.Core().V1().Pods().Lister() podLister := informerFactory.Core().V1().Pods().Lister()
// Build a PriorityQueue. // Build a PriorityQueue.
q := NewPriorityQueue(newDefaultQueueSort(), informerFactory, WithPodNominator(NewPodNominator(podLister))) q := NewPriorityQueue(newDefaultQueueSort(), informerFactory, WithPodLister(podLister))
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
informerFactory.Start(ctx.Done()) informerFactory.Start(ctx.Done())
@ -924,14 +929,14 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) {
"node5": {unschedulablePodInfo}, "node5": {unschedulablePodInfo},
}, },
} }
if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominator{}), cmpopts.IgnoreFields(nominator{}, "podLister", "RWMutex")); diff != "" { if diff := cmp.Diff(q.nominator, expectedNominatedPods, nominatorCmpOpts...); diff != "" {
t.Errorf("Unexpected diff after adding pods (-want, +got):\n%s", diff) t.Errorf("Unexpected diff after adding pods (-want, +got):\n%s", diff)
} }
if p, err := q.Pop(); err != nil || p.Pod != medPriorityPodInfo.Pod { if p, err := q.Pop(); err != nil || p.Pod != medPriorityPodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, p.Pod.Name) t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, p.Pod.Name)
} }
// List of nominated pods shouldn't change after popping them from the queue. // List of nominated pods shouldn't change after popping them from the queue.
if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominator{}), cmpopts.IgnoreFields(nominator{}, "podLister", "RWMutex")); diff != "" { if diff := cmp.Diff(q.nominator, expectedNominatedPods, nominatorCmpOpts...); diff != "" {
t.Errorf("Unexpected diff after popping pods (-want, +got):\n%s", diff) t.Errorf("Unexpected diff after popping pods (-want, +got):\n%s", diff)
} }
// Update one of the nominated pods that doesn't have nominatedNodeName in the // Update one of the nominated pods that doesn't have nominatedNodeName in the
@ -949,14 +954,14 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) {
"node5": {unschedulablePodInfo}, "node5": {unschedulablePodInfo},
}, },
} }
if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominator{}), cmpopts.IgnoreFields(nominator{}, "podLister", "RWMutex")); diff != "" { if diff := cmp.Diff(q.nominator, expectedNominatedPods, nominatorCmpOpts...); diff != "" {
t.Errorf("Unexpected diff after updating pods (-want, +got):\n%s", diff) t.Errorf("Unexpected diff after updating pods (-want, +got):\n%s", diff)
} }
// Attempt to nominate a pod that was deleted from the informer cache. // Attempt to nominate a pod that was deleted from the informer cache.
// Nothing should change. // Nothing should change.
q.AddNominatedPod(nonExistentPodInfo, &framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: "node1"}) q.AddNominatedPod(nonExistentPodInfo, &framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: "node1"})
if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominator{}), cmpopts.IgnoreFields(nominator{}, "podLister", "RWMutex")); diff != "" { if diff := cmp.Diff(q.nominator, expectedNominatedPods, nominatorCmpOpts...); diff != "" {
t.Errorf("Unexpected diff after nominating a deleted pod (-want, +got):\n%s", diff) t.Errorf("Unexpected diff after nominating a deleted pod (-want, +got):\n%s", diff)
} }
// Attempt to nominate a pod that was already scheduled in the informer cache. // Attempt to nominate a pod that was already scheduled in the informer cache.
@ -964,7 +969,7 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) {
scheduledPodCopy := scheduledPodInfo.Pod.DeepCopy() scheduledPodCopy := scheduledPodInfo.Pod.DeepCopy()
scheduledPodInfo.Pod.Spec.NodeName = "" scheduledPodInfo.Pod.Spec.NodeName = ""
q.AddNominatedPod(mustNewTestPodInfo(t, scheduledPodCopy), &framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: "node1"}) q.AddNominatedPod(mustNewTestPodInfo(t, scheduledPodCopy), &framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: "node1"})
if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominator{}), cmpopts.IgnoreFields(nominator{}, "podLister", "RWMutex")); diff != "" { if diff := cmp.Diff(q.nominator, expectedNominatedPods, nominatorCmpOpts...); diff != "" {
t.Errorf("Unexpected diff after nominating a scheduled pod (-want, +got):\n%s", diff) t.Errorf("Unexpected diff after nominating a scheduled pod (-want, +got):\n%s", diff)
} }
@ -981,7 +986,7 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) {
"node5": {unschedulablePodInfo}, "node5": {unschedulablePodInfo},
}, },
} }
if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominator{}), cmpopts.IgnoreFields(nominator{}, "podLister", "RWMutex")); diff != "" { if diff := cmp.Diff(q.nominator, expectedNominatedPods, nominatorCmpOpts...); diff != "" {
t.Errorf("Unexpected diff after deleting pods (-want, +got):\n%s", diff) t.Errorf("Unexpected diff after deleting pods (-want, +got):\n%s", diff)
} }
} }
@ -1197,7 +1202,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
// Add an unschedulable pod to a priority queue. // Add an unschedulable pod to a priority queue.
// This makes a situation that the pod was tried to schedule // This makes a situation that the pod was tried to schedule
// and had been determined unschedulable so far // and had been determined unschedulable so far
unschedulablePod := st.MakePod().Name(fmt.Sprintf("test-pod-unscheduled")).Namespace("ns1").UID("tp001").Priority(highPriority).NominatedNodeName("node1").Obj() unschedulablePod := st.MakePod().Name("test-pod-unscheduled").Namespace("ns1").UID("tp001").Priority(highPriority).NominatedNodeName("node1").Obj()
// Update pod condition to unschedulable. // Update pod condition to unschedulable.
podutil.UpdatePodCondition(&unschedulablePod.Status, &v1.PodCondition{ podutil.UpdatePodCondition(&unschedulablePod.Status, &v1.PodCondition{

View File

@ -281,8 +281,6 @@ func New(client clientset.Interface,
podLister := informerFactory.Core().V1().Pods().Lister() podLister := informerFactory.Core().V1().Pods().Lister()
nodeLister := informerFactory.Core().V1().Nodes().Lister() nodeLister := informerFactory.Core().V1().Nodes().Lister()
// The nominator will be passed all the way to framework instantiation.
nominator := internalqueue.NewPodNominator(podLister)
snapshot := internalcache.NewEmptySnapshot() snapshot := internalcache.NewEmptySnapshot()
clusterEventMap := make(map[framework.ClusterEvent]sets.String) clusterEventMap := make(map[framework.ClusterEvent]sets.String)
@ -292,7 +290,6 @@ func New(client clientset.Interface,
frameworkruntime.WithKubeConfig(options.kubeConfig), frameworkruntime.WithKubeConfig(options.kubeConfig),
frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithSnapshotSharedLister(snapshot),
frameworkruntime.WithPodNominator(nominator),
frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)), frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)),
frameworkruntime.WithClusterEventMap(clusterEventMap), frameworkruntime.WithClusterEventMap(clusterEventMap),
frameworkruntime.WithParallelism(int(options.parallelism)), frameworkruntime.WithParallelism(int(options.parallelism)),
@ -315,12 +312,16 @@ func New(client clientset.Interface,
informerFactory, informerFactory,
internalqueue.WithPodInitialBackoffDuration(time.Duration(options.podInitialBackoffSeconds)*time.Second), internalqueue.WithPodInitialBackoffDuration(time.Duration(options.podInitialBackoffSeconds)*time.Second),
internalqueue.WithPodMaxBackoffDuration(time.Duration(options.podMaxBackoffSeconds)*time.Second), internalqueue.WithPodMaxBackoffDuration(time.Duration(options.podMaxBackoffSeconds)*time.Second),
internalqueue.WithPodNominator(nominator), internalqueue.WithPodLister(podLister),
internalqueue.WithClusterEventMap(clusterEventMap), internalqueue.WithClusterEventMap(clusterEventMap),
internalqueue.WithPodMaxInUnschedulablePodsDuration(options.podMaxInUnschedulablePodsDuration), internalqueue.WithPodMaxInUnschedulablePodsDuration(options.podMaxInUnschedulablePodsDuration),
internalqueue.WithPreEnqueuePluginMap(preEnqueuePluginMap), internalqueue.WithPreEnqueuePluginMap(preEnqueuePluginMap),
) )
for _, fwk := range profiles {
fwk.SetPodNominator(podQueue)
}
schedulerCache := internalcache.New(durationToExpireAssumedPod, stopEverything) schedulerCache := internalcache.New(durationToExpireAssumedPod, stopEverything)
// Setup cache debugger. // Setup cache debugger.