diff --git a/pkg/scheduler/core/extender_test.go b/pkg/scheduler/core/extender_test.go index f1f17beb437..fa3a95cc9f3 100644 --- a/pkg/scheduler/core/extender_test.go +++ b/pkg/scheduler/core/extender_test.go @@ -507,7 +507,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) { for _, name := range test.nodes { cache.AddNode(createNode(name)) } - queue := internalqueue.NewSchedulingQueue() + queue := internalqueue.NewSchedulingQueue(nil) scheduler := NewGenericScheduler( cache, nil, diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go index 2b668796923..4a35013aff7 100644 --- a/pkg/scheduler/core/generic_scheduler_test.go +++ b/pkg/scheduler/core/generic_scheduler_test.go @@ -472,7 +472,7 @@ func TestGenericScheduler(t *testing.T) { scheduler := NewGenericScheduler( cache, nil, - internalqueue.NewSchedulingQueue(), + internalqueue.NewSchedulingQueue(nil), test.predicates, algorithm.EmptyPredicateMetadataProducer, test.prioritizers, @@ -509,7 +509,7 @@ func makeScheduler(predicates map[string]algorithm.FitPredicate, nodes []*v1.Nod s := NewGenericScheduler( cache, nil, - internalqueue.NewSchedulingQueue(), + internalqueue.NewSchedulingQueue(nil), predicates, algorithm.EmptyPredicateMetadataProducer, prioritizers, @@ -1436,7 +1436,7 @@ func TestPreempt(t *testing.T) { scheduler := NewGenericScheduler( cache, nil, - internalqueue.NewSchedulingQueue(), + internalqueue.NewSchedulingQueue(nil), map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources}, algorithm.EmptyPredicateMetadataProducer, []algorithm.PriorityConfig{{Function: numericPriority, Weight: 1}}, @@ -1564,7 +1564,7 @@ func TestCacheInvalidationRace(t *testing.T) { scheduler := NewGenericScheduler( mockCache, eCache, - internalqueue.NewSchedulingQueue(), + internalqueue.NewSchedulingQueue(nil), ps, algorithm.EmptyPredicateMetadataProducer, prioritizers, @@ -1648,7 +1648,7 @@ func TestCacheInvalidationRace2(t *testing.T) { scheduler := NewGenericScheduler( cache, eCache, - internalqueue.NewSchedulingQueue(), + internalqueue.NewSchedulingQueue(nil), ps, algorithm.EmptyPredicateMetadataProducer, prioritizers, diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go index 120806aef78..b008286db5b 100644 --- a/pkg/scheduler/factory/factory.go +++ b/pkg/scheduler/factory/factory.go @@ -283,7 +283,7 @@ func NewConfigFactory(args *ConfigFactoryArgs) Configurator { c := &configFactory{ client: args.Client, podLister: schedulerCache, - podQueue: internalqueue.NewSchedulingQueue(), + podQueue: internalqueue.NewSchedulingQueue(stopEverything), nodeLister: args.NodeInformer.Lister(), pVLister: args.PvInformer.Lister(), pVCLister: args.PvcInformer.Lister(), @@ -1493,8 +1493,7 @@ func (c *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue // to run on a node, scheduler takes the pod into account when running // predicates for the node. 
if !util.PodPriorityEnabled() { - entry := backoff.GetEntry(podID) - if !entry.TryWait(backoff.MaxDuration()) { + if !backoff.TryBackoffAndWait(podID, c.StopEverything) { klog.Warningf("Request for pod %v already in flight, abandoning", podID) return } diff --git a/pkg/scheduler/internal/queue/BUILD b/pkg/scheduler/internal/queue/BUILD index 37a8d1f5250..ea7b66fe3f4 100644 --- a/pkg/scheduler/internal/queue/BUILD +++ b/pkg/scheduler/internal/queue/BUILD @@ -12,6 +12,8 @@ go_library( "//pkg/scheduler/util:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//vendor/k8s.io/klog:go_default_library", ], diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index 9bf40361b4f..22242a9ce34 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -27,15 +27,17 @@ limitations under the License. package queue import ( - "container/heap" "fmt" "reflect" "sync" + "time" "k8s.io/klog" "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + ktypes "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" @@ -73,9 +75,9 @@ type SchedulingQueue interface { // NewSchedulingQueue initializes a new scheduling queue. If pod priority is // enabled a priority queue is returned. If it is disabled, a FIFO is returned. -func NewSchedulingQueue() SchedulingQueue { +func NewSchedulingQueue(stop <-chan struct{}) SchedulingQueue { if util.PodPriorityEnabled() { - return NewPriorityQueue() + return NewPriorityQueue(stop) } return NewFIFO() } @@ -179,12 +181,20 @@ func NominatedNodeName(pod *v1.Pod) string { // pods that are already tried and are determined to be unschedulable. The latter // is called unschedulableQ. type PriorityQueue struct { + stop <-chan struct{} + clock util.Clock + // podBackoff tracks backoff for pods attempting to be rescheduled + podBackoff *util.PodBackoff + lock sync.RWMutex cond sync.Cond // activeQ is heap structure that scheduler actively looks at to find pods to // schedule. Head of heap is the highest priority pod. - activeQ *Heap + activeQ *util.Heap + // podBackoffQ is a heap ordered by backoff expiry. Pods which have completed backoff + // are popped from this heap before the scheduler looks at activeQ + podBackoffQ *util.Heap // unschedulableQ holds pods that have been tried and determined unschedulable. unschedulableQ *UnschedulablePodsMap // nominatedPods is a map keyed by a node name and the value is a list of @@ -228,16 +238,33 @@ func activeQComp(pod1, pod2 interface{}) bool { } // NewPriorityQueue creates a PriorityQueue object. -func NewPriorityQueue() *PriorityQueue { +func NewPriorityQueue(stop <-chan struct{}) *PriorityQueue { + return NewPriorityQueueWithClock(stop, util.RealClock{}) +} + +// NewPriorityQueueWithClock creates a PriorityQueue which uses the passed clock for time. 
+func NewPriorityQueueWithClock(stop <-chan struct{}, clock util.Clock) *PriorityQueue {
 	pq := &PriorityQueue{
-		activeQ:        newHeap(cache.MetaNamespaceKeyFunc, activeQComp),
+		clock:          clock,
+		stop:           stop,
+		podBackoff:     util.CreatePodBackoffWithClock(1*time.Second, 10*time.Second, clock),
+		activeQ:        util.NewHeap(cache.MetaNamespaceKeyFunc, activeQComp),
 		unschedulableQ: newUnschedulablePodsMap(),
 		nominatedPods:  map[string][]*v1.Pod{},
 	}
 	pq.cond.L = &pq.lock
+	pq.podBackoffQ = util.NewHeap(cache.MetaNamespaceKeyFunc, pq.podsCompareBackoffCompleted)
+
+	pq.run()
+
 	return pq
 }
 
+// run starts the goroutine to pump from podBackoffQ to activeQ
+func (p *PriorityQueue) run() {
+	go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
+}
+
 // addNominatedPodIfNeeded adds a pod to nominatedPods if it has a NominatedNodeName and it does not
 // already exist in the map. Adding an existing pod is not going to update the pod.
 func (p *PriorityQueue) addNominatedPodIfNeeded(pod *v1.Pod) {
@@ -279,7 +306,7 @@ func (p *PriorityQueue) updateNominatedPod(oldPod, newPod *v1.Pod) {
 }
 
 // Add adds a pod to the active queue. It should be called only when a new pod
-// is added so there is no chance the pod is already in either queue.
+// is added so there is no chance the pod is already in the active/unschedulable/backoff queues.
 func (p *PriorityQueue) Add(pod *v1.Pod) error {
 	p.lock.Lock()
 	defer p.lock.Unlock()
@@ -292,6 +319,10 @@ func (p *PriorityQueue) Add(pod *v1.Pod) error {
 		p.deleteNominatedPodIfExists(pod)
 		p.unschedulableQ.delete(pod)
 	}
+	// Delete pod from backoffQ if it is backing off
+	if err = p.podBackoffQ.Delete(pod); err == nil {
+		klog.Errorf("Error: pod %v/%v is already in the podBackoff queue.", pod.Namespace, pod.Name)
+	}
 	p.addNominatedPodIfNeeded(pod)
 	p.cond.Broadcast()
 }
@@ -309,6 +340,9 @@
 	if _, exists, _ := p.activeQ.Get(pod); exists {
 		return nil
 	}
+	if _, exists, _ := p.podBackoffQ.Get(pod); exists {
+		return nil
+	}
 	err := p.activeQ.Add(pod)
 	if err != nil {
 		klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
@@ -324,6 +358,40 @@ func isPodUnschedulable(pod *v1.Pod) bool {
 	return cond != nil && cond.Status == v1.ConditionFalse && cond.Reason == v1.PodReasonUnschedulable
 }
 
+// nsNameForPod returns a NamespacedName for a pod
+func nsNameForPod(pod *v1.Pod) ktypes.NamespacedName {
+	return ktypes.NamespacedName{
+		Namespace: pod.Namespace,
+		Name:      pod.Name,
+	}
+}
+
+// clearPodBackoff clears all backoff state for a pod (resets expiry)
+func (p *PriorityQueue) clearPodBackoff(pod *v1.Pod) {
+	p.podBackoff.ClearPodBackoff(nsNameForPod(pod))
+}
+
+// isPodBackingOff returns whether a pod is currently undergoing backoff in the podBackoff structure
+func (p *PriorityQueue) isPodBackingOff(pod *v1.Pod) bool {
+	boTime, exists := p.podBackoff.GetBackoffTime(nsNameForPod(pod))
+	if !exists {
+		return false
+	}
+	return boTime.After(p.clock.Now())
+}
+
+// backoffPod checks if pod is currently undergoing backoff. If it is not, it updates the backoff
+// timeout; otherwise it does nothing.
+func (p *PriorityQueue) backoffPod(pod *v1.Pod) {
+	p.podBackoff.Gc()
+
+	podID := nsNameForPod(pod)
+	boTime, found := p.podBackoff.GetBackoffTime(podID)
+	if !found || boTime.Before(p.clock.Now()) {
+		p.podBackoff.BackoffPod(podID)
+	}
+}
+
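The bookkeeping above (backoffPod, isPodBackingOff) drives a per-pod delay that doubles on each failed attempt up to a cap. A minimal standalone sketch of the same doubling-with-cap policy; the names entry and next are illustrative, not part of this patch:

package main

import (
	"fmt"
	"time"
)

// entry mirrors the patch's backoffEntry: the first call returns the initial
// duration unchanged, and each later call doubles it up to a maximum.
type entry struct {
	initialized bool
	backoff     time.Duration
}

func (e *entry) next(max time.Duration) time.Duration {
	if !e.initialized {
		e.initialized = true
		return e.backoff
	}
	d := e.backoff * 2
	if d > max {
		d = max
	}
	e.backoff = d
	return d
}

func main() {
	e := &entry{backoff: 1 * time.Second}
	for i := 0; i < 6; i++ {
		fmt.Println(e.next(10 * time.Second)) // 1s, 2s, 4s, 8s, 10s, 10s
	}
}

As in getBackoff below, only the second and later failures grow the delay, so a pod's first retry is never penalized.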
 // AddUnschedulableIfNotPresent does nothing if the pod is present in either
 // queue. Otherwise it adds the pod to the unschedulable queue if
 // p.receivedMoveRequest is false, and to the activeQ if p.receivedMoveRequest is true.
@@ -336,11 +404,27 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod) error {
 	if _, exists, _ := p.activeQ.Get(pod); exists {
 		return fmt.Errorf("pod is already present in the activeQ")
 	}
+	if _, exists, _ := p.podBackoffQ.Get(pod); exists {
+		return fmt.Errorf("pod is already present in the backoffQ")
+	}
 	if !p.receivedMoveRequest && isPodUnschedulable(pod) {
+		p.backoffPod(pod)
 		p.unschedulableQ.addOrUpdate(pod)
 		p.addNominatedPodIfNeeded(pod)
 		return nil
 	}
+
+	// If a move request has been received and the pod is subject to backoff, move it to the BackoffQ.
+	if p.isPodBackingOff(pod) && isPodUnschedulable(pod) {
+		err := p.podBackoffQ.Add(pod)
+		if err != nil {
+			klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err)
+		} else {
+			p.addNominatedPodIfNeeded(pod)
+		}
+		return err
+	}
+
 	err := p.activeQ.Add(pod)
 	if err == nil {
 		p.addNominatedPodIfNeeded(pod)
@@ -349,13 +433,46 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod) error {
 	return err
 }
 
+// flushBackoffQCompleted moves all pods from backoffQ that have completed backoff into activeQ
+func (p *PriorityQueue) flushBackoffQCompleted() {
+	p.lock.Lock()
+	defer p.lock.Unlock()
+
+	for {
+		rawPod := p.podBackoffQ.Peek()
+		if rawPod == nil {
+			return
+		}
+		pod := rawPod.(*v1.Pod)
+		boTime, found := p.podBackoff.GetBackoffTime(nsNameForPod(pod))
+		if !found {
+			klog.Errorf("Unable to find backoff value for pod %v in backoffQ", nsNameForPod(pod))
+			p.podBackoffQ.Pop()
+			p.activeQ.Add(pod)
+			defer p.cond.Broadcast()
+			continue
+		}
+
+		if boTime.After(p.clock.Now()) {
+			return
+		}
+		_, err := p.podBackoffQ.Pop()
+		if err != nil {
+			klog.Errorf("Unable to pop pod %v from backoffQ despite backoff completion.", nsNameForPod(pod))
+			return
+		}
+		p.activeQ.Add(pod)
+		defer p.cond.Broadcast()
+	}
+}
+
 // Pop removes the head of the active queue and returns it. It blocks if the
 // activeQ is empty and waits until a new item is added to the queue. It also
 // clears receivedMoveRequest to mark the beginning of a new scheduling cycle.
 func (p *PriorityQueue) Pop() (*v1.Pod, error) {
 	p.lock.Lock()
 	defer p.lock.Unlock()
-	for len(p.activeQ.data.queue) == 0 {
+	for p.activeQ.Len() == 0 {
 		// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
 		// When Close() is called, the p.closed is set and the condition is broadcast,
 		// which causes this loop to continue and return from the Pop().
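flushBackoffQCompleted above relies on podBackoffQ being a min-heap keyed by backoff expiry: it only ever inspects the head, and can stop draining at the first pod whose backoff is still running. A self-contained sketch of that peek-then-pop pattern over container/heap; expiryHeap, item, and flush are illustrative names, not from this patch:

package main

import (
	"container/heap"
	"fmt"
	"time"
)

type item struct {
	name   string
	expiry time.Time
}

// expiryHeap is a min-heap ordered by expiry, like the patch's podBackoffQ.
type expiryHeap []item

func (h expiryHeap) Len() int            { return len(h) }
func (h expiryHeap) Less(i, j int) bool  { return h[i].expiry.Before(h[j].expiry) }
func (h expiryHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *expiryHeap) Push(x interface{}) { *h = append(*h, x.(item)) }
func (h *expiryHeap) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

// flush pops every item whose backoff has expired; because the head is always
// the soonest expiry, it can stop at the first unexpired item, exactly as
// flushBackoffQCompleted does.
func flush(h *expiryHeap, now time.Time) []string {
	var moved []string
	for h.Len() > 0 && !(*h)[0].expiry.After(now) {
		moved = append(moved, heap.Pop(h).(item).name)
	}
	return moved
}

func main() {
	now := time.Now()
	h := &expiryHeap{
		{"a", now.Add(-time.Second)},
		{"b", now.Add(time.Minute)},
		{"c", now.Add(-time.Minute)},
	}
	heap.Init(h)
	fmt.Println(flush(h, now)) // [c a]
}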
@@ -392,16 +509,33 @@ func isPodUpdated(oldPod, newPod *v1.Pod) bool {
 func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
 	p.lock.Lock()
 	defer p.lock.Unlock()
-	// If the pod is already in the active queue, just update it there.
-	if _, exists, _ := p.activeQ.Get(newPod); exists {
-		p.updateNominatedPod(oldPod, newPod)
-		err := p.activeQ.Update(newPod)
-		return err
+
+	if oldPod != nil {
+		// If the pod is already in the active queue, just update it there.
+		if _, exists, _ := p.activeQ.Get(oldPod); exists {
+			p.updateNominatedPod(oldPod, newPod)
+			err := p.activeQ.Update(newPod)
+			return err
+		}
+
+		// If the pod is in the backoff queue, update it there.
+		if _, exists, _ := p.podBackoffQ.Get(oldPod); exists {
+			p.updateNominatedPod(oldPod, newPod)
+			p.podBackoffQ.Delete(newPod)
+			err := p.activeQ.Add(newPod)
+			if err == nil {
+				p.cond.Broadcast()
+			}
+			return err
+		}
 	}
+
 	// If the pod is in the unschedulable queue, updating it may make it schedulable.
 	if usPod := p.unschedulableQ.get(newPod); usPod != nil {
 		p.updateNominatedPod(oldPod, newPod)
 		if isPodUpdated(oldPod, newPod) {
+			// If the pod is updated, reset backoff
+			p.clearPodBackoff(newPod)
 			p.unschedulableQ.delete(usPod)
 			err := p.activeQ.Add(newPod)
 			if err == nil {
@@ -409,6 +543,7 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
 		}
 		return err
 	}
+	// Pod is already in the unschedulable queue and hasn't been updated; no need to back off again
 	p.unschedulableQ.addOrUpdate(newPod)
 	return nil
 }
@@ -429,6 +564,8 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) error {
 	p.deleteNominatedPodIfExists(pod)
 	err := p.activeQ.Delete(pod)
 	if err != nil { // The item was probably not found in the activeQ.
+		p.clearPodBackoff(pod)
+		p.podBackoffQ.Delete(pod)
 		p.unschedulableQ.delete(pod)
 	}
 	return nil
@@ -454,16 +591,18 @@ func (p *PriorityQueue) AssignedPodUpdated(pod *v1.Pod) {
 // function adds all pods and then signals the condition variable to ensure that
 // if Pop() is waiting for an item, it receives it after all the pods are in the
 // queue and the head is the highest priority pod.
-// TODO(bsalamat): We should add a back-off mechanism here so that a high priority
-// pod which is unschedulable does not go to the head of the queue frequently. For
-// example in a cluster where a lot of pods being deleted, such a high priority
-// pod can deprive other pods from getting scheduled.
 func (p *PriorityQueue) MoveAllToActiveQueue() {
 	p.lock.Lock()
 	defer p.lock.Unlock()
 	for _, pod := range p.unschedulableQ.pods {
-		if err := p.activeQ.Add(pod); err != nil {
-			klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
+		if p.isPodBackingOff(pod) {
+			if err := p.podBackoffQ.Add(pod); err != nil {
+				klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err)
+			}
+		} else {
+			if err := p.activeQ.Add(pod); err != nil {
+				klog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err)
+			}
 		}
 	}
 	p.unschedulableQ.clear()
@@ -474,11 +613,16 @@
 // NOTE: this function assumes lock has been acquired in caller
 func (p *PriorityQueue) movePodsToActiveQueue(pods []*v1.Pod) {
 	for _, pod := range pods {
-		if err := p.activeQ.Add(pod); err == nil {
-			p.unschedulableQ.delete(pod)
+		if p.isPodBackingOff(pod) {
+			if err := p.podBackoffQ.Add(pod); err != nil {
+				klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err)
+			}
 		} else {
-			klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
+			if err := p.activeQ.Add(pod); err != nil {
+				klog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err)
+			}
 		}
+		p.unschedulableQ.delete(pod)
 	}
 	p.receivedMoveRequest = true
 	p.cond.Broadcast()
@@ -551,6 +695,12 @@ func (p *PriorityQueue) DeleteNominatedPodIfExists(pod *v1.Pod) {
 	p.lock.Unlock()
 }
 
+func (p *PriorityQueue) podsCompareBackoffCompleted(p1, p2 interface{}) bool {
+	bo1, _ := p.podBackoff.GetBackoffTime(nsNameForPod(p1.(*v1.Pod)))
+	bo2, _ := p.podBackoff.GetBackoffTime(nsNameForPod(p2.(*v1.Pod)))
+	return bo1.Before(bo2)
+}
+
 // UnschedulablePodsMap holds pods that cannot be scheduled.
This data structure // is used to implement unschedulableQ. type UnschedulablePodsMap struct { @@ -591,200 +741,3 @@ func newUnschedulablePodsMap() *UnschedulablePodsMap { keyFunc: util.GetPodFullName, } } - -// Below is the implementation of the a heap. The logic is pretty much the same -// as cache.heap, however, this heap does not perform synchronization. It leaves -// synchronization to the SchedulingQueue. - -// LessFunc is a function type to compare two objects. -type LessFunc func(interface{}, interface{}) bool - -// KeyFunc is a function type to get the key from an object. -type KeyFunc func(obj interface{}) (string, error) - -type heapItem struct { - obj interface{} // The object which is stored in the heap. - index int // The index of the object's key in the Heap.queue. -} - -type itemKeyValue struct { - key string - obj interface{} -} - -// heapData is an internal struct that implements the standard heap interface -// and keeps the data stored in the heap. -type heapData struct { - // items is a map from key of the objects to the objects and their index. - // We depend on the property that items in the map are in the queue and vice versa. - items map[string]*heapItem - // queue implements a heap data structure and keeps the order of elements - // according to the heap invariant. The queue keeps the keys of objects stored - // in "items". - queue []string - - // keyFunc is used to make the key used for queued item insertion and retrieval, and - // should be deterministic. - keyFunc KeyFunc - // lessFunc is used to compare two objects in the heap. - lessFunc LessFunc -} - -var ( - _ = heap.Interface(&heapData{}) // heapData is a standard heap -) - -// Less compares two objects and returns true if the first one should go -// in front of the second one in the heap. -func (h *heapData) Less(i, j int) bool { - if i > len(h.queue) || j > len(h.queue) { - return false - } - itemi, ok := h.items[h.queue[i]] - if !ok { - return false - } - itemj, ok := h.items[h.queue[j]] - if !ok { - return false - } - return h.lessFunc(itemi.obj, itemj.obj) -} - -// Len returns the number of items in the Heap. -func (h *heapData) Len() int { return len(h.queue) } - -// Swap implements swapping of two elements in the heap. This is a part of standard -// heap interface and should never be called directly. -func (h *heapData) Swap(i, j int) { - h.queue[i], h.queue[j] = h.queue[j], h.queue[i] - item := h.items[h.queue[i]] - item.index = i - item = h.items[h.queue[j]] - item.index = j -} - -// Push is supposed to be called by heap.Push only. -func (h *heapData) Push(kv interface{}) { - keyValue := kv.(*itemKeyValue) - n := len(h.queue) - h.items[keyValue.key] = &heapItem{keyValue.obj, n} - h.queue = append(h.queue, keyValue.key) -} - -// Pop is supposed to be called by heap.Pop only. -func (h *heapData) Pop() interface{} { - key := h.queue[len(h.queue)-1] - h.queue = h.queue[0 : len(h.queue)-1] - item, ok := h.items[key] - if !ok { - // This is an error - return nil - } - delete(h.items, key) - return item.obj -} - -// Heap is a producer/consumer queue that implements a heap data structure. -// It can be used to implement priority queues and similar data structures. -type Heap struct { - // data stores objects and has a queue that keeps their ordering according - // to the heap invariant. - data *heapData -} - -// Add inserts an item, and puts it in the queue. The item is updated if it -// already exists. 
-func (h *Heap) Add(obj interface{}) error { - key, err := h.data.keyFunc(obj) - if err != nil { - return cache.KeyError{Obj: obj, Err: err} - } - if _, exists := h.data.items[key]; exists { - h.data.items[key].obj = obj - heap.Fix(h.data, h.data.items[key].index) - } else { - heap.Push(h.data, &itemKeyValue{key, obj}) - } - return nil -} - -// AddIfNotPresent inserts an item, and puts it in the queue. If an item with -// the key is present in the map, no changes is made to the item. -func (h *Heap) AddIfNotPresent(obj interface{}) error { - key, err := h.data.keyFunc(obj) - if err != nil { - return cache.KeyError{Obj: obj, Err: err} - } - if _, exists := h.data.items[key]; !exists { - heap.Push(h.data, &itemKeyValue{key, obj}) - } - return nil -} - -// Update is the same as Add in this implementation. When the item does not -// exist, it is added. -func (h *Heap) Update(obj interface{}) error { - return h.Add(obj) -} - -// Delete removes an item. -func (h *Heap) Delete(obj interface{}) error { - key, err := h.data.keyFunc(obj) - if err != nil { - return cache.KeyError{Obj: obj, Err: err} - } - if item, ok := h.data.items[key]; ok { - heap.Remove(h.data, item.index) - return nil - } - return fmt.Errorf("object not found") -} - -// Pop returns the head of the heap. -func (h *Heap) Pop() (interface{}, error) { - obj := heap.Pop(h.data) - if obj != nil { - return obj, nil - } - return nil, fmt.Errorf("object was removed from heap data") -} - -// Get returns the requested item, or sets exists=false. -func (h *Heap) Get(obj interface{}) (interface{}, bool, error) { - key, err := h.data.keyFunc(obj) - if err != nil { - return nil, false, cache.KeyError{Obj: obj, Err: err} - } - return h.GetByKey(key) -} - -// GetByKey returns the requested item, or sets exists=false. -func (h *Heap) GetByKey(key string) (interface{}, bool, error) { - item, exists := h.data.items[key] - if !exists { - return nil, false, nil - } - return item.obj, true, nil -} - -// List returns a list of all the items. -func (h *Heap) List() []interface{} { - list := make([]interface{}, 0, len(h.data.items)) - for _, item := range h.data.items { - list = append(list, item.obj) - } - return list -} - -// newHeap returns a Heap which can be used to queue up items to process. -func newHeap(keyFn KeyFunc, lessFn LessFunc) *Heap { - return &Heap{ - data: &heapData{ - items: map[string]*heapItem{}, - queue: []string{}, - keyFunc: keyFn, - lessFunc: lessFn, - }, - } -} diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index 73ef4a0081c..cc0a057fdf9 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -95,7 +95,7 @@ var highPriorityPod, highPriNominatedPod, medPriorityPod, unschedulablePod = v1. } func TestPriorityQueue_Add(t *testing.T) { - q := NewPriorityQueue() + q := NewPriorityQueue(nil) q.Add(&medPriorityPod) q.Add(&unschedulablePod) q.Add(&highPriorityPod) @@ -120,7 +120,7 @@ func TestPriorityQueue_Add(t *testing.T) { } func TestPriorityQueue_AddIfNotPresent(t *testing.T) { - q := NewPriorityQueue() + q := NewPriorityQueue(nil) q.unschedulableQ.addOrUpdate(&highPriNominatedPod) q.AddIfNotPresent(&highPriNominatedPod) // Must not add anything. 
q.AddIfNotPresent(&medPriorityPod) @@ -146,7 +146,7 @@ func TestPriorityQueue_AddIfNotPresent(t *testing.T) { } func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) { - q := NewPriorityQueue() + q := NewPriorityQueue(nil) q.Add(&highPriNominatedPod) q.AddUnschedulableIfNotPresent(&highPriNominatedPod) // Must not add anything. q.AddUnschedulableIfNotPresent(&medPriorityPod) // This should go to activeQ. @@ -172,7 +172,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) { } func TestPriorityQueue_Pop(t *testing.T) { - q := NewPriorityQueue() + q := NewPriorityQueue(nil) wg := sync.WaitGroup{} wg.Add(1) go func() { @@ -189,7 +189,7 @@ func TestPriorityQueue_Pop(t *testing.T) { } func TestPriorityQueue_Update(t *testing.T) { - q := NewPriorityQueue() + q := NewPriorityQueue(nil) q.Update(nil, &highPriorityPod) if _, exists, _ := q.activeQ.Get(&highPriorityPod); !exists { t.Errorf("Expected %v to be added to activeQ.", highPriorityPod.Name) @@ -199,7 +199,7 @@ func TestPriorityQueue_Update(t *testing.T) { } // Update highPriorityPod and add a nominatedNodeName to it. q.Update(&highPriorityPod, &highPriNominatedPod) - if q.activeQ.data.Len() != 1 { + if q.activeQ.Len() != 1 { t.Error("Expected only one item in activeQ.") } if len(q.nominatedPods) != 1 { @@ -225,7 +225,7 @@ func TestPriorityQueue_Update(t *testing.T) { } func TestPriorityQueue_Delete(t *testing.T) { - q := NewPriorityQueue() + q := NewPriorityQueue(nil) q.Update(&highPriorityPod, &highPriNominatedPod) q.Add(&unschedulablePod) q.Delete(&highPriNominatedPod) @@ -245,12 +245,12 @@ func TestPriorityQueue_Delete(t *testing.T) { } func TestPriorityQueue_MoveAllToActiveQueue(t *testing.T) { - q := NewPriorityQueue() + q := NewPriorityQueue(nil) q.Add(&medPriorityPod) q.unschedulableQ.addOrUpdate(&unschedulablePod) q.unschedulableQ.addOrUpdate(&highPriorityPod) q.MoveAllToActiveQueue() - if q.activeQ.data.Len() != 3 { + if q.activeQ.Len() != 3 { t.Error("Expected all items to be in activeQ.") } } @@ -291,7 +291,7 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) { Spec: v1.PodSpec{NodeName: "machine1"}, } - q := NewPriorityQueue() + q := NewPriorityQueue(nil) q.Add(&medPriorityPod) // Add a couple of pods to the unschedulableQ. q.unschedulableQ.addOrUpdate(&unschedulablePod) @@ -312,7 +312,7 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) { } func TestPriorityQueue_WaitingPodsForNode(t *testing.T) { - q := NewPriorityQueue() + q := NewPriorityQueue(nil) q.Add(&medPriorityPod) q.Add(&unschedulablePod) q.Add(&highPriorityPod) @@ -491,7 +491,7 @@ func TestSchedulingQueue_Close(t *testing.T) { }, { name: "PriorityQueue close", - q: NewPriorityQueue(), + q: NewPriorityQueue(nil), expectedErr: fmt.Errorf(queueClosed), }, } @@ -520,7 +520,7 @@ func TestSchedulingQueue_Close(t *testing.T) { // ensures that an unschedulable pod does not block head of the queue when there // are frequent events that move pods to the active queue. func TestRecentlyTriedPodsGoBack(t *testing.T) { - q := NewPriorityQueue() + q := NewPriorityQueue(nil) // Add a few pods to priority queue. 
 	for i := 0; i < 5; i++ {
 		p := v1.Pod{
@@ -567,3 +567,66 @@ func TestRecentlyTriedPodsGoBack(t *testing.T) {
 		}
 	}
 }
+
+// TestHighPriorityBackoff tests that a high priority pod does not block
+// other pods if it is unschedulable.
+func TestHighPriorityBackoff(t *testing.T) {
+	q := NewPriorityQueue(nil)
+
+	midPod := v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "test-midpod",
+			Namespace: "ns1",
+			UID:       types.UID("tp-mid"),
+		},
+		Spec: v1.PodSpec{
+			Priority: &midPriority,
+		},
+		Status: v1.PodStatus{
+			NominatedNodeName: "node1",
+		},
+	}
+	highPod := v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "test-highpod",
+			Namespace: "ns1",
+			UID:       types.UID("tp-high"),
+		},
+		Spec: v1.PodSpec{
+			Priority: &highPriority,
+		},
+		Status: v1.PodStatus{
+			NominatedNodeName: "node1",
+		},
+	}
+	q.Add(&midPod)
+	q.Add(&highPod)
+	// Simulate a pod being popped by the scheduler, determined unschedulable, and
+	// then moved back to the active queue.
+	p, err := q.Pop()
+	if err != nil {
+		t.Errorf("Error while popping the head of the queue: %v", err)
+	}
+	if p != &highPod {
+		t.Errorf("Expected to get high priority pod, got: %v", p)
+	}
+	// Update pod condition to unschedulable.
+	podutil.UpdatePodCondition(&p.Status, &v1.PodCondition{
+		Type:    v1.PodScheduled,
+		Status:  v1.ConditionFalse,
+		Reason:  v1.PodReasonUnschedulable,
+		Message: "fake scheduling failure",
+	})
+	// Put in the unschedulable queue.
+	q.AddUnschedulableIfNotPresent(p)
+	// Move all unschedulable pods to the active queue.
+	q.MoveAllToActiveQueue()
+
+	p, err = q.Pop()
+	if err != nil {
+		t.Errorf("Error while popping the head of the queue: %v", err)
+	}
+	if p != &midPod {
+		t.Errorf("Expected to get mid priority pod, got: %v", p)
+	}
+}
diff --git a/pkg/scheduler/util/BUILD b/pkg/scheduler/util/BUILD
index 810d2c5cb0a..53649692f2a 100644
--- a/pkg/scheduler/util/BUILD
+++ b/pkg/scheduler/util/BUILD
@@ -10,6 +10,7 @@ go_test(
     name = "go_default_test",
     srcs = [
         "backoff_utils_test.go",
+        "heap_test.go",
         "utils_test.go",
     ],
     embed = [":go_default_library"],
@@ -25,6 +26,8 @@ go_library(
     name = "go_default_library",
     srcs = [
         "backoff_utils.go",
+        "clock.go",
+        "heap.go",
        "utils.go",
     ],
     importpath = "k8s.io/kubernetes/pkg/scheduler/util",
@@ -34,6 +37,7 @@
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
+        "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
         "//vendor/k8s.io/klog:go_default_library",
     ],
 )
diff --git a/pkg/scheduler/util/backoff_utils.go b/pkg/scheduler/util/backoff_utils.go
index 506cd1270ac..618f93772f9 100644
--- a/pkg/scheduler/util/backoff_utils.go
+++ b/pkg/scheduler/util/backoff_utils.go
@@ -37,10 +37,11 @@ func (realClock) Now() time.Time {
 	return time.Now()
 }
 
-// BackoffEntry is single threaded. in particular, it only allows a single action to be waiting on backoff at a time.
-// It is expected that all users will only use the public TryWait(...) method
+// backoffEntry is single threaded. In particular, it only allows a single action to be waiting on backoff at a time.
 // It is also not safe to copy this object.
-type BackoffEntry struct {
+type backoffEntry struct {
+	initialized bool
+	podName     ktypes.NamespacedName
 	backoff     time.Duration
 	lastUpdate  time.Time
 	reqInFlight int32
 }
 
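The reqInFlight field above doubles as a one-slot try-lock: a compare-and-swap from 0 to 1 either acquires it or reports that a waiter is already in flight, without ever blocking. A self-contained sketch of that pattern; tryMutex is a hypothetical name, not from this patch:

package main

import (
	"fmt"
	"sync/atomic"
)

// tryMutex acquires via atomic compare-and-swap, like backoffEntry.tryLock:
// a second caller fails fast instead of queuing behind a mutex.
type tryMutex struct{ state int32 }

func (m *tryMutex) TryLock() bool { return atomic.CompareAndSwapInt32(&m.state, 0, 1) }

func (m *tryMutex) Unlock() {
	if !atomic.CompareAndSwapInt32(&m.state, 1, 0) {
		panic("unlock of unlocked tryMutex")
	}
}

func main() {
	var m tryMutex
	fmt.Println(m.TryLock()) // true: acquired
	fmt.Println(m.TryLock()) // false: already held
	m.Unlock()
	fmt.Println(m.TryLock()) // true again after release
}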
 // tryLock attempts to acquire a lock via atomic compare and swap.
 // returns true if the lock was acquired, false otherwise
-func (b *BackoffEntry) tryLock() bool {
+func (b *backoffEntry) tryLock() bool {
 	return atomic.CompareAndSwapInt32(&b.reqInFlight, 0, 1)
 }
 
 // unlock returns the lock. panics if the lock isn't held
-func (b *BackoffEntry) unlock() {
+func (b *backoffEntry) unlock() {
 	if !atomic.CompareAndSwapInt32(&b.reqInFlight, 1, 0) {
 		panic(fmt.Sprintf("unexpected state on unlocking: %+v", b))
 	}
 }
 
-// TryWait tries to acquire the backoff lock, maxDuration is the maximum allowed period to wait for.
-func (b *BackoffEntry) TryWait(maxDuration time.Duration) bool {
-	if !b.tryLock() {
-		return false
-	}
-	defer b.unlock()
-	b.wait(maxDuration)
-	return true
+// backoffTime returns the Time when a backoffEntry completes backoff
+func (b *backoffEntry) backoffTime() time.Time {
+	return b.lastUpdate.Add(b.backoff)
 }
 
-func (b *BackoffEntry) getBackoff(maxDuration time.Duration) time.Duration {
-	duration := b.backoff
-	newDuration := time.Duration(duration) * 2
+// getBackoff returns the duration until this entry completes backoff
+func (b *backoffEntry) getBackoff(maxDuration time.Duration) time.Duration {
+	if !b.initialized {
+		b.initialized = true
+		return b.backoff
+	}
+	newDuration := b.backoff * 2
 	if newDuration > maxDuration {
 		newDuration = maxDuration
 	}
 	b.backoff = newDuration
-	klog.V(4).Infof("Backing off %s", duration.String())
-	return duration
-}
-
-func (b *BackoffEntry) wait(maxDuration time.Duration) {
-	time.Sleep(b.getBackoff(maxDuration))
+	klog.V(4).Infof("Backing off %s", newDuration.String())
+	return newDuration
 }
 
 // PodBackoff is used to restart a pod with back-off delay.
 type PodBackoff struct {
-	perPodBackoff map[ktypes.NamespacedName]*BackoffEntry
+	// expiryQ stores backoffEntry ordered by lastUpdate until they reach maxDuration and are GC'd
+	expiryQ         *Heap
 	lock            sync.Mutex
 	clock           clock
 	defaultDuration time.Duration
@@ -111,24 +108,58 @@ func CreatePodBackoff(defaultDuration, maxDuration time.Duration) *PodBackoff {
 // CreatePodBackoffWithClock creates a pod back-off object by default duration, max duration and clock.
 func CreatePodBackoffWithClock(defaultDuration, maxDuration time.Duration, clock clock) *PodBackoff {
 	return &PodBackoff{
-		perPodBackoff:   map[ktypes.NamespacedName]*BackoffEntry{},
+		expiryQ:         NewHeap(backoffEntryKeyFunc, backoffEntryCompareUpdate),
 		clock:           clock,
 		defaultDuration: defaultDuration,
 		maxDuration:     maxDuration,
 	}
 }
 
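CreatePodBackoffWithClock takes the clock as a parameter so tests can drive time deterministically, which is exactly how the fakeClock in backoff_utils_test.go below is used. A compact sketch of that injection pattern; expired is an illustrative helper, not part of the patch:

package main

import (
	"fmt"
	"time"
)

// Clock matches the small interface the patch injects for time lookups.
type Clock interface {
	Now() time.Time
}

type realClock struct{}

func (realClock) Now() time.Time { return time.Now() }

// fakeClock mirrors the test double in backoff_utils_test.go: time only
// moves when the test advances it.
type fakeClock struct{ t time.Time }

func (f *fakeClock) Now() time.Time { return f.t }

// expired reports whether a deadline has passed according to the given clock.
func expired(c Clock, deadline time.Time) bool {
	return deadline.Before(c.Now())
}

func main() {
	fc := &fakeClock{t: time.Unix(0, 0)}
	deadline := fc.t.Add(5 * time.Second)
	fmt.Println(expired(fc, deadline)) // false: the clock hasn't moved
	fc.t = fc.t.Add(10 * time.Second)
	fmt.Println(expired(fc, deadline)) // true: the test advanced the clock
}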
-// GetEntry returns a back-off entry by Pod ID.
-func (p *PodBackoff) GetEntry(podID ktypes.NamespacedName) *BackoffEntry {
+// getEntry returns the backoffEntry for a given podID
+func (p *PodBackoff) getEntry(podID ktypes.NamespacedName) *backoffEntry {
+	entry, exists, _ := p.expiryQ.GetByKey(podID.String())
+	var be *backoffEntry
+	if !exists {
+		be = &backoffEntry{
+			initialized: false,
+			podName:     podID,
+			backoff:     p.defaultDuration,
+		}
+		p.expiryQ.Update(be)
+	} else {
+		be = entry.(*backoffEntry)
+	}
+	return be
+}
+
+// BackoffPod updates the backoff for a podID and returns the duration until backoff completion
+func (p *PodBackoff) BackoffPod(podID ktypes.NamespacedName) time.Duration {
 	p.lock.Lock()
 	defer p.lock.Unlock()
-	entry, ok := p.perPodBackoff[podID]
-	if !ok {
-		entry = &BackoffEntry{backoff: p.defaultDuration}
-		p.perPodBackoff[podID] = entry
-	}
+	entry := p.getEntry(podID)
 	entry.lastUpdate = p.clock.Now()
-	return entry
+	p.expiryQ.Update(entry)
+	return entry.getBackoff(p.maxDuration)
+}
+
+// TryBackoffAndWait tries to acquire the backoff lock and, on success, waits out
+// the backoff duration or until stop is closed
+func (p *PodBackoff) TryBackoffAndWait(podID ktypes.NamespacedName, stop <-chan struct{}) bool {
+	p.lock.Lock()
+	entry := p.getEntry(podID)
+
+	if !entry.tryLock() {
+		p.lock.Unlock()
+		return false
+	}
+	defer entry.unlock()
+	duration := entry.getBackoff(p.maxDuration)
+	p.lock.Unlock()
+	select {
+	case <-time.After(duration):
+		return true
+	case <-stop:
+		return false
+	}
 }
 
 // Gc execute garbage collection on the pod back-off.
@@ -136,9 +167,54 @@ func (p *PodBackoff) Gc() {
 	p.lock.Lock()
 	defer p.lock.Unlock()
 	now := p.clock.Now()
-	for podID, entry := range p.perPodBackoff {
-		if now.Sub(entry.lastUpdate) > p.maxDuration {
-			delete(p.perPodBackoff, podID)
+	var be *backoffEntry
+	for {
+		entry := p.expiryQ.Peek()
+		if entry == nil {
+			break
+		}
+		be = entry.(*backoffEntry)
+		if now.Sub(be.lastUpdate) > p.maxDuration {
+			p.expiryQ.Pop()
+		} else {
+			break
 		}
 	}
 }
+
+// GetBackoffTime returns the time that podID completes backoff
+func (p *PodBackoff) GetBackoffTime(podID ktypes.NamespacedName) (time.Time, bool) {
+	p.lock.Lock()
+	defer p.lock.Unlock()
+	rawBe, exists, _ := p.expiryQ.GetByKey(podID.String())
+	if !exists {
+		return time.Time{}, false
+	}
+	be := rawBe.(*backoffEntry)
+	return be.lastUpdate.Add(be.backoff), true
+}
+
+// ClearPodBackoff removes all tracking information for podID (clears expiry)
+func (p *PodBackoff) ClearPodBackoff(podID ktypes.NamespacedName) bool {
+	p.lock.Lock()
+	defer p.lock.Unlock()
+	entry, exists, _ := p.expiryQ.GetByKey(podID.String())
+	if exists {
+		err := p.expiryQ.Delete(entry)
+		return err == nil
+	}
+	return false
+}
+
+// backoffEntryKeyFunc is the keying function used for mapping a backoffEntry to string for heap
+func backoffEntryKeyFunc(b interface{}) (string, error) {
+	be := b.(*backoffEntry)
+	return be.podName.String(), nil
+}
+
+// backoffEntryCompareUpdate returns true when b1's lastUpdate is before b2's
+func backoffEntryCompareUpdate(b1, b2 interface{}) bool {
+	be1 := b1.(*backoffEntry)
+	be2 := b2.(*backoffEntry)
+	return be1.lastUpdate.Before(be2.lastUpdate)
+}
diff --git a/pkg/scheduler/util/backoff_utils_test.go b/pkg/scheduler/util/backoff_utils_test.go
index 8f61b637e7d..b99c9498f58 100644
--- a/pkg/scheduler/util/backoff_utils_test.go
+++ b/pkg/scheduler/util/backoff_utils_test.go
@@ -31,7 +31,7 @@ func (f *fakeClock) Now() time.Time {
 	return f.t
 }
 
-func TestBackoff(t *testing.T) {
+func TestBackoffPod(t *testing.T) {
 	clock := fakeClock{}
 	backoff := CreatePodBackoffWithClock(1*time.Second,
60*time.Second, &clock) tests := []struct { @@ -64,23 +64,75 @@ func TestBackoff(t *testing.T) { } for _, test := range tests { - duration := backoff.GetEntry(test.podID).getBackoff(backoff.maxDuration) + duration := backoff.BackoffPod(test.podID) if duration != test.expectedDuration { - t.Errorf("expected: %s, got %s for %s", test.expectedDuration.String(), duration.String(), test.podID) + t.Errorf("expected: %s, got %s for pod %s", test.expectedDuration.String(), duration.String(), test.podID) + } + if boTime, _ := backoff.GetBackoffTime(test.podID); boTime != clock.Now().Add(test.expectedDuration) { + t.Errorf("expected GetBackoffTime %s, got %s for pod %s", test.expectedDuration.String(), boTime.String(), test.podID) } clock.t = clock.t.Add(test.advanceClock) backoff.Gc() } fooID := ktypes.NamespacedName{Namespace: "default", Name: "foo"} - backoff.perPodBackoff[fooID].backoff = 60 * time.Second - duration := backoff.GetEntry(fooID).getBackoff(backoff.maxDuration) + be := backoff.getEntry(fooID) + be.backoff = 60 * time.Second + duration := backoff.BackoffPod(fooID) if duration != 60*time.Second { t.Errorf("expected: 60, got %s", duration.String()) } // Verify that we split on namespaces correctly, same name, different namespace fooID.Namespace = "other" - duration = backoff.GetEntry(fooID).getBackoff(backoff.maxDuration) + duration = backoff.BackoffPod(fooID) if duration != 1*time.Second { t.Errorf("expected: 1, got %s", duration.String()) } } + +func TestClearPodBackoff(t *testing.T) { + clock := fakeClock{} + backoff := CreatePodBackoffWithClock(1*time.Second, 60*time.Second, &clock) + + if backoff.ClearPodBackoff(ktypes.NamespacedName{Namespace: "ns", Name: "nonexist"}) { + t.Error("Expected ClearPodBackoff failure for unknown pod, got success.") + } + + podID := ktypes.NamespacedName{Namespace: "ns", Name: "foo"} + if dur := backoff.BackoffPod(podID); dur != 1*time.Second { + t.Errorf("Expected backoff of 1s for pod %s, got %s", podID, dur.String()) + } + + if !backoff.ClearPodBackoff(podID) { + t.Errorf("Failed to clear backoff for pod %v", podID) + } + + expectBoTime := clock.Now() + if boTime, _ := backoff.GetBackoffTime(podID); boTime != expectBoTime { + t.Errorf("Expected backoff time for pod %s of %s, got %s", podID, expectBoTime, boTime) + } +} + +func TestTryBackoffAndWait(t *testing.T) { + clock := fakeClock{} + backoff := CreatePodBackoffWithClock(1*time.Second, 60*time.Second, &clock) + + stopCh := make(chan struct{}) + podID := ktypes.NamespacedName{Namespace: "ns", Name: "pod"} + if !backoff.TryBackoffAndWait(podID, stopCh) { + t.Error("Expected TryBackoffAndWait success for new pod, got failure.") + } + + be := backoff.getEntry(podID) + if !be.tryLock() { + t.Error("Failed to acquire lock for backoffentry") + } + + if backoff.TryBackoffAndWait(podID, stopCh) { + t.Error("Expected TryBackoffAndWait failure with lock acquired, got success.") + } + + close(stopCh) + if backoff.TryBackoffAndWait(podID, stopCh) { + t.Error("Expected TryBackoffAndWait failure with closed stopCh, got success.") + } +} diff --git a/pkg/scheduler/util/clock.go b/pkg/scheduler/util/clock.go new file mode 100644 index 00000000000..e17c759dbac --- /dev/null +++ b/pkg/scheduler/util/clock.go @@ -0,0 +1,34 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "time" +) + +// Clock provides an interface for getting the current time +type Clock interface { + Now() time.Time +} + +// RealClock implements a clock using time +type RealClock struct{} + +// Now returns the current time with time.Now +func (RealClock) Now() time.Time { + return time.Now() +} diff --git a/pkg/scheduler/util/heap.go b/pkg/scheduler/util/heap.go new file mode 100644 index 00000000000..0f15652c654 --- /dev/null +++ b/pkg/scheduler/util/heap.go @@ -0,0 +1,236 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Below is the implementation of the a heap. The logic is pretty much the same +// as cache.heap, however, this heap does not perform synchronization. It leaves +// synchronization to the SchedulingQueue. + +package util + +import ( + "container/heap" + "fmt" + + "k8s.io/client-go/tools/cache" +) + +// KeyFunc is a function type to get the key from an object. +type KeyFunc func(obj interface{}) (string, error) + +type heapItem struct { + obj interface{} // The object which is stored in the heap. + index int // The index of the object's key in the Heap.queue. +} + +type itemKeyValue struct { + key string + obj interface{} +} + +// heapData is an internal struct that implements the standard heap interface +// and keeps the data stored in the heap. +type heapData struct { + // items is a map from key of the objects to the objects and their index. + // We depend on the property that items in the map are in the queue and vice versa. + items map[string]*heapItem + // queue implements a heap data structure and keeps the order of elements + // according to the heap invariant. The queue keeps the keys of objects stored + // in "items". + queue []string + + // keyFunc is used to make the key used for queued item insertion and retrieval, and + // should be deterministic. + keyFunc KeyFunc + // lessFunc is used to compare two objects in the heap. + lessFunc LessFunc +} + +var ( + _ = heap.Interface(&heapData{}) // heapData is a standard heap +) + +// Less compares two objects and returns true if the first one should go +// in front of the second one in the heap. +func (h *heapData) Less(i, j int) bool { + if i > len(h.queue) || j > len(h.queue) { + return false + } + itemi, ok := h.items[h.queue[i]] + if !ok { + return false + } + itemj, ok := h.items[h.queue[j]] + if !ok { + return false + } + return h.lessFunc(itemi.obj, itemj.obj) +} + +// Len returns the number of items in the Heap. +func (h *heapData) Len() int { return len(h.queue) } + +// Swap implements swapping of two elements in the heap. 
This is a part of standard +// heap interface and should never be called directly. +func (h *heapData) Swap(i, j int) { + h.queue[i], h.queue[j] = h.queue[j], h.queue[i] + item := h.items[h.queue[i]] + item.index = i + item = h.items[h.queue[j]] + item.index = j +} + +// Push is supposed to be called by heap.Push only. +func (h *heapData) Push(kv interface{}) { + keyValue := kv.(*itemKeyValue) + n := len(h.queue) + h.items[keyValue.key] = &heapItem{keyValue.obj, n} + h.queue = append(h.queue, keyValue.key) +} + +// Pop is supposed to be called by heap.Pop only. +func (h *heapData) Pop() interface{} { + key := h.queue[len(h.queue)-1] + h.queue = h.queue[0 : len(h.queue)-1] + item, ok := h.items[key] + if !ok { + // This is an error + return nil + } + delete(h.items, key) + return item.obj +} + +// Peek is supposed to be called by heap.Peek only. +func (h *heapData) Peek() interface{} { + if len(h.queue) > 0 { + return h.items[h.queue[0]].obj + } + return nil +} + +// Heap is a producer/consumer queue that implements a heap data structure. +// It can be used to implement priority queues and similar data structures. +type Heap struct { + // data stores objects and has a queue that keeps their ordering according + // to the heap invariant. + data *heapData +} + +// Add inserts an item, and puts it in the queue. The item is updated if it +// already exists. +func (h *Heap) Add(obj interface{}) error { + key, err := h.data.keyFunc(obj) + if err != nil { + return cache.KeyError{Obj: obj, Err: err} + } + if _, exists := h.data.items[key]; exists { + h.data.items[key].obj = obj + heap.Fix(h.data, h.data.items[key].index) + } else { + heap.Push(h.data, &itemKeyValue{key, obj}) + } + return nil +} + +// AddIfNotPresent inserts an item, and puts it in the queue. If an item with +// the key is present in the map, no changes is made to the item. +func (h *Heap) AddIfNotPresent(obj interface{}) error { + key, err := h.data.keyFunc(obj) + if err != nil { + return cache.KeyError{Obj: obj, Err: err} + } + if _, exists := h.data.items[key]; !exists { + heap.Push(h.data, &itemKeyValue{key, obj}) + } + return nil +} + +// Update is the same as Add in this implementation. When the item does not +// exist, it is added. +func (h *Heap) Update(obj interface{}) error { + return h.Add(obj) +} + +// Delete removes an item. +func (h *Heap) Delete(obj interface{}) error { + key, err := h.data.keyFunc(obj) + if err != nil { + return cache.KeyError{Obj: obj, Err: err} + } + if item, ok := h.data.items[key]; ok { + heap.Remove(h.data, item.index) + return nil + } + return fmt.Errorf("object not found") +} + +// Peek returns the head of the heap without removing it. +func (h *Heap) Peek() interface{} { + return h.data.Peek() +} + +// Pop returns the head of the heap and removes it. +func (h *Heap) Pop() (interface{}, error) { + obj := heap.Pop(h.data) + if obj != nil { + return obj, nil + } + return nil, fmt.Errorf("object was removed from heap data") +} + +// Get returns the requested item, or sets exists=false. +func (h *Heap) Get(obj interface{}) (interface{}, bool, error) { + key, err := h.data.keyFunc(obj) + if err != nil { + return nil, false, cache.KeyError{Obj: obj, Err: err} + } + return h.GetByKey(key) +} + +// GetByKey returns the requested item, or sets exists=false. +func (h *Heap) GetByKey(key string) (interface{}, bool, error) { + item, exists := h.data.items[key] + if !exists { + return nil, false, nil + } + return item.obj, true, nil +} + +// List returns a list of all the items. 
+func (h *Heap) List() []interface{} { + list := make([]interface{}, 0, len(h.data.items)) + for _, item := range h.data.items { + list = append(list, item.obj) + } + return list +} + +// Len returns the number of items in the heap. +func (h *Heap) Len() int { + return len(h.data.queue) +} + +// NewHeap returns a Heap which can be used to queue up items to process. +func NewHeap(keyFn KeyFunc, lessFn LessFunc) *Heap { + return &Heap{ + data: &heapData{ + items: map[string]*heapItem{}, + queue: []string{}, + keyFunc: keyFn, + lessFunc: lessFn, + }, + } +} diff --git a/pkg/scheduler/util/heap_test.go b/pkg/scheduler/util/heap_test.go new file mode 100644 index 00000000000..62812ec4c91 --- /dev/null +++ b/pkg/scheduler/util/heap_test.go @@ -0,0 +1,271 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// This file was copied from client-go/tools/cache/heap.go and modified +// for our non thread-safe heap + +package util + +import ( + "testing" +) + +func testHeapObjectKeyFunc(obj interface{}) (string, error) { + return obj.(testHeapObject).name, nil +} + +type testHeapObject struct { + name string + val interface{} +} + +func mkHeapObj(name string, val interface{}) testHeapObject { + return testHeapObject{name: name, val: val} +} + +func compareInts(val1 interface{}, val2 interface{}) bool { + first := val1.(testHeapObject).val.(int) + second := val2.(testHeapObject).val.(int) + return first < second +} + +// TestHeapBasic tests Heap invariant +func TestHeapBasic(t *testing.T) { + h := NewHeap(testHeapObjectKeyFunc, compareInts) + const amount = 500 + var i int + + for i = amount; i > 0; i-- { + h.Add(mkHeapObj(string([]rune{'a', rune(i)}), i)) + } + + // Make sure that the numbers are popped in ascending order. + prevNum := 0 + for i := 0; i < amount; i++ { + obj, err := h.Pop() + num := obj.(testHeapObject).val.(int) + // All the items must be sorted. + if err != nil || prevNum > num { + t.Errorf("got %v out of order, last was %v", obj, prevNum) + } + prevNum = num + } +} + +// Tests Heap.Add and ensures that heap invariant is preserved after adding items. +func TestHeap_Add(t *testing.T) { + h := NewHeap(testHeapObjectKeyFunc, compareInts) + h.Add(mkHeapObj("foo", 10)) + h.Add(mkHeapObj("bar", 1)) + h.Add(mkHeapObj("baz", 11)) + h.Add(mkHeapObj("zab", 30)) + h.Add(mkHeapObj("foo", 13)) // This updates "foo". + + item, err := h.Pop() + if e, a := 1, item.(testHeapObject).val; err != nil || a != e { + t.Fatalf("expected %d, got %d", e, a) + } + item, err = h.Pop() + if e, a := 11, item.(testHeapObject).val; err != nil || a != e { + t.Fatalf("expected %d, got %d", e, a) + } + h.Delete(mkHeapObj("baz", 11)) // Nothing is deleted. + h.Add(mkHeapObj("foo", 14)) // foo is updated. 
+ item, err = h.Pop() + if e, a := 14, item.(testHeapObject).val; err != nil || a != e { + t.Fatalf("expected %d, got %d", e, a) + } + item, err = h.Pop() + if e, a := 30, item.(testHeapObject).val; err != nil || a != e { + t.Fatalf("expected %d, got %d", e, a) + } +} + +// TestHeap_AddIfNotPresent tests Heap.AddIfNotPresent and ensures that heap +// invariant is preserved after adding items. +func TestHeap_AddIfNotPresent(t *testing.T) { + h := NewHeap(testHeapObjectKeyFunc, compareInts) + h.AddIfNotPresent(mkHeapObj("foo", 10)) + h.AddIfNotPresent(mkHeapObj("bar", 1)) + h.AddIfNotPresent(mkHeapObj("baz", 11)) + h.AddIfNotPresent(mkHeapObj("zab", 30)) + h.AddIfNotPresent(mkHeapObj("foo", 13)) // This is not added. + + if len := len(h.data.items); len != 4 { + t.Errorf("unexpected number of items: %d", len) + } + if val := h.data.items["foo"].obj.(testHeapObject).val; val != 10 { + t.Errorf("unexpected value: %d", val) + } + item, err := h.Pop() + if e, a := 1, item.(testHeapObject).val; err != nil || a != e { + t.Fatalf("expected %d, got %d", e, a) + } + item, err = h.Pop() + if e, a := 10, item.(testHeapObject).val; err != nil || a != e { + t.Fatalf("expected %d, got %d", e, a) + } + // bar is already popped. Let's add another one. + h.AddIfNotPresent(mkHeapObj("bar", 14)) + item, err = h.Pop() + if e, a := 11, item.(testHeapObject).val; err != nil || a != e { + t.Fatalf("expected %d, got %d", e, a) + } + item, err = h.Pop() + if e, a := 14, item.(testHeapObject).val; err != nil || a != e { + t.Fatalf("expected %d, got %d", e, a) + } +} + +// TestHeap_Delete tests Heap.Delete and ensures that heap invariant is +// preserved after deleting items. +func TestHeap_Delete(t *testing.T) { + h := NewHeap(testHeapObjectKeyFunc, compareInts) + h.Add(mkHeapObj("foo", 10)) + h.Add(mkHeapObj("bar", 1)) + h.Add(mkHeapObj("bal", 31)) + h.Add(mkHeapObj("baz", 11)) + + // Delete head. Delete should work with "key" and doesn't care about the value. + if err := h.Delete(mkHeapObj("bar", 200)); err != nil { + t.Fatalf("Failed to delete head.") + } + item, err := h.Pop() + if e, a := 10, item.(testHeapObject).val; err != nil || a != e { + t.Fatalf("expected %d, got %d", e, a) + } + h.Add(mkHeapObj("zab", 30)) + h.Add(mkHeapObj("faz", 30)) + len := h.data.Len() + // Delete non-existing item. + if err = h.Delete(mkHeapObj("non-existent", 10)); err == nil || len != h.data.Len() { + t.Fatalf("Didn't expect any item removal") + } + // Delete tail. + if err = h.Delete(mkHeapObj("bal", 31)); err != nil { + t.Fatalf("Failed to delete tail.") + } + // Delete one of the items with value 30. + if err = h.Delete(mkHeapObj("zab", 30)); err != nil { + t.Fatalf("Failed to delete item.") + } + item, err = h.Pop() + if e, a := 11, item.(testHeapObject).val; err != nil || a != e { + t.Fatalf("expected %d, got %d", e, a) + } + item, err = h.Pop() + if e, a := 30, item.(testHeapObject).val; err != nil || a != e { + t.Fatalf("expected %d, got %d", e, a) + } + if h.data.Len() != 0 { + t.Fatalf("expected an empty heap.") + } +} + +// TestHeap_Update tests Heap.Update and ensures that heap invariant is +// preserved after adding items. +func TestHeap_Update(t *testing.T) { + h := NewHeap(testHeapObjectKeyFunc, compareInts) + h.Add(mkHeapObj("foo", 10)) + h.Add(mkHeapObj("bar", 1)) + h.Add(mkHeapObj("bal", 31)) + h.Add(mkHeapObj("baz", 11)) + + // Update an item to a value that should push it to the head. 
+ h.Update(mkHeapObj("baz", 0)) + if h.data.queue[0] != "baz" || h.data.items["baz"].index != 0 { + t.Fatalf("expected baz to be at the head") + } + item, err := h.Pop() + if e, a := 0, item.(testHeapObject).val; err != nil || a != e { + t.Fatalf("expected %d, got %d", e, a) + } + // Update bar to push it farther back in the queue. + h.Update(mkHeapObj("bar", 100)) + if h.data.queue[0] != "foo" || h.data.items["foo"].index != 0 { + t.Fatalf("expected foo to be at the head") + } +} + +// TestHeap_Get tests Heap.Get. +func TestHeap_Get(t *testing.T) { + h := NewHeap(testHeapObjectKeyFunc, compareInts) + h.Add(mkHeapObj("foo", 10)) + h.Add(mkHeapObj("bar", 1)) + h.Add(mkHeapObj("bal", 31)) + h.Add(mkHeapObj("baz", 11)) + + // Get works with the key. + obj, exists, err := h.Get(mkHeapObj("baz", 0)) + if err != nil || exists == false || obj.(testHeapObject).val != 11 { + t.Fatalf("unexpected error in getting element") + } + // Get non-existing object. + _, exists, err = h.Get(mkHeapObj("non-existing", 0)) + if err != nil || exists == true { + t.Fatalf("didn't expect to get any object") + } +} + +// TestHeap_GetByKey tests Heap.GetByKey and is very similar to TestHeap_Get. +func TestHeap_GetByKey(t *testing.T) { + h := NewHeap(testHeapObjectKeyFunc, compareInts) + h.Add(mkHeapObj("foo", 10)) + h.Add(mkHeapObj("bar", 1)) + h.Add(mkHeapObj("bal", 31)) + h.Add(mkHeapObj("baz", 11)) + + obj, exists, err := h.GetByKey("baz") + if err != nil || exists == false || obj.(testHeapObject).val != 11 { + t.Fatalf("unexpected error in getting element") + } + // Get non-existing object. + _, exists, err = h.GetByKey("non-existing") + if err != nil || exists == true { + t.Fatalf("didn't expect to get any object") + } +} + +// TestHeap_List tests Heap.List function. +func TestHeap_List(t *testing.T) { + h := NewHeap(testHeapObjectKeyFunc, compareInts) + list := h.List() + if len(list) != 0 { + t.Errorf("expected an empty list") + } + + items := map[string]int{ + "foo": 10, + "bar": 1, + "bal": 30, + "baz": 11, + "faz": 30, + } + for k, v := range items { + h.Add(mkHeapObj(k, v)) + } + list = h.List() + if len(list) != len(items) { + t.Errorf("expected %d items, got %d", len(items), len(list)) + } + for _, obj := range list { + heapObj := obj.(testHeapObject) + v, ok := items[heapObj.name] + if !ok || v != heapObj.val { + t.Errorf("unexpected item in the list: %v", heapObj) + } + } +}
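Taken together, the heap-backed PodBackoff replaces the old unordered map with expiry-ordered tracking, and the scheduling queue consults it to route pods between activeQ and podBackoffQ. A hedged usage sketch of the API added in this patch, assuming the import paths from the BUILD file above; the pod name and durations are made up:

package main

import (
	"fmt"
	"time"

	ktypes "k8s.io/apimachinery/pkg/types"
	"k8s.io/kubernetes/pkg/scheduler/util"
)

func main() {
	// One-second initial backoff, one-minute cap, real clock.
	backoff := util.CreatePodBackoff(1*time.Second, 1*time.Minute)
	podID := ktypes.NamespacedName{Namespace: "default", Name: "nginx"}

	// Each failed scheduling attempt extends the backoff: 1s, 2s, 4s, ...
	for i := 0; i < 3; i++ {
		fmt.Println(backoff.BackoffPod(podID))
	}

	// The queue consults GetBackoffTime to decide between activeQ and podBackoffQ.
	if t, ok := backoff.GetBackoffTime(podID); ok {
		fmt.Println("backs off until", t)
	}

	// A pod update resets the backoff entirely, as PriorityQueue.Update does.
	backoff.ClearPodBackoff(podID)
}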