Add scheduler plugin execution duration metric.

Address comments

Sample metrics

Use rand.Intn and some cleanup
This commit is contained in:
Cong Liu 2019-10-29 10:54:02 -04:00
parent bcb171b375
commit af6a8160c3
8 changed files with 533 additions and 103 deletions

View File

@ -6,6 +6,7 @@ go_library(
"cycle_state.go",
"framework.go",
"interface.go",
"metrics_recorder.go",
"registry.go",
"waiting_pods_map.go",
],
@ -25,6 +26,7 @@ go_library(
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//staging/src/k8s.io/component-base/metrics:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
"//vendor/sigs.k8s.io/yaml:go_default_library",
],

View File

@ -44,6 +44,8 @@ type StateKey string
type CycleState struct {
// NOTE(review): mx presumably guards storage — confirm against the methods that read and write it.
mx sync.RWMutex
// storage maps each StateKey to its StateData.
storage map[StateKey]StateData
// if recordFrameworkMetrics is true, framework metrics will be recorded for this cycle.
// It is read through ShouldRecordFrameworkMetrics and written through SetRecordFrameworkMetrics.
recordFrameworkMetrics bool
}
// NewCycleState initializes a new CycleState and returns its pointer.
@ -53,6 +55,22 @@ func NewCycleState() *CycleState {
}
}
// ShouldRecordFrameworkMetrics reports whether framework metrics should be
// recorded for this cycle. It is safe to call on a nil *CycleState, in which
// case it reports false.
func (c *CycleState) ShouldRecordFrameworkMetrics() bool {
	return c != nil && c.recordFrameworkMetrics
}
// SetRecordFrameworkMetrics stores flag as the cycle's recordFrameworkMetrics
// setting. Calling it on a nil *CycleState is a no-op.
func (c *CycleState) SetRecordFrameworkMetrics(flag bool) {
	if c != nil {
		c.recordFrameworkMetrics = flag
	}
}
// Clone creates a copy of CycleState and returns its pointer. Clone returns
// nil if the context being cloned is nil.
func (c *CycleState) Clone() *CycleState {

View File

@ -46,6 +46,7 @@ const (
filter = "Filter"
postFilter = "PostFilter"
score = "Score"
scoreExtensionNormalize = "ScoreExtensionNormalize"
preBind = "PreBind"
bind = "Bind"
postBind = "PostBind"
@ -75,6 +76,8 @@ type framework struct {
clientSet clientset.Interface
informerFactory informers.SharedInformerFactory
metricsRecorder *metricsRecorder
}
// extensionPoint encapsulates desired and applied set of plugins at a specific extension
@ -108,6 +111,7 @@ type frameworkOptions struct {
clientSet clientset.Interface
informerFactory informers.SharedInformerFactory
snapshotSharedLister schedulerlisters.SharedLister
metricsRecorder *metricsRecorder
}
// Option for the framework.
@ -134,7 +138,16 @@ func WithSnapshotSharedLister(snapshotSharedLister schedulerlisters.SharedLister
}
}
var defaultFrameworkOptions = frameworkOptions{}
// withMetricsRecorder returns an Option that installs recorder as the
// framework's metrics recorder. It is only used in tests.
func withMetricsRecorder(recorder *metricsRecorder) Option {
	install := func(opts *frameworkOptions) {
		opts.metricsRecorder = recorder
	}
	return install
}
// defaultFrameworkOptions carries a metrics recorder that is constructed when
// this package-level variable is initialized.
// NOTE(review): 1000 and time.Second look like a buffer size and a flush
// interval — confirm against newMetricsRecorder.
var defaultFrameworkOptions = frameworkOptions{
metricsRecorder: newMetricsRecorder(1000, time.Second),
}
var _ Framework = &framework{}
@ -152,6 +165,7 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
waitingPods: newWaitingPodsMap(),
clientSet: options.clientSet,
informerFactory: options.informerFactory,
metricsRecorder: options.metricsRecorder,
}
if plugins == nil {
return f, nil
@ -254,12 +268,15 @@ func (f *framework) QueueSortFunc() LessFunc {
// *Status and its code is set to non-success if any of the plugins returns
// anything but Success. If a non-success status is returned, then the scheduling
// cycle is aborted.
func (f *framework) RunPreFilterPlugins(
ctx context.Context, state *CycleState, pod *v1.Pod) (status *Status) {
startTime := time.Now()
defer func() { recordExtensionPointDuration(startTime, preFilter, status) }()
func (f *framework) RunPreFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod) (status *Status) {
if state.ShouldRecordFrameworkMetrics() {
startTime := time.Now()
defer func() {
f.metricsRecorder.observeExtensionPointDurationAsync(preFilter, status, metrics.SinceInSeconds(startTime))
}()
}
for _, pl := range f.preFilterPlugins {
status := pl.PreFilter(ctx, state, pod)
status = f.runPreFilterPlugin(ctx, pl, state, pod)
if !status.IsSuccess() {
if status.IsUnschedulable() {
msg := fmt.Sprintf("rejected by %q at prefilter: %v", pl.Name(), status.Message())
@ -275,6 +292,16 @@ func (f *framework) RunPreFilterPlugins(
return nil
}
// runPreFilterPlugin invokes PreFilter on a single plugin and returns its
// status. When the cycle state asks for framework metrics, the call is timed
// and the elapsed seconds are passed to the metrics recorder together with
// the plugin name and the returned status.
func (f *framework) runPreFilterPlugin(ctx context.Context, pl PreFilterPlugin, state *CycleState, pod *v1.Pod) *Status {
	if state.ShouldRecordFrameworkMetrics() {
		began := time.Now()
		result := pl.PreFilter(ctx, state, pod)
		f.metricsRecorder.observePluginDurationAsync(preFilter, pl.Name(), result, metrics.SinceInSeconds(began))
		return result
	}
	// Metrics are off for this cycle: call the plugin without timing it.
	return pl.PreFilter(ctx, state, pod)
}
// RunPreFilterExtensionAddPod calls the AddPod interface for the set of configured
// PreFilter plugins. It returns directly if any of the plugins return any
// status other than Success.
@ -285,13 +312,18 @@ func (f *framework) RunPreFilterExtensionAddPod(
podToAdd *v1.Pod,
nodeInfo *schedulernodeinfo.NodeInfo,
) (status *Status) {
startTime := time.Now()
defer func() { recordExtensionPointDuration(startTime, preFilterExtensionAddPod, status) }()
if state.ShouldRecordFrameworkMetrics() {
startTime := time.Now()
defer func() {
f.metricsRecorder.observeExtensionPointDurationAsync(preFilterExtensionAddPod, status, metrics.SinceInSeconds(startTime))
}()
}
for _, pl := range f.preFilterPlugins {
if pl.PreFilterExtensions() == nil {
continue
}
if status := pl.PreFilterExtensions().AddPod(ctx, state, podToSchedule, podToAdd, nodeInfo); !status.IsSuccess() {
status = f.runPreFilterExtensionAddPod(ctx, pl, state, podToSchedule, podToAdd, nodeInfo)
if !status.IsSuccess() {
msg := fmt.Sprintf("error while running AddPod for plugin %q while scheduling pod %q: %v",
pl.Name(), podToSchedule.Name, status.Message())
klog.Error(msg)
@ -302,6 +334,16 @@ func (f *framework) RunPreFilterExtensionAddPod(
return nil
}
// runPreFilterExtensionAddPod calls AddPod on the plugin's PreFilter
// extensions and returns its status. When the cycle state asks for framework
// metrics, the call is timed and the elapsed seconds are passed to the metrics
// recorder under the preFilterExtensionAddPod extension point.
func (f *framework) runPreFilterExtensionAddPod(ctx context.Context, pl PreFilterPlugin, state *CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
	if state.ShouldRecordFrameworkMetrics() {
		began := time.Now()
		result := pl.PreFilterExtensions().AddPod(ctx, state, podToSchedule, podToAdd, nodeInfo)
		f.metricsRecorder.observePluginDurationAsync(preFilterExtensionAddPod, pl.Name(), result, metrics.SinceInSeconds(began))
		return result
	}
	// Metrics are off for this cycle: call the extension without timing it.
	return pl.PreFilterExtensions().AddPod(ctx, state, podToSchedule, podToAdd, nodeInfo)
}
// RunPreFilterExtensionRemovePod calls the RemovePod interface for the set of configured
// PreFilter plugins. It returns directly if any of the plugins return any
// status other than Success.
@ -312,13 +354,18 @@ func (f *framework) RunPreFilterExtensionRemovePod(
podToRemove *v1.Pod,
nodeInfo *schedulernodeinfo.NodeInfo,
) (status *Status) {
startTime := time.Now()
defer func() { recordExtensionPointDuration(startTime, preFilterExtensionRemovePod, status) }()
if state.ShouldRecordFrameworkMetrics() {
startTime := time.Now()
defer func() {
f.metricsRecorder.observeExtensionPointDurationAsync(preFilterExtensionRemovePod, status, metrics.SinceInSeconds(startTime))
}()
}
for _, pl := range f.preFilterPlugins {
if pl.PreFilterExtensions() == nil {
continue
}
if status := pl.PreFilterExtensions().RemovePod(ctx, state, podToSchedule, podToRemove, nodeInfo); !status.IsSuccess() {
status = f.runPreFilterExtensionRemovePod(ctx, pl, state, podToSchedule, podToRemove, nodeInfo)
if !status.IsSuccess() {
msg := fmt.Sprintf("error while running RemovePod for plugin %q while scheduling pod %q: %v",
pl.Name(), podToSchedule.Name, status.Message())
klog.Error(msg)
@ -329,6 +376,16 @@ func (f *framework) RunPreFilterExtensionRemovePod(
return nil
}
// runPreFilterExtensionRemovePod calls RemovePod on the plugin's PreFilter
// extensions for podToRemove and returns its status. When the cycle state asks
// for framework metrics, the call is timed and the elapsed seconds are passed
// to the metrics recorder under the preFilterExtensionRemovePod extension
// point.
//
// The plugin's PreFilterExtensions() must be non-nil; the calling loop skips
// plugins for which it is nil.
func (f *framework) runPreFilterExtensionRemovePod(ctx context.Context, pl PreFilterPlugin, state *CycleState, podToSchedule *v1.Pod, podToRemove *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
	if !state.ShouldRecordFrameworkMetrics() {
		// Metrics are off for this cycle: call the extension without timing it.
		return pl.PreFilterExtensions().RemovePod(ctx, state, podToSchedule, podToRemove, nodeInfo)
	}
	startTime := time.Now()
	status := pl.PreFilterExtensions().RemovePod(ctx, state, podToSchedule, podToRemove, nodeInfo)
	f.metricsRecorder.observePluginDurationAsync(preFilterExtensionRemovePod, pl.Name(), status, metrics.SinceInSeconds(startTime))
	return status
}
// RunFilterPlugins runs the set of configured Filter plugins for pod on
// the given node. If any of these plugins doesn't return "Success", the
// given node is not suitable for running pod.
@ -339,10 +396,14 @@ func (f *framework) RunFilterPlugins(
pod *v1.Pod,
nodeInfo *schedulernodeinfo.NodeInfo,
) (status *Status) {
startTime := time.Now()
defer func() { recordExtensionPointDuration(startTime, filter, status) }()
if state.ShouldRecordFrameworkMetrics() {
startTime := time.Now()
defer func() {
f.metricsRecorder.observeExtensionPointDurationAsync(filter, status, metrics.SinceInSeconds(startTime))
}()
}
for _, pl := range f.filterPlugins {
status := pl.Filter(ctx, state, pod, nodeInfo)
status = f.runFilterPlugin(ctx, pl, state, pod, nodeInfo)
if !status.IsSuccess() {
if !status.IsUnschedulable() {
errMsg := fmt.Sprintf("error while running %q filter plugin for pod %q: %v",
@ -357,6 +418,16 @@ func (f *framework) RunFilterPlugins(
return nil
}
// runFilterPlugin invokes Filter on a single plugin for the given node info
// and returns its status. When the cycle state asks for framework metrics, the
// call is timed and the elapsed seconds are passed to the metrics recorder
// together with the plugin name and the returned status.
func (f *framework) runFilterPlugin(ctx context.Context, pl FilterPlugin, state *CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
	if state.ShouldRecordFrameworkMetrics() {
		began := time.Now()
		result := pl.Filter(ctx, state, pod, nodeInfo)
		f.metricsRecorder.observePluginDurationAsync(filter, pl.Name(), result, metrics.SinceInSeconds(began))
		return result
	}
	// Metrics are off for this cycle: call the plugin without timing it.
	return pl.Filter(ctx, state, pod, nodeInfo)
}
// RunPostFilterPlugins runs the set of configured post-filter plugins. If any
// of these plugins returns any status other than "Success", the given node is
// rejected. The filteredNodeStatuses is the set of filtered nodes and their statuses.
@ -367,10 +438,14 @@ func (f *framework) RunPostFilterPlugins(
nodes []*v1.Node,
filteredNodesStatuses NodeToStatusMap,
) (status *Status) {
startTime := time.Now()
defer func() { recordExtensionPointDuration(startTime, postFilter, status) }()
if state.ShouldRecordFrameworkMetrics() {
startTime := time.Now()
defer func() {
f.metricsRecorder.observeExtensionPointDurationAsync(postFilter, status, metrics.SinceInSeconds(startTime))
}()
}
for _, pl := range f.postFilterPlugins {
status := pl.PostFilter(ctx, state, pod, nodes, filteredNodesStatuses)
status = f.runPostFilterPlugin(ctx, pl, state, pod, nodes, filteredNodesStatuses)
if !status.IsSuccess() {
msg := fmt.Sprintf("error while running %q postfilter plugin for pod %q: %v", pl.Name(), pod.Name, status.Message())
klog.Error(msg)
@ -381,13 +456,27 @@ func (f *framework) RunPostFilterPlugins(
return nil
}
// runPostFilterPlugin invokes PostFilter on a single plugin and returns its
// status. When the cycle state asks for framework metrics, the call is timed
// and the elapsed seconds are passed to the metrics recorder together with the
// plugin name and the returned status.
func (f *framework) runPostFilterPlugin(ctx context.Context, pl PostFilterPlugin, state *CycleState, pod *v1.Pod, nodes []*v1.Node, filteredNodesStatuses NodeToStatusMap) *Status {
	if state.ShouldRecordFrameworkMetrics() {
		began := time.Now()
		result := pl.PostFilter(ctx, state, pod, nodes, filteredNodesStatuses)
		f.metricsRecorder.observePluginDurationAsync(postFilter, pl.Name(), result, metrics.SinceInSeconds(began))
		return result
	}
	// Metrics are off for this cycle: call the plugin without timing it.
	return pl.PostFilter(ctx, state, pod, nodes, filteredNodesStatuses)
}
// RunScorePlugins runs the set of configured scoring plugins. It returns a list that
// stores for each scoring plugin name the corresponding NodeScoreList(s).
// It also returns *Status, which is set to non-success if any of the plugins returns
// a non-success status.
func (f *framework) RunScorePlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*v1.Node) (ps PluginToNodeScores, status *Status) {
startTime := time.Now()
defer func() { recordExtensionPointDuration(startTime, score, status) }()
if state.ShouldRecordFrameworkMetrics() {
startTime := time.Now()
defer func() {
f.metricsRecorder.observeExtensionPointDurationAsync(score, status, metrics.SinceInSeconds(startTime))
}()
}
pluginToNodeScores := make(PluginToNodeScores, len(f.scorePlugins))
for _, pl := range f.scorePlugins {
pluginToNodeScores[pl.Name()] = make(NodeScoreList, len(nodes))
@ -399,14 +488,14 @@ func (f *framework) RunScorePlugins(ctx context.Context, state *CycleState, pod
workqueue.ParallelizeUntil(ctx, 16, len(nodes), func(index int) {
for _, pl := range f.scorePlugins {
nodeName := nodes[index].Name
score, status := pl.Score(ctx, state, pod, nodeName)
s, status := f.runScorePlugin(ctx, pl, state, pod, nodeName)
if !status.IsSuccess() {
errCh.SendErrorWithCancel(fmt.Errorf(status.Message()), cancel)
return
}
pluginToNodeScores[pl.Name()][index] = NodeScore{
Name: nodeName,
Score: int64(score),
Score: int64(s),
}
}
})
@ -423,7 +512,7 @@ func (f *framework) RunScorePlugins(ctx context.Context, state *CycleState, pod
if pl.ScoreExtensions() == nil {
return
}
status := pl.ScoreExtensions().NormalizeScore(ctx, state, pod, nodeScoreList)
status := f.runScoreExtension(ctx, pl, state, pod, nodeScoreList)
if !status.IsSuccess() {
err := fmt.Errorf("normalize score plugin %q failed with error %v", pl.Name(), status.Message())
errCh.SendErrorWithCancel(err, cancel)
@ -462,15 +551,38 @@ func (f *framework) RunScorePlugins(ctx context.Context, state *CycleState, pod
return pluginToNodeScores, nil
}
// runScorePlugin invokes Score on a single plugin for nodeName and returns the
// score and status it produced. When the cycle state asks for framework
// metrics, the call is timed and the elapsed seconds are passed to the metrics
// recorder together with the plugin name and the returned status.
func (f *framework) runScorePlugin(ctx context.Context, pl ScorePlugin, state *CycleState, pod *v1.Pod, nodeName string) (int64, *Status) {
	if state.ShouldRecordFrameworkMetrics() {
		began := time.Now()
		nodeScore, result := pl.Score(ctx, state, pod, nodeName)
		f.metricsRecorder.observePluginDurationAsync(score, pl.Name(), result, metrics.SinceInSeconds(began))
		return nodeScore, result
	}
	// Metrics are off for this cycle: call the plugin without timing it.
	return pl.Score(ctx, state, pod, nodeName)
}
// runScoreExtension calls NormalizeScore on the plugin's score extensions and
// returns its status. When the cycle state asks for framework metrics, the
// call is timed and the elapsed seconds are passed to the metrics recorder
// under the scoreExtensionNormalize extension point.
func (f *framework) runScoreExtension(ctx context.Context, pl ScorePlugin, state *CycleState, pod *v1.Pod, nodeScoreList NodeScoreList) *Status {
	if state.ShouldRecordFrameworkMetrics() {
		began := time.Now()
		result := pl.ScoreExtensions().NormalizeScore(ctx, state, pod, nodeScoreList)
		f.metricsRecorder.observePluginDurationAsync(scoreExtensionNormalize, pl.Name(), result, metrics.SinceInSeconds(began))
		return result
	}
	// Metrics are off for this cycle: call the extension without timing it.
	return pl.ScoreExtensions().NormalizeScore(ctx, state, pod, nodeScoreList)
}
// RunPreBindPlugins runs the set of configured prebind plugins. It returns a
// failure (bool) if any of the plugins returns an error. It also returns an
// error containing the rejection message or the error occurred in the plugin.
func (f *framework) RunPreBindPlugins(
ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) {
startTime := time.Now()
defer func() { recordExtensionPointDuration(startTime, preBind, status) }()
func (f *framework) RunPreBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) {
if state.ShouldRecordFrameworkMetrics() {
startTime := time.Now()
defer func() {
f.metricsRecorder.observeExtensionPointDurationAsync(preBind, status, metrics.SinceInSeconds(startTime))
}()
}
for _, pl := range f.preBindPlugins {
status := pl.PreBind(ctx, state, pod, nodeName)
status = f.runPreBindPlugin(ctx, pl, state, pod, nodeName)
if !status.IsSuccess() {
msg := fmt.Sprintf("error while running %q prebind plugin for pod %q: %v", pl.Name(), pod.Name, status.Message())
klog.Error(msg)
@ -480,15 +592,29 @@ func (f *framework) RunPreBindPlugins(
return nil
}
// runPreBindPlugin invokes PreBind on a single plugin and returns its status.
// When the cycle state asks for framework metrics, the call is timed and the
// elapsed seconds are passed to the metrics recorder together with the plugin
// name and the returned status.
func (f *framework) runPreBindPlugin(ctx context.Context, pl PreBindPlugin, state *CycleState, pod *v1.Pod, nodeName string) *Status {
	if state.ShouldRecordFrameworkMetrics() {
		began := time.Now()
		result := pl.PreBind(ctx, state, pod, nodeName)
		f.metricsRecorder.observePluginDurationAsync(preBind, pl.Name(), result, metrics.SinceInSeconds(began))
		return result
	}
	// Metrics are off for this cycle: call the plugin without timing it.
	return pl.PreBind(ctx, state, pod, nodeName)
}
// RunBindPlugins runs the set of configured bind plugins until one returns a non `Skip` status.
func (f *framework) RunBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) {
startTime := time.Now()
defer func() { recordExtensionPointDuration(startTime, bind, status) }()
if state.ShouldRecordFrameworkMetrics() {
startTime := time.Now()
defer func() {
f.metricsRecorder.observeExtensionPointDurationAsync(bind, status, metrics.SinceInSeconds(startTime))
}()
}
if len(f.bindPlugins) == 0 {
return NewStatus(Skip, "")
}
for _, bp := range f.bindPlugins {
status = bp.Bind(ctx, state, pod, nodeName)
status = f.runBindPlugin(ctx, bp, state, pod, nodeName)
if status != nil && status.Code() == Skip {
continue
}
@ -502,25 +628,51 @@ func (f *framework) RunBindPlugins(ctx context.Context, state *CycleState, pod *
return status
}
// RunPostBindPlugins runs the set of configured postbind plugins.
func (f *framework) RunPostBindPlugins(
ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) {
startTime := time.Now()
defer recordExtensionPointDuration(startTime, postBind, nil)
for _, pl := range f.postBindPlugins {
pl.PostBind(ctx, state, pod, nodeName)
// runBindPlugin invokes Bind on a single plugin and returns its status. When
// the cycle state asks for framework metrics, the call is timed and the
// elapsed seconds are passed to the metrics recorder together with the plugin
// name and the returned status.
func (f *framework) runBindPlugin(ctx context.Context, bp BindPlugin, state *CycleState, pod *v1.Pod, nodeName string) *Status {
	if state.ShouldRecordFrameworkMetrics() {
		began := time.Now()
		result := bp.Bind(ctx, state, pod, nodeName)
		f.metricsRecorder.observePluginDurationAsync(bind, bp.Name(), result, metrics.SinceInSeconds(began))
		return result
	}
	// Metrics are off for this cycle: call the plugin without timing it.
	return bp.Bind(ctx, state, pod, nodeName)
}
// RunPostBindPlugins runs the set of configured postbind plugins.
func (f *framework) RunPostBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) {
if state.ShouldRecordFrameworkMetrics() {
startTime := time.Now()
defer func() {
f.metricsRecorder.observeExtensionPointDurationAsync(postBind, nil, metrics.SinceInSeconds(startTime))
}()
}
for _, pl := range f.postBindPlugins {
f.runPostBindPlugin(ctx, pl, state, pod, nodeName)
}
}
// runPostBindPlugin invokes PostBind on a single plugin. PostBind returns no
// status, so when the cycle state asks for framework metrics the timed
// duration is passed to the metrics recorder with a nil status.
func (f *framework) runPostBindPlugin(ctx context.Context, pl PostBindPlugin, state *CycleState, pod *v1.Pod, nodeName string) {
	if state.ShouldRecordFrameworkMetrics() {
		began := time.Now()
		pl.PostBind(ctx, state, pod, nodeName)
		f.metricsRecorder.observePluginDurationAsync(postBind, pl.Name(), nil, metrics.SinceInSeconds(began))
		return
	}
	// Metrics are off for this cycle: call the plugin without timing it.
	pl.PostBind(ctx, state, pod, nodeName)
}
// RunReservePlugins runs the set of configured reserve plugins. If any of these
// plugins returns an error, it does not continue running the remaining ones and
// returns the error. In such case, pod will not be scheduled.
func (f *framework) RunReservePlugins(
ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) {
startTime := time.Now()
defer func() { recordExtensionPointDuration(startTime, reserve, status) }()
func (f *framework) RunReservePlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) {
if state.ShouldRecordFrameworkMetrics() {
startTime := time.Now()
defer func() {
f.metricsRecorder.observeExtensionPointDurationAsync(reserve, status, metrics.SinceInSeconds(startTime))
}()
}
for _, pl := range f.reservePlugins {
status := pl.Reserve(ctx, state, pod, nodeName)
status = f.runReservePlugin(ctx, pl, state, pod, nodeName)
if !status.IsSuccess() {
msg := fmt.Sprintf("error while running %q reserve plugin for pod %q: %v", pl.Name(), pod.Name, status.Message())
klog.Error(msg)
@ -530,14 +682,37 @@ func (f *framework) RunReservePlugins(
return nil
}
// RunUnreservePlugins runs the set of configured unreserve plugins.
func (f *framework) RunUnreservePlugins(
ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) {
startTime := time.Now()
defer recordExtensionPointDuration(startTime, unreserve, nil)
for _, pl := range f.unreservePlugins {
pl.Unreserve(ctx, state, pod, nodeName)
// runReservePlugin invokes Reserve on a single plugin and returns its status.
// When the cycle state asks for framework metrics, the call is timed and the
// elapsed seconds are passed to the metrics recorder together with the plugin
// name and the returned status.
func (f *framework) runReservePlugin(ctx context.Context, pl ReservePlugin, state *CycleState, pod *v1.Pod, nodeName string) *Status {
	if state.ShouldRecordFrameworkMetrics() {
		began := time.Now()
		result := pl.Reserve(ctx, state, pod, nodeName)
		f.metricsRecorder.observePluginDurationAsync(reserve, pl.Name(), result, metrics.SinceInSeconds(began))
		return result
	}
	// Metrics are off for this cycle: call the plugin without timing it.
	return pl.Reserve(ctx, state, pod, nodeName)
}
// RunUnreservePlugins runs the set of configured unreserve plugins.
func (f *framework) RunUnreservePlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) {
if state.ShouldRecordFrameworkMetrics() {
startTime := time.Now()
defer func() {
f.metricsRecorder.observeExtensionPointDurationAsync(unreserve, nil, metrics.SinceInSeconds(startTime))
}()
}
for _, pl := range f.unreservePlugins {
f.runUnreservePlugin(ctx, pl, state, pod, nodeName)
}
}
// runUnreservePlugin invokes Unreserve on a single plugin. Unreserve returns
// no status, so when the cycle state asks for framework metrics the timed
// duration is passed to the metrics recorder with a nil status.
func (f *framework) runUnreservePlugin(ctx context.Context, pl UnreservePlugin, state *CycleState, pod *v1.Pod, nodeName string) {
	if state.ShouldRecordFrameworkMetrics() {
		began := time.Now()
		pl.Unreserve(ctx, state, pod, nodeName)
		f.metricsRecorder.observePluginDurationAsync(unreserve, pl.Name(), nil, metrics.SinceInSeconds(began))
		return
	}
	// Metrics are off for this cycle: call the plugin without timing it.
	pl.Unreserve(ctx, state, pod, nodeName)
}
// RunPermitPlugins runs the set of configured permit plugins. If any of these
@ -547,14 +722,17 @@ func (f *framework) RunUnreservePlugins(
// returned by the plugin, if the time expires, then it will return an error.
// Note that if multiple plugins asked to wait, then we wait for the minimum
// timeout duration.
func (f *framework) RunPermitPlugins(
ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) {
startTime := time.Now()
defer func() { recordExtensionPointDuration(startTime, permit, status) }()
func (f *framework) RunPermitPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) {
if state.ShouldRecordFrameworkMetrics() {
startTime := time.Now()
defer func() {
f.metricsRecorder.observeExtensionPointDurationAsync(permit, status, metrics.SinceInSeconds(startTime))
}()
}
pluginsWaitTime := make(map[string]time.Duration)
statusCode := Success
for _, pl := range f.permitPlugins {
status, timeout := pl.Permit(ctx, state, pod, nodeName)
status, timeout := f.runPermitPlugin(ctx, pl, state, pod, nodeName)
if !status.IsSuccess() {
if status.IsUnschedulable() {
msg := fmt.Sprintf("rejected by %q at permit: %v", pl.Name(), status.Message())
@ -601,6 +779,16 @@ func (f *framework) RunPermitPlugins(
return nil
}
// runPermitPlugin invokes Permit on a single plugin and returns the status and
// timeout it produced. When the cycle state asks for framework metrics, the
// call is timed and the elapsed seconds are passed to the metrics recorder
// together with the plugin name and the returned status.
func (f *framework) runPermitPlugin(ctx context.Context, pl PermitPlugin, state *CycleState, pod *v1.Pod, nodeName string) (*Status, time.Duration) {
	if state.ShouldRecordFrameworkMetrics() {
		began := time.Now()
		result, wait := pl.Permit(ctx, state, pod, nodeName)
		f.metricsRecorder.observePluginDurationAsync(permit, pl.Name(), result, metrics.SinceInSeconds(began))
		return result, wait
	}
	// Metrics are off for this cycle: call the plugin without timing it.
	return pl.Permit(ctx, state, pod, nodeName)
}
// SnapshotSharedLister returns the scheduler's SharedLister of the latest NodeInfo
// snapshot. The snapshot is taken at the beginning of a scheduling cycle and remains
// unchanged until a pod finishes "Reserve". There is no guarantee that the information
@ -695,11 +883,3 @@ func (f *framework) pluginsNeeded(plugins *config.Plugins) map[string]config.Plu
}
return pgMap
}
// recordExtensionPointDuration observes the seconds elapsed since start on the
// FrameworkExtensionPointDuration metric, labelled with the extension point
// name and the status code. A nil status is reported as Success.
func recordExtensionPointDuration(start time.Time, extensionPoint string, status *Status) {
	code := Success
	if status != nil {
		code = status.Code()
	}
	metrics.FrameworkExtensionPointDuration.WithLabelValues(extensionPoint, code.String()).Observe(metrics.SinceInSeconds(start))
}

View File

@ -133,6 +133,17 @@ type TestPlugin struct {
inj injectedResult
}
// TestPluginPreFilterExtension is a test double whose AddPod and RemovePod
// methods return statuses built from the injected result codes.
type TestPluginPreFilterExtension struct {
// inj holds the status codes the methods report back.
inj injectedResult
}
// AddPod ignores its arguments and returns a status whose code is the injected
// PreFilterAddPodStatus.
func (e *TestPluginPreFilterExtension) AddPod(ctx context.Context, state *CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
	injected := Code(e.inj.PreFilterAddPodStatus)
	return NewStatus(injected, "injected status")
}
// RemovePod ignores its arguments and returns a status whose code is the
// injected PreFilterRemovePodStatus.
func (e *TestPluginPreFilterExtension) RemovePod(ctx context.Context, state *CycleState, podToSchedule *v1.Pod, podToRemove *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
	injected := Code(e.inj.PreFilterRemovePodStatus)
	return NewStatus(injected, "injected status")
}
// Name returns the name this test plugin was constructed with.
func (pl *TestPlugin) Name() string {
return pl.name
}
@ -150,7 +161,7 @@ func (pl *TestPlugin) PreFilter(ctx context.Context, state *CycleState, p *v1.Po
}
func (pl *TestPlugin) PreFilterExtensions() PreFilterExtensions {
return nil
return &TestPluginPreFilterExtension{inj: pl.inj}
}
func (pl *TestPlugin) Filter(ctx context.Context, state *CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
@ -586,6 +597,7 @@ func TestPreFilterPlugins(t *testing.T) {
}
func TestRecordingMetrics(t *testing.T) {
state := &CycleState{recordFrameworkMetrics: true}
tests := []struct {
name string
action func(f Framework)
@ -595,117 +607,149 @@ func TestRecordingMetrics(t *testing.T) {
}{
{
name: "PreFilter - Success",
action: func(f Framework) { f.RunPreFilterPlugins(context.Background(), nil, pod) },
action: func(f Framework) { f.RunPreFilterPlugins(context.Background(), state, pod) },
wantExtensionPoint: "PreFilter",
wantStatus: Success,
},
{
name: "PreFilterAddPod - Success",
action: func(f Framework) { f.RunPreFilterExtensionAddPod(context.Background(), state, pod, nil, nil) },
wantExtensionPoint: "PreFilterExtensionAddPod",
wantStatus: Success,
},
{
name: "PreFilterRemovePod - Success",
action: func(f Framework) { f.RunPreFilterExtensionRemovePod(context.Background(), state, pod, nil, nil) },
wantExtensionPoint: "PreFilterExtensionRemovePod",
wantStatus: Success,
},
{
name: "PreFilterRemovePod - Success",
action: func(f Framework) { f.RunPreFilterPlugins(context.Background(), state, pod) },
wantExtensionPoint: "PreFilter",
wantStatus: Success,
},
{
name: "Filter - Success",
action: func(f Framework) { f.RunFilterPlugins(context.Background(), nil, pod, nil) },
action: func(f Framework) { f.RunFilterPlugins(context.Background(), state, pod, nil) },
wantExtensionPoint: "Filter",
wantStatus: Success,
},
{
name: "PostFilter - Success",
action: func(f Framework) { f.RunPostFilterPlugins(context.Background(), nil, pod, nil, nil) },
action: func(f Framework) { f.RunPostFilterPlugins(context.Background(), state, pod, nil, nil) },
wantExtensionPoint: "PostFilter",
wantStatus: Success,
},
{
name: "Score - Success",
action: func(f Framework) { f.RunScorePlugins(context.Background(), nil, pod, nodes) },
action: func(f Framework) { f.RunScorePlugins(context.Background(), state, pod, nodes) },
wantExtensionPoint: "Score",
wantStatus: Success,
},
{
name: "Reserve - Success",
action: func(f Framework) { f.RunReservePlugins(context.Background(), nil, pod, "") },
action: func(f Framework) { f.RunReservePlugins(context.Background(), state, pod, "") },
wantExtensionPoint: "Reserve",
wantStatus: Success,
},
{
name: "Unreserve - Success",
action: func(f Framework) { f.RunUnreservePlugins(context.Background(), nil, pod, "") },
action: func(f Framework) { f.RunUnreservePlugins(context.Background(), state, pod, "") },
wantExtensionPoint: "Unreserve",
wantStatus: Success,
},
{
name: "PreBind - Success",
action: func(f Framework) { f.RunPreBindPlugins(context.Background(), nil, pod, "") },
action: func(f Framework) { f.RunPreBindPlugins(context.Background(), state, pod, "") },
wantExtensionPoint: "PreBind",
wantStatus: Success,
},
{
name: "Bind - Success",
action: func(f Framework) { f.RunBindPlugins(context.Background(), nil, pod, "") },
action: func(f Framework) { f.RunBindPlugins(context.Background(), state, pod, "") },
wantExtensionPoint: "Bind",
wantStatus: Success,
},
{
name: "PostBind - Success",
action: func(f Framework) { f.RunPostBindPlugins(context.Background(), nil, pod, "") },
action: func(f Framework) { f.RunPostBindPlugins(context.Background(), state, pod, "") },
wantExtensionPoint: "PostBind",
wantStatus: Success,
},
{
name: "Permit - Success",
action: func(f Framework) { f.RunPermitPlugins(context.Background(), nil, pod, "") },
action: func(f Framework) { f.RunPermitPlugins(context.Background(), state, pod, "") },
wantExtensionPoint: "Permit",
wantStatus: Success,
},
{
name: "PreFilter - Error",
action: func(f Framework) { f.RunPreFilterPlugins(context.Background(), nil, pod) },
action: func(f Framework) { f.RunPreFilterPlugins(context.Background(), state, pod) },
inject: injectedResult{PreFilterStatus: int(Error)},
wantExtensionPoint: "PreFilter",
wantStatus: Error,
},
{
name: "PreFilterAddPod - Error",
action: func(f Framework) { f.RunPreFilterExtensionAddPod(context.Background(), state, pod, nil, nil) },
inject: injectedResult{PreFilterAddPodStatus: int(Error)},
wantExtensionPoint: "PreFilterExtensionAddPod",
wantStatus: Error,
},
{
name: "PreFilterRemovePod - Error",
action: func(f Framework) { f.RunPreFilterExtensionRemovePod(context.Background(), state, pod, nil, nil) },
inject: injectedResult{PreFilterRemovePodStatus: int(Error)},
wantExtensionPoint: "PreFilterExtensionRemovePod",
wantStatus: Error,
},
{
name: "Filter - Error",
action: func(f Framework) { f.RunFilterPlugins(context.Background(), nil, pod, nil) },
action: func(f Framework) { f.RunFilterPlugins(context.Background(), state, pod, nil) },
inject: injectedResult{FilterStatus: int(Error)},
wantExtensionPoint: "Filter",
wantStatus: Error,
},
{
name: "PostFilter - Error",
action: func(f Framework) { f.RunPostFilterPlugins(context.Background(), nil, pod, nil, nil) },
action: func(f Framework) { f.RunPostFilterPlugins(context.Background(), state, pod, nil, nil) },
inject: injectedResult{PostFilterStatus: int(Error)},
wantExtensionPoint: "PostFilter",
wantStatus: Error,
},
{
name: "Score - Error",
action: func(f Framework) { f.RunScorePlugins(context.Background(), nil, pod, nodes) },
action: func(f Framework) { f.RunScorePlugins(context.Background(), state, pod, nodes) },
inject: injectedResult{ScoreStatus: int(Error)},
wantExtensionPoint: "Score",
wantStatus: Error,
},
{
name: "Reserve - Error",
action: func(f Framework) { f.RunReservePlugins(context.Background(), nil, pod, "") },
action: func(f Framework) { f.RunReservePlugins(context.Background(), state, pod, "") },
inject: injectedResult{ReserveStatus: int(Error)},
wantExtensionPoint: "Reserve",
wantStatus: Error,
},
{
name: "PreBind - Error",
action: func(f Framework) { f.RunPreBindPlugins(context.Background(), nil, pod, "") },
action: func(f Framework) { f.RunPreBindPlugins(context.Background(), state, pod, "") },
inject: injectedResult{PreBindStatus: int(Error)},
wantExtensionPoint: "PreBind",
wantStatus: Error,
},
{
name: "Bind - Error",
action: func(f Framework) { f.RunBindPlugins(context.Background(), nil, pod, "") },
action: func(f Framework) { f.RunBindPlugins(context.Background(), state, pod, "") },
inject: injectedResult{BindStatus: int(Error)},
wantExtensionPoint: "Bind",
wantStatus: Error,
},
{
name: "Permit - Error",
action: func(f Framework) { f.RunPermitPlugins(context.Background(), nil, pod, "") },
action: func(f Framework) { f.RunPermitPlugins(context.Background(), state, pod, "") },
inject: injectedResult{PermitStatus: int(Error)},
wantExtensionPoint: "Permit",
wantStatus: Error,
@ -714,6 +758,10 @@ func TestRecordingMetrics(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
metrics.Register()
metrics.FrameworkExtensionPointDuration.Reset()
metrics.PluginExecutionDuration.Reset()
plugin := &TestPlugin{name: testPlugin, inj: tt.inject}
r := make(Registry)
r.Register(testPlugin,
@ -733,16 +781,22 @@ func TestRecordingMetrics(t *testing.T) {
PostBind: pluginSet,
Unreserve: pluginSet,
}
f, err := NewFramework(r, plugins, emptyArgs)
recorder := newMetricsRecorder(100, time.Nanosecond)
f, err := NewFramework(r, plugins, emptyArgs, withMetricsRecorder(recorder))
if err != nil {
t.Fatalf("Failed to create framework for testing: %v", err)
}
metrics.Register()
metrics.FrameworkExtensionPointDuration.Reset()
tt.action(f)
// Stop the goroutine which records metrics and ensure it's stopped.
close(recorder.stopCh)
<-recorder.isStoppedCh
// Try to clean up the metrics buffer again in case it's not empty.
recorder.flushMetrics()
collectAndCompareFrameworkMetrics(t, tt.wantExtensionPoint, tt.wantStatus)
collectAndComparePluginMetrics(t, tt.wantExtensionPoint, testPlugin, tt.wantStatus)
})
}
}
@ -765,6 +819,9 @@ func TestPermitWaitingMetric(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
metrics.Register()
metrics.PermitWaitDuration.Reset()
plugin := &TestPlugin{name: testPlugin, inj: tt.inject}
r := make(Registry)
r.Register(testPlugin,
@ -778,8 +835,6 @@ func TestPermitWaitingMetric(t *testing.T) {
if err != nil {
t.Fatalf("Failed to create framework for testing: %v", err)
}
metrics.Register()
metrics.PermitWaitDuration.Reset()
f.RunPermitPlugins(context.TODO(), nil, pod, "")
@ -839,17 +894,19 @@ func buildScoreConfigWithWeights(weights map[string]int32, ps ...string) *config
}
// injectedResult describes the results and status codes a test plugin should
// return at each extension point. Status fields hold the integer value of a
// status Code; a zero value is omitted when the struct is serialized to JSON.
type injectedResult struct {
	// ScoreRes and NormalizeRes are the injected score and normalized score values.
	ScoreRes     int64 `json:"scoreRes,omitempty"`
	NormalizeRes int64 `json:"normalizeRes,omitempty"`
	// The remaining fields are the status codes injected per extension point.
	ScoreStatus              int `json:"scoreStatus,omitempty"`
	NormalizeStatus          int `json:"normalizeStatus,omitempty"`
	PreFilterStatus          int `json:"preFilterStatus,omitempty"`
	PreFilterAddPodStatus    int `json:"preFilterAddPodStatus,omitempty"`
	PreFilterRemovePodStatus int `json:"preFilterRemovePodStatus,omitempty"`
	FilterStatus             int `json:"filterStatus,omitempty"`
	PostFilterStatus         int `json:"postFilterStatus,omitempty"`
	ReserveStatus            int `json:"reserveStatus,omitempty"`
	PreBindStatus            int `json:"preBindStatus,omitempty"`
	BindStatus               int `json:"bindStatus,omitempty"`
	PermitStatus             int `json:"permitStatus,omitempty"`
}
func setScoreRes(inj injectedResult) (int64, *Status) {
@ -869,6 +926,33 @@ func injectNormalizeRes(inj injectedResult, scores NodeScoreList) *Status {
return nil
}
// collectAndComparePluginMetrics collects a sample from metrics.PluginExecutionDuration and
// verifies that it carries exactly three labels whose values are, in collected order, the
// expected extension point, plugin name and status code, and that at least one observation
// with a positive total latency was recorded.
func collectAndComparePluginMetrics(t *testing.T, wantExtensionPoint, wantPlugin string, wantStatus Code) {
	m := collectHistogramMetric(metrics.PluginExecutionDuration)
	// Fatal rather than Error: the indexing below would panic on a shorter label list.
	if len(m.Label) != 3 {
		t.Fatalf("Unexpected number of label pairs, got: %v, want: 3", len(m.Label))
	}

	if *m.Label[0].Value != wantExtensionPoint {
		t.Errorf("Unexpected extension point label, got: %q, want %q", *m.Label[0].Value, wantExtensionPoint)
	}

	if *m.Label[1].Value != wantPlugin {
		t.Errorf("Unexpected plugin label, got: %q, want %q", *m.Label[1].Value, wantPlugin)
	}

	if *m.Label[2].Value != wantStatus.String() {
		t.Errorf("Unexpected status code label, got: %q, want %q", *m.Label[2].Value, wantStatus)
	}

	if *m.Histogram.SampleCount == 0 {
		t.Error("Expect at least 1 sample")
	}

	if *m.Histogram.SampleSum <= 0 {
		t.Errorf("Expect latency to be greater than 0, got: %v", *m.Histogram.SampleSum)
	}
}
func collectAndCompareFrameworkMetrics(t *testing.T, wantExtensionPoint string, wantStatus Code) {
m := collectHistogramMetric(metrics.FrameworkExtensionPointDuration)
@ -885,11 +969,11 @@ func collectAndCompareFrameworkMetrics(t *testing.T, wantExtensionPoint string,
}
if *m.Histogram.SampleCount != 1 {
t.Errorf("Expect 1 sample, got: %v", m.Histogram.SampleCount)
t.Errorf("Expect 1 sample, got: %v", *m.Histogram.SampleCount)
}
if *m.Histogram.SampleSum <= 0 {
t.Errorf("Expect latency to be greater than 0, got: %v", m.Histogram.SampleSum)
t.Errorf("Expect latency to be greater than 0, got: %v", *m.Histogram.SampleSum)
}
}
@ -911,17 +995,17 @@ func collectAndComparePermitWaitDuration(t *testing.T, wantRes string) {
}
if *m.Histogram.SampleCount != 1 {
t.Errorf("Expect 1 sample, got: %v", m.Histogram.SampleCount)
t.Errorf("Expect 1 sample, got: %v", *m.Histogram.SampleCount)
}
if *m.Histogram.SampleSum <= 0 {
t.Errorf("Expect latency to be greater than 0, got: %v", m.Histogram.SampleSum)
t.Errorf("Expect latency to be greater than 0, got: %v", *m.Histogram.SampleSum)
}
}
}
func collectHistogramMetric(metric prometheus.Collector) *dto.Metric {
ch := make(chan prometheus.Metric, 1)
ch := make(chan prometheus.Metric, 100)
metric.Collect(ch)
select {
case got := <-ch:

View File

@ -74,3 +74,19 @@ func TestStatus(t *testing.T) {
}
}
}
// The String() method relies on the value and order of the status codes to function properly.
func TestStatusCodes(t *testing.T) {
assertStatusCode(t, Success, 0)
assertStatusCode(t, Error, 1)
assertStatusCode(t, Unschedulable, 2)
assertStatusCode(t, UnschedulableAndUnresolvable, 3)
assertStatusCode(t, Wait, 4)
assertStatusCode(t, Skip, 5)
}
// assertStatusCode reports a test error when the integer value of code differs from value.
func assertStatusCode(t *testing.T, code Code, value int) {
	got := int(code)
	if got == value {
		return
	}
	t.Errorf("Status code %q should have a value of %v but got %v", code.String(), value, got)
}

View File

@ -0,0 +1,115 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1alpha1
import (
"time"
k8smetrics "k8s.io/component-base/metrics"
"k8s.io/kubernetes/pkg/scheduler/metrics"
)
// frameworkMetric is the data structure passed in the buffer channel between the main framework thread
// and the metricsRecorder goroutine. Each instance carries one pending histogram observation.
type frameworkMetric struct {
// metric is the histogram vector the observation is recorded against.
metric *k8smetrics.HistogramVec
// labelValues are the label values handed to metric.WithLabelValues when the observation is flushed.
labelValues []string
// value is the observed value passed to Observe.
value float64
}
// metricsRecorder records framework metrics in a separate goroutine to avoid overhead in the critical path.
// Producers enqueue observations on bufferCh without blocking; the recorder goroutine drains the
// channel and reports the observations.
type metricsRecorder struct {
// bufferCh is a channel that serves as a metrics buffer before the metricsRecorder goroutine reports it.
bufferCh chan *frameworkMetric
// if bufferSize is reached, incoming metrics will be discarded.
// It is also the maximum number of metrics read from bufferCh in a single flush.
bufferSize int
// how often the recorder runs to flush the metrics.
interval time.Duration
// stopCh is used to stop the goroutine which periodically flushes metrics. It's currently only
// used in tests.
stopCh chan struct{}
// isStoppedCh indicates whether the goroutine is stopped. It's used in tests only to make sure
// the metric flushing goroutine is stopped so that tests can collect metrics for verification.
isStoppedCh chan struct{}
}
// newMetricsRecorder builds a metricsRecorder whose buffer holds up to bufferSize pending
// metrics and starts the goroutine that flushes the buffer once per interval.
func newMetricsRecorder(bufferSize int, interval time.Duration) *metricsRecorder {
	r := new(metricsRecorder)
	r.bufferSize = bufferSize
	r.interval = interval
	r.bufferCh = make(chan *frameworkMetric, bufferSize)
	r.stopCh = make(chan struct{})
	r.isStoppedCh = make(chan struct{})

	// The flushing goroutine runs until stopCh is closed.
	go r.run()
	return r
}
// observeExtensionPointDurationAsync queues an observation for the
// framework_extension_point_duration_seconds metric, labeled with the extension point and the
// status code. The observation is reported asynchronously by the recorder goroutine; the call
// never blocks, and the observation is dropped when the buffer is full.
func (r *metricsRecorder) observeExtensionPointDurationAsync(extensionPoint string, status *Status, value float64) {
	labels := []string{extensionPoint, status.Code().String()}
	pending := &frameworkMetric{
		metric:      metrics.FrameworkExtensionPointDuration,
		labelValues: labels,
		value:       value,
	}
	select {
	case r.bufferCh <- pending:
	default:
		// Buffer is full: discard the sample instead of stalling the caller.
	}
}
// observePluginDurationAsync queues an observation for the plugin_execution_duration_seconds
// metric. Label values are supplied in the order plugin name, extension point, status code.
// The observation is reported asynchronously by the recorder goroutine; the call never blocks,
// and the observation is dropped when the buffer is full.
func (r *metricsRecorder) observePluginDurationAsync(extensionPoint, pluginName string, status *Status, value float64) {
	statusLabel := status.Code().String()
	sample := &frameworkMetric{
		metric:      metrics.PluginExecutionDuration,
		labelValues: []string{pluginName, extensionPoint, statusLabel},
		value:       value,
	}
	select {
	case r.bufferCh <- sample:
	default:
		// Buffer is full: discard the sample instead of stalling the caller.
	}
}
// run flushes buffered metrics once every r.interval until stopCh is closed.
// When stopCh is closed, run closes isStoppedCh and returns; metrics still buffered at that
// point are not flushed by run.
func (r *metricsRecorder) run() {
	for {
		// Honor a stop request before doing any more work.
		select {
		case <-r.stopCh:
			close(r.isStoppedCh)
			return
		default:
		}

		r.flushMetrics()

		// Wait for the next flush, but wake up immediately on a stop request instead of
		// sleeping through the whole interval.
		timer := time.NewTimer(r.interval)
		select {
		case <-r.stopCh:
			timer.Stop()
			close(r.isStoppedCh)
			return
		case <-timer.C:
		}
	}
}
// flushMetrics drains bufferCh without blocking, recording each buffered observation on its
// histogram. It stops as soon as the channel is empty or after bufferSize observations,
// whichever comes first.
func (r *metricsRecorder) flushMetrics() {
	for remaining := r.bufferSize; remaining > 0; remaining-- {
		select {
		case pending := <-r.bufferCh:
			observer := pending.metric.WithLabelValues(pending.labelValues...)
			observer.Observe(pending.value)
		default:
			// Nothing left in the buffer.
			return
		}
	}
}

View File

@ -254,6 +254,16 @@ var (
},
[]string{"extension_point", "status"})
// PluginExecutionDuration is a histogram of the duration for running a plugin at a specific
// extension point, labeled by plugin, extension_point and status.
// NOTE(review): Buckets is nil, which presumably falls back to the library's default buckets;
// confirm they are fine-grained enough for per-plugin latencies.
PluginExecutionDuration = metrics.NewHistogramVec(
&metrics.HistogramOpts{
Subsystem: SchedulerSubsystem,
Name: "plugin_execution_duration_seconds",
Help: "Duration for running a plugin at a specific extension point.",
Buckets: nil,
StabilityLevel: metrics.ALPHA,
},
[]string{"plugin", "extension_point", "status"})
SchedulerQueueIncomingPods = metrics.NewCounterVec(
&metrics.CounterOpts{
Subsystem: SchedulerSubsystem,
@ -302,6 +312,7 @@ var (
PodSchedulingDuration,
PodSchedulingAttempts,
FrameworkExtensionPointDuration,
PluginExecutionDuration,
SchedulerQueueIncomingPods,
SchedulerGoroutines,
PermitWaitDuration,

View File

@ -20,6 +20,7 @@ import (
"context"
"fmt"
"io/ioutil"
"math/rand"
"os"
"time"
@ -55,6 +56,8 @@ const (
BindTimeoutSeconds = 100
// SchedulerError is the reason recorded for events when an error occurs during scheduling a pod.
SchedulerError = "SchedulerError"
// Percentage of framework metrics to be sampled.
frameworkMetricsSamplePercent = 10
)
// podConditionUpdater updates the condition of a pod based on the passed
@ -613,6 +616,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
// Synchronously attempt to find a fit for the pod.
start := time.Now()
state := framework.NewCycleState()
state.SetRecordFrameworkMetrics(rand.Intn(100) < frameworkMetricsSamplePercent)
schedulingCycleCtx, cancel := context.WithCancel(ctx)
defer cancel()
scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, state, pod)