feat(scheduler): support inflight_events metric

Kensei Nakada 2024-09-02 00:54:00 +09:00
parent 8486ed0620
commit 110d28355d
6 changed files with 161 additions and 13 deletions
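
The change in a nutshell: the active queue now reports how many entries it is tracking in inFlightEvents, per event label, through a new gauge vector, and the updates are aggregated asynchronously by the metrics recorder instead of touching Prometheus on the scheduling hot path. The sketch below is not part of the commit; it only illustrates how the queue is expected to drive the gauge, using identifiers that appear in the hunks that follow (the "Pod/Add" label and the helper's name are made up for illustration).

package metrics

import "time"

// exampleInFlightEventsUsage mirrors the calls made by the scheduling queue.
func exampleInFlightEventsUsage(stopCh <-chan struct{}) {
	r := NewMetricsAsyncRecorder(10, time.Second, stopCh)

	// activeQueue.pop(): a pod starts being scheduled.
	r.ObserveInFlightEventsAsync(PodPoppedInFlightEvent, 1)

	// addEventIfAnyInFlight(): a cluster event arrives while pods are in flight.
	r.ObserveInFlightEventsAsync("Pod/Add", 1)

	// activeQueue.done(): the pod and the events only it referred to are removed,
	// so the same labels are decremented.
	r.ObserveInFlightEventsAsync("Pod/Add", -1)
	r.ObserveInFlightEventsAsync(PodPoppedInFlightEvent, -1)
}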

View File

@@ -123,14 +123,17 @@ type activeQueue struct {
// isSchedulingQueueHintEnabled indicates whether the feature gate for the scheduling queue is enabled.
isSchedulingQueueHintEnabled bool
metricsRecorder metrics.MetricAsyncRecorder
}
func newActiveQueue(queue *heap.Heap[*framework.QueuedPodInfo], isSchedulingQueueHintEnabled bool) *activeQueue {
func newActiveQueue(queue *heap.Heap[*framework.QueuedPodInfo], isSchedulingQueueHintEnabled bool, metricRecorder metrics.MetricAsyncRecorder) *activeQueue {
aq := &activeQueue{
queue: queue,
inFlightPods: make(map[types.UID]*list.Element),
inFlightEvents: list.New(),
isSchedulingQueueHintEnabled: isSchedulingQueueHintEnabled,
metricsRecorder: metricRecorder,
}
aq.cond.L = &aq.lock
@@ -201,6 +204,7 @@ func (aq *activeQueue) pop(logger klog.Logger) (*framework.QueuedPodInfo, error)
aq.schedCycle++
// In flight, no concurrent events yet.
if aq.isSchedulingQueueHintEnabled {
aq.metricsRecorder.ObserveInFlightEventsAsync(metrics.PodPoppedInFlightEvent, 1)
aq.inFlightPods[pInfo.Pod.UID] = aq.inFlightEvents.PushBack(pInfo.Pod)
}
@@ -293,6 +297,7 @@ func (aq *activeQueue) addEventIfPodInFlight(oldPod, newPod *v1.Pod, event frame
_, ok := aq.inFlightPods[newPod.UID]
if ok {
aq.metricsRecorder.ObserveInFlightEventsAsync(event.Label, 1)
aq.inFlightEvents.PushBack(&clusterEvent{
event: event,
oldObj: oldPod,
@@ -309,6 +314,7 @@ func (aq *activeQueue) addEventIfAnyInFlight(oldObj, newObj interface{}, event f
defer aq.lock.Unlock()
if len(aq.inFlightPods) != 0 {
aq.metricsRecorder.ObserveInFlightEventsAsync(event.Label, 1)
aq.inFlightEvents.PushBack(&clusterEvent{
event: event,
oldObj: oldObj,
@@ -340,7 +346,9 @@ func (aq *activeQueue) done(pod types.UID) {
// Remove the pod from the list.
aq.inFlightEvents.Remove(inFlightPod)
aq.metricsRecorder.ObserveInFlightEventsAsync(metrics.PodPoppedInFlightEvent, -1)
aggrMetricsCounter := map[string]int{}
// Remove events which are only referred to by this Pod
// so that the inFlightEvents list doesn't grow infinitely.
// If the pod was at the head of the list, then all
@@ -352,11 +360,17 @@ func (aq *activeQueue) done(pod types.UID) {
// Empty list.
break
}
if _, ok := e.Value.(*clusterEvent); !ok {
ev, ok := e.Value.(*clusterEvent)
if !ok {
// A pod, must stop pruning.
break
}
aq.inFlightEvents.Remove(e)
aggrMetricsCounter[ev.event.Label]--
}
for evLabel, count := range aggrMetricsCounter {
aq.metricsRecorder.ObserveInFlightEventsAsync(evLabel, float64(count))
}
}
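
One detail worth calling out in done(): the decrements for pruned events are first accumulated per event label in aggrMetricsCounter and only then handed to the recorder, so pruning N events costs one ObserveInFlightEventsAsync call per distinct label rather than one per event. A minimal sketch of that shape follows; it is not from the commit, and the labels and helper name are made up.

package metrics

// exampleBatchedDecrement shows the per-label batching that done() performs.
func exampleBatchedDecrement(r *MetricAsyncRecorder) {
	// e.g. three "Pod/Add" events and one "Node/Add" event were pruned.
	aggr := map[string]int{"Pod/Add": -3, "Node/Add": -1}
	for label, delta := range aggr {
		r.ObserveInFlightEventsAsync(label, float64(delta))
	}
}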

View File

@@ -331,7 +331,7 @@ func NewPriorityQueue(
podInitialBackoffDuration: options.podInitialBackoffDuration,
podMaxBackoffDuration: options.podMaxBackoffDuration,
podMaxInUnschedulablePodsDuration: options.podMaxInUnschedulablePodsDuration,
activeQ: newActiveQueue(heap.NewWithRecorder(podInfoKeyFunc, heap.LessFunc[*framework.QueuedPodInfo](lessFn), metrics.NewActivePodsRecorder()), isSchedulingQueueHintEnabled),
activeQ: newActiveQueue(heap.NewWithRecorder(podInfoKeyFunc, heap.LessFunc[*framework.QueuedPodInfo](lessFn), metrics.NewActivePodsRecorder()), isSchedulingQueueHintEnabled, options.metricsRecorder),
unschedulablePods: newUnschedulablePods(metrics.NewUnschedulablePodsRecorder(), metrics.NewGatedPodsRecorder()),
preEnqueuePluginMap: options.preEnqueuePluginMap,
queueingHintMap: options.queueingHintMap,

View File

@@ -18,11 +18,13 @@ package queue
import (
"context"
"time"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/metrics"
)
// NewTestQueue creates a priority queue with an empty informer factory.
@@ -39,6 +41,12 @@ func NewTestQueueWithObjects(
opts ...Option,
) *PriorityQueue {
informerFactory := informers.NewSharedInformerFactory(fake.NewClientset(objs...), 0)
// Because some major functions (e.g., Pop) require the metric recorder to be set,
// we always set a metric recorder here.
recorder := metrics.NewMetricsAsyncRecorder(10, 20*time.Microsecond, ctx.Done())
// We set it before the options that users provide, so that users can override it.
opts = append([]Option{WithMetricsRecorder(*recorder)}, opts...)
return NewTestQueueWithInformerFactory(ctx, lessFn, informerFactory, opts...)
}
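
Because the default recorder above is prepended to opts, a test that needs to assert on metrics can still inject its own recorder through WithMetricsRecorder and have it win. The wrapper below is purely illustrative and not part of the commit; it assumes NewTestQueue in this file takes the same ctx/lessFn arguments as NewTestQueueWithObjects.

package queue

import (
	"context"
	"time"

	"k8s.io/kubernetes/pkg/scheduler/framework"
	"k8s.io/kubernetes/pkg/scheduler/metrics"
)

// newTestQueueWithCustomRecorder is a hypothetical helper for tests that
// want to inspect the recorder afterwards.
func newTestQueueWithCustomRecorder(ctx context.Context, lessFn framework.LessFunc) (*PriorityQueue, *metrics.MetricAsyncRecorder) {
	r := metrics.NewMetricsAsyncRecorder(10, 20*time.Microsecond, ctx.Done())
	// User-provided options are applied after the default one, so this recorder wins.
	return NewTestQueue(ctx, lessFn, WithMetricsRecorder(*r)), r
}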

View File

@@ -80,23 +80,42 @@ func (r *PendingPodsRecorder) Clear() {
r.recorder.Set(float64(0))
}
// metric is the data structure passed in the buffer channel between the main framework thread
// histogramVecMetric is the data structure passed in the buffer channel between the main framework thread
// and the metricsRecorder goroutine.
type metric struct {
type histogramVecMetric struct {
metric *metrics.HistogramVec
labelValues []string
value float64
}
type gaugeVecMetric struct {
metric *metrics.GaugeVec
labelValues []string
valueToAdd float64
}
type gaugeVecMetricKey struct {
metricName string
labelValue string
}
// MetricAsyncRecorder records metric in a separate goroutine to avoid overhead in the critical path.
type MetricAsyncRecorder struct {
// bufferCh is a channel that serves as a metrics buffer before the metricsRecorder goroutine reports it.
bufferCh chan *metric
bufferCh chan *histogramVecMetric
// if bufferSize is reached, incoming metrics will be discarded.
bufferSize int
// how often the recorder runs to flush the metrics.
interval time.Duration
// aggregatedInflightEventMetric is only used to record the InFlightEvents metric asynchronously.
// It's a map from gaugeVecMetricKey to the aggregated value,
// and the aggregated value is flushed to Prometheus each time the interval elapses.
// Note that we deliberately don't lock the map because we assume the queue takes its own lock before updating the in-flight events.
aggregatedInflightEventMetric map[gaugeVecMetricKey]int
aggregatedInflightEventMetricLastFlushTime time.Time
aggregatedInflightEventMetricBufferCh chan *gaugeVecMetric
// stopCh is used to stop the goroutine which periodically flushes metrics.
stopCh <-chan struct{}
// IsStoppedCh indicates whether the goroutine is stopped. It's used in tests only to make sure
@@ -106,11 +125,14 @@ type MetricAsyncRecorder struct {
func NewMetricsAsyncRecorder(bufferSize int, interval time.Duration, stopCh <-chan struct{}) *MetricAsyncRecorder {
recorder := &MetricAsyncRecorder{
bufferCh: make(chan *metric, bufferSize),
bufferSize: bufferSize,
interval: interval,
stopCh: stopCh,
IsStoppedCh: make(chan struct{}),
bufferCh: make(chan *histogramVecMetric, bufferSize),
bufferSize: bufferSize,
interval: interval,
stopCh: stopCh,
aggregatedInflightEventMetric: make(map[gaugeVecMetricKey]int),
aggregatedInflightEventMetricLastFlushTime: time.Now(),
aggregatedInflightEventMetricBufferCh: make(chan *gaugeVecMetric, bufferSize),
IsStoppedCh: make(chan struct{}),
}
go recorder.run()
return recorder
@@ -128,8 +150,32 @@ func (r *MetricAsyncRecorder) ObserveQueueingHintDurationAsync(pluginName, event
r.observeMetricAsync(queueingHintExecutionDuration, value, pluginName, event, hint)
}
// ObserveInFlightEventsAsync observes the inflight_events metric.
// Note that this function is not goroutine-safe;
// we deliberately don't lock the map for performance reasons, and we assume the queue (i.e., the caller) takes its lock before updating the in-flight events.
func (r *MetricAsyncRecorder) ObserveInFlightEventsAsync(eventLabel string, valueToAdd float64) {
r.aggregatedInflightEventMetric[gaugeVecMetricKey{metricName: InFlightEvents.Name, labelValue: eventLabel}] += int(valueToAdd)
// Only flush the metric to the channel if the interval has elapsed.
// The values are flushed to Prometheus in the run() function, which runs once per interval.
if time.Since(r.aggregatedInflightEventMetricLastFlushTime) > r.interval {
for key, value := range r.aggregatedInflightEventMetric {
newMetric := &gaugeVecMetric{
metric: InFlightEvents,
labelValues: []string{key.labelValue},
valueToAdd: float64(value),
}
select {
case r.aggregatedInflightEventMetricBufferCh <- newMetric:
default:
}
}
r.aggregatedInflightEventMetricLastFlushTime = time.Now()
}
}
func (r *MetricAsyncRecorder) observeMetricAsync(m *metrics.HistogramVec, value float64, labelsValues ...string) {
newMetric := &metric{
newMetric := &histogramVecMetric{
metric: m,
labelValues: labelsValues,
value: value,
@@ -161,7 +207,14 @@ func (r *MetricAsyncRecorder) FlushMetrics() {
case m := <-r.bufferCh:
m.metric.WithLabelValues(m.labelValues...).Observe(m.value)
default:
return
// no more values
}
select {
case m := <-r.aggregatedInflightEventMetricBufferCh:
m.metric.WithLabelValues(m.labelValues...).Add(m.valueToAdd)
default:
// no more values
}
}
}
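
Two design points are packed into the recorder changes above: the aggregation map is intentionally unlocked because the queue serializes all callers under its own lock, and the flush into the buffered channel is a non-blocking send, so a delta is dropped rather than blocking the scheduling path when the buffer is full. The sketch below is an extra test that is not part of the commit; it builds the recorder the same way TestInFlightEventAsync (next file) does and only exercises the interval gating implied by the code above.

package metrics

import (
	"testing"
	"time"
)

// TestInFlightEventsFlushOnlyAfterInterval is an illustrative sketch.
func TestInFlightEventsFlushOnlyAfterInterval(t *testing.T) {
	r := &MetricAsyncRecorder{
		aggregatedInflightEventMetric:              map[gaugeVecMetricKey]int{},
		aggregatedInflightEventMetricLastFlushTime: time.Now(),
		aggregatedInflightEventMetricBufferCh:      make(chan *gaugeVecMetric, 10),
		interval: time.Hour,
	}

	// Interval not reached yet: the delta only lands in the aggregation map.
	r.ObserveInFlightEventsAsync(PodPoppedInFlightEvent, 1)
	if len(r.aggregatedInflightEventMetricBufferCh) != 0 {
		t.Fatalf("expected no flushed delta before the interval elapses")
	}

	// Pretend the interval has elapsed: the next observation flushes the
	// aggregated delta (+2) into the channel; FlushMetrics would then apply
	// it to the gauge with Add.
	r.aggregatedInflightEventMetricLastFlushTime = time.Now().Add(-2 * time.Hour)
	r.ObserveInFlightEventsAsync(PodPoppedInFlightEvent, 1)
	if len(r.aggregatedInflightEventMetricBufferCh) != 1 {
		t.Fatalf("expected exactly one flushed delta after the interval elapses")
	}
}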

View File

@@ -17,9 +17,14 @@ limitations under the License.
package metrics
import (
"sort"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
)
var _ MetricRecorder = &fakePodsRecorder{}
@@ -101,3 +106,59 @@ func TestClear(t *testing.T) {
t.Errorf("Expected %v, got %v", 0, fakeRecorder.counter)
}
}
func TestInFlightEventAsync(t *testing.T) {
r := &MetricAsyncRecorder{
aggregatedInflightEventMetric: map[gaugeVecMetricKey]int{},
aggregatedInflightEventMetricLastFlushTime: time.Now(),
aggregatedInflightEventMetricBufferCh: make(chan *gaugeVecMetric, 100),
interval: time.Hour,
}
podAddLabel := "Pod/Add"
r.ObserveInFlightEventsAsync(podAddLabel, 10)
r.ObserveInFlightEventsAsync(podAddLabel, -1)
r.ObserveInFlightEventsAsync(PodPoppedInFlightEvent, 1)
if d := cmp.Diff(r.aggregatedInflightEventMetric, map[gaugeVecMetricKey]int{
{metricName: InFlightEvents.Name, labelValue: podAddLabel}: 9,
{metricName: InFlightEvents.Name, labelValue: PodPoppedInFlightEvent}: 1,
}, cmp.AllowUnexported(gaugeVecMetric{})); d != "" {
t.Errorf("unexpected aggregatedInflightEventMetric: %s", d)
}
r.aggregatedInflightEventMetricLastFlushTime = time.Now().Add(-time.Hour) // to test flush
// It adds -4 and flushes the metric to the channel.
r.ObserveInFlightEventsAsync(podAddLabel, -4)
got := []gaugeVecMetric{}
for {
select {
case m := <-r.aggregatedInflightEventMetricBufferCh:
got = append(got, *m)
continue
default:
}
// got all
break
}
// sort got to avoid a flaky test; the flush iterates a map, so the send order is random
sort.Slice(got, func(i, j int) bool {
return got[i].labelValues[0] < got[j].labelValues[0]
})
if d := cmp.Diff(got, []gaugeVecMetric{
{
labelValues: []string{podAddLabel},
valueToAdd: 5,
},
{
labelValues: []string{PodPoppedInFlightEvent},
valueToAdd: 1,
},
}, cmp.AllowUnexported(gaugeVecMetric{}), cmpopts.IgnoreFields(gaugeVecMetric{}, "metric")); d != "" {
t.Errorf("unexpected metrics are sent to aggregatedInflightEventMetricBufferCh: %s", d)
}
}

View File

@@ -81,6 +81,10 @@ const (
QueueingHintResultError = "Error"
)
const (
PodPoppedInFlightEvent = "PodPopped"
)
// All the histogram based metrics have 1ms as size for the smallest bucket.
var (
scheduleAttempts = metrics.NewCounterVec(
@@ -141,6 +145,13 @@ var (
Help: "Number of pending pods, by the queue type. 'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulablePods that the scheduler attempted to schedule and failed; 'gated' is the number of unschedulable pods that the scheduler never attempted to schedule because they are gated.",
StabilityLevel: metrics.STABLE,
}, []string{"queue"})
InFlightEvents = metrics.NewGaugeVec(
&metrics.GaugeOpts{
Subsystem: SchedulerSubsystem,
Name: "inflight_events",
Help: "Number of events recorded in the scheduling queue.",
StabilityLevel: metrics.ALPHA,
}, []string{"event"})
Goroutines = metrics.NewGaugeVec(
&metrics.GaugeOpts{
Subsystem: SchedulerSubsystem,
@@ -292,6 +303,7 @@ func Register() {
RegisterMetrics(metricsList...)
if utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints) {
RegisterMetrics(queueingHintExecutionDuration)
RegisterMetrics(InFlightEvents)
}
volumebindingmetrics.RegisterVolumeSchedulingMetrics()
})
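
Since SchedulerSubsystem is "scheduler", the new gauge is exposed as scheduler_inflight_events with a single "event" label, and, like the queueing-hint histogram registered next to it, it is only registered when the SchedulerQueueingHints feature gate is enabled.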