Merge pull request #116842 from mengjiao-liu/contextual-logging-scheduler-runtime

Migrated `pkg/scheduler/framework/runtime` to use contextual logging
This commit is contained in:
Kubernetes Prow Robot 2023-05-11 10:59:02 -07:00 committed by GitHub
commit 58e13496d6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 167 additions and 25 deletions

View File

@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
clientsetfake "k8s.io/client-go/kubernetes/fake" clientsetfake "k8s.io/client-go/kubernetes/fake"
"k8s.io/klog/v2/ktesting"
extenderv1 "k8s.io/kube-scheduler/extender/v1" extenderv1 "k8s.io/kube-scheduler/extender/v1"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config" schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework"
@ -278,7 +279,8 @@ func TestSchedulerWithExtenders(t *testing.T) {
for ii := range test.extenders { for ii := range test.extenders {
extenders = append(extenders, &test.extenders[ii]) extenders = append(extenders, &test.extenders[ii])
} }
ctx, cancel := context.WithCancel(context.Background()) logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
cache := internalcache.New(time.Duration(0), ctx.Done()) cache := internalcache.New(time.Duration(0), ctx.Done())
@ -290,6 +292,7 @@ func TestSchedulerWithExtenders(t *testing.T) {
runtime.WithClientSet(client), runtime.WithClientSet(client),
runtime.WithInformerFactory(informerFactory), runtime.WithInformerFactory(informerFactory),
runtime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), runtime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
runtime.WithLogger(logger),
) )
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)

View File

@ -352,7 +352,8 @@ func TestPostFilter(t *testing.T) {
if tt.extender != nil { if tt.extender != nil {
extenders = append(extenders, tt.extender) extenders = append(extenders, tt.extender)
} }
ctx, cancel := context.WithCancel(context.Background()) logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
f, err := st.NewFramework(registeredPlugins, "", ctx.Done(), f, err := st.NewFramework(registeredPlugins, "", ctx.Done(),
frameworkruntime.WithClientSet(cs), frameworkruntime.WithClientSet(cs),
@ -361,6 +362,7 @@ func TestPostFilter(t *testing.T) {
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
frameworkruntime.WithExtenders(extenders), frameworkruntime.WithExtenders(extenders),
frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.pods, tt.nodes)), frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.pods, tt.nodes)),
frameworkruntime.WithLogger(logger),
) )
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -1087,7 +1089,8 @@ func TestDryRunPreemption(t *testing.T) {
parallelism = 1 parallelism = 1
} }
ctx, cancel := context.WithCancel(context.Background()) logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
fwk, err := st.NewFramework( fwk, err := st.NewFramework(
registeredPlugins, "", ctx.Done(), registeredPlugins, "", ctx.Done(),
@ -1095,6 +1098,7 @@ func TestDryRunPreemption(t *testing.T) {
frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithSnapshotSharedLister(snapshot),
frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithParallelism(parallelism), frameworkruntime.WithParallelism(parallelism),
frameworkruntime.WithLogger(logger),
) )
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -1351,6 +1355,7 @@ func TestSelectBestCandidate(t *testing.T) {
ctx.Done(), ctx.Done(),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithSnapshotSharedLister(snapshot),
frameworkruntime.WithLogger(logger),
) )
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -1480,6 +1485,7 @@ func TestPodEligibleToPreemptOthers(t *testing.T) {
for _, test := range tests { for _, test := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
var nodes []*v1.Node var nodes []*v1.Node
for _, n := range test.nodes { for _, n := range test.nodes {
nodes = append(nodes, st.MakeNode().Name(n).Obj()) nodes = append(nodes, st.MakeNode().Name(n).Obj())
@ -1492,6 +1498,7 @@ func TestPodEligibleToPreemptOthers(t *testing.T) {
defer close(stopCh) defer close(stopCh)
f, err := st.NewFramework(registeredPlugins, "", stopCh, f, err := st.NewFramework(registeredPlugins, "", stopCh,
frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(test.pods, nodes)), frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(test.pods, nodes)),
frameworkruntime.WithLogger(logger),
) )
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -1689,7 +1696,8 @@ func TestPreempt(t *testing.T) {
return true, nil, nil return true, nil, nil
}) })
ctx, cancel := context.WithCancel(context.Background()) logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
cache := internalcache.New(time.Duration(0), ctx.Done()) cache := internalcache.New(time.Duration(0), ctx.Done())
@ -1735,6 +1743,7 @@ func TestPreempt(t *testing.T) {
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(test.pods, nodes)), frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(test.pods, nodes)),
frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithLogger(logger),
) )
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)

View File

