diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index 4b442808a06..43b06c8257f 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -42,6 +42,7 @@ import ( podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util" + "k8s.io/kubernetes/pkg/scheduler/metrics" "k8s.io/kubernetes/pkg/scheduler/util" ) @@ -283,13 +284,13 @@ func NewPriorityQueueWithClock(stop <-chan struct{}, clock util.Clock) *Priority clock: clock, stop: stop, podBackoff: NewPodBackoffMap(1*time.Second, 10*time.Second), - activeQ: util.NewHeap(podInfoKeyFunc, activeQComp), - unschedulableQ: newUnschedulablePodsMap(), + activeQ: util.NewHeapWithRecorder(podInfoKeyFunc, activeQComp, metrics.NewActivePodsRecorder()), + unschedulableQ: newUnschedulablePodsMap(metrics.NewUnschedulablePodsRecorder()), nominatedPods: newNominatedPodMap(), moveRequestCycle: -1, } pq.cond.L = &pq.lock - pq.podBackoffQ = util.NewHeap(podInfoKeyFunc, pq.podsCompareBackoffCompleted) + pq.podBackoffQ = util.NewHeapWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder()) pq.run() @@ -777,16 +778,27 @@ type UnschedulablePodsMap struct { // podInfoMap is a map key by a pod's full-name and the value is a pointer to the podInfo. podInfoMap map[string]*podInfo keyFunc func(*v1.Pod) string + // metricRecorder updates the counter when elements of an unschedulablePodsMap + // get added or removed, and it does nothing if it's nil + metricRecorder metrics.MetricRecorder } // Add adds a pod to the unschedulable podInfoMap. func (u *UnschedulablePodsMap) addOrUpdate(pInfo *podInfo) { - u.podInfoMap[u.keyFunc(pInfo.pod)] = pInfo + podID := u.keyFunc(pInfo.pod) + if _, exists := u.podInfoMap[podID]; !exists && u.metricRecorder != nil { + u.metricRecorder.Inc() + } + u.podInfoMap[podID] = pInfo } // Delete deletes a pod from the unschedulable podInfoMap. func (u *UnschedulablePodsMap) delete(pod *v1.Pod) { - delete(u.podInfoMap, u.keyFunc(pod)) + podID := u.keyFunc(pod) + if _, exists := u.podInfoMap[podID]; exists && u.metricRecorder != nil { + u.metricRecorder.Dec() + } + delete(u.podInfoMap, podID) } // Get returns the podInfo if a pod with the same key as the key of the given "pod" @@ -802,13 +814,17 @@ func (u *UnschedulablePodsMap) get(pod *v1.Pod) *podInfo { // Clear removes all the entries from the unschedulable podInfoMap. func (u *UnschedulablePodsMap) clear() { u.podInfoMap = make(map[string]*podInfo) + if u.metricRecorder != nil { + u.metricRecorder.Clear() + } } // newUnschedulablePodsMap initializes a new object of UnschedulablePodsMap. -func newUnschedulablePodsMap() *UnschedulablePodsMap { +func newUnschedulablePodsMap(metricRecorder metrics.MetricRecorder) *UnschedulablePodsMap { return &UnschedulablePodsMap{ - podInfoMap: make(map[string]*podInfo), - keyFunc: util.GetPodFullName, + podInfoMap: make(map[string]*podInfo), + keyFunc: util.GetPodFullName, + metricRecorder: metricRecorder, } } diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index 91f4dc7abb4..db12c270720 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -647,7 +647,7 @@ func TestUnschedulablePodsMap(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - upm := newUnschedulablePodsMap() + upm := newUnschedulablePodsMap(nil) for _, p := range test.podsToAdd { upm.addOrUpdate(newPodInfoNoTimestamp(p)) } diff --git a/pkg/scheduler/metrics/metric_recorder.go b/pkg/scheduler/metrics/metric_recorder.go new file mode 100644 index 00000000000..6c280365523 --- /dev/null +++ b/pkg/scheduler/metrics/metric_recorder.go @@ -0,0 +1,72 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +// MetricRecorder represents a metric recorder which takes action when the +// metric Inc(), Dec() and Clear() +type MetricRecorder interface { + Inc() + Dec() + Clear() +} + +var _ MetricRecorder = &PendingPodsRecorder{} + +// PendingPodsRecorder is an implementation of MetricRecorder +type PendingPodsRecorder struct { + recorder prometheus.Gauge +} + +// NewActivePodsRecorder returns ActivePods in a Prometheus metric fashion +func NewActivePodsRecorder() *PendingPodsRecorder { + return &PendingPodsRecorder{ + recorder: ActivePods, + } +} + +// NewUnschedulablePodsRecorder returns UnschedulablePods in a Prometheus metric fashion +func NewUnschedulablePodsRecorder() *PendingPodsRecorder { + return &PendingPodsRecorder{ + recorder: UnschedulablePods, + } +} + +// NewBackoffPodsRecorder returns BackoffPods in a Prometheus metric fashion +func NewBackoffPodsRecorder() *PendingPodsRecorder { + return &PendingPodsRecorder{ + recorder: BackoffPods, + } +} + +// Inc increases a metric counter by 1, in an atomic way +func (r *PendingPodsRecorder) Inc() { + r.recorder.Inc() +} + +// Dec decreases a metric counter by 1, in an atomic way +func (r *PendingPodsRecorder) Dec() { + r.recorder.Dec() +} + +// Clear set a metric counter to 0, in an atomic way +func (r *PendingPodsRecorder) Clear() { + r.recorder.Set(float64(0)) +} diff --git a/pkg/scheduler/metrics/metric_recorder_test.go b/pkg/scheduler/metrics/metric_recorder_test.go new file mode 100644 index 00000000000..833a891f291 --- /dev/null +++ b/pkg/scheduler/metrics/metric_recorder_test.go @@ -0,0 +1,103 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "sync" + "sync/atomic" + "testing" +) + +var _ MetricRecorder = &fakePodsRecorder{} + +type fakePodsRecorder struct { + counter int64 +} + +func (r *fakePodsRecorder) Inc() { + atomic.AddInt64(&r.counter, 1) +} + +func (r *fakePodsRecorder) Dec() { + atomic.AddInt64(&r.counter, -1) +} + +func (r *fakePodsRecorder) Clear() { + atomic.StoreInt64(&r.counter, 0) +} + +func TestInc(t *testing.T) { + fakeRecorder := fakePodsRecorder{} + var wg sync.WaitGroup + loops := 100 + wg.Add(loops) + for i := 0; i < loops; i++ { + go func() { + fakeRecorder.Inc() + wg.Done() + }() + } + wg.Wait() + if fakeRecorder.counter != int64(loops) { + t.Errorf("Expected %v, got %v", loops, fakeRecorder.counter) + } +} + +func TestDec(t *testing.T) { + fakeRecorder := fakePodsRecorder{counter: 100} + var wg sync.WaitGroup + loops := 100 + wg.Add(loops) + for i := 0; i < loops; i++ { + go func() { + fakeRecorder.Dec() + wg.Done() + }() + } + wg.Wait() + if fakeRecorder.counter != int64(0) { + t.Errorf("Expected %v, got %v", loops, fakeRecorder.counter) + } +} + +func TestClear(t *testing.T) { + fakeRecorder := fakePodsRecorder{} + var wg sync.WaitGroup + incLoops, decLoops := 100, 80 + wg.Add(incLoops + decLoops) + for i := 0; i < incLoops; i++ { + go func() { + fakeRecorder.Inc() + wg.Done() + }() + } + for i := 0; i < decLoops; i++ { + go func() { + fakeRecorder.Dec() + wg.Done() + }() + } + wg.Wait() + if fakeRecorder.counter != int64(incLoops-decLoops) { + t.Errorf("Expected %v, got %v", incLoops-decLoops, fakeRecorder.counter) + } + // verify Clear() works + fakeRecorder.Clear() + if fakeRecorder.counter != int64(0) { + t.Errorf("Expected %v, got %v", 0, fakeRecorder.counter) + } +} diff --git a/pkg/scheduler/metrics/metrics.go b/pkg/scheduler/metrics/metrics.go index 43810a19cf9..8b02e2241f5 100644 --- a/pkg/scheduler/metrics/metrics.go +++ b/pkg/scheduler/metrics/metrics.go @@ -192,6 +192,16 @@ var ( Help: "Total preemption attempts in the cluster till now", }) + pendingPods = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Subsystem: SchedulerSubsystem, + Name: "pending_pods_total", + Help: "Number of pending pods, by the queue type. 'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulableQ.", + }, []string{"queue"}) + ActivePods = pendingPods.With(prometheus.Labels{"queue": "active"}) + BackoffPods = pendingPods.With(prometheus.Labels{"queue": "backoff"}) + UnschedulablePods = pendingPods.With(prometheus.Labels{"queue": "unschedulable"}) + metricsList = []prometheus.Collector{ scheduleAttempts, SchedulingLatency, @@ -210,6 +220,7 @@ var ( DeprecatedSchedulingAlgorithmPremptionEvaluationDuration, PreemptionVictims, PreemptionAttempts, + pendingPods, } ) diff --git a/pkg/scheduler/util/heap.go b/pkg/scheduler/util/heap.go index 0f15652c654..13d6b2ffd13 100644 --- a/pkg/scheduler/util/heap.go +++ b/pkg/scheduler/util/heap.go @@ -25,6 +25,7 @@ import ( "fmt" "k8s.io/client-go/tools/cache" + "k8s.io/kubernetes/pkg/scheduler/metrics" ) // KeyFunc is a function type to get the key from an object. @@ -127,6 +128,9 @@ type Heap struct { // data stores objects and has a queue that keeps their ordering according // to the heap invariant. data *heapData + // metricRecorder updates the counter when elements of a heap get added or + // removed, and it does nothing if it's nil + metricRecorder metrics.MetricRecorder } // Add inserts an item, and puts it in the queue. The item is updated if it @@ -141,6 +145,9 @@ func (h *Heap) Add(obj interface{}) error { heap.Fix(h.data, h.data.items[key].index) } else { heap.Push(h.data, &itemKeyValue{key, obj}) + if h.metricRecorder != nil { + h.metricRecorder.Inc() + } } return nil } @@ -154,6 +161,9 @@ func (h *Heap) AddIfNotPresent(obj interface{}) error { } if _, exists := h.data.items[key]; !exists { heap.Push(h.data, &itemKeyValue{key, obj}) + if h.metricRecorder != nil { + h.metricRecorder.Inc() + } } return nil } @@ -172,6 +182,9 @@ func (h *Heap) Delete(obj interface{}) error { } if item, ok := h.data.items[key]; ok { heap.Remove(h.data, item.index) + if h.metricRecorder != nil { + h.metricRecorder.Dec() + } return nil } return fmt.Errorf("object not found") @@ -186,6 +199,9 @@ func (h *Heap) Peek() interface{} { func (h *Heap) Pop() (interface{}, error) { obj := heap.Pop(h.data) if obj != nil { + if h.metricRecorder != nil { + h.metricRecorder.Dec() + } return obj, nil } return nil, fmt.Errorf("object was removed from heap data") @@ -225,6 +241,11 @@ func (h *Heap) Len() int { // NewHeap returns a Heap which can be used to queue up items to process. func NewHeap(keyFn KeyFunc, lessFn LessFunc) *Heap { + return NewHeapWithRecorder(keyFn, lessFn, nil) +} + +// NewHeapWithRecorder wraps an optional metricRecorder to compose a Heap object. +func NewHeapWithRecorder(keyFn KeyFunc, lessFn LessFunc, metricRecorder metrics.MetricRecorder) *Heap { return &Heap{ data: &heapData{ items: map[string]*heapItem{}, @@ -232,5 +253,6 @@ func NewHeap(keyFn KeyFunc, lessFn LessFunc) *Heap { keyFunc: keyFn, lessFunc: lessFn, }, + metricRecorder: metricRecorder, } }