mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 03:41:45 +00:00
Merge pull request #84522 from liu-cong/plugin
Add scheduler plugin execution duration metric.
This commit is contained in:
commit
ce11622bb6
@ -6,6 +6,7 @@ go_library(
|
||||
"cycle_state.go",
|
||||
"framework.go",
|
||||
"interface.go",
|
||||
"metrics_recorder.go",
|
||||
"registry.go",
|
||||
"waiting_pods_map.go",
|
||||
],
|
||||
@ -25,6 +26,7 @@ go_library(
|
||||
"//staging/src/k8s.io/client-go/informers:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
|
||||
"//staging/src/k8s.io/component-base/metrics:go_default_library",
|
||||
"//vendor/k8s.io/klog:go_default_library",
|
||||
"//vendor/sigs.k8s.io/yaml:go_default_library",
|
||||
],
|
||||
|
@ -44,6 +44,8 @@ type StateKey string
|
||||
type CycleState struct {
|
||||
mx sync.RWMutex
|
||||
storage map[StateKey]StateData
|
||||
// if recordFrameworkMetrics is true, framework metrics will be recorded for this cycle.
|
||||
recordFrameworkMetrics bool
|
||||
}
|
||||
|
||||
// NewCycleState initializes a new CycleState and returns its pointer.
|
||||
@ -53,6 +55,22 @@ func NewCycleState() *CycleState {
|
||||
}
|
||||
}
|
||||
|
||||
// ShouldRecordFrameworkMetrics returns whether framework metrics should be recorded.
|
||||
func (c *CycleState) ShouldRecordFrameworkMetrics() bool {
|
||||
if c == nil {
|
||||
return false
|
||||
}
|
||||
return c.recordFrameworkMetrics
|
||||
}
|
||||
|
||||
// SetRecordFrameworkMetrics sets recordFrameworkMetrics to the given value.
|
||||
func (c *CycleState) SetRecordFrameworkMetrics(flag bool) {
|
||||
if c == nil {
|
||||
return
|
||||
}
|
||||
c.recordFrameworkMetrics = flag
|
||||
}
|
||||
|
||||
// Clone creates a copy of CycleState and returns its pointer. Clone returns
|
||||
// nil if the context being cloned is nil.
|
||||
func (c *CycleState) Clone() *CycleState {
|
||||
|
@ -46,6 +46,7 @@ const (
|
||||
filter = "Filter"
|
||||
postFilter = "PostFilter"
|
||||
score = "Score"
|
||||
scoreExtensionNormalize = "ScoreExtensionNormalize"
|
||||
preBind = "PreBind"
|
||||
bind = "Bind"
|
||||
postBind = "PostBind"
|
||||
@ -75,6 +76,8 @@ type framework struct {
|
||||
|
||||
clientSet clientset.Interface
|
||||
informerFactory informers.SharedInformerFactory
|
||||
|
||||
metricsRecorder *metricsRecorder
|
||||
}
|
||||
|
||||
// extensionPoint encapsulates desired and applied set of plugins at a specific extension
|
||||
@ -108,6 +111,7 @@ type frameworkOptions struct {
|
||||
clientSet clientset.Interface
|
||||
informerFactory informers.SharedInformerFactory
|
||||
snapshotSharedLister schedulerlisters.SharedLister
|
||||
metricsRecorder *metricsRecorder
|
||||
}
|
||||
|
||||
// Option for the framework.
|
||||
@ -134,7 +138,16 @@ func WithSnapshotSharedLister(snapshotSharedLister schedulerlisters.SharedLister
|
||||
}
|
||||
}
|
||||
|
||||
var defaultFrameworkOptions = frameworkOptions{}
|
||||
// withMetricsRecorder is only used in tests.
|
||||
func withMetricsRecorder(recorder *metricsRecorder) Option {
|
||||
return func(o *frameworkOptions) {
|
||||
o.metricsRecorder = recorder
|
||||
}
|
||||
}
|
||||
|
||||
var defaultFrameworkOptions = frameworkOptions{
|
||||
metricsRecorder: newMetricsRecorder(1000, time.Second),
|
||||
}
|
||||
|
||||
var _ Framework = &framework{}
|
||||
|
||||
@ -152,6 +165,7 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
|
||||
waitingPods: newWaitingPodsMap(),
|
||||
clientSet: options.clientSet,
|
||||
informerFactory: options.informerFactory,
|
||||
metricsRecorder: options.metricsRecorder,
|
||||
}
|
||||
if plugins == nil {
|
||||
return f, nil
|
||||
@ -254,12 +268,15 @@ func (f *framework) QueueSortFunc() LessFunc {
|
||||
// *Status and its code is set to non-success if any of the plugins returns
|
||||
// anything but Success. If a non-success status is returned, then the scheduling
|
||||
// cycle is aborted.
|
||||
func (f *framework) RunPreFilterPlugins(
|
||||
ctx context.Context, state *CycleState, pod *v1.Pod) (status *Status) {
|
||||
startTime := time.Now()
|
||||
defer func() { recordExtensionPointDuration(startTime, preFilter, status) }()
|
||||
func (f *framework) RunPreFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod) (status *Status) {
|
||||
if state.ShouldRecordFrameworkMetrics() {
|
||||
startTime := time.Now()
|
||||
defer func() {
|
||||
f.metricsRecorder.observeExtensionPointDurationAsync(preFilter, status, metrics.SinceInSeconds(startTime))
|
||||
}()
|
||||
}
|
||||
for _, pl := range f.preFilterPlugins {
|
||||
status := pl.PreFilter(ctx, state, pod)
|
||||
status = f.runPreFilterPlugin(ctx, pl, state, pod)
|
||||
if !status.IsSuccess() {
|
||||
if status.IsUnschedulable() {
|
||||
msg := fmt.Sprintf("rejected by %q at prefilter: %v", pl.Name(), status.Message())
|
||||
@ -275,6 +292,16 @@ func (f *framework) RunPreFilterPlugins(
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *framework) runPreFilterPlugin(ctx context.Context, pl PreFilterPlugin, state *CycleState, pod *v1.Pod) *Status {
|
||||
if !state.ShouldRecordFrameworkMetrics() {
|
||||
return pl.PreFilter(ctx, state, pod)
|
||||
}
|
||||
startTime := time.Now()
|
||||
status := pl.PreFilter(ctx, state, pod)
|
||||
f.metricsRecorder.observePluginDurationAsync(preFilter, pl.Name(), status, metrics.SinceInSeconds(startTime))
|
||||
return status
|
||||
}
|
||||
|
||||
// RunPreFilterExtensionAddPod calls the AddPod interface for the set of configured
|
||||
// PreFilter plugins. It returns directly if any of the plugins return any
|
||||
// status other than Success.
|
||||
@ -285,13 +312,18 @@ func (f *framework) RunPreFilterExtensionAddPod(
|
||||
podToAdd *v1.Pod,
|
||||
nodeInfo *schedulernodeinfo.NodeInfo,
|
||||
) (status *Status) {
|
||||
startTime := time.Now()
|
||||
defer func() { recordExtensionPointDuration(startTime, preFilterExtensionAddPod, status) }()
|
||||
if state.ShouldRecordFrameworkMetrics() {
|
||||
startTime := time.Now()
|
||||
defer func() {
|
||||
f.metricsRecorder.observeExtensionPointDurationAsync(preFilterExtensionAddPod, status, metrics.SinceInSeconds(startTime))
|
||||
}()
|
||||
}
|
||||
for _, pl := range f.preFilterPlugins {
|
||||
if pl.PreFilterExtensions() == nil {
|
||||
continue
|
||||
}
|
||||
if status := pl.PreFilterExtensions().AddPod(ctx, state, podToSchedule, podToAdd, nodeInfo); !status.IsSuccess() {
|
||||
status = f.runPreFilterExtensionAddPod(ctx, pl, state, podToSchedule, podToAdd, nodeInfo)
|
||||
if !status.IsSuccess() {
|
||||
msg := fmt.Sprintf("error while running AddPod for plugin %q while scheduling pod %q: %v",
|
||||
pl.Name(), podToSchedule.Name, status.Message())
|
||||
klog.Error(msg)
|
||||
@ -302,6 +334,16 @@ func (f *framework) RunPreFilterExtensionAddPod(
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *framework) runPreFilterExtensionAddPod(ctx context.Context, pl PreFilterPlugin, state *CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
|
||||
if !state.ShouldRecordFrameworkMetrics() {
|
||||
return pl.PreFilterExtensions().AddPod(ctx, state, podToSchedule, podToAdd, nodeInfo)
|
||||
}
|
||||
startTime := time.Now()
|
||||
status := pl.PreFilterExtensions().AddPod(ctx, state, podToSchedule, podToAdd, nodeInfo)
|
||||
f.metricsRecorder.observePluginDurationAsync(preFilterExtensionAddPod, pl.Name(), status, metrics.SinceInSeconds(startTime))
|
||||
return status
|
||||
}
|
||||
|
||||
// RunPreFilterExtensionRemovePod calls the RemovePod interface for the set of configured
|
||||
// PreFilter plugins. It returns directly if any of the plugins return any
|
||||
// status other than Success.
|
||||
@ -312,13 +354,18 @@ func (f *framework) RunPreFilterExtensionRemovePod(
|
||||
podToRemove *v1.Pod,
|
||||
nodeInfo *schedulernodeinfo.NodeInfo,
|
||||
) (status *Status) {
|
||||
startTime := time.Now()
|
||||
defer func() { recordExtensionPointDuration(startTime, preFilterExtensionRemovePod, status) }()
|
||||
if state.ShouldRecordFrameworkMetrics() {
|
||||
startTime := time.Now()
|
||||
defer func() {
|
||||
f.metricsRecorder.observeExtensionPointDurationAsync(preFilterExtensionRemovePod, status, metrics.SinceInSeconds(startTime))
|
||||
}()
|
||||
}
|
||||
for _, pl := range f.preFilterPlugins {
|
||||
if pl.PreFilterExtensions() == nil {
|
||||
continue
|
||||
}
|
||||
if status := pl.PreFilterExtensions().RemovePod(ctx, state, podToSchedule, podToRemove, nodeInfo); !status.IsSuccess() {
|
||||
status = f.runPreFilterExtensionRemovePod(ctx, pl, state, podToSchedule, podToRemove, nodeInfo)
|
||||
if !status.IsSuccess() {
|
||||
msg := fmt.Sprintf("error while running RemovePod for plugin %q while scheduling pod %q: %v",
|
||||
pl.Name(), podToSchedule.Name, status.Message())
|
||||
klog.Error(msg)
|
||||
@ -329,6 +376,16 @@ func (f *framework) RunPreFilterExtensionRemovePod(
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *framework) runPreFilterExtensionRemovePod(ctx context.Context, pl PreFilterPlugin, state *CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
|
||||
if !state.ShouldRecordFrameworkMetrics() {
|
||||
return pl.PreFilterExtensions().RemovePod(ctx, state, podToSchedule, podToAdd, nodeInfo)
|
||||
}
|
||||
startTime := time.Now()
|
||||
status := pl.PreFilterExtensions().RemovePod(ctx, state, podToSchedule, podToAdd, nodeInfo)
|
||||
f.metricsRecorder.observePluginDurationAsync(preFilterExtensionRemovePod, pl.Name(), status, metrics.SinceInSeconds(startTime))
|
||||
return status
|
||||
}
|
||||
|
||||
// RunFilterPlugins runs the set of configured Filter plugins for pod on
|
||||
// the given node. If any of these plugins doesn't return "Success", the
|
||||
// given node is not suitable for running pod.
|
||||
@ -339,10 +396,14 @@ func (f *framework) RunFilterPlugins(
|
||||
pod *v1.Pod,
|
||||
nodeInfo *schedulernodeinfo.NodeInfo,
|
||||
) (status *Status) {
|
||||
startTime := time.Now()
|
||||
defer func() { recordExtensionPointDuration(startTime, filter, status) }()
|
||||
if state.ShouldRecordFrameworkMetrics() {
|
||||
startTime := time.Now()
|
||||
defer func() {
|
||||
f.metricsRecorder.observeExtensionPointDurationAsync(filter, status, metrics.SinceInSeconds(startTime))
|
||||
}()
|
||||
}
|
||||
for _, pl := range f.filterPlugins {
|
||||
status := pl.Filter(ctx, state, pod, nodeInfo)
|
||||
status = f.runFilterPlugin(ctx, pl, state, pod, nodeInfo)
|
||||
if !status.IsSuccess() {
|
||||
if !status.IsUnschedulable() {
|
||||
errMsg := fmt.Sprintf("error while running %q filter plugin for pod %q: %v",
|
||||
@ -357,6 +418,16 @@ func (f *framework) RunFilterPlugins(
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *framework) runFilterPlugin(ctx context.Context, pl FilterPlugin, state *CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
|
||||
if !state.ShouldRecordFrameworkMetrics() {
|
||||
return pl.Filter(ctx, state, pod, nodeInfo)
|
||||
}
|
||||
startTime := time.Now()
|
||||
status := pl.Filter(ctx, state, pod, nodeInfo)
|
||||
f.metricsRecorder.observePluginDurationAsync(filter, pl.Name(), status, metrics.SinceInSeconds(startTime))
|
||||
return status
|
||||
}
|
||||
|
||||
// RunPostFilterPlugins runs the set of configured post-filter plugins. If any
|
||||
// of these plugins returns any status other than "Success", the given node is
|
||||
// rejected. The filteredNodeStatuses is the set of filtered nodes and their statuses.
|
||||
@ -367,10 +438,14 @@ func (f *framework) RunPostFilterPlugins(
|
||||
nodes []*v1.Node,
|
||||
filteredNodesStatuses NodeToStatusMap,
|
||||
) (status *Status) {
|
||||
startTime := time.Now()
|
||||
defer func() { recordExtensionPointDuration(startTime, postFilter, status) }()
|
||||
if state.ShouldRecordFrameworkMetrics() {
|
||||
startTime := time.Now()
|
||||
defer func() {
|
||||
f.metricsRecorder.observeExtensionPointDurationAsync(postFilter, status, metrics.SinceInSeconds(startTime))
|
||||
}()
|
||||
}
|
||||
for _, pl := range f.postFilterPlugins {
|
||||
status := pl.PostFilter(ctx, state, pod, nodes, filteredNodesStatuses)
|
||||
status = f.runPostFilterPlugin(ctx, pl, state, pod, nodes, filteredNodesStatuses)
|
||||
if !status.IsSuccess() {
|
||||
msg := fmt.Sprintf("error while running %q postfilter plugin for pod %q: %v", pl.Name(), pod.Name, status.Message())
|
||||
klog.Error(msg)
|
||||
@ -381,13 +456,27 @@ func (f *framework) RunPostFilterPlugins(
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *framework) runPostFilterPlugin(ctx context.Context, pl PostFilterPlugin, state *CycleState, pod *v1.Pod, nodes []*v1.Node, filteredNodesStatuses NodeToStatusMap) *Status {
|
||||
if !state.ShouldRecordFrameworkMetrics() {
|
||||
return pl.PostFilter(ctx, state, pod, nodes, filteredNodesStatuses)
|
||||
}
|
||||
startTime := time.Now()
|
||||
status := pl.PostFilter(ctx, state, pod, nodes, filteredNodesStatuses)
|
||||
f.metricsRecorder.observePluginDurationAsync(postFilter, pl.Name(), status, metrics.SinceInSeconds(startTime))
|
||||
return status
|
||||
}
|
||||
|
||||
// RunScorePlugins runs the set of configured scoring plugins. It returns a list that
|
||||
// stores for each scoring plugin name the corresponding NodeScoreList(s).
|
||||
// It also returns *Status, which is set to non-success if any of the plugins returns
|
||||
// a non-success status.
|
||||
func (f *framework) RunScorePlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*v1.Node) (ps PluginToNodeScores, status *Status) {
|
||||
startTime := time.Now()
|
||||
defer func() { recordExtensionPointDuration(startTime, score, status) }()
|
||||
if state.ShouldRecordFrameworkMetrics() {
|
||||
startTime := time.Now()
|
||||
defer func() {
|
||||
f.metricsRecorder.observeExtensionPointDurationAsync(score, status, metrics.SinceInSeconds(startTime))
|
||||
}()
|
||||
}
|
||||
pluginToNodeScores := make(PluginToNodeScores, len(f.scorePlugins))
|
||||
for _, pl := range f.scorePlugins {
|
||||
pluginToNodeScores[pl.Name()] = make(NodeScoreList, len(nodes))
|
||||
@ -399,14 +488,14 @@ func (f *framework) RunScorePlugins(ctx context.Context, state *CycleState, pod
|
||||
workqueue.ParallelizeUntil(ctx, 16, len(nodes), func(index int) {
|
||||
for _, pl := range f.scorePlugins {
|
||||
nodeName := nodes[index].Name
|
||||
score, status := pl.Score(ctx, state, pod, nodeName)
|
||||
s, status := f.runScorePlugin(ctx, pl, state, pod, nodeName)
|
||||
if !status.IsSuccess() {
|
||||
errCh.SendErrorWithCancel(fmt.Errorf(status.Message()), cancel)
|
||||
return
|
||||
}
|
||||
pluginToNodeScores[pl.Name()][index] = NodeScore{
|
||||
Name: nodeName,
|
||||
Score: int64(score),
|
||||
Score: int64(s),
|
||||
}
|
||||
}
|
||||
})
|
||||
@ -423,7 +512,7 @@ func (f *framework) RunScorePlugins(ctx context.Context, state *CycleState, pod
|
||||
if pl.ScoreExtensions() == nil {
|
||||
return
|
||||
}
|
||||
status := pl.ScoreExtensions().NormalizeScore(ctx, state, pod, nodeScoreList)
|
||||
status := f.runScoreExtension(ctx, pl, state, pod, nodeScoreList)
|
||||
if !status.IsSuccess() {
|
||||
err := fmt.Errorf("normalize score plugin %q failed with error %v", pl.Name(), status.Message())
|
||||
errCh.SendErrorWithCancel(err, cancel)
|
||||
@ -462,15 +551,38 @@ func (f *framework) RunScorePlugins(ctx context.Context, state *CycleState, pod
|
||||
return pluginToNodeScores, nil
|
||||
}
|
||||
|
||||
func (f *framework) runScorePlugin(ctx context.Context, pl ScorePlugin, state *CycleState, pod *v1.Pod, nodeName string) (int64, *Status) {
|
||||
if !state.ShouldRecordFrameworkMetrics() {
|
||||
return pl.Score(ctx, state, pod, nodeName)
|
||||
}
|
||||
startTime := time.Now()
|
||||
s, status := pl.Score(ctx, state, pod, nodeName)
|
||||
f.metricsRecorder.observePluginDurationAsync(score, pl.Name(), status, metrics.SinceInSeconds(startTime))
|
||||
return s, status
|
||||
}
|
||||
|
||||
func (f *framework) runScoreExtension(ctx context.Context, pl ScorePlugin, state *CycleState, pod *v1.Pod, nodeScoreList NodeScoreList) *Status {
|
||||
if !state.ShouldRecordFrameworkMetrics() {
|
||||
return pl.ScoreExtensions().NormalizeScore(ctx, state, pod, nodeScoreList)
|
||||
}
|
||||
startTime := time.Now()
|
||||
status := pl.ScoreExtensions().NormalizeScore(ctx, state, pod, nodeScoreList)
|
||||
f.metricsRecorder.observePluginDurationAsync(scoreExtensionNormalize, pl.Name(), status, metrics.SinceInSeconds(startTime))
|
||||
return status
|
||||
}
|
||||
|
||||
// RunPreBindPlugins runs the set of configured prebind plugins. It returns a
|
||||
// failure (bool) if any of the plugins returns an error. It also returns an
|
||||
// error containing the rejection message or the error occurred in the plugin.
|
||||
func (f *framework) RunPreBindPlugins(
|
||||
ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) {
|
||||
startTime := time.Now()
|
||||
defer func() { recordExtensionPointDuration(startTime, preBind, status) }()
|
||||
func (f *framework) RunPreBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) {
|
||||
if state.ShouldRecordFrameworkMetrics() {
|
||||
startTime := time.Now()
|
||||
defer func() {
|
||||
f.metricsRecorder.observeExtensionPointDurationAsync(preBind, status, metrics.SinceInSeconds(startTime))
|
||||
}()
|
||||
}
|
||||
for _, pl := range f.preBindPlugins {
|
||||
status := pl.PreBind(ctx, state, pod, nodeName)
|
||||
status = f.runPreBindPlugin(ctx, pl, state, pod, nodeName)
|
||||
if !status.IsSuccess() {
|
||||
msg := fmt.Sprintf("error while running %q prebind plugin for pod %q: %v", pl.Name(), pod.Name, status.Message())
|
||||
klog.Error(msg)
|
||||
@ -480,15 +592,29 @@ func (f *framework) RunPreBindPlugins(
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *framework) runPreBindPlugin(ctx context.Context, pl PreBindPlugin, state *CycleState, pod *v1.Pod, nodeName string) *Status {
|
||||
if !state.ShouldRecordFrameworkMetrics() {
|
||||
return pl.PreBind(ctx, state, pod, nodeName)
|
||||
}
|
||||
startTime := time.Now()
|
||||
status := pl.PreBind(ctx, state, pod, nodeName)
|
||||
f.metricsRecorder.observePluginDurationAsync(preBind, pl.Name(), status, metrics.SinceInSeconds(startTime))
|
||||
return status
|
||||
}
|
||||
|
||||
// RunBindPlugins runs the set of configured bind plugins until one returns a non `Skip` status.
|
||||
func (f *framework) RunBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) {
|
||||
startTime := time.Now()
|
||||
defer func() { recordExtensionPointDuration(startTime, bind, status) }()
|
||||
if state.ShouldRecordFrameworkMetrics() {
|
||||
startTime := time.Now()
|
||||
defer func() {
|
||||
f.metricsRecorder.observeExtensionPointDurationAsync(bind, status, metrics.SinceInSeconds(startTime))
|
||||
}()
|
||||
}
|
||||
if len(f.bindPlugins) == 0 {
|
||||
return NewStatus(Skip, "")
|
||||
}
|
||||
for _, bp := range f.bindPlugins {
|
||||
status = bp.Bind(ctx, state, pod, nodeName)
|
||||
status = f.runBindPlugin(ctx, bp, state, pod, nodeName)
|
||||
if status != nil && status.Code() == Skip {
|
||||
continue
|
||||
}
|
||||
@ -502,25 +628,51 @@ func (f *framework) RunBindPlugins(ctx context.Context, state *CycleState, pod *
|
||||
return status
|
||||
}
|
||||
|
||||
// RunPostBindPlugins runs the set of configured postbind plugins.
|
||||
func (f *framework) RunPostBindPlugins(
|
||||
ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) {
|
||||
startTime := time.Now()
|
||||
defer recordExtensionPointDuration(startTime, postBind, nil)
|
||||
for _, pl := range f.postBindPlugins {
|
||||
pl.PostBind(ctx, state, pod, nodeName)
|
||||
func (f *framework) runBindPlugin(ctx context.Context, bp BindPlugin, state *CycleState, pod *v1.Pod, nodeName string) *Status {
|
||||
if !state.ShouldRecordFrameworkMetrics() {
|
||||
return bp.Bind(ctx, state, pod, nodeName)
|
||||
}
|
||||
startTime := time.Now()
|
||||
status := bp.Bind(ctx, state, pod, nodeName)
|
||||
f.metricsRecorder.observePluginDurationAsync(bind, bp.Name(), status, metrics.SinceInSeconds(startTime))
|
||||
return status
|
||||
}
|
||||
|
||||
// RunPostBindPlugins runs the set of configured postbind plugins.
|
||||
func (f *framework) RunPostBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) {
|
||||
if state.ShouldRecordFrameworkMetrics() {
|
||||
startTime := time.Now()
|
||||
defer func() {
|
||||
f.metricsRecorder.observeExtensionPointDurationAsync(postBind, nil, metrics.SinceInSeconds(startTime))
|
||||
}()
|
||||
}
|
||||
for _, pl := range f.postBindPlugins {
|
||||
f.runPostBindPlugin(ctx, pl, state, pod, nodeName)
|
||||
}
|
||||
}
|
||||
|
||||
func (f *framework) runPostBindPlugin(ctx context.Context, pl PostBindPlugin, state *CycleState, pod *v1.Pod, nodeName string) {
|
||||
if !state.ShouldRecordFrameworkMetrics() {
|
||||
pl.PostBind(ctx, state, pod, nodeName)
|
||||
return
|
||||
}
|
||||
startTime := time.Now()
|
||||
pl.PostBind(ctx, state, pod, nodeName)
|
||||
f.metricsRecorder.observePluginDurationAsync(postBind, pl.Name(), nil, metrics.SinceInSeconds(startTime))
|
||||
}
|
||||
|
||||
// RunReservePlugins runs the set of configured reserve plugins. If any of these
|
||||
// plugins returns an error, it does not continue running the remaining ones and
|
||||
// returns the error. In such case, pod will not be scheduled.
|
||||
func (f *framework) RunReservePlugins(
|
||||
ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) {
|
||||
startTime := time.Now()
|
||||
defer func() { recordExtensionPointDuration(startTime, reserve, status) }()
|
||||
func (f *framework) RunReservePlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) {
|
||||
if state.ShouldRecordFrameworkMetrics() {
|
||||
startTime := time.Now()
|
||||
defer func() {
|
||||
f.metricsRecorder.observeExtensionPointDurationAsync(reserve, status, metrics.SinceInSeconds(startTime))
|
||||
}()
|
||||
}
|
||||
for _, pl := range f.reservePlugins {
|
||||
status := pl.Reserve(ctx, state, pod, nodeName)
|
||||
status = f.runReservePlugin(ctx, pl, state, pod, nodeName)
|
||||
if !status.IsSuccess() {
|
||||
msg := fmt.Sprintf("error while running %q reserve plugin for pod %q: %v", pl.Name(), pod.Name, status.Message())
|
||||
klog.Error(msg)
|
||||
@ -530,14 +682,37 @@ func (f *framework) RunReservePlugins(
|
||||
return nil
|
||||
}
|
||||
|
||||
// RunUnreservePlugins runs the set of configured unreserve plugins.
|
||||
func (f *framework) RunUnreservePlugins(
|
||||
ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) {
|
||||
startTime := time.Now()
|
||||
defer recordExtensionPointDuration(startTime, unreserve, nil)
|
||||
for _, pl := range f.unreservePlugins {
|
||||
pl.Unreserve(ctx, state, pod, nodeName)
|
||||
func (f *framework) runReservePlugin(ctx context.Context, pl ReservePlugin, state *CycleState, pod *v1.Pod, nodeName string) *Status {
|
||||
if !state.ShouldRecordFrameworkMetrics() {
|
||||
return pl.Reserve(ctx, state, pod, nodeName)
|
||||
}
|
||||
startTime := time.Now()
|
||||
status := pl.Reserve(ctx, state, pod, nodeName)
|
||||
f.metricsRecorder.observePluginDurationAsync(reserve, pl.Name(), status, metrics.SinceInSeconds(startTime))
|
||||
return status
|
||||
}
|
||||
|
||||
// RunUnreservePlugins runs the set of configured unreserve plugins.
|
||||
func (f *framework) RunUnreservePlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) {
|
||||
if state.ShouldRecordFrameworkMetrics() {
|
||||
startTime := time.Now()
|
||||
defer func() {
|
||||
f.metricsRecorder.observeExtensionPointDurationAsync(unreserve, nil, metrics.SinceInSeconds(startTime))
|
||||
}()
|
||||
}
|
||||
for _, pl := range f.unreservePlugins {
|
||||
f.runUnreservePlugin(ctx, pl, state, pod, nodeName)
|
||||
}
|
||||
}
|
||||
|
||||
func (f *framework) runUnreservePlugin(ctx context.Context, pl UnreservePlugin, state *CycleState, pod *v1.Pod, nodeName string) {
|
||||
if !state.ShouldRecordFrameworkMetrics() {
|
||||
pl.Unreserve(ctx, state, pod, nodeName)
|
||||
return
|
||||
}
|
||||
startTime := time.Now()
|
||||
pl.Unreserve(ctx, state, pod, nodeName)
|
||||
f.metricsRecorder.observePluginDurationAsync(unreserve, pl.Name(), nil, metrics.SinceInSeconds(startTime))
|
||||
}
|
||||
|
||||
// RunPermitPlugins runs the set of configured permit plugins. If any of these
|
||||
@ -547,14 +722,17 @@ func (f *framework) RunUnreservePlugins(
|
||||
// returned by the plugin, if the time expires, then it will return an error.
|
||||
// Note that if multiple plugins asked to wait, then we wait for the minimum
|
||||
// timeout duration.
|
||||
func (f *framework) RunPermitPlugins(
|
||||
ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) {
|
||||
startTime := time.Now()
|
||||
defer func() { recordExtensionPointDuration(startTime, permit, status) }()
|
||||
func (f *framework) RunPermitPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) {
|
||||
if state.ShouldRecordFrameworkMetrics() {
|
||||
startTime := time.Now()
|
||||
defer func() {
|
||||
f.metricsRecorder.observeExtensionPointDurationAsync(permit, status, metrics.SinceInSeconds(startTime))
|
||||
}()
|
||||
}
|
||||
pluginsWaitTime := make(map[string]time.Duration)
|
||||
statusCode := Success
|
||||
for _, pl := range f.permitPlugins {
|
||||
status, timeout := pl.Permit(ctx, state, pod, nodeName)
|
||||
status, timeout := f.runPermitPlugin(ctx, pl, state, pod, nodeName)
|
||||
if !status.IsSuccess() {
|
||||
if status.IsUnschedulable() {
|
||||
msg := fmt.Sprintf("rejected by %q at permit: %v", pl.Name(), status.Message())
|
||||
@ -601,6 +779,16 @@ func (f *framework) RunPermitPlugins(
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *framework) runPermitPlugin(ctx context.Context, pl PermitPlugin, state *CycleState, pod *v1.Pod, nodeName string) (*Status, time.Duration) {
|
||||
if !state.ShouldRecordFrameworkMetrics() {
|
||||
return pl.Permit(ctx, state, pod, nodeName)
|
||||
}
|
||||
startTime := time.Now()
|
||||
status, timeout := pl.Permit(ctx, state, pod, nodeName)
|
||||
f.metricsRecorder.observePluginDurationAsync(permit, pl.Name(), status, metrics.SinceInSeconds(startTime))
|
||||
return status, timeout
|
||||
}
|
||||
|
||||
// SnapshotSharedLister returns the scheduler's SharedLister of the latest NodeInfo
|
||||
// snapshot. The snapshot is taken at the beginning of a scheduling cycle and remains
|
||||
// unchanged until a pod finishes "Reserve". There is no guarantee that the information
|
||||
@ -695,11 +883,3 @@ func (f *framework) pluginsNeeded(plugins *config.Plugins) map[string]config.Plu
|
||||
}
|
||||
return pgMap
|
||||
}
|
||||
|
||||
func recordExtensionPointDuration(start time.Time, extensionPoint string, status *Status) {
|
||||
statusCode := Success.String()
|
||||
if status != nil {
|
||||
statusCode = status.Code().String()
|
||||
}
|
||||
metrics.FrameworkExtensionPointDuration.WithLabelValues(extensionPoint, statusCode).Observe(metrics.SinceInSeconds(start))
|
||||
}
|
||||
|
@ -133,6 +133,17 @@ type TestPlugin struct {
|
||||
inj injectedResult
|
||||
}
|
||||
|
||||
type TestPluginPreFilterExtension struct {
|
||||
inj injectedResult
|
||||
}
|
||||
|
||||
func (e *TestPluginPreFilterExtension) AddPod(ctx context.Context, state *CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
|
||||
return NewStatus(Code(e.inj.PreFilterAddPodStatus), "injected status")
|
||||
}
|
||||
func (e *TestPluginPreFilterExtension) RemovePod(ctx context.Context, state *CycleState, podToSchedule *v1.Pod, podToRemove *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
|
||||
return NewStatus(Code(e.inj.PreFilterRemovePodStatus), "injected status")
|
||||
}
|
||||
|
||||
func (pl *TestPlugin) Name() string {
|
||||
return pl.name
|
||||
}
|
||||
@ -150,7 +161,7 @@ func (pl *TestPlugin) PreFilter(ctx context.Context, state *CycleState, p *v1.Po
|
||||
}
|
||||
|
||||
func (pl *TestPlugin) PreFilterExtensions() PreFilterExtensions {
|
||||
return nil
|
||||
return &TestPluginPreFilterExtension{inj: pl.inj}
|
||||
}
|
||||
|
||||
func (pl *TestPlugin) Filter(ctx context.Context, state *CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
|
||||
@ -586,6 +597,7 @@ func TestPreFilterPlugins(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestRecordingMetrics(t *testing.T) {
|
||||
state := &CycleState{recordFrameworkMetrics: true}
|
||||
tests := []struct {
|
||||
name string
|
||||
action func(f Framework)
|
||||
@ -595,117 +607,149 @@ func TestRecordingMetrics(t *testing.T) {
|
||||
}{
|
||||
{
|
||||
name: "PreFilter - Success",
|
||||
action: func(f Framework) { f.RunPreFilterPlugins(context.Background(), nil, pod) },
|
||||
action: func(f Framework) { f.RunPreFilterPlugins(context.Background(), state, pod) },
|
||||
wantExtensionPoint: "PreFilter",
|
||||
wantStatus: Success,
|
||||
},
|
||||
{
|
||||
name: "PreFilterAddPod - Success",
|
||||
action: func(f Framework) { f.RunPreFilterExtensionAddPod(context.Background(), state, pod, nil, nil) },
|
||||
wantExtensionPoint: "PreFilterExtensionAddPod",
|
||||
wantStatus: Success,
|
||||
},
|
||||
{
|
||||
name: "PreFilterRemovePod - Success",
|
||||
action: func(f Framework) { f.RunPreFilterExtensionRemovePod(context.Background(), state, pod, nil, nil) },
|
||||
wantExtensionPoint: "PreFilterExtensionRemovePod",
|
||||
wantStatus: Success,
|
||||
},
|
||||
{
|
||||
name: "PreFilterRemovePod - Success",
|
||||
action: func(f Framework) { f.RunPreFilterPlugins(context.Background(), state, pod) },
|
||||
wantExtensionPoint: "PreFilter",
|
||||
wantStatus: Success,
|
||||
},
|
||||
{
|
||||
name: "Filter - Success",
|
||||
action: func(f Framework) { f.RunFilterPlugins(context.Background(), nil, pod, nil) },
|
||||
action: func(f Framework) { f.RunFilterPlugins(context.Background(), state, pod, nil) },
|
||||
wantExtensionPoint: "Filter",
|
||||
wantStatus: Success,
|
||||
},
|
||||
{
|
||||
name: "PostFilter - Success",
|
||||
action: func(f Framework) { f.RunPostFilterPlugins(context.Background(), nil, pod, nil, nil) },
|
||||
action: func(f Framework) { f.RunPostFilterPlugins(context.Background(), state, pod, nil, nil) },
|
||||
wantExtensionPoint: "PostFilter",
|
||||
wantStatus: Success,
|
||||
},
|
||||
{
|
||||
name: "Score - Success",
|
||||
action: func(f Framework) { f.RunScorePlugins(context.Background(), nil, pod, nodes) },
|
||||
action: func(f Framework) { f.RunScorePlugins(context.Background(), state, pod, nodes) },
|
||||
wantExtensionPoint: "Score",
|
||||
wantStatus: Success,
|
||||
},
|
||||
{
|
||||
name: "Reserve - Success",
|
||||
action: func(f Framework) { f.RunReservePlugins(context.Background(), nil, pod, "") },
|
||||
action: func(f Framework) { f.RunReservePlugins(context.Background(), state, pod, "") },
|
||||
wantExtensionPoint: "Reserve",
|
||||
wantStatus: Success,
|
||||
},
|
||||
{
|
||||
name: "Unreserve - Success",
|
||||
action: func(f Framework) { f.RunUnreservePlugins(context.Background(), nil, pod, "") },
|
||||
action: func(f Framework) { f.RunUnreservePlugins(context.Background(), state, pod, "") },
|
||||
wantExtensionPoint: "Unreserve",
|
||||
wantStatus: Success,
|
||||
},
|
||||
{
|
||||
name: "PreBind - Success",
|
||||
action: func(f Framework) { f.RunPreBindPlugins(context.Background(), nil, pod, "") },
|
||||
action: func(f Framework) { f.RunPreBindPlugins(context.Background(), state, pod, "") },
|
||||
wantExtensionPoint: "PreBind",
|
||||
wantStatus: Success,
|
||||
},
|
||||
{
|
||||
name: "Bind - Success",
|
||||
action: func(f Framework) { f.RunBindPlugins(context.Background(), nil, pod, "") },
|
||||
action: func(f Framework) { f.RunBindPlugins(context.Background(), state, pod, "") },
|
||||
wantExtensionPoint: "Bind",
|
||||
wantStatus: Success,
|
||||
},
|
||||
{
|
||||
name: "PostBind - Success",
|
||||
action: func(f Framework) { f.RunPostBindPlugins(context.Background(), nil, pod, "") },
|
||||
action: func(f Framework) { f.RunPostBindPlugins(context.Background(), state, pod, "") },
|
||||
wantExtensionPoint: "PostBind",
|
||||
wantStatus: Success,
|
||||
},
|
||||
{
|
||||
name: "Permit - Success",
|
||||
action: func(f Framework) { f.RunPermitPlugins(context.Background(), nil, pod, "") },
|
||||
action: func(f Framework) { f.RunPermitPlugins(context.Background(), state, pod, "") },
|
||||
wantExtensionPoint: "Permit",
|
||||
wantStatus: Success,
|
||||
},
|
||||
|
||||
{
|
||||
name: "PreFilter - Error",
|
||||
action: func(f Framework) { f.RunPreFilterPlugins(context.Background(), nil, pod) },
|
||||
action: func(f Framework) { f.RunPreFilterPlugins(context.Background(), state, pod) },
|
||||
inject: injectedResult{PreFilterStatus: int(Error)},
|
||||
wantExtensionPoint: "PreFilter",
|
||||
wantStatus: Error,
|
||||
},
|
||||
{
|
||||
name: "PreFilterAddPod - Error",
|
||||
action: func(f Framework) { f.RunPreFilterExtensionAddPod(context.Background(), state, pod, nil, nil) },
|
||||
inject: injectedResult{PreFilterAddPodStatus: int(Error)},
|
||||
wantExtensionPoint: "PreFilterExtensionAddPod",
|
||||
wantStatus: Error,
|
||||
},
|
||||
{
|
||||
name: "PreFilterRemovePod - Error",
|
||||
action: func(f Framework) { f.RunPreFilterExtensionRemovePod(context.Background(), state, pod, nil, nil) },
|
||||
inject: injectedResult{PreFilterRemovePodStatus: int(Error)},
|
||||
wantExtensionPoint: "PreFilterExtensionRemovePod",
|
||||
wantStatus: Error,
|
||||
},
|
||||
{
|
||||
name: "Filter - Error",
|
||||
action: func(f Framework) { f.RunFilterPlugins(context.Background(), nil, pod, nil) },
|
||||
action: func(f Framework) { f.RunFilterPlugins(context.Background(), state, pod, nil) },
|
||||
inject: injectedResult{FilterStatus: int(Error)},
|
||||
wantExtensionPoint: "Filter",
|
||||
wantStatus: Error,
|
||||
},
|
||||
{
|
||||
name: "PostFilter - Error",
|
||||
action: func(f Framework) { f.RunPostFilterPlugins(context.Background(), nil, pod, nil, nil) },
|
||||
action: func(f Framework) { f.RunPostFilterPlugins(context.Background(), state, pod, nil, nil) },
|
||||
inject: injectedResult{PostFilterStatus: int(Error)},
|
||||
wantExtensionPoint: "PostFilter",
|
||||
wantStatus: Error,
|
||||
},
|
||||
{
|
||||
name: "Score - Error",
|
||||
action: func(f Framework) { f.RunScorePlugins(context.Background(), nil, pod, nodes) },
|
||||
action: func(f Framework) { f.RunScorePlugins(context.Background(), state, pod, nodes) },
|
||||
inject: injectedResult{ScoreStatus: int(Error)},
|
||||
wantExtensionPoint: "Score",
|
||||
wantStatus: Error,
|
||||
},
|
||||
{
|
||||
name: "Reserve - Error",
|
||||
action: func(f Framework) { f.RunReservePlugins(context.Background(), nil, pod, "") },
|
||||
action: func(f Framework) { f.RunReservePlugins(context.Background(), state, pod, "") },
|
||||
inject: injectedResult{ReserveStatus: int(Error)},
|
||||
wantExtensionPoint: "Reserve",
|
||||
wantStatus: Error,
|
||||
},
|
||||
{
|
||||
name: "PreBind - Error",
|
||||
action: func(f Framework) { f.RunPreBindPlugins(context.Background(), nil, pod, "") },
|
||||
action: func(f Framework) { f.RunPreBindPlugins(context.Background(), state, pod, "") },
|
||||
inject: injectedResult{PreBindStatus: int(Error)},
|
||||
wantExtensionPoint: "PreBind",
|
||||
wantStatus: Error,
|
||||
},
|
||||
{
|
||||
name: "Bind - Error",
|
||||
action: func(f Framework) { f.RunBindPlugins(context.Background(), nil, pod, "") },
|
||||
action: func(f Framework) { f.RunBindPlugins(context.Background(), state, pod, "") },
|
||||
inject: injectedResult{BindStatus: int(Error)},
|
||||
wantExtensionPoint: "Bind",
|
||||
wantStatus: Error,
|
||||
},
|
||||
{
|
||||
name: "Permit - Error",
|
||||
action: func(f Framework) { f.RunPermitPlugins(context.Background(), nil, pod, "") },
|
||||
action: func(f Framework) { f.RunPermitPlugins(context.Background(), state, pod, "") },
|
||||
inject: injectedResult{PermitStatus: int(Error)},
|
||||
wantExtensionPoint: "Permit",
|
||||
wantStatus: Error,
|
||||
@ -714,6 +758,10 @@ func TestRecordingMetrics(t *testing.T) {
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
metrics.Register()
|
||||
metrics.FrameworkExtensionPointDuration.Reset()
|
||||
metrics.PluginExecutionDuration.Reset()
|
||||
|
||||
plugin := &TestPlugin{name: testPlugin, inj: tt.inject}
|
||||
r := make(Registry)
|
||||
r.Register(testPlugin,
|
||||
@ -733,16 +781,22 @@ func TestRecordingMetrics(t *testing.T) {
|
||||
PostBind: pluginSet,
|
||||
Unreserve: pluginSet,
|
||||
}
|
||||
f, err := NewFramework(r, plugins, emptyArgs)
|
||||
recorder := newMetricsRecorder(100, time.Nanosecond)
|
||||
f, err := NewFramework(r, plugins, emptyArgs, withMetricsRecorder(recorder))
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create framework for testing: %v", err)
|
||||
}
|
||||
metrics.Register()
|
||||
metrics.FrameworkExtensionPointDuration.Reset()
|
||||
|
||||
tt.action(f)
|
||||
|
||||
// Stop the goroutine which records metrics and ensure it's stopped.
|
||||
close(recorder.stopCh)
|
||||
<-recorder.isStoppedCh
|
||||
// Try to clean up the metrics buffer again in case it's not empty.
|
||||
recorder.flushMetrics()
|
||||
|
||||
collectAndCompareFrameworkMetrics(t, tt.wantExtensionPoint, tt.wantStatus)
|
||||
collectAndComparePluginMetrics(t, tt.wantExtensionPoint, testPlugin, tt.wantStatus)
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -765,6 +819,9 @@ func TestPermitWaitingMetric(t *testing.T) {
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
metrics.Register()
|
||||
metrics.PermitWaitDuration.Reset()
|
||||
|
||||
plugin := &TestPlugin{name: testPlugin, inj: tt.inject}
|
||||
r := make(Registry)
|
||||
r.Register(testPlugin,
|
||||
@ -778,8 +835,6 @@ func TestPermitWaitingMetric(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create framework for testing: %v", err)
|
||||
}
|
||||
metrics.Register()
|
||||
metrics.PermitWaitDuration.Reset()
|
||||
|
||||
f.RunPermitPlugins(context.TODO(), nil, pod, "")
|
||||
|
||||
@ -839,17 +894,19 @@ func buildScoreConfigWithWeights(weights map[string]int32, ps ...string) *config
|
||||
}
|
||||
|
||||
// injectedResult controls the behavior of TestPlugin in tests: the *Res fields
// set the values returned from scoring, and each *Status field sets the status
// code returned at the corresponding extension point (0 means Success).
//
// The rendered source contained both the pre- and post-commit field lists
// merged together (duplicate declarations, which cannot compile); this keeps
// the post-commit superset exactly once.
type injectedResult struct {
	ScoreRes                 int64 `json:"scoreRes,omitempty"`
	NormalizeRes             int64 `json:"normalizeRes,omitempty"`
	ScoreStatus              int   `json:"scoreStatus,omitempty"`
	NormalizeStatus          int   `json:"normalizeStatus,omitempty"`
	PreFilterStatus          int   `json:"preFilterStatus,omitempty"`
	PreFilterAddPodStatus    int   `json:"preFilterAddPodStatus,omitempty"`
	PreFilterRemovePodStatus int   `json:"preFilterRemovePodStatus,omitempty"`
	FilterStatus             int   `json:"filterStatus,omitempty"`
	PostFilterStatus         int   `json:"postFilterStatus,omitempty"`
	ReserveStatus            int   `json:"reserveStatus,omitempty"`
	PreBindStatus            int   `json:"preBindStatus,omitempty"`
	BindStatus               int   `json:"bindStatus,omitempty"`
	PermitStatus             int   `json:"permitStatus,omitempty"`
}
|
||||
|
||||
func setScoreRes(inj injectedResult) (int64, *Status) {
|
||||
@ -869,6 +926,33 @@ func injectNormalizeRes(inj injectedResult, scores NodeScoreList) *Status {
|
||||
return nil
|
||||
}
|
||||
|
||||
func collectAndComparePluginMetrics(t *testing.T, wantExtensionPoint, wantPlugin string, wantStatus Code) {
|
||||
m := collectHistogramMetric(metrics.PluginExecutionDuration)
|
||||
if len(m.Label) != 3 {
|
||||
t.Fatalf("Unexpected number of label pairs, got: %v, want: 2", len(m.Label))
|
||||
}
|
||||
|
||||
if *m.Label[0].Value != wantExtensionPoint {
|
||||
t.Errorf("Unexpected extension point label, got: %q, want %q", *m.Label[0].Value, wantExtensionPoint)
|
||||
}
|
||||
|
||||
if *m.Label[1].Value != wantPlugin {
|
||||
t.Errorf("Unexpected plugin label, got: %q, want %q", *m.Label[1].Value, wantPlugin)
|
||||
}
|
||||
|
||||
if *m.Label[2].Value != wantStatus.String() {
|
||||
t.Errorf("Unexpected status code label, got: %q, want %q", *m.Label[2].Value, wantStatus)
|
||||
}
|
||||
|
||||
if *m.Histogram.SampleCount == 0 {
|
||||
t.Error("Expect at least 1 sample")
|
||||
}
|
||||
|
||||
if *m.Histogram.SampleSum <= 0 {
|
||||
t.Errorf("Expect latency to be greater than 0, got: %v", *m.Histogram.SampleSum)
|
||||
}
|
||||
}
|
||||
|
||||
func collectAndCompareFrameworkMetrics(t *testing.T, wantExtensionPoint string, wantStatus Code) {
|
||||
m := collectHistogramMetric(metrics.FrameworkExtensionPointDuration)
|
||||
|
||||
@ -885,11 +969,11 @@ func collectAndCompareFrameworkMetrics(t *testing.T, wantExtensionPoint string,
|
||||
}
|
||||
|
||||
if *m.Histogram.SampleCount != 1 {
|
||||
t.Errorf("Expect 1 sample, got: %v", m.Histogram.SampleCount)
|
||||
t.Errorf("Expect 1 sample, got: %v", *m.Histogram.SampleCount)
|
||||
}
|
||||
|
||||
if *m.Histogram.SampleSum <= 0 {
|
||||
t.Errorf("Expect latency to be greater than 0, got: %v", m.Histogram.SampleSum)
|
||||
t.Errorf("Expect latency to be greater than 0, got: %v", *m.Histogram.SampleSum)
|
||||
}
|
||||
}
|
||||
|
||||
@ -911,17 +995,17 @@ func collectAndComparePermitWaitDuration(t *testing.T, wantRes string) {
|
||||
}
|
||||
|
||||
if *m.Histogram.SampleCount != 1 {
|
||||
t.Errorf("Expect 1 sample, got: %v", m.Histogram.SampleCount)
|
||||
t.Errorf("Expect 1 sample, got: %v", *m.Histogram.SampleCount)
|
||||
}
|
||||
|
||||
if *m.Histogram.SampleSum <= 0 {
|
||||
t.Errorf("Expect latency to be greater than 0, got: %v", m.Histogram.SampleSum)
|
||||
t.Errorf("Expect latency to be greater than 0, got: %v", *m.Histogram.SampleSum)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func collectHistogramMetric(metric prometheus.Collector) *dto.Metric {
|
||||
ch := make(chan prometheus.Metric, 1)
|
||||
ch := make(chan prometheus.Metric, 100)
|
||||
metric.Collect(ch)
|
||||
select {
|
||||
case got := <-ch:
|
||||
|
@ -74,3 +74,19 @@ func TestStatus(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// The String() method relies on the value and order of the status codes to function properly.
func TestStatusCodes(t *testing.T) {
	// These integer values are effectively part of the contract; changing one
	// would silently change how statuses are rendered by String().
	assertStatusCode(t, Success, 0)
	assertStatusCode(t, Error, 1)
	assertStatusCode(t, Unschedulable, 2)
	assertStatusCode(t, UnschedulableAndUnresolvable, 3)
	assertStatusCode(t, Wait, 4)
	assertStatusCode(t, Skip, 5)
}
|
||||
|
||||
func assertStatusCode(t *testing.T, code Code, value int) {
|
||||
if int(code) != value {
|
||||
t.Errorf("Status code %q should have a value of %v but got %v", code.String(), value, int(code))
|
||||
}
|
||||
}
|
||||
|
115
pkg/scheduler/framework/v1alpha1/metrics_recorder.go
Normal file
115
pkg/scheduler/framework/v1alpha1/metrics_recorder.go
Normal file
@ -0,0 +1,115 @@
|
||||
/*
|
||||
Copyright 2019 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package v1alpha1
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
k8smetrics "k8s.io/component-base/metrics"
|
||||
"k8s.io/kubernetes/pkg/scheduler/metrics"
|
||||
)
|
||||
|
||||
// frameworkMetric is the data structure passed in the buffer channel between the main framework thread
// and the metricsRecorder goroutine.
type frameworkMetric struct {
	// metric is the histogram the observation is recorded into.
	metric *k8smetrics.HistogramVec
	// labelValues are the label values, in the order the histogram expects.
	labelValues []string
	// value is the observation to record (callers pass latencies in seconds).
	value float64
}
|
||||
|
||||
// metricsRecorder records framework metrics in a separate goroutine to avoid overhead in the critical path.
type metricsRecorder struct {
	// bufferCh is a channel that serves as a metrics buffer before the metricsRecorder goroutine reports it.
	bufferCh chan *frameworkMetric
	// if bufferSize is reached, incoming metrics will be discarded.
	bufferSize int
	// how often the recorder runs to flush the metrics.
	interval time.Duration

	// stopCh is used to stop the goroutine which periodically flushes metrics. It's currently only
	// used in tests.
	stopCh chan struct{}
	// isStoppedCh indicates whether the goroutine is stopped. It's used in tests only to make sure
	// the metric flushing goroutine is stopped so that tests can collect metrics for verification.
	isStoppedCh chan struct{}
}
|
||||
|
||||
// newMetricsRecorder returns a metricsRecorder with the given buffer size and
// flush interval, and starts the background goroutine that periodically drains
// the buffer into Prometheus.
func newMetricsRecorder(bufferSize int, interval time.Duration) *metricsRecorder {
	recorder := &metricsRecorder{
		bufferCh:    make(chan *frameworkMetric, bufferSize),
		bufferSize:  bufferSize,
		interval:    interval,
		stopCh:      make(chan struct{}),
		isStoppedCh: make(chan struct{}),
	}
	go recorder.run()
	return recorder
}
|
||||
|
||||
// observeExtensionPointDurationAsync observes the framework_extension_point_duration_seconds metric.
|
||||
// The metric will be flushed to Prometheus asynchronously.
|
||||
func (r *metricsRecorder) observeExtensionPointDurationAsync(extensionPoint string, status *Status, value float64) {
|
||||
newMetric := &frameworkMetric{
|
||||
metric: metrics.FrameworkExtensionPointDuration,
|
||||
labelValues: []string{extensionPoint, status.Code().String()},
|
||||
value: value,
|
||||
}
|
||||
select {
|
||||
case r.bufferCh <- newMetric:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// observePluginDurationAsync observes the plugin_execution_duration_seconds metric.
// The metric will be flushed to Prometheus asynchronously.
func (r *metricsRecorder) observePluginDurationAsync(extensionPoint, pluginName string, status *Status, value float64) {
	newMetric := &frameworkMetric{
		metric: metrics.PluginExecutionDuration,
		// Label order must match the metric's definition: plugin, extension_point, status.
		labelValues: []string{pluginName, extensionPoint, status.Code().String()},
		value:       value,
	}
	// Non-blocking send: if the buffer is full, the observation is dropped.
	select {
	case r.bufferCh <- newMetric:
	default:
	}
}
|
||||
|
||||
// run flushes buffered metrics into Prometheus once per r.interval, until
// stopCh is closed.
func (r *metricsRecorder) run() {
	for {
		select {
		case <-r.stopCh:
			// Signal (to tests) that the goroutine has fully stopped.
			close(r.isStoppedCh)
			return
		default:
		}
		r.flushMetrics()
		time.Sleep(r.interval)
	}
}
|
||||
|
||||
// flushMetrics tries to clean up the bufferCh by reading at most bufferSize metrics.
|
||||
func (r *metricsRecorder) flushMetrics() {
|
||||
for i := 0; i < r.bufferSize; i++ {
|
||||
select {
|
||||
case m := <-r.bufferCh:
|
||||
m.metric.WithLabelValues(m.labelValues...).Observe(m.value)
|
||||
default:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
@ -254,6 +254,16 @@ var (
|
||||
},
|
||||
[]string{"extension_point", "status"})
|
||||
|
||||
PluginExecutionDuration = metrics.NewHistogramVec(
|
||||
&metrics.HistogramOpts{
|
||||
Subsystem: SchedulerSubsystem,
|
||||
Name: "plugin_execution_duration_seconds",
|
||||
Help: "Duration for running a plugin at a specific extension point.",
|
||||
Buckets: nil,
|
||||
StabilityLevel: metrics.ALPHA,
|
||||
},
|
||||
[]string{"plugin", "extension_point", "status"})
|
||||
|
||||
SchedulerQueueIncomingPods = metrics.NewCounterVec(
|
||||
&metrics.CounterOpts{
|
||||
Subsystem: SchedulerSubsystem,
|
||||
@ -302,6 +312,7 @@ var (
|
||||
PodSchedulingDuration,
|
||||
PodSchedulingAttempts,
|
||||
FrameworkExtensionPointDuration,
|
||||
PluginExecutionDuration,
|
||||
SchedulerQueueIncomingPods,
|
||||
SchedulerGoroutines,
|
||||
PermitWaitDuration,
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math/rand"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
@ -55,6 +56,8 @@ const (
|
||||
BindTimeoutSeconds = 100
|
||||
// SchedulerError is the reason recorded for events when an error occurs during scheduling a pod.
|
||||
SchedulerError = "SchedulerError"
|
||||
// Percentage of framework metrics to be sampled.
|
||||
frameworkMetricsSamplePercent = 10
|
||||
)
|
||||
|
||||
// podConditionUpdater updates the condition of a pod based on the passed
|
||||
@ -597,6 +600,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
|
||||
// Synchronously attempt to find a fit for the pod.
|
||||
start := time.Now()
|
||||
state := framework.NewCycleState()
|
||||
state.SetRecordFrameworkMetrics(rand.Intn(100) < frameworkMetricsSamplePercent)
|
||||
schedulingCycleCtx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, state, pod)
|
||||
|
Loading…
Reference in New Issue
Block a user