Move scheduling queue's nominator to a separate file

Maciej Skoczeń 2024-07-22 11:55:18 +00:00
parent 33815db3c1
commit e303808896
3 changed files with 196 additions and 172 deletions

View File

@@ -0,0 +1,195 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package queue

import (
	"slices"
	"sync"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	listersv1 "k8s.io/client-go/listers/core/v1"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// nominator is a structure that stores pods nominated to run on nodes.
// It exists because the nominatedNodeName of pod objects stored in the structure
// may differ from what the scheduler has here. We should be able to find pods
// by their UID and update/delete them.
type nominator struct {
	// nLock synchronizes all operations related to the nominator.
	// It should not be used anywhere else.
	// Caution: DO NOT take "SchedulingQueue.lock" or "activeQueue.lock" after taking "nLock",
	// otherwise the nominator could end up in deadlock.
	// Always take "SchedulingQueue.lock" and "activeQueue.lock" first.
	// The correct locking order is: SchedulingQueue.lock > activeQueue.lock > nLock.
	nLock sync.RWMutex

	// podLister is used to verify if the given pod is alive.
	podLister listersv1.PodLister
	// nominatedPods is a map keyed by a node name, and the value is a list of
	// pods which are nominated to run on the node. These are pods which can be in
	// the activeQ or unschedulablePods.
	nominatedPods map[string][]PodRef
	// nominatedPodToNode is a map from a Pod UID to the node name where the pod is
	// nominated.
	nominatedPodToNode map[types.UID]string
}

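// newPodNominator returns a nominator with empty nomination maps. The podLister
// may be nil; addNominatedPodUnlocked then skips its liveness check.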
func newPodNominator(podLister listersv1.PodLister) *nominator {
	return &nominator{
		podLister:          podLister,
		nominatedPods:      make(map[string][]PodRef),
		nominatedPodToNode: make(map[types.UID]string),
	}
}

// AddNominatedPod adds a pod to the nominated pods of the given node.
// This is called during the preemption process after a node is nominated to run
// the pod. We update the structure before sending a request to update the pod
// object to avoid races with the following scheduling cycles.
func (npm *nominator) AddNominatedPod(logger klog.Logger, pi *framework.PodInfo, nominatingInfo *framework.NominatingInfo) {
	npm.nLock.Lock()
	npm.addNominatedPodUnlocked(logger, pi, nominatingInfo)
	npm.nLock.Unlock()
}

func (npm *nominator) addNominatedPodUnlocked(logger klog.Logger, pi *framework.PodInfo, nominatingInfo *framework.NominatingInfo) {
	// Always delete the pod if it already exists, to ensure we never store more than
	// one instance of the pod.
	npm.deleteUnlocked(pi.Pod)

	var nodeName string
	if nominatingInfo.Mode() == framework.ModeOverride {
		nodeName = nominatingInfo.NominatedNodeName
	} else if nominatingInfo.Mode() == framework.ModeNoop {
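		// ModeNoop: fall back to the nominated node name already recorded in the
		// pod's status; if there is none, there is nothing to store.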
		if pi.Pod.Status.NominatedNodeName == "" {
			return
		}
		nodeName = pi.Pod.Status.NominatedNodeName
	}

	if npm.podLister != nil {
		// If the pod was removed or if it was already scheduled, don't nominate it.
		updatedPod, err := npm.podLister.Pods(pi.Pod.Namespace).Get(pi.Pod.Name)
		if err != nil {
			logger.V(4).Info("Pod doesn't exist in podLister, aborted adding it to the nominator", "pod", klog.KObj(pi.Pod))
			return
		}
		if updatedPod.Spec.NodeName != "" {
			logger.V(4).Info("Pod is already scheduled to a node, aborted adding it to the nominator", "pod", klog.KObj(pi.Pod), "node", updatedPod.Spec.NodeName)
			return
		}
	}

	npm.nominatedPodToNode[pi.Pod.UID] = nodeName
	for _, np := range npm.nominatedPods[nodeName] {
		if np.UID == pi.Pod.UID {
			logger.V(4).Info("Pod already exists in the nominator", "pod", np.UID)
			return
		}
	}
	npm.nominatedPods[nodeName] = append(npm.nominatedPods[nodeName], PodToRef(pi.Pod))
}

// UpdateNominatedPod updates the <oldPod> with <newPod>.
func (npm *nominator) UpdateNominatedPod(logger klog.Logger, oldPod *v1.Pod, newPodInfo *framework.PodInfo) {
	npm.nLock.Lock()
	defer npm.nLock.Unlock()
	// In some cases, an Update event with no "NominatedNode" present is received right
	// after a node ("NominatedNode") is reserved for this pod in memory.
	// In this case, we need to keep reserving the NominatedNode when updating the pod pointer.
	var nominatingInfo *framework.NominatingInfo
	// We won't fall into the `if` block below if the Update event represents:
	// (1) NominatedNode info is added
	// (2) NominatedNode info is updated
	// (3) NominatedNode info is removed
	if nominatedNodeName(oldPod) == "" && nominatedNodeName(newPodInfo.Pod) == "" {
		if nnn, ok := npm.nominatedPodToNode[oldPod.UID]; ok {
			// This is the only case in which we should continue reserving the NominatedNode.
			nominatingInfo = &framework.NominatingInfo{
				NominatingMode:    framework.ModeOverride,
				NominatedNodeName: nnn,
			}
		}
	}
	// We update irrespective of whether the nominatedNodeName changed, to ensure
	// that the pod pointer is updated.
	npm.deleteUnlocked(oldPod)
	npm.addNominatedPodUnlocked(logger, newPodInfo, nominatingInfo)
}

// DeleteNominatedPodIfExists deletes <pod> from nominatedPods.
func (npm *nominator) DeleteNominatedPodIfExists(pod *v1.Pod) {
	npm.nLock.Lock()
	npm.deleteUnlocked(pod)
	npm.nLock.Unlock()
}

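// deleteUnlocked removes the pod from both nominatedPods and nominatedPodToNode.
// The caller must hold nLock.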
func (npm *nominator) deleteUnlocked(p *v1.Pod) {
	nnn, ok := npm.nominatedPodToNode[p.UID]
	if !ok {
		return
	}
	for i, np := range npm.nominatedPods[nnn] {
		if np.UID == p.UID {
			npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn][:i], npm.nominatedPods[nnn][i+1:]...)
			if len(npm.nominatedPods[nnn]) == 0 {
				delete(npm.nominatedPods, nnn)
			}
			break
		}
	}
	delete(npm.nominatedPodToNode, p.UID)
}

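// nominatedPodsForNode returns a copy of the pods nominated to run on the given node.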
func (npm *nominator) nominatedPodsForNode(nodeName string) []PodRef {
	npm.nLock.RLock()
	defer npm.nLock.RUnlock()
	return slices.Clone(npm.nominatedPods[nodeName])
}

// nominatedNodeName returns the nominated node name of a Pod.
func nominatedNodeName(pod *v1.Pod) string {
	return pod.Status.NominatedNodeName
}

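// PodRef is a minimal reference to a pod: the name, namespace, and UID needed to
// look it up in the nominator's maps.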
type PodRef struct {
	Name      string
	Namespace string
	UID       types.UID
}

func PodToRef(pod *v1.Pod) PodRef {
	return PodRef{
		Name:      pod.Name,
		Namespace: pod.Namespace,
		UID:       pod.UID,
	}
}

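// ToPod reconstructs a skeleton *v1.Pod carrying only the identifying metadata
// stored in the PodRef.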
func (np PodRef) ToPod() *v1.Pod {
	return &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      np.Name,
			Namespace: np.Namespace,
			UID:       np.UID,
		},
	}
}

View File

@@ -31,12 +31,10 @@ import (
	"fmt"
	"math/rand"
	"reflect"
	"slices"
	"sync"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
@@ -137,11 +135,6 @@ func NewSchedulingQueue(
	return NewPriorityQueue(lessFn, informerFactory, opts...)
}

// NominatedNodeName returns nominated node name of a Pod.
func NominatedNodeName(pod *v1.Pod) string {
	return pod.Status.NominatedNodeName
}

// PriorityQueue implements a scheduling queue.
// The head of PriorityQueue is the highest priority pending pod. This structure
// has two sub queues and an additional data structure, namely: activeQ,
@@ -1212,29 +1205,6 @@ func (p *PriorityQueue) Close() {
	p.activeQ.broadcast()
}

// DeleteNominatedPodIfExists deletes <pod> from nominatedPods.
func (npm *nominator) DeleteNominatedPodIfExists(pod *v1.Pod) {
	npm.nLock.Lock()
	npm.delete(pod)
	npm.nLock.Unlock()
}

// AddNominatedPod adds a pod to the nominated pods of the given node.
// This is called during the preemption process after a node is nominated to run
// the pod. We update the structure before sending a request to update the pod
// object to avoid races with the following scheduling cycles.
func (npm *nominator) AddNominatedPod(logger klog.Logger, pi *framework.PodInfo, nominatingInfo *framework.NominatingInfo) {
	npm.nLock.Lock()
	npm.addNominatedPodUnlocked(logger, pi, nominatingInfo)
	npm.nLock.Unlock()
}

func (npm *nominator) nominatedPodsForNode(nodeName string) []PodRef {
	npm.nLock.RLock()
	defer npm.nLock.RUnlock()
	return slices.Clone(npm.nominatedPods[nodeName])
}

// NominatedPodsForNode returns a copy of pods that are nominated to run on the given node,
// but they are waiting for other pods to be removed from the node.
// CAUTION: Make sure you don't call this function while taking any queue's lock in any scenario.
@@ -1362,147 +1332,6 @@ func newUnschedulablePods(unschedulableRecorder, gatedRecorder metrics.MetricRecorder) *UnschedulablePods {
	}
}

type PodRef struct {
	Name      string
	Namespace string
	UID       types.UID
}

func PodToRef(pod *v1.Pod) PodRef {
	return PodRef{
		Name:      pod.Name,
		Namespace: pod.Namespace,
		UID:       pod.UID,
	}
}

func (np PodRef) ToPod() *v1.Pod {
	return &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      np.Name,
			Namespace: np.Namespace,
			UID:       np.UID,
		},
	}
}

// nominator is a structure that stores pods nominated to run on nodes.
// It exists because nominatedNodeName of pod objects stored in the structure
// may be different than what scheduler has here. We should be able to find pods
// by their UID and update/delete them.
type nominator struct {
	// nLock synchronizes all operations related to nominator.
	// Caution: DO NOT take ("SchedulingQueue.lock" or "activeQueue.lock") after taking "nLock".
	// You should always take "SchedulingQueue.lock" and "activeQueue.lock" first,
	// otherwise the nominator could end up in deadlock.
	// Correct locking order is: SchedulingQueue.lock > activeQueue.lock > nLock.
	nLock sync.RWMutex

	// podLister is used to verify if the given pod is alive.
	podLister listersv1.PodLister
	// nominatedPods is a map keyed by a node name and the value is a list of
	// pods which are nominated to run on the node. These are pods which can be in
	// the activeQ or unschedulablePods.
	nominatedPods map[string][]PodRef
	// nominatedPodToNode is map keyed by a Pod UID to the node name where it is
	// nominated.
	nominatedPodToNode map[types.UID]string

	// nominatedPodsToInfo returns PodInfos cached in the queues for nominated PodRefs.
	// Note: it takes SchedulingQueue.lock inside.
	// Make sure you don't call this function while taking any lock in any scenario.
	nominatedPodsToInfo func([]PodRef) []*framework.PodInfo
}

func (npm *nominator) addNominatedPodUnlocked(logger klog.Logger, pi *framework.PodInfo, nominatingInfo *framework.NominatingInfo) {
	// Always delete the pod if it already exists, to ensure we never store more than
	// one instance of the pod.
	npm.delete(pi.Pod)

	var nodeName string
	if nominatingInfo.Mode() == framework.ModeOverride {
		nodeName = nominatingInfo.NominatedNodeName
	} else if nominatingInfo.Mode() == framework.ModeNoop {
		if pi.Pod.Status.NominatedNodeName == "" {
			return
		}
		nodeName = pi.Pod.Status.NominatedNodeName
	}

	if npm.podLister != nil {
		// If the pod was removed or if it was already scheduled, don't nominate it.
		updatedPod, err := npm.podLister.Pods(pi.Pod.Namespace).Get(pi.Pod.Name)
		if err != nil {
			logger.V(4).Info("Pod doesn't exist in podLister, aborted adding it to the nominator", "pod", klog.KObj(pi.Pod))
			return
		}
		if updatedPod.Spec.NodeName != "" {
			logger.V(4).Info("Pod is already scheduled to a node, aborted adding it to the nominator", "pod", klog.KObj(pi.Pod), "node", updatedPod.Spec.NodeName)
			return
		}
	}

	npm.nominatedPodToNode[pi.Pod.UID] = nodeName
	for _, np := range npm.nominatedPods[nodeName] {
		if np.UID == pi.Pod.UID {
			logger.V(4).Info("Pod already exists in the nominator", "pod", np.UID)
			return
		}
	}
	npm.nominatedPods[nodeName] = append(npm.nominatedPods[nodeName], PodToRef(pi.Pod))
}

func (npm *nominator) delete(p *v1.Pod) {
	nnn, ok := npm.nominatedPodToNode[p.UID]
	if !ok {
		return
	}
	for i, np := range npm.nominatedPods[nnn] {
		if np.UID == p.UID {
			npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn][:i], npm.nominatedPods[nnn][i+1:]...)
			if len(npm.nominatedPods[nnn]) == 0 {
				delete(npm.nominatedPods, nnn)
			}
			break
		}
	}
	delete(npm.nominatedPodToNode, p.UID)
}

// UpdateNominatedPod updates the <oldPod> with <newPod>.
func (npm *nominator) UpdateNominatedPod(logger klog.Logger, oldPod *v1.Pod, newPodInfo *framework.PodInfo) {
	npm.nLock.Lock()
	defer npm.nLock.Unlock()
	// In some cases, an Update event with no "NominatedNode" present is received right
	// after a node("NominatedNode") is reserved for this pod in memory.
	// In this case, we need to keep reserving the NominatedNode when updating the pod pointer.
	var nominatingInfo *framework.NominatingInfo
	// We won't fall into below `if` block if the Update event represents:
	// (1) NominatedNode info is added
	// (2) NominatedNode info is updated
	// (3) NominatedNode info is removed
	if NominatedNodeName(oldPod) == "" && NominatedNodeName(newPodInfo.Pod) == "" {
		if nnn, ok := npm.nominatedPodToNode[oldPod.UID]; ok {
			// This is the only case we should continue reserving the NominatedNode
			nominatingInfo = &framework.NominatingInfo{
				NominatingMode:    framework.ModeOverride,
				NominatedNodeName: nnn,
			}
		}
	}
	// We update irrespective of the nominatedNodeName changed or not, to ensure
	// that pod pointer is updated.
	npm.delete(oldPod)
	npm.addNominatedPodUnlocked(logger, newPodInfo, nominatingInfo)
}

func newPodNominator(podLister listersv1.PodLister) *nominator {
	return &nominator{
		podLister:          podLister,
		nominatedPods:      make(map[string][]PodRef),
		nominatedPodToNode: make(map[types.UID]string),
	}
}

func podInfoKeyFunc(pInfo *framework.QueuedPodInfo) string {
	return cache.NewObjectName(pInfo.Pod.Namespace, pInfo.Pod.Name).String()
}

View File

@@ -84,7 +84,7 @@ var (
	nominatorCmpOpts = []cmp.Option{
		cmp.AllowUnexported(nominator{}),
		cmpopts.IgnoreFields(nominator{}, "podLister", "nLock", "nominatedPodsToInfo"),
		cmpopts.IgnoreFields(nominator{}, "podLister", "nLock"),
	}

	queueHintReturnQueue = func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
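
As a quick usage sketch (hypothetical test code, not part of this commit, with t being the usual *testing.T): cmp.AllowUnexported plus the IgnoreFields entry above let a test diff two nominator values directly while skipping the mutex and the lister, roughly:

	want := newPodNominator(nil)
	got := newPodNominator(nil)
	if diff := cmp.Diff(want, got, nominatorCmpOpts...); diff != "" {
		t.Errorf("unexpected nominator state (-want, +got):\n%s", diff)
	}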