From f462a31e9b3c85ea7ba287e04357fa63b9fbd284 Mon Sep 17 00:00:00 2001 From: Harsh Singh Date: Fri, 27 Sep 2019 14:27:27 +0530 Subject: [PATCH] Move heap into its own internal package --- pkg/scheduler/BUILD | 1 + pkg/scheduler/internal/heap/BUILD | 36 +++++++++++++++++ pkg/scheduler/{util => internal/heap}/heap.go | 40 ++++++++++--------- .../{util => internal/heap}/heap_test.go | 18 ++++----- pkg/scheduler/internal/queue/BUILD | 1 + .../internal/queue/scheduling_queue.go | 9 +++-- pkg/scheduler/util/BUILD | 4 -- pkg/scheduler/util/utils.go | 10 ++--- 8 files changed, 79 insertions(+), 40 deletions(-) create mode 100644 pkg/scheduler/internal/heap/BUILD rename pkg/scheduler/{util => internal/heap}/heap.go (86%) rename pkg/scheduler/{util => internal/heap}/heap_test.go (94%) diff --git a/pkg/scheduler/BUILD b/pkg/scheduler/BUILD index bce3f6da6b2..5cca71bd3be 100644 --- a/pkg/scheduler/BUILD +++ b/pkg/scheduler/BUILD @@ -102,6 +102,7 @@ filegroup( "//pkg/scheduler/factory:all-srcs", "//pkg/scheduler/framework:all-srcs", "//pkg/scheduler/internal/cache:all-srcs", + "//pkg/scheduler/internal/heap:all-srcs", "//pkg/scheduler/internal/queue:all-srcs", "//pkg/scheduler/metrics:all-srcs", "//pkg/scheduler/nodeinfo:all-srcs", diff --git a/pkg/scheduler/internal/heap/BUILD b/pkg/scheduler/internal/heap/BUILD new file mode 100644 index 00000000000..ed96d8444d7 --- /dev/null +++ b/pkg/scheduler/internal/heap/BUILD @@ -0,0 +1,36 @@ +package(default_visibility = ["//visibility:public"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", + "go_test", +) + +go_test( + name = "go_default_test", + srcs = ["heap_test.go"], + embed = [":go_default_library"], +) + +go_library( + name = "go_default_library", + srcs = ["heap.go"], + importpath = "k8s.io/kubernetes/pkg/scheduler/internal/heap", + deps = [ + "//pkg/scheduler/metrics:go_default_library", + "//staging/src/k8s.io/client-go/tools/cache:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], +) diff --git a/pkg/scheduler/util/heap.go b/pkg/scheduler/internal/heap/heap.go similarity index 86% rename from pkg/scheduler/util/heap.go rename to pkg/scheduler/internal/heap/heap.go index 13d6b2ffd13..49bf9d12388 100644 --- a/pkg/scheduler/util/heap.go +++ b/pkg/scheduler/internal/heap/heap.go @@ -18,7 +18,7 @@ limitations under the License. // as cache.heap, however, this heap does not perform synchronization. It leaves // synchronization to the SchedulingQueue. -package util +package heap import ( "container/heap" @@ -41,9 +41,9 @@ type itemKeyValue struct { obj interface{} } -// heapData is an internal struct that implements the standard heap interface +// data is an internal struct that implements the standard heap interface // and keeps the data stored in the heap. -type heapData struct { +type data struct { // items is a map from key of the objects to the objects and their index. // We depend on the property that items in the map are in the queue and vice versa. items map[string]*heapItem @@ -56,16 +56,16 @@ type heapData struct { // should be deterministic. keyFunc KeyFunc // lessFunc is used to compare two objects in the heap. - lessFunc LessFunc + lessFunc lessFunc } var ( - _ = heap.Interface(&heapData{}) // heapData is a standard heap + _ = heap.Interface(&data{}) // heapData is a standard heap ) // Less compares two objects and returns true if the first one should go // in front of the second one in the heap. -func (h *heapData) Less(i, j int) bool { +func (h *data) Less(i, j int) bool { if i > len(h.queue) || j > len(h.queue) { return false } @@ -81,11 +81,11 @@ func (h *heapData) Less(i, j int) bool { } // Len returns the number of items in the Heap. -func (h *heapData) Len() int { return len(h.queue) } +func (h *data) Len() int { return len(h.queue) } // Swap implements swapping of two elements in the heap. This is a part of standard // heap interface and should never be called directly. -func (h *heapData) Swap(i, j int) { +func (h *data) Swap(i, j int) { h.queue[i], h.queue[j] = h.queue[j], h.queue[i] item := h.items[h.queue[i]] item.index = i @@ -94,7 +94,7 @@ func (h *heapData) Swap(i, j int) { } // Push is supposed to be called by heap.Push only. -func (h *heapData) Push(kv interface{}) { +func (h *data) Push(kv interface{}) { keyValue := kv.(*itemKeyValue) n := len(h.queue) h.items[keyValue.key] = &heapItem{keyValue.obj, n} @@ -102,7 +102,7 @@ func (h *heapData) Push(kv interface{}) { } // Pop is supposed to be called by heap.Pop only. -func (h *heapData) Pop() interface{} { +func (h *data) Pop() interface{} { key := h.queue[len(h.queue)-1] h.queue = h.queue[0 : len(h.queue)-1] item, ok := h.items[key] @@ -115,7 +115,7 @@ func (h *heapData) Pop() interface{} { } // Peek is supposed to be called by heap.Peek only. -func (h *heapData) Peek() interface{} { +func (h *data) Peek() interface{} { if len(h.queue) > 0 { return h.items[h.queue[0]].obj } @@ -127,7 +127,7 @@ func (h *heapData) Peek() interface{} { type Heap struct { // data stores objects and has a queue that keeps their ordering according // to the heap invariant. - data *heapData + data *data // metricRecorder updates the counter when elements of a heap get added or // removed, and it does nothing if it's nil metricRecorder metrics.MetricRecorder @@ -239,15 +239,15 @@ func (h *Heap) Len() int { return len(h.data.queue) } -// NewHeap returns a Heap which can be used to queue up items to process. -func NewHeap(keyFn KeyFunc, lessFn LessFunc) *Heap { - return NewHeapWithRecorder(keyFn, lessFn, nil) +// New returns a Heap which can be used to queue up items to process. +func New(keyFn KeyFunc, lessFn lessFunc) *Heap { + return NewWithRecorder(keyFn, lessFn, nil) } -// NewHeapWithRecorder wraps an optional metricRecorder to compose a Heap object. -func NewHeapWithRecorder(keyFn KeyFunc, lessFn LessFunc, metricRecorder metrics.MetricRecorder) *Heap { +// NewWithRecorder wraps an optional metricRecorder to compose a Heap object. +func NewWithRecorder(keyFn KeyFunc, lessFn lessFunc, metricRecorder metrics.MetricRecorder) *Heap { return &Heap{ - data: &heapData{ + data: &data{ items: map[string]*heapItem{}, queue: []string{}, keyFunc: keyFn, @@ -256,3 +256,7 @@ func NewHeapWithRecorder(keyFn KeyFunc, lessFn LessFunc, metricRecorder metrics. metricRecorder: metricRecorder, } } + +// lessFunc is a function that receives two items and returns true if the first +// item should be placed before the second one when the list is sorted. +type lessFunc = func(item1, item2 interface{}) bool diff --git a/pkg/scheduler/util/heap_test.go b/pkg/scheduler/internal/heap/heap_test.go similarity index 94% rename from pkg/scheduler/util/heap_test.go rename to pkg/scheduler/internal/heap/heap_test.go index 62812ec4c91..40a8f41da88 100644 --- a/pkg/scheduler/util/heap_test.go +++ b/pkg/scheduler/internal/heap/heap_test.go @@ -17,7 +17,7 @@ limitations under the License. // This file was copied from client-go/tools/cache/heap.go and modified // for our non thread-safe heap -package util +package heap import ( "testing" @@ -44,7 +44,7 @@ func compareInts(val1 interface{}, val2 interface{}) bool { // TestHeapBasic tests Heap invariant func TestHeapBasic(t *testing.T) { - h := NewHeap(testHeapObjectKeyFunc, compareInts) + h := New(testHeapObjectKeyFunc, compareInts) const amount = 500 var i int @@ -67,7 +67,7 @@ func TestHeapBasic(t *testing.T) { // Tests Heap.Add and ensures that heap invariant is preserved after adding items. func TestHeap_Add(t *testing.T) { - h := NewHeap(testHeapObjectKeyFunc, compareInts) + h := New(testHeapObjectKeyFunc, compareInts) h.Add(mkHeapObj("foo", 10)) h.Add(mkHeapObj("bar", 1)) h.Add(mkHeapObj("baz", 11)) @@ -97,7 +97,7 @@ func TestHeap_Add(t *testing.T) { // TestHeap_AddIfNotPresent tests Heap.AddIfNotPresent and ensures that heap // invariant is preserved after adding items. func TestHeap_AddIfNotPresent(t *testing.T) { - h := NewHeap(testHeapObjectKeyFunc, compareInts) + h := New(testHeapObjectKeyFunc, compareInts) h.AddIfNotPresent(mkHeapObj("foo", 10)) h.AddIfNotPresent(mkHeapObj("bar", 1)) h.AddIfNotPresent(mkHeapObj("baz", 11)) @@ -133,7 +133,7 @@ func TestHeap_AddIfNotPresent(t *testing.T) { // TestHeap_Delete tests Heap.Delete and ensures that heap invariant is // preserved after deleting items. func TestHeap_Delete(t *testing.T) { - h := NewHeap(testHeapObjectKeyFunc, compareInts) + h := New(testHeapObjectKeyFunc, compareInts) h.Add(mkHeapObj("foo", 10)) h.Add(mkHeapObj("bar", 1)) h.Add(mkHeapObj("bal", 31)) @@ -178,7 +178,7 @@ func TestHeap_Delete(t *testing.T) { // TestHeap_Update tests Heap.Update and ensures that heap invariant is // preserved after adding items. func TestHeap_Update(t *testing.T) { - h := NewHeap(testHeapObjectKeyFunc, compareInts) + h := New(testHeapObjectKeyFunc, compareInts) h.Add(mkHeapObj("foo", 10)) h.Add(mkHeapObj("bar", 1)) h.Add(mkHeapObj("bal", 31)) @@ -202,7 +202,7 @@ func TestHeap_Update(t *testing.T) { // TestHeap_Get tests Heap.Get. func TestHeap_Get(t *testing.T) { - h := NewHeap(testHeapObjectKeyFunc, compareInts) + h := New(testHeapObjectKeyFunc, compareInts) h.Add(mkHeapObj("foo", 10)) h.Add(mkHeapObj("bar", 1)) h.Add(mkHeapObj("bal", 31)) @@ -222,7 +222,7 @@ func TestHeap_Get(t *testing.T) { // TestHeap_GetByKey tests Heap.GetByKey and is very similar to TestHeap_Get. func TestHeap_GetByKey(t *testing.T) { - h := NewHeap(testHeapObjectKeyFunc, compareInts) + h := New(testHeapObjectKeyFunc, compareInts) h.Add(mkHeapObj("foo", 10)) h.Add(mkHeapObj("bar", 1)) h.Add(mkHeapObj("bal", 31)) @@ -241,7 +241,7 @@ func TestHeap_GetByKey(t *testing.T) { // TestHeap_List tests Heap.List function. func TestHeap_List(t *testing.T) { - h := NewHeap(testHeapObjectKeyFunc, compareInts) + h := New(testHeapObjectKeyFunc, compareInts) list := h.List() if len(list) != 0 { t.Errorf("expected an empty list") diff --git a/pkg/scheduler/internal/queue/BUILD b/pkg/scheduler/internal/queue/BUILD index 19d02afa1e0..7e0d1333b50 100644 --- a/pkg/scheduler/internal/queue/BUILD +++ b/pkg/scheduler/internal/queue/BUILD @@ -13,6 +13,7 @@ go_library( "//pkg/scheduler/algorithm/predicates:go_default_library", "//pkg/scheduler/algorithm/priorities/util:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library", + "//pkg/scheduler/internal/heap:go_default_library", "//pkg/scheduler/metrics:go_default_library", "//pkg/scheduler/util:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index 64debc829f7..1aef9381ffa 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -40,6 +40,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" + "k8s.io/kubernetes/pkg/scheduler/internal/heap" "k8s.io/kubernetes/pkg/scheduler/metrics" "k8s.io/kubernetes/pkg/scheduler/util" ) @@ -116,10 +117,10 @@ type PriorityQueue struct { // activeQ is heap structure that scheduler actively looks at to find pods to // schedule. Head of heap is the highest priority pod. - activeQ *util.Heap + activeQ *heap.Heap // podBackoffQ is a heap ordered by backoff expiry. Pods which have completed backoff // are popped from this heap before the scheduler looks at activeQ - podBackoffQ *util.Heap + podBackoffQ *heap.Heap // unschedulableQ holds pods that have been tried and determined unschedulable. unschedulableQ *UnschedulablePodsMap // nominatedPods is a structures that stores pods which are nominated to run @@ -183,13 +184,13 @@ func NewPriorityQueueWithClock(stop <-chan struct{}, clock util.Clock, fwk frame clock: clock, stop: stop, podBackoff: NewPodBackoffMap(1*time.Second, 10*time.Second), - activeQ: util.NewHeapWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()), + activeQ: heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()), unschedulableQ: newUnschedulablePodsMap(metrics.NewUnschedulablePodsRecorder()), nominatedPods: newNominatedPodMap(), moveRequestCycle: -1, } pq.cond.L = &pq.lock - pq.podBackoffQ = util.NewHeapWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder()) + pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder()) pq.run() diff --git a/pkg/scheduler/util/BUILD b/pkg/scheduler/util/BUILD index c187e5bc982..b885c176bb7 100644 --- a/pkg/scheduler/util/BUILD +++ b/pkg/scheduler/util/BUILD @@ -10,7 +10,6 @@ go_test( name = "go_default_test", srcs = [ "error_channel_test.go", - "heap_test.go", "utils_test.go", ], embed = [":go_default_library"], @@ -28,17 +27,14 @@ go_library( srcs = [ "clock.go", "error_channel.go", - "heap.go", "utils.go", ], importpath = "k8s.io/kubernetes/pkg/scheduler/util", deps = [ "//pkg/api/v1/pod:go_default_library", "//pkg/scheduler/apis/extender/v1:go_default_library", - "//pkg/scheduler/metrics:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//vendor/k8s.io/klog:go_default_library", ], ) diff --git a/pkg/scheduler/util/utils.go b/pkg/scheduler/util/utils.go index f4c0d7df237..7dc2a7fa161 100644 --- a/pkg/scheduler/util/utils.go +++ b/pkg/scheduler/util/utils.go @@ -62,6 +62,10 @@ func GetPodStartTime(pod *v1.Pod) *metav1.Time { return &metav1.Time{Time: time.Now()} } +// lessFunc is a function that receives two items and returns true if the first +// item should be placed before the second one when the list is sorted. +type lessFunc = func(item1, item2 interface{}) bool + // GetEarliestPodStartTime returns the earliest start time of all pods that // have the highest priority among all victims. func GetEarliestPodStartTime(victims *extenderv1.Victims) *metav1.Time { @@ -91,13 +95,9 @@ func GetEarliestPodStartTime(victims *extenderv1.Victims) *metav1.Time { // SortableList is a list that implements sort.Interface. type SortableList struct { Items []interface{} - CompFunc LessFunc + CompFunc lessFunc } -// LessFunc is a function that receives two items and returns true if the first -// item should be placed before the second one when the list is sorted. -type LessFunc func(item1, item2 interface{}) bool - var _ = sort.Interface(&SortableList{}) func (l *SortableList) Len() int { return len(l.Items) }