mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-19 00:31:00 +00:00
Improve performance of scheduling queue by adding a hash map to track all pods in with a nominatedNodeName.
This commit is contained in:
parent
76e6da25fa
commit
69d62a9288
@ -51,7 +51,7 @@ type SchedulingQueue interface {
|
||||
AddIfNotPresent(pod *v1.Pod) error
|
||||
AddUnschedulableIfNotPresent(pod *v1.Pod) error
|
||||
Pop() (*v1.Pod, error)
|
||||
Update(pod *v1.Pod) error
|
||||
Update(oldPod, newPod *v1.Pod) error
|
||||
Delete(pod *v1.Pod) error
|
||||
MoveAllToActiveQueue()
|
||||
AssignedPodAdded(pod *v1.Pod)
|
||||
@ -93,8 +93,8 @@ func (f *FIFO) AddUnschedulableIfNotPresent(pod *v1.Pod) error {
|
||||
}
|
||||
|
||||
// Update updates a pod in the FIFO.
|
||||
func (f *FIFO) Update(pod *v1.Pod) error {
|
||||
return f.FIFO.Update(pod)
|
||||
func (f *FIFO) Update(oldPod, newPod *v1.Pod) error {
|
||||
return f.FIFO.Update(newPod)
|
||||
}
|
||||
|
||||
// Delete deletes a pod in the FIFO.
|
||||
@ -139,6 +139,11 @@ func NewFIFO() *FIFO {
|
||||
return &FIFO{FIFO: cache.NewFIFO(cache.MetaNamespaceKeyFunc)}
|
||||
}
|
||||
|
||||
// NominatedNodeName returns nominated node name of a Pod.
|
||||
func NominatedNodeName(pod *v1.Pod) string {
|
||||
return pod.Status.NominatedNodeName
|
||||
}
|
||||
|
||||
// UnschedulablePods is an interface for a queue that is used to keep unschedulable
|
||||
// pods. These pods are not actively reevaluated for scheduling. They are moved
|
||||
// to the active scheduling queue on certain events, such as termination of a pod
|
||||
@ -147,7 +152,6 @@ type UnschedulablePods interface {
|
||||
Add(pod *v1.Pod)
|
||||
Delete(pod *v1.Pod)
|
||||
Update(pod *v1.Pod)
|
||||
GetPodsWaitingForNode(nodeName string) []*v1.Pod
|
||||
Get(pod *v1.Pod) *v1.Pod
|
||||
Clear()
|
||||
}
|
||||
@ -167,6 +171,10 @@ type PriorityQueue struct {
|
||||
activeQ *Heap
|
||||
// unschedulableQ holds pods that have been tried and determined unschedulable.
|
||||
unschedulableQ *UnschedulablePodsMap
|
||||
// nominatedPods is a map keyed by a node name and the value is a list of
|
||||
// pods which are nominated to run on the node. These are pods which can be in
|
||||
// the activeQ or unschedulableQ.
|
||||
nominatedPods map[string][]*v1.Pod
|
||||
// receivedMoveRequest is set to true whenever we receive a request to move a
|
||||
// pod from the unschedulableQ to the activeQ, and is set to false, when we pop
|
||||
// a pod from the activeQ. It indicates if we received a move request when a
|
||||
@ -183,11 +191,51 @@ func NewPriorityQueue() *PriorityQueue {
|
||||
pq := &PriorityQueue{
|
||||
activeQ: newHeap(cache.MetaNamespaceKeyFunc, util.HigherPriorityPod),
|
||||
unschedulableQ: newUnschedulablePodsMap(),
|
||||
nominatedPods: map[string][]*v1.Pod{},
|
||||
}
|
||||
pq.cond.L = &pq.lock
|
||||
return pq
|
||||
}
|
||||
|
||||
// addNominatedPodIfNeeded adds a pod to nominatedPods if it has a NominatedNodeName and it does not
|
||||
// already exist in the map. Adding an existing pod is not going to update the pod.
|
||||
func (p *PriorityQueue) addNominatedPodIfNeeded(pod *v1.Pod) {
|
||||
nnn := NominatedNodeName(pod)
|
||||
if len(nnn) > 0 {
|
||||
for _, p := range p.nominatedPods[nnn] {
|
||||
if p.Name == pod.Name && p.Namespace == pod.Namespace {
|
||||
glog.Errorf("Pod %v/%v already exists in the nominated map!", pod.Namespace, pod.Name)
|
||||
return
|
||||
}
|
||||
}
|
||||
p.nominatedPods[nnn] = append(p.nominatedPods[nnn], pod)
|
||||
}
|
||||
}
|
||||
|
||||
// deleteNominatedPodIfExists deletes a pod from the nominatedPods.
|
||||
func (p *PriorityQueue) deleteNominatedPodIfExists(pod *v1.Pod) {
|
||||
nnn := NominatedNodeName(pod)
|
||||
if len(nnn) > 0 {
|
||||
for i, np := range p.nominatedPods[nnn] {
|
||||
if np.Name == pod.Name && np.Namespace == pod.Namespace {
|
||||
p.nominatedPods[nnn] = append(p.nominatedPods[nnn][:i], p.nominatedPods[nnn][i+1:]...)
|
||||
if len(p.nominatedPods[nnn]) == 0 {
|
||||
delete(p.nominatedPods, nnn)
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// updateNominatedPod updates a pod in the nominatedPods.
|
||||
func (p *PriorityQueue) updateNominatedPod(oldPod, newPod *v1.Pod) {
|
||||
// Even if the nominated node name of the Pod is not changed, we must delete and add it again
|
||||
// to ensure that its pointer is updated.
|
||||
p.deleteNominatedPodIfExists(oldPod)
|
||||
p.addNominatedPodIfNeeded(newPod)
|
||||
}
|
||||
|
||||
// Add adds a pod to the active queue. It should be called only when a new pod
|
||||
// is added so there is no chance the pod is already in either queue.
|
||||
func (p *PriorityQueue) Add(pod *v1.Pod) error {
|
||||
@ -199,8 +247,10 @@ func (p *PriorityQueue) Add(pod *v1.Pod) error {
|
||||
} else {
|
||||
if p.unschedulableQ.Get(pod) != nil {
|
||||
glog.Errorf("Error: pod %v is already in the unschedulable queue.", pod.Name)
|
||||
p.deleteNominatedPodIfExists(pod)
|
||||
p.unschedulableQ.Delete(pod)
|
||||
}
|
||||
p.addNominatedPodIfNeeded(pod)
|
||||
p.cond.Broadcast()
|
||||
}
|
||||
return err
|
||||
@ -221,6 +271,7 @@ func (p *PriorityQueue) AddIfNotPresent(pod *v1.Pod) error {
|
||||
if err != nil {
|
||||
glog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err)
|
||||
} else {
|
||||
p.addNominatedPodIfNeeded(pod)
|
||||
p.cond.Broadcast()
|
||||
}
|
||||
return err
|
||||
@ -245,10 +296,12 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod) error {
|
||||
}
|
||||
if !p.receivedMoveRequest && isPodUnschedulable(pod) {
|
||||
p.unschedulableQ.Add(pod)
|
||||
p.addNominatedPodIfNeeded(pod)
|
||||
return nil
|
||||
}
|
||||
err := p.activeQ.Add(pod)
|
||||
if err == nil {
|
||||
p.addNominatedPodIfNeeded(pod)
|
||||
p.cond.Broadcast()
|
||||
}
|
||||
return err
|
||||
@ -267,8 +320,10 @@ func (p *PriorityQueue) Pop() (*v1.Pod, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
pod := obj.(*v1.Pod)
|
||||
p.deleteNominatedPodIfExists(pod)
|
||||
p.receivedMoveRequest = false
|
||||
return obj.(*v1.Pod), err
|
||||
return pod, err
|
||||
}
|
||||
|
||||
// isPodUpdated checks if the pod is updated in a way that it may have become
|
||||
@ -287,30 +342,33 @@ func isPodUpdated(oldPod, newPod *v1.Pod) bool {
|
||||
// Update updates a pod in the active queue if present. Otherwise, it removes
|
||||
// the item from the unschedulable queue and adds the updated one to the active
|
||||
// queue.
|
||||
func (p *PriorityQueue) Update(pod *v1.Pod) error {
|
||||
func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
// If the pod is already in the active queue, just update it there.
|
||||
if _, exists, _ := p.activeQ.Get(pod); exists {
|
||||
err := p.activeQ.Update(pod)
|
||||
if _, exists, _ := p.activeQ.Get(newPod); exists {
|
||||
p.updateNominatedPod(oldPod, newPod)
|
||||
err := p.activeQ.Update(newPod)
|
||||
return err
|
||||
}
|
||||
// If the pod is in the unschedulable queue, updating it may make it schedulable.
|
||||
if oldPod := p.unschedulableQ.Get(pod); oldPod != nil {
|
||||
if isPodUpdated(oldPod, pod) {
|
||||
p.unschedulableQ.Delete(oldPod)
|
||||
err := p.activeQ.Add(pod)
|
||||
if usPod := p.unschedulableQ.Get(newPod); usPod != nil {
|
||||
p.updateNominatedPod(oldPod, newPod)
|
||||
if isPodUpdated(oldPod, newPod) {
|
||||
p.unschedulableQ.Delete(usPod)
|
||||
err := p.activeQ.Add(newPod)
|
||||
if err == nil {
|
||||
p.cond.Broadcast()
|
||||
}
|
||||
return err
|
||||
}
|
||||
p.unschedulableQ.Update(pod)
|
||||
p.unschedulableQ.Update(newPod)
|
||||
return nil
|
||||
}
|
||||
// If pod is not in any of the two queue, we put it in the active queue.
|
||||
err := p.activeQ.Add(pod)
|
||||
err := p.activeQ.Add(newPod)
|
||||
if err == nil {
|
||||
p.addNominatedPodIfNeeded(newPod)
|
||||
p.cond.Broadcast()
|
||||
}
|
||||
return err
|
||||
@ -321,6 +379,7 @@ func (p *PriorityQueue) Update(pod *v1.Pod) error {
|
||||
func (p *PriorityQueue) Delete(pod *v1.Pod) error {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
p.deleteNominatedPodIfExists(pod)
|
||||
if _, exists, _ := p.activeQ.Get(pod); exists {
|
||||
return p.activeQ.Delete(pod)
|
||||
}
|
||||
@ -403,68 +462,34 @@ func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod
|
||||
func (p *PriorityQueue) WaitingPodsForNode(nodeName string) []*v1.Pod {
|
||||
p.lock.RLock()
|
||||
defer p.lock.RUnlock()
|
||||
pods := p.unschedulableQ.GetPodsWaitingForNode(nodeName)
|
||||
for _, obj := range p.activeQ.List() {
|
||||
pod := obj.(*v1.Pod)
|
||||
if pod.Status.NominatedNodeName == nodeName {
|
||||
pods = append(pods, pod)
|
||||
}
|
||||
if list, ok := p.nominatedPods[nodeName]; ok {
|
||||
return list
|
||||
}
|
||||
return pods
|
||||
return nil
|
||||
}
|
||||
|
||||
// UnschedulablePodsMap holds pods that cannot be scheduled. This data structure
|
||||
// is used to implement unschedulableQ.
|
||||
type UnschedulablePodsMap struct {
|
||||
// pods is a map key by a pod's full-name and the value is a pointer to the pod.
|
||||
pods map[string]*v1.Pod
|
||||
// nominatedPods is a map keyed by a node name and the value is a list of
|
||||
// pods' full-names which are nominated to run on the node.
|
||||
nominatedPods map[string][]string
|
||||
keyFunc func(*v1.Pod) string
|
||||
pods map[string]*v1.Pod
|
||||
keyFunc func(*v1.Pod) string
|
||||
}
|
||||
|
||||
var _ = UnschedulablePods(&UnschedulablePodsMap{})
|
||||
|
||||
// NominatedNodeName returns the nominated node name of a pod.
|
||||
func NominatedNodeName(pod *v1.Pod) string {
|
||||
return pod.Status.NominatedNodeName
|
||||
}
|
||||
|
||||
// Add adds a pod to the unschedulable pods.
|
||||
func (u *UnschedulablePodsMap) Add(pod *v1.Pod) {
|
||||
podKey := u.keyFunc(pod)
|
||||
if _, exists := u.pods[podKey]; !exists {
|
||||
u.pods[podKey] = pod
|
||||
nominatedNodeName := NominatedNodeName(pod)
|
||||
if len(nominatedNodeName) > 0 {
|
||||
u.nominatedPods[nominatedNodeName] = append(u.nominatedPods[nominatedNodeName], podKey)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (u *UnschedulablePodsMap) deleteFromNominated(pod *v1.Pod) {
|
||||
nominatedNodeName := NominatedNodeName(pod)
|
||||
if len(nominatedNodeName) > 0 {
|
||||
podKey := u.keyFunc(pod)
|
||||
nps := u.nominatedPods[nominatedNodeName]
|
||||
for i, np := range nps {
|
||||
if np == podKey {
|
||||
u.nominatedPods[nominatedNodeName] = append(nps[:i], nps[i+1:]...)
|
||||
if len(u.nominatedPods[nominatedNodeName]) == 0 {
|
||||
delete(u.nominatedPods, nominatedNodeName)
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Delete deletes a pod from the unschedulable pods.
|
||||
func (u *UnschedulablePodsMap) Delete(pod *v1.Pod) {
|
||||
podKey := u.keyFunc(pod)
|
||||
if p, exists := u.pods[podKey]; exists {
|
||||
u.deleteFromNominated(p)
|
||||
if _, exists := u.pods[podKey]; exists {
|
||||
delete(u.pods, podKey)
|
||||
}
|
||||
}
|
||||
@ -472,20 +497,12 @@ func (u *UnschedulablePodsMap) Delete(pod *v1.Pod) {
|
||||
// Update updates a pod in the unschedulable pods.
|
||||
func (u *UnschedulablePodsMap) Update(pod *v1.Pod) {
|
||||
podKey := u.keyFunc(pod)
|
||||
oldPod, exists := u.pods[podKey]
|
||||
_, exists := u.pods[podKey]
|
||||
if !exists {
|
||||
u.Add(pod)
|
||||
return
|
||||
}
|
||||
u.pods[podKey] = pod
|
||||
oldNominateNodeName := NominatedNodeName(oldPod)
|
||||
nominatedNodeName := NominatedNodeName(pod)
|
||||
if oldNominateNodeName != nominatedNodeName {
|
||||
u.deleteFromNominated(oldPod)
|
||||
if len(nominatedNodeName) > 0 {
|
||||
u.nominatedPods[nominatedNodeName] = append(u.nominatedPods[nominatedNodeName], podKey)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Get returns the pod if a pod with the same key as the key of the given "pod"
|
||||
@ -498,28 +515,16 @@ func (u *UnschedulablePodsMap) Get(pod *v1.Pod) *v1.Pod {
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetPodsWaitingForNode returns a list of unschedulable pods whose NominatedNodeNames
|
||||
// are equal to the given nodeName.
|
||||
func (u *UnschedulablePodsMap) GetPodsWaitingForNode(nodeName string) []*v1.Pod {
|
||||
var pods []*v1.Pod
|
||||
for _, key := range u.nominatedPods[nodeName] {
|
||||
pods = append(pods, u.pods[key])
|
||||
}
|
||||
return pods
|
||||
}
|
||||
|
||||
// Clear removes all the entries from the unschedulable maps.
|
||||
func (u *UnschedulablePodsMap) Clear() {
|
||||
u.pods = make(map[string]*v1.Pod)
|
||||
u.nominatedPods = make(map[string][]string)
|
||||
}
|
||||
|
||||
// newUnschedulablePodsMap initializes a new object of UnschedulablePodsMap.
|
||||
func newUnschedulablePodsMap() *UnschedulablePodsMap {
|
||||
return &UnschedulablePodsMap{
|
||||
pods: make(map[string]*v1.Pod),
|
||||
nominatedPods: make(map[string][]string),
|
||||
keyFunc: util.GetPodFullName,
|
||||
pods: make(map[string]*v1.Pod),
|
||||
keyFunc: util.GetPodFullName,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -27,7 +27,7 @@ import (
|
||||
)
|
||||
|
||||
var mediumPriority = (lowPriority + highPriority) / 2
|
||||
var highPriorityPod, medPriorityPod, unschedulablePod = v1.Pod{
|
||||
var highPriorityPod, highPriNominatedPod, medPriorityPod, unschedulablePod = v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "hpp",
|
||||
Namespace: "ns1",
|
||||
@ -36,6 +36,18 @@ var highPriorityPod, medPriorityPod, unschedulablePod = v1.Pod{
|
||||
Priority: &highPriority,
|
||||
},
|
||||
},
|
||||
v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "hpp",
|
||||
Namespace: "ns1",
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
Priority: &highPriority,
|
||||
},
|
||||
Status: v1.PodStatus{
|
||||
NominatedNodeName: "node1",
|
||||
},
|
||||
},
|
||||
v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "mpp",
|
||||
@ -79,6 +91,12 @@ func TestPriorityQueue_Add(t *testing.T) {
|
||||
q.Add(&medPriorityPod)
|
||||
q.Add(&unschedulablePod)
|
||||
q.Add(&highPriorityPod)
|
||||
expectedNominatedPods := map[string][]*v1.Pod{
|
||||
"node1": {&medPriorityPod, &unschedulablePod},
|
||||
}
|
||||
if !reflect.DeepEqual(q.nominatedPods, expectedNominatedPods) {
|
||||
t.Errorf("Unexpected nominated map after adding pods. Expected: %v, got: %v", expectedNominatedPods, q.nominatedPods)
|
||||
}
|
||||
if p, err := q.Pop(); err != nil || p != &highPriorityPod {
|
||||
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Name)
|
||||
}
|
||||
@ -88,6 +106,61 @@ func TestPriorityQueue_Add(t *testing.T) {
|
||||
if p, err := q.Pop(); err != nil || p != &unschedulablePod {
|
||||
t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePod.Name, p.Name)
|
||||
}
|
||||
if len(q.nominatedPods) != 0 {
|
||||
t.Errorf("Expected nomindatePods to be empty: %v", q.nominatedPods)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPriorityQueue_AddIfNotPresent(t *testing.T) {
|
||||
q := NewPriorityQueue()
|
||||
q.unschedulableQ.Add(&highPriNominatedPod)
|
||||
q.AddIfNotPresent(&highPriNominatedPod) // Must not add anything.
|
||||
q.AddIfNotPresent(&medPriorityPod)
|
||||
q.AddIfNotPresent(&unschedulablePod)
|
||||
expectedNominatedPods := map[string][]*v1.Pod{
|
||||
"node1": {&medPriorityPod, &unschedulablePod},
|
||||
}
|
||||
if !reflect.DeepEqual(q.nominatedPods, expectedNominatedPods) {
|
||||
t.Errorf("Unexpected nominated map after adding pods. Expected: %v, got: %v", expectedNominatedPods, q.nominatedPods)
|
||||
}
|
||||
if p, err := q.Pop(); err != nil || p != &medPriorityPod {
|
||||
t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Name)
|
||||
}
|
||||
if p, err := q.Pop(); err != nil || p != &unschedulablePod {
|
||||
t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePod.Name, p.Name)
|
||||
}
|
||||
if len(q.nominatedPods) != 0 {
|
||||
t.Errorf("Expected nomindatePods to be empty: %v", q.nominatedPods)
|
||||
}
|
||||
if q.unschedulableQ.Get(&highPriNominatedPod) != &highPriNominatedPod {
|
||||
t.Errorf("Pod %v was not found in the unschedulableQ.", highPriNominatedPod.Name)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
|
||||
q := NewPriorityQueue()
|
||||
q.Add(&highPriNominatedPod)
|
||||
q.AddUnschedulableIfNotPresent(&highPriNominatedPod) // Must not add anything.
|
||||
q.AddUnschedulableIfNotPresent(&medPriorityPod) // This should go to activeQ.
|
||||
q.AddUnschedulableIfNotPresent(&unschedulablePod)
|
||||
expectedNominatedPods := map[string][]*v1.Pod{
|
||||
"node1": {&highPriNominatedPod, &medPriorityPod, &unschedulablePod},
|
||||
}
|
||||
if !reflect.DeepEqual(q.nominatedPods, expectedNominatedPods) {
|
||||
t.Errorf("Unexpected nominated map after adding pods. Expected: %v, got: %v", expectedNominatedPods, q.nominatedPods)
|
||||
}
|
||||
if p, err := q.Pop(); err != nil || p != &highPriNominatedPod {
|
||||
t.Errorf("Expected: %v after Pop, but got: %v", highPriNominatedPod.Name, p.Name)
|
||||
}
|
||||
if p, err := q.Pop(); err != nil || p != &medPriorityPod {
|
||||
t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Name)
|
||||
}
|
||||
if len(q.nominatedPods) != 1 {
|
||||
t.Errorf("Expected nomindatePods to have one element: %v", q.nominatedPods)
|
||||
}
|
||||
if q.unschedulableQ.Get(&unschedulablePod) != &unschedulablePod {
|
||||
t.Errorf("Pod %v was not found in the unschedulableQ.", unschedulablePod.Name)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPriorityQueue_Pop(t *testing.T) {
|
||||
@ -96,55 +169,71 @@ func TestPriorityQueue_Pop(t *testing.T) {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
if p, err := q.Pop(); err != nil || p != &highPriorityPod {
|
||||
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Name)
|
||||
if p, err := q.Pop(); err != nil || p != &medPriorityPod {
|
||||
t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Name)
|
||||
}
|
||||
if len(q.nominatedPods) != 0 {
|
||||
t.Errorf("Expected nomindatePods to be empty: %v", q.nominatedPods)
|
||||
}
|
||||
}()
|
||||
q.Add(&highPriorityPod)
|
||||
q.Add(&medPriorityPod)
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestPriorityQueue_Update(t *testing.T) {
|
||||
q := NewPriorityQueue()
|
||||
q.Update(&highPriorityPod)
|
||||
q.Update(nil, &highPriorityPod)
|
||||
if _, exists, _ := q.activeQ.Get(&highPriorityPod); !exists {
|
||||
t.Errorf("Expected %v to be added to activeQ.", highPriorityPod.Name)
|
||||
}
|
||||
q.Update(&highPriorityPod)
|
||||
if len(q.nominatedPods) != 0 {
|
||||
t.Errorf("Expected nomindatePods to be empty: %v", q.nominatedPods)
|
||||
}
|
||||
// Update highPriorityPod and add a nominatedNodeName to it.
|
||||
q.Update(&highPriorityPod, &highPriNominatedPod)
|
||||
if q.activeQ.data.Len() != 1 {
|
||||
t.Error("Expected only one item in activeQ.")
|
||||
}
|
||||
if len(q.nominatedPods) != 1 {
|
||||
t.Errorf("Expected one item in nomindatePods map: %v", q.nominatedPods)
|
||||
}
|
||||
// Updating an unschedulable pod which is not in any of the two queues, should
|
||||
// add the pod to activeQ.
|
||||
q.Update(&unschedulablePod)
|
||||
q.Update(&unschedulablePod, &unschedulablePod)
|
||||
if _, exists, _ := q.activeQ.Get(&unschedulablePod); !exists {
|
||||
t.Errorf("Expected %v to be added to activeQ.", unschedulablePod.Name)
|
||||
}
|
||||
// Updating a pod that is already in unschedulableQ, should move the pod to
|
||||
// activeQ.
|
||||
q.Update(&unschedulablePod)
|
||||
// Updating a pod that is already in activeQ, should not change it.
|
||||
q.Update(&unschedulablePod, &unschedulablePod)
|
||||
if len(q.unschedulableQ.pods) != 0 {
|
||||
t.Error("Expected unschedulableQ to be empty.")
|
||||
}
|
||||
if _, exists, _ := q.activeQ.Get(&unschedulablePod); !exists {
|
||||
t.Errorf("Expected: %v to be added to activeQ.", unschedulablePod.Name)
|
||||
}
|
||||
if p, err := q.Pop(); err != nil || p != &highPriorityPod {
|
||||
if p, err := q.Pop(); err != nil || p != &highPriNominatedPod {
|
||||
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Name)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPriorityQueue_Delete(t *testing.T) {
|
||||
q := NewPriorityQueue()
|
||||
q.Update(&highPriorityPod)
|
||||
q.Update(&highPriorityPod, &highPriNominatedPod)
|
||||
q.Add(&unschedulablePod)
|
||||
q.Delete(&highPriorityPod)
|
||||
q.Delete(&highPriNominatedPod)
|
||||
if _, exists, _ := q.activeQ.Get(&unschedulablePod); !exists {
|
||||
t.Errorf("Expected %v to be in activeQ.", unschedulablePod.Name)
|
||||
}
|
||||
if _, exists, _ := q.activeQ.Get(&highPriorityPod); exists {
|
||||
if _, exists, _ := q.activeQ.Get(&highPriNominatedPod); exists {
|
||||
t.Errorf("Didn't expect %v to be in activeQ.", highPriorityPod.Name)
|
||||
}
|
||||
if len(q.nominatedPods) != 1 {
|
||||
t.Errorf("Expected nomindatePods to have only 'unschedulablePod': %v", q.nominatedPods)
|
||||
}
|
||||
q.Delete(&unschedulablePod)
|
||||
if len(q.nominatedPods) != 0 {
|
||||
t.Errorf("Expected nomindatePods to be empty: %v", q.nominatedPods)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPriorityQueue_MoveAllToActiveQueue(t *testing.T) {
|
||||
@ -214,6 +303,23 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestPriorityQueue_WaitingPodsForNode(t *testing.T) {
|
||||
q := NewPriorityQueue()
|
||||
q.Add(&medPriorityPod)
|
||||
q.Add(&unschedulablePod)
|
||||
q.Add(&highPriorityPod)
|
||||
if p, err := q.Pop(); err != nil || p != &highPriorityPod {
|
||||
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Name)
|
||||
}
|
||||
expectedList := []*v1.Pod{&medPriorityPod, &unschedulablePod}
|
||||
if !reflect.DeepEqual(expectedList, q.WaitingPodsForNode("node1")) {
|
||||
t.Error("Unexpected list of nominated Pods for node.")
|
||||
}
|
||||
if q.WaitingPodsForNode("node2") != nil {
|
||||
t.Error("Expected list of nominated Pods for node2 to be empty.")
|
||||
}
|
||||
}
|
||||
|
||||
func TestUnschedulablePodsMap(t *testing.T) {
|
||||
var pods = []*v1.Pod{
|
||||
{
|
||||
@ -261,22 +367,16 @@ func TestUnschedulablePodsMap(t *testing.T) {
|
||||
}
|
||||
var updatedPods = make([]*v1.Pod, len(pods))
|
||||
updatedPods[0] = pods[0].DeepCopy()
|
||||
updatedPods[0].Status.NominatedNodeName = "node3"
|
||||
updatedPods[1] = pods[1].DeepCopy()
|
||||
updatedPods[1].Status.NominatedNodeName = "node3"
|
||||
updatedPods[3] = pods[3].DeepCopy()
|
||||
updatedPods[3].Status.NominatedNodeName = ""
|
||||
|
||||
tests := []struct {
|
||||
podsToAdd []*v1.Pod
|
||||
expectedMapAfterAdd map[string]*v1.Pod
|
||||
expectedNominatedAfterAdd map[string][]string
|
||||
podsToUpdate []*v1.Pod
|
||||
expectedMapAfterUpdate map[string]*v1.Pod
|
||||
expectedNominatedAfterUpdate map[string][]string
|
||||
podsToDelete []*v1.Pod
|
||||
expectedMapAfterDelete map[string]*v1.Pod
|
||||
expectedNominatedAfterDelete map[string][]string
|
||||
podsToAdd []*v1.Pod
|
||||
expectedMapAfterAdd map[string]*v1.Pod
|
||||
podsToUpdate []*v1.Pod
|
||||
expectedMapAfterUpdate map[string]*v1.Pod
|
||||
podsToDelete []*v1.Pod
|
||||
expectedMapAfterDelete map[string]*v1.Pod
|
||||
}{
|
||||
{
|
||||
podsToAdd: []*v1.Pod{pods[0], pods[1], pods[2], pods[3]},
|
||||
@ -286,10 +386,6 @@ func TestUnschedulablePodsMap(t *testing.T) {
|
||||
util.GetPodFullName(pods[2]): pods[2],
|
||||
util.GetPodFullName(pods[3]): pods[3],
|
||||
},
|
||||
expectedNominatedAfterAdd: map[string][]string{
|
||||
"node1": {util.GetPodFullName(pods[0]), util.GetPodFullName(pods[3])},
|
||||
"node3": {util.GetPodFullName(pods[2])},
|
||||
},
|
||||
podsToUpdate: []*v1.Pod{updatedPods[0]},
|
||||
expectedMapAfterUpdate: map[string]*v1.Pod{
|
||||
util.GetPodFullName(pods[0]): updatedPods[0],
|
||||
@ -297,19 +393,11 @@ func TestUnschedulablePodsMap(t *testing.T) {
|
||||
util.GetPodFullName(pods[2]): pods[2],
|
||||
util.GetPodFullName(pods[3]): pods[3],
|
||||
},
|
||||
expectedNominatedAfterUpdate: map[string][]string{
|
||||
"node1": {util.GetPodFullName(pods[3])},
|
||||
"node3": {util.GetPodFullName(pods[2]), util.GetPodFullName(pods[0])},
|
||||
},
|
||||
podsToDelete: []*v1.Pod{pods[0], pods[1]},
|
||||
expectedMapAfterDelete: map[string]*v1.Pod{
|
||||
util.GetPodFullName(pods[2]): pods[2],
|
||||
util.GetPodFullName(pods[3]): pods[3],
|
||||
},
|
||||
expectedNominatedAfterDelete: map[string][]string{
|
||||
"node1": {util.GetPodFullName(pods[3])},
|
||||
"node3": {util.GetPodFullName(pods[2])},
|
||||
},
|
||||
},
|
||||
{
|
||||
podsToAdd: []*v1.Pod{pods[0], pods[3]},
|
||||
@ -317,20 +405,13 @@ func TestUnschedulablePodsMap(t *testing.T) {
|
||||
util.GetPodFullName(pods[0]): pods[0],
|
||||
util.GetPodFullName(pods[3]): pods[3],
|
||||
},
|
||||
expectedNominatedAfterAdd: map[string][]string{
|
||||
"node1": {util.GetPodFullName(pods[0]), util.GetPodFullName(pods[3])},
|
||||
},
|
||||
podsToUpdate: []*v1.Pod{updatedPods[3]},
|
||||
expectedMapAfterUpdate: map[string]*v1.Pod{
|
||||
util.GetPodFullName(pods[0]): pods[0],
|
||||
util.GetPodFullName(pods[3]): updatedPods[3],
|
||||
},
|
||||
expectedNominatedAfterUpdate: map[string][]string{
|
||||
"node1": {util.GetPodFullName(pods[0])},
|
||||
},
|
||||
podsToDelete: []*v1.Pod{pods[0], pods[3]},
|
||||
expectedMapAfterDelete: map[string]*v1.Pod{},
|
||||
expectedNominatedAfterDelete: map[string][]string{},
|
||||
podsToDelete: []*v1.Pod{pods[0], pods[3]},
|
||||
expectedMapAfterDelete: map[string]*v1.Pod{},
|
||||
},
|
||||
{
|
||||
podsToAdd: []*v1.Pod{pods[1], pods[2]},
|
||||
@ -338,24 +419,15 @@ func TestUnschedulablePodsMap(t *testing.T) {
|
||||
util.GetPodFullName(pods[1]): pods[1],
|
||||
util.GetPodFullName(pods[2]): pods[2],
|
||||
},
|
||||
expectedNominatedAfterAdd: map[string][]string{
|
||||
"node3": {util.GetPodFullName(pods[2])},
|
||||
},
|
||||
podsToUpdate: []*v1.Pod{updatedPods[1]},
|
||||
expectedMapAfterUpdate: map[string]*v1.Pod{
|
||||
util.GetPodFullName(pods[1]): updatedPods[1],
|
||||
util.GetPodFullName(pods[2]): pods[2],
|
||||
},
|
||||
expectedNominatedAfterUpdate: map[string][]string{
|
||||
"node3": {util.GetPodFullName(pods[2]), util.GetPodFullName(updatedPods[1])},
|
||||
},
|
||||
podsToDelete: []*v1.Pod{pods[2], pods[3]},
|
||||
expectedMapAfterDelete: map[string]*v1.Pod{
|
||||
util.GetPodFullName(pods[1]): updatedPods[1],
|
||||
},
|
||||
expectedNominatedAfterDelete: map[string][]string{
|
||||
"node3": {util.GetPodFullName(updatedPods[1])},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
@ -368,10 +440,7 @@ func TestUnschedulablePodsMap(t *testing.T) {
|
||||
t.Errorf("#%d: Unexpected map after adding pods. Expected: %v, got: %v",
|
||||
i, test.expectedMapAfterAdd, upm.pods)
|
||||
}
|
||||
if !reflect.DeepEqual(upm.nominatedPods, test.expectedNominatedAfterAdd) {
|
||||
t.Errorf("#%d: Unexpected nominated map after adding pods. Expected: %v, got: %v",
|
||||
i, test.expectedNominatedAfterAdd, upm.nominatedPods)
|
||||
}
|
||||
|
||||
if len(test.podsToUpdate) > 0 {
|
||||
for _, p := range test.podsToUpdate {
|
||||
upm.Update(p)
|
||||
@ -380,10 +449,6 @@ func TestUnschedulablePodsMap(t *testing.T) {
|
||||
t.Errorf("#%d: Unexpected map after updating pods. Expected: %v, got: %v",
|
||||
i, test.expectedMapAfterUpdate, upm.pods)
|
||||
}
|
||||
if !reflect.DeepEqual(upm.nominatedPods, test.expectedNominatedAfterUpdate) {
|
||||
t.Errorf("#%d: Unexpected nominated map after updating pods. Expected: %v, got: %v",
|
||||
i, test.expectedNominatedAfterUpdate, upm.nominatedPods)
|
||||
}
|
||||
}
|
||||
for _, p := range test.podsToDelete {
|
||||
upm.Delete(p)
|
||||
@ -392,10 +457,6 @@ func TestUnschedulablePodsMap(t *testing.T) {
|
||||
t.Errorf("#%d: Unexpected map after deleting pods. Expected: %v, got: %v",
|
||||
i, test.expectedMapAfterDelete, upm.pods)
|
||||
}
|
||||
if !reflect.DeepEqual(upm.nominatedPods, test.expectedNominatedAfterDelete) {
|
||||
t.Errorf("#%d: Unexpected nominated map after deleting pods. Expected: %v, got: %v",
|
||||
i, test.expectedNominatedAfterDelete, upm.nominatedPods)
|
||||
}
|
||||
upm.Clear()
|
||||
if len(upm.pods) != 0 {
|
||||
t.Errorf("Expected the map to be empty, but has %v elements.", len(upm.pods))
|
||||
|
@ -591,7 +591,7 @@ func (c *configFactory) updatePodInSchedulingQueue(oldObj, newObj interface{}) {
|
||||
if c.skipPodUpdate(pod) {
|
||||
return
|
||||
}
|
||||
if err := c.podQueue.Update(pod); err != nil {
|
||||
if err := c.podQueue.Update(oldObj.(*v1.Pod), pod); err != nil {
|
||||
runtime.HandleError(fmt.Errorf("unable to update %T: %v", newObj, err))
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user