mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
Merge pull request #75501 from Huang-Wei/scheduler-metrics
scheduler: add metrics to record number of pending pods in different queues
This commit is contained in:
commit
a93f803f8e
@ -258,7 +258,8 @@
|
||||
"k8s.io/kubernetes/pkg/fieldpath",
|
||||
"k8s.io/kubernetes/pkg/scheduler/volumebinder",
|
||||
"k8s.io/kubernetes/pkg/util/resizefs",
|
||||
"k8s.io/kubernetes/pkg/apis/apps"
|
||||
"k8s.io/kubernetes/pkg/apis/apps",
|
||||
"k8s.io/kubernetes/pkg/scheduler/metrics"
|
||||
]
|
||||
},
|
||||
{
|
||||
|
@ -12,6 +12,7 @@ go_library(
|
||||
"//pkg/api/v1/pod:go_default_library",
|
||||
"//pkg/scheduler/algorithm/predicates:go_default_library",
|
||||
"//pkg/scheduler/algorithm/priorities/util:go_default_library",
|
||||
"//pkg/scheduler/metrics:go_default_library",
|
||||
"//pkg/scheduler/util:go_default_library",
|
||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
@ -31,11 +32,13 @@ go_test(
|
||||
embed = [":go_default_library"],
|
||||
deps = [
|
||||
"//pkg/api/v1/pod:go_default_library",
|
||||
"//pkg/scheduler/metrics:go_default_library",
|
||||
"//pkg/scheduler/util:go_default_library",
|
||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
||||
"//vendor/github.com/prometheus/client_model/go:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
|
@ -42,6 +42,7 @@ import (
|
||||
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
|
||||
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
|
||||
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
|
||||
"k8s.io/kubernetes/pkg/scheduler/metrics"
|
||||
"k8s.io/kubernetes/pkg/scheduler/util"
|
||||
)
|
||||
|
||||
@ -283,13 +284,13 @@ func NewPriorityQueueWithClock(stop <-chan struct{}, clock util.Clock) *Priority
|
||||
clock: clock,
|
||||
stop: stop,
|
||||
podBackoff: NewPodBackoffMap(1*time.Second, 10*time.Second),
|
||||
activeQ: util.NewHeap(podInfoKeyFunc, activeQComp),
|
||||
unschedulableQ: newUnschedulablePodsMap(),
|
||||
activeQ: util.NewHeapWithRecorder(podInfoKeyFunc, activeQComp, metrics.NewActivePodsRecorder()),
|
||||
unschedulableQ: newUnschedulablePodsMap(metrics.NewUnschedulablePodsRecorder()),
|
||||
nominatedPods: newNominatedPodMap(),
|
||||
moveRequestCycle: -1,
|
||||
}
|
||||
pq.cond.L = &pq.lock
|
||||
pq.podBackoffQ = util.NewHeap(podInfoKeyFunc, pq.podsCompareBackoffCompleted)
|
||||
pq.podBackoffQ = util.NewHeapWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
|
||||
|
||||
pq.run()
|
||||
|
||||
@ -777,16 +778,27 @@ type UnschedulablePodsMap struct {
|
||||
// podInfoMap is a map keyed by a pod's full name; each value is a pointer to the podInfo.
|
||||
podInfoMap map[string]*podInfo
|
||||
keyFunc func(*v1.Pod) string
|
||||
// metricRecorder updates the counter when elements of an unschedulablePodsMap
|
||||
// get added or removed, and it does nothing if it's nil
|
||||
metricRecorder metrics.MetricRecorder
|
||||
}
|
||||
|
||||
// addOrUpdate adds a pod to the unschedulable podInfoMap, replacing any existing entry with the same key.
|
||||
func (u *UnschedulablePodsMap) addOrUpdate(pInfo *podInfo) {
|
||||
u.podInfoMap[u.keyFunc(pInfo.pod)] = pInfo
|
||||
podID := u.keyFunc(pInfo.pod)
|
||||
if _, exists := u.podInfoMap[podID]; !exists && u.metricRecorder != nil {
|
||||
u.metricRecorder.Inc()
|
||||
}
|
||||
u.podInfoMap[podID] = pInfo
|
||||
}
|
||||
|
||||
// delete removes a pod from the unschedulable podInfoMap.
|
||||
func (u *UnschedulablePodsMap) delete(pod *v1.Pod) {
|
||||
delete(u.podInfoMap, u.keyFunc(pod))
|
||||
podID := u.keyFunc(pod)
|
||||
if _, exists := u.podInfoMap[podID]; exists && u.metricRecorder != nil {
|
||||
u.metricRecorder.Dec()
|
||||
}
|
||||
delete(u.podInfoMap, podID)
|
||||
}
|
||||
|
||||
// Get returns the podInfo if a pod with the same key as the key of the given "pod"
|
||||
@ -802,13 +814,17 @@ func (u *UnschedulablePodsMap) get(pod *v1.Pod) *podInfo {
|
||||
// clear removes all the entries from the unschedulable podInfoMap and, if a metricRecorder is set, clears it too.
|
||||
func (u *UnschedulablePodsMap) clear() {
|
||||
u.podInfoMap = make(map[string]*podInfo)
|
||||
if u.metricRecorder != nil {
|
||||
u.metricRecorder.Clear()
|
||||
}
|
||||
}
|
||||
|
||||
// newUnschedulablePodsMap initializes a new object of UnschedulablePodsMap.
|
||||
func newUnschedulablePodsMap() *UnschedulablePodsMap {
|
||||
func newUnschedulablePodsMap(metricRecorder metrics.MetricRecorder) *UnschedulablePodsMap {
|
||||
return &UnschedulablePodsMap{
|
||||
podInfoMap: make(map[string]*podInfo),
|
||||
keyFunc: util.GetPodFullName,
|
||||
podInfoMap: make(map[string]*podInfo),
|
||||
keyFunc: util.GetPodFullName,
|
||||
metricRecorder: metricRecorder,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -23,11 +23,13 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
dto "github.com/prometheus/client_model/go"
|
||||
"k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
|
||||
"k8s.io/kubernetes/pkg/scheduler/metrics"
|
||||
"k8s.io/kubernetes/pkg/scheduler/util"
|
||||
)
|
||||
|
||||
@ -647,7 +649,7 @@ func TestUnschedulablePodsMap(t *testing.T) {
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
upm := newUnschedulablePodsMap()
|
||||
upm := newUnschedulablePodsMap(nil)
|
||||
for _, p := range test.podsToAdd {
|
||||
upm.addOrUpdate(newPodInfoNoTimestamp(p))
|
||||
}
|
||||
@ -987,6 +989,48 @@ func TestHighProirotyFlushUnschedulableQLeftover(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
type operation func(queue *PriorityQueue, pInfo *podInfo)
|
||||
|
||||
var (
|
||||
addPodActiveQ = func(queue *PriorityQueue, pInfo *podInfo) {
|
||||
queue.lock.Lock()
|
||||
queue.activeQ.Add(pInfo)
|
||||
queue.lock.Unlock()
|
||||
}
|
||||
updatePodActiveQ = func(queue *PriorityQueue, pInfo *podInfo) {
|
||||
queue.lock.Lock()
|
||||
queue.activeQ.Update(pInfo)
|
||||
queue.lock.Unlock()
|
||||
}
|
||||
addPodUnschedulableQ = func(queue *PriorityQueue, pInfo *podInfo) {
|
||||
queue.lock.Lock()
|
||||
// Update pod condition to unschedulable.
|
||||
podutil.UpdatePodCondition(&pInfo.pod.Status, &v1.PodCondition{
|
||||
Type: v1.PodScheduled,
|
||||
Status: v1.ConditionFalse,
|
||||
Reason: v1.PodReasonUnschedulable,
|
||||
Message: "fake scheduling failure",
|
||||
})
|
||||
queue.unschedulableQ.addOrUpdate(pInfo)
|
||||
queue.lock.Unlock()
|
||||
}
|
||||
addPodBackoffQ = func(queue *PriorityQueue, pInfo *podInfo) {
|
||||
queue.lock.Lock()
|
||||
queue.podBackoffQ.Add(pInfo)
|
||||
queue.lock.Unlock()
|
||||
}
|
||||
moveAllToActiveQ = func(queue *PriorityQueue, _ *podInfo) {
|
||||
queue.MoveAllToActiveQueue()
|
||||
}
|
||||
backoffPod = func(queue *PriorityQueue, pInfo *podInfo) {
|
||||
queue.backoffPod(pInfo.pod)
|
||||
}
|
||||
flushBackoffQ = func(queue *PriorityQueue, _ *podInfo) {
|
||||
queue.clock.(*clock.FakeClock).Step(2 * time.Second)
|
||||
queue.flushBackoffQCompleted()
|
||||
}
|
||||
)
|
||||
|
||||
// TestPodTimestamp tests the operations related to podInfo.
|
||||
func TestPodTimestamp(t *testing.T) {
|
||||
pod1 := &v1.Pod{
|
||||
@ -1021,101 +1065,61 @@ func TestPodTimestamp(t *testing.T) {
|
||||
timestamp: timestamp.Add(time.Second),
|
||||
}
|
||||
|
||||
var queue *PriorityQueue
|
||||
type operation = func()
|
||||
addPodActiveQ := func(pInfo *podInfo) operation {
|
||||
return func() {
|
||||
queue.lock.Lock()
|
||||
defer queue.lock.Unlock()
|
||||
queue.activeQ.Add(pInfo)
|
||||
}
|
||||
}
|
||||
updatePodActiveQ := func(pInfo *podInfo) operation {
|
||||
return func() {
|
||||
queue.lock.Lock()
|
||||
defer queue.lock.Unlock()
|
||||
queue.activeQ.Update(pInfo)
|
||||
}
|
||||
}
|
||||
addPodUnschedulableQ := func(pInfo *podInfo) operation {
|
||||
return func() {
|
||||
queue.lock.Lock()
|
||||
defer queue.lock.Unlock()
|
||||
// Update pod condition to unschedulable.
|
||||
podutil.UpdatePodCondition(&pInfo.pod.Status, &v1.PodCondition{
|
||||
Type: v1.PodScheduled,
|
||||
Status: v1.ConditionFalse,
|
||||
Reason: v1.PodReasonUnschedulable,
|
||||
Message: "fake scheduling failure",
|
||||
})
|
||||
queue.unschedulableQ.addOrUpdate(pInfo)
|
||||
}
|
||||
}
|
||||
addPodBackoffQ := func(pInfo *podInfo) operation {
|
||||
return func() {
|
||||
queue.lock.Lock()
|
||||
defer queue.lock.Unlock()
|
||||
queue.podBackoffQ.Add(pInfo)
|
||||
}
|
||||
}
|
||||
moveAllToActiveQ := func() operation {
|
||||
return func() {
|
||||
queue.MoveAllToActiveQueue()
|
||||
}
|
||||
}
|
||||
backoffPod := func(pInfo *podInfo) operation {
|
||||
return func() {
|
||||
queue.backoffPod(pInfo.pod)
|
||||
}
|
||||
}
|
||||
flushBackoffQ := func() operation {
|
||||
return func() {
|
||||
queue.clock.(*clock.FakeClock).Step(2 * time.Second)
|
||||
queue.flushBackoffQCompleted()
|
||||
}
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
operations []operation
|
||||
operands []*podInfo
|
||||
expected []*podInfo
|
||||
}{
|
||||
{
|
||||
name: "add two pod to activeQ and sort them by the timestamp",
|
||||
operations: []operation{
|
||||
addPodActiveQ(pInfo2), addPodActiveQ(pInfo1),
|
||||
addPodActiveQ,
|
||||
addPodActiveQ,
|
||||
},
|
||||
operands: []*podInfo{pInfo2, pInfo1},
|
||||
expected: []*podInfo{pInfo1, pInfo2},
|
||||
},
|
||||
{
|
||||
name: "update two pod to activeQ and sort them by the timestamp",
|
||||
operations: []operation{
|
||||
updatePodActiveQ(pInfo2), updatePodActiveQ(pInfo1),
|
||||
updatePodActiveQ,
|
||||
updatePodActiveQ,
|
||||
},
|
||||
operands: []*podInfo{pInfo2, pInfo1},
|
||||
expected: []*podInfo{pInfo1, pInfo2},
|
||||
},
|
||||
{
|
||||
name: "add two pod to unschedulableQ then move them to activeQ and sort them by the timestamp",
|
||||
operations: []operation{
|
||||
addPodUnschedulableQ(pInfo2), addPodUnschedulableQ(pInfo1), moveAllToActiveQ(),
|
||||
addPodUnschedulableQ,
|
||||
addPodUnschedulableQ,
|
||||
moveAllToActiveQ,
|
||||
},
|
||||
operands: []*podInfo{pInfo2, pInfo1, nil},
|
||||
expected: []*podInfo{pInfo1, pInfo2},
|
||||
},
|
||||
{
|
||||
name: "add one pod to BackoffQ and move it to activeQ",
|
||||
operations: []operation{
|
||||
addPodActiveQ(pInfo2), addPodBackoffQ(pInfo1), backoffPod(pInfo1), flushBackoffQ(), moveAllToActiveQ(),
|
||||
addPodActiveQ,
|
||||
addPodBackoffQ,
|
||||
backoffPod,
|
||||
flushBackoffQ,
|
||||
moveAllToActiveQ,
|
||||
},
|
||||
operands: []*podInfo{pInfo2, pInfo1, pInfo1, nil, nil},
|
||||
expected: []*podInfo{pInfo1, pInfo2},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
queue = NewPriorityQueueWithClock(nil, clock.NewFakeClock(timestamp))
|
||||
queue := NewPriorityQueueWithClock(nil, clock.NewFakeClock(timestamp))
|
||||
var podInfoList []*podInfo
|
||||
|
||||
for _, op := range test.operations {
|
||||
op()
|
||||
for i, op := range test.operations {
|
||||
op(queue, test.operands[i])
|
||||
}
|
||||
|
||||
for i := 0; i < len(test.expected); i++ {
|
||||
@ -1133,3 +1137,146 @@ func TestPodTimestamp(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestPendingPodsMetric tests the Prometheus metrics related to pending pods.
|
||||
func TestPendingPodsMetric(t *testing.T) {
|
||||
total := 50
|
||||
timestamp := time.Now()
|
||||
var pInfos = make([]*podInfo, 0, total)
|
||||
for i := 1; i <= total; i++ {
|
||||
p := &podInfo{
|
||||
pod: &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: fmt.Sprintf("test-pod-%d", i),
|
||||
Namespace: fmt.Sprintf("ns%d", i),
|
||||
UID: types.UID(fmt.Sprintf("tp-%d", i)),
|
||||
},
|
||||
},
|
||||
timestamp: timestamp,
|
||||
}
|
||||
pInfos = append(pInfos, p)
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
operations []operation
|
||||
operands [][]*podInfo
|
||||
expected []int64
|
||||
}{
|
||||
{
|
||||
name: "add pods to activeQ and unschedulableQ",
|
||||
operations: []operation{
|
||||
addPodActiveQ,
|
||||
addPodUnschedulableQ,
|
||||
},
|
||||
operands: [][]*podInfo{
|
||||
pInfos[:30],
|
||||
pInfos[30:],
|
||||
},
|
||||
expected: []int64{30, 0, 20},
|
||||
},
|
||||
{
|
||||
name: "add pods to all kinds of queues",
|
||||
operations: []operation{
|
||||
addPodActiveQ,
|
||||
backoffPod,
|
||||
addPodBackoffQ,
|
||||
addPodUnschedulableQ,
|
||||
},
|
||||
operands: [][]*podInfo{
|
||||
pInfos[:15],
|
||||
pInfos[15:40],
|
||||
pInfos[15:40],
|
||||
pInfos[40:],
|
||||
},
|
||||
expected: []int64{15, 25, 10},
|
||||
},
|
||||
{
|
||||
name: "add pods to unschedulableQ and then move all to activeQ",
|
||||
operations: []operation{
|
||||
addPodUnschedulableQ,
|
||||
moveAllToActiveQ,
|
||||
},
|
||||
operands: [][]*podInfo{
|
||||
pInfos[:total],
|
||||
{nil},
|
||||
},
|
||||
expected: []int64{int64(total), 0, 0},
|
||||
},
|
||||
{
|
||||
name: "make some pods subject to backoff, add pods to unschedulableQ, and then move all to activeQ",
|
||||
operations: []operation{
|
||||
backoffPod,
|
||||
addPodUnschedulableQ,
|
||||
moveAllToActiveQ,
|
||||
},
|
||||
operands: [][]*podInfo{
|
||||
pInfos[:20],
|
||||
pInfos[:total],
|
||||
{nil},
|
||||
},
|
||||
expected: []int64{int64(total - 20), 20, 0},
|
||||
},
|
||||
{
|
||||
name: "make some pods subject to backoff, add pods to unschedulableQ/activeQ, move all to activeQ, and finally flush backoffQ",
|
||||
operations: []operation{
|
||||
backoffPod,
|
||||
addPodUnschedulableQ,
|
||||
addPodActiveQ,
|
||||
moveAllToActiveQ,
|
||||
flushBackoffQ,
|
||||
},
|
||||
operands: [][]*podInfo{
|
||||
pInfos[:20],
|
||||
pInfos[:40],
|
||||
pInfos[40:],
|
||||
{nil},
|
||||
{nil},
|
||||
},
|
||||
expected: []int64{int64(total), 0, 0},
|
||||
},
|
||||
}
|
||||
|
||||
resetMetrics := func() {
|
||||
metrics.ActivePods.Set(0)
|
||||
metrics.BackoffPods.Set(0)
|
||||
metrics.UnschedulablePods.Set(0)
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
resetMetrics()
|
||||
queue := NewPriorityQueueWithClock(nil, clock.NewFakeClock(timestamp))
|
||||
for i, op := range test.operations {
|
||||
for _, pInfo := range test.operands[i] {
|
||||
op(queue, pInfo)
|
||||
}
|
||||
}
|
||||
|
||||
var activeNum, backoffNum, unschedulableNum float64
|
||||
metricProto := &dto.Metric{}
|
||||
if err := metrics.ActivePods.Write(metricProto); err != nil {
|
||||
t.Errorf("error writing ActivePods metric: %v", err)
|
||||
}
|
||||
activeNum = metricProto.Gauge.GetValue()
|
||||
if int64(activeNum) != test.expected[0] {
|
||||
t.Errorf("ActivePods: Expected %v, got %v", test.expected[0], activeNum)
|
||||
}
|
||||
|
||||
if err := metrics.BackoffPods.Write(metricProto); err != nil {
|
||||
t.Errorf("error writing BackoffPods metric: %v", err)
|
||||
}
|
||||
backoffNum = metricProto.Gauge.GetValue()
|
||||
if int64(backoffNum) != test.expected[1] {
|
||||
t.Errorf("BackoffPods: Expected %v, got %v", test.expected[1], backoffNum)
|
||||
}
|
||||
|
||||
if err := metrics.UnschedulablePods.Write(metricProto); err != nil {
|
||||
t.Errorf("error writing UnschedulablePods metric: %v", err)
|
||||
}
|
||||
unschedulableNum = metricProto.Gauge.GetValue()
|
||||
if int64(unschedulableNum) != test.expected[2] {
|
||||
t.Errorf("UnschedulablePods: Expected %v, got %v", test.expected[2], unschedulableNum)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -1,13 +1,13 @@
|
||||
package(default_visibility = ["//visibility:public"])
|
||||
|
||||
load(
|
||||
"@io_bazel_rules_go//go:def.bzl",
|
||||
"go_library",
|
||||
)
|
||||
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = ["metrics.go"],
|
||||
srcs = [
|
||||
"metric_recorder.go",
|
||||
"metrics.go",
|
||||
],
|
||||
importpath = "k8s.io/kubernetes/pkg/scheduler/metrics",
|
||||
deps = [
|
||||
"//pkg/controller/volume/persistentvolume:go_default_library",
|
||||
@ -27,3 +27,9 @@ filegroup(
|
||||
srcs = [":package-srcs"],
|
||||
tags = ["automanaged"],
|
||||
)
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = ["metric_recorder_test.go"],
|
||||
embed = [":go_default_library"],
|
||||
)
|
||||
|
72
pkg/scheduler/metrics/metric_recorder.go
Normal file
72
pkg/scheduler/metrics/metric_recorder.go
Normal file
@ -0,0 +1,72 @@
|
||||
/*
|
||||
Copyright 2019 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
// MetricRecorder is implemented by metric recorders that update an underlying
|
||||
// metric when Inc(), Dec() or Clear() is called.
|
||||
type MetricRecorder interface {
|
||||
Inc()
|
||||
Dec()
|
||||
Clear()
|
||||
}
|
||||
|
||||
var _ MetricRecorder = &PendingPodsRecorder{}
|
||||
|
||||
// PendingPodsRecorder is an implementation of MetricRecorder
|
||||
type PendingPodsRecorder struct {
|
||||
recorder prometheus.Gauge
|
||||
}
|
||||
|
||||
// NewActivePodsRecorder returns a PendingPodsRecorder backed by the ActivePods metric.
|
||||
func NewActivePodsRecorder() *PendingPodsRecorder {
|
||||
return &PendingPodsRecorder{
|
||||
recorder: ActivePods,
|
||||
}
|
||||
}
|
||||
|
||||
// NewUnschedulablePodsRecorder returns a PendingPodsRecorder backed by the UnschedulablePods metric.
|
||||
func NewUnschedulablePodsRecorder() *PendingPodsRecorder {
|
||||
return &PendingPodsRecorder{
|
||||
recorder: UnschedulablePods,
|
||||
}
|
||||
}
|
||||
|
||||
// NewBackoffPodsRecorder returns a PendingPodsRecorder backed by the BackoffPods metric.
|
||||
func NewBackoffPodsRecorder() *PendingPodsRecorder {
|
||||
return &PendingPodsRecorder{
|
||||
recorder: BackoffPods,
|
||||
}
|
||||
}
|
||||
|
||||
// Inc increases a metric counter by 1, in an atomic way
|
||||
func (r *PendingPodsRecorder) Inc() {
|
||||
r.recorder.Inc()
|
||||
}
|
||||
|
||||
// Dec decreases a metric counter by 1, in an atomic way
|
||||
func (r *PendingPodsRecorder) Dec() {
|
||||
r.recorder.Dec()
|
||||
}
|
||||
|
||||
// Clear sets a metric counter to 0, in an atomic way
|
||||
func (r *PendingPodsRecorder) Clear() {
|
||||
r.recorder.Set(float64(0))
|
||||
}
|
103
pkg/scheduler/metrics/metric_recorder_test.go
Normal file
103
pkg/scheduler/metrics/metric_recorder_test.go
Normal file
@ -0,0 +1,103 @@
|
||||
/*
|
||||
Copyright 2019 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
)
|
||||
|
||||
var _ MetricRecorder = &fakePodsRecorder{}
|
||||
|
||||
type fakePodsRecorder struct {
|
||||
counter int64
|
||||
}
|
||||
|
||||
func (r *fakePodsRecorder) Inc() {
|
||||
atomic.AddInt64(&r.counter, 1)
|
||||
}
|
||||
|
||||
func (r *fakePodsRecorder) Dec() {
|
||||
atomic.AddInt64(&r.counter, -1)
|
||||
}
|
||||
|
||||
func (r *fakePodsRecorder) Clear() {
|
||||
atomic.StoreInt64(&r.counter, 0)
|
||||
}
|
||||
|
||||
func TestInc(t *testing.T) {
|
||||
fakeRecorder := fakePodsRecorder{}
|
||||
var wg sync.WaitGroup
|
||||
loops := 100
|
||||
wg.Add(loops)
|
||||
for i := 0; i < loops; i++ {
|
||||
go func() {
|
||||
fakeRecorder.Inc()
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
if fakeRecorder.counter != int64(loops) {
|
||||
t.Errorf("Expected %v, got %v", loops, fakeRecorder.counter)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDec(t *testing.T) {
|
||||
fakeRecorder := fakePodsRecorder{counter: 100}
|
||||
var wg sync.WaitGroup
|
||||
loops := 100
|
||||
wg.Add(loops)
|
||||
for i := 0; i < loops; i++ {
|
||||
go func() {
|
||||
fakeRecorder.Dec()
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
if fakeRecorder.counter != int64(0) {
|
||||
t.Errorf("Expected %v, got %v", loops, fakeRecorder.counter)
|
||||
}
|
||||
}
|
||||
|
||||
func TestClear(t *testing.T) {
|
||||
fakeRecorder := fakePodsRecorder{}
|
||||
var wg sync.WaitGroup
|
||||
incLoops, decLoops := 100, 80
|
||||
wg.Add(incLoops + decLoops)
|
||||
for i := 0; i < incLoops; i++ {
|
||||
go func() {
|
||||
fakeRecorder.Inc()
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
for i := 0; i < decLoops; i++ {
|
||||
go func() {
|
||||
fakeRecorder.Dec()
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
if fakeRecorder.counter != int64(incLoops-decLoops) {
|
||||
t.Errorf("Expected %v, got %v", incLoops-decLoops, fakeRecorder.counter)
|
||||
}
|
||||
// verify Clear() works
|
||||
fakeRecorder.Clear()
|
||||
if fakeRecorder.counter != int64(0) {
|
||||
t.Errorf("Expected %v, got %v", 0, fakeRecorder.counter)
|
||||
}
|
||||
}
|
@ -192,6 +192,16 @@ var (
|
||||
Help: "Total preemption attempts in the cluster till now",
|
||||
})
|
||||
|
||||
pendingPods = prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Subsystem: SchedulerSubsystem,
|
||||
Name: "pending_pods_total",
|
||||
Help: "Number of pending pods, by the queue type. 'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulableQ.",
|
||||
}, []string{"queue"})
|
||||
ActivePods = pendingPods.With(prometheus.Labels{"queue": "active"})
|
||||
BackoffPods = pendingPods.With(prometheus.Labels{"queue": "backoff"})
|
||||
UnschedulablePods = pendingPods.With(prometheus.Labels{"queue": "unschedulable"})
|
||||
|
||||
metricsList = []prometheus.Collector{
|
||||
scheduleAttempts,
|
||||
SchedulingLatency,
|
||||
@ -210,6 +220,7 @@ var (
|
||||
DeprecatedSchedulingAlgorithmPremptionEvaluationDuration,
|
||||
PreemptionVictims,
|
||||
PreemptionAttempts,
|
||||
pendingPods,
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -32,6 +32,7 @@ go_library(
|
||||
"//pkg/apis/scheduling:go_default_library",
|
||||
"//pkg/features:go_default_library",
|
||||
"//pkg/scheduler/api:go_default_library",
|
||||
"//pkg/scheduler/metrics:go_default_library",
|
||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||
|
@ -25,6 +25,7 @@ import (
|
||||
"fmt"
|
||||
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/kubernetes/pkg/scheduler/metrics"
|
||||
)
|
||||
|
||||
// KeyFunc is a function type to get the key from an object.
|
||||
@ -127,6 +128,9 @@ type Heap struct {
|
||||
// data stores objects and has a queue that keeps their ordering according
|
||||
// to the heap invariant.
|
||||
data *heapData
|
||||
// metricRecorder updates the counter when elements of a heap get added or
|
||||
// removed, and it does nothing if it's nil
|
||||
metricRecorder metrics.MetricRecorder
|
||||
}
|
||||
|
||||
// Add inserts an item, and puts it in the queue. The item is updated if it
|
||||
@ -141,6 +145,9 @@ func (h *Heap) Add(obj interface{}) error {
|
||||
heap.Fix(h.data, h.data.items[key].index)
|
||||
} else {
|
||||
heap.Push(h.data, &itemKeyValue{key, obj})
|
||||
if h.metricRecorder != nil {
|
||||
h.metricRecorder.Inc()
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -154,6 +161,9 @@ func (h *Heap) AddIfNotPresent(obj interface{}) error {
|
||||
}
|
||||
if _, exists := h.data.items[key]; !exists {
|
||||
heap.Push(h.data, &itemKeyValue{key, obj})
|
||||
if h.metricRecorder != nil {
|
||||
h.metricRecorder.Inc()
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -172,6 +182,9 @@ func (h *Heap) Delete(obj interface{}) error {
|
||||
}
|
||||
if item, ok := h.data.items[key]; ok {
|
||||
heap.Remove(h.data, item.index)
|
||||
if h.metricRecorder != nil {
|
||||
h.metricRecorder.Dec()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("object not found")
|
||||
@ -186,6 +199,9 @@ func (h *Heap) Peek() interface{} {
|
||||
func (h *Heap) Pop() (interface{}, error) {
|
||||
obj := heap.Pop(h.data)
|
||||
if obj != nil {
|
||||
if h.metricRecorder != nil {
|
||||
h.metricRecorder.Dec()
|
||||
}
|
||||
return obj, nil
|
||||
}
|
||||
return nil, fmt.Errorf("object was removed from heap data")
|
||||
@ -225,6 +241,11 @@ func (h *Heap) Len() int {
|
||||
|
||||
// NewHeap returns a Heap which can be used to queue up items to process.
|
||||
func NewHeap(keyFn KeyFunc, lessFn LessFunc) *Heap {
|
||||
return NewHeapWithRecorder(keyFn, lessFn, nil)
|
||||
}
|
||||
|
||||
// NewHeapWithRecorder wraps an optional metricRecorder to compose a Heap object.
|
||||
func NewHeapWithRecorder(keyFn KeyFunc, lessFn LessFunc, metricRecorder metrics.MetricRecorder) *Heap {
|
||||
return &Heap{
|
||||
data: &heapData{
|
||||
items: map[string]*heapItem{},
|
||||
@ -232,5 +253,6 @@ func NewHeap(keyFn KeyFunc, lessFn LessFunc) *Heap {
|
||||
keyFunc: keyFn,
|
||||
lessFunc: lessFn,
|
||||
},
|
||||
metricRecorder: metricRecorder,
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user