From b45639256783519b6271c0c1f1da17394a6fa0cc Mon Sep 17 00:00:00 2001 From: Mengjiao Liu Date: Wed, 22 Mar 2023 15:52:25 +0800 Subject: [PATCH 1/2] Migrated `pkg/scheduler/framework/runtime` to use contextual logging --- pkg/scheduler/framework/runtime/framework.go | 169 ++++++++++++++++--- 1 file changed, 148 insertions(+), 21 deletions(-) diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go index 9fcb35fabc4..7ce9f26e1ce 100644 --- a/pkg/scheduler/framework/runtime/framework.go +++ b/pkg/scheduler/framework/runtime/framework.go @@ -77,6 +77,7 @@ type frameworkImpl struct { kubeConfig *restclient.Config eventRecorder events.EventRecorder informerFactory informers.SharedInformerFactory + logger klog.Logger metricsRecorder *metrics.MetricAsyncRecorder profileName string @@ -134,6 +135,7 @@ type frameworkOptions struct { captureProfile CaptureProfile clusterEventMap map[framework.ClusterEvent]sets.Set[string] parallelizer parallelize.Parallelizer + logger *klog.Logger } // Option for the frameworkImpl. @@ -229,6 +231,13 @@ func WithMetricsRecorder(r *metrics.MetricAsyncRecorder) Option { } } +// WithLogger overrides the default logger from k8s.io/klog. +func WithLogger(logger klog.Logger) Option { + return func(o *frameworkOptions) { + o.logger = &logger + } +} + // defaultFrameworkOptions are applied when no option corresponding to those fields exist. func defaultFrameworkOptions(stopCh <-chan struct{}) frameworkOptions { return frameworkOptions{ @@ -247,6 +256,11 @@ func NewFramework(r Registry, profile *config.KubeSchedulerProfile, stopCh <-cha opt(&options) } + logger := klog.TODO() + if options.logger != nil { + logger = *options.logger + } + f := &frameworkImpl{ registry: r, snapshotSharedLister: options.snapshotSharedLister, @@ -260,6 +274,7 @@ func NewFramework(r Registry, profile *config.KubeSchedulerProfile, stopCh <-cha extenders: options.extenders, PodNominator: options.podNominator, parallelizer: options.parallelizer, + logger: logger, } if profile == nil { @@ -311,7 +326,7 @@ func NewFramework(r Registry, profile *config.KubeSchedulerProfile, stopCh <-cha pluginsMap[name] = p // Update ClusterEventMap in place. - fillEventToPluginMap(p, options.clusterEventMap) + fillEventToPluginMap(logger, p, options.clusterEventMap) } // initialize plugins per individual extension points @@ -323,7 +338,7 @@ func NewFramework(r Registry, profile *config.KubeSchedulerProfile, stopCh <-cha // initialize multiPoint plugins to their expanded extension points if len(profile.Plugins.MultiPoint.Enabled) > 0 { - if err := f.expandMultiPointPlugins(profile, pluginsMap); err != nil { + if err := f.expandMultiPointPlugins(logger, profile, pluginsMap); err != nil { return nil, err } } @@ -443,7 +458,7 @@ func (os *orderedSet) delete(s string) { } } -func (f *frameworkImpl) expandMultiPointPlugins(profile *config.KubeSchedulerProfile, pluginsMap map[string]framework.Plugin) error { +func (f *frameworkImpl) expandMultiPointPlugins(logger klog.Logger, profile *config.KubeSchedulerProfile, pluginsMap map[string]framework.Plugin) error { // initialize MultiPoint plugins for _, e := range f.getExtensionPoints(profile.Plugins) { plugins := reflect.ValueOf(e.slicePtr).Elem() @@ -460,7 +475,7 @@ func (f *frameworkImpl) expandMultiPointPlugins(profile *config.KubeSchedulerPro disabledSet.Insert(disabledPlugin.Name) } if disabledSet.Has("*") { - klog.V(4).InfoS("all plugins disabled for extension point, skipping MultiPoint expansion", "extension", pluginType) + logger.V(4).Info("Skipped MultiPoint expansion because all plugins are disabled for extension point", "extension", pluginType) continue } @@ -481,7 +496,7 @@ func (f *frameworkImpl) expandMultiPointPlugins(profile *config.KubeSchedulerPro // a plugin that's enabled via MultiPoint can still be disabled for specific extension points if disabledSet.Has(ep.Name) { - klog.V(4).InfoS("plugin disabled for extension point", "plugin", ep.Name, "extension", pluginType) + logger.V(4).Info("Skipped disabled plugin for extension point", "plugin", ep.Name, "extension", pluginType) continue } @@ -491,7 +506,7 @@ func (f *frameworkImpl) expandMultiPointPlugins(profile *config.KubeSchedulerPro // This maintains expected behavior for overriding default plugins (see https://github.com/kubernetes/kubernetes/pull/99582) if enabledSet.has(ep.Name) { overridePlugins.insert(ep.Name) - klog.InfoS("MultiPoint plugin is explicitly re-configured; overriding", "plugin", ep.Name) + logger.Info("MultiPoint plugin is explicitly re-configured; overriding", "plugin", ep.Name) continue } @@ -530,7 +545,7 @@ func (f *frameworkImpl) expandMultiPointPlugins(profile *config.KubeSchedulerPro return nil } -func fillEventToPluginMap(p framework.Plugin, eventToPlugins map[framework.ClusterEvent]sets.Set[string]) { +func fillEventToPluginMap(logger klog.Logger, p framework.Plugin, eventToPlugins map[framework.ClusterEvent]sets.Set[string]) { ext, ok := p.(framework.EnqueueExtensions) if !ok { // If interface EnqueueExtensions is not implemented, register the default events @@ -544,7 +559,7 @@ func fillEventToPluginMap(p framework.Plugin, eventToPlugins map[framework.Clust // We treat it as: the plugin is not interested in any event, and hence pod failed by that plugin // cannot be moved by any regular cluster event. if len(events) == 0 { - klog.InfoS("Plugin's EventsToRegister() returned nil", "plugin", p.Name()) + logger.Info("Plugin's EventsToRegister() returned nil", "plugin", p.Name()) return } // The most common case: a plugin implements EnqueueExtensions and returns non-nil result. @@ -622,7 +637,15 @@ func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framewor var result *framework.PreFilterResult var pluginsWithNodes []string skipPlugins := sets.New[string]() + logger := klog.FromContext(ctx) + logger = klog.LoggerWithName(logger, "PreFilter") + // TODO(knelasevero): Remove duplicated keys from log entry calls + // When contextualized logging hits GA + // https://github.com/kubernetes/kubernetes/issues/111672 + logger = klog.LoggerWithValues(logger, "pod", klog.KObj(pod)) for _, pl := range f.preFilterPlugins { + logger := klog.LoggerWithName(logger, pl.Name()) + ctx := klog.NewContext(ctx, logger) r, s := f.runPreFilterPlugin(ctx, pl, state, pod) if s.IsSkip() { skipPlugins.Insert(pl.Name()) @@ -671,14 +694,22 @@ func (f *frameworkImpl) RunPreFilterExtensionAddPod( podInfoToAdd *framework.PodInfo, nodeInfo *framework.NodeInfo, ) (status *framework.Status) { + logger := klog.FromContext(ctx) + logger = klog.LoggerWithName(logger, "PreFilterExtension") + // TODO(knelasevero): Remove duplicated keys from log entry calls + // When contextualized logging hits GA + // https://github.com/kubernetes/kubernetes/issues/111672 + logger = klog.LoggerWithValues(logger, "pod", klog.KObj(podToSchedule), "node", klog.KObj(nodeInfo.Node()), "operation", "addPod") for _, pl := range f.preFilterPlugins { if pl.PreFilterExtensions() == nil || state.SkipFilterPlugins.Has(pl.Name()) { continue } + logger := klog.LoggerWithName(logger, pl.Name()) + ctx := klog.NewContext(ctx, logger) status = f.runPreFilterExtensionAddPod(ctx, pl, state, podToSchedule, podInfoToAdd, nodeInfo) if !status.IsSuccess() { err := status.AsError() - klog.ErrorS(err, "Failed running AddPod on PreFilter plugin", "plugin", pl.Name(), "pod", klog.KObj(podToSchedule)) + logger.Error(err, "Plugin failed", "pod", klog.KObj(podToSchedule), "node", klog.KObj(nodeInfo.Node()), "operation", "addPod", "plugin", pl.Name()) return framework.AsStatus(fmt.Errorf("running AddPod on PreFilter plugin %q: %w", pl.Name(), err)) } } @@ -706,14 +737,22 @@ func (f *frameworkImpl) RunPreFilterExtensionRemovePod( podInfoToRemove *framework.PodInfo, nodeInfo *framework.NodeInfo, ) (status *framework.Status) { + logger := klog.FromContext(ctx) + logger = klog.LoggerWithName(logger, "PreFilterExtension") + // TODO(knelasevero): Remove duplicated keys from log entry calls + // When contextualized logging hits GA + // https://github.com/kubernetes/kubernetes/issues/111672 + logger = klog.LoggerWithValues(logger, klog.KObj(podToSchedule), "node", klog.KObj(nodeInfo.Node())) for _, pl := range f.preFilterPlugins { if pl.PreFilterExtensions() == nil || state.SkipFilterPlugins.Has(pl.Name()) { continue } + logger := klog.LoggerWithName(logger, pl.Name()) + ctx := klog.NewContext(ctx, logger) status = f.runPreFilterExtensionRemovePod(ctx, pl, state, podToSchedule, podInfoToRemove, nodeInfo) if !status.IsSuccess() { err := status.AsError() - klog.ErrorS(err, "Failed running RemovePod on PreFilter plugin", "plugin", pl.Name(), "pod", klog.KObj(podToSchedule)) + logger.Error(err, "Plugin failed", "node", klog.KObj(nodeInfo.Node()), "operation", "removePod", "plugin", pl.Name(), "pod", klog.KObj(podToSchedule)) return framework.AsStatus(fmt.Errorf("running RemovePod on PreFilter plugin %q: %w", pl.Name(), err)) } } @@ -741,7 +780,16 @@ func (f *frameworkImpl) RunFilterPlugins( pod *v1.Pod, nodeInfo *framework.NodeInfo, ) *framework.Status { + logger := klog.FromContext(ctx) + logger = klog.LoggerWithName(logger, "Filter") + // TODO(knelasevero): Remove duplicated keys from log entry calls + // When contextualized logging hits GA + // https://github.com/kubernetes/kubernetes/issues/111672 + logger = klog.LoggerWithValues(logger, "pod", klog.KObj(pod), "node", klog.KObj(nodeInfo.Node())) + for _, pl := range f.filterPlugins { + logger := klog.LoggerWithName(logger, pl.Name()) + ctx := klog.NewContext(ctx, logger) if state.SkipFilterPlugins.Has(pl.Name()) { continue } @@ -777,11 +825,20 @@ func (f *frameworkImpl) RunPostFilterPlugins(ctx context.Context, state *framewo metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.PostFilter, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime)) }() + logger := klog.FromContext(ctx) + logger = klog.LoggerWithName(logger, "PostFilter") + // TODO(knelasevero): Remove duplicated keys from log entry calls + // When contextualized logging hits GA + // https://github.com/kubernetes/kubernetes/issues/111672 + logger = klog.LoggerWithValues(logger, "pod", klog.KObj(pod)) + // `result` records the last meaningful(non-noop) PostFilterResult. var result *framework.PostFilterResult var reasons []string var failedPlugin string for _, pl := range f.postFilterPlugins { + logger := klog.LoggerWithName(logger, pl.Name()) + ctx := klog.NewContext(ctx, logger) r, s := f.runPostFilterPlugin(ctx, pl, state, pod, filteredNodeStatusMap) if s.IsSuccess() { return r, s @@ -847,6 +904,9 @@ func (f *frameworkImpl) RunFilterPluginsWithNominatedPods(ctx context.Context, s // the nominated pods are treated as not running. We can't just assume the // nominated pods are running because they are not running right now and in fact, // they may end up getting scheduled to a different node. + logger := klog.FromContext(ctx) + logger = klog.LoggerWithName(logger, "FilterWithNominatedPods") + ctx = klog.NewContext(ctx, logger) for i := 0; i < 2; i++ { stateToUse := state nodeInfoToUse := info @@ -912,7 +972,15 @@ func (f *frameworkImpl) RunPreScorePlugins( metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.PreScore, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime)) }() skipPlugins := sets.New[string]() + logger := klog.FromContext(ctx) + logger = klog.LoggerWithName(logger, "PreScore") + // TODO(knelasevero): Remove duplicated keys from log entry calls + // When contextualized logging hits GA + // https://github.com/kubernetes/kubernetes/issues/111672 + logger = klog.LoggerWithValues(logger, "pod", klog.KObj(pod)) for _, pl := range f.preScorePlugins { + logger := klog.LoggerWithName(logger, pl.Name()) + ctx := klog.NewContext(ctx, logger) status = f.runPreScorePlugin(ctx, pl, state, pod, nodes) if status.IsSkip() { skipPlugins.Insert(pl.Name()) @@ -961,10 +1029,19 @@ func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.Cy errCh := parallelize.NewErrorChannel() if len(plugins) > 0 { + logger := klog.FromContext(ctx) + logger = klog.LoggerWithName(logger, "Score") + // TODO(knelasevero): Remove duplicated keys from log entry calls + // When contextualized logging hits GA + // https://github.com/kubernetes/kubernetes/issues/111672 + logger = klog.LoggerWithValues(logger, "pod", klog.KObj(pod)) // Run Score method for each node in parallel. f.Parallelizer().Until(ctx, len(nodes), func(index int) { nodeName := nodes[index].Name + logger := klog.LoggerWithValues(logger, "node", klog.ObjectRef{Name: nodeName}) for _, pl := range plugins { + logger := klog.LoggerWithName(logger, pl.Name()) + ctx := klog.NewContext(ctx, logger) s, status := f.runScorePlugin(ctx, pl, state, pod, nodeName) if !status.IsSuccess() { err := fmt.Errorf("plugin %q failed with: %w", pl.Name(), status.AsError()) @@ -1062,16 +1139,24 @@ func (f *frameworkImpl) RunPreBindPlugins(ctx context.Context, state *framework. defer func() { metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.PreBind, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime)) }() + logger := klog.FromContext(ctx) + logger = klog.LoggerWithName(logger, "PreBind") + // TODO(knelasevero): Remove duplicated keys from log entry calls + // When contextualized logging hits GA + // https://github.com/kubernetes/kubernetes/issues/111672 + logger = klog.LoggerWithValues(logger, "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}) for _, pl := range f.preBindPlugins { + logger := klog.LoggerWithName(logger, pl.Name()) + ctx := klog.NewContext(ctx, logger) status = f.runPreBindPlugin(ctx, pl, state, pod, nodeName) if !status.IsSuccess() { if status.IsUnschedulable() { - klog.V(4).InfoS("Pod rejected by PreBind plugin", "pod", klog.KObj(pod), "node", nodeName, "plugin", pl.Name(), "status", status.Message()) + logger.V(4).Info("Pod rejected by PreBind plugin", "pod", klog.KObj(pod), "node", nodeName, "plugin", pl.Name(), "status", status.Message()) status.SetFailedPlugin(pl.Name()) return status } err := status.AsError() - klog.ErrorS(err, "Failed running PreBind plugin", "plugin", pl.Name(), "pod", klog.KObj(pod), "node", nodeName) + logger.Error(err, "Plugin failed", "plugin", pl.Name(), "pod", klog.KObj(pod), "node", nodeName) return framework.AsStatus(fmt.Errorf("running PreBind plugin %q: %w", pl.Name(), err)) } } @@ -1097,19 +1182,27 @@ func (f *frameworkImpl) RunBindPlugins(ctx context.Context, state *framework.Cyc if len(f.bindPlugins) == 0 { return framework.NewStatus(framework.Skip, "") } + logger := klog.FromContext(ctx) + logger = klog.LoggerWithName(logger, "Bind") + // TODO(knelasevero): Remove duplicated keys from log entry calls + // When contextualized logging hits GA + // https://github.com/kubernetes/kubernetes/issues/111672 + logger = klog.LoggerWithValues(logger, "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}) for _, pl := range f.bindPlugins { + logger := klog.LoggerWithName(logger, pl.Name()) + ctx := klog.NewContext(ctx, logger) status = f.runBindPlugin(ctx, pl, state, pod, nodeName) if status.IsSkip() { continue } if !status.IsSuccess() { if status.IsUnschedulable() { - klog.V(4).InfoS("Pod rejected by Bind plugin", "pod", klog.KObj(pod), "node", nodeName, "plugin", pl.Name(), "status", status.Message()) + logger.V(4).Info("Pod rejected by Bind plugin", "pod", klog.KObj(pod), "node", nodeName, "plugin", pl.Name(), "status", status.Message()) status.SetFailedPlugin(pl.Name()) return status } err := status.AsError() - klog.ErrorS(err, "Failed running Bind plugin", "plugin", pl.Name(), "pod", klog.KObj(pod), "node", nodeName) + logger.Error(err, "Plugin Failed", "plugin", pl.Name(), "pod", klog.KObj(pod), "node", nodeName) return framework.AsStatus(fmt.Errorf("running Bind plugin %q: %w", pl.Name(), err)) } return status @@ -1133,7 +1226,15 @@ func (f *frameworkImpl) RunPostBindPlugins(ctx context.Context, state *framework defer func() { metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.PostBind, framework.Success.String(), f.profileName).Observe(metrics.SinceInSeconds(startTime)) }() + logger := klog.FromContext(ctx) + logger = klog.LoggerWithName(logger, "PostBind") + // TODO(knelasevero): Remove duplicated keys from log entry calls + // When contextualized logging hits GA + // https://github.com/kubernetes/kubernetes/issues/111672 + logger = klog.LoggerWithValues(logger, "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}) for _, pl := range f.postBindPlugins { + logger := klog.LoggerWithName(logger, pl.Name()) + ctx := klog.NewContext(ctx, logger) f.runPostBindPlugin(ctx, pl, state, pod, nodeName) } } @@ -1158,11 +1259,19 @@ func (f *frameworkImpl) RunReservePluginsReserve(ctx context.Context, state *fra defer func() { metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.Reserve, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime)) }() + logger := klog.FromContext(ctx) + logger = klog.LoggerWithName(logger, "Reserve") + // TODO(knelasevero): Remove duplicated keys from log entry calls + // When contextualized logging hits GA + // https://github.com/kubernetes/kubernetes/issues/111672 + logger = klog.LoggerWithValues(logger, "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}) for _, pl := range f.reservePlugins { + logger := klog.LoggerWithName(logger, pl.Name()) + ctx := klog.NewContext(ctx, logger) status = f.runReservePluginReserve(ctx, pl, state, pod, nodeName) if !status.IsSuccess() { err := status.AsError() - klog.ErrorS(err, "Failed running Reserve plugin", "plugin", pl.Name(), "pod", klog.KObj(pod)) + logger.Error(err, "Plugin failed", "plugin", pl.Name(), "pod", klog.KObj(pod)) return framework.AsStatus(fmt.Errorf("running Reserve plugin %q: %w", pl.Name(), err)) } } @@ -1188,7 +1297,15 @@ func (f *frameworkImpl) RunReservePluginsUnreserve(ctx context.Context, state *f }() // Execute the Unreserve operation of each reserve plugin in the // *reverse* order in which the Reserve operation was executed. + logger := klog.FromContext(ctx) + logger = klog.LoggerWithName(logger, "Unreserve") + // TODO(knelasevero): Remove duplicated keys from log entry calls + // When contextualized logging hits GA + // https://github.com/kubernetes/kubernetes/issues/111672 + logger = klog.LoggerWithValues(logger, "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}) for i := len(f.reservePlugins) - 1; i >= 0; i-- { + logger := klog.LoggerWithName(logger, f.reservePlugins[i].Name()) + ctx := klog.NewContext(ctx, logger) f.runReservePluginUnreserve(ctx, f.reservePlugins[i], state, pod, nodeName) } } @@ -1216,11 +1333,19 @@ func (f *frameworkImpl) RunPermitPlugins(ctx context.Context, state *framework.C }() pluginsWaitTime := make(map[string]time.Duration) statusCode := framework.Success + logger := klog.FromContext(ctx) + logger = klog.LoggerWithName(logger, "Permit") + // TODO(knelasevero): Remove duplicated keys from log entry calls + // When contextualized logging hits GA + // https://github.com/kubernetes/kubernetes/issues/111672 + logger = klog.LoggerWithValues(logger, "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}) for _, pl := range f.permitPlugins { + logger := klog.LoggerWithName(logger, pl.Name()) + ctx := klog.NewContext(ctx, logger) status, timeout := f.runPermitPlugin(ctx, pl, state, pod, nodeName) if !status.IsSuccess() { if status.IsUnschedulable() { - klog.V(4).InfoS("Pod rejected by permit plugin", "pod", klog.KObj(pod), "plugin", pl.Name(), "status", status.Message()) + logger.V(4).Info("Pod rejected by plugin", "pod", klog.KObj(pod), "plugin", pl.Name(), "status", status.Message()) status.SetFailedPlugin(pl.Name()) return status } @@ -1233,7 +1358,7 @@ func (f *frameworkImpl) RunPermitPlugins(ctx context.Context, state *framework.C statusCode = framework.Wait } else { err := status.AsError() - klog.ErrorS(err, "Failed running Permit plugin", "plugin", pl.Name(), "pod", klog.KObj(pod)) + logger.Error(err, "Plugin failed", "plugin", pl.Name(), "pod", klog.KObj(pod)) return framework.AsStatus(fmt.Errorf("running Permit plugin %q: %w", pl.Name(), err)).WithFailedPlugin(pl.Name()) } } @@ -1242,7 +1367,7 @@ func (f *frameworkImpl) RunPermitPlugins(ctx context.Context, state *framework.C waitingPod := newWaitingPod(pod, pluginsWaitTime) f.waitingPods.add(waitingPod) msg := fmt.Sprintf("one or more plugins asked to wait and no plugin rejected pod %q", pod.Name) - klog.V(4).InfoS("One or more plugins asked to wait and no plugin rejected pod", "pod", klog.KObj(pod)) + logger.V(4).Info("One or more plugins asked to wait and no plugin rejected pod", "pod", klog.KObj(pod)) return framework.NewStatus(framework.Wait, msg) } return nil @@ -1265,7 +1390,9 @@ func (f *frameworkImpl) WaitOnPermit(ctx context.Context, pod *v1.Pod) *framewor return nil } defer f.waitingPods.remove(pod.UID) - klog.V(4).InfoS("Pod waiting on permit", "pod", klog.KObj(pod)) + + logger := klog.FromContext(ctx) + logger.V(4).Info("Pod waiting on permit", "pod", klog.KObj(pod)) startTime := time.Now() s := <-waitingPod.s @@ -1273,11 +1400,11 @@ func (f *frameworkImpl) WaitOnPermit(ctx context.Context, pod *v1.Pod) *framewor if !s.IsSuccess() { if s.IsUnschedulable() { - klog.V(4).InfoS("Pod rejected while waiting on permit", "pod", klog.KObj(pod), "status", s.Message()) + logger.V(4).Info("Pod rejected while waiting on permit", "pod", klog.KObj(pod), "status", s.Message()) return s } err := s.AsError() - klog.ErrorS(err, "Failed waiting on permit for pod", "pod", klog.KObj(pod)) + logger.Error(err, "Failed waiting on permit for pod", "pod", klog.KObj(pod)) return framework.AsStatus(fmt.Errorf("waiting on permit for pod: %w", err)).WithFailedPlugin(s.FailedPlugin()) } return nil From fe728996ca809ff24d2131492a72025400434376 Mon Sep 17 00:00:00 2001 From: Mengjiao Liu Date: Thu, 23 Mar 2023 11:22:24 +0800 Subject: [PATCH 2/2] scheduler test: call frameworkruntime.WithLogger function for contextual logging --- pkg/scheduler/extender_test.go | 5 ++++- .../defaultpreemption/default_preemption_test.go | 15 ++++++++++++--- .../framework/preemption/preemption_test.go | 3 +++ 3 files changed, 19 insertions(+), 4 deletions(-) diff --git a/pkg/scheduler/extender_test.go b/pkg/scheduler/extender_test.go index 9a4e6331ced..97f148f52db 100644 --- a/pkg/scheduler/extender_test.go +++ b/pkg/scheduler/extender_test.go @@ -27,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/informers" clientsetfake "k8s.io/client-go/kubernetes/fake" + "k8s.io/klog/v2/ktesting" extenderv1 "k8s.io/kube-scheduler/extender/v1" schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/framework" @@ -278,7 +279,8 @@ func TestSchedulerWithExtenders(t *testing.T) { for ii := range test.extenders { extenders = append(extenders, &test.extenders[ii]) } - ctx, cancel := context.WithCancel(context.Background()) + logger, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() cache := internalcache.New(time.Duration(0), ctx.Done()) @@ -290,6 +292,7 @@ func TestSchedulerWithExtenders(t *testing.T) { runtime.WithClientSet(client), runtime.WithInformerFactory(informerFactory), runtime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), + runtime.WithLogger(logger), ) if err != nil { t.Fatal(err) diff --git a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go index d2fb9c4227d..496f5bd4a64 100644 --- a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go +++ b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go @@ -352,7 +352,8 @@ func TestPostFilter(t *testing.T) { if tt.extender != nil { extenders = append(extenders, tt.extender) } - ctx, cancel := context.WithCancel(context.Background()) + logger, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() f, err := st.NewFramework(registeredPlugins, "", ctx.Done(), frameworkruntime.WithClientSet(cs), @@ -361,6 +362,7 @@ func TestPostFilter(t *testing.T) { frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), frameworkruntime.WithExtenders(extenders), frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.pods, tt.nodes)), + frameworkruntime.WithLogger(logger), ) if err != nil { t.Fatal(err) @@ -1087,7 +1089,8 @@ func TestDryRunPreemption(t *testing.T) { parallelism = 1 } - ctx, cancel := context.WithCancel(context.Background()) + logger, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() fwk, err := st.NewFramework( registeredPlugins, "", ctx.Done(), @@ -1095,6 +1098,7 @@ func TestDryRunPreemption(t *testing.T) { frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithParallelism(parallelism), + frameworkruntime.WithLogger(logger), ) if err != nil { t.Fatal(err) @@ -1351,6 +1355,7 @@ func TestSelectBestCandidate(t *testing.T) { ctx.Done(), frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), frameworkruntime.WithSnapshotSharedLister(snapshot), + frameworkruntime.WithLogger(logger), ) if err != nil { t.Fatal(err) @@ -1480,6 +1485,7 @@ func TestPodEligibleToPreemptOthers(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) var nodes []*v1.Node for _, n := range test.nodes { nodes = append(nodes, st.MakeNode().Name(n).Obj()) @@ -1492,6 +1498,7 @@ func TestPodEligibleToPreemptOthers(t *testing.T) { defer close(stopCh) f, err := st.NewFramework(registeredPlugins, "", stopCh, frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(test.pods, nodes)), + frameworkruntime.WithLogger(logger), ) if err != nil { t.Fatal(err) @@ -1689,7 +1696,8 @@ func TestPreempt(t *testing.T) { return true, nil, nil }) - ctx, cancel := context.WithCancel(context.Background()) + logger, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() cache := internalcache.New(time.Duration(0), ctx.Done()) @@ -1735,6 +1743,7 @@ func TestPreempt(t *testing.T) { frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(test.pods, nodes)), frameworkruntime.WithInformerFactory(informerFactory), + frameworkruntime.WithLogger(logger), ) if err != nil { t.Fatal(err) diff --git a/pkg/scheduler/framework/preemption/preemption_test.go b/pkg/scheduler/framework/preemption/preemption_test.go index 345d75a8482..d248857e59e 100644 --- a/pkg/scheduler/framework/preemption/preemption_test.go +++ b/pkg/scheduler/framework/preemption/preemption_test.go @@ -29,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/informers" clientsetfake "k8s.io/client-go/kubernetes/fake" + "k8s.io/klog/v2/ktesting" extenderv1 "k8s.io/kube-scheduler/extender/v1" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/parallelize" @@ -266,6 +267,7 @@ func TestDryRunPreemption(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) registeredPlugins := append([]st.RegisterPluginFunc{ st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New)}, st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), @@ -287,6 +289,7 @@ func TestDryRunPreemption(t *testing.T) { frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithParallelism(parallelism), frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.testPods, tt.nodes)), + frameworkruntime.WithLogger(logger), ) if err != nil { t.Fatal(err)