Merge pull request #83233 from hprateek43/heapPackageMovement

Move heap into its own internal package
This commit is contained in:
Kubernetes Prow Robot 2019-10-05 03:15:23 -07:00 committed by GitHub
commit 9522fb09f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 79 additions and 40 deletions

View File

@ -102,6 +102,7 @@ filegroup(
"//pkg/scheduler/factory:all-srcs", "//pkg/scheduler/factory:all-srcs",
"//pkg/scheduler/framework:all-srcs", "//pkg/scheduler/framework:all-srcs",
"//pkg/scheduler/internal/cache:all-srcs", "//pkg/scheduler/internal/cache:all-srcs",
"//pkg/scheduler/internal/heap:all-srcs",
"//pkg/scheduler/internal/queue:all-srcs", "//pkg/scheduler/internal/queue:all-srcs",
"//pkg/scheduler/metrics:all-srcs", "//pkg/scheduler/metrics:all-srcs",
"//pkg/scheduler/nodeinfo:all-srcs", "//pkg/scheduler/nodeinfo:all-srcs",

View File

@ -0,0 +1,36 @@
package(default_visibility = ["//visibility:public"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_test(
name = "go_default_test",
srcs = ["heap_test.go"],
embed = [":go_default_library"],
)
go_library(
name = "go_default_library",
srcs = ["heap.go"],
importpath = "k8s.io/kubernetes/pkg/scheduler/internal/heap",
deps = [
"//pkg/scheduler/metrics:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)

View File

@ -18,7 +18,7 @@ limitations under the License.
// as cache.heap, however, this heap does not perform synchronization. It leaves // as cache.heap, however, this heap does not perform synchronization. It leaves
// synchronization to the SchedulingQueue. // synchronization to the SchedulingQueue.
package util package heap
import ( import (
"container/heap" "container/heap"
@ -41,9 +41,9 @@ type itemKeyValue struct {
obj interface{} obj interface{}
} }
// heapData is an internal struct that implements the standard heap interface // data is an internal struct that implements the standard heap interface
// and keeps the data stored in the heap. // and keeps the data stored in the heap.
type heapData struct { type data struct {
// items is a map from key of the objects to the objects and their index. // items is a map from key of the objects to the objects and their index.
// We depend on the property that items in the map are in the queue and vice versa. // We depend on the property that items in the map are in the queue and vice versa.
items map[string]*heapItem items map[string]*heapItem
@ -56,16 +56,16 @@ type heapData struct {
// should be deterministic. // should be deterministic.
keyFunc KeyFunc keyFunc KeyFunc
// lessFunc is used to compare two objects in the heap. // lessFunc is used to compare two objects in the heap.
lessFunc LessFunc lessFunc lessFunc
} }
var ( var (
_ = heap.Interface(&heapData{}) // heapData is a standard heap _ = heap.Interface(&data{}) // heapData is a standard heap
) )
// Less compares two objects and returns true if the first one should go // Less compares two objects and returns true if the first one should go
// in front of the second one in the heap. // in front of the second one in the heap.
func (h *heapData) Less(i, j int) bool { func (h *data) Less(i, j int) bool {
if i > len(h.queue) || j > len(h.queue) { if i > len(h.queue) || j > len(h.queue) {
return false return false
} }
@ -81,11 +81,11 @@ func (h *heapData) Less(i, j int) bool {
} }
// Len returns the number of items in the Heap. // Len returns the number of items in the Heap.
func (h *heapData) Len() int { return len(h.queue) } func (h *data) Len() int { return len(h.queue) }
// Swap implements swapping of two elements in the heap. This is a part of standard // Swap implements swapping of two elements in the heap. This is a part of standard
// heap interface and should never be called directly. // heap interface and should never be called directly.
func (h *heapData) Swap(i, j int) { func (h *data) Swap(i, j int) {
h.queue[i], h.queue[j] = h.queue[j], h.queue[i] h.queue[i], h.queue[j] = h.queue[j], h.queue[i]
item := h.items[h.queue[i]] item := h.items[h.queue[i]]
item.index = i item.index = i
@ -94,7 +94,7 @@ func (h *heapData) Swap(i, j int) {
} }
// Push is supposed to be called by heap.Push only. // Push is supposed to be called by heap.Push only.
func (h *heapData) Push(kv interface{}) { func (h *data) Push(kv interface{}) {
keyValue := kv.(*itemKeyValue) keyValue := kv.(*itemKeyValue)
n := len(h.queue) n := len(h.queue)
h.items[keyValue.key] = &heapItem{keyValue.obj, n} h.items[keyValue.key] = &heapItem{keyValue.obj, n}
@ -102,7 +102,7 @@ func (h *heapData) Push(kv interface{}) {
} }
// Pop is supposed to be called by heap.Pop only. // Pop is supposed to be called by heap.Pop only.
func (h *heapData) Pop() interface{} { func (h *data) Pop() interface{} {
key := h.queue[len(h.queue)-1] key := h.queue[len(h.queue)-1]
h.queue = h.queue[0 : len(h.queue)-1] h.queue = h.queue[0 : len(h.queue)-1]
item, ok := h.items[key] item, ok := h.items[key]
@ -115,7 +115,7 @@ func (h *heapData) Pop() interface{} {
} }
// Peek is supposed to be called by heap.Peek only. // Peek is supposed to be called by heap.Peek only.
func (h *heapData) Peek() interface{} { func (h *data) Peek() interface{} {
if len(h.queue) > 0 { if len(h.queue) > 0 {
return h.items[h.queue[0]].obj return h.items[h.queue[0]].obj
} }
@ -127,7 +127,7 @@ func (h *heapData) Peek() interface{} {
type Heap struct { type Heap struct {
// data stores objects and has a queue that keeps their ordering according // data stores objects and has a queue that keeps their ordering according
// to the heap invariant. // to the heap invariant.
data *heapData data *data
// metricRecorder updates the counter when elements of a heap get added or // metricRecorder updates the counter when elements of a heap get added or
// removed, and it does nothing if it's nil // removed, and it does nothing if it's nil
metricRecorder metrics.MetricRecorder metricRecorder metrics.MetricRecorder
@ -239,15 +239,15 @@ func (h *Heap) Len() int {
return len(h.data.queue) return len(h.data.queue)
} }
// NewHeap returns a Heap which can be used to queue up items to process. // New returns a Heap which can be used to queue up items to process.
func NewHeap(keyFn KeyFunc, lessFn LessFunc) *Heap { func New(keyFn KeyFunc, lessFn lessFunc) *Heap {
return NewHeapWithRecorder(keyFn, lessFn, nil) return NewWithRecorder(keyFn, lessFn, nil)
} }
// NewHeapWithRecorder wraps an optional metricRecorder to compose a Heap object. // NewWithRecorder wraps an optional metricRecorder to compose a Heap object.
func NewHeapWithRecorder(keyFn KeyFunc, lessFn LessFunc, metricRecorder metrics.MetricRecorder) *Heap { func NewWithRecorder(keyFn KeyFunc, lessFn lessFunc, metricRecorder metrics.MetricRecorder) *Heap {
return &Heap{ return &Heap{
data: &heapData{ data: &data{
items: map[string]*heapItem{}, items: map[string]*heapItem{},
queue: []string{}, queue: []string{},
keyFunc: keyFn, keyFunc: keyFn,
@ -256,3 +256,7 @@ func NewHeapWithRecorder(keyFn KeyFunc, lessFn LessFunc, metricRecorder metrics.
metricRecorder: metricRecorder, metricRecorder: metricRecorder,
} }
} }
// lessFunc is a function that receives two items and returns true if the first
// item should be placed before the second one when the list is sorted.
type lessFunc = func(item1, item2 interface{}) bool

View File

@ -17,7 +17,7 @@ limitations under the License.
// This file was copied from client-go/tools/cache/heap.go and modified // This file was copied from client-go/tools/cache/heap.go and modified
// for our non thread-safe heap // for our non thread-safe heap
package util package heap
import ( import (
"testing" "testing"
@ -44,7 +44,7 @@ func compareInts(val1 interface{}, val2 interface{}) bool {
// TestHeapBasic tests Heap invariant // TestHeapBasic tests Heap invariant
func TestHeapBasic(t *testing.T) { func TestHeapBasic(t *testing.T) {
h := NewHeap(testHeapObjectKeyFunc, compareInts) h := New(testHeapObjectKeyFunc, compareInts)
const amount = 500 const amount = 500
var i int var i int
@ -67,7 +67,7 @@ func TestHeapBasic(t *testing.T) {
// Tests Heap.Add and ensures that heap invariant is preserved after adding items. // Tests Heap.Add and ensures that heap invariant is preserved after adding items.
func TestHeap_Add(t *testing.T) { func TestHeap_Add(t *testing.T) {
h := NewHeap(testHeapObjectKeyFunc, compareInts) h := New(testHeapObjectKeyFunc, compareInts)
h.Add(mkHeapObj("foo", 10)) h.Add(mkHeapObj("foo", 10))
h.Add(mkHeapObj("bar", 1)) h.Add(mkHeapObj("bar", 1))
h.Add(mkHeapObj("baz", 11)) h.Add(mkHeapObj("baz", 11))
@ -97,7 +97,7 @@ func TestHeap_Add(t *testing.T) {
// TestHeap_AddIfNotPresent tests Heap.AddIfNotPresent and ensures that heap // TestHeap_AddIfNotPresent tests Heap.AddIfNotPresent and ensures that heap
// invariant is preserved after adding items. // invariant is preserved after adding items.
func TestHeap_AddIfNotPresent(t *testing.T) { func TestHeap_AddIfNotPresent(t *testing.T) {
h := NewHeap(testHeapObjectKeyFunc, compareInts) h := New(testHeapObjectKeyFunc, compareInts)
h.AddIfNotPresent(mkHeapObj("foo", 10)) h.AddIfNotPresent(mkHeapObj("foo", 10))
h.AddIfNotPresent(mkHeapObj("bar", 1)) h.AddIfNotPresent(mkHeapObj("bar", 1))
h.AddIfNotPresent(mkHeapObj("baz", 11)) h.AddIfNotPresent(mkHeapObj("baz", 11))
@ -133,7 +133,7 @@ func TestHeap_AddIfNotPresent(t *testing.T) {
// TestHeap_Delete tests Heap.Delete and ensures that heap invariant is // TestHeap_Delete tests Heap.Delete and ensures that heap invariant is
// preserved after deleting items. // preserved after deleting items.
func TestHeap_Delete(t *testing.T) { func TestHeap_Delete(t *testing.T) {
h := NewHeap(testHeapObjectKeyFunc, compareInts) h := New(testHeapObjectKeyFunc, compareInts)
h.Add(mkHeapObj("foo", 10)) h.Add(mkHeapObj("foo", 10))
h.Add(mkHeapObj("bar", 1)) h.Add(mkHeapObj("bar", 1))
h.Add(mkHeapObj("bal", 31)) h.Add(mkHeapObj("bal", 31))
@ -178,7 +178,7 @@ func TestHeap_Delete(t *testing.T) {
// TestHeap_Update tests Heap.Update and ensures that heap invariant is // TestHeap_Update tests Heap.Update and ensures that heap invariant is
// preserved after adding items. // preserved after adding items.
func TestHeap_Update(t *testing.T) { func TestHeap_Update(t *testing.T) {
h := NewHeap(testHeapObjectKeyFunc, compareInts) h := New(testHeapObjectKeyFunc, compareInts)
h.Add(mkHeapObj("foo", 10)) h.Add(mkHeapObj("foo", 10))
h.Add(mkHeapObj("bar", 1)) h.Add(mkHeapObj("bar", 1))
h.Add(mkHeapObj("bal", 31)) h.Add(mkHeapObj("bal", 31))
@ -202,7 +202,7 @@ func TestHeap_Update(t *testing.T) {
// TestHeap_Get tests Heap.Get. // TestHeap_Get tests Heap.Get.
func TestHeap_Get(t *testing.T) { func TestHeap_Get(t *testing.T) {
h := NewHeap(testHeapObjectKeyFunc, compareInts) h := New(testHeapObjectKeyFunc, compareInts)
h.Add(mkHeapObj("foo", 10)) h.Add(mkHeapObj("foo", 10))
h.Add(mkHeapObj("bar", 1)) h.Add(mkHeapObj("bar", 1))
h.Add(mkHeapObj("bal", 31)) h.Add(mkHeapObj("bal", 31))
@ -222,7 +222,7 @@ func TestHeap_Get(t *testing.T) {
// TestHeap_GetByKey tests Heap.GetByKey and is very similar to TestHeap_Get. // TestHeap_GetByKey tests Heap.GetByKey and is very similar to TestHeap_Get.
func TestHeap_GetByKey(t *testing.T) { func TestHeap_GetByKey(t *testing.T) {
h := NewHeap(testHeapObjectKeyFunc, compareInts) h := New(testHeapObjectKeyFunc, compareInts)
h.Add(mkHeapObj("foo", 10)) h.Add(mkHeapObj("foo", 10))
h.Add(mkHeapObj("bar", 1)) h.Add(mkHeapObj("bar", 1))
h.Add(mkHeapObj("bal", 31)) h.Add(mkHeapObj("bal", 31))
@ -241,7 +241,7 @@ func TestHeap_GetByKey(t *testing.T) {
// TestHeap_List tests Heap.List function. // TestHeap_List tests Heap.List function.
func TestHeap_List(t *testing.T) { func TestHeap_List(t *testing.T) {
h := NewHeap(testHeapObjectKeyFunc, compareInts) h := New(testHeapObjectKeyFunc, compareInts)
list := h.List() list := h.List()
if len(list) != 0 { if len(list) != 0 {
t.Errorf("expected an empty list") t.Errorf("expected an empty list")

View File

@ -13,6 +13,7 @@ go_library(
"//pkg/scheduler/algorithm/predicates:go_default_library", "//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/algorithm/priorities/util:go_default_library", "//pkg/scheduler/algorithm/priorities/util:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/internal/heap:go_default_library",
"//pkg/scheduler/metrics:go_default_library", "//pkg/scheduler/metrics:go_default_library",
"//pkg/scheduler/util:go_default_library", "//pkg/scheduler/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",

View File

@ -40,6 +40,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util" priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
"k8s.io/kubernetes/pkg/scheduler/internal/heap"
"k8s.io/kubernetes/pkg/scheduler/metrics" "k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/scheduler/util" "k8s.io/kubernetes/pkg/scheduler/util"
) )
@ -116,10 +117,10 @@ type PriorityQueue struct {
// activeQ is heap structure that scheduler actively looks at to find pods to // activeQ is heap structure that scheduler actively looks at to find pods to
// schedule. Head of heap is the highest priority pod. // schedule. Head of heap is the highest priority pod.
activeQ *util.Heap activeQ *heap.Heap
// podBackoffQ is a heap ordered by backoff expiry. Pods which have completed backoff // podBackoffQ is a heap ordered by backoff expiry. Pods which have completed backoff
// are popped from this heap before the scheduler looks at activeQ // are popped from this heap before the scheduler looks at activeQ
podBackoffQ *util.Heap podBackoffQ *heap.Heap
// unschedulableQ holds pods that have been tried and determined unschedulable. // unschedulableQ holds pods that have been tried and determined unschedulable.
unschedulableQ *UnschedulablePodsMap unschedulableQ *UnschedulablePodsMap
// nominatedPods is a structures that stores pods which are nominated to run // nominatedPods is a structures that stores pods which are nominated to run
@ -183,13 +184,13 @@ func NewPriorityQueueWithClock(stop <-chan struct{}, clock util.Clock, fwk frame
clock: clock, clock: clock,
stop: stop, stop: stop,
podBackoff: NewPodBackoffMap(1*time.Second, 10*time.Second), podBackoff: NewPodBackoffMap(1*time.Second, 10*time.Second),
activeQ: util.NewHeapWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()), activeQ: heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
unschedulableQ: newUnschedulablePodsMap(metrics.NewUnschedulablePodsRecorder()), unschedulableQ: newUnschedulablePodsMap(metrics.NewUnschedulablePodsRecorder()),
nominatedPods: newNominatedPodMap(), nominatedPods: newNominatedPodMap(),
moveRequestCycle: -1, moveRequestCycle: -1,
} }
pq.cond.L = &pq.lock pq.cond.L = &pq.lock
pq.podBackoffQ = util.NewHeapWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder()) pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
pq.run() pq.run()

View File

@ -10,7 +10,6 @@ go_test(
name = "go_default_test", name = "go_default_test",
srcs = [ srcs = [
"error_channel_test.go", "error_channel_test.go",
"heap_test.go",
"utils_test.go", "utils_test.go",
], ],
embed = [":go_default_library"], embed = [":go_default_library"],
@ -28,17 +27,14 @@ go_library(
srcs = [ srcs = [
"clock.go", "clock.go",
"error_channel.go", "error_channel.go",
"heap.go",
"utils.go", "utils.go",
], ],
importpath = "k8s.io/kubernetes/pkg/scheduler/util", importpath = "k8s.io/kubernetes/pkg/scheduler/util",
deps = [ deps = [
"//pkg/api/v1/pod:go_default_library", "//pkg/api/v1/pod:go_default_library",
"//pkg/scheduler/apis/extender/v1:go_default_library", "//pkg/scheduler/apis/extender/v1:go_default_library",
"//pkg/scheduler/metrics:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/klog:go_default_library", "//vendor/k8s.io/klog:go_default_library",
], ],
) )

View File

@ -62,6 +62,10 @@ func GetPodStartTime(pod *v1.Pod) *metav1.Time {
return &metav1.Time{Time: time.Now()} return &metav1.Time{Time: time.Now()}
} }
// lessFunc is a function that receives two items and returns true if the first
// item should be placed before the second one when the list is sorted.
type lessFunc = func(item1, item2 interface{}) bool
// GetEarliestPodStartTime returns the earliest start time of all pods that // GetEarliestPodStartTime returns the earliest start time of all pods that
// have the highest priority among all victims. // have the highest priority among all victims.
func GetEarliestPodStartTime(victims *extenderv1.Victims) *metav1.Time { func GetEarliestPodStartTime(victims *extenderv1.Victims) *metav1.Time {
@ -91,13 +95,9 @@ func GetEarliestPodStartTime(victims *extenderv1.Victims) *metav1.Time {
// SortableList is a list that implements sort.Interface. // SortableList is a list that implements sort.Interface.
type SortableList struct { type SortableList struct {
Items []interface{} Items []interface{}
CompFunc LessFunc CompFunc lessFunc
} }
// LessFunc is a function that receives two items and returns true if the first
// item should be placed before the second one when the list is sorted.
type LessFunc func(item1, item2 interface{}) bool
var _ = sort.Interface(&SortableList{}) var _ = sort.Interface(&SortableList{})
func (l *SortableList) Len() int { return len(l.Items) } func (l *SortableList) Len() int { return len(l.Items) }