From daf8492bc52cee4809a9a98064a03d9b73c7a636 Mon Sep 17 00:00:00 2001 From: "Bobby (Babak) Salamat" Date: Tue, 24 Oct 2017 11:14:29 -0700 Subject: [PATCH 1/2] Added a new scheduling queue based on priority queue. --- .../algorithm/predicates/predicates.go | 12 +- plugin/pkg/scheduler/core/scheduling_queue.go | 684 +++++++++++++++++- .../scheduler/core/scheduling_queue_test.go | 389 ++++++++++ plugin/pkg/scheduler/factory/factory.go | 26 +- plugin/pkg/scheduler/scheduler.go | 4 +- plugin/pkg/scheduler/util/utils.go | 7 + 6 files changed, 1090 insertions(+), 32 deletions(-) create mode 100644 plugin/pkg/scheduler/core/scheduling_queue_test.go diff --git a/plugin/pkg/scheduler/algorithm/predicates/predicates.go b/plugin/pkg/scheduler/algorithm/predicates/predicates.go index 2291ae44876..65d8f5c9ca5 100644 --- a/plugin/pkg/scheduler/algorithm/predicates/predicates.go +++ b/plugin/pkg/scheduler/algorithm/predicates/predicates.go @@ -1070,7 +1070,7 @@ func (c *PodAffinityChecker) anyPodMatchesPodAffinityTerm(pod *v1.Pod, allPods [ return false, matchingPodExists, nil } -func getPodAffinityTerms(podAffinity *v1.PodAffinity) (terms []v1.PodAffinityTerm) { +func GetPodAffinityTerms(podAffinity *v1.PodAffinity) (terms []v1.PodAffinityTerm) { if podAffinity != nil { if len(podAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 { terms = podAffinity.RequiredDuringSchedulingIgnoredDuringExecution @@ -1083,7 +1083,7 @@ func getPodAffinityTerms(podAffinity *v1.PodAffinity) (terms []v1.PodAffinityTer return terms } -func getPodAntiAffinityTerms(podAntiAffinity *v1.PodAntiAffinity) (terms []v1.PodAffinityTerm) { +func GetPodAntiAffinityTerms(podAntiAffinity *v1.PodAntiAffinity) (terms []v1.PodAffinityTerm) { if podAntiAffinity != nil { if len(podAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 { terms = podAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution @@ -1133,7 +1133,7 @@ func getMatchingAntiAffinityTerms(pod *v1.Pod, nodeInfoMap map[string]*scheduler if affinity == nil { continue } - for _, term := range getPodAntiAffinityTerms(affinity.PodAntiAffinity) { + for _, term := range GetPodAntiAffinityTerms(affinity.PodAntiAffinity) { namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(existingPod, &term) selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector) if err != nil { @@ -1160,7 +1160,7 @@ func getMatchingAntiAffinityTermsOfExistingPod(newPod *v1.Pod, existingPod *v1.P var result []matchingPodAntiAffinityTerm affinity := existingPod.Spec.Affinity if affinity != nil && affinity.PodAntiAffinity != nil { - for _, term := range getPodAntiAffinityTerms(affinity.PodAntiAffinity) { + for _, term := range GetPodAntiAffinityTerms(affinity.PodAntiAffinity) { namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(existingPod, &term) selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector) if err != nil { @@ -1257,7 +1257,7 @@ func (c *PodAffinityChecker) satisfiesPodsAffinityAntiAffinity(pod *v1.Pod, node } // Check all affinity terms. 
- for _, term := range getPodAffinityTerms(affinity.PodAffinity) { + for _, term := range GetPodAffinityTerms(affinity.PodAffinity) { termMatches, matchingPodExists, err := c.anyPodMatchesPodAffinityTerm(pod, filteredPods, node, &term) if err != nil { errMessage := fmt.Sprintf("Cannot schedule pod %+v onto node %v, because of PodAffinityTerm %v, err: %v", podName(pod), node.Name, term, err) @@ -1290,7 +1290,7 @@ func (c *PodAffinityChecker) satisfiesPodsAffinityAntiAffinity(pod *v1.Pod, node } // Check all anti-affinity terms. - for _, term := range getPodAntiAffinityTerms(affinity.PodAntiAffinity) { + for _, term := range GetPodAntiAffinityTerms(affinity.PodAntiAffinity) { termMatches, _, err := c.anyPodMatchesPodAffinityTerm(pod, filteredPods, node, &term) if err != nil || termMatches { glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAntiAffinityTerm %v, err: %v", diff --git a/plugin/pkg/scheduler/core/scheduling_queue.go b/plugin/pkg/scheduler/core/scheduling_queue.go index 969a4360071..da41941582b 100644 --- a/plugin/pkg/scheduler/core/scheduling_queue.go +++ b/plugin/pkg/scheduler/core/scheduling_queue.go @@ -15,47 +15,701 @@ limitations under the License. */ // This file contains structures that implement scheduling queue types. -// Scheduling queues hold pending pods waiting to be scheduled. +// Scheduling queues hold pods waiting to be scheduled. This file has two types +// of scheduling queue: 1) a FIFO, which is mostly the same as cache.FIFO, 2) a +// priority queue which has two sub queues. One sub-queue holds pods that are +// being considered for scheduling. This is called activeQ. Another queue holds +// pods that are already tried and are determined to be unschedulable. The latter +// is called unschedulableQ. +// FIFO is here for flag-gating purposes and allows us to use the traditional +// scheduling queue when Pod Priority flag is false. package core import ( + "container/heap" + "fmt" + "sync" + + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/cache" + "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates" + priorityutil "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities/util" + "k8s.io/kubernetes/plugin/pkg/scheduler/util" + + "github.com/golang/glog" + "reflect" ) // SchedulingQueue is an interface for a queue to store pods waiting to be scheduled. // The interface follows a pattern similar to cache.FIFO and cache.Heap and // makes it easy to use those data structures as a SchedulingQueue. type SchedulingQueue interface { - Add(obj interface{}) error - AddIfNotPresent(obj interface{}) error - Pop() (interface{}, error) - Update(obj interface{}) error - Delete(obj interface{}) error - List() []interface{} - ListKeys() []string - Get(obj interface{}) (item interface{}, exists bool, err error) - GetByKey(key string) (item interface{}, exists bool, err error) + Add(pod *v1.Pod) error + AddIfNotPresent(pod *v1.Pod) error + AddUnschedulableIfNotPresent(pod *v1.Pod) error + Pop() (*v1.Pod, error) + Update(pod *v1.Pod) error + Delete(pod *v1.Pod) error + MoveAllToActiveQueue() + AssignedPodAdded(pod *v1.Pod) + AssignedPodUpdated(pod *v1.Pod) + WaitingPodsForNode(nodeName string) []*v1.Pod } -// FIFO is only used to add a Pop() method to cache.FIFO so that it can be -// used as a SchedulingQueue interface. +// NewSchedulingQueue initializes a new scheduling queue. If pod priority is +// enabled a priority queue is returned. If it is disabled, a FIFO is returned. 
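A minimal consumer-side sketch of the SchedulingQueue interface and the flag-gated constructor described above (illustrative only, not part of the diff); the import paths are the ones used in this patch, while the pod fixture and program scaffolding are hypothetical.

```go
package main

import (
	"fmt"

	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/kubernetes/plugin/pkg/scheduler/core"
)

func main() {
	// Returns a *PriorityQueue when the PodPriority feature gate is enabled,
	// otherwise the FIFO wrapper; callers only see the SchedulingQueue interface.
	q := core.NewSchedulingQueue()

	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "example", Namespace: "default"}}
	if err := q.Add(pod); err != nil {
		fmt.Println(err)
		return
	}

	// Pop blocks until a pod is available; the priority queue returns the
	// highest priority pending pod, the FIFO simply the oldest one.
	next, err := q.Pop()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("next pod to schedule:", next.Name)
}
```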
+func NewSchedulingQueue() SchedulingQueue { + if util.PodPriorityEnabled() { + return NewPriorityQueue() + } + return NewFIFO() +} + +// FIFO is basically a simple wrapper around cache.FIFO to make it compatible +// with the SchedulingQueue interface. type FIFO struct { *cache.FIFO } +var _ = SchedulingQueue(&FIFO{}) // Making sure that FIFO implements SchedulingQueue. + +func (f *FIFO) Add(pod *v1.Pod) error { + return f.FIFO.Add(pod) +} + +func (f *FIFO) AddIfNotPresent(pod *v1.Pod) error { + return f.FIFO.AddIfNotPresent(pod) +} + +// AddUnschedulableIfNotPresent adds an unschedulable pod back to the queue. In +// FIFO it is added to the end of the queue. +func (f *FIFO) AddUnschedulableIfNotPresent(pod *v1.Pod) error { + return f.FIFO.AddIfNotPresent(pod) +} + +func (f *FIFO) Update(pod *v1.Pod) error { + return f.FIFO.Update(pod) +} + +func (f *FIFO) Delete(pod *v1.Pod) error { + return f.FIFO.Delete(pod) +} + // Pop removes the head of FIFO and returns it. // This is just a copy/paste of cache.Pop(queue Queue) from fifo.go that scheduler // has always been using. There is a comment in that file saying that this method // shouldn't be used in production code, but scheduler has always been using it. // This function does minimal error checking. -func (f *FIFO) Pop() (interface{}, error) { +func (f *FIFO) Pop() (*v1.Pod, error) { var result interface{} f.FIFO.Pop(func(obj interface{}) error { result = obj return nil }) - return result, nil + return result.(*v1.Pod), nil } -var _ = SchedulingQueue(&FIFO{}) // Making sure that FIFO implements SchedulingQueue. +// FIFO does not need to react to events, as all pods are always in the active +// scheduling queue anyway. +func (f *FIFO) AssignedPodAdded(pod *v1.Pod) {} +func (f *FIFO) AssignedPodUpdated(pod *v1.Pod) {} + +// MoveAllToActiveQueue does nothing in FIFO as all pods are always in the active queue. +func (f *FIFO) MoveAllToActiveQueue() {} + +// WaitingPodsForNode returns pods that are nominated to run on the given node, +// but FIFO does not support it. +func (f *FIFO) WaitingPodsForNode(nodeName string) []*v1.Pod { + return nil +} + +func NewFIFO() *FIFO { + return &FIFO{FIFO: cache.NewFIFO(cache.MetaNamespaceKeyFunc)} +} + +// UnschedulablePods is an interface for a queue that is used to keep unschedulable +// pods. These pods are not actively reevaluated for scheduling. They are moved +// to the active scheduling queue on certain events, such as termination of a pod +// in the cluster, addition of nodes, etc. +type UnschedulablePods interface { + Add(pod *v1.Pod) + Delete(pod *v1.Pod) + Update(pod *v1.Pod) + GetPodsWaitingForNode(nodeName string) []*v1.Pod + Get(pod *v1.Pod) *v1.Pod + Clear() +} + +// PriorityQueue implements a scheduling queue. It is an alternative to FIFO. +// The head of PriorityQueue is the highest priority pending pod. This structure +// has two sub queues. One sub-queue holds pods that are being considered for +// scheduling. This is called activeQ and is a Heap. Another queue holds +// pods that are already tried and are determined to be unschedulable. The latter +// is called unschedulableQ. +// Heap is already thread safe, but we need to acquire another lock here to ensure +// atomicity of operations on the two data structures.. +type PriorityQueue struct { + lock sync.RWMutex + cond sync.Cond + + // activeQ is heap structure that scheduler actively looks at to find pods to + // schedule. Head of heap is the highest priority pod. 
+ activeQ *Heap + // unschedulableQ holds pods that have been tried and determined unschedulable. + unschedulableQ *UnschedulablePodsMap + // receivedMoveRequest is set to true whenever we receive a request to move a + // pod from the unschedulableQ to the activeQ, and is set to false, when we pop + // a pod from the activeQ. It indicates if we received a move request when a + // pod was in flight (we were trying to schedule it). In such a case, we put + // the pod back into the activeQ if it is determined unschedulable. + receivedMoveRequest bool +} + +// Making sure that PriorityQueue implements SchedulingQueue. +var _ = SchedulingQueue(&PriorityQueue{}) + +func NewPriorityQueue() *PriorityQueue { + pq := &PriorityQueue{ + activeQ: newHeap(cache.MetaNamespaceKeyFunc, util.HigherPriorityPod), + unschedulableQ: newUnschedulablePodsMap(), + } + pq.cond.L = &pq.lock + return pq +} + +// Add adds a pod to the active queue. It should be called only when a new pod +// is added so there is no chance the pod is already in either queue. +func (p *PriorityQueue) Add(pod *v1.Pod) error { + p.lock.Lock() + defer p.lock.Unlock() + err := p.activeQ.Add(pod) + if err != nil { + glog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err) + } else { + if p.unschedulableQ.Get(pod) != nil { + glog.Errorf("Error: pod %v is already in the unschedulable queue.", pod.Name) + p.unschedulableQ.Delete(pod) + } + p.cond.Broadcast() + } + return err +} + +// AddIfNotPresent adds a pod to the active queue if it is not present in any of +// the two queues. If it is present in any, it doesn't do any thing. +func (p *PriorityQueue) AddIfNotPresent(pod *v1.Pod) error { + p.lock.Lock() + defer p.lock.Unlock() + if p.unschedulableQ.Get(pod) != nil { + return nil + } + if _, exists, _ := p.activeQ.Get(pod); exists { + return nil + } + err := p.activeQ.Add(pod) + if err != nil { + glog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err) + } else { + p.cond.Broadcast() + } + return err +} + +// AddUnschedulableIfNotPresent does nothing if the pod is present in either +// queue. Otherwise it adds the pod to the unschedulable queue if +// p.receivedMoveRequest is false, and to the activeQ if p.receivedMoveRequest is true. +func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod) error { + p.lock.Lock() + defer p.lock.Unlock() + if p.unschedulableQ.Get(pod) != nil { + return fmt.Errorf("pod is already present in unschedulableQ") + } + if _, exists, _ := p.activeQ.Get(pod); exists { + return fmt.Errorf("pod is already present in the activeQ") + } + if p.receivedMoveRequest { + return p.activeQ.Add(pod) + } + p.unschedulableQ.Add(pod) + return nil +} + +// Pop removes the head of the active queue and returns it. It blocks if the +// activeQ is empty and waits until a new item is added to the queue. It also +// clears receivedMoveRequest to mark the beginning of a new scheduling cycle. +func (p *PriorityQueue) Pop() (*v1.Pod, error) { + p.lock.Lock() + defer p.lock.Unlock() + for len(p.activeQ.data.queue) == 0 { + p.cond.Wait() + } + obj, err := p.activeQ.Pop() + if err != nil { + return nil, err + } + p.receivedMoveRequest = false + return obj.(*v1.Pod), err +} + +// isPodUpdated checks if the pod is updated in a way that it may have become +// schedulable. It drops status of the pod and compares it with old version. 
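As a reviewer-facing aside (not part of the diff), a short sketch of the receivedMoveRequest bookkeeping described above, driven only through the exported methods; the pod fixture is hypothetical.

```go
package main

import (
	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/kubernetes/plugin/pkg/scheduler/core"
)

func main() {
	q := core.NewPriorityQueue()
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p", Namespace: "default"}}
	_ = q.Add(pod)

	// Pop starts a scheduling cycle and clears receivedMoveRequest.
	inFlight, _ := q.Pop()

	// While the pod is in flight, a cluster event (e.g. a node addition) makes
	// the factory call MoveAllToActiveQueue, which sets receivedMoveRequest.
	q.MoveAllToActiveQueue()

	// The in-flight pod turns out to be unschedulable. Because a move request
	// arrived during this cycle, it goes straight back into activeQ for another
	// attempt instead of being parked in unschedulableQ.
	_ = q.AddUnschedulableIfNotPresent(inFlight)
}
```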
+func isPodUpdated(oldPod, newPod *v1.Pod) bool { + strip := func(pod *v1.Pod) *v1.Pod { + p := pod.DeepCopy() + p.ResourceVersion = "" + p.Status = v1.PodStatus{} + return p + } + return !reflect.DeepEqual(strip(oldPod), strip(newPod)) +} + +// Update updates a pod in the active queue if present. Otherwise, it removes +// the item from the unschedulable queue and adds the updated one to the active +// queue. +func (p *PriorityQueue) Update(pod *v1.Pod) error { + p.lock.Lock() + defer p.lock.Unlock() + // If the pod is already in the active queue, just update it there. + if _, exists, _ := p.activeQ.Get(pod); exists { + err := p.activeQ.Update(pod) + if err == nil { + p.cond.Broadcast() + } + return err + } + // If the pod is in the unschedulable queue, updating it may make it schedulable. + if oldPod := p.unschedulableQ.Get(pod); oldPod != nil { + if isPodUpdated(oldPod, pod) { + p.unschedulableQ.Delete(pod) + err := p.activeQ.Add(pod) + if err == nil { + p.cond.Broadcast() + } + return err + } else { + p.unschedulableQ.Update(pod) + return nil + } + } + // If pod is not in any of the two queue, we put it in the active queue. + err := p.activeQ.Add(pod) + if err == nil { + p.cond.Broadcast() + } + return err +} + +// Delete deletes the item from either of the two queues. It assumes the pod is +// only in one queue. +func (p *PriorityQueue) Delete(pod *v1.Pod) error { + p.lock.Lock() + defer p.lock.Unlock() + if _, exists, _ := p.activeQ.Get(pod); exists { + return p.activeQ.Delete(pod) + } + p.unschedulableQ.Delete(pod) + return nil +} + +// AssignedPodAdded is called when a bound pod is added. Creation of this pod +// may make pending pods with matching affinity terms schedulable. +func (p *PriorityQueue) AssignedPodAdded(pod *v1.Pod) { + p.movePodsToActiveQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod)) +} + +// AssignedPodUpdated is called when a bound pod is updated. Change of labels +// may make pending pods with matching affinity terms schedulable. +func (p *PriorityQueue) AssignedPodUpdated(pod *v1.Pod) { + p.movePodsToActiveQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod)) +} + +// MoveAllToActiveQueue moves all pods from unschedulableQ to activeQ. This +// function adds all pods and then signals the condition variable to ensure that +// if Pop() is waiting for an item, it receives it after all the pods are in the +// queue and the head is the highest priority pod. +// TODO(bsalamat): We should add a back-off mechanism here so that a high priority +// pod which is unschedulable does not go to the head of the queue frequently. For +// example in a cluster where a lot of pods being deleted, such a high priority +// pod can deprive other pods from getting scheduled. +func (p *PriorityQueue) MoveAllToActiveQueue() { + p.lock.Lock() + defer p.lock.Unlock() + var unschedulablePods []interface{} + for _, pod := range p.unschedulableQ.pods { + unschedulablePods = append(unschedulablePods, pod) + } + p.activeQ.BulkAdd(unschedulablePods) + p.unschedulableQ.Clear() + p.receivedMoveRequest = true + p.cond.Broadcast() +} + +func (p *PriorityQueue) movePodsToActiveQueue(pods []*v1.Pod) { + p.lock.Lock() + defer p.lock.Unlock() + for _, pod := range pods { + p.activeQ.Add(pod) + p.unschedulableQ.Delete(pod) + } + p.receivedMoveRequest = true + p.cond.Broadcast() +} + +// getUnschedulablePodsWithMatchingAffinityTerm returns unschedulable pods which have +// any affinity term that matches "pod". 
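To make the affinity-driven move described above concrete, a sketch using only exported methods (illustrative, not part of the diff); the two pod fixtures are hypothetical and mirror the shapes used in the unit tests added below.

```go
package main

import (
	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/kubernetes/plugin/pkg/scheduler/core"
)

func main() {
	q := core.NewPriorityQueue()

	// A pending pod that requires co-location with pods labeled service=securityscan.
	pending := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "pending", Namespace: "default"},
		Spec: v1.PodSpec{
			Affinity: &v1.Affinity{
				PodAffinity: &v1.PodAffinity{
					RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{{
						LabelSelector: &metav1.LabelSelector{
							MatchLabels: map[string]string{"service": "securityscan"},
						},
						TopologyKey: "region",
					}},
				},
			},
		},
	}

	_ = q.Add(pending)
	p, _ := q.Pop()                       // the scheduler tries the pod...
	_ = q.AddUnschedulableIfNotPresent(p) // ...and parks it as unschedulable.

	// An informer later reports a bound pod whose labels satisfy the affinity
	// term; the factory calls AssignedPodAdded, which moves the pending pod
	// back to activeQ so it gets retried.
	bound := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "bound",
			Namespace: "default",
			Labels:    map[string]string{"service": "securityscan"},
		},
		Spec: v1.PodSpec{NodeName: "node1"},
	}
	q.AssignedPodAdded(bound)
}
```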
+func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod) []*v1.Pod { + p.lock.RLock() + defer p.lock.RUnlock() + podsToMove := []*v1.Pod{} + for _, up := range p.unschedulableQ.pods { + affinity := up.Spec.Affinity + if affinity != nil && affinity.PodAffinity != nil { + terms := predicates.GetPodAffinityTerms(affinity.PodAffinity) + for _, term := range terms { + namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(up, &term) + selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector) + if err != nil { + glog.Errorf("Error getting label selectors for pod: %v.", up.Name) + } + if priorityutil.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector) { + podsToMove = append(podsToMove, up) + } + } + } + } + return podsToMove +} + +// WaitingPodsForNode returns pods that are nominated to run on the given node, +// but they are waiting for other pods to be removed from the node before they +// can be actually scheduled. +func (p *PriorityQueue) WaitingPodsForNode(nodeName string) []*v1.Pod { + return p.unschedulableQ.GetPodsWaitingForNode(nodeName) +} + +// UnschedulablePodsMap holds pods that cannot be scheduled. This data structure +// is used to implement unschedulableQ. +type UnschedulablePodsMap struct { + // pods is a map key by a pod's full-name and the value is a pointer to the pod. + pods map[string]*v1.Pod + // nominatedPods is a map keyed by a node name and the value is a list of + // pods' full-names which are nominated to run on the node. + nominatedPods map[string][]string + keyFunc func(*v1.Pod) string +} + +var _ = UnschedulablePods(&UnschedulablePodsMap{}) + +func NominatedNodeName(pod *v1.Pod) string { + nominatedNodeName, ok := pod.Annotations[NominatedNodeAnnotationKey] + if !ok { + return "" + } + return nominatedNodeName +} + +// Add adds a pod to the unschedulable pods. +func (u *UnschedulablePodsMap) Add(pod *v1.Pod) { + podKey := u.keyFunc(pod) + if _, exists := u.pods[podKey]; !exists { + u.pods[podKey] = pod + nominatedNodeName := NominatedNodeName(pod) + if len(nominatedNodeName) > 0 { + u.nominatedPods[nominatedNodeName] = append(u.nominatedPods[nominatedNodeName], podKey) + } + } +} + +func (u *UnschedulablePodsMap) deleteFromNominated(pod *v1.Pod) { + nominatedNodeName := NominatedNodeName(pod) + if len(nominatedNodeName) > 0 { + podKey := u.keyFunc(pod) + nps := u.nominatedPods[nominatedNodeName] + for i, np := range nps { + if np == podKey { + u.nominatedPods[nominatedNodeName] = append(nps[:i], nps[i+1:]...) + if len(u.nominatedPods[nominatedNodeName]) == 0 { + delete(u.nominatedPods, nominatedNodeName) + } + break + } + } + } +} + +// Delete deletes a pod from the unschedulable pods. +func (u *UnschedulablePodsMap) Delete(pod *v1.Pod) { + podKey := u.keyFunc(pod) + if p, exists := u.pods[podKey]; exists { + u.deleteFromNominated(p) + delete(u.pods, podKey) + } +} + +// Update updates a pod in the unschedulable pods. +func (u *UnschedulablePodsMap) Update(pod *v1.Pod) { + podKey := u.keyFunc(pod) + oldPod, exists := u.pods[podKey] + if !exists { + u.Add(pod) + return + } + u.pods[podKey] = pod + oldNominateNodeName := NominatedNodeName(oldPod) + nominatedNodeName := NominatedNodeName(pod) + if oldNominateNodeName != nominatedNodeName { + u.deleteFromNominated(oldPod) + if len(nominatedNodeName) > 0 { + u.nominatedPods[nominatedNodeName] = append(u.nominatedPods[nominatedNodeName], podKey) + } + } +} + +// Get returns the pod if a pod with the same key as the key of the given "pod" +// is found in the map. 
It returns nil otherwise. +func (u *UnschedulablePodsMap) Get(pod *v1.Pod) *v1.Pod { + podKey := u.keyFunc(pod) + if p, exists := u.pods[podKey]; exists { + return p + } + return nil +} + +// GetPodsWaitingForNode returns a list of unschedulable pods whose NominatedNodeNames +// are equal to the given nodeName. +func (u *UnschedulablePodsMap) GetPodsWaitingForNode(nodeName string) []*v1.Pod { + var pods []*v1.Pod + for _, key := range u.nominatedPods[nodeName] { + pods = append(pods, u.pods[key]) + } + return pods +} + +// Clear removes all the entries from the unschedulable maps. +func (u *UnschedulablePodsMap) Clear() { + u.pods = make(map[string]*v1.Pod) + u.nominatedPods = make(map[string][]string) +} + +// newUnschedulablePodsMap initializes a new object of UnschedulablePodsMap. +func newUnschedulablePodsMap() *UnschedulablePodsMap { + return &UnschedulablePodsMap{ + pods: make(map[string]*v1.Pod), + nominatedPods: make(map[string][]string), + keyFunc: util.GetPodFullName, + } +} + +// Below is the implementation of the a heap. The logic is pretty much the same +// as cache.heap, however, this heap does not perform synchronization. It leaves +// synchronization to the SchedulingQueue. + +type LessFunc func(interface{}, interface{}) bool +type KeyFunc func(obj interface{}) (string, error) + +type heapItem struct { + obj interface{} // The object which is stored in the heap. + index int // The index of the object's key in the Heap.queue. +} + +type itemKeyValue struct { + key string + obj interface{} +} + +// heapData is an internal struct that implements the standard heap interface +// and keeps the data stored in the heap. +type heapData struct { + // items is a map from key of the objects to the objects and their index. + // We depend on the property that items in the map are in the queue and vice versa. + items map[string]*heapItem + // queue implements a heap data structure and keeps the order of elements + // according to the heap invariant. The queue keeps the keys of objects stored + // in "items". + queue []string + + // keyFunc is used to make the key used for queued item insertion and retrieval, and + // should be deterministic. + keyFunc KeyFunc + // lessFunc is used to compare two objects in the heap. + lessFunc LessFunc +} + +var ( + _ = heap.Interface(&heapData{}) // heapData is a standard heap +) + +// Less compares two objects and returns true if the first one should go +// in front of the second one in the heap. +func (h *heapData) Less(i, j int) bool { + if i > len(h.queue) || j > len(h.queue) { + return false + } + itemi, ok := h.items[h.queue[i]] + if !ok { + return false + } + itemj, ok := h.items[h.queue[j]] + if !ok { + return false + } + return h.lessFunc(itemi.obj, itemj.obj) +} + +// Len returns the number of items in the Heap. +func (h *heapData) Len() int { return len(h.queue) } + +// Swap implements swapping of two elements in the heap. This is a part of standard +// heap interface and should never be called directly. +func (h *heapData) Swap(i, j int) { + h.queue[i], h.queue[j] = h.queue[j], h.queue[i] + item := h.items[h.queue[i]] + item.index = i + item = h.items[h.queue[j]] + item.index = j +} + +// Push is supposed to be called by heap.Push only. +func (h *heapData) Push(kv interface{}) { + keyValue := kv.(*itemKeyValue) + n := len(h.queue) + h.items[keyValue.key] = &heapItem{keyValue.obj, n} + h.queue = append(h.queue, keyValue.key) +} + +// Pop is supposed to be called by heap.Pop only. 
+func (h *heapData) Pop() interface{} { + key := h.queue[len(h.queue)-1] + h.queue = h.queue[0 : len(h.queue)-1] + item, ok := h.items[key] + if !ok { + // This is an error + return nil + } + delete(h.items, key) + return item.obj +} + +// Heap is a thread-safe producer/consumer queue that implements a heap data structure. +// It can be used to implement priority queues and similar data structures. +type Heap struct { + // data stores objects and has a queue that keeps their ordering according + // to the heap invariant. + data *heapData +} + +// Add inserts an item, and puts it in the queue. The item is updated if it +// already exists. +func (h *Heap) Add(obj interface{}) error { + key, err := h.data.keyFunc(obj) + if err != nil { + return cache.KeyError{Obj: obj, Err: err} + } + if _, exists := h.data.items[key]; exists { + h.data.items[key].obj = obj + heap.Fix(h.data, h.data.items[key].index) + } else { + heap.Push(h.data, &itemKeyValue{key, obj}) + } + return nil +} + +// BulkAdd adds all the items in the list to the queue. +func (h *Heap) BulkAdd(list []interface{}) error { + for _, obj := range list { + key, err := h.data.keyFunc(obj) + if err != nil { + return cache.KeyError{Obj: obj, Err: err} + } + if _, exists := h.data.items[key]; exists { + h.data.items[key].obj = obj + heap.Fix(h.data, h.data.items[key].index) + } else { + heap.Push(h.data, &itemKeyValue{key, obj}) + } + } + return nil +} + +// AddIfNotPresent inserts an item, and puts it in the queue. If an item with +// the key is present in the map, no changes is made to the item. +func (h *Heap) AddIfNotPresent(obj interface{}) error { + key, err := h.data.keyFunc(obj) + if err != nil { + return cache.KeyError{Obj: obj, Err: err} + } + if _, exists := h.data.items[key]; !exists { + heap.Push(h.data, &itemKeyValue{key, obj}) + } + return nil +} + +// Update is the same as Add in this implementation. When the item does not +// exist, it is added. +func (h *Heap) Update(obj interface{}) error { + return h.Add(obj) +} + +// Delete removes an item. +func (h *Heap) Delete(obj interface{}) error { + key, err := h.data.keyFunc(obj) + if err != nil { + return cache.KeyError{Obj: obj, Err: err} + } + if item, ok := h.data.items[key]; ok { + heap.Remove(h.data, item.index) + return nil + } + return fmt.Errorf("object not found") +} + +// Pop returns the head of the heap. +func (h *Heap) Pop() (interface{}, error) { + obj := heap.Pop(h.data) + if obj != nil { + return obj, nil + } else { + return nil, fmt.Errorf("object was removed from heap data") + } +} + +// Get returns the requested item, or sets exists=false. +func (h *Heap) Get(obj interface{}) (interface{}, bool, error) { + key, err := h.data.keyFunc(obj) + if err != nil { + return nil, false, cache.KeyError{Obj: obj, Err: err} + } + return h.GetByKey(key) +} + +// GetByKey returns the requested item, or sets exists=false. +func (h *Heap) GetByKey(key string) (interface{}, bool, error) { + item, exists := h.data.items[key] + if !exists { + return nil, false, nil + } + return item.obj, true, nil +} + +// List returns a list of all the items. +func (h *Heap) List() []interface{} { + list := make([]interface{}, 0, len(h.data.items)) + for _, item := range h.data.items { + list = append(list, item.obj) + } + return list +} + +// newHeap returns a Heap which can be used to queue up items to process. 
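A brief in-package sketch of how PriorityQueue wires up this heap (illustrative, not part of the diff); newHeap is unexported, so the snippet assumes it sits inside the core package, and the pod fixtures and helper name are hypothetical.

```go
// Hypothetical helper inside package core, relying on the file's existing
// imports (cache, util, v1); lowPri and highPri are caller-supplied *v1.Pod
// fixtures with Spec.Priority set.
func nextHighestPriority(lowPri, highPri *v1.Pod) *v1.Pod {
	// Keys come from cache.MetaNamespaceKeyFunc ("namespace/name") and ordering
	// from util.HigherPriorityPod, so Pop yields the highest priority pod first.
	h := newHeap(cache.MetaNamespaceKeyFunc, util.HigherPriorityPod)
	_ = h.Add(lowPri)
	_ = h.Add(highPri)
	_ = h.Add(lowPri) // re-adding an existing key updates it in place, no duplicate

	obj, err := h.Pop()
	if err != nil {
		return nil
	}
	return obj.(*v1.Pod) // highPri, assuming it carries the larger priority
}
```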
+func newHeap(keyFn KeyFunc, lessFn LessFunc) *Heap { + return &Heap{ + data: &heapData{ + items: map[string]*heapItem{}, + queue: []string{}, + keyFunc: keyFn, + lessFunc: lessFn, + }, + } +} diff --git a/plugin/pkg/scheduler/core/scheduling_queue_test.go b/plugin/pkg/scheduler/core/scheduling_queue_test.go new file mode 100644 index 00000000000..343b7593bf5 --- /dev/null +++ b/plugin/pkg/scheduler/core/scheduling_queue_test.go @@ -0,0 +1,389 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package core + +import ( + "reflect" + "testing" + + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/kubernetes/plugin/pkg/scheduler/util" +) + +var mediumPriority = (lowPriority + highPriority) / 2 +var highPriorityPod, medPriorityPod, unschedulablePod = v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "hpp", + Namespace: "ns1", + }, + Spec: v1.PodSpec{ + Priority: &highPriority, + }, +}, + v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "mpp", + Namespace: "ns2", + Annotations: map[string]string{ + NominatedNodeAnnotationKey: "node1", "annot2": "val2", + }, + }, + Spec: v1.PodSpec{ + Priority: &mediumPriority, + }, + }, + v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "up", + Namespace: "ns1", + Annotations: map[string]string{ + NominatedNodeAnnotationKey: "node1", "annot2": "val2", + }, + }, + Spec: v1.PodSpec{ + Priority: &lowPriority, + }, + Status: v1.PodStatus{ + Conditions: []v1.PodCondition{ + { + Type: v1.PodScheduled, + Status: v1.ConditionFalse, + Reason: v1.PodReasonUnschedulable, + }, + }, + }, + } + +func TestPriorityQueue_Add(t *testing.T) { + q := NewPriorityQueue() + q.Add(&medPriorityPod) + q.Add(&unschedulablePod) + q.Add(&highPriorityPod) + if p, err := q.Pop(); err != nil || p != &highPriorityPod { + t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Name) + } + if p, err := q.Pop(); err != nil || p != &medPriorityPod { + t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Name) + } + if p, err := q.Pop(); err != nil || p != &unschedulablePod { + t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePod.Name, p.Name) + } +} + +func TestPriorityQueue_Pop(t *testing.T) { + q := NewPriorityQueue() + go func() { + if p, err := q.Pop(); err != nil || p != &highPriorityPod { + t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Name) + } + }() + q.Add(&highPriorityPod) +} + +func TestPriorityQueue_Update(t *testing.T) { + q := NewPriorityQueue() + q.Update(&highPriorityPod) + if _, exists, _ := q.activeQ.Get(&highPriorityPod); !exists { + t.Errorf("Expected %v to be added to activeQ.", highPriorityPod.Name) + } + q.Update(&highPriorityPod) + if q.activeQ.data.Len() != 1 { + t.Error("Expected only one item in activeQ.") + } + // Updating an unschedulable pod which is not in any of the two queues, should + // add the pod to activeQ. 
+ q.Update(&unschedulablePod) + if _, exists, _ := q.activeQ.Get(&unschedulablePod); !exists { + t.Errorf("Expected %v to be added to activeQ.", unschedulablePod.Name) + } + // Updating a pod that is already in unschedulableQ, should move the pod to + // activeQ. + q.Update(&unschedulablePod) + if len(q.unschedulableQ.pods) != 0 { + t.Error("Expected unschedulableQ to be empty.") + } + if _, exists, _ := q.activeQ.Get(&unschedulablePod); !exists { + t.Errorf("Expected: %v to be added to activeQ.", unschedulablePod.Name) + } + if p, err := q.Pop(); err != nil || p != &highPriorityPod { + t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Name) + } +} + +func TestPriorityQueue_Delete(t *testing.T) { + q := NewPriorityQueue() + q.Update(&highPriorityPod) + q.Add(&unschedulablePod) + q.Delete(&highPriorityPod) + if _, exists, _ := q.activeQ.Get(&unschedulablePod); !exists { + t.Errorf("Expected %v to be in activeQ.", unschedulablePod.Name) + } + if _, exists, _ := q.activeQ.Get(&highPriorityPod); exists { + t.Errorf("Didn't expect %v to be in activeQ.", highPriorityPod.Name) + } +} + +func TestPriorityQueue_MoveAllToActiveQueue(t *testing.T) { + q := NewPriorityQueue() + q.Add(&medPriorityPod) + q.unschedulableQ.Add(&unschedulablePod) + q.unschedulableQ.Add(&highPriorityPod) + q.MoveAllToActiveQueue() + if q.activeQ.data.Len() != 3 { + t.Error("Expected all items to be in activeQ.") + } +} + +// TestPriorityQueue_AssignedPodAdded tests AssignedPodAdded. It checks that +// when a pod with pod affinity is in unschedulableQ and another pod with a +// matching label is added, the unschedulable pod is moved to activeQ. +func TestPriorityQueue_AssignedPodAdded(t *testing.T) { + affinityPod := unschedulablePod.DeepCopy() + affinityPod.Name = "afp" + affinityPod.Spec = v1.PodSpec{ + Affinity: &v1.Affinity{ + PodAffinity: &v1.PodAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{ + { + LabelSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "service", + Operator: metav1.LabelSelectorOpIn, + Values: []string{"securityscan", "value2"}, + }, + }, + }, + TopologyKey: "region", + }, + }, + }, + }, + Priority: &mediumPriority, + } + labelPod := v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "lbp", + Namespace: affinityPod.Namespace, + Labels: map[string]string{"service": "securityscan"}, + }, + Spec: v1.PodSpec{NodeName: "machine1"}, + } + + q := NewPriorityQueue() + q.Add(&medPriorityPod) + // Add a couple of pods to the unschedulableQ. + q.unschedulableQ.Add(&unschedulablePod) + q.unschedulableQ.Add(affinityPod) + // Simulate addition of an assigned pod. The pod has matching labels for + // affinityPod. So, affinityPod should go to activeQ. + q.AssignedPodAdded(&labelPod) + if q.unschedulableQ.Get(affinityPod) != nil { + t.Error("affinityPod is still in the unschedulableQ.") + } + if _, exists, _ := q.activeQ.Get(affinityPod); !exists { + t.Error("affinityPod is not moved to activeQ.") + } + // Check that the other pod is still in the unschedulableQ. 
+ if q.unschedulableQ.Get(&unschedulablePod) == nil { + t.Error("unschedulablePod is not in the unschedulableQ.") + } +} + +func TestUnschedulablePodsMap(t *testing.T) { + var pods = []*v1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "p0", + Namespace: "ns1", + Annotations: map[string]string{ + NominatedNodeAnnotationKey: "node1", "annot2": "val2", + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "p1", + Namespace: "ns1", + Annotations: map[string]string{ + "annot": "val", + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "p2", + Namespace: "ns2", + Annotations: map[string]string{ + NominatedNodeAnnotationKey: "node3", "annot2": "val2", "annot3": "val3", + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "p3", + Namespace: "ns4", + Annotations: map[string]string{ + NominatedNodeAnnotationKey: "node1", + }, + }, + }, + } + var updatedPods = make([]*v1.Pod, len(pods)) + updatedPods[0] = pods[0].DeepCopy() + updatedPods[0].Annotations[NominatedNodeAnnotationKey] = "node3" + updatedPods[1] = pods[1].DeepCopy() + updatedPods[1].Annotations[NominatedNodeAnnotationKey] = "node3" + updatedPods[3] = pods[3].DeepCopy() + delete(updatedPods[3].Annotations, NominatedNodeAnnotationKey) + + tests := []struct { + podsToAdd []*v1.Pod + expectedMapAfterAdd map[string]*v1.Pod + expectedNominatedAfterAdd map[string][]string + podsToUpdate []*v1.Pod + expectedMapAfterUpdate map[string]*v1.Pod + expectedNominatedAfterUpdate map[string][]string + podsToDelete []*v1.Pod + expectedMapAfterDelete map[string]*v1.Pod + expectedNominatedAfterDelete map[string][]string + }{ + { + podsToAdd: []*v1.Pod{pods[0], pods[1], pods[2], pods[3]}, + expectedMapAfterAdd: map[string]*v1.Pod{ + util.GetPodFullName(pods[0]): pods[0], + util.GetPodFullName(pods[1]): pods[1], + util.GetPodFullName(pods[2]): pods[2], + util.GetPodFullName(pods[3]): pods[3], + }, + expectedNominatedAfterAdd: map[string][]string{ + "node1": {util.GetPodFullName(pods[0]), util.GetPodFullName(pods[3])}, + "node3": {util.GetPodFullName(pods[2])}, + }, + podsToUpdate: []*v1.Pod{updatedPods[0]}, + expectedMapAfterUpdate: map[string]*v1.Pod{ + util.GetPodFullName(pods[0]): updatedPods[0], + util.GetPodFullName(pods[1]): pods[1], + util.GetPodFullName(pods[2]): pods[2], + util.GetPodFullName(pods[3]): pods[3], + }, + expectedNominatedAfterUpdate: map[string][]string{ + "node1": {util.GetPodFullName(pods[3])}, + "node3": {util.GetPodFullName(pods[2]), util.GetPodFullName(pods[0])}, + }, + podsToDelete: []*v1.Pod{pods[0], pods[1]}, + expectedMapAfterDelete: map[string]*v1.Pod{ + util.GetPodFullName(pods[2]): pods[2], + util.GetPodFullName(pods[3]): pods[3], + }, + expectedNominatedAfterDelete: map[string][]string{ + "node1": {util.GetPodFullName(pods[3])}, + "node3": {util.GetPodFullName(pods[2])}, + }, + }, + { + podsToAdd: []*v1.Pod{pods[0], pods[3]}, + expectedMapAfterAdd: map[string]*v1.Pod{ + util.GetPodFullName(pods[0]): pods[0], + util.GetPodFullName(pods[3]): pods[3], + }, + expectedNominatedAfterAdd: map[string][]string{ + "node1": {util.GetPodFullName(pods[0]), util.GetPodFullName(pods[3])}, + }, + podsToUpdate: []*v1.Pod{updatedPods[3]}, + expectedMapAfterUpdate: map[string]*v1.Pod{ + util.GetPodFullName(pods[0]): pods[0], + util.GetPodFullName(pods[3]): updatedPods[3], + }, + expectedNominatedAfterUpdate: map[string][]string{ + "node1": {util.GetPodFullName(pods[0])}, + }, + podsToDelete: []*v1.Pod{pods[0], pods[3]}, + expectedMapAfterDelete: map[string]*v1.Pod{}, + expectedNominatedAfterDelete: 
map[string][]string{}, + }, + { + podsToAdd: []*v1.Pod{pods[1], pods[2]}, + expectedMapAfterAdd: map[string]*v1.Pod{ + util.GetPodFullName(pods[1]): pods[1], + util.GetPodFullName(pods[2]): pods[2], + }, + expectedNominatedAfterAdd: map[string][]string{ + "node3": {util.GetPodFullName(pods[2])}, + }, + podsToUpdate: []*v1.Pod{updatedPods[1]}, + expectedMapAfterUpdate: map[string]*v1.Pod{ + util.GetPodFullName(pods[1]): updatedPods[1], + util.GetPodFullName(pods[2]): pods[2], + }, + expectedNominatedAfterUpdate: map[string][]string{ + "node3": {util.GetPodFullName(pods[2]), util.GetPodFullName(updatedPods[1])}, + }, + podsToDelete: []*v1.Pod{pods[2], pods[3]}, + expectedMapAfterDelete: map[string]*v1.Pod{ + util.GetPodFullName(pods[1]): updatedPods[1], + }, + expectedNominatedAfterDelete: map[string][]string{ + "node3": {util.GetPodFullName(updatedPods[1])}, + }, + }, + } + + for i, test := range tests { + upm := newUnschedulablePodsMap() + for _, p := range test.podsToAdd { + upm.Add(p) + } + if !reflect.DeepEqual(upm.pods, test.expectedMapAfterAdd) { + t.Errorf("#%d: Unexpected map after adding pods. Expected: %v, got: %v", + i, test.expectedMapAfterAdd, upm.pods) + } + if !reflect.DeepEqual(upm.nominatedPods, test.expectedNominatedAfterAdd) { + t.Errorf("#%d: Unexpected nominated map after adding pods. Expected: %v, got: %v", + i, test.expectedNominatedAfterAdd, upm.nominatedPods) + } + if len(test.podsToUpdate) > 0 { + for _, p := range test.podsToUpdate { + upm.Update(p) + } + if !reflect.DeepEqual(upm.pods, test.expectedMapAfterUpdate) { + t.Errorf("#%d: Unexpected map after updating pods. Expected: %v, got: %v", + i, test.expectedMapAfterUpdate, upm.pods) + } + if !reflect.DeepEqual(upm.nominatedPods, test.expectedNominatedAfterUpdate) { + t.Errorf("#%d: Unexpected nominated map after updating pods. Expected: %v, got: %v", + i, test.expectedNominatedAfterUpdate, upm.nominatedPods) + } + } + for _, p := range test.podsToDelete { + upm.Delete(p) + } + if !reflect.DeepEqual(upm.pods, test.expectedMapAfterDelete) { + t.Errorf("#%d: Unexpected map after deleting pods. Expected: %v, got: %v", + i, test.expectedMapAfterDelete, upm.pods) + } + if !reflect.DeepEqual(upm.nominatedPods, test.expectedNominatedAfterDelete) { + t.Errorf("#%d: Unexpected nominated map after deleting pods. 
Expected: %v, got: %v", + i, test.expectedNominatedAfterDelete, upm.nominatedPods) + } + upm.Clear() + if len(upm.pods) != 0 { + t.Errorf("Expected the map to be empty, but has %v elements.", len(upm.pods)) + } + } +} diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index 3bd07c6480d..a04afe19cb6 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -142,11 +142,10 @@ func NewConfigFactory( stopEverything := make(chan struct{}) schedulerCache := schedulercache.New(30*time.Second, stopEverything) - schedulingQueue := &core.FIFO{FIFO: cache.NewFIFO(cache.MetaNamespaceKeyFunc)} c := &configFactory{ client: client, podLister: schedulerCache, - podQueue: schedulingQueue, + podQueue: core.NewSchedulingQueue(), pVLister: pvInformer.Lister(), pVCLister: pvcInformer.Lister(), serviceLister: serviceInformer.Lister(), @@ -195,20 +194,21 @@ func NewConfigFactory( }, Handler: cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - if err := c.podQueue.Add(obj); err != nil { + if err := c.podQueue.Add(obj.(*v1.Pod)); err != nil { runtime.HandleError(fmt.Errorf("unable to queue %T: %v", obj, err)) } }, UpdateFunc: func(oldObj, newObj interface{}) { - if c.skipPodUpdate(newObj.(*v1.Pod)) { + pod := newObj.(*v1.Pod) + if c.skipPodUpdate(pod) { return } - if err := c.podQueue.Update(newObj); err != nil { + if err := c.podQueue.Update(pod); err != nil { runtime.HandleError(fmt.Errorf("unable to update %T: %v", newObj, err)) } }, DeleteFunc: func(obj interface{}) { - if err := c.podQueue.Delete(obj); err != nil { + if err := c.podQueue.Delete(obj.(*v1.Pod)); err != nil { runtime.HandleError(fmt.Errorf("unable to dequeue %T: %v", obj, err)) } }, @@ -377,6 +377,7 @@ func (c *configFactory) onPvcAdd(obj interface{}) { } c.invalidatePredicatesForPvc(pvc) } + c.podQueue.MoveAllToActiveQueue() } func (c *configFactory) onPvcDelete(obj interface{}) { @@ -398,6 +399,7 @@ func (c *configFactory) onPvcDelete(obj interface{}) { } c.invalidatePredicatesForPvc(pvc) } + c.podQueue.MoveAllToActiveQueue() } func (c *configFactory) invalidatePredicatesForPvc(pvc *v1.PersistentVolumeClaim) { @@ -410,6 +412,7 @@ func (c *configFactory) onServiceAdd(obj interface{}) { if c.enableEquivalenceClassCache { c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet) } + c.podQueue.MoveAllToActiveQueue() } func (c *configFactory) onServiceUpdate(oldObj interface{}, newObj interface{}) { @@ -421,12 +424,14 @@ func (c *configFactory) onServiceUpdate(oldObj interface{}, newObj interface{}) c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet) } } + c.podQueue.MoveAllToActiveQueue() } func (c *configFactory) onServiceDelete(obj interface{}) { if c.enableEquivalenceClassCache { c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet) } + c.podQueue.MoveAllToActiveQueue() } // GetNodeStore provides the cache to the nodes, mostly internal use, but may also be called by mock-tests. @@ -462,6 +467,8 @@ func (c *configFactory) addPodToCache(obj interface{}) { if err := c.schedulerCache.AddPod(pod); err != nil { glog.Errorf("scheduler cache AddPod failed: %v", err) } + + c.podQueue.AssignedPodAdded(pod) // NOTE: Updating equivalence cache of addPodToCache has been // handled optimistically in InvalidateCachedPredicateItemForPodAdd. 
} @@ -483,6 +490,7 @@ func (c *configFactory) updatePodInCache(oldObj, newObj interface{}) { } c.invalidateCachedPredicatesOnUpdatePod(newPod, oldPod) + c.podQueue.AssignedPodUpdated(newPod) } func (c *configFactory) invalidateCachedPredicatesOnUpdatePod(newPod *v1.Pod, oldPod *v1.Pod) { @@ -527,6 +535,7 @@ func (c *configFactory) deletePodFromCache(obj interface{}) { } c.invalidateCachedPredicatesOnDeletePod(pod) + c.podQueue.MoveAllToActiveQueue() } func (c *configFactory) invalidateCachedPredicatesOnDeletePod(pod *v1.Pod) { @@ -561,6 +570,7 @@ func (c *configFactory) addNodeToCache(obj interface{}) { glog.Errorf("scheduler cache AddNode failed: %v", err) } + c.podQueue.MoveAllToActiveQueue() // NOTE: add a new node does not affect existing predicates in equivalence cache } @@ -581,6 +591,7 @@ func (c *configFactory) updateNodeInCache(oldObj, newObj interface{}) { } c.invalidateCachedPredicatesOnNodeUpdate(newNode, oldNode) + c.podQueue.MoveAllToActiveQueue() } func (c *configFactory) invalidateCachedPredicatesOnNodeUpdate(newNode *v1.Node, oldNode *v1.Node) { @@ -902,8 +913,7 @@ func (f *configFactory) getPluginArgs() (*PluginFactoryArgs, error) { } func (f *configFactory) getNextPod() *v1.Pod { - if obj, err := f.podQueue.Pop(); err == nil { - pod := obj.(*v1.Pod) + if pod, err := f.podQueue.Pop(); err == nil { glog.V(4).Infof("About to try and schedule pod %v", pod.Name) return pod } else { diff --git a/plugin/pkg/scheduler/scheduler.go b/plugin/pkg/scheduler/scheduler.go index 73b8bb120e1..66faabfaa53 100644 --- a/plugin/pkg/scheduler/scheduler.go +++ b/plugin/pkg/scheduler/scheduler.go @@ -23,11 +23,9 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" - utilfeature "k8s.io/apiserver/pkg/util/feature" clientset "k8s.io/client-go/kubernetes" corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/record" - "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" "k8s.io/kubernetes/plugin/pkg/scheduler/core" @@ -196,7 +194,7 @@ func (sched *Scheduler) schedule(pod *v1.Pod) (string, error) { // If it succeeds, it adds the name of the node where preemption has happened to the pod annotations. // It returns the node name and an error if any. func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, error) { - if !utilfeature.DefaultFeatureGate.Enabled(features.PodPriority) { + if !util.PodPriorityEnabled() { glog.V(3).Infof("Pod priority feature is not enabled. No preemption is performed.") return "", nil } diff --git a/plugin/pkg/scheduler/util/utils.go b/plugin/pkg/scheduler/util/utils.go index ce1d1ad992a..66a004e0013 100644 --- a/plugin/pkg/scheduler/util/utils.go +++ b/plugin/pkg/scheduler/util/utils.go @@ -21,7 +21,9 @@ import ( "sort" "k8s.io/api/core/v1" + "k8s.io/apiserver/pkg/util/feature" "k8s.io/kubernetes/pkg/apis/scheduling" + "k8s.io/kubernetes/pkg/features" ) const DefaultBindAllHostIP = "0.0.0.0" @@ -59,6 +61,11 @@ func GetUsedPorts(pods ...*v1.Pod) map[string]bool { return ports } +// PodPriorityEnabled indicates whether pod priority feature is enabled. +func PodPriorityEnabled() bool { + return feature.DefaultFeatureGate.Enabled(features.PodPriority) +} + // GetPodFullName returns a name that uniquely identifies a pod. 
func GetPodFullName(pod *v1.Pod) string { // Use underscore as the delimiter because it is not allowed in pod name From 79ac37d8b30a77929f9cc8b5aa346bbeb1ca7ca7 Mon Sep 17 00:00:00 2001 From: "Bobby (Babak) Salamat" Date: Sun, 19 Nov 2017 11:32:14 -0800 Subject: [PATCH 2/2] Autogenerated files --- plugin/pkg/scheduler/BUILD | 2 -- plugin/pkg/scheduler/core/BUILD | 4 ++++ plugin/pkg/scheduler/util/BUILD | 2 ++ 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/plugin/pkg/scheduler/BUILD b/plugin/pkg/scheduler/BUILD index 0166a4c6f20..41136590537 100644 --- a/plugin/pkg/scheduler/BUILD +++ b/plugin/pkg/scheduler/BUILD @@ -38,7 +38,6 @@ go_library( ], importpath = "k8s.io/kubernetes/plugin/pkg/scheduler", deps = [ - "//pkg/features:go_default_library", "//plugin/pkg/scheduler/algorithm:go_default_library", "//plugin/pkg/scheduler/api:go_default_library", "//plugin/pkg/scheduler/core:go_default_library", @@ -50,7 +49,6 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", - "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", "//vendor/k8s.io/client-go/listers/core/v1:go_default_library", "//vendor/k8s.io/client-go/tools/record:go_default_library", diff --git a/plugin/pkg/scheduler/core/BUILD b/plugin/pkg/scheduler/core/BUILD index 305ce865879..39268b96a5a 100644 --- a/plugin/pkg/scheduler/core/BUILD +++ b/plugin/pkg/scheduler/core/BUILD @@ -12,6 +12,7 @@ go_test( "equivalence_cache_test.go", "extender_test.go", "generic_scheduler_test.go", + "scheduling_queue_test.go", ], importpath = "k8s.io/kubernetes/plugin/pkg/scheduler/core", library = ":go_default_library", @@ -23,6 +24,7 @@ go_test( "//plugin/pkg/scheduler/api:go_default_library", "//plugin/pkg/scheduler/schedulercache:go_default_library", "//plugin/pkg/scheduler/testing:go_default_library", + "//plugin/pkg/scheduler/util:go_default_library", "//vendor/k8s.io/api/apps/v1beta1:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/api/extensions/v1beta1:go_default_library", @@ -46,12 +48,14 @@ go_library( "//pkg/util/hash:go_default_library", "//plugin/pkg/scheduler/algorithm:go_default_library", "//plugin/pkg/scheduler/algorithm/predicates:go_default_library", + "//plugin/pkg/scheduler/algorithm/priorities/util:go_default_library", "//plugin/pkg/scheduler/api:go_default_library", "//plugin/pkg/scheduler/schedulercache:go_default_library", "//plugin/pkg/scheduler/util:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/github.com/golang/groupcache/lru:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", diff --git a/plugin/pkg/scheduler/util/BUILD b/plugin/pkg/scheduler/util/BUILD index 9b6603cab19..1eed06de183 100644 --- a/plugin/pkg/scheduler/util/BUILD +++ b/plugin/pkg/scheduler/util/BUILD @@ -37,11 +37,13 @@ go_library( "//pkg/apis/core:go_default_library", "//pkg/apis/core/install:go_default_library", "//pkg/apis/scheduling:go_default_library", + "//pkg/features:go_default_library", "//vendor/github.com/golang/glog:go_default_library", 
"//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", + "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library", ], )