Use PodInfo instead of Pod for nominatedPods and QueuedPodInfo

This commit is contained in:
drfish
2021-02-22 22:00:23 +08:00
parent 84483a5aac
commit bc2df9de72
14 changed files with 319 additions and 288 deletions

View File

@@ -194,10 +194,12 @@ var defaultPriorityQueueOptions = priorityQueueOptions{
// Making sure that PriorityQueue implements SchedulingQueue.
var _ SchedulingQueue = &PriorityQueue{}
// newQueuedPodInfoNoTimestamp builds a QueuedPodInfo object without timestamp.
func newQueuedPodInfoNoTimestamp(pod *v1.Pod) *framework.QueuedPodInfo {
// newQueuedPodInfoForLookup builds a QueuedPodInfo object for a lookup in the queue.
func newQueuedPodInfoForLookup(pod *v1.Pod) *framework.QueuedPodInfo {
// Since this is only used for a lookup in the queue, we only need to set the Pod,
// and so we avoid creating a full PodInfo, which is expensive to instantiate frequently.
return &framework.QueuedPodInfo{
Pod: pod,
PodInfo: &framework.PodInfo{Pod: pod},
}
}
@@ -262,7 +264,7 @@ func (p *PriorityQueue) Add(pod *v1.Pod) error {
klog.ErrorS(nil, "Error: pod is already in the podBackoff queue", "pod", klog.KObj(pod))
}
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", PodAdd).Inc()
p.PodNominator.AddNominatedPod(pod, "")
p.PodNominator.AddNominatedPod(pInfo.PodInfo, "")
p.cond.Broadcast()
return nil
@@ -316,7 +318,7 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pInfo *framework.QueuedPodI
metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", ScheduleAttemptFailure).Inc()
}
p.PodNominator.AddNominatedPod(pod, "")
p.PodNominator.AddNominatedPod(pInfo.PodInfo, "")
return nil
}
@@ -412,48 +414,53 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
defer p.lock.Unlock()
if oldPod != nil {
oldPodInfo := newQueuedPodInfoNoTimestamp(oldPod)
oldPodInfo := newQueuedPodInfoForLookup(oldPod)
// If the pod is already in the active queue, just update it there.
if oldPodInfo, exists, _ := p.activeQ.Get(oldPodInfo); exists {
p.PodNominator.UpdateNominatedPod(oldPod, newPod)
err := p.activeQ.Update(updatePod(oldPodInfo, newPod))
return err
pInfo := updatePod(oldPodInfo, newPod)
p.PodNominator.UpdateNominatedPod(oldPod, pInfo.PodInfo)
return p.activeQ.Update(pInfo)
}
// If the pod is in the backoff queue, update it there.
if oldPodInfo, exists, _ := p.podBackoffQ.Get(oldPodInfo); exists {
p.PodNominator.UpdateNominatedPod(oldPod, newPod)
pInfo := updatePod(oldPodInfo, newPod)
p.PodNominator.UpdateNominatedPod(oldPod, pInfo.PodInfo)
p.podBackoffQ.Delete(oldPodInfo)
err := p.activeQ.Add(updatePod(oldPodInfo, newPod))
if err == nil {
p.cond.Broadcast()
if err := p.activeQ.Add(pInfo); err != nil {
return err
}
return err
p.cond.Broadcast()
return nil
}
}
// If the pod is in the unschedulable queue, updating it may make it schedulable.
if usPodInfo := p.unschedulableQ.get(newPod); usPodInfo != nil {
p.PodNominator.UpdateNominatedPod(oldPod, newPod)
if isPodUpdated(oldPod, newPod) {
p.unschedulableQ.delete(usPodInfo.Pod)
err := p.activeQ.Add(updatePod(usPodInfo, newPod))
if err == nil {
p.cond.Broadcast()
pInfo := updatePod(usPodInfo, newPod)
p.PodNominator.UpdateNominatedPod(oldPod, pInfo.PodInfo)
if err := p.activeQ.Add(pInfo); err != nil {
return err
}
return err
p.cond.Broadcast()
return nil
}
// Pod is already in unschedulable queue and hasnt updated, no need to backoff again
p.unschedulableQ.addOrUpdate(updatePod(usPodInfo, newPod))
pInfo := updatePod(usPodInfo, newPod)
p.PodNominator.UpdateNominatedPod(oldPod, pInfo.PodInfo)
// Pod is already in unschedulable queue and hasn't updated, no need to backoff again
p.unschedulableQ.addOrUpdate(pInfo)
return nil
}
// If pod is not in any of the queues, we put it in the active queue.
err := p.activeQ.Add(p.newQueuedPodInfo(newPod))
if err == nil {
p.PodNominator.AddNominatedPod(newPod, "")
p.cond.Broadcast()
pInfo := p.newQueuedPodInfo(newPod)
if err := p.activeQ.Add(pInfo); err != nil {
return err
}
return err
p.PodNominator.AddNominatedPod(pInfo.PodInfo, "")
p.cond.Broadcast()
return nil
}
// Delete deletes the item from either of the two queues. It assumes the pod is
@@ -462,9 +469,9 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) error {
p.lock.Lock()
defer p.lock.Unlock()
p.PodNominator.DeleteNominatedPodIfExists(pod)
err := p.activeQ.Delete(newQueuedPodInfoNoTimestamp(pod))
if err != nil { // The item was probably not found in the activeQ.
p.podBackoffQ.Delete(newQueuedPodInfoNoTimestamp(pod))
if err := p.activeQ.Delete(newQueuedPodInfoForLookup(pod)); err != nil {
// The item was probably not found in the activeQ.
p.podBackoffQ.Delete(newQueuedPodInfoForLookup(pod))
p.unschedulableQ.delete(pod)
}
return nil
@@ -586,15 +593,15 @@ func (npm *nominatedPodMap) DeleteNominatedPodIfExists(pod *v1.Pod) {
// This is called during the preemption process after a node is nominated to run
// the pod. We update the structure before sending a request to update the pod
// object to avoid races with the following scheduling cycles.
func (npm *nominatedPodMap) AddNominatedPod(pod *v1.Pod, nodeName string) {
func (npm *nominatedPodMap) AddNominatedPod(pi *framework.PodInfo, nodeName string) {
npm.Lock()
npm.add(pod, nodeName)
npm.add(pi, nodeName)
npm.Unlock()
}
// NominatedPodsForNode returns pods that are nominated to run on the given node,
// but they are waiting for other pods to be removed from the node.
func (npm *nominatedPodMap) NominatedPodsForNode(nodeName string) []*v1.Pod {
func (npm *nominatedPodMap) NominatedPodsForNode(nodeName string) []*framework.PodInfo {
npm.RLock()
defer npm.RUnlock()
// TODO: we may need to return a copy of []*Pods to avoid modification
@@ -621,7 +628,7 @@ func (p *PriorityQueue) NumUnschedulablePods() int {
func (p *PriorityQueue) newQueuedPodInfo(pod *v1.Pod) *framework.QueuedPodInfo {
now := p.clock.Now()
return &framework.QueuedPodInfo{
Pod: pod,
PodInfo: framework.NewPodInfo(pod),
Timestamp: now,
InitialAttemptTimestamp: now,
}
@@ -649,7 +656,7 @@ func (p *PriorityQueue) calculateBackoffDuration(podInfo *framework.QueuedPodInf
func updatePod(oldPodInfo interface{}, newPod *v1.Pod) *framework.QueuedPodInfo {
pInfo := oldPodInfo.(*framework.QueuedPodInfo)
pInfo.Pod = newPod
pInfo.Update(newPod)
return pInfo
}
@@ -717,7 +724,7 @@ type nominatedPodMap struct {
// nominatedPods is a map keyed by a node name and the value is a list of
// pods which are nominated to run on the node. These are pods which can be in
// the activeQ or unschedulableQ.
nominatedPods map[string][]*v1.Pod
nominatedPods map[string][]*framework.PodInfo
// nominatedPodToNode is map keyed by a Pod UID to the node name where it is
// nominated.
nominatedPodToNode map[ktypes.UID]string
@@ -725,26 +732,26 @@ type nominatedPodMap struct {
sync.RWMutex
}
func (npm *nominatedPodMap) add(p *v1.Pod, nodeName string) {
func (npm *nominatedPodMap) add(pi *framework.PodInfo, nodeName string) {
// always delete the pod if it already exist, to ensure we never store more than
// one instance of the pod.
npm.delete(p)
npm.delete(pi.Pod)
nnn := nodeName
if len(nnn) == 0 {
nnn = NominatedNodeName(p)
nnn = NominatedNodeName(pi.Pod)
if len(nnn) == 0 {
return
}
}
npm.nominatedPodToNode[p.UID] = nnn
for _, np := range npm.nominatedPods[nnn] {
if np.UID == p.UID {
klog.V(4).InfoS("Pod already exists in the nominated map", "pod", klog.KObj(p))
npm.nominatedPodToNode[pi.Pod.UID] = nnn
for _, npi := range npm.nominatedPods[nnn] {
if npi.Pod.UID == pi.Pod.UID {
klog.V(4).InfoS("Pod already exists in the nominated map", "pod", klog.KObj(npi.Pod))
return
}
}
npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn], p)
npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn], pi)
}
func (npm *nominatedPodMap) delete(p *v1.Pod) {
@@ -753,7 +760,7 @@ func (npm *nominatedPodMap) delete(p *v1.Pod) {
return
}
for i, np := range npm.nominatedPods[nnn] {
if np.UID == p.UID {
if np.Pod.UID == p.UID {
npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn][:i], npm.nominatedPods[nnn][i+1:]...)
if len(npm.nominatedPods[nnn]) == 0 {
delete(npm.nominatedPods, nnn)
@@ -765,7 +772,7 @@ func (npm *nominatedPodMap) delete(p *v1.Pod) {
}
// UpdateNominatedPod updates the <oldPod> with <newPod>.
func (npm *nominatedPodMap) UpdateNominatedPod(oldPod, newPod *v1.Pod) {
func (npm *nominatedPodMap) UpdateNominatedPod(oldPod *v1.Pod, newPodInfo *framework.PodInfo) {
npm.Lock()
defer npm.Unlock()
// In some cases, an Update event with no "NominatedNode" present is received right
@@ -776,7 +783,7 @@ func (npm *nominatedPodMap) UpdateNominatedPod(oldPod, newPod *v1.Pod) {
// (1) NominatedNode info is added
// (2) NominatedNode info is updated
// (3) NominatedNode info is removed
if NominatedNodeName(oldPod) == "" && NominatedNodeName(newPod) == "" {
if NominatedNodeName(oldPod) == "" && NominatedNodeName(newPodInfo.Pod) == "" {
if nnn, ok := npm.nominatedPodToNode[oldPod.UID]; ok {
// This is the only case we should continue reserving the NominatedNode
nodeName = nnn
@@ -785,13 +792,13 @@ func (npm *nominatedPodMap) UpdateNominatedPod(oldPod, newPod *v1.Pod) {
// We update irrespective of the nominatedNodeName changed or not, to ensure
// that pod pointer is updated.
npm.delete(oldPod)
npm.add(newPod, nodeName)
npm.add(newPodInfo, nodeName)
}
// NewPodNominator creates a nominatedPodMap as a backing of framework.PodNominator.
func NewPodNominator() framework.PodNominator {
return &nominatedPodMap{
nominatedPods: make(map[string][]*v1.Pod),
nominatedPods: make(map[string][]*framework.PodInfo),
nominatedPodToNode: make(map[ktypes.UID]string),
}
}