mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-21 10:51:29 +00:00
add(scheduler): implement "plugin_execution_duration_seconds" metric in PreEnqueue
This commit is contained in:
parent
ead7d66ee1
commit
6697467062
@ -230,6 +230,21 @@ func WithCaptureProfile(c CaptureProfile) Option {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithClusterEventMap sets clusterEventMap for the scheduling frameworkImpl.
|
||||||
|
func WithClusterEventMap(m map[framework.ClusterEvent]sets.String) Option {
|
||||||
|
return func(o *frameworkOptions) {
|
||||||
|
o.clusterEventMap = m
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithMetricsRecorder sets metrics recorder for the scheduling frameworkImpl.
|
||||||
|
func WithMetricsRecorder(r *metrics.MetricAsyncRecorder) Option {
|
||||||
|
return func(o *frameworkOptions) {
|
||||||
|
o.metricsRecorder = r
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// defaultFrameworkOptions are applied when no option corresponding to those fields exist.
|
||||||
func defaultFrameworkOptions(stopCh <-chan struct{}) frameworkOptions {
|
func defaultFrameworkOptions(stopCh <-chan struct{}) frameworkOptions {
|
||||||
return frameworkOptions{
|
return frameworkOptions{
|
||||||
metricsRecorder: metrics.NewMetricsAsyncRecorder(1000, time.Second, stopCh),
|
metricsRecorder: metrics.NewMetricsAsyncRecorder(1000, time.Second, stopCh),
|
||||||
@ -238,13 +253,6 @@ func defaultFrameworkOptions(stopCh <-chan struct{}) frameworkOptions {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithClusterEventMap sets clusterEventMap for the scheduling frameworkImpl.
|
|
||||||
func WithClusterEventMap(m map[framework.ClusterEvent]sets.String) Option {
|
|
||||||
return func(o *frameworkOptions) {
|
|
||||||
o.clusterEventMap = m
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var _ framework.Framework = &frameworkImpl{}
|
var _ framework.Framework = &frameworkImpl{}
|
||||||
|
|
||||||
// NewFramework initializes plugins given the configuration and the registry.
|
// NewFramework initializes plugins given the configuration and the registry.
|
||||||
|
@ -29,6 +29,7 @@ package queue
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"math/rand"
|
||||||
"reflect"
|
"reflect"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@ -182,6 +183,10 @@ type PriorityQueue struct {
|
|||||||
closed bool
|
closed bool
|
||||||
|
|
||||||
nsLister listersv1.NamespaceLister
|
nsLister listersv1.NamespaceLister
|
||||||
|
|
||||||
|
metricsRecorder metrics.MetricAsyncRecorder
|
||||||
|
// pluginMetricsSamplePercent is the percentage of plugin metrics to be sampled.
|
||||||
|
pluginMetricsSamplePercent int
|
||||||
}
|
}
|
||||||
|
|
||||||
type priorityQueueOptions struct {
|
type priorityQueueOptions struct {
|
||||||
@ -190,6 +195,8 @@ type priorityQueueOptions struct {
|
|||||||
podMaxBackoffDuration time.Duration
|
podMaxBackoffDuration time.Duration
|
||||||
podMaxInUnschedulablePodsDuration time.Duration
|
podMaxInUnschedulablePodsDuration time.Duration
|
||||||
podLister listersv1.PodLister
|
podLister listersv1.PodLister
|
||||||
|
metricsRecorder metrics.MetricAsyncRecorder
|
||||||
|
pluginMetricsSamplePercent int
|
||||||
clusterEventMap map[framework.ClusterEvent]sets.String
|
clusterEventMap map[framework.ClusterEvent]sets.String
|
||||||
preEnqueuePluginMap map[string][]framework.PreEnqueuePlugin
|
preEnqueuePluginMap map[string][]framework.PreEnqueuePlugin
|
||||||
}
|
}
|
||||||
@ -246,6 +253,20 @@ func WithPreEnqueuePluginMap(m map[string][]framework.PreEnqueuePlugin) Option {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithMetricsRecorder sets metrics recorder.
|
||||||
|
func WithMetricsRecorder(recorder metrics.MetricAsyncRecorder) Option {
|
||||||
|
return func(o *priorityQueueOptions) {
|
||||||
|
o.metricsRecorder = recorder
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithPluginMetricsSamplePercent sets the percentage of plugin metrics to be sampled.
|
||||||
|
func WithPluginMetricsSamplePercent(percent int) Option {
|
||||||
|
return func(o *priorityQueueOptions) {
|
||||||
|
o.pluginMetricsSamplePercent = percent
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
var defaultPriorityQueueOptions = priorityQueueOptions{
|
var defaultPriorityQueueOptions = priorityQueueOptions{
|
||||||
clock: clock.RealClock{},
|
clock: clock.RealClock{},
|
||||||
podInitialBackoffDuration: DefaultPodInitialBackoffDuration,
|
podInitialBackoffDuration: DefaultPodInitialBackoffDuration,
|
||||||
@ -298,6 +319,8 @@ func NewPriorityQueue(
|
|||||||
moveRequestCycle: -1,
|
moveRequestCycle: -1,
|
||||||
clusterEventMap: options.clusterEventMap,
|
clusterEventMap: options.clusterEventMap,
|
||||||
preEnqueuePluginMap: options.preEnqueuePluginMap,
|
preEnqueuePluginMap: options.preEnqueuePluginMap,
|
||||||
|
metricsRecorder: options.metricsRecorder,
|
||||||
|
pluginMetricsSamplePercent: options.pluginMetricsSamplePercent,
|
||||||
}
|
}
|
||||||
pq.cond.L = &pq.lock
|
pq.cond.L = &pq.lock
|
||||||
pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
|
pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
|
||||||
@ -325,8 +348,9 @@ func (p *PriorityQueue) runPreEnqueuePlugins(ctx context.Context, pInfo *framewo
|
|||||||
metrics.FrameworkExtensionPointDuration.WithLabelValues(preEnqueue, s.Code().String(), pod.Spec.SchedulerName).Observe(metrics.SinceInSeconds(startTime))
|
metrics.FrameworkExtensionPointDuration.WithLabelValues(preEnqueue, s.Code().String(), pod.Spec.SchedulerName).Observe(metrics.SinceInSeconds(startTime))
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
shouldRecordMetric := rand.Intn(100) < p.pluginMetricsSamplePercent
|
||||||
for _, pl := range p.preEnqueuePluginMap[pod.Spec.SchedulerName] {
|
for _, pl := range p.preEnqueuePluginMap[pod.Spec.SchedulerName] {
|
||||||
s = pl.PreEnqueue(ctx, pod)
|
s = p.runPreEnqueuePlugin(ctx, pl, pod, shouldRecordMetric)
|
||||||
if s.IsSuccess() {
|
if s.IsSuccess() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -342,6 +366,16 @@ func (p *PriorityQueue) runPreEnqueuePlugins(ctx context.Context, pInfo *framewo
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *PriorityQueue) runPreEnqueuePlugin(ctx context.Context, pl framework.PreEnqueuePlugin, pod *v1.Pod, shouldRecordMetric bool) *framework.Status {
|
||||||
|
if !shouldRecordMetric {
|
||||||
|
return pl.PreEnqueue(ctx, pod)
|
||||||
|
}
|
||||||
|
startTime := p.clock.Now()
|
||||||
|
s := pl.PreEnqueue(ctx, pod)
|
||||||
|
p.metricsRecorder.ObservePluginDurationAsync(preEnqueue, pl.Name(), s.Code().String(), p.clock.Since(startTime).Seconds())
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
// addToActiveQ tries to add pod to active queue. It returns 2 parameters:
|
// addToActiveQ tries to add pod to active queue. It returns 2 parameters:
|
||||||
// 1. a boolean flag to indicate whether the pod is added successfully.
|
// 1. a boolean flag to indicate whether the pod is added successfully.
|
||||||
// 2. an error for the caller to act on.
|
// 2. an error for the caller to act on.
|
||||||
|
@ -1603,6 +1603,7 @@ func TestPendingPodsMetric(t *testing.T) {
|
|||||||
operations []operation
|
operations []operation
|
||||||
operands [][]*framework.QueuedPodInfo
|
operands [][]*framework.QueuedPodInfo
|
||||||
metricsName string
|
metricsName string
|
||||||
|
pluginMetricsSamplePercent int
|
||||||
wants string
|
wants string
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
@ -1765,6 +1766,59 @@ scheduler_pending_pods{queue="gated"} 5
|
|||||||
scheduler_pending_pods{queue="unschedulable"} 20
|
scheduler_pending_pods{queue="unschedulable"} 20
|
||||||
`,
|
`,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "the metrics should not be recorded (pluginMetricsSamplePercent=0)",
|
||||||
|
operations: []operation{
|
||||||
|
add,
|
||||||
|
},
|
||||||
|
operands: [][]*framework.QueuedPodInfo{
|
||||||
|
pInfos[:1],
|
||||||
|
},
|
||||||
|
metricsName: "scheduler_plugin_execution_duration_seconds",
|
||||||
|
pluginMetricsSamplePercent: 0,
|
||||||
|
wants: `
|
||||||
|
# HELP scheduler_plugin_execution_duration_seconds [ALPHA] Duration for running a plugin at a specific extension point.
|
||||||
|
# TYPE scheduler_plugin_execution_duration_seconds histogram
|
||||||
|
`, // the observed value will always be 0, because we don't proceed the fake clock.
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "the metrics should be recorded (pluginMetricsSamplePercent=100)",
|
||||||
|
operations: []operation{
|
||||||
|
add,
|
||||||
|
},
|
||||||
|
operands: [][]*framework.QueuedPodInfo{
|
||||||
|
pInfos[:1],
|
||||||
|
},
|
||||||
|
metricsName: "scheduler_plugin_execution_duration_seconds",
|
||||||
|
pluginMetricsSamplePercent: 100,
|
||||||
|
wants: `
|
||||||
|
# HELP scheduler_plugin_execution_duration_seconds [ALPHA] Duration for running a plugin at a specific extension point.
|
||||||
|
# TYPE scheduler_plugin_execution_duration_seconds histogram
|
||||||
|
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="1e-05"} 1
|
||||||
|
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="1.5000000000000002e-05"} 1
|
||||||
|
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="2.2500000000000005e-05"} 1
|
||||||
|
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="3.375000000000001e-05"} 1
|
||||||
|
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="5.062500000000001e-05"} 1
|
||||||
|
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="7.593750000000002e-05"} 1
|
||||||
|
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.00011390625000000003"} 1
|
||||||
|
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.00017085937500000006"} 1
|
||||||
|
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.0002562890625000001"} 1
|
||||||
|
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.00038443359375000017"} 1
|
||||||
|
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.0005766503906250003"} 1
|
||||||
|
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.0008649755859375004"} 1
|
||||||
|
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.0012974633789062506"} 1
|
||||||
|
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.0019461950683593758"} 1
|
||||||
|
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.0029192926025390638"} 1
|
||||||
|
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.004378938903808595"} 1
|
||||||
|
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.006568408355712893"} 1
|
||||||
|
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.009852612533569338"} 1
|
||||||
|
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.014778918800354007"} 1
|
||||||
|
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.02216837820053101"} 1
|
||||||
|
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="+Inf"} 1
|
||||||
|
scheduler_plugin_execution_duration_seconds_sum{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success"} 0
|
||||||
|
scheduler_plugin_execution_duration_seconds_count{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success"} 1
|
||||||
|
`, // the observed value will always be 0, because we don't proceed the fake clock.
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
resetMetrics := func() {
|
resetMetrics := func() {
|
||||||
@ -1772,6 +1826,7 @@ scheduler_pending_pods{queue="unschedulable"} 20
|
|||||||
metrics.BackoffPods().Set(0)
|
metrics.BackoffPods().Set(0)
|
||||||
metrics.UnschedulablePods().Set(0)
|
metrics.UnschedulablePods().Set(0)
|
||||||
metrics.GatedPods().Set(0)
|
metrics.GatedPods().Set(0)
|
||||||
|
metrics.PluginExecutionDuration.Reset()
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
@ -1781,13 +1836,16 @@ scheduler_pending_pods{queue="unschedulable"} 20
|
|||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
m := map[string][]framework.PreEnqueuePlugin{"": {&preEnqueuePlugin{allowlists: []string{queueable}}}}
|
m := map[string][]framework.PreEnqueuePlugin{"": {&preEnqueuePlugin{allowlists: []string{queueable}}}}
|
||||||
queue := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(testingclock.NewFakeClock(timestamp)), WithPreEnqueuePluginMap(m))
|
recorder := metrics.NewMetricsAsyncRecorder(3, 20*time.Microsecond, ctx.Done())
|
||||||
|
queue := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(testingclock.NewFakeClock(timestamp)), WithPreEnqueuePluginMap(m), WithPluginMetricsSamplePercent(test.pluginMetricsSamplePercent), WithMetricsRecorder(*recorder))
|
||||||
for i, op := range test.operations {
|
for i, op := range test.operations {
|
||||||
for _, pInfo := range test.operands[i] {
|
for _, pInfo := range test.operands[i] {
|
||||||
op(queue, pInfo)
|
op(queue, pInfo)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
recorder.FlushMetrics()
|
||||||
|
|
||||||
if err := testutil.GatherAndCompare(metrics.GetGather(), strings.NewReader(test.wants), test.metricsName); err != nil {
|
if err := testutil.GatherAndCompare(metrics.GetGather(), strings.NewReader(test.wants), test.metricsName); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -283,6 +283,7 @@ func New(client clientset.Interface,
|
|||||||
|
|
||||||
snapshot := internalcache.NewEmptySnapshot()
|
snapshot := internalcache.NewEmptySnapshot()
|
||||||
clusterEventMap := make(map[framework.ClusterEvent]sets.String)
|
clusterEventMap := make(map[framework.ClusterEvent]sets.String)
|
||||||
|
metricsRecorder := metrics.NewMetricsAsyncRecorder(1000, time.Second, stopCh)
|
||||||
|
|
||||||
profiles, err := profile.NewMap(options.profiles, registry, recorderFactory, stopCh,
|
profiles, err := profile.NewMap(options.profiles, registry, recorderFactory, stopCh,
|
||||||
frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),
|
frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),
|
||||||
@ -292,8 +293,10 @@ func New(client clientset.Interface,
|
|||||||
frameworkruntime.WithSnapshotSharedLister(snapshot),
|
frameworkruntime.WithSnapshotSharedLister(snapshot),
|
||||||
frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)),
|
frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)),
|
||||||
frameworkruntime.WithClusterEventMap(clusterEventMap),
|
frameworkruntime.WithClusterEventMap(clusterEventMap),
|
||||||
|
frameworkruntime.WithClusterEventMap(clusterEventMap),
|
||||||
frameworkruntime.WithParallelism(int(options.parallelism)),
|
frameworkruntime.WithParallelism(int(options.parallelism)),
|
||||||
frameworkruntime.WithExtenders(extenders),
|
frameworkruntime.WithExtenders(extenders),
|
||||||
|
frameworkruntime.WithMetricsRecorder(metricsRecorder),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("initializing profiles: %v", err)
|
return nil, fmt.Errorf("initializing profiles: %v", err)
|
||||||
@ -316,6 +319,8 @@ func New(client clientset.Interface,
|
|||||||
internalqueue.WithClusterEventMap(clusterEventMap),
|
internalqueue.WithClusterEventMap(clusterEventMap),
|
||||||
internalqueue.WithPodMaxInUnschedulablePodsDuration(options.podMaxInUnschedulablePodsDuration),
|
internalqueue.WithPodMaxInUnschedulablePodsDuration(options.podMaxInUnschedulablePodsDuration),
|
||||||
internalqueue.WithPreEnqueuePluginMap(preEnqueuePluginMap),
|
internalqueue.WithPreEnqueuePluginMap(preEnqueuePluginMap),
|
||||||
|
internalqueue.WithPluginMetricsSamplePercent(pluginMetricsSamplePercent),
|
||||||
|
internalqueue.WithMetricsRecorder(*metricsRecorder),
|
||||||
)
|
)
|
||||||
|
|
||||||
for _, fwk := range profiles {
|
for _, fwk := range profiles {
|
||||||
|
Loading…
Reference in New Issue
Block a user