diff --git a/pkg/scheduler/internal/queue/nominator.go b/pkg/scheduler/internal/queue/nominator.go
new file mode 100644
index 00000000000..e75212ee043
--- /dev/null
+++ b/pkg/scheduler/internal/queue/nominator.go
@@ -0,0 +1,195 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package queue
+
+import (
+	"slices"
+	"sync"
+
+	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
+	listersv1 "k8s.io/client-go/listers/core/v1"
+	"k8s.io/klog/v2"
+	"k8s.io/kubernetes/pkg/scheduler/framework"
+)
+
+// nominator is a structure that stores pods nominated to run on nodes.
+// It exists because the nominatedNodeName of pod objects stored in the structure
+// may be different from what the scheduler has here. We should be able to find pods
+// by their UID and update/delete them.
+type nominator struct {
+	// nLock synchronizes all operations related to nominator.
+	// It should not be used anywhere else.
+	// Caution: DO NOT take ("SchedulingQueue.lock" or "activeQueue.lock") after taking "nLock".
+	// You should always take "SchedulingQueue.lock" and "activeQueue.lock" first,
+	// otherwise the nominator could end up in deadlock.
+	// Correct locking order is: SchedulingQueue.lock > activeQueue.lock > nLock.
+	nLock sync.RWMutex
+
+	// podLister is used to verify if the given pod is alive.
+	podLister listersv1.PodLister
+	// nominatedPods is a map keyed by a node name and the value is a list of
+	// pods which are nominated to run on the node. These are pods which can be in
+	// the activeQ or unschedulablePods.
+	nominatedPods map[string][]PodRef
+	// nominatedPodToNode is a map keyed by a Pod UID to the node name where it is
+	// nominated.
+	nominatedPodToNode map[types.UID]string
+}
+
+func newPodNominator(podLister listersv1.PodLister) *nominator {
+	return &nominator{
+		podLister:          podLister,
+		nominatedPods:      make(map[string][]PodRef),
+		nominatedPodToNode: make(map[types.UID]string),
+	}
+}
+
+// AddNominatedPod adds a pod to the nominated pods of the given node.
+// This is called during the preemption process after a node is nominated to run
+// the pod. We update the structure before sending a request to update the pod
+// object to avoid races with the following scheduling cycles.
+func (npm *nominator) AddNominatedPod(logger klog.Logger, pi *framework.PodInfo, nominatingInfo *framework.NominatingInfo) {
+	npm.nLock.Lock()
+	npm.addNominatedPodUnlocked(logger, pi, nominatingInfo)
+	npm.nLock.Unlock()
+}
+
+func (npm *nominator) addNominatedPodUnlocked(logger klog.Logger, pi *framework.PodInfo, nominatingInfo *framework.NominatingInfo) {
+	// Always delete the pod if it already exists, to ensure we never store more than
+	// one instance of the pod.
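+	// (The pod may already be recorded under a previously nominated node; deleting
+	// it first keeps nominatedPods and nominatedPodToNode consistent with each other.)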
+	npm.deleteUnlocked(pi.Pod)
+
+	var nodeName string
+	if nominatingInfo.Mode() == framework.ModeOverride {
+		nodeName = nominatingInfo.NominatedNodeName
+	} else if nominatingInfo.Mode() == framework.ModeNoop {
+		if pi.Pod.Status.NominatedNodeName == "" {
+			return
+		}
+		nodeName = pi.Pod.Status.NominatedNodeName
+	}
+
+	if npm.podLister != nil {
+		// If the pod was removed or if it was already scheduled, don't nominate it.
+		updatedPod, err := npm.podLister.Pods(pi.Pod.Namespace).Get(pi.Pod.Name)
+		if err != nil {
+			logger.V(4).Info("Pod doesn't exist in podLister, aborted adding it to the nominator", "pod", klog.KObj(pi.Pod))
+			return
+		}
+		if updatedPod.Spec.NodeName != "" {
+			logger.V(4).Info("Pod is already scheduled to a node, aborted adding it to the nominator", "pod", klog.KObj(pi.Pod), "node", updatedPod.Spec.NodeName)
+			return
+		}
+	}
+
+	npm.nominatedPodToNode[pi.Pod.UID] = nodeName
+	for _, np := range npm.nominatedPods[nodeName] {
+		if np.UID == pi.Pod.UID {
+			logger.V(4).Info("Pod already exists in the nominator", "pod", np.UID)
+			return
+		}
+	}
+	npm.nominatedPods[nodeName] = append(npm.nominatedPods[nodeName], PodToRef(pi.Pod))
+}
+
+// UpdateNominatedPod updates the <oldPod> with <newPodInfo>.
+func (npm *nominator) UpdateNominatedPod(logger klog.Logger, oldPod *v1.Pod, newPodInfo *framework.PodInfo) {
+	npm.nLock.Lock()
+	defer npm.nLock.Unlock()
+	// In some cases, an Update event with no "NominatedNode" present is received right
+	// after a node ("NominatedNode") is reserved for this pod in memory.
+	// In this case, we need to keep reserving the NominatedNode when updating the pod pointer.
+	var nominatingInfo *framework.NominatingInfo
+	// We won't fall into the `if` block below if the Update event represents:
+	// (1) NominatedNode info is added
+	// (2) NominatedNode info is updated
+	// (3) NominatedNode info is removed
+	if nominatedNodeName(oldPod) == "" && nominatedNodeName(newPodInfo.Pod) == "" {
+		if nnn, ok := npm.nominatedPodToNode[oldPod.UID]; ok {
+			// This is the only case where we should continue reserving the NominatedNode.
+			nominatingInfo = &framework.NominatingInfo{
+				NominatingMode:    framework.ModeOverride,
+				NominatedNodeName: nnn,
+			}
+		}
+	}
+	// We update irrespective of whether the nominatedNodeName changed, to ensure
+	// that the pod pointer is updated.
+	npm.deleteUnlocked(oldPod)
+	npm.addNominatedPodUnlocked(logger, newPodInfo, nominatingInfo)
+}
+
+// DeleteNominatedPodIfExists deletes <pod> from nominatedPods.
+func (npm *nominator) DeleteNominatedPodIfExists(pod *v1.Pod) {
+	npm.nLock.Lock()
+	npm.deleteUnlocked(pod)
+	npm.nLock.Unlock()
+}
+
+func (npm *nominator) deleteUnlocked(p *v1.Pod) {
+	nnn, ok := npm.nominatedPodToNode[p.UID]
+	if !ok {
+		return
+	}
+	for i, np := range npm.nominatedPods[nnn] {
+		if np.UID == p.UID {
+			npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn][:i], npm.nominatedPods[nnn][i+1:]...)
+			if len(npm.nominatedPods[nnn]) == 0 {
+				delete(npm.nominatedPods, nnn)
+			}
+			break
+		}
+	}
+	delete(npm.nominatedPodToNode, p.UID)
+}
+
+func (npm *nominator) nominatedPodsForNode(nodeName string) []PodRef {
+	npm.nLock.RLock()
+	defer npm.nLock.RUnlock()
+	return slices.Clone(npm.nominatedPods[nodeName])
+}
+
+// nominatedNodeName returns the nominated node name of a Pod.
+func nominatedNodeName(pod *v1.Pod) string {
+	return pod.Status.NominatedNodeName
+}
+
+type PodRef struct {
+	Name      string
+	Namespace string
+	UID       types.UID
+}
+
+func PodToRef(pod *v1.Pod) PodRef {
+	return PodRef{
+		Name:      pod.Name,
+		Namespace: pod.Namespace,
+		UID:       pod.UID,
+	}
+}
+
+func (np PodRef) ToPod() *v1.Pod {
+	return &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      np.Name,
+			Namespace: np.Namespace,
+			UID:       np.UID,
+		},
+	}
+}
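For orientation, here is a minimal sketch (not part of the diff) of how the preemption path is expected to use this API, written as if inside package queue. The pod, logger, and node name are illustrative assumptions, and the nil podLister simply skips the alive/already-scheduled checks:

```go
package queue

import (
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

func exampleNominate() []PodRef {
	logger := klog.Background()
	nom := newPodNominator(nil) // nil podLister: skip the liveness/scheduled checks

	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p", Namespace: "ns", UID: "uid-1"}}

	// Record the nomination in memory before the API status update, to avoid
	// races with the scheduling cycles that follow.
	nom.AddNominatedPod(logger, &framework.PodInfo{Pod: pod}, &framework.NominatingInfo{
		NominatingMode:    framework.ModeOverride,
		NominatedNodeName: "node-a",
	})

	return nom.nominatedPodsForNode("node-a") // => [{p ns uid-1}]
}
```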
diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go
index 2a5b530d0d0..6fb21c208db 100644
--- a/pkg/scheduler/internal/queue/scheduling_queue.go
+++ b/pkg/scheduler/internal/queue/scheduling_queue.go
@@ -31,12 +31,10 @@ import (
 	"fmt"
 	"math/rand"
 	"reflect"
-	"slices"
 	"sync"
 	"time"
 
 	v1 "k8s.io/api/core/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
@@ -137,11 +135,6 @@ func NewSchedulingQueue(
 	return NewPriorityQueue(lessFn, informerFactory, opts...)
 }
 
-// NominatedNodeName returns the nominated node name of a Pod.
-func NominatedNodeName(pod *v1.Pod) string {
-	return pod.Status.NominatedNodeName
-}
-
 // PriorityQueue implements a scheduling queue.
 // The head of PriorityQueue is the highest priority pending pod. This structure
 // has two sub queues and an additional data structure, namely: activeQ,
@@ -1212,29 +1205,6 @@ func (p *PriorityQueue) Close() {
 	p.activeQ.broadcast()
 }
 
-// DeleteNominatedPodIfExists deletes <pod> from nominatedPods.
-func (npm *nominator) DeleteNominatedPodIfExists(pod *v1.Pod) {
-	npm.nLock.Lock()
-	npm.delete(pod)
-	npm.nLock.Unlock()
-}
-
-// AddNominatedPod adds a pod to the nominated pods of the given node.
-// This is called during the preemption process after a node is nominated to run
-// the pod. We update the structure before sending a request to update the pod
-// object to avoid races with the following scheduling cycles.
-func (npm *nominator) AddNominatedPod(logger klog.Logger, pi *framework.PodInfo, nominatingInfo *framework.NominatingInfo) {
-	npm.nLock.Lock()
-	npm.addNominatedPodUnlocked(logger, pi, nominatingInfo)
-	npm.nLock.Unlock()
-}
-
-func (npm *nominator) nominatedPodsForNode(nodeName string) []PodRef {
-	npm.nLock.RLock()
-	defer npm.nLock.RUnlock()
-	return slices.Clone(npm.nominatedPods[nodeName])
-}
-
 // NominatedPodsForNode returns a copy of pods that are nominated to run on the given node,
 // but they are waiting for other pods to be removed from the node.
 // CAUTION: Make sure you don't call this function while taking any queue's lock in any scenario.
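The CAUTION above and the nLock comments encode a strict lock hierarchy (SchedulingQueue.lock > activeQueue.lock > nLock). A schematic sketch of that ordering, with stand-in types rather than the scheduler's real ones:

```go
// Schematic illustration only; schedulingQueue and nominator here are stand-ins.
package main

import "sync"

type nominator struct{ nLock sync.RWMutex }

type schedulingQueue struct {
	lock sync.RWMutex // stands in for SchedulingQueue.lock: always outermost
	nom  nominator
}

// Correct order: queue lock first, nLock last (innermost).
func (q *schedulingQueue) deletePod() {
	q.lock.Lock()
	defer q.lock.Unlock()

	q.nom.nLock.Lock() // nLock must never be held while acquiring the queue locks
	defer q.nom.nLock.Unlock()
}

// Taking q.lock while already holding nLock would invert the order and can
// deadlock against deletePod; that is exactly what the comments warn against.
func main() {
	q := &schedulingQueue{}
	q.deletePod()
}
```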
@@ -1362,147 +1332,6 @@ func newUnschedulablePods(unschedulableRecorder, gatedRecorder metrics.MetricRec
 	}
 }
 
-type PodRef struct {
-	Name      string
-	Namespace string
-	UID       types.UID
-}
-
-func PodToRef(pod *v1.Pod) PodRef {
-	return PodRef{
-		Name:      pod.Name,
-		Namespace: pod.Namespace,
-		UID:       pod.UID,
-	}
-}
-
-func (np PodRef) ToPod() *v1.Pod {
-	return &v1.Pod{
-		ObjectMeta: metav1.ObjectMeta{
-			Name:      np.Name,
-			Namespace: np.Namespace,
-			UID:       np.UID,
-		},
-	}
-}
-
-// nominator is a structure that stores pods nominated to run on nodes.
-// It exists because the nominatedNodeName of pod objects stored in the structure
-// may be different from what the scheduler has here. We should be able to find pods
-// by their UID and update/delete them.
-type nominator struct {
-	// nLock synchronizes all operations related to nominator.
-	// Caution: DO NOT take ("SchedulingQueue.lock" or "activeQueue.lock") after taking "nLock".
-	// You should always take "SchedulingQueue.lock" and "activeQueue.lock" first,
-	// otherwise the nominator could end up in deadlock.
-	// Correct locking order is: SchedulingQueue.lock > activeQueue.lock > nLock.
-	nLock sync.RWMutex
-
-	// podLister is used to verify if the given pod is alive.
-	podLister listersv1.PodLister
-	// nominatedPods is a map keyed by a node name and the value is a list of
-	// pods which are nominated to run on the node. These are pods which can be in
-	// the activeQ or unschedulablePods.
-	nominatedPods map[string][]PodRef
-	// nominatedPodToNode is a map keyed by a Pod UID to the node name where it is
-	// nominated.
-	nominatedPodToNode map[types.UID]string
-	// nominatedPodsToInfo returns PodInfos cached in the queues for nominated PodRefs.
-	// Note: it takes SchedulingQueue.lock inside.
-	// Make sure you don't call this function while taking any lock in any scenario.
-	nominatedPodsToInfo func([]PodRef) []*framework.PodInfo
-}
-
-func (npm *nominator) addNominatedPodUnlocked(logger klog.Logger, pi *framework.PodInfo, nominatingInfo *framework.NominatingInfo) {
-	// Always delete the pod if it already exists, to ensure we never store more than
-	// one instance of the pod.
-	npm.delete(pi.Pod)
-
-	var nodeName string
-	if nominatingInfo.Mode() == framework.ModeOverride {
-		nodeName = nominatingInfo.NominatedNodeName
-	} else if nominatingInfo.Mode() == framework.ModeNoop {
-		if pi.Pod.Status.NominatedNodeName == "" {
-			return
-		}
-		nodeName = pi.Pod.Status.NominatedNodeName
-	}
-
-	if npm.podLister != nil {
-		// If the pod was removed or if it was already scheduled, don't nominate it.
-		updatedPod, err := npm.podLister.Pods(pi.Pod.Namespace).Get(pi.Pod.Name)
-		if err != nil {
-			logger.V(4).Info("Pod doesn't exist in podLister, aborted adding it to the nominator", "pod", klog.KObj(pi.Pod))
-			return
-		}
-		if updatedPod.Spec.NodeName != "" {
-			logger.V(4).Info("Pod is already scheduled to a node, aborted adding it to the nominator", "pod", klog.KObj(pi.Pod), "node", updatedPod.Spec.NodeName)
-			return
-		}
-	}
-
-	npm.nominatedPodToNode[pi.Pod.UID] = nodeName
-	for _, np := range npm.nominatedPods[nodeName] {
-		if np.UID == pi.Pod.UID {
-			logger.V(4).Info("Pod already exists in the nominator", "pod", np.UID)
-			return
-		}
-	}
-	npm.nominatedPods[nodeName] = append(npm.nominatedPods[nodeName], PodToRef(pi.Pod))
-}
-
-func (npm *nominator) delete(p *v1.Pod) {
-	nnn, ok := npm.nominatedPodToNode[p.UID]
-	if !ok {
-		return
-	}
-	for i, np := range npm.nominatedPods[nnn] {
-		if np.UID == p.UID {
-			npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn][:i], npm.nominatedPods[nnn][i+1:]...)
-			if len(npm.nominatedPods[nnn]) == 0 {
-				delete(npm.nominatedPods, nnn)
-			}
-			break
-		}
-	}
-	delete(npm.nominatedPodToNode, p.UID)
-}
-
-// UpdateNominatedPod updates the <oldPod> with <newPodInfo>.
-func (npm *nominator) UpdateNominatedPod(logger klog.Logger, oldPod *v1.Pod, newPodInfo *framework.PodInfo) {
-	npm.nLock.Lock()
-	defer npm.nLock.Unlock()
-	// In some cases, an Update event with no "NominatedNode" present is received right
-	// after a node ("NominatedNode") is reserved for this pod in memory.
-	// In this case, we need to keep reserving the NominatedNode when updating the pod pointer.
-	var nominatingInfo *framework.NominatingInfo
-	// We won't fall into the `if` block below if the Update event represents:
-	// (1) NominatedNode info is added
-	// (2) NominatedNode info is updated
-	// (3) NominatedNode info is removed
-	if NominatedNodeName(oldPod) == "" && NominatedNodeName(newPodInfo.Pod) == "" {
-		if nnn, ok := npm.nominatedPodToNode[oldPod.UID]; ok {
-			// This is the only case where we should continue reserving the NominatedNode.
-			nominatingInfo = &framework.NominatingInfo{
-				NominatingMode:    framework.ModeOverride,
-				NominatedNodeName: nnn,
-			}
-		}
-	}
-	// We update irrespective of whether the nominatedNodeName changed, to ensure
-	// that the pod pointer is updated.
-	npm.delete(oldPod)
-	npm.addNominatedPodUnlocked(logger, newPodInfo, nominatingInfo)
-}
-
-func newPodNominator(podLister listersv1.PodLister) *nominator {
-	return &nominator{
-		podLister:          podLister,
-		nominatedPods:      make(map[string][]PodRef),
-		nominatedPodToNode: make(map[types.UID]string),
-	}
-}
-
 func podInfoKeyFunc(pInfo *framework.QueuedPodInfo) string {
 	return cache.NewObjectName(pInfo.Pod.Namespace, pInfo.Pod.Name).String()
 }
diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go
index e78be3d2163..c00a3006d29 100644
--- a/pkg/scheduler/internal/queue/scheduling_queue_test.go
+++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go
@@ -84,7 +84,7 @@ var (
 
 	nominatorCmpOpts = []cmp.Option{
 		cmp.AllowUnexported(nominator{}),
-		cmpopts.IgnoreFields(nominator{}, "podLister", "nLock", "nominatedPodsToInfo"),
+		cmpopts.IgnoreFields(nominator{}, "podLister", "nLock"),
 	}
 
 	queueHintReturnQueue = func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
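To make the "keep reserving the NominatedNode" branch of UpdateNominatedPod concrete, here is a test-style sketch. It is hypothetical, written against package queue internals, and not part of this PR's test changes; the pod and node names are assumptions:

```go
package queue

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

func exampleUpdateKeepsReservation() {
	logger := klog.Background()
	nom := newPodNominator(nil)

	oldPod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p", Namespace: "ns", UID: "uid-1"}}
	// node-a is reserved in memory; the pod's status carries no NominatedNodeName yet.
	nom.AddNominatedPod(logger, &framework.PodInfo{Pod: oldPod}, &framework.NominatingInfo{
		NominatingMode:    framework.ModeOverride,
		NominatedNodeName: "node-a",
	})

	// An unrelated Update arrives before the status update lands: neither the old
	// nor the new pod object has a NominatedNodeName.
	newPod := oldPod.DeepCopy()
	newPod.Labels = map[string]string{"rev": "2"}
	nom.UpdateNominatedPod(logger, oldPod, &framework.PodInfo{Pod: newPod})

	// The in-memory reservation survives the pod-pointer update.
	fmt.Println(nom.nominatedPodsForNode("node-a")) // [{p ns uid-1}]
}
```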