@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
clientsetfake "k8s.io/client-go/kubernetes/fake" clientsetfake "k8s.io/client-go/kubernetes/fake"
"k8s.io/klog/v2/ktesting"
extenderv1 "k8s.io/kube-scheduler/extender/v1" extenderv1 "k8s.io/kube-scheduler/extender/v1"
"k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/parallelize" "k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
@ -266,6 +267,7 @@ func TestDryRunPreemption(t *testing.T) {
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
registeredPlugins := append([]st.RegisterPluginFunc{ registeredPlugins := append([]st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New)}, st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New)},
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
@ -287,6 +289,7 @@ func TestDryRunPreemption(t *testing.T) {
frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithParallelism(parallelism), frameworkruntime.WithParallelism(parallelism),
frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.testPods, tt.nodes)), frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.testPods, tt.nodes)),
frameworkruntime.WithLogger(logger),
) )
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)

View File

@ -77,6 +77,7 @@ type frameworkImpl struct {
kubeConfig *restclient.Config kubeConfig *restclient.Config
eventRecorder events.EventRecorder eventRecorder events.EventRecorder
informerFactory informers.SharedInformerFactory informerFactory informers.SharedInformerFactory
logger klog.Logger
metricsRecorder *metrics.MetricAsyncRecorder metricsRecorder *metrics.MetricAsyncRecorder
profileName string profileName string
@ -134,6 +135,7 @@ type frameworkOptions struct {
captureProfile CaptureProfile captureProfile CaptureProfile
clusterEventMap map[framework.ClusterEvent]sets.Set[string] clusterEventMap map[framework.ClusterEvent]sets.Set[string]
parallelizer parallelize.Parallelizer parallelizer parallelize.Parallelizer
logger *klog.Logger
} }
// Option for the frameworkImpl. // Option for the frameworkImpl.
@ -229,6 +231,13 @@ func WithMetricsRecorder(r *metrics.MetricAsyncRecorder) Option {
} }
} }
// WithLogger overrides the default logger from k8s.io/klog.
func WithLogger(logger klog.Logger) Option {
return func(o *frameworkOptions) {
o.logger = &logger
}
}
// defaultFrameworkOptions are applied when no option corresponding to those fields exist. // defaultFrameworkOptions are applied when no option corresponding to those fields exist.
func defaultFrameworkOptions(stopCh <-chan struct{}) frameworkOptions { func defaultFrameworkOptions(stopCh <-chan struct{}) frameworkOptions {
return frameworkOptions{ return frameworkOptions{
@ -247,6 +256,11 @@ func NewFramework(r Registry, profile *config.KubeSchedulerProfile, stopCh <-cha
opt(&options) opt(&options)
} }
logger := klog.TODO()
if options.logger != nil {
logger = *options.logger
}
f := &frameworkImpl{ f := &frameworkImpl{
registry: r, registry: r,
snapshotSharedLister: options.snapshotSharedLister, snapshotSharedLister: options.snapshotSharedLister,
@ -260,6 +274,7 @@ func NewFramework(r Registry, profile *config.KubeSchedulerProfile, stopCh <-cha
extenders: options.extenders, extenders: options.extenders,
PodNominator: options.podNominator, PodNominator: options.podNominator,
parallelizer: options.parallelizer, parallelizer: options.parallelizer,
logger: logger,
} }
if profile == nil { if profile == nil {
@ -311,7 +326,7 @@ func NewFramework(r Registry, profile *config.KubeSchedulerProfile, stopCh <-cha
pluginsMap[name] = p pluginsMap[name] = p
// Update ClusterEventMap in place. // Update ClusterEventMap in place.
fillEventToPluginMap(p, options.clusterEventMap) fillEventToPluginMap(logger, p, options.clusterEventMap)
} }
// initialize plugins per individual extension points // initialize plugins per individual extension points
@ -323,7 +338,7 @@ func NewFramework(r Registry, profile *config.KubeSchedulerProfile, stopCh <-cha
// initialize multiPoint plugins to their expanded extension points // initialize multiPoint plugins to their expanded extension points
if len(profile.Plugins.MultiPoint.Enabled) > 0 { if len(profile.Plugins.MultiPoint.Enabled) > 0 {
if err := f.expandMultiPointPlugins(profile, pluginsMap); err != nil { if err := f.expandMultiPointPlugins(logger, profile, pluginsMap); err != nil {
return nil, err return nil, err
} }
} }
@ -443,7 +458,7 @@ func (os *orderedSet) delete(s string) {
} }
} }
func (f *frameworkImpl) expandMultiPointPlugins(profile *config.KubeSchedulerProfile, pluginsMap map[string]framework.Plugin) error { func (f *frameworkImpl) expandMultiPointPlugins(logger klog.Logger, profile *config.KubeSchedulerProfile, pluginsMap map[string]framework.Plugin) error {
// initialize MultiPoint plugins // initialize MultiPoint plugins
for _, e := range f.getExtensionPoints(profile.Plugins) { for _, e := range f.getExtensionPoints(profile.Plugins) {
plugins := reflect.ValueOf(e.slicePtr).Elem() plugins := reflect.ValueOf(e.slicePtr).Elem()
@ -460,7 +475,7 @@ func (f *frameworkImpl) expandMultiPointPlugins(profile *config.KubeSchedulerPro
disabledSet.Insert(disabledPlugin.Name) disabledSet.Insert(disabledPlugin.Name)
} }
if disabledSet.Has("*") { if disabledSet.Has("*") {
klog.V(4).InfoS("all plugins disabled for extension point, skipping MultiPoint expansion", "extension", pluginType) logger.V(4).Info("Skipped MultiPoint expansion because all plugins are disabled for extension point", "extension", pluginType)
continue continue
} }
@ -481,7 +496,7 @@ func (f *frameworkImpl) expandMultiPointPlugins(profile *config.KubeSchedulerPro
// a plugin that's enabled via MultiPoint can still be disabled for specific extension points // a plugin that's enabled via MultiPoint can still be disabled for specific extension points
if disabledSet.Has(ep.Name) { if disabledSet.Has(ep.Name) {
klog.V(4).InfoS("plugin disabled for extension point", "plugin", ep.Name, "extension", pluginType) logger.V(4).Info("Skipped disabled plugin for extension point", "plugin", ep.Name, "extension", pluginType)
continue continue
} }
@ -491,7 +506,7 @@ func (f *frameworkImpl) expandMultiPointPlugins(profile *config.KubeSchedulerPro
// This maintains expected behavior for overriding default plugins (see https://github.com/kubernetes/kubernetes/pull/99582) // This maintains expected behavior for overriding default plugins (see https://github.com/kubernetes/kubernetes/pull/99582)
if enabledSet.has(ep.Name) { if enabledSet.has(ep.Name) {
overridePlugins.insert(ep.Name) overridePlugins.insert(ep.Name)
klog.InfoS("MultiPoint plugin is explicitly re-configured; overriding", "plugin", ep.Name) logger.Info("MultiPoint plugin is explicitly re-configured; overriding", "plugin", ep.Name)
continue continue
} }
@ -530,7 +545,7 @@ func (f *frameworkImpl) expandMultiPointPlugins(profile *config.KubeSchedulerPro
return nil return nil
} }
func fillEventToPluginMap(p framework.Plugin, eventToPlugins map[framework.ClusterEvent]sets.Set[string]) { func fillEventToPluginMap(logger klog.Logger, p framework.Plugin, eventToPlugins map[framework.ClusterEvent]sets.Set[string]) {
ext, ok := p.(framework.EnqueueExtensions) ext, ok := p.(framework.EnqueueExtensions)
if !ok { if !ok {
// If interface EnqueueExtensions is not implemented, register the default events // If interface EnqueueExtensions is not implemented, register the default events
@ -544,7 +559,7 @@ func fillEventToPluginMap(p framework.Plugin, eventToPlugins map[framework.Clust
// We treat it as: the plugin is not interested in any event, and hence pod failed by that plugin // We treat it as: the plugin is not interested in any event, and hence pod failed by that plugin
// cannot be moved by any regular cluster event. // cannot be moved by any regular cluster event.
if len(events) == 0 { if len(events) == 0 {
klog.InfoS("Plugin's EventsToRegister() returned nil", "plugin", p.Name()) logger.Info("Plugin's EventsToRegister() returned nil", "plugin", p.Name())
return return
} }
// The most common case: a plugin implements EnqueueExtensions and returns non-nil result. // The most common case: a plugin implements EnqueueExtensions and returns non-nil result.
@ -622,7 +637,15 @@ func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framewor
var result *framework.PreFilterResult var result *framework.PreFilterResult
var pluginsWithNodes []string var pluginsWithNodes []string
skipPlugins := sets.New[string]() skipPlugins := sets.New[string]()
logger := klog.FromContext(ctx)
logger = klog.LoggerWithName(logger, "PreFilter")
// TODO(knelasevero): Remove duplicated keys from log entry calls
// When contextualized logging hits GA
// https://github.com/kubernetes/kubernetes/issues/111672
logger = klog.LoggerWithValues(logger, "pod", klog.KObj(pod))
for _, pl := range f.preFilterPlugins { for _, pl := range f.preFilterPlugins {
logger := klog.LoggerWithName(logger, pl.Name())
ctx := klog.NewContext(ctx, logger)
r, s := f.runPreFilterPlugin(ctx, pl, state, pod) r, s := f.runPreFilterPlugin(ctx, pl, state, pod)
if s.IsSkip() { if s.IsSkip() {
skipPlugins.Insert(pl.Name()) skipPlugins.Insert(pl.Name())
@ -671,14 +694,22 @@ func (f *frameworkImpl) RunPreFilterExtensionAddPod(
podInfoToAdd *framework.PodInfo, podInfoToAdd *framework.PodInfo,
nodeInfo *framework.NodeInfo, nodeInfo *framework.NodeInfo,
) (status *framework.Status) { ) (status *framework.Status) {
logger := klog.FromContext(ctx)
logger = klog.LoggerWithName(logger, "PreFilterExtension")
// TODO(knelasevero): Remove duplicated keys from log entry calls
// When contextualized logging hits GA
// https://github.com/kubernetes/kubernetes/issues/111672
logger = klog.LoggerWithValues(logger, "pod", klog.KObj(podToSchedule), "node", klog.KObj(nodeInfo.Node()), "operation", "addPod")
for _, pl := range f.preFilterPlugins { for _, pl := range f.preFilterPlugins {
if pl.PreFilterExtensions() == nil || state.SkipFilterPlugins.Has(pl.Name()) { if pl.PreFilterExtensions() == nil || state.SkipFilterPlugins.Has(pl.Name()) {
continue continue
} }
logger := klog.LoggerWithName(logger, pl.Name())
ctx := klog.NewContext(ctx, logger)
status = f.runPreFilterExtensionAddPod(ctx, pl, state, podToSchedule, podInfoToAdd, nodeInfo) status = f.runPreFilterExtensionAddPod(ctx, pl, state, podToSchedule, podInfoToAdd, nodeInfo)
if !status.IsSuccess() { if !status.IsSuccess() {
err := status.AsError() err := status.AsError()
klog.ErrorS(err, "Failed running AddPod on PreFilter plugin", "plugin", pl.Name(), "pod", klog.KObj(podToSchedule)) logger.Error(err, "Plugin failed", "pod", klog.KObj(podToSchedule), "node", klog.KObj(nodeInfo.Node()), "operation", "addPod", "plugin", pl.Name())
return framework.AsStatus(fmt.Errorf("running AddPod on PreFilter plugin %q: %w", pl.Name(), err)) return framework.AsStatus(fmt.Errorf("running AddPod on PreFilter plugin %q: %w", pl.Name(), err))
} }
} }
@ -706,14 +737,22 @@ func (f *frameworkImpl) RunPreFilterExtensionRemovePod(
podInfoToRemove *framework.PodInfo, podInfoToRemove *framework.PodInfo,
nodeInfo *framework.NodeInfo, nodeInfo *framework.NodeInfo,
) (status *framework.Status) { ) (status *framework.Status) {
logger := klog.FromContext(ctx)
logger = klog.LoggerWithName(logger, "PreFilterExtension")
// TODO(knelasevero): Remove duplicated keys from log entry calls
// When contextualized logging hits GA
// https://github.com/kubernetes/kubernetes/issues/111672
logger = klog.LoggerWithValues(logger, "pod", klog.KObj(podToSchedule), "node", klog.KObj(nodeInfo.Node()))
for _, pl := range f.preFilterPlugins { for _, pl := range f.preFilterPlugins {
if pl.PreFilterExtensions() == nil || state.SkipFilterPlugins.Has(pl.Name()) { if pl.PreFilterExtensions() == nil || state.SkipFilterPlugins.Has(pl.Name()) {
continue continue
} }
logger := klog.LoggerWithName(logger, pl.Name())
ctx := klog.NewContext(ctx, logger)
status = f.runPreFilterExtensionRemovePod(ctx, pl, state, podToSchedule, podInfoToRemove, nodeInfo) status = f.runPreFilterExtensionRemovePod(ctx, pl, state, podToSchedule, podInfoToRemove, nodeInfo)
if !status.IsSuccess() { if !status.IsSuccess() {
err := status.AsError() err := status.AsError()
klog.ErrorS(err, "Failed running RemovePod on PreFilter plugin", "plugin", pl.Name(), "pod", klog.KObj(podToSchedule)) logger.Error(err, "Plugin failed", "node", klog.KObj(nodeInfo.Node()), "operation", "removePod", "plugin", pl.Name(), "pod", klog.KObj(podToSchedule))
return framework.AsStatus(fmt.Errorf("running RemovePod on PreFilter plugin %q: %w", pl.Name(), err)) return framework.AsStatus(fmt.Errorf("running RemovePod on PreFilter plugin %q: %w", pl.Name(), err))
} }
} }
@ -741,7 +780,16 @@ func (f *frameworkImpl) RunFilterPlugins(
pod *v1.Pod, pod *v1.Pod,
nodeInfo *framework.NodeInfo, nodeInfo *framework.NodeInfo,
) *framework.Status { ) *framework.Status {
logger := klog.FromContext(ctx)
logger = klog.LoggerWithName(logger, "Filter")
// TODO(knelasevero): Remove duplicated keys from log entry calls
// When contextualized logging hits GA
// https://github.com/kubernetes/kubernetes/issues/111672
logger = klog.LoggerWithValues(logger, "pod", klog.KObj(pod), "node", klog.KObj(nodeInfo.Node()))
for _, pl := range f.filterPlugins { for _, pl := range f.filterPlugins {
logger := klog.LoggerWithName(logger, pl.Name())
ctx := klog.NewContext(ctx, logger)
if state.SkipFilterPlugins.Has(pl.Name()) { if state.SkipFilterPlugins.Has(pl.Name()) {
continue continue
} }
@ -777,11 +825,20 @@ func (f *frameworkImpl) RunPostFilterPlugins(ctx context.Context, state *framewo
metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.PostFilter, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime)) metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.PostFilter, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
}() }()
logger := klog.FromContext(ctx)
logger = klog.LoggerWithName(logger, "PostFilter")
// TODO(knelasevero): Remove duplicated keys from log entry calls
// When contextualized logging hits GA
// https://github.com/kubernetes/kubernetes/issues/111672
logger = klog.LoggerWithValues(logger, "pod", klog.KObj(pod))
// `result` records the last meaningful(non-noop) PostFilterResult. // `result` records the last meaningful(non-noop) PostFilterResult.
var result *framework.PostFilterResult var result *framework.PostFilterResult
var reasons []string var reasons []string
var failedPlugin string var failedPlugin string
for _, pl := range f.postFilterPlugins { for _, pl := range f.postFilterPlugins {
logger := klog.LoggerWithName(logger, pl.Name())
ctx := klog.NewContext(ctx, logger)
r, s := f.runPostFilterPlugin(ctx, pl, state, pod, filteredNodeStatusMap) r, s := f.runPostFilterPlugin(ctx, pl, state, pod, filteredNodeStatusMap)
if s.IsSuccess() { if s.IsSuccess() {
return r, s return r, s
@ -847,6 +904,9 @@ func (f *frameworkImpl) RunFilterPluginsWithNominatedPods(ctx context.Context, s
// the nominated pods are treated as not running. We can't just assume the // the nominated pods are treated as not running. We can't just assume the
// nominated pods are running because they are not running right now and in fact, // nominated pods are running because they are not running right now and in fact,
// they may end up getting scheduled to a different node. // they may end up getting scheduled to a different node.
logger := klog.FromContext(ctx)
logger = klog.LoggerWithName(logger, "FilterWithNominatedPods")
ctx = klog.NewContext(ctx, logger)
for i := 0; i < 2; i++ { for i := 0; i < 2; i++ {
stateToUse := state stateToUse := state
nodeInfoToUse := info nodeInfoToUse := info
@ -912,7 +972,15 @@ func (f *frameworkImpl) RunPreScorePlugins(
metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.PreScore, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime)) metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.PreScore, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
}() }()
skipPlugins := sets.New[string]() skipPlugins := sets.New[string]()
logger := klog.FromContext(ctx)
logger = klog.LoggerWithName(logger, "PreScore")
// TODO(knelasevero): Remove duplicated keys from log entry calls
// When contextualized logging hits GA
// https://github.com/kubernetes/kubernetes/issues/111672
logger = klog.LoggerWithValues(logger, "pod", klog.KObj(pod))
for _, pl := range f.preScorePlugins { for _, pl := range f.preScorePlugins {
logger := klog.LoggerWithName(logger, pl.Name())
ctx := klog.NewContext(ctx, logger)
status = f.runPreScorePlugin(ctx, pl, state, pod, nodes) status = f.runPreScorePlugin(ctx, pl, state, pod, nodes)
if status.IsSkip() { if status.IsSkip() {
skipPlugins.Insert(pl.Name()) skipPlugins.Insert(pl.Name())
@ -961,10 +1029,19 @@ func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.Cy
errCh := parallelize.NewErrorChannel() errCh := parallelize.NewErrorChannel()
if len(plugins) > 0 { if len(plugins) > 0 {
logger := klog.FromContext(ctx)
logger = klog.LoggerWithName(logger, "Score")
// TODO(knelasevero): Remove duplicated keys from log entry calls
// When contextualized logging hits GA
// https://github.com/kubernetes/kubernetes/issues/111672
logger = klog.LoggerWithValues(logger, "pod", klog.KObj(pod))
// Run Score method for each node in parallel. // Run Score method for each node in parallel.
f.Parallelizer().Until(ctx, len(nodes), func(index int) { f.Parallelizer().Until(ctx, len(nodes), func(index int) {
nodeName := nodes[index].Name nodeName := nodes[index].Name
logger := klog.LoggerWithValues(logger, "node", klog.ObjectRef{Name: nodeName})
for _, pl := range plugins { for _, pl := range plugins {
logger := klog.LoggerWithName(logger, pl.Name())
ctx := klog.NewContext(ctx, logger)
s, status := f.runScorePlugin(ctx, pl, state, pod, nodeName) s, status := f.runScorePlugin(ctx, pl, state, pod, nodeName)
if !status.IsSuccess() { if !status.IsSuccess() {
err := fmt.Errorf("plugin %q failed with: %w", pl.Name(), status.AsError()) err := fmt.Errorf("plugin %q failed with: %w", pl.Name(), status.AsError())
@ -1062,16 +1139,24 @@ func (f *frameworkImpl) RunPreBindPlugins(ctx context.Context, state *framework.
defer func() { defer func() {
metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.PreBind, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime)) metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.PreBind, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
}() }()
logger := klog.FromContext(ctx)
logger = klog.LoggerWithName(logger, "PreBind")
// TODO(knelasevero): Remove duplicated keys from log entry calls
// When contextualized logging hits GA
// https://github.com/kubernetes/kubernetes/issues/111672
logger = klog.LoggerWithValues(logger, "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName})
for _, pl := range f.preBindPlugins { for _, pl := range f.preBindPlugins {
logger := klog.LoggerWithName(logger, pl.Name())
ctx := klog.NewContext(ctx, logger)
status = f.runPreBindPlugin(ctx, pl, state, pod, nodeName) status = f.runPreBindPlugin(ctx, pl, state, pod, nodeName)
if !status.IsSuccess() { if !status.IsSuccess() {
if status.IsUnschedulable() { if status.IsUnschedulable() {
klog.V(4).InfoS("Pod rejected by PreBind plugin", "pod", klog.KObj(pod), "node", nodeName, "plugin", pl.Name(), "status", status.Message()) logger.V(4).Info("Pod rejected by PreBind plugin", "pod", klog.KObj(pod), "node", nodeName, "plugin", pl.Name(), "status", status.Message())
status.SetFailedPlugin(pl.Name()) status.SetFailedPlugin(pl.Name())
return status return status
} }
err := status.AsError() err := status.AsError()
klog.ErrorS(err, "Failed running PreBind plugin", "plugin", pl.Name(), "pod", klog.KObj(pod), "node", nodeName) logger.Error(err, "Plugin failed", "plugin", pl.Name(), "pod", klog.KObj(pod), "node", nodeName)
return framework.AsStatus(fmt.Errorf("running PreBind plugin %q: %w", pl.Name(), err)) return framework.AsStatus(fmt.Errorf("running PreBind plugin %q: %w", pl.Name(), err))
} }
} }
@ -1097,19 +1182,27 @@ func (f *frameworkImpl) RunBindPlugins(ctx context.Context, state *framework.Cyc
if len(f.bindPlugins) == 0 { if len(f.bindPlugins) == 0 {
return framework.NewStatus(framework.Skip, "") return framework.NewStatus(framework.Skip, "")
} }
logger := klog.FromContext(ctx)
logger = klog.LoggerWithName(logger, "Bind")
// TODO(knelasevero): Remove duplicated keys from log entry calls
// When contextualized logging hits GA
// https://github.com/kubernetes/kubernetes/issues/111672
logger = klog.LoggerWithValues(logger, "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName})
for _, pl := range f.bindPlugins { for _, pl := range f.bindPlugins {
logger := klog.LoggerWithName(logger, pl.Name())
ctx := klog.NewContext(ctx, logger)
status = f.runBindPlugin(ctx, pl, state, pod, nodeName) status = f.runBindPlugin(ctx, pl, state, pod, nodeName)
if status.IsSkip() { if status.IsSkip() {
continue continue
} }
if !status.IsSuccess() { if !status.IsSuccess() {
if status.IsUnschedulable() { if status.IsUnschedulable() {
klog.V(4).InfoS("Pod rejected by Bind plugin", "pod", klog.KObj(pod), "node", nodeName, "plugin", pl.Name(), "status", status.Message()) logger.V(4).Info("Pod rejected by Bind plugin", "pod", klog.KObj(pod), "node", nodeName, "plugin", pl.Name(), "status", status.Message())
status.SetFailedPlugin(pl.Name()) status.SetFailedPlugin(pl.Name())
return status return status
} }
err := status.AsError() err := status.AsError()
klog.ErrorS(err, "Failed running Bind plugin", "plugin", pl.Name(), "pod", klog.KObj(pod), "node", nodeName) logger.Error(err, "Plugin failed", "plugin", pl.Name(), "pod", klog.KObj(pod), "node", nodeName)
return framework.AsStatus(fmt.Errorf("running Bind plugin %q: %w", pl.Name(), err)) return framework.AsStatus(fmt.Errorf("running Bind plugin %q: %w", pl.Name(), err))
} }
return status return status
@ -1133,7 +1226,15 @@ func (f *frameworkImpl) RunPostBindPlugins(ctx context.Context, state *framework
defer func() { defer func() {
metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.PostBind, framework.Success.String(), f.profileName).Observe(metrics.SinceInSeconds(startTime)) metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.PostBind, framework.Success.String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
}() }()
logger := klog.FromContext(ctx)
logger = klog.LoggerWithName(logger, "PostBind")
// TODO(knelasevero): Remove duplicated keys from log entry calls
// When contextualized logging hits GA
// https://github.com/kubernetes/kubernetes/issues/111672
logger = klog.LoggerWithValues(logger, "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName})
for _, pl := range f.postBindPlugins { for _, pl := range f.postBindPlugins {
logger := klog.LoggerWithName(logger, pl.Name())
ctx := klog.NewContext(ctx, logger)
f.runPostBindPlugin(ctx, pl, state, pod, nodeName) f.runPostBindPlugin(ctx, pl, state, pod, nodeName)
} }
} }
@ -1158,11 +1259,19 @@ func (f *frameworkImpl) RunReservePluginsReserve(ctx context.Context, state *fra
defer func() { defer func() {
metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.Reserve, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime)) metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.Reserve, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
}() }()
logger := klog.FromContext(ctx)
logger = klog.LoggerWithName(logger, "Reserve")
// TODO(knelasevero): Remove duplicated keys from log entry calls
// When contextualized logging hits GA
// https://github.com/kubernetes/kubernetes/issues/111672
logger = klog.LoggerWithValues(logger, "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName})
for _, pl := range f.reservePlugins { for _, pl := range f.reservePlugins {
logger := klog.LoggerWithName(logger, pl.Name())
ctx := klog.NewContext(ctx, logger)
status = f.runReservePluginReserve(ctx, pl, state, pod, nodeName) status = f.runReservePluginReserve(ctx, pl, state, pod, nodeName)
if !status.IsSuccess() { if !status.IsSuccess() {
err := status.AsError() err := status.AsError()
klog.ErrorS(err, "Failed running Reserve plugin", "plugin", pl.Name(), "pod", klog.KObj(pod)) logger.Error(err, "Plugin failed", "plugin", pl.Name(), "pod", klog.KObj(pod))
return framework.AsStatus(fmt.Errorf("running Reserve plugin %q: %w", pl.Name(), err)) return framework.AsStatus(fmt.Errorf("running Reserve plugin %q: %w", pl.Name(), err))
} }
} }
@ -1188,7 +1297,15 @@ func (f *frameworkImpl) RunReservePluginsUnreserve(ctx context.Context, state *f
}() }()
// Execute the Unreserve operation of each reserve plugin in the // Execute the Unreserve operation of each reserve plugin in the
// *reverse* order in which the Reserve operation was executed. // *reverse* order in which the Reserve operation was executed.
logger := klog.FromContext(ctx)
logger = klog.LoggerWithName(logger, "Unreserve")
// TODO(knelasevero): Remove duplicated keys from log entry calls
// When contextualized logging hits GA
// https://github.com/kubernetes/kubernetes/issues/111672
logger = klog.LoggerWithValues(logger, "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName})
for i := len(f.reservePlugins) - 1; i >= 0; i-- { for i := len(f.reservePlugins) - 1; i >= 0; i-- {
logger := klog.LoggerWithName(logger, f.reservePlugins[i].Name())
ctx := klog.NewContext(ctx, logger)
f.runReservePluginUnreserve(ctx, f.reservePlugins[i], state, pod, nodeName) f.runReservePluginUnreserve(ctx, f.reservePlugins[i], state, pod, nodeName)
} }
} }
@ -1216,11 +1333,19 @@ func (f *frameworkImpl) RunPermitPlugins(ctx context.Context, state *framework.C
}() }()
pluginsWaitTime := make(map[string]time.Duration) pluginsWaitTime := make(map[string]time.Duration)
statusCode := framework.Success statusCode := framework.Success
logger := klog.FromContext(ctx)
logger = klog.LoggerWithName(logger, "Permit")
// TODO(knelasevero): Remove duplicated keys from log entry calls
// When contextualized logging hits GA
// https://github.com/kubernetes/kubernetes/issues/111672
logger = klog.LoggerWithValues(logger, "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName})
for _, pl := range f.permitPlugins { for _, pl := range f.permitPlugins {
logger := klog.LoggerWithName(logger, pl.Name())
ctx := klog.NewContext(ctx, logger)
status, timeout := f.runPermitPlugin(ctx, pl, state, pod, nodeName) status, timeout := f.runPermitPlugin(ctx, pl, state, pod, nodeName)
if !status.IsSuccess() { if !status.IsSuccess() {
if status.IsUnschedulable() { if status.IsUnschedulable() {
klog.V(4).InfoS("Pod rejected by permit plugin", "pod", klog.KObj(pod), "plugin", pl.Name(), "status", status.Message()) logger.V(4).Info("Pod rejected by plugin", "pod", klog.KObj(pod), "plugin", pl.Name(), "status", status.Message())
status.SetFailedPlugin(pl.Name()) status.SetFailedPlugin(pl.Name())
return status return status
} }
@ -1233,7 +1358,7 @@ func (f *frameworkImpl) RunPermitPlugins(ctx context.Context, state *framework.C
statusCode = framework.Wait statusCode = framework.Wait
} else { } else {
err := status.AsError() err := status.AsError()
klog.ErrorS(err, "Failed running Permit plugin", "plugin", pl.Name(), "pod", klog.KObj(pod)) logger.Error(err, "Plugin failed", "plugin", pl.Name(), "pod", klog.KObj(pod))
return framework.AsStatus(fmt.Errorf("running Permit plugin %q: %w", pl.Name(), err)).WithFailedPlugin(pl.Name()) return framework.AsStatus(fmt.Errorf("running Permit plugin %q: %w", pl.Name(), err)).WithFailedPlugin(pl.Name())
} }
} }
@ -1242,7 +1367,7 @@ func (f *frameworkImpl) RunPermitPlugins(ctx context.Context, state *framework.C
waitingPod := newWaitingPod(pod, pluginsWaitTime) waitingPod := newWaitingPod(pod, pluginsWaitTime)
f.waitingPods.add(waitingPod) f.waitingPods.add(waitingPod)
msg := fmt.Sprintf("one or more plugins asked to wait and no plugin rejected pod %q", pod.Name) msg := fmt.Sprintf("one or more plugins asked to wait and no plugin rejected pod %q", pod.Name)
klog.V(4).InfoS("One or more plugins asked to wait and no plugin rejected pod", "pod", klog.KObj(pod)) logger.V(4).Info("One or more plugins asked to wait and no plugin rejected pod", "pod", klog.KObj(pod))
return framework.NewStatus(framework.Wait, msg) return framework.NewStatus(framework.Wait, msg)
} }
return nil return nil
@ -1265,7 +1390,9 @@ func (f *frameworkImpl) WaitOnPermit(ctx context.Context, pod *v1.Pod) *framewor
return nil return nil
} }
defer f.waitingPods.remove(pod.UID) defer f.waitingPods.remove(pod.UID)
klog.V(4).InfoS("Pod waiting on permit", "pod", klog.KObj(pod))
logger := klog.FromContext(ctx)
logger.V(4).Info("Pod waiting on permit", "pod", klog.KObj(pod))
startTime := time.Now() startTime := time.Now()
s := <-waitingPod.s s := <-waitingPod.s
@ -1273,11 +1400,11 @@ func (f *frameworkImpl) WaitOnPermit(ctx context.Context, pod *v1.Pod) *framewor
if !s.IsSuccess() { if !s.IsSuccess() {
if s.IsUnschedulable() { if s.IsUnschedulable() {
klog.V(4).InfoS("Pod rejected while waiting on permit", "pod", klog.KObj(pod), "status", s.Message()) logger.V(4).Info("Pod rejected while waiting on permit", "pod", klog.KObj(pod), "status", s.Message())
return s return s
} }
err := s.AsError() err := s.AsError()
klog.ErrorS(err, "Failed waiting on permit for pod", "pod", klog.KObj(pod)) logger.Error(err, "Failed waiting on permit for pod", "pod", klog.KObj(pod))
return framework.AsStatus(fmt.Errorf("waiting on permit for pod: %w", err)).WithFailedPlugin(s.FailedPlugin()) return framework.AsStatus(fmt.Errorf("waiting on permit for pod: %w", err)).WithFailedPlugin(s.FailedPlugin())
} }
return nil return nil