Mirror of https://github.com/k3s-io/kubernetes.git, synced 2026-01-04 23:17:50 +00:00
Extract logic related to scheduler nominatedPods into an interface

- rename `UpdateNominatedPodForNode` to `AddNominatedPod`
- promote `update` to `UpdateNominatedPod`
- embed the lock anonymously in `nominatedPodMap`
- pass PodNominator as an option to NewFramework
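The `framework.PodNominator` interface itself is defined outside the file shown in this diff. As a reading aid, here is a sketch of its method set as implied by the renames above and the call sites below; it is an inference, not the verbatim upstream definition:

```go
// Sketch only: the method set is inferred from the call sites in this diff,
// not copied from the framework package.
type PodNominator interface {
	// AddNominatedPod records that pod is nominated to run on nodeName.
	AddNominatedPod(pod *v1.Pod, nodeName string)
	// DeleteNominatedPodIfExists removes pod from the nominator, if present.
	DeleteNominatedPodIfExists(pod *v1.Pod)
	// UpdateNominatedPod replaces oldPod with newPod in the nominator.
	UpdateNominatedPod(oldPod, newPod *v1.Pod)
}
```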
@@ -65,6 +65,7 @@ const (
 // The interface follows a pattern similar to cache.FIFO and cache.Heap and
 // makes it easy to use those data structures as a SchedulingQueue.
 type SchedulingQueue interface {
+	framework.PodNominator
 	Add(pod *v1.Pod) error
 	// AddUnschedulableIfNotPresent adds an unschedulable pod back to scheduling queue.
 	// The podSchedulingCycle represents the current scheduling cycle number which can be
@@ -87,11 +88,6 @@ type SchedulingQueue interface {
 	// Close closes the SchedulingQueue so that the goroutine which is
 	// waiting to pop items can exit gracefully.
 	Close()
-	// UpdateNominatedPodForNode adds the given pod to the nominated pod map or
-	// updates it if it already exists.
-	UpdateNominatedPodForNode(pod *v1.Pod, nodeName string)
-	// DeleteNominatedPodIfExists deletes nominatedPod from internal cache
-	DeleteNominatedPodIfExists(pod *v1.Pod)
 	// NumUnschedulablePods returns the number of unschedulable pods exist in the SchedulingQueue.
 	NumUnschedulablePods() int
 	// Run starts the goroutines managing the queue.
@@ -116,6 +112,9 @@ func NominatedNodeName(pod *v1.Pod) string {
 // is called unschedulableQ. The third queue holds pods that are moved from
 // unschedulable queues and will be moved to active queue when backoff are completed.
 type PriorityQueue struct {
+	// PodNominator abstracts the operations to maintain nominated Pods.
+	framework.PodNominator
+
 	stop  chan struct{}
 	clock util.Clock
 
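Embedding `framework.PodNominator` anonymously in `PriorityQueue` promotes the interface's methods onto the queue, so `PriorityQueue` keeps satisfying the nominator-related methods of `SchedulingQueue` without hand-written forwarding. A minimal, self-contained illustration of that Go mechanism (all names here are illustrative, not from the diff):

```go
package main

import "fmt"

type Nominator interface {
	Nominate(name string)
}

type mapNominator struct{}

func (mapNominator) Nominate(name string) { fmt.Println("nominated:", name) }

// Queue embeds the interface; Nominate is promoted onto Queue, and the call
// dispatches to whatever implementation was injected.
type Queue struct {
	Nominator
}

func main() {
	q := Queue{Nominator: mapNominator{}}
	q.Nominate("pod-a") // prints "nominated: pod-a"
}
```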
@@ -135,9 +134,6 @@ type PriorityQueue struct {
 	podBackoffQ *heap.Heap
 	// unschedulableQ holds pods that have been tried and determined unschedulable.
 	unschedulableQ *UnschedulablePodsMap
-	// nominatedPods is a structures that stores pods which are nominated to run
-	// on nodes.
-	nominatedPods *nominatedPodMap
 	// schedulingCycle represents sequence number of scheduling cycle and is incremented
 	// when a pod is popped.
 	schedulingCycle int64
@@ -156,6 +152,7 @@ type priorityQueueOptions struct {
 	clock                     util.Clock
 	podInitialBackoffDuration time.Duration
 	podMaxBackoffDuration     time.Duration
+	podNominator              framework.PodNominator
 }
 
 // Option configures a PriorityQueue
@@ -168,20 +165,27 @@ func WithClock(clock util.Clock) Option {
 	}
 }
 
-// WithPodInitialBackoffDuration sets pod initial backoff duration for PriorityQueue,
+// WithPodInitialBackoffDuration sets pod initial backoff duration for PriorityQueue.
 func WithPodInitialBackoffDuration(duration time.Duration) Option {
 	return func(o *priorityQueueOptions) {
 		o.podInitialBackoffDuration = duration
 	}
 }
 
-// WithPodMaxBackoffDuration sets pod max backoff duration for PriorityQueue,
+// WithPodMaxBackoffDuration sets pod max backoff duration for PriorityQueue.
 func WithPodMaxBackoffDuration(duration time.Duration) Option {
 	return func(o *priorityQueueOptions) {
 		o.podMaxBackoffDuration = duration
 	}
 }
 
+// WithPodNominator sets pod nominator for PriorityQueue.
+func WithPodNominator(pn framework.PodNominator) Option {
+	return func(o *priorityQueueOptions) {
+		o.podNominator = pn
+	}
+}
+
 var defaultPriorityQueueOptions = priorityQueueOptions{
 	clock:                     util.RealClock{},
 	podInitialBackoffDuration: DefaultPodInitialBackoffDuration,
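`WithPodNominator` follows the same functional-options pattern as the clock and backoff options above. A hypothetical caller could inject a shared nominator like this (the `queue` package alias and the `lessFn` argument are assumptions; the full `NewPriorityQueue` signature is outside this diff):

```go
// Hypothetical wiring; see the lead-in for which names are assumed.
nominator := queue.NewPodNominator()
pq := queue.NewPriorityQueue(
	lessFn, // pod-ordering function, defined elsewhere
	queue.WithPodNominator(nominator),
	queue.WithPodInitialBackoffDuration(time.Second),
	queue.WithPodMaxBackoffDuration(10*time.Second),
)
```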
@@ -214,14 +218,18 @@ func NewPriorityQueue(
 		return lessFn(pInfo1, pInfo2)
 	}
 
+	if options.podNominator == nil {
+		options.podNominator = NewPodNominator()
+	}
+
 	pq := &PriorityQueue{
+		PodNominator:              options.podNominator,
 		clock:                     options.clock,
 		stop:                      make(chan struct{}),
 		podInitialBackoffDuration: options.podInitialBackoffDuration,
 		podMaxBackoffDuration:     options.podMaxBackoffDuration,
 		activeQ:                   heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
 		unschedulableQ:            newUnschedulablePodsMap(metrics.NewUnschedulablePodsRecorder()),
-		nominatedPods:             newNominatedPodMap(),
 		moveRequestCycle:          -1,
 	}
 	pq.cond.L = &pq.lock
@@ -255,7 +263,7 @@ func (p *PriorityQueue) Add(pod *v1.Pod) error {
 		klog.Errorf("Error: pod %v is already in the podBackoff queue.", nsNameForPod(pod))
 	}
 	metrics.SchedulerQueueIncomingPods.WithLabelValues("active", PodAdd).Inc()
-	p.nominatedPods.add(pod, "")
+	p.PodNominator.AddNominatedPod(pod, "")
 	p.cond.Broadcast()
 
 	return nil
@@ -316,9 +324,8 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pInfo *framework.QueuedPodI
 		metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", ScheduleAttemptFailure).Inc()
 	}
 
-	p.nominatedPods.add(pod, "")
+	p.PodNominator.AddNominatedPod(pod, "")
 	return nil
-
 }
 
 // flushBackoffQCompleted Moves all pods from backoffQ which have completed backoff in to activeQ
@@ -416,14 +423,14 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
 	oldPodInfo := newQueuedPodInfoNoTimestamp(oldPod)
 	// If the pod is already in the active queue, just update it there.
 	if oldPodInfo, exists, _ := p.activeQ.Get(oldPodInfo); exists {
-		p.nominatedPods.update(oldPod, newPod)
+		p.PodNominator.UpdateNominatedPod(oldPod, newPod)
 		err := p.activeQ.Update(updatePod(oldPodInfo, newPod))
 		return err
 	}
 
 	// If the pod is in the backoff queue, update it there.
 	if oldPodInfo, exists, _ := p.podBackoffQ.Get(oldPodInfo); exists {
-		p.nominatedPods.update(oldPod, newPod)
+		p.PodNominator.UpdateNominatedPod(oldPod, newPod)
 		p.podBackoffQ.Delete(oldPodInfo)
 		err := p.activeQ.Add(updatePod(oldPodInfo, newPod))
 		if err == nil {
@@ -435,7 +442,7 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
 
 	// If the pod is in the unschedulable queue, updating it may make it schedulable.
 	if usPodInfo := p.unschedulableQ.get(newPod); usPodInfo != nil {
-		p.nominatedPods.update(oldPod, newPod)
+		p.PodNominator.UpdateNominatedPod(oldPod, newPod)
 		if isPodUpdated(oldPod, newPod) {
 			p.unschedulableQ.delete(usPodInfo.Pod)
 			err := p.activeQ.Add(updatePod(usPodInfo, newPod))
@@ -451,7 +458,7 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
 	// If pod is not in any of the queues, we put it in the active queue.
 	err := p.activeQ.Add(p.newQueuedPodInfo(newPod))
 	if err == nil {
-		p.nominatedPods.add(newPod, "")
+		p.PodNominator.AddNominatedPod(newPod, "")
 		p.cond.Broadcast()
 	}
 	return err
@@ -462,7 +469,7 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
 func (p *PriorityQueue) Delete(pod *v1.Pod) error {
 	p.lock.Lock()
 	defer p.lock.Unlock()
-	p.nominatedPods.delete(pod)
+	p.PodNominator.DeleteNominatedPodIfExists(pod)
 	err := p.activeQ.Delete(newQueuedPodInfoNoTimestamp(pod))
 	if err != nil { // The item was probably not found in the activeQ.
 		p.podBackoffQ.Delete(newQueuedPodInfoNoTimestamp(pod))
@@ -553,9 +560,8 @@ func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod
 // but they are waiting for other pods to be removed from the node before they
 // can be actually scheduled.
 func (p *PriorityQueue) NominatedPodsForNode(nodeName string) []*v1.Pod {
-	p.lock.RLock()
-	defer p.lock.RUnlock()
-	return p.nominatedPods.podsForNode(nodeName)
+	// TODO: make podsForNode() public?
+	return p.PodNominator.(*nominatedPodMap).podsForNode(nodeName)
 }
 
 // PendingPods returns all the pending pods in the queue. This function is
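Note the tradeoff in this hunk: `podsForNode` is unexported, so `PriorityQueue` reaches it through a type assertion that only holds while the PodNominator is the default `*nominatedPodMap` (hence the TODO). If a caller injected a different implementation via `WithPodNominator`, this line would panic; a defensive comma-ok variant would look like the following sketch (not what the commit does):

```go
// Sketch of a defensive alternative, not part of this commit.
if npm, ok := p.PodNominator.(*nominatedPodMap); ok {
	return npm.podsForNode(nodeName)
}
return nil
```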
@@ -585,21 +591,21 @@ func (p *PriorityQueue) Close() {
 	p.cond.Broadcast()
 }
 
-// DeleteNominatedPodIfExists deletes pod nominatedPods.
-func (p *PriorityQueue) DeleteNominatedPodIfExists(pod *v1.Pod) {
-	p.lock.Lock()
-	p.nominatedPods.delete(pod)
-	p.lock.Unlock()
+// DeleteNominatedPodIfExists deletes <pod> from nominatedPods.
+func (npm *nominatedPodMap) DeleteNominatedPodIfExists(pod *v1.Pod) {
+	npm.Lock()
+	npm.delete(pod)
+	npm.Unlock()
 }
 
-// UpdateNominatedPodForNode adds a pod to the nominated pods of the given node.
+// AddNominatedPod adds a pod to the nominated pods of the given node.
 // This is called during the preemption process after a node is nominated to run
 // the pod. We update the structure before sending a request to update the pod
 // object to avoid races with the following scheduling cycles.
-func (p *PriorityQueue) UpdateNominatedPodForNode(pod *v1.Pod, nodeName string) {
-	p.lock.Lock()
-	p.nominatedPods.add(pod, nodeName)
-	p.lock.Unlock()
+func (npm *nominatedPodMap) AddNominatedPod(pod *v1.Pod, nodeName string) {
+	npm.Lock()
+	npm.add(pod, nodeName)
+	npm.Unlock()
 }
 
 func (p *PriorityQueue) podsCompareBackoffCompleted(podInfo1, podInfo2 interface{}) bool {
@@ -721,6 +727,8 @@ type nominatedPodMap struct {
 	// nominatedPodToNode is map keyed by a Pod UID to the node name where it is
 	// nominated.
 	nominatedPodToNode map[ktypes.UID]string
+
+	sync.RWMutex
 }
 
 func (npm *nominatedPodMap) add(p *v1.Pod, nodeName string) {
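The "anonymous lock" from the commit message is this embedded `sync.RWMutex`: `nominatedPodMap` gains `Lock`/`Unlock`/`RLock`/`RUnlock` as promoted methods, letting each exported method synchronize itself instead of relying on `PriorityQueue`'s lock. A minimal sketch of the idiom (illustrative names, not from the diff):

```go
package nominator

import "sync"

// store embeds sync.RWMutex anonymously, the same idiom nominatedPodMap uses.
type store struct {
	items map[string]string
	sync.RWMutex // promotes Lock/Unlock/RLock/RUnlock onto store
}

func (s *store) set(k, v string) {
	s.Lock() // promoted method, no named field required
	defer s.Unlock()
	s.items[k] = v
}

func (s *store) get(k string) (string, bool) {
	s.RLock()
	defer s.RUnlock()
	v, ok := s.items[k]
	return v, ok
}
```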
@@ -762,7 +770,10 @@ func (npm *nominatedPodMap) delete(p *v1.Pod) {
 	delete(npm.nominatedPodToNode, p.UID)
 }
 
-func (npm *nominatedPodMap) update(oldPod, newPod *v1.Pod) {
+// UpdateNominatedPod updates the <oldPod> with <newPod>.
+func (npm *nominatedPodMap) UpdateNominatedPod(oldPod, newPod *v1.Pod) {
+	npm.Lock()
+	defer npm.Unlock()
 	// In some cases, an Update event with no "NominatedNode" present is received right
 	// after a node("NominatedNode") is reserved for this pod in memory.
 	// In this case, we need to keep reserving the NominatedNode when updating the pod pointer.
@@ -784,13 +795,16 @@ func (npm *nominatedPodMap) update(oldPod, newPod *v1.Pod) {
 }
 
 func (npm *nominatedPodMap) podsForNode(nodeName string) []*v1.Pod {
+	npm.RLock()
+	defer npm.RUnlock()
 	if list, ok := npm.nominatedPods[nodeName]; ok {
 		return list
 	}
 	return nil
 }
 
-func newNominatedPodMap() *nominatedPodMap {
+// NewPodNominator creates a nominatedPodMap as a backing of framework.PodNominator.
+func NewPodNominator() framework.PodNominator {
 	return &nominatedPodMap{
 		nominatedPods:      make(map[string][]*v1.Pod),
 		nominatedPodToNode: make(map[ktypes.UID]string),
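The last item in the commit message, passing the PodNominator as an option to NewFramework, happens in files not shown in this diff. The intent is that the queue and the framework observe the same nomination state during preemption; a hedged sketch of that wiring, where the framework-side option name and NewFramework's other arguments are assumptions:

```go
// Hypothetical wiring; the framework-side option name and the elided
// NewFramework arguments are assumptions, not shown in this diff.
nominator := queue.NewPodNominator()
pq := queue.NewPriorityQueue(lessFn, queue.WithPodNominator(nominator))
fwk, err := runtime.NewFramework(
	registry, profile, // other arguments elided
	runtime.WithPodNominator(nominator), // assumed option name
)
```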