diff --git a/pkg/controller/.import-restrictions b/pkg/controller/.import-restrictions index 8aab20499e4..ad130f6dad2 100644 --- a/pkg/controller/.import-restrictions +++ b/pkg/controller/.import-restrictions @@ -258,7 +258,8 @@ "k8s.io/kubernetes/pkg/fieldpath", "k8s.io/kubernetes/pkg/scheduler/volumebinder", "k8s.io/kubernetes/pkg/util/resizefs", - "k8s.io/kubernetes/pkg/apis/apps" + "k8s.io/kubernetes/pkg/apis/apps", + "k8s.io/kubernetes/pkg/scheduler/metrics" ] }, { diff --git a/pkg/scheduler/internal/queue/BUILD b/pkg/scheduler/internal/queue/BUILD index 41a6da838f2..23ec5520400 100644 --- a/pkg/scheduler/internal/queue/BUILD +++ b/pkg/scheduler/internal/queue/BUILD @@ -12,6 +12,7 @@ go_library( "//pkg/api/v1/pod:go_default_library", "//pkg/scheduler/algorithm/predicates:go_default_library", "//pkg/scheduler/algorithm/priorities/util:go_default_library", + "//pkg/scheduler/metrics:go_default_library", "//pkg/scheduler/util:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", @@ -31,11 +32,13 @@ go_test( embed = [":go_default_library"], deps = [ "//pkg/api/v1/pod:go_default_library", + "//pkg/scheduler/metrics:go_default_library", "//pkg/scheduler/util:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", + "//vendor/github.com/prometheus/client_model/go:go_default_library", ], ) diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index 4b442808a06..43b06c8257f 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -42,6 +42,7 @@ import ( podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util" + "k8s.io/kubernetes/pkg/scheduler/metrics" "k8s.io/kubernetes/pkg/scheduler/util" ) @@ -283,13 +284,13 @@ func NewPriorityQueueWithClock(stop <-chan struct{}, clock util.Clock) *Priority clock: clock, stop: stop, podBackoff: NewPodBackoffMap(1*time.Second, 10*time.Second), - activeQ: util.NewHeap(podInfoKeyFunc, activeQComp), - unschedulableQ: newUnschedulablePodsMap(), + activeQ: util.NewHeapWithRecorder(podInfoKeyFunc, activeQComp, metrics.NewActivePodsRecorder()), + unschedulableQ: newUnschedulablePodsMap(metrics.NewUnschedulablePodsRecorder()), nominatedPods: newNominatedPodMap(), moveRequestCycle: -1, } pq.cond.L = &pq.lock - pq.podBackoffQ = util.NewHeap(podInfoKeyFunc, pq.podsCompareBackoffCompleted) + pq.podBackoffQ = util.NewHeapWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder()) pq.run() @@ -777,16 +778,27 @@ type UnschedulablePodsMap struct { // podInfoMap is a map key by a pod's full-name and the value is a pointer to the podInfo. podInfoMap map[string]*podInfo keyFunc func(*v1.Pod) string + // metricRecorder updates the counter when elements of an unschedulablePodsMap + // get added or removed, and it does nothing if it's nil + metricRecorder metrics.MetricRecorder } // Add adds a pod to the unschedulable podInfoMap. 
func (u *UnschedulablePodsMap) addOrUpdate(pInfo *podInfo) { - u.podInfoMap[u.keyFunc(pInfo.pod)] = pInfo + podID := u.keyFunc(pInfo.pod) + if _, exists := u.podInfoMap[podID]; !exists && u.metricRecorder != nil { + u.metricRecorder.Inc() + } + u.podInfoMap[podID] = pInfo } // Delete deletes a pod from the unschedulable podInfoMap. func (u *UnschedulablePodsMap) delete(pod *v1.Pod) { - delete(u.podInfoMap, u.keyFunc(pod)) + podID := u.keyFunc(pod) + if _, exists := u.podInfoMap[podID]; exists && u.metricRecorder != nil { + u.metricRecorder.Dec() + } + delete(u.podInfoMap, podID) } // Get returns the podInfo if a pod with the same key as the key of the given "pod" @@ -802,13 +814,17 @@ func (u *UnschedulablePodsMap) get(pod *v1.Pod) *podInfo { // Clear removes all the entries from the unschedulable podInfoMap. func (u *UnschedulablePodsMap) clear() { u.podInfoMap = make(map[string]*podInfo) + if u.metricRecorder != nil { + u.metricRecorder.Clear() + } } // newUnschedulablePodsMap initializes a new object of UnschedulablePodsMap. -func newUnschedulablePodsMap() *UnschedulablePodsMap { +func newUnschedulablePodsMap(metricRecorder metrics.MetricRecorder) *UnschedulablePodsMap { return &UnschedulablePodsMap{ - podInfoMap: make(map[string]*podInfo), - keyFunc: util.GetPodFullName, + podInfoMap: make(map[string]*podInfo), + keyFunc: util.GetPodFullName, + metricRecorder: metricRecorder, } } diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index 91f4dc7abb4..d9ba6790eaf 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -23,11 +23,13 @@ import ( "testing" "time" + dto "github.com/prometheus/client_model/go" "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/clock" podutil "k8s.io/kubernetes/pkg/api/v1/pod" + "k8s.io/kubernetes/pkg/scheduler/metrics" "k8s.io/kubernetes/pkg/scheduler/util" ) @@ -647,7 +649,7 @@ func TestUnschedulablePodsMap(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - upm := newUnschedulablePodsMap() + upm := newUnschedulablePodsMap(nil) for _, p := range test.podsToAdd { upm.addOrUpdate(newPodInfoNoTimestamp(p)) } @@ -987,6 +989,48 @@ func TestHighProirotyFlushUnschedulableQLeftover(t *testing.T) { } } +type operation func(queue *PriorityQueue, pInfo *podInfo) + +var ( + addPodActiveQ = func(queue *PriorityQueue, pInfo *podInfo) { + queue.lock.Lock() + queue.activeQ.Add(pInfo) + queue.lock.Unlock() + } + updatePodActiveQ = func(queue *PriorityQueue, pInfo *podInfo) { + queue.lock.Lock() + queue.activeQ.Update(pInfo) + queue.lock.Unlock() + } + addPodUnschedulableQ = func(queue *PriorityQueue, pInfo *podInfo) { + queue.lock.Lock() + // Update pod condition to unschedulable. 
+ podutil.UpdatePodCondition(&pInfo.pod.Status, &v1.PodCondition{ + Type: v1.PodScheduled, + Status: v1.ConditionFalse, + Reason: v1.PodReasonUnschedulable, + Message: "fake scheduling failure", + }) + queue.unschedulableQ.addOrUpdate(pInfo) + queue.lock.Unlock() + } + addPodBackoffQ = func(queue *PriorityQueue, pInfo *podInfo) { + queue.lock.Lock() + queue.podBackoffQ.Add(pInfo) + queue.lock.Unlock() + } + moveAllToActiveQ = func(queue *PriorityQueue, _ *podInfo) { + queue.MoveAllToActiveQueue() + } + backoffPod = func(queue *PriorityQueue, pInfo *podInfo) { + queue.backoffPod(pInfo.pod) + } + flushBackoffQ = func(queue *PriorityQueue, _ *podInfo) { + queue.clock.(*clock.FakeClock).Step(2 * time.Second) + queue.flushBackoffQCompleted() + } +) + // TestPodTimestamp tests the operations related to podInfo. func TestPodTimestamp(t *testing.T) { pod1 := &v1.Pod{ @@ -1021,101 +1065,61 @@ func TestPodTimestamp(t *testing.T) { timestamp: timestamp.Add(time.Second), } - var queue *PriorityQueue - type operation = func() - addPodActiveQ := func(pInfo *podInfo) operation { - return func() { - queue.lock.Lock() - defer queue.lock.Unlock() - queue.activeQ.Add(pInfo) - } - } - updatePodActiveQ := func(pInfo *podInfo) operation { - return func() { - queue.lock.Lock() - defer queue.lock.Unlock() - queue.activeQ.Update(pInfo) - } - } - addPodUnschedulableQ := func(pInfo *podInfo) operation { - return func() { - queue.lock.Lock() - defer queue.lock.Unlock() - // Update pod condition to unschedulable. - podutil.UpdatePodCondition(&pInfo.pod.Status, &v1.PodCondition{ - Type: v1.PodScheduled, - Status: v1.ConditionFalse, - Reason: v1.PodReasonUnschedulable, - Message: "fake scheduling failure", - }) - queue.unschedulableQ.addOrUpdate(pInfo) - } - } - addPodBackoffQ := func(pInfo *podInfo) operation { - return func() { - queue.lock.Lock() - defer queue.lock.Unlock() - queue.podBackoffQ.Add(pInfo) - } - } - moveAllToActiveQ := func() operation { - return func() { - queue.MoveAllToActiveQueue() - } - } - backoffPod := func(pInfo *podInfo) operation { - return func() { - queue.backoffPod(pInfo.pod) - } - } - flushBackoffQ := func() operation { - return func() { - queue.clock.(*clock.FakeClock).Step(2 * time.Second) - queue.flushBackoffQCompleted() - } - } tests := []struct { name string operations []operation + operands []*podInfo expected []*podInfo }{ { name: "add two pod to activeQ and sort them by the timestamp", operations: []operation{ - addPodActiveQ(pInfo2), addPodActiveQ(pInfo1), + addPodActiveQ, + addPodActiveQ, }, + operands: []*podInfo{pInfo2, pInfo1}, expected: []*podInfo{pInfo1, pInfo2}, }, { name: "update two pod to activeQ and sort them by the timestamp", operations: []operation{ - updatePodActiveQ(pInfo2), updatePodActiveQ(pInfo1), + updatePodActiveQ, + updatePodActiveQ, }, + operands: []*podInfo{pInfo2, pInfo1}, expected: []*podInfo{pInfo1, pInfo2}, }, { name: "add two pod to unschedulableQ then move them to activeQ and sort them by the timestamp", operations: []operation{ - addPodUnschedulableQ(pInfo2), addPodUnschedulableQ(pInfo1), moveAllToActiveQ(), + addPodUnschedulableQ, + addPodUnschedulableQ, + moveAllToActiveQ, }, + operands: []*podInfo{pInfo2, pInfo1, nil}, expected: []*podInfo{pInfo1, pInfo2}, }, { name: "add one pod to BackoffQ and move it to activeQ", operations: []operation{ - addPodActiveQ(pInfo2), addPodBackoffQ(pInfo1), backoffPod(pInfo1), flushBackoffQ(), moveAllToActiveQ(), + addPodActiveQ, + addPodBackoffQ, + backoffPod, + flushBackoffQ, + moveAllToActiveQ, }, + operands: 
[]*podInfo{pInfo2, pInfo1, pInfo1, nil, nil}, expected: []*podInfo{pInfo1, pInfo2}, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - queue = NewPriorityQueueWithClock(nil, clock.NewFakeClock(timestamp)) + queue := NewPriorityQueueWithClock(nil, clock.NewFakeClock(timestamp)) var podInfoList []*podInfo - for _, op := range test.operations { - op() + for i, op := range test.operations { + op(queue, test.operands[i]) } for i := 0; i < len(test.expected); i++ { @@ -1133,3 +1137,146 @@ func TestPodTimestamp(t *testing.T) { }) } } + +// TestPendingPodsMetric tests Prometheus metrics related with pending pods +func TestPendingPodsMetric(t *testing.T) { + total := 50 + timestamp := time.Now() + var pInfos = make([]*podInfo, 0, total) + for i := 1; i <= total; i++ { + p := &podInfo{ + pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("test-pod-%d", i), + Namespace: fmt.Sprintf("ns%d", i), + UID: types.UID(fmt.Sprintf("tp-%d", i)), + }, + }, + timestamp: timestamp, + } + pInfos = append(pInfos, p) + } + tests := []struct { + name string + operations []operation + operands [][]*podInfo + expected []int64 + }{ + { + name: "add pods to activeQ and unschedulableQ", + operations: []operation{ + addPodActiveQ, + addPodUnschedulableQ, + }, + operands: [][]*podInfo{ + pInfos[:30], + pInfos[30:], + }, + expected: []int64{30, 0, 20}, + }, + { + name: "add pods to all kinds of queues", + operations: []operation{ + addPodActiveQ, + backoffPod, + addPodBackoffQ, + addPodUnschedulableQ, + }, + operands: [][]*podInfo{ + pInfos[:15], + pInfos[15:40], + pInfos[15:40], + pInfos[40:], + }, + expected: []int64{15, 25, 10}, + }, + { + name: "add pods to unschedulableQ and then move all to activeQ", + operations: []operation{ + addPodUnschedulableQ, + moveAllToActiveQ, + }, + operands: [][]*podInfo{ + pInfos[:total], + {nil}, + }, + expected: []int64{int64(total), 0, 0}, + }, + { + name: "make some pods subject to backoff, add pods to unschedulableQ, and then move all to activeQ", + operations: []operation{ + backoffPod, + addPodUnschedulableQ, + moveAllToActiveQ, + }, + operands: [][]*podInfo{ + pInfos[:20], + pInfos[:total], + {nil}, + }, + expected: []int64{int64(total - 20), 20, 0}, + }, + { + name: "make some pods subject to backoff, add pods to unschedulableQ/activeQ, move all to activeQ, and finally flush backoffQ", + operations: []operation{ + backoffPod, + addPodUnschedulableQ, + addPodActiveQ, + moveAllToActiveQ, + flushBackoffQ, + }, + operands: [][]*podInfo{ + pInfos[:20], + pInfos[:40], + pInfos[40:], + {nil}, + {nil}, + }, + expected: []int64{int64(total), 0, 0}, + }, + } + + resetMetrics := func() { + metrics.ActivePods.Set(0) + metrics.BackoffPods.Set(0) + metrics.UnschedulablePods.Set(0) + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + resetMetrics() + queue := NewPriorityQueueWithClock(nil, clock.NewFakeClock(timestamp)) + for i, op := range test.operations { + for _, pInfo := range test.operands[i] { + op(queue, pInfo) + } + } + + var activeNum, backoffNum, unschedulableNum float64 + metricProto := &dto.Metric{} + if err := metrics.ActivePods.Write(metricProto); err != nil { + t.Errorf("error writing ActivePods metric: %v", err) + } + activeNum = metricProto.Gauge.GetValue() + if int64(activeNum) != test.expected[0] { + t.Errorf("ActivePods: Expected %v, got %v", test.expected[0], activeNum) + } + + if err := metrics.BackoffPods.Write(metricProto); err != nil { + t.Errorf("error writing BackoffPods metric: %v", err) + } + 
backoffNum = metricProto.Gauge.GetValue() + if int64(backoffNum) != test.expected[1] { + t.Errorf("BackoffPods: Expected %v, got %v", test.expected[1], backoffNum) + } + + if err := metrics.UnschedulablePods.Write(metricProto); err != nil { + t.Errorf("error writing UnschedulablePods metric: %v", err) + } + unschedulableNum = metricProto.Gauge.GetValue() + if int64(unschedulableNum) != test.expected[2] { + t.Errorf("UnschedulablePods: Expected %v, got %v", test.expected[2], unschedulableNum) + } + }) + } +} diff --git a/pkg/scheduler/metrics/BUILD b/pkg/scheduler/metrics/BUILD index 9b2bce0ff95..e5e540cc3f8 100644 --- a/pkg/scheduler/metrics/BUILD +++ b/pkg/scheduler/metrics/BUILD @@ -1,13 +1,13 @@ package(default_visibility = ["//visibility:public"]) -load( - "@io_bazel_rules_go//go:def.bzl", - "go_library", -) +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", - srcs = ["metrics.go"], + srcs = [ + "metric_recorder.go", + "metrics.go", + ], importpath = "k8s.io/kubernetes/pkg/scheduler/metrics", deps = [ "//pkg/controller/volume/persistentvolume:go_default_library", @@ -27,3 +27,9 @@ filegroup( srcs = [":package-srcs"], tags = ["automanaged"], ) + +go_test( + name = "go_default_test", + srcs = ["metric_recorder_test.go"], + embed = [":go_default_library"], +) diff --git a/pkg/scheduler/metrics/metric_recorder.go b/pkg/scheduler/metrics/metric_recorder.go new file mode 100644 index 00000000000..6c280365523 --- /dev/null +++ b/pkg/scheduler/metrics/metric_recorder.go @@ -0,0 +1,72 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package metrics
+
+import (
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+// MetricRecorder represents a metric recorder that takes some action whenever
+// Inc(), Dec(), or Clear() is called.
+type MetricRecorder interface {
+	Inc()
+	Dec()
+	Clear()
+}
+
+var _ MetricRecorder = &PendingPodsRecorder{}
+
+// PendingPodsRecorder is an implementation of MetricRecorder
+type PendingPodsRecorder struct {
+	recorder prometheus.Gauge
+}
+
+// NewActivePodsRecorder returns a PendingPodsRecorder backed by the ActivePods gauge
+func NewActivePodsRecorder() *PendingPodsRecorder {
+	return &PendingPodsRecorder{
+		recorder: ActivePods,
+	}
+}
+
+// NewUnschedulablePodsRecorder returns a PendingPodsRecorder backed by the UnschedulablePods gauge
+func NewUnschedulablePodsRecorder() *PendingPodsRecorder {
+	return &PendingPodsRecorder{
+		recorder: UnschedulablePods,
+	}
+}
+
+// NewBackoffPodsRecorder returns a PendingPodsRecorder backed by the BackoffPods gauge
+func NewBackoffPodsRecorder() *PendingPodsRecorder {
+	return &PendingPodsRecorder{
+		recorder: BackoffPods,
+	}
+}
+
+// Inc increases a metric counter by 1, in an atomic way
+func (r *PendingPodsRecorder) Inc() {
+	r.recorder.Inc()
+}
+
+// Dec decreases a metric counter by 1, in an atomic way
+func (r *PendingPodsRecorder) Dec() {
+	r.recorder.Dec()
+}
+
+// Clear sets a metric counter to 0, in an atomic way
+func (r *PendingPodsRecorder) Clear() {
+	r.recorder.Set(float64(0))
+}
diff --git a/pkg/scheduler/metrics/metric_recorder_test.go b/pkg/scheduler/metrics/metric_recorder_test.go
new file mode 100644
index 00000000000..833a891f291
--- /dev/null
+++ b/pkg/scheduler/metrics/metric_recorder_test.go
@@ -0,0 +1,103 @@
+/*
+Copyright 2019 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package metrics
+
+import (
+	"sync"
+	"sync/atomic"
+	"testing"
+)
+
+var _ MetricRecorder = &fakePodsRecorder{}
+
+type fakePodsRecorder struct {
+	counter int64
+}
+
+func (r *fakePodsRecorder) Inc() {
+	atomic.AddInt64(&r.counter, 1)
+}
+
+func (r *fakePodsRecorder) Dec() {
+	atomic.AddInt64(&r.counter, -1)
+}
+
+func (r *fakePodsRecorder) Clear() {
+	atomic.StoreInt64(&r.counter, 0)
+}
+
+func TestInc(t *testing.T) {
+	fakeRecorder := fakePodsRecorder{}
+	var wg sync.WaitGroup
+	loops := 100
+	wg.Add(loops)
+	for i := 0; i < loops; i++ {
+		go func() {
+			fakeRecorder.Inc()
+			wg.Done()
+		}()
+	}
+	wg.Wait()
+	if fakeRecorder.counter != int64(loops) {
+		t.Errorf("Expected %v, got %v", loops, fakeRecorder.counter)
+	}
+}
+
+func TestDec(t *testing.T) {
+	fakeRecorder := fakePodsRecorder{counter: 100}
+	var wg sync.WaitGroup
+	loops := 100
+	wg.Add(loops)
+	for i := 0; i < loops; i++ {
+		go func() {
+			fakeRecorder.Dec()
+			wg.Done()
+		}()
+	}
+	wg.Wait()
+	if fakeRecorder.counter != int64(0) {
+		t.Errorf("Expected %v, got %v", 0, fakeRecorder.counter)
+	}
+}
+
+func TestClear(t *testing.T) {
+	fakeRecorder := fakePodsRecorder{}
+	var wg sync.WaitGroup
+	incLoops, decLoops := 100, 80
+	wg.Add(incLoops + decLoops)
+	for i := 0; i < incLoops; i++ {
+		go func() {
+			fakeRecorder.Inc()
+			wg.Done()
+		}()
+	}
+	for i := 0; i < decLoops; i++ {
+		go func() {
+			fakeRecorder.Dec()
+			wg.Done()
+		}()
+	}
+	wg.Wait()
+	if fakeRecorder.counter != int64(incLoops-decLoops) {
+		t.Errorf("Expected %v, got %v", incLoops-decLoops, fakeRecorder.counter)
+	}
+	// verify Clear() works
+	fakeRecorder.Clear()
+	if fakeRecorder.counter != int64(0) {
+		t.Errorf("Expected %v, got %v", 0, fakeRecorder.counter)
+	}
+}
diff --git a/pkg/scheduler/metrics/metrics.go b/pkg/scheduler/metrics/metrics.go
index 43810a19cf9..8b02e2241f5 100644
--- a/pkg/scheduler/metrics/metrics.go
+++ b/pkg/scheduler/metrics/metrics.go
@@ -192,6 +192,16 @@ var (
 			Help: "Total preemption attempts in the cluster till now",
 		})
 
+	pendingPods = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Subsystem: SchedulerSubsystem,
+			Name:      "pending_pods_total",
+			Help:      "Number of pending pods, by the queue type. 
'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulableQ.", + }, []string{"queue"}) + ActivePods = pendingPods.With(prometheus.Labels{"queue": "active"}) + BackoffPods = pendingPods.With(prometheus.Labels{"queue": "backoff"}) + UnschedulablePods = pendingPods.With(prometheus.Labels{"queue": "unschedulable"}) + metricsList = []prometheus.Collector{ scheduleAttempts, SchedulingLatency, @@ -210,6 +220,7 @@ var ( DeprecatedSchedulingAlgorithmPremptionEvaluationDuration, PreemptionVictims, PreemptionAttempts, + pendingPods, } ) diff --git a/pkg/scheduler/util/BUILD b/pkg/scheduler/util/BUILD index 709b581cd97..471310f8cdf 100644 --- a/pkg/scheduler/util/BUILD +++ b/pkg/scheduler/util/BUILD @@ -32,6 +32,7 @@ go_library( "//pkg/apis/scheduling:go_default_library", "//pkg/features:go_default_library", "//pkg/scheduler/api:go_default_library", + "//pkg/scheduler/metrics:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", diff --git a/pkg/scheduler/util/heap.go b/pkg/scheduler/util/heap.go index 0f15652c654..13d6b2ffd13 100644 --- a/pkg/scheduler/util/heap.go +++ b/pkg/scheduler/util/heap.go @@ -25,6 +25,7 @@ import ( "fmt" "k8s.io/client-go/tools/cache" + "k8s.io/kubernetes/pkg/scheduler/metrics" ) // KeyFunc is a function type to get the key from an object. @@ -127,6 +128,9 @@ type Heap struct { // data stores objects and has a queue that keeps their ordering according // to the heap invariant. data *heapData + // metricRecorder updates the counter when elements of a heap get added or + // removed, and it does nothing if it's nil + metricRecorder metrics.MetricRecorder } // Add inserts an item, and puts it in the queue. The item is updated if it @@ -141,6 +145,9 @@ func (h *Heap) Add(obj interface{}) error { heap.Fix(h.data, h.data.items[key].index) } else { heap.Push(h.data, &itemKeyValue{key, obj}) + if h.metricRecorder != nil { + h.metricRecorder.Inc() + } } return nil } @@ -154,6 +161,9 @@ func (h *Heap) AddIfNotPresent(obj interface{}) error { } if _, exists := h.data.items[key]; !exists { heap.Push(h.data, &itemKeyValue{key, obj}) + if h.metricRecorder != nil { + h.metricRecorder.Inc() + } } return nil } @@ -172,6 +182,9 @@ func (h *Heap) Delete(obj interface{}) error { } if item, ok := h.data.items[key]; ok { heap.Remove(h.data, item.index) + if h.metricRecorder != nil { + h.metricRecorder.Dec() + } return nil } return fmt.Errorf("object not found") @@ -186,6 +199,9 @@ func (h *Heap) Peek() interface{} { func (h *Heap) Pop() (interface{}, error) { obj := heap.Pop(h.data) if obj != nil { + if h.metricRecorder != nil { + h.metricRecorder.Dec() + } return obj, nil } return nil, fmt.Errorf("object was removed from heap data") @@ -225,6 +241,11 @@ func (h *Heap) Len() int { // NewHeap returns a Heap which can be used to queue up items to process. func NewHeap(keyFn KeyFunc, lessFn LessFunc) *Heap { + return NewHeapWithRecorder(keyFn, lessFn, nil) +} + +// NewHeapWithRecorder wraps an optional metricRecorder to compose a Heap object. +func NewHeapWithRecorder(keyFn KeyFunc, lessFn LessFunc, metricRecorder metrics.MetricRecorder) *Heap { return &Heap{ data: &heapData{ items: map[string]*heapItem{}, @@ -232,5 +253,6 @@ func NewHeap(keyFn KeyFunc, lessFn LessFunc) *Heap { keyFunc: keyFn, lessFunc: lessFn, }, + metricRecorder: metricRecorder, } }
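
A note on how the pieces in this patch fit together: pending_pods_total is a single GaugeVec labelled by "queue"; ActivePods, BackoffPods, and UnschedulablePods are the three pre-bound Gauges obtained via With(); and PendingPodsRecorder hides them behind the small MetricRecorder interface, so the heap and the unschedulable map only ever call Inc(), Dec(), or Clear(). The sketch below is not part of the patch; it rebuilds the same pattern as a standalone program under made-up names (demoPendingPods, gaugeRecorder, readGauge) to show the gauge wiring and the read-back technique used by TestPendingPodsMetric in isolation.

// Standalone sketch (not part of the patch). demoPendingPods, gaugeRecorder,
// and readGauge are hypothetical names used only for this illustration.
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
)

// demoPendingPods mirrors pending_pods_total: one time series per queue type,
// selected through the "queue" label.
var demoPendingPods = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Subsystem: "scheduler_demo",
		Name:      "pending_pods_total",
		Help:      "Number of pending pods, by queue type (demo).",
	},
	[]string{"queue"},
)

// gaugeRecorder plays the role of MetricRecorder: callers only see
// Inc/Dec/Clear and never touch Prometheus types directly.
type gaugeRecorder struct {
	gauge prometheus.Gauge
}

func (r *gaugeRecorder) Inc()   { r.gauge.Inc() }
func (r *gaugeRecorder) Dec()   { r.gauge.Dec() }
func (r *gaugeRecorder) Clear() { r.gauge.Set(0) }

// readGauge snapshots the current value of a gauge by writing the metric into
// a dto.Metric protobuf, the same way the queue tests do.
func readGauge(g prometheus.Gauge) float64 {
	m := &dto.Metric{}
	if err := g.Write(m); err != nil {
		panic(err)
	}
	return m.GetGauge().GetValue()
}

func main() {
	prometheus.MustRegister(demoPendingPods)

	active := &gaugeRecorder{gauge: demoPendingPods.With(prometheus.Labels{"queue": "active"})}
	backoff := &gaugeRecorder{gauge: demoPendingPods.With(prometheus.Labels{"queue": "backoff"})}

	// Simulate three pods entering the active queue and one of them moving to backoff.
	active.Inc()
	active.Inc()
	active.Inc()
	active.Dec()
	backoff.Inc()

	fmt.Printf("active=%v backoff=%v\n", readGauge(active.gauge), readGauge(backoff.gauge)) // active=2 backoff=1
}

Reading a gauge back through Gauge.Write into a dto.Metric, as readGauge does above, is the same approach TestPendingPodsMetric uses to assert on ActivePods, BackoffPods, and UnschedulablePods without scraping an HTTP endpoint.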