One lock among PodNominator and SchedulingQueue

Change-Id: I17fe5da40250e42c04124c25b530ce6c8dea4154
This commit is contained in:
Aldo Culquicondor
2023-03-08 16:18:36 -05:00
parent 713ded7368
commit 07a73bb2e1
5 changed files with 83 additions and 63 deletions

View File

@@ -34,7 +34,6 @@ import (
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
@@ -143,8 +142,7 @@ func NominatedNodeName(pod *v1.Pod) string {
// - unschedulablePods holds pods that were already attempted for scheduling and
// are currently determined to be unschedulable.
type PriorityQueue struct {
// PodNominator abstracts the operations to maintain nominated Pods.
framework.PodNominator
*nominator
stop chan struct{}
clock clock.Clock
@@ -156,7 +154,6 @@ type PriorityQueue struct {
// the maximum time a pod can stay in the unschedulablePods.
podMaxInUnschedulablePodsDuration time.Duration
lock sync.RWMutex
cond sync.Cond
// activeQ is heap structure that scheduler actively looks at to find pods to
@@ -192,7 +189,7 @@ type priorityQueueOptions struct {
podInitialBackoffDuration time.Duration
podMaxBackoffDuration time.Duration
podMaxInUnschedulablePodsDuration time.Duration
podNominator framework.PodNominator
podLister listersv1.PodLister
clusterEventMap map[framework.ClusterEvent]sets.String
preEnqueuePluginMap map[string][]framework.PreEnqueuePlugin
}
@@ -221,10 +218,10 @@ func WithPodMaxBackoffDuration(duration time.Duration) Option {
}
}
// WithPodNominator sets pod nominator for PriorityQueue.
func WithPodNominator(pn framework.PodNominator) Option {
// WithPodLister sets pod lister for PriorityQueue.
func WithPodLister(pl listersv1.PodLister) Option {
return func(o *priorityQueueOptions) {
o.podNominator = pn
o.podLister = pl
}
}
@@ -276,6 +273,9 @@ func NewPriorityQueue(
opts ...Option,
) *PriorityQueue {
options := defaultPriorityQueueOptions
if options.podLister == nil {
options.podLister = informerFactory.Core().V1().Pods().Lister()
}
for _, opt := range opts {
opt(&options)
}
@@ -286,12 +286,8 @@ func NewPriorityQueue(
return lessFn(pInfo1, pInfo2)
}
if options.podNominator == nil {
options.podNominator = NewPodNominator(informerFactory.Core().V1().Pods().Lister())
}
pq := &PriorityQueue{
PodNominator: options.podNominator,
nominator: newPodNominator(options.podLister),
clock: options.clock,
stop: make(chan struct{}),
podInitialBackoffDuration: options.podInitialBackoffDuration,
@@ -384,7 +380,7 @@ func (p *PriorityQueue) Add(pod *v1.Pod) error {
}
klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", PodAdd, "queue", activeQName)
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", PodAdd).Inc()
p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil)
p.addNominatedPodUnlocked(pInfo.PodInfo, nil)
p.cond.Broadcast()
return nil
@@ -438,7 +434,7 @@ func (p *PriorityQueue) activate(pod *v1.Pod) bool {
p.unschedulablePods.delete(pInfo.Pod, gated)
p.podBackoffQ.Delete(pInfo)
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", ForceActivate).Inc()
p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil)
p.addNominatedPodUnlocked(pInfo.PodInfo, nil)
return true
}
@@ -499,7 +495,7 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pInfo *framework.QueuedPodI
}
p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil)
p.addNominatedPodUnlocked(pInfo.PodInfo, nil)
return nil
}
@@ -610,14 +606,14 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
// If the pod is already in the active queue, just update it there.
if oldPodInfo, exists, _ := p.activeQ.Get(oldPodInfo); exists {
pInfo := updatePod(oldPodInfo, newPod)
p.PodNominator.UpdateNominatedPod(oldPod, pInfo.PodInfo)
p.updateNominatedPodUnlocked(oldPod, pInfo.PodInfo)
return p.activeQ.Update(pInfo)
}
// If the pod is in the backoff queue, update it there.
if oldPodInfo, exists, _ := p.podBackoffQ.Get(oldPodInfo); exists {
pInfo := updatePod(oldPodInfo, newPod)
p.PodNominator.UpdateNominatedPod(oldPod, pInfo.PodInfo)
p.updateNominatedPodUnlocked(oldPod, pInfo.PodInfo)
return p.podBackoffQ.Update(pInfo)
}
}
@@ -625,7 +621,7 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
// If the pod is in the unschedulable queue, updating it may make it schedulable.
if usPodInfo := p.unschedulablePods.get(newPod); usPodInfo != nil {
pInfo := updatePod(usPodInfo, newPod)
p.PodNominator.UpdateNominatedPod(oldPod, pInfo.PodInfo)
p.updateNominatedPodUnlocked(oldPod, pInfo.PodInfo)
if isPodUpdated(oldPod, newPod) {
gated := usPodInfo.Gated
if p.isPodBackingoff(usPodInfo) {
@@ -654,7 +650,7 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
if added, err := p.addToActiveQ(pInfo); !added {
return err
}
p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil)
p.addNominatedPodUnlocked(pInfo.PodInfo, nil)
klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", PodUpdate, "queue", activeQName)
p.cond.Broadcast()
return nil
@@ -665,7 +661,7 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
func (p *PriorityQueue) Delete(pod *v1.Pod) error {
p.lock.Lock()
defer p.lock.Unlock()
p.PodNominator.DeleteNominatedPodIfExists(pod)
p.deleteNominatedPodIfExistsUnlocked(pod)
pInfo := newQueuedPodInfoForLookup(pod)
if err := p.activeQ.Delete(pInfo); err != nil {
// The item was probably not found in the activeQ.
@@ -776,8 +772,7 @@ func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework.
// any affinity term that matches "pod".
// NOTE: this function assumes lock has been acquired in caller.
func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod) []*framework.QueuedPodInfo {
var nsLabels labels.Set
nsLabels = interpodaffinity.GetNamespaceLabelsSnapshot(pod.Namespace, p.nsLister)
nsLabels := interpodaffinity.GetNamespaceLabelsSnapshot(pod.Namespace, p.nsLister)
var podsToMove []*framework.QueuedPodInfo
for _, pInfo := range p.unschedulablePods.podInfoMap {
@@ -824,9 +819,13 @@ func (p *PriorityQueue) Close() {
// DeleteNominatedPodIfExists deletes <pod> from nominatedPods.
func (npm *nominator) DeleteNominatedPodIfExists(pod *v1.Pod) {
npm.Lock()
npm.lock.Lock()
npm.deleteNominatedPodIfExistsUnlocked(pod)
npm.lock.Unlock()
}
func (npm *nominator) deleteNominatedPodIfExistsUnlocked(pod *v1.Pod) {
npm.delete(pod)
npm.Unlock()
}
// AddNominatedPod adds a pod to the nominated pods of the given node.
@@ -834,16 +833,16 @@ func (npm *nominator) DeleteNominatedPodIfExists(pod *v1.Pod) {
// the pod. We update the structure before sending a request to update the pod
// object to avoid races with the following scheduling cycles.
func (npm *nominator) AddNominatedPod(pi *framework.PodInfo, nominatingInfo *framework.NominatingInfo) {
npm.Lock()
npm.add(pi, nominatingInfo)
npm.Unlock()
npm.lock.Lock()
npm.addNominatedPodUnlocked(pi, nominatingInfo)
npm.lock.Unlock()
}
// NominatedPodsForNode returns a copy of pods that are nominated to run on the given node,
// but they are waiting for other pods to be removed from the node.
func (npm *nominator) NominatedPodsForNode(nodeName string) []*framework.PodInfo {
npm.RLock()
defer npm.RUnlock()
npm.lock.RLock()
defer npm.lock.RUnlock()
// Make a copy of the nominated Pods so the caller can mutate safely.
pods := make([]*framework.PodInfo, len(npm.nominatedPods[nodeName]))
for i := 0; i < len(pods); i++ {
@@ -985,10 +984,10 @@ type nominator struct {
// nominated.
nominatedPodToNode map[types.UID]string
sync.RWMutex
lock sync.RWMutex
}
func (npm *nominator) add(pi *framework.PodInfo, nominatingInfo *framework.NominatingInfo) {
func (npm *nominator) addNominatedPodUnlocked(pi *framework.PodInfo, nominatingInfo *framework.NominatingInfo) {
// Always delete the pod if it already exists, to ensure we never store more than
// one instance of the pod.
npm.delete(pi.Pod)
@@ -1045,8 +1044,12 @@ func (npm *nominator) delete(p *v1.Pod) {
// UpdateNominatedPod updates the <oldPod> with <newPod>.
func (npm *nominator) UpdateNominatedPod(oldPod *v1.Pod, newPodInfo *framework.PodInfo) {
npm.Lock()
defer npm.Unlock()
npm.lock.Lock()
defer npm.lock.Unlock()
npm.updateNominatedPodUnlocked(oldPod, newPodInfo)
}
func (npm *nominator) updateNominatedPodUnlocked(oldPod *v1.Pod, newPodInfo *framework.PodInfo) {
// In some cases, an Update event with no "NominatedNode" present is received right
// after a node("NominatedNode") is reserved for this pod in memory.
// In this case, we need to keep reserving the NominatedNode when updating the pod pointer.
@@ -1067,13 +1070,17 @@ func (npm *nominator) UpdateNominatedPod(oldPod *v1.Pod, newPodInfo *framework.P
// We update irrespective of the nominatedNodeName changed or not, to ensure
// that pod pointer is updated.
npm.delete(oldPod)
npm.add(newPodInfo, nominatingInfo)
npm.addNominatedPodUnlocked(newPodInfo, nominatingInfo)
}
// NewPodNominator creates a nominator as a backing of framework.PodNominator.
// A podLister is passed in so as to check if the pod exists
// before adding its nominatedNode info.
func NewPodNominator(podLister listersv1.PodLister) framework.PodNominator {
return newPodNominator(podLister)
}
func newPodNominator(podLister listersv1.PodLister) *nominator {
return &nominator{
podLister: podLister,
nominatedPods: make(map[string][]*framework.PodInfo),