From a7466f44e01e4a3d5ba1f7f06c21607b92f79ee7 Mon Sep 17 00:00:00 2001 From: Mengjiao Liu Date: Wed, 6 Sep 2023 11:55:33 +0800 Subject: [PATCH] Change the scheduler plugins PluginFactory function to use context parameter to pass logger - Migrated pkg/scheduler/framework/plugins/nodevolumelimits to use contextual logging - Fix golangci-lint validation failed - Check for plugins creation err --- cmd/kube-scheduler/app/server_test.go | 2 +- .../plugins/defaultbinder/default_binder.go | 2 +- .../defaultpreemption/default_preemption.go | 2 +- .../default_preemption_test.go | 4 +- .../dynamicresources/dynamicresources.go | 2 +- .../dynamicresources/dynamicresources_test.go | 2 +- .../plugins/imagelocality/image_locality.go | 2 +- .../imagelocality/image_locality_test.go | 5 +- .../plugins/interpodaffinity/plugin.go | 3 +- .../plugins/nodeaffinity/node_affinity.go | 2 +- .../nodeaffinity/node_affinity_test.go | 5 +- .../framework/plugins/nodename/node_name.go | 2 +- .../plugins/nodename/node_name_test.go | 11 +-- .../framework/plugins/nodeports/node_ports.go | 2 +- .../plugins/nodeports/node_ports_test.go | 20 ++++-- .../noderesources/balanced_allocation.go | 2 +- .../noderesources/balanced_allocation_test.go | 2 +- .../framework/plugins/noderesources/fit.go | 2 +- .../plugins/noderesources/fit_test.go | 41 +++++++---- .../noderesources/least_allocated_test.go | 1 + .../noderesources/most_allocated_test.go | 2 +- .../requested_to_capacity_ratio_test.go | 6 +- .../nodeunschedulable/node_unschedulable.go | 2 +- .../node_unschedulable_test.go | 12 ++-- .../framework/plugins/nodevolumelimits/csi.go | 54 ++++++++------- .../plugins/nodevolumelimits/non_csi.go | 50 +++++++------- .../plugins/nodevolumelimits/non_csi_test.go | 24 ++++--- .../plugins/podtopologyspread/plugin.go | 3 +- .../plugins/podtopologyspread/scoring_test.go | 11 +-- .../plugins/queuesort/priority_sort.go | 3 +- .../schedulinggates/scheduling_gates.go | 2 +- .../schedulinggates/scheduling_gates_test.go | 7 +- .../tainttoleration/taint_toleration.go | 2 +- .../tainttoleration/taint_toleration_test.go | 13 +++- .../framework/plugins/testing/testing.go | 4 +- .../plugins/volumebinding/volume_binding.go | 2 +- .../volumebinding/volume_binding_test.go | 2 +- .../volumerestrictions/volume_restrictions.go | 2 +- .../volume_restrictions_test.go | 4 +- .../plugins/volumezone/volume_zone.go | 2 +- pkg/scheduler/framework/runtime/framework.go | 2 +- .../framework/runtime/framework_test.go | 68 +++++++++++-------- pkg/scheduler/framework/runtime/registry.go | 9 +-- .../framework/runtime/registry_test.go | 7 +- pkg/scheduler/profile/profile_test.go | 4 +- pkg/scheduler/schedule_one_test.go | 22 +++--- pkg/scheduler/scheduler_test.go | 33 ++++----- .../testing/framework/fake_extender.go | 2 +- .../testing/framework/fake_plugins.go | 18 ++--- .../scheduler/plugins/plugins_test.go | 4 +- .../scheduler/preemption/preemption_test.go | 4 +- test/integration/scheduler/queue_test.go | 8 +-- test/integration/scheduler/util.go | 3 +- 53 files changed, 287 insertions(+), 218 deletions(-) diff --git a/cmd/kube-scheduler/app/server_test.go b/cmd/kube-scheduler/app/server_test.go index 2293c15fa19..6f3e15d0630 100644 --- a/cmd/kube-scheduler/app/server_test.go +++ b/cmd/kube-scheduler/app/server_test.go @@ -450,7 +450,7 @@ func (*foo) Name() string { return "Foo" } -func newFoo(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { +func newFoo(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) { return &foo{}, nil } diff --git a/pkg/scheduler/framework/plugins/defaultbinder/default_binder.go b/pkg/scheduler/framework/plugins/defaultbinder/default_binder.go index ead88187481..aa1ed6a6943 100644 --- a/pkg/scheduler/framework/plugins/defaultbinder/default_binder.go +++ b/pkg/scheduler/framework/plugins/defaultbinder/default_binder.go @@ -38,7 +38,7 @@ type DefaultBinder struct { var _ framework.BindPlugin = &DefaultBinder{} // New creates a DefaultBinder. -func New(_ runtime.Object, handle framework.Handle) (framework.Plugin, error) { +func New(_ context.Context, _ runtime.Object, handle framework.Handle) (framework.Plugin, error) { return &DefaultBinder{handle: handle}, nil } diff --git a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go index b54676fe010..d77080ee3ea 100644 --- a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go +++ b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go @@ -63,7 +63,7 @@ func (pl *DefaultPreemption) Name() string { } // New initializes a new plugin and returns it. -func New(dpArgs runtime.Object, fh framework.Handle, fts feature.Features) (framework.Plugin, error) { +func New(_ context.Context, dpArgs runtime.Object, fh framework.Handle, fts feature.Features) (framework.Plugin, error) { args, ok := dpArgs.(*config.DefaultPreemptionArgs) if !ok { return nil, fmt.Errorf("got args of type %T, want *DefaultPreemptionArgs", dpArgs) diff --git a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go index 5a51c0ae69c..ef8248d7c4e 100644 --- a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go +++ b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go @@ -104,7 +104,7 @@ type TestPlugin struct { name string } -func newTestPlugin(injArgs runtime.Object, f framework.Handle) (framework.Plugin, error) { +func newTestPlugin(_ context.Context, injArgs runtime.Object, f framework.Handle) (framework.Plugin, error) { return &TestPlugin{name: "test-plugin"}, nil } @@ -1066,7 +1066,7 @@ func TestDryRunPreemption(t *testing.T) { registeredPlugins := append([]tf.RegisterPluginFunc{ tf.RegisterFilterPlugin( "FakeFilter", - func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) { + func(_ context.Context, _ runtime.Object, fh framework.Handle) (framework.Plugin, error) { return &fakePlugin, nil }, )}, diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go index 2c0541b1edd..88db9217daf 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go @@ -241,7 +241,7 @@ type dynamicResources struct { } // New initializes a new plugin and returns it. -func New(plArgs runtime.Object, fh framework.Handle, fts feature.Features) (framework.Plugin, error) { +func New(_ context.Context, plArgs runtime.Object, fh framework.Handle, fts feature.Features) (framework.Plugin, error) { if !fts.EnableDynamicResourceAllocation { // Disabled, won't do anything. return &dynamicResources{}, nil diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go index b284a7bfa8f..4e0f76b2020 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go @@ -780,7 +780,7 @@ func setup(t *testing.T, nodes []*v1.Node, claims []*resourcev1alpha2.ResourceCl t.Fatal(err) } - pl, err := New(nil, fh, feature.Features{EnableDynamicResourceAllocation: true}) + pl, err := New(ctx, nil, fh, feature.Features{EnableDynamicResourceAllocation: true}) if err != nil { t.Fatal(err) } diff --git a/pkg/scheduler/framework/plugins/imagelocality/image_locality.go b/pkg/scheduler/framework/plugins/imagelocality/image_locality.go index 9eeab05b260..bdf3c6c6213 100644 --- a/pkg/scheduler/framework/plugins/imagelocality/image_locality.go +++ b/pkg/scheduler/framework/plugins/imagelocality/image_locality.go @@ -74,7 +74,7 @@ func (pl *ImageLocality) ScoreExtensions() framework.ScoreExtensions { } // New initializes a new plugin and returns it. -func New(_ runtime.Object, h framework.Handle) (framework.Plugin, error) { +func New(_ context.Context, _ runtime.Object, h framework.Handle) (framework.Plugin, error) { return &ImageLocality{handle: h}, nil } diff --git a/pkg/scheduler/framework/plugins/imagelocality/image_locality_test.go b/pkg/scheduler/framework/plugins/imagelocality/image_locality_test.go index 26ee547f595..8e16df35f0e 100644 --- a/pkg/scheduler/framework/plugins/imagelocality/image_locality_test.go +++ b/pkg/scheduler/framework/plugins/imagelocality/image_locality_test.go @@ -340,7 +340,10 @@ func TestImageLocalityPriority(t *testing.T) { state := framework.NewCycleState() fh, _ := runtime.NewFramework(ctx, nil, nil, runtime.WithSnapshotSharedLister(snapshot)) - p, _ := New(nil, fh) + p, err := New(ctx, nil, fh) + if err != nil { + t.Fatalf("creating plugin: %v", err) + } var gotList framework.NodeScoreList for _, n := range test.nodes { nodeName := n.ObjectMeta.Name diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/plugin.go b/pkg/scheduler/framework/plugins/interpodaffinity/plugin.go index b7131eaf48f..6f6c3c8feb6 100644 --- a/pkg/scheduler/framework/plugins/interpodaffinity/plugin.go +++ b/pkg/scheduler/framework/plugins/interpodaffinity/plugin.go @@ -17,6 +17,7 @@ limitations under the License. package interpodaffinity import ( + "context" "fmt" "k8s.io/apimachinery/pkg/labels" @@ -69,7 +70,7 @@ func (pl *InterPodAffinity) EventsToRegister() []framework.ClusterEventWithHint } // New initializes a new plugin and returns it. -func New(plArgs runtime.Object, h framework.Handle) (framework.Plugin, error) { +func New(_ context.Context, plArgs runtime.Object, h framework.Handle) (framework.Plugin, error) { if h.SnapshotSharedLister() == nil { return nil, fmt.Errorf("SnapshotSharedlister is nil") } diff --git a/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity.go b/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity.go index d9d431f9aeb..ca8263d0685 100644 --- a/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity.go +++ b/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity.go @@ -243,7 +243,7 @@ func (pl *NodeAffinity) ScoreExtensions() framework.ScoreExtensions { } // New initializes a new plugin and returns it. -func New(plArgs runtime.Object, h framework.Handle) (framework.Plugin, error) { +func New(_ context.Context, plArgs runtime.Object, h framework.Handle) (framework.Plugin, error) { args, err := getArgs(plArgs) if err != nil { return nil, err diff --git a/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity_test.go b/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity_test.go index 69778c27a09..18fbf91630f 100644 --- a/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity_test.go +++ b/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity_test.go @@ -896,6 +896,7 @@ func TestNodeAffinity(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) node := v1.Node{ObjectMeta: metav1.ObjectMeta{ Name: test.nodeName, Labels: test.labels, @@ -903,7 +904,7 @@ func TestNodeAffinity(t *testing.T) { nodeInfo := framework.NewNodeInfo() nodeInfo.SetNode(&node) - p, err := New(&test.args, nil) + p, err := New(ctx, &test.args, nil) if err != nil { t.Fatalf("Creating plugin: %v", err) } @@ -1141,7 +1142,7 @@ func TestNodeAffinityPriority(t *testing.T) { state := framework.NewCycleState() fh, _ := runtime.NewFramework(ctx, nil, nil, runtime.WithSnapshotSharedLister(cache.NewSnapshot(nil, test.nodes))) - p, err := New(&test.args, fh) + p, err := New(ctx, &test.args, fh) if err != nil { t.Fatalf("Creating plugin: %v", err) } diff --git a/pkg/scheduler/framework/plugins/nodename/node_name.go b/pkg/scheduler/framework/plugins/nodename/node_name.go index 7adea806cb7..ad222e4cf04 100644 --- a/pkg/scheduler/framework/plugins/nodename/node_name.go +++ b/pkg/scheduler/framework/plugins/nodename/node_name.go @@ -67,6 +67,6 @@ func Fits(pod *v1.Pod, nodeInfo *framework.NodeInfo) bool { } // New initializes a new plugin and returns it. -func New(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { +func New(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) { return &NodeName{}, nil } diff --git a/pkg/scheduler/framework/plugins/nodename/node_name_test.go b/pkg/scheduler/framework/plugins/nodename/node_name_test.go index 90461ea5676..0b92da92131 100644 --- a/pkg/scheduler/framework/plugins/nodename/node_name_test.go +++ b/pkg/scheduler/framework/plugins/nodename/node_name_test.go @@ -17,13 +17,13 @@ limitations under the License. package nodename import ( - "context" "reflect" "testing" v1 "k8s.io/api/core/v1" "k8s.io/kubernetes/pkg/scheduler/framework" st "k8s.io/kubernetes/pkg/scheduler/testing" + "k8s.io/kubernetes/test/utils/ktesting" ) func TestNodeName(t *testing.T) { @@ -55,9 +55,12 @@ func TestNodeName(t *testing.T) { t.Run(test.name, func(t *testing.T) { nodeInfo := framework.NewNodeInfo() nodeInfo.SetNode(test.node) - - p, _ := New(nil, nil) - gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.pod, nodeInfo) + _, ctx := ktesting.NewTestContext(t) + p, err := New(ctx, nil, nil) + if err != nil { + t.Fatalf("creating plugin: %v", err) + } + gotStatus := p.(framework.FilterPlugin).Filter(ctx, nil, test.pod, nodeInfo) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) } diff --git a/pkg/scheduler/framework/plugins/nodeports/node_ports.go b/pkg/scheduler/framework/plugins/nodeports/node_ports.go index 515aab09eeb..656c87495b2 100644 --- a/pkg/scheduler/framework/plugins/nodeports/node_ports.go +++ b/pkg/scheduler/framework/plugins/nodeports/node_ports.go @@ -149,6 +149,6 @@ func fitsPorts(wantPorts []*v1.ContainerPort, nodeInfo *framework.NodeInfo) bool } // New initializes a new plugin and returns it. -func New(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { +func New(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) { return &NodePorts{}, nil } diff --git a/pkg/scheduler/framework/plugins/nodeports/node_ports_test.go b/pkg/scheduler/framework/plugins/nodeports/node_ports_test.go index fa9c419c00b..92cd013614f 100644 --- a/pkg/scheduler/framework/plugins/nodeports/node_ports_test.go +++ b/pkg/scheduler/framework/plugins/nodeports/node_ports_test.go @@ -17,7 +17,6 @@ limitations under the License. package nodeports import ( - "context" "fmt" "reflect" "strconv" @@ -26,6 +25,7 @@ import ( "github.com/google/go-cmp/cmp" v1 "k8s.io/api/core/v1" + "k8s.io/klog/v2/ktesting" "k8s.io/kubernetes/pkg/scheduler/framework" st "k8s.io/kubernetes/pkg/scheduler/testing" ) @@ -143,9 +143,13 @@ func TestNodePorts(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - p, _ := New(nil, nil) + _, ctx := ktesting.NewTestContext(t) + p, err := New(ctx, nil, nil) + if err != nil { + t.Fatalf("creating plugin: %v", err) + } cycleState := framework.NewCycleState() - _, preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(context.Background(), cycleState, test.pod) + _, preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(ctx, cycleState, test.pod) if diff := cmp.Diff(test.wantPreFilterStatus, preFilterStatus); diff != "" { t.Errorf("preFilter status does not match (-want,+got): %s", diff) } @@ -155,7 +159,7 @@ func TestNodePorts(t *testing.T) { if !preFilterStatus.IsSuccess() { t.Errorf("prefilter failed with status: %v", preFilterStatus) } - gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), cycleState, test.pod, test.nodeInfo) + gotStatus := p.(framework.FilterPlugin).Filter(ctx, cycleState, test.pod, test.nodeInfo) if diff := cmp.Diff(test.wantFilterStatus, gotStatus); diff != "" { t.Errorf("filter status does not match (-want, +got): %s", diff) } @@ -164,13 +168,17 @@ func TestNodePorts(t *testing.T) { } func TestPreFilterDisabled(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) pod := &v1.Pod{} nodeInfo := framework.NewNodeInfo() node := v1.Node{} nodeInfo.SetNode(&node) - p, _ := New(nil, nil) + p, err := New(ctx, nil, nil) + if err != nil { + t.Fatalf("creating plugin: %v", err) + } cycleState := framework.NewCycleState() - gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), cycleState, pod, nodeInfo) + gotStatus := p.(framework.FilterPlugin).Filter(ctx, cycleState, pod, nodeInfo) wantStatus := framework.AsStatus(fmt.Errorf(`reading "PreFilterNodePorts" from cycleState: %w`, framework.ErrNotFound)) if !reflect.DeepEqual(gotStatus, wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, wantStatus) diff --git a/pkg/scheduler/framework/plugins/noderesources/balanced_allocation.go b/pkg/scheduler/framework/plugins/noderesources/balanced_allocation.go index f375be2d42f..288fa0d7ff3 100644 --- a/pkg/scheduler/framework/plugins/noderesources/balanced_allocation.go +++ b/pkg/scheduler/framework/plugins/noderesources/balanced_allocation.go @@ -114,7 +114,7 @@ func (ba *BalancedAllocation) ScoreExtensions() framework.ScoreExtensions { } // NewBalancedAllocation initializes a new plugin and returns it. -func NewBalancedAllocation(baArgs runtime.Object, h framework.Handle, fts feature.Features) (framework.Plugin, error) { +func NewBalancedAllocation(_ context.Context, baArgs runtime.Object, h framework.Handle, fts feature.Features) (framework.Plugin, error) { args, ok := baArgs.(*config.NodeResourcesBalancedAllocationArgs) if !ok { return nil, fmt.Errorf("want args to be of type NodeResourcesBalancedAllocationArgs, got %T", baArgs) diff --git a/pkg/scheduler/framework/plugins/noderesources/balanced_allocation_test.go b/pkg/scheduler/framework/plugins/noderesources/balanced_allocation_test.go index f461a3d6d55..26d17b6af50 100644 --- a/pkg/scheduler/framework/plugins/noderesources/balanced_allocation_test.go +++ b/pkg/scheduler/framework/plugins/noderesources/balanced_allocation_test.go @@ -389,7 +389,7 @@ func TestNodeResourcesBalancedAllocation(t *testing.T) { ctx, cancel := context.WithCancel(ctx) defer cancel() fh, _ := runtime.NewFramework(ctx, nil, nil, runtime.WithSnapshotSharedLister(snapshot)) - p, _ := NewBalancedAllocation(&test.args, fh, feature.Features{}) + p, _ := NewBalancedAllocation(ctx, &test.args, fh, feature.Features{}) state := framework.NewCycleState() for i := range test.nodes { if test.runPreScore { diff --git a/pkg/scheduler/framework/plugins/noderesources/fit.go b/pkg/scheduler/framework/plugins/noderesources/fit.go index 04e9bcbf757..ba09fa8628a 100644 --- a/pkg/scheduler/framework/plugins/noderesources/fit.go +++ b/pkg/scheduler/framework/plugins/noderesources/fit.go @@ -145,7 +145,7 @@ func (f *Fit) Name() string { } // NewFit initializes a new plugin and returns it. -func NewFit(plArgs runtime.Object, h framework.Handle, fts feature.Features) (framework.Plugin, error) { +func NewFit(_ context.Context, plArgs runtime.Object, h framework.Handle, fts feature.Features) (framework.Plugin, error) { args, ok := plArgs.(*config.NodeResourcesFitArgs) if !ok { return nil, fmt.Errorf("want args to be of type NodeResourcesFitArgs, got %T", plArgs) diff --git a/pkg/scheduler/framework/plugins/noderesources/fit_test.go b/pkg/scheduler/framework/plugins/noderesources/fit_test.go index 50b27ceee94..fb454fe6757 100644 --- a/pkg/scheduler/framework/plugins/noderesources/fit_test.go +++ b/pkg/scheduler/framework/plugins/noderesources/fit_test.go @@ -496,17 +496,20 @@ func TestEnoughRequests(t *testing.T) { test.args.ScoringStrategy = defaultScoringStrategy } - p, err := NewFit(&test.args, nil, plfeature.Features{}) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + p, err := NewFit(ctx, &test.args, nil, plfeature.Features{}) if err != nil { t.Fatal(err) } cycleState := framework.NewCycleState() - _, preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(context.Background(), cycleState, test.pod) + _, preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(ctx, cycleState, test.pod) if !preFilterStatus.IsSuccess() { t.Errorf("prefilter failed with status: %v", preFilterStatus) } - gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), cycleState, test.pod, test.nodeInfo) + gotStatus := p.(framework.FilterPlugin).Filter(ctx, cycleState, test.pod, test.nodeInfo) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) } @@ -520,16 +523,19 @@ func TestEnoughRequests(t *testing.T) { } func TestPreFilterDisabled(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() pod := &v1.Pod{} nodeInfo := framework.NewNodeInfo() node := v1.Node{} nodeInfo.SetNode(&node) - p, err := NewFit(&config.NodeResourcesFitArgs{ScoringStrategy: defaultScoringStrategy}, nil, plfeature.Features{}) + p, err := NewFit(ctx, &config.NodeResourcesFitArgs{ScoringStrategy: defaultScoringStrategy}, nil, plfeature.Features{}) if err != nil { t.Fatal(err) } cycleState := framework.NewCycleState() - gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), cycleState, pod, nodeInfo) + gotStatus := p.(framework.FilterPlugin).Filter(ctx, cycleState, pod, nodeInfo) wantStatus := framework.AsStatus(fmt.Errorf(`error reading "PreFilterNodeResourcesFit" from cycleState: %w`, framework.ErrNotFound)) if !reflect.DeepEqual(gotStatus, wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, wantStatus) @@ -571,20 +577,23 @@ func TestNotEnoughRequests(t *testing.T) { } for _, test := range notEnoughPodsTests { t.Run(test.name, func(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() node := v1.Node{Status: v1.NodeStatus{Capacity: v1.ResourceList{}, Allocatable: makeAllocatableResources(10, 20, 1, 0, 0, 0)}} test.nodeInfo.SetNode(&node) - p, err := NewFit(&config.NodeResourcesFitArgs{ScoringStrategy: defaultScoringStrategy}, nil, plfeature.Features{}) + p, err := NewFit(ctx, &config.NodeResourcesFitArgs{ScoringStrategy: defaultScoringStrategy}, nil, plfeature.Features{}) if err != nil { t.Fatal(err) } cycleState := framework.NewCycleState() - _, preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(context.Background(), cycleState, test.pod) + _, preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(ctx, cycleState, test.pod) if !preFilterStatus.IsSuccess() { t.Errorf("prefilter failed with status: %v", preFilterStatus) } - gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), cycleState, test.pod, test.nodeInfo) + gotStatus := p.(framework.FilterPlugin).Filter(ctx, cycleState, test.pod, test.nodeInfo) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) } @@ -629,20 +638,23 @@ func TestStorageRequests(t *testing.T) { for _, test := range storagePodsTests { t.Run(test.name, func(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() node := v1.Node{Status: v1.NodeStatus{Capacity: makeResources(10, 20, 32, 5, 20, 5).Capacity, Allocatable: makeAllocatableResources(10, 20, 32, 5, 20, 5)}} test.nodeInfo.SetNode(&node) - p, err := NewFit(&config.NodeResourcesFitArgs{ScoringStrategy: defaultScoringStrategy}, nil, plfeature.Features{}) + p, err := NewFit(ctx, &config.NodeResourcesFitArgs{ScoringStrategy: defaultScoringStrategy}, nil, plfeature.Features{}) if err != nil { t.Fatal(err) } cycleState := framework.NewCycleState() - _, preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(context.Background(), cycleState, test.pod) + _, preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(ctx, cycleState, test.pod) if !preFilterStatus.IsSuccess() { t.Errorf("prefilter failed with status: %v", preFilterStatus) } - gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), cycleState, test.pod, test.nodeInfo) + gotStatus := p.(framework.FilterPlugin).Filter(ctx, cycleState, test.pod, test.nodeInfo) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) } @@ -707,11 +719,14 @@ func TestRestartableInitContainers(t *testing.T) { for _, test := range testCases { t.Run(test.name, func(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() node := v1.Node{Status: v1.NodeStatus{Capacity: v1.ResourceList{}, Allocatable: makeAllocatableResources(0, 0, 1, 0, 0, 0)}} nodeInfo := framework.NewNodeInfo() nodeInfo.SetNode(&node) - p, err := NewFit(&config.NodeResourcesFitArgs{ScoringStrategy: defaultScoringStrategy}, nil, plfeature.Features{EnableSidecarContainers: test.enableSidecarContainers}) + p, err := NewFit(ctx, &config.NodeResourcesFitArgs{ScoringStrategy: defaultScoringStrategy}, nil, plfeature.Features{EnableSidecarContainers: test.enableSidecarContainers}) if err != nil { t.Fatal(err) } @@ -924,7 +939,7 @@ func TestFitScore(t *testing.T) { snapshot := cache.NewSnapshot(test.existingPods, test.nodes) fh, _ := runtime.NewFramework(ctx, nil, nil, runtime.WithSnapshotSharedLister(snapshot)) args := test.nodeResourcesFitArgs - p, err := NewFit(&args, fh, plfeature.Features{}) + p, err := NewFit(ctx, &args, fh, plfeature.Features{}) if err != nil { t.Fatalf("unexpected error: %v", err) } diff --git a/pkg/scheduler/framework/plugins/noderesources/least_allocated_test.go b/pkg/scheduler/framework/plugins/noderesources/least_allocated_test.go index 3c1c1ffc0a3..d3f3cc4e9a5 100644 --- a/pkg/scheduler/framework/plugins/noderesources/least_allocated_test.go +++ b/pkg/scheduler/framework/plugins/noderesources/least_allocated_test.go @@ -395,6 +395,7 @@ func TestLeastAllocatedScoringStrategy(t *testing.T) { fh, _ := runtime.NewFramework(ctx, nil, nil, runtime.WithSnapshotSharedLister(snapshot)) p, err := NewFit( + ctx, &config.NodeResourcesFitArgs{ ScoringStrategy: &config.ScoringStrategy{ Type: config.LeastAllocated, diff --git a/pkg/scheduler/framework/plugins/noderesources/most_allocated_test.go b/pkg/scheduler/framework/plugins/noderesources/most_allocated_test.go index 03e56848066..d61037171d2 100644 --- a/pkg/scheduler/framework/plugins/noderesources/most_allocated_test.go +++ b/pkg/scheduler/framework/plugins/noderesources/most_allocated_test.go @@ -351,7 +351,7 @@ func TestMostAllocatedScoringStrategy(t *testing.T) { snapshot := cache.NewSnapshot(test.existingPods, test.nodes) fh, _ := runtime.NewFramework(ctx, nil, nil, runtime.WithSnapshotSharedLister(snapshot)) - p, err := NewFit( + p, err := NewFit(ctx, &config.NodeResourcesFitArgs{ ScoringStrategy: &config.ScoringStrategy{ Type: config.MostAllocated, diff --git a/pkg/scheduler/framework/plugins/noderesources/requested_to_capacity_ratio_test.go b/pkg/scheduler/framework/plugins/noderesources/requested_to_capacity_ratio_test.go index 3d32999671f..9950acd45b4 100644 --- a/pkg/scheduler/framework/plugins/noderesources/requested_to_capacity_ratio_test.go +++ b/pkg/scheduler/framework/plugins/noderesources/requested_to_capacity_ratio_test.go @@ -111,7 +111,7 @@ func TestRequestedToCapacityRatioScoringStrategy(t *testing.T) { snapshot := cache.NewSnapshot(test.existingPods, test.nodes) fh, _ := runtime.NewFramework(ctx, nil, nil, runtime.WithSnapshotSharedLister(snapshot)) - p, err := NewFit(&config.NodeResourcesFitArgs{ + p, err := NewFit(ctx, &config.NodeResourcesFitArgs{ ScoringStrategy: &config.ScoringStrategy{ Type: config.RequestedToCapacityRatio, Resources: test.resources, @@ -320,7 +320,7 @@ func TestResourceBinPackingSingleExtended(t *testing.T) { }, }, } - p, err := NewFit(&args, fh, plfeature.Features{}) + p, err := NewFit(ctx, &args, fh, plfeature.Features{}) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -548,7 +548,7 @@ func TestResourceBinPackingMultipleExtended(t *testing.T) { }, } - p, err := NewFit(&args, fh, plfeature.Features{}) + p, err := NewFit(ctx, &args, fh, plfeature.Features{}) if err != nil { t.Fatalf("unexpected error: %v", err) } diff --git a/pkg/scheduler/framework/plugins/nodeunschedulable/node_unschedulable.go b/pkg/scheduler/framework/plugins/nodeunschedulable/node_unschedulable.go index 5565013da83..9cee9caa2bd 100644 --- a/pkg/scheduler/framework/plugins/nodeunschedulable/node_unschedulable.go +++ b/pkg/scheduler/framework/plugins/nodeunschedulable/node_unschedulable.go @@ -104,6 +104,6 @@ func (pl *NodeUnschedulable) Filter(ctx context.Context, _ *framework.CycleState } // New initializes a new plugin and returns it. -func New(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { +func New(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) { return &NodeUnschedulable{}, nil } diff --git a/pkg/scheduler/framework/plugins/nodeunschedulable/node_unschedulable_test.go b/pkg/scheduler/framework/plugins/nodeunschedulable/node_unschedulable_test.go index 3be140a8cf4..2733f928a94 100644 --- a/pkg/scheduler/framework/plugins/nodeunschedulable/node_unschedulable_test.go +++ b/pkg/scheduler/framework/plugins/nodeunschedulable/node_unschedulable_test.go @@ -17,13 +17,12 @@ limitations under the License. package nodeunschedulable import ( - "context" "reflect" "testing" v1 "k8s.io/api/core/v1" - "k8s.io/klog/v2/ktesting" "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/test/utils/ktesting" ) func TestNodeUnschedulable(t *testing.T) { @@ -75,9 +74,12 @@ func TestNodeUnschedulable(t *testing.T) { for _, test := range testCases { nodeInfo := framework.NewNodeInfo() nodeInfo.SetNode(test.node) - - p, _ := New(nil, nil) - gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.pod, nodeInfo) + _, ctx := ktesting.NewTestContext(t) + p, err := New(ctx, nil, nil) + if err != nil { + t.Fatalf("creating plugin: %v", err) + } + gotStatus := p.(framework.FilterPlugin).Filter(ctx, nil, test.pod, nodeInfo) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) } diff --git a/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go b/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go index e26401d39c2..5db408b33ce 100644 --- a/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go +++ b/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go @@ -110,15 +110,17 @@ func (pl *CSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v node := nodeInfo.Node() + logger := klog.FromContext(ctx) + // If CSINode doesn't exist, the predicate may read the limits from Node object csiNode, err := pl.csiNodeLister.Get(node.Name) if err != nil { // TODO: return the error once CSINode is created by default (2 releases) - klog.V(5).InfoS("Could not get a CSINode object for the node", "node", klog.KObj(node), "err", err) + logger.V(5).Info("Could not get a CSINode object for the node", "node", klog.KObj(node), "err", err) } newVolumes := make(map[string]string) - if err := pl.filterAttachableVolumes(pod, csiNode, true /* new pod */, newVolumes); err != nil { + if err := pl.filterAttachableVolumes(logger, pod, csiNode, true /* new pod */, newVolumes); err != nil { return framework.AsStatus(err) } @@ -135,7 +137,7 @@ func (pl *CSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v attachedVolumes := make(map[string]string) for _, existingPod := range nodeInfo.Pods { - if err := pl.filterAttachableVolumes(existingPod.Pod, csiNode, false /* existing pod */, attachedVolumes); err != nil { + if err := pl.filterAttachableVolumes(logger, existingPod.Pod, csiNode, false /* existing pod */, attachedVolumes); err != nil { return framework.AsStatus(err) } } @@ -156,7 +158,7 @@ func (pl *CSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v maxVolumeLimit, ok := nodeVolumeLimits[v1.ResourceName(volumeLimitKey)] if ok { currentVolumeCount := attachedVolumeCount[volumeLimitKey] - klog.V(5).InfoS("Found plugin volume limits", "node", node.Name, "volumeLimitKey", volumeLimitKey, + logger.V(5).Info("Found plugin volume limits", "node", node.Name, "volumeLimitKey", volumeLimitKey, "maxLimits", maxVolumeLimit, "currentVolumeCount", currentVolumeCount, "newVolumeCount", count, "pod", klog.KObj(pod)) if currentVolumeCount+count > int(maxVolumeLimit) { @@ -169,7 +171,7 @@ func (pl *CSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v } func (pl *CSILimits) filterAttachableVolumes( - pod *v1.Pod, csiNode *storagev1.CSINode, newPod bool, result map[string]string) error { + logger klog.Logger, pod *v1.Pod, csiNode *storagev1.CSINode, newPod bool, result map[string]string) error { for _, vol := range pod.Spec.Volumes { pvcName := "" isEphemeral := false @@ -190,7 +192,7 @@ func (pl *CSILimits) filterAttachableVolumes( // - If the volume is migratable and CSI migration is enabled, need to count it // as well. // - If the volume is not migratable, it will be count in non_csi filter. - if err := pl.checkAttachableInlineVolume(&vol, csiNode, pod, result); err != nil { + if err := pl.checkAttachableInlineVolume(logger, &vol, csiNode, pod, result); err != nil { return err } @@ -212,7 +214,7 @@ func (pl *CSILimits) filterAttachableVolumes( } // If the PVC is invalid, we don't count the volume because // there's no guarantee that it belongs to the running predicate. - klog.V(5).InfoS("Unable to look up PVC info", "pod", klog.KObj(pod), "PVC", klog.KRef(pod.Namespace, pvcName)) + logger.V(5).Info("Unable to look up PVC info", "pod", klog.KObj(pod), "PVC", klog.KRef(pod.Namespace, pvcName)) continue } @@ -223,9 +225,9 @@ func (pl *CSILimits) filterAttachableVolumes( } } - driverName, volumeHandle := pl.getCSIDriverInfo(csiNode, pvc) + driverName, volumeHandle := pl.getCSIDriverInfo(logger, csiNode, pvc) if driverName == "" || volumeHandle == "" { - klog.V(5).InfoS("Could not find a CSI driver name or volume handle, not counting volume") + logger.V(5).Info("Could not find a CSI driver name or volume handle, not counting volume") continue } @@ -238,7 +240,7 @@ func (pl *CSILimits) filterAttachableVolumes( // checkAttachableInlineVolume takes an inline volume and add to the result map if the // volume is migratable and CSI migration for this plugin has been enabled. -func (pl *CSILimits) checkAttachableInlineVolume(vol *v1.Volume, csiNode *storagev1.CSINode, +func (pl *CSILimits) checkAttachableInlineVolume(logger klog.Logger, vol *v1.Volume, csiNode *storagev1.CSINode, pod *v1.Pod, result map[string]string) error { if !pl.translator.IsInlineMigratable(vol) { return nil @@ -253,7 +255,7 @@ func (pl *CSILimits) checkAttachableInlineVolume(vol *v1.Volume, csiNode *storag if csiNode != nil { csiNodeName = csiNode.Name } - klog.V(5).InfoS("CSI Migration is not enabled for provisioner", "provisioner", inTreeProvisionerName, + logger.V(5).Info("CSI Migration is not enabled for provisioner", "provisioner", inTreeProvisionerName, "pod", klog.KObj(pod), "csiNode", csiNodeName) return nil } @@ -280,21 +282,21 @@ func (pl *CSILimits) checkAttachableInlineVolume(vol *v1.Volume, csiNode *storag // getCSIDriverInfo returns the CSI driver name and volume ID of a given PVC. // If the PVC is from a migrated in-tree plugin, this function will return // the information of the CSI driver that the plugin has been migrated to. -func (pl *CSILimits) getCSIDriverInfo(csiNode *storagev1.CSINode, pvc *v1.PersistentVolumeClaim) (string, string) { +func (pl *CSILimits) getCSIDriverInfo(logger klog.Logger, csiNode *storagev1.CSINode, pvc *v1.PersistentVolumeClaim) (string, string) { pvName := pvc.Spec.VolumeName if pvName == "" { - klog.V(5).InfoS("Persistent volume had no name for claim", "PVC", klog.KObj(pvc)) - return pl.getCSIDriverInfoFromSC(csiNode, pvc) + logger.V(5).Info("Persistent volume had no name for claim", "PVC", klog.KObj(pvc)) + return pl.getCSIDriverInfoFromSC(logger, csiNode, pvc) } pv, err := pl.pvLister.Get(pvName) if err != nil { - klog.V(5).InfoS("Unable to look up PV info for PVC and PV", "PVC", klog.KObj(pvc), "PV", klog.KRef("", pvName)) + logger.V(5).Info("Unable to look up PV info for PVC and PV", "PVC", klog.KObj(pvc), "PV", klog.KRef("", pvName)) // If we can't fetch PV associated with PVC, may be it got deleted // or PVC was prebound to a PVC that hasn't been created yet. // fallback to using StorageClass for volume counting - return pl.getCSIDriverInfoFromSC(csiNode, pvc) + return pl.getCSIDriverInfoFromSC(logger, csiNode, pvc) } csiSource := pv.Spec.PersistentVolumeSource.CSI @@ -306,23 +308,23 @@ func (pl *CSILimits) getCSIDriverInfo(csiNode *storagev1.CSINode, pvc *v1.Persis pluginName, err := pl.translator.GetInTreePluginNameFromSpec(pv, nil) if err != nil { - klog.V(5).InfoS("Unable to look up plugin name from PV spec", "err", err) + logger.V(5).Info("Unable to look up plugin name from PV spec", "err", err) return "", "" } if !isCSIMigrationOn(csiNode, pluginName) { - klog.V(5).InfoS("CSI Migration of plugin is not enabled", "plugin", pluginName) + logger.V(5).Info("CSI Migration of plugin is not enabled", "plugin", pluginName) return "", "" } csiPV, err := pl.translator.TranslateInTreePVToCSI(pv) if err != nil { - klog.V(5).InfoS("Unable to translate in-tree volume to CSI", "err", err) + logger.V(5).Info("Unable to translate in-tree volume to CSI", "err", err) return "", "" } if csiPV.Spec.PersistentVolumeSource.CSI == nil { - klog.V(5).InfoS("Unable to get a valid volume source for translated PV", "PV", pvName) + logger.V(5).Info("Unable to get a valid volume source for translated PV", "PV", pvName) return "", "" } @@ -333,7 +335,7 @@ func (pl *CSILimits) getCSIDriverInfo(csiNode *storagev1.CSINode, pvc *v1.Persis } // getCSIDriverInfoFromSC returns the CSI driver name and a random volume ID of a given PVC's StorageClass. -func (pl *CSILimits) getCSIDriverInfoFromSC(csiNode *storagev1.CSINode, pvc *v1.PersistentVolumeClaim) (string, string) { +func (pl *CSILimits) getCSIDriverInfoFromSC(logger klog.Logger, csiNode *storagev1.CSINode, pvc *v1.PersistentVolumeClaim) (string, string) { namespace := pvc.Namespace pvcName := pvc.Name scName := storagehelpers.GetPersistentVolumeClaimClass(pvc) @@ -341,13 +343,13 @@ func (pl *CSILimits) getCSIDriverInfoFromSC(csiNode *storagev1.CSINode, pvc *v1. // If StorageClass is not set or not found, then PVC must be using immediate binding mode // and hence it must be bound before scheduling. So it is safe to not count it. if scName == "" { - klog.V(5).InfoS("PVC has no StorageClass", "PVC", klog.KObj(pvc)) + logger.V(5).Info("PVC has no StorageClass", "PVC", klog.KObj(pvc)) return "", "" } storageClass, err := pl.scLister.Get(scName) if err != nil { - klog.V(5).InfoS("Could not get StorageClass for PVC", "PVC", klog.KObj(pvc), "err", err) + logger.V(5).Info("Could not get StorageClass for PVC", "PVC", klog.KObj(pvc), "err", err) return "", "" } @@ -359,13 +361,13 @@ func (pl *CSILimits) getCSIDriverInfoFromSC(csiNode *storagev1.CSINode, pvc *v1. provisioner := storageClass.Provisioner if pl.translator.IsMigratableIntreePluginByName(provisioner) { if !isCSIMigrationOn(csiNode, provisioner) { - klog.V(5).InfoS("CSI Migration of provisioner is not enabled", "provisioner", provisioner) + logger.V(5).Info("CSI Migration of provisioner is not enabled", "provisioner", provisioner) return "", "" } driverName, err := pl.translator.GetCSINameFromInTreeName(provisioner) if err != nil { - klog.V(5).InfoS("Unable to look up driver name from provisioner name", "provisioner", provisioner, "err", err) + logger.V(5).Info("Unable to look up driver name from provisioner name", "provisioner", provisioner, "err", err) return "", "" } return driverName, volumeHandle @@ -375,7 +377,7 @@ func (pl *CSILimits) getCSIDriverInfoFromSC(csiNode *storagev1.CSINode, pvc *v1. } // NewCSI initializes a new plugin and returns it. -func NewCSI(_ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) { +func NewCSI(_ context.Context, _ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) { informerFactory := handle.SharedInformerFactory() pvLister := informerFactory.Core().V1().PersistentVolumes().Lister() pvcLister := informerFactory.Core().V1().PersistentVolumeClaims().Lister() diff --git a/pkg/scheduler/framework/plugins/nodevolumelimits/non_csi.go b/pkg/scheduler/framework/plugins/nodevolumelimits/non_csi.go index 763c6f45ddd..150b21dbe3e 100644 --- a/pkg/scheduler/framework/plugins/nodevolumelimits/non_csi.go +++ b/pkg/scheduler/framework/plugins/nodevolumelimits/non_csi.go @@ -70,36 +70,36 @@ const ( const AzureDiskName = names.AzureDiskLimits // NewAzureDisk returns function that initializes a new plugin and returns it. -func NewAzureDisk(_ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) { +func NewAzureDisk(ctx context.Context, _ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) { informerFactory := handle.SharedInformerFactory() - return newNonCSILimitsWithInformerFactory(azureDiskVolumeFilterType, informerFactory, fts), nil + return newNonCSILimitsWithInformerFactory(ctx, azureDiskVolumeFilterType, informerFactory, fts), nil } // CinderName is the name of the plugin used in the plugin registry and configurations. const CinderName = names.CinderLimits // NewCinder returns function that initializes a new plugin and returns it. -func NewCinder(_ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) { +func NewCinder(ctx context.Context, _ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) { informerFactory := handle.SharedInformerFactory() - return newNonCSILimitsWithInformerFactory(cinderVolumeFilterType, informerFactory, fts), nil + return newNonCSILimitsWithInformerFactory(ctx, cinderVolumeFilterType, informerFactory, fts), nil } // EBSName is the name of the plugin used in the plugin registry and configurations. const EBSName = names.EBSLimits // NewEBS returns function that initializes a new plugin and returns it. -func NewEBS(_ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) { +func NewEBS(ctx context.Context, _ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) { informerFactory := handle.SharedInformerFactory() - return newNonCSILimitsWithInformerFactory(ebsVolumeFilterType, informerFactory, fts), nil + return newNonCSILimitsWithInformerFactory(ctx, ebsVolumeFilterType, informerFactory, fts), nil } // GCEPDName is the name of the plugin used in the plugin registry and configurations. const GCEPDName = names.GCEPDLimits // NewGCEPD returns function that initializes a new plugin and returns it. -func NewGCEPD(_ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) { +func NewGCEPD(ctx context.Context, _ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) { informerFactory := handle.SharedInformerFactory() - return newNonCSILimitsWithInformerFactory(gcePDVolumeFilterType, informerFactory, fts), nil + return newNonCSILimitsWithInformerFactory(ctx, gcePDVolumeFilterType, informerFactory, fts), nil } // nonCSILimits contains information to check the max number of volumes for a plugin. @@ -125,6 +125,7 @@ var _ framework.EnqueueExtensions = &nonCSILimits{} // newNonCSILimitsWithInformerFactory returns a plugin with filter name and informer factory. func newNonCSILimitsWithInformerFactory( + ctx context.Context, filterName string, informerFactory informers.SharedInformerFactory, fts feature.Features, @@ -134,7 +135,7 @@ func newNonCSILimitsWithInformerFactory( csiNodesLister := informerFactory.Storage().V1().CSINodes().Lister() scLister := informerFactory.Storage().V1().StorageClasses().Lister() - return newNonCSILimits(filterName, csiNodesLister, scLister, pvLister, pvcLister, fts) + return newNonCSILimits(ctx, filterName, csiNodesLister, scLister, pvLister, pvcLister, fts) } // newNonCSILimits creates a plugin which evaluates whether a pod can fit based on the @@ -148,6 +149,7 @@ func newNonCSILimitsWithInformerFactory( // types, counts the number of unique volumes, and rejects the new pod if it would place the total count over // the maximum. func newNonCSILimits( + ctx context.Context, filterName string, csiNodeLister storagelisters.CSINodeLister, scLister storagelisters.StorageClassLister, @@ -155,6 +157,7 @@ func newNonCSILimits( pvcLister corelisters.PersistentVolumeClaimLister, fts feature.Features, ) framework.Plugin { + logger := klog.FromContext(ctx) var filter VolumeFilter var volumeLimitKey v1.ResourceName var name string @@ -177,14 +180,14 @@ func newNonCSILimits( filter = cinderVolumeFilter volumeLimitKey = v1.ResourceName(volumeutil.CinderVolumeLimitKey) default: - klog.ErrorS(errors.New("wrong filterName"), "Cannot create nonCSILimits plugin") + logger.Error(errors.New("wrong filterName"), "Cannot create nonCSILimits plugin") return nil } pl := &nonCSILimits{ name: name, filter: filter, volumeLimitKey: volumeLimitKey, - maxVolumeFunc: getMaxVolumeFunc(filterName), + maxVolumeFunc: getMaxVolumeFunc(logger, filterName), csiNodeLister: csiNodeLister, pvLister: pvLister, pvcLister: pvcLister, @@ -238,8 +241,9 @@ func (pl *nonCSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod return nil } + logger := klog.FromContext(ctx) newVolumes := sets.New[string]() - if err := pl.filterVolumes(pod, true /* new pod */, newVolumes); err != nil { + if err := pl.filterVolumes(logger, pod, true /* new pod */, newVolumes); err != nil { return framework.AsStatus(err) } @@ -257,7 +261,7 @@ func (pl *nonCSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod if err != nil { // we don't fail here because the CSINode object is only necessary // for determining whether the migration is enabled or not - klog.V(5).InfoS("Could not get a CSINode object for the node", "node", klog.KObj(node), "err", err) + logger.V(5).Info("Could not get a CSINode object for the node", "node", klog.KObj(node), "err", err) } } @@ -269,7 +273,7 @@ func (pl *nonCSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod // count unique volumes existingVolumes := sets.New[string]() for _, existingPod := range nodeInfo.Pods { - if err := pl.filterVolumes(existingPod.Pod, false /* existing pod */, existingVolumes); err != nil { + if err := pl.filterVolumes(logger, existingPod.Pod, false /* existing pod */, existingVolumes); err != nil { return framework.AsStatus(err) } } @@ -293,7 +297,7 @@ func (pl *nonCSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod return nil } -func (pl *nonCSILimits) filterVolumes(pod *v1.Pod, newPod bool, filteredVolumes sets.Set[string]) error { +func (pl *nonCSILimits) filterVolumes(logger klog.Logger, pod *v1.Pod, newPod bool, filteredVolumes sets.Set[string]) error { volumes := pod.Spec.Volumes for i := range volumes { vol := &volumes[i] @@ -336,7 +340,7 @@ func (pl *nonCSILimits) filterVolumes(pod *v1.Pod, newPod bool, filteredVolumes } // If the PVC is invalid, we don't count the volume because // there's no guarantee that it belongs to the running predicate. - klog.V(4).InfoS("Unable to look up PVC info, assuming PVC doesn't match predicate when counting limits", "pod", klog.KObj(pod), "PVC", klog.KRef(pod.Namespace, pvcName), "err", err) + logger.V(4).Info("Unable to look up PVC info, assuming PVC doesn't match predicate when counting limits", "pod", klog.KObj(pod), "PVC", klog.KRef(pod.Namespace, pvcName), "err", err) continue } @@ -354,7 +358,7 @@ func (pl *nonCSILimits) filterVolumes(pod *v1.Pod, newPod bool, filteredVolumes // original PV where it was bound to, so we count the volume if // it belongs to the running predicate. if pl.matchProvisioner(pvc) { - klog.V(4).InfoS("PVC is not bound, assuming PVC matches predicate when counting limits", "pod", klog.KObj(pod), "PVC", klog.KRef(pod.Namespace, pvcName)) + logger.V(4).Info("PVC is not bound, assuming PVC matches predicate when counting limits", "pod", klog.KObj(pod), "PVC", klog.KRef(pod.Namespace, pvcName)) filteredVolumes.Insert(pvID) } continue @@ -365,7 +369,7 @@ func (pl *nonCSILimits) filterVolumes(pod *v1.Pod, newPod bool, filteredVolumes // If the PV is invalid and PVC belongs to the running predicate, // log the error and count the PV towards the PV limit. if pl.matchProvisioner(pvc) { - klog.V(4).InfoS("Unable to look up PV, assuming PV matches predicate when counting limits", "pod", klog.KObj(pod), "PVC", klog.KRef(pod.Namespace, pvcName), "PV", klog.KRef("", pvName), "err", err) + logger.V(4).Info("Unable to look up PV, assuming PV matches predicate when counting limits", "pod", klog.KObj(pod), "PVC", klog.KRef(pod.Namespace, pvcName), "PV", klog.KRef("", pvName), "err", err) filteredVolumes.Insert(pvID) } continue @@ -394,12 +398,12 @@ func (pl *nonCSILimits) matchProvisioner(pvc *v1.PersistentVolumeClaim) bool { } // getMaxVolLimitFromEnv checks the max PD volumes environment variable, otherwise returning a default value. -func getMaxVolLimitFromEnv() int { +func getMaxVolLimitFromEnv(logger klog.Logger) int { if rawMaxVols := os.Getenv(KubeMaxPDVols); rawMaxVols != "" { if parsedMaxVols, err := strconv.Atoi(rawMaxVols); err != nil { - klog.ErrorS(err, "Unable to parse maximum PD volumes value, using default") + logger.Error(err, "Unable to parse maximum PD volumes value, using default") } else if parsedMaxVols <= 0 { - klog.ErrorS(errors.New("maximum PD volumes is negative"), "Unable to parse maximum PD volumes value, using default") + logger.Error(errors.New("maximum PD volumes is negative"), "Unable to parse maximum PD volumes value, using default") } else { return parsedMaxVols } @@ -520,9 +524,9 @@ var cinderVolumeFilter = VolumeFilter{ }, } -func getMaxVolumeFunc(filterName string) func(node *v1.Node) int { +func getMaxVolumeFunc(logger klog.Logger, filterName string) func(node *v1.Node) int { return func(node *v1.Node) int { - maxVolumesFromEnv := getMaxVolLimitFromEnv() + maxVolumesFromEnv := getMaxVolLimitFromEnv(logger) if maxVolumesFromEnv > 0 { return maxVolumesFromEnv } diff --git a/pkg/scheduler/framework/plugins/nodevolumelimits/non_csi_test.go b/pkg/scheduler/framework/plugins/nodevolumelimits/non_csi_test.go index 2b8f99c37f2..c34dd9630af 100644 --- a/pkg/scheduler/framework/plugins/nodevolumelimits/non_csi_test.go +++ b/pkg/scheduler/framework/plugins/nodevolumelimits/non_csi_test.go @@ -28,6 +28,7 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" csilibplugins "k8s.io/csi-translation-lib/plugins" + "k8s.io/klog/v2/ktesting" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" st "k8s.io/kubernetes/pkg/scheduler/testing" @@ -181,16 +182,17 @@ func TestEphemeralLimits(t *testing.T) { for _, test := range tests { t.Run(test.test, func(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) fts := feature.Features{} node, csiNode := getNodeWithPodAndVolumeLimits("node", test.existingPods, test.maxVols, filterName) - p := newNonCSILimits(filterName, getFakeCSINodeLister(csiNode), getFakeCSIStorageClassLister(filterName, driverName), getFakePVLister(filterName), append(getFakePVCLister(filterName), test.extraClaims...), fts).(framework.FilterPlugin) - _, gotPreFilterStatus := p.(*nonCSILimits).PreFilter(context.Background(), nil, test.newPod) + p := newNonCSILimits(ctx, filterName, getFakeCSINodeLister(csiNode), getFakeCSIStorageClassLister(filterName, driverName), getFakePVLister(filterName), append(getFakePVCLister(filterName), test.extraClaims...), fts).(framework.FilterPlugin) + _, gotPreFilterStatus := p.(*nonCSILimits).PreFilter(ctx, nil, test.newPod) if diff := cmp.Diff(test.wantPreFilterStatus, gotPreFilterStatus); diff != "" { t.Errorf("PreFilter status does not match (-want, +got): %s", diff) } if gotPreFilterStatus.Code() != framework.Skip { - gotStatus := p.Filter(context.Background(), nil, test.newPod, node) + gotStatus := p.Filter(ctx, nil, test.newPod, node) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("Filter status does not match: %v, want: %v", gotStatus, test.wantStatus) } @@ -412,8 +414,9 @@ func TestAzureDiskLimits(t *testing.T) { for _, test := range tests { t.Run(test.test, func(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) node, csiNode := getNodeWithPodAndVolumeLimits("node", test.existingPods, test.maxVols, test.filterName) - p := newNonCSILimits(test.filterName, getFakeCSINodeLister(csiNode), getFakeCSIStorageClassLister(test.filterName, test.driverName), getFakePVLister(test.filterName), getFakePVCLister(test.filterName), feature.Features{}).(framework.FilterPlugin) + p := newNonCSILimits(ctx, test.filterName, getFakeCSINodeLister(csiNode), getFakeCSIStorageClassLister(test.filterName, test.driverName), getFakePVLister(test.filterName), getFakePVCLister(test.filterName), feature.Features{}).(framework.FilterPlugin) _, gotPreFilterStatus := p.(*nonCSILimits).PreFilter(context.Background(), nil, test.newPod) if diff := cmp.Diff(test.wantPreFilterStatus, gotPreFilterStatus); diff != "" { t.Errorf("PreFilter status does not match (-want, +got): %s", diff) @@ -693,15 +696,16 @@ func TestEBSLimits(t *testing.T) { for _, test := range tests { t.Run(test.test, func(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) node, csiNode := getNodeWithPodAndVolumeLimits("node", test.existingPods, test.maxVols, test.filterName) - p := newNonCSILimits(test.filterName, getFakeCSINodeLister(csiNode), getFakeCSIStorageClassLister(test.filterName, test.driverName), getFakePVLister(test.filterName), getFakePVCLister(test.filterName), feature.Features{}).(framework.FilterPlugin) - _, gotPreFilterStatus := p.(*nonCSILimits).PreFilter(context.Background(), nil, test.newPod) + p := newNonCSILimits(ctx, test.filterName, getFakeCSINodeLister(csiNode), getFakeCSIStorageClassLister(test.filterName, test.driverName), getFakePVLister(test.filterName), getFakePVCLister(test.filterName), feature.Features{}).(framework.FilterPlugin) + _, gotPreFilterStatus := p.(*nonCSILimits).PreFilter(ctx, nil, test.newPod) if diff := cmp.Diff(test.wantPreFilterStatus, gotPreFilterStatus); diff != "" { t.Errorf("PreFilter status does not match (-want, +got): %s", diff) } if gotPreFilterStatus.Code() != framework.Skip { - gotStatus := p.Filter(context.Background(), nil, test.newPod, node) + gotStatus := p.Filter(ctx, nil, test.newPod, node) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("Filter status does not match: %v, want: %v", gotStatus, test.wantStatus) } @@ -923,8 +927,9 @@ func TestGCEPDLimits(t *testing.T) { for _, test := range tests { t.Run(test.test, func(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) node, csiNode := getNodeWithPodAndVolumeLimits("node", test.existingPods, test.maxVols, test.filterName) - p := newNonCSILimits(test.filterName, getFakeCSINodeLister(csiNode), getFakeCSIStorageClassLister(test.filterName, test.driverName), getFakePVLister(test.filterName), getFakePVCLister(test.filterName), feature.Features{}).(framework.FilterPlugin) + p := newNonCSILimits(ctx, test.filterName, getFakeCSINodeLister(csiNode), getFakeCSIStorageClassLister(test.filterName, test.driverName), getFakePVLister(test.filterName), getFakePVCLister(test.filterName), feature.Features{}).(framework.FilterPlugin) _, gotPreFilterStatus := p.(*nonCSILimits).PreFilter(context.Background(), nil, test.newPod) if diff := cmp.Diff(test.wantPreFilterStatus, gotPreFilterStatus); diff != "" { t.Errorf("PreFilter status does not match (-want, +got): %s", diff) @@ -965,8 +970,9 @@ func TestGetMaxVols(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) t.Setenv(KubeMaxPDVols, test.rawMaxVols) - result := getMaxVolLimitFromEnv() + result := getMaxVolLimitFromEnv(logger) if result != test.expected { t.Errorf("expected %v got %v", test.expected, result) } diff --git a/pkg/scheduler/framework/plugins/podtopologyspread/plugin.go b/pkg/scheduler/framework/plugins/podtopologyspread/plugin.go index 17803e4ee0f..982da6875a2 100644 --- a/pkg/scheduler/framework/plugins/podtopologyspread/plugin.go +++ b/pkg/scheduler/framework/plugins/podtopologyspread/plugin.go @@ -17,6 +17,7 @@ limitations under the License. package podtopologyspread import ( + "context" "fmt" v1 "k8s.io/api/core/v1" @@ -82,7 +83,7 @@ func (pl *PodTopologySpread) Name() string { } // New initializes a new plugin and returns it. -func New(plArgs runtime.Object, h framework.Handle, fts feature.Features) (framework.Plugin, error) { +func New(_ context.Context, plArgs runtime.Object, h framework.Handle, fts feature.Features) (framework.Plugin, error) { if h.SnapshotSharedLister() == nil { return nil, fmt.Errorf("SnapshotSharedlister is nil") } diff --git a/pkg/scheduler/framework/plugins/podtopologyspread/scoring_test.go b/pkg/scheduler/framework/plugins/podtopologyspread/scoring_test.go index fe24df8b45c..ed3d3d15c27 100644 --- a/pkg/scheduler/framework/plugins/podtopologyspread/scoring_test.go +++ b/pkg/scheduler/framework/plugins/podtopologyspread/scoring_test.go @@ -95,7 +95,7 @@ func TestPreScoreSkip(t *testing.T) { if err != nil { t.Fatalf("Failed creating framework runtime: %v", err) } - pl, err := New(&tt.config, f, feature.Features{}) + pl, err := New(ctx, &tt.config, f, feature.Features{}) if err != nil { t.Fatalf("Failed creating plugin: %v", err) } @@ -103,7 +103,7 @@ func TestPreScoreSkip(t *testing.T) { informerFactory.WaitForCacheSync(ctx.Done()) p := pl.(*PodTopologySpread) cs := framework.NewCycleState() - if s := p.PreScore(context.Background(), cs, tt.pod, tt.nodes); !s.IsSkip() { + if s := p.PreScore(ctx, cs, tt.pod, tt.nodes); !s.IsSkip() { t.Fatalf("Expected skip but got %v", s.AsError()) } }) @@ -582,7 +582,7 @@ func TestPreScoreStateEmptyNodes(t *testing.T) { if err != nil { t.Fatalf("Failed creating framework runtime: %v", err) } - pl, err := New(&tt.config, f, feature.Features{EnableNodeInclusionPolicyInPodTopologySpread: tt.enableNodeInclusionPolicy}) + pl, err := New(ctx, &tt.config, f, feature.Features{EnableNodeInclusionPolicyInPodTopologySpread: tt.enableNodeInclusionPolicy}) if err != nil { t.Fatalf("Failed creating plugin: %v", err) } @@ -1336,7 +1336,8 @@ func TestPodTopologySpreadScore(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) t.Cleanup(cancel) allNodes := append([]*v1.Node{}, tt.nodes...) allNodes = append(allNodes, tt.failedNodes...) @@ -1346,7 +1347,7 @@ func TestPodTopologySpreadScore(t *testing.T) { p.enableNodeInclusionPolicyInPodTopologySpread = tt.enableNodeInclusionPolicy p.enableMatchLabelKeysInPodTopologySpread = tt.enableMatchLabelKeys - status := p.PreScore(context.Background(), state, tt.pod, tt.nodes) + status := p.PreScore(ctx, state, tt.pod, tt.nodes) if !status.IsSuccess() { t.Errorf("unexpected error: %v", status) } diff --git a/pkg/scheduler/framework/plugins/queuesort/priority_sort.go b/pkg/scheduler/framework/plugins/queuesort/priority_sort.go index 43eaa74bd2a..1a6cfde4140 100644 --- a/pkg/scheduler/framework/plugins/queuesort/priority_sort.go +++ b/pkg/scheduler/framework/plugins/queuesort/priority_sort.go @@ -17,6 +17,7 @@ limitations under the License. package queuesort import ( + "context" "k8s.io/apimachinery/pkg/runtime" corev1helpers "k8s.io/component-helpers/scheduling/corev1" "k8s.io/kubernetes/pkg/scheduler/framework" @@ -46,6 +47,6 @@ func (pl *PrioritySort) Less(pInfo1, pInfo2 *framework.QueuedPodInfo) bool { } // New initializes a new plugin and returns it. -func New(_ runtime.Object, handle framework.Handle) (framework.Plugin, error) { +func New(_ context.Context, _ runtime.Object, handle framework.Handle) (framework.Plugin, error) { return &PrioritySort{}, nil } diff --git a/pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates.go b/pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates.go index 5c0678cb0d8..1b13e0dd503 100644 --- a/pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates.go +++ b/pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates.go @@ -62,6 +62,6 @@ func (pl *SchedulingGates) EventsToRegister() []framework.ClusterEventWithHint { } // New initializes a new plugin and returns it. -func New(_ runtime.Object, _ framework.Handle, fts feature.Features) (framework.Plugin, error) { +func New(_ context.Context, _ runtime.Object, _ framework.Handle, fts feature.Features) (framework.Plugin, error) { return &SchedulingGates{enablePodSchedulingReadiness: fts.EnablePodSchedulingReadiness}, nil } diff --git a/pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates_test.go b/pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates_test.go index 670989a1eb9..b4bbc2a72f4 100644 --- a/pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates_test.go +++ b/pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates_test.go @@ -17,7 +17,6 @@ limitations under the License. package schedulinggates import ( - "context" "testing" "github.com/google/go-cmp/cmp" @@ -26,6 +25,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" st "k8s.io/kubernetes/pkg/scheduler/testing" + "k8s.io/kubernetes/test/utils/ktesting" ) func TestPreEnqueue(t *testing.T) { @@ -63,12 +63,13 @@ func TestPreEnqueue(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - p, err := New(nil, nil, feature.Features{EnablePodSchedulingReadiness: tt.enablePodSchedulingReadiness}) + _, ctx := ktesting.NewTestContext(t) + p, err := New(ctx, nil, nil, feature.Features{EnablePodSchedulingReadiness: tt.enablePodSchedulingReadiness}) if err != nil { t.Fatalf("Creating plugin: %v", err) } - got := p.(framework.PreEnqueuePlugin).PreEnqueue(context.Background(), tt.pod) + got := p.(framework.PreEnqueuePlugin).PreEnqueue(ctx, tt.pod) if diff := cmp.Diff(tt.want, got); diff != "" { t.Errorf("unexpected status (-want, +got):\n%s", diff) } diff --git a/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go b/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go index 9d1bbe85caf..edec5bd1b0a 100644 --- a/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go +++ b/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go @@ -164,6 +164,6 @@ func (pl *TaintToleration) ScoreExtensions() framework.ScoreExtensions { } // New initializes a new plugin and returns it. -func New(_ runtime.Object, h framework.Handle) (framework.Plugin, error) { +func New(_ context.Context, _ runtime.Object, h framework.Handle) (framework.Plugin, error) { return &TaintToleration{handle: h}, nil } diff --git a/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration_test.go b/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration_test.go index fca4262d815..bdeb23cf759 100644 --- a/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration_test.go +++ b/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration_test.go @@ -237,7 +237,10 @@ func TestTaintTolerationScore(t *testing.T) { snapshot := cache.NewSnapshot(nil, test.nodes) fh, _ := runtime.NewFramework(ctx, nil, nil, runtime.WithSnapshotSharedLister(snapshot)) - p, _ := New(nil, fh) + p, err := New(ctx, nil, fh) + if err != nil { + t.Fatalf("creating plugin: %v", err) + } status := p.(framework.PreScorePlugin).PreScore(ctx, state, test.pod, test.nodes) if !status.IsSuccess() { t.Errorf("unexpected error: %v", status) @@ -335,10 +338,14 @@ func TestTaintTolerationFilter(t *testing.T) { } for _, test := range tests { t.Run(test.name, func(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) nodeInfo := framework.NewNodeInfo() nodeInfo.SetNode(test.node) - p, _ := New(nil, nil) - gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.pod, nodeInfo) + p, err := New(ctx, nil, nil) + if err != nil { + t.Fatalf("creating plugin: %v", err) + } + gotStatus := p.(framework.FilterPlugin).Filter(ctx, nil, test.pod, nodeInfo) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) } diff --git a/pkg/scheduler/framework/plugins/testing/testing.go b/pkg/scheduler/framework/plugins/testing/testing.go index 2a525deb09f..1e7d4b34856 100644 --- a/pkg/scheduler/framework/plugins/testing/testing.go +++ b/pkg/scheduler/framework/plugins/testing/testing.go @@ -49,7 +49,7 @@ func SetupPluginWithInformers( if err != nil { tb.Fatalf("Failed creating framework runtime: %v", err) } - p, err := pf(config, fh) + p, err := pf(ctx, config, fh) if err != nil { tb.Fatal(err) } @@ -72,7 +72,7 @@ func SetupPlugin( if err != nil { tb.Fatalf("Failed creating framework runtime: %v", err) } - p, err := pf(config, fh) + p, err := pf(ctx, config, fh) if err != nil { tb.Fatal(err) } diff --git a/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go b/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go index 66756b7af13..c54e4b89e3e 100644 --- a/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go +++ b/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go @@ -361,7 +361,7 @@ func (pl *VolumeBinding) Unreserve(ctx context.Context, cs *framework.CycleState } // New initializes a new plugin and returns it. -func New(plArgs runtime.Object, fh framework.Handle, fts feature.Features) (framework.Plugin, error) { +func New(_ context.Context, plArgs runtime.Object, fh framework.Handle, fts feature.Features) (framework.Plugin, error) { args, ok := plArgs.(*config.VolumeBindingArgs) if !ok { return nil, fmt.Errorf("want args to be of type VolumeBindingArgs, got %T", plArgs) diff --git a/pkg/scheduler/framework/plugins/volumebinding/volume_binding_test.go b/pkg/scheduler/framework/plugins/volumebinding/volume_binding_test.go index b17244a0491..96dc2096d28 100644 --- a/pkg/scheduler/framework/plugins/volumebinding/volume_binding_test.go +++ b/pkg/scheduler/framework/plugins/volumebinding/volume_binding_test.go @@ -806,7 +806,7 @@ func TestVolumeBinding(t *testing.T) { } } - pl, err := New(args, fh, item.fts) + pl, err := New(ctx, args, fh, item.fts) if err != nil { t.Fatal(err) } diff --git a/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go b/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go index cfe883bd83f..f0d25013b07 100644 --- a/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go +++ b/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go @@ -348,7 +348,7 @@ func (pl *VolumeRestrictions) EventsToRegister() []framework.ClusterEventWithHin } // New initializes a new plugin and returns it. -func New(_ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) { +func New(_ context.Context, _ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) { informerFactory := handle.SharedInformerFactory() pvcLister := informerFactory.Core().V1().PersistentVolumeClaims().Lister() sharedLister := handle.SnapshotSharedLister() diff --git a/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions_test.go b/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions_test.go index a78ab1e3b3a..b2b918a149c 100644 --- a/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions_test.go +++ b/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions_test.go @@ -505,8 +505,8 @@ func newPlugin(ctx context.Context, t *testing.T) framework.Plugin { } func newPluginWithListers(ctx context.Context, t *testing.T, pods []*v1.Pod, nodes []*v1.Node, pvcs []*v1.PersistentVolumeClaim, enableReadWriteOncePod bool) framework.Plugin { - pluginFactory := func(plArgs runtime.Object, fh framework.Handle) (framework.Plugin, error) { - return New(plArgs, fh, feature.Features{ + pluginFactory := func(ctx context.Context, plArgs runtime.Object, fh framework.Handle) (framework.Plugin, error) { + return New(ctx, plArgs, fh, feature.Features{ EnableReadWriteOncePod: enableReadWriteOncePod, }) } diff --git a/pkg/scheduler/framework/plugins/volumezone/volume_zone.go b/pkg/scheduler/framework/plugins/volumezone/volume_zone.go index b014130b64a..c1893aa63cc 100644 --- a/pkg/scheduler/framework/plugins/volumezone/volume_zone.go +++ b/pkg/scheduler/framework/plugins/volumezone/volume_zone.go @@ -290,7 +290,7 @@ func (pl *VolumeZone) EventsToRegister() []framework.ClusterEventWithHint { } // New initializes a new plugin and returns it. -func New(_ runtime.Object, handle framework.Handle) (framework.Plugin, error) { +func New(_ context.Context, _ runtime.Object, handle framework.Handle) (framework.Plugin, error) { informerFactory := handle.SharedInformerFactory() pvLister := informerFactory.Core().V1().PersistentVolumes().Lister() pvcLister := informerFactory.Core().V1().PersistentVolumeClaims().Lister() diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go index 168f07024a7..4d4588ef5b8 100644 --- a/pkg/scheduler/framework/runtime/framework.go +++ b/pkg/scheduler/framework/runtime/framework.go @@ -302,7 +302,7 @@ func NewFramework(ctx context.Context, r Registry, profile *config.KubeScheduler Args: args, }) } - p, err := factory(args, f) + p, err := factory(ctx, args, f) if err != nil { return nil, fmt.Errorf("initializing plugin %q: %w", name, err) } diff --git a/pkg/scheduler/framework/runtime/framework_test.go b/pkg/scheduler/framework/runtime/framework_test.go index a99f92f3f38..f5ae23dfd5e 100644 --- a/pkg/scheduler/framework/runtime/framework_test.go +++ b/pkg/scheduler/framework/runtime/framework_test.go @@ -76,7 +76,7 @@ var cmpOpts = []cmp.Option{ }), } -func newScoreWithNormalizePlugin1(injArgs runtime.Object, f framework.Handle) (framework.Plugin, error) { +func newScoreWithNormalizePlugin1(_ context.Context, injArgs runtime.Object, f framework.Handle) (framework.Plugin, error) { var inj injectedResult if err := DecodeInto(injArgs, &inj); err != nil { return nil, err @@ -84,7 +84,7 @@ func newScoreWithNormalizePlugin1(injArgs runtime.Object, f framework.Handle) (f return &TestScoreWithNormalizePlugin{scoreWithNormalizePlugin1, inj}, nil } -func newScoreWithNormalizePlugin2(injArgs runtime.Object, f framework.Handle) (framework.Plugin, error) { +func newScoreWithNormalizePlugin2(_ context.Context, injArgs runtime.Object, f framework.Handle) (framework.Plugin, error) { var inj injectedResult if err := DecodeInto(injArgs, &inj); err != nil { return nil, err @@ -92,7 +92,7 @@ func newScoreWithNormalizePlugin2(injArgs runtime.Object, f framework.Handle) (f return &TestScoreWithNormalizePlugin{scoreWithNormalizePlugin2, inj}, nil } -func newScorePlugin1(injArgs runtime.Object, f framework.Handle) (framework.Plugin, error) { +func newScorePlugin1(_ context.Context, injArgs runtime.Object, f framework.Handle) (framework.Plugin, error) { var inj injectedResult if err := DecodeInto(injArgs, &inj); err != nil { return nil, err @@ -100,7 +100,7 @@ func newScorePlugin1(injArgs runtime.Object, f framework.Handle) (framework.Plug return &TestScorePlugin{scorePlugin1, inj}, nil } -func newPluginNotImplementingScore(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { +func newPluginNotImplementingScore(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) { return &PluginNotImplementingScore{}, nil } @@ -154,7 +154,7 @@ func (pl *PluginNotImplementingScore) Name() string { return pluginNotImplementingScore } -func newTestPlugin(injArgs runtime.Object, f framework.Handle) (framework.Plugin, error) { +func newTestPlugin(_ context.Context, injArgs runtime.Object, f framework.Handle) (framework.Plugin, error) { return &TestPlugin{name: testPlugin}, nil } @@ -296,7 +296,7 @@ func (dp *TestDuplicatePlugin) PreFilterExtensions() framework.PreFilterExtensio var _ framework.PreFilterPlugin = &TestDuplicatePlugin{} -func newDuplicatePlugin(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { +func newDuplicatePlugin(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) { return &TestDuplicatePlugin{}, nil } @@ -326,7 +326,7 @@ func (pl *TestPreEnqueuePlugin) PreEnqueue(ctx context.Context, p *v1.Pod) *fram var _ framework.QueueSortPlugin = &TestQueueSortPlugin{} -func newQueueSortPlugin(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { +func newQueueSortPlugin(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) { return &TestQueueSortPlugin{}, nil } @@ -343,7 +343,7 @@ func (pl *TestQueueSortPlugin) Less(_, _ *framework.QueuedPodInfo) bool { var _ framework.BindPlugin = &TestBindPlugin{} -func newBindPlugin(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { +func newBindPlugin(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) { return &TestBindPlugin{}, nil } @@ -881,7 +881,7 @@ func TestPreEnqueuePlugins(t *testing.T) { // register all plugins tmpPl := pl if err := registry.Register(pl.Name(), - func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { + func(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) { return tmpPl, nil }); err != nil { t.Fatalf("fail to register preEnqueue plugin (%s)", pl.Name()) @@ -1004,9 +1004,11 @@ func TestRunPreScorePlugins(t *testing.T) { for i, p := range tt.plugins { p := p enabled[i].Name = p.name - r.Register(p.name, func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) { + if err := r.Register(p.name, func(_ context.Context, _ runtime.Object, fh framework.Handle) (framework.Plugin, error) { return p, nil - }) + }); err != nil { + t.Fatalf("fail to register PreScorePlugins plugin (%s)", p.Name()) + } } ctx, cancel := context.WithCancel(context.Background()) @@ -1425,11 +1427,11 @@ func TestPreFilterPlugins(t *testing.T) { preFilter2 := &TestPreFilterWithExtensionsPlugin{} r := make(Registry) r.Register(preFilterPluginName, - func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) { + func(_ context.Context, _ runtime.Object, fh framework.Handle) (framework.Plugin, error) { return preFilter1, nil }) r.Register(preFilterWithExtensionsPluginName, - func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) { + func(_ context.Context, _ runtime.Object, fh framework.Handle) (framework.Plugin, error) { return preFilter2, nil }) plugins := &config.Plugins{PreFilter: config.PluginSet{Enabled: []config.Plugin{{Name: preFilterWithExtensionsPluginName}, {Name: preFilterPluginName}}}} @@ -1563,9 +1565,11 @@ func TestRunPreFilterPlugins(t *testing.T) { for i, p := range tt.plugins { p := p enabled[i].Name = p.name - r.Register(p.name, func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) { + if err := r.Register(p.name, func(_ context.Context, _ runtime.Object, fh framework.Handle) (framework.Plugin, error) { return p, nil - }) + }); err != nil { + t.Fatalf("fail to register PreFilter plugin (%s)", p.Name()) + } } ctx, cancel := context.WithCancel(context.Background()) @@ -1651,9 +1655,11 @@ func TestRunPreFilterExtensionRemovePod(t *testing.T) { for i, p := range tt.plugins { p := p enabled[i].Name = p.name - r.Register(p.name, func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) { + if err := r.Register(p.name, func(_ context.Context, _ runtime.Object, fh framework.Handle) (framework.Plugin, error) { return p, nil - }) + }); err != nil { + t.Fatalf("fail to register PreFilterExtension plugin (%s)", p.Name()) + } } ctx, cancel := context.WithCancel(context.Background()) @@ -1733,9 +1739,11 @@ func TestRunPreFilterExtensionAddPod(t *testing.T) { for i, p := range tt.plugins { p := p enabled[i].Name = p.name - r.Register(p.name, func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) { + if err := r.Register(p.name, func(_ context.Context, _ runtime.Object, fh framework.Handle) (framework.Plugin, error) { return p, nil - }) + }); err != nil { + t.Fatalf("fail to register PreFilterExtension plugin (%s)", p.Name()) + } } ctx, cancel := context.WithCancel(context.Background()) @@ -1934,7 +1942,7 @@ func TestFilterPlugins(t *testing.T) { // register all plugins tmpPl := pl if err := registry.Register(pl.name, - func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { + func(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) { return tmpPl, nil }); err != nil { t.Fatalf("fail to register filter plugin (%s)", pl.name) @@ -2058,7 +2066,7 @@ func TestPostFilterPlugins(t *testing.T) { // register all plugins tmpPl := pl if err := registry.Register(pl.name, - func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { + func(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) { return tmpPl, nil }); err != nil { t.Fatalf("fail to register postFilter plugin (%s)", pl.name) @@ -2190,7 +2198,7 @@ func TestFilterPluginsWithNominatedPods(t *testing.T) { if tt.preFilterPlugin != nil { if err := registry.Register(tt.preFilterPlugin.name, - func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { + func(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) { return tt.preFilterPlugin, nil }); err != nil { t.Fatalf("fail to register preFilter plugin (%s)", tt.preFilterPlugin.name) @@ -2202,7 +2210,7 @@ func TestFilterPluginsWithNominatedPods(t *testing.T) { } if tt.filterPlugin != nil { if err := registry.Register(tt.filterPlugin.name, - func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { + func(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) { return tt.filterPlugin, nil }); err != nil { t.Fatalf("fail to register filter plugin (%s)", tt.filterPlugin.name) @@ -2366,7 +2374,7 @@ func TestPreBindPlugins(t *testing.T) { for _, pl := range tt.plugins { tmpPl := pl - if err := registry.Register(pl.name, func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { + if err := registry.Register(pl.name, func(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) { return tmpPl, nil }); err != nil { t.Fatalf("Unable to register pre bind plugins: %s", pl.name) @@ -2524,7 +2532,7 @@ func TestReservePlugins(t *testing.T) { for _, pl := range tt.plugins { tmpPl := pl - if err := registry.Register(pl.name, func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { + if err := registry.Register(pl.name, func(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) { return tmpPl, nil }); err != nil { t.Fatalf("Unable to register pre bind plugins: %s", pl.name) @@ -2650,7 +2658,7 @@ func TestPermitPlugins(t *testing.T) { for _, pl := range tt.plugins { tmpPl := pl - if err := registry.Register(pl.name, func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { + if err := registry.Register(pl.name, func(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) { return tmpPl, nil }); err != nil { t.Fatalf("Unable to register Permit plugin: %s", pl.name) @@ -2817,7 +2825,7 @@ func TestRecordingMetrics(t *testing.T) { plugin := &TestPlugin{name: testPlugin, inj: tt.inject} r := make(Registry) r.Register(testPlugin, - func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) { + func(_ context.Context, _ runtime.Object, fh framework.Handle) (framework.Plugin, error) { return plugin, nil }) pluginSet := config.PluginSet{Enabled: []config.Plugin{{Name: testPlugin, Weight: 1}}} @@ -2941,7 +2949,7 @@ func TestRunBindPlugins(t *testing.T) { name := fmt.Sprintf("bind-%d", i) plugin := &TestPlugin{name: name, inj: injectedResult{BindStatus: int(inj)}} r.Register(name, - func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) { + func(_ context.Context, _ runtime.Object, fh framework.Handle) (framework.Plugin, error) { return plugin, nil }) pluginSet.Enabled = append(pluginSet.Enabled, config.Plugin{Name: name}) @@ -3000,7 +3008,7 @@ func TestPermitWaitDurationMetric(t *testing.T) { plugin := &TestPlugin{name: testPlugin, inj: tt.inject} r := make(Registry) err := r.Register(testPlugin, - func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) { + func(_ context.Context, _ runtime.Object, fh framework.Handle) (framework.Plugin, error) { return plugin, nil }) if err != nil { @@ -3059,7 +3067,7 @@ func TestWaitOnPermit(t *testing.T) { testPermitPlugin := &TestPermitPlugin{} r := make(Registry) r.Register(permitPlugin, - func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) { + func(_ context.Context, _ runtime.Object, fh framework.Handle) (framework.Plugin, error) { return testPermitPlugin, nil }) plugins := &config.Plugins{ diff --git a/pkg/scheduler/framework/runtime/registry.go b/pkg/scheduler/framework/runtime/registry.go index b0b1d855751..b6fca3c29cd 100644 --- a/pkg/scheduler/framework/runtime/registry.go +++ b/pkg/scheduler/framework/runtime/registry.go @@ -17,6 +17,7 @@ limitations under the License. package runtime import ( + "context" "fmt" "k8s.io/apimachinery/pkg/runtime" @@ -27,16 +28,16 @@ import ( ) // PluginFactory is a function that builds a plugin. -type PluginFactory = func(configuration runtime.Object, f framework.Handle) (framework.Plugin, error) +type PluginFactory = func(ctx context.Context, configuration runtime.Object, f framework.Handle) (framework.Plugin, error) // PluginFactoryWithFts is a function that builds a plugin with certain feature gates. -type PluginFactoryWithFts func(runtime.Object, framework.Handle, plfeature.Features) (framework.Plugin, error) +type PluginFactoryWithFts func(context.Context, runtime.Object, framework.Handle, plfeature.Features) (framework.Plugin, error) // FactoryAdapter can be used to inject feature gates for a plugin that needs // them when the caller expects the older PluginFactory method. func FactoryAdapter(fts plfeature.Features, withFts PluginFactoryWithFts) PluginFactory { - return func(plArgs runtime.Object, fh framework.Handle) (framework.Plugin, error) { - return withFts(plArgs, fh, fts) + return func(ctx context.Context, plArgs runtime.Object, fh framework.Handle) (framework.Plugin, error) { + return withFts(ctx, plArgs, fh, fts) } } diff --git a/pkg/scheduler/framework/runtime/registry_test.go b/pkg/scheduler/framework/runtime/registry_test.go index 9ace42394b1..f3182d668eb 100644 --- a/pkg/scheduler/framework/runtime/registry_test.go +++ b/pkg/scheduler/framework/runtime/registry_test.go @@ -17,6 +17,7 @@ limitations under the License. package runtime import ( + "context" "reflect" "testing" @@ -78,8 +79,8 @@ func TestDecodeInto(t *testing.T) { func isRegistryEqual(registryX, registryY Registry) bool { for name, pluginFactory := range registryY { if val, ok := registryX[name]; ok { - p1, _ := pluginFactory(nil, nil) - p2, _ := val(nil, nil) + p1, _ := pluginFactory(nil, nil, nil) + p2, _ := val(nil, nil, nil) if p1.Name() != p2.Name() { // pluginFactory functions are not the same. return false @@ -110,7 +111,7 @@ func (p *mockNoopPlugin) Name() string { func NewMockNoopPluginFactory() PluginFactory { uuid := uuid.New().String() - return func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { + return func(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) { return &mockNoopPlugin{uuid}, nil } } diff --git a/pkg/scheduler/profile/profile_test.go b/pkg/scheduler/profile/profile_test.go index 6d0201bb24a..34eb0edbddc 100644 --- a/pkg/scheduler/profile/profile_test.go +++ b/pkg/scheduler/profile/profile_test.go @@ -280,8 +280,8 @@ func (p *fakePlugin) Bind(context.Context, *framework.CycleState, *v1.Pod, strin return nil } -func newFakePlugin(name string) func(object runtime.Object, handle framework.Handle) (framework.Plugin, error) { - return func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { +func newFakePlugin(name string) func(ctx context.Context, object runtime.Object, handle framework.Handle) (framework.Plugin, error) { + return func(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) { return &fakePlugin{name: name}, nil } } diff --git a/pkg/scheduler/schedule_one_test.go b/pkg/scheduler/schedule_one_test.go index bbf68c12d42..45aa2f9d8c7 100644 --- a/pkg/scheduler/schedule_one_test.go +++ b/pkg/scheduler/schedule_one_test.go @@ -141,7 +141,7 @@ func (f *fakeExtender) IsInterested(pod *v1.Pod) bool { type falseMapPlugin struct{} func newFalseMapPlugin() frameworkruntime.PluginFactory { - return func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { + return func(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) { return &falseMapPlugin{}, nil } } @@ -161,7 +161,7 @@ func (pl *falseMapPlugin) ScoreExtensions() framework.ScoreExtensions { type numericMapPlugin struct{} func newNumericMapPlugin() frameworkruntime.PluginFactory { - return func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { + return func(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) { return &numericMapPlugin{}, nil } } @@ -183,7 +183,7 @@ func (pl *numericMapPlugin) ScoreExtensions() framework.ScoreExtensions { } // NewNoPodsFilterPlugin initializes a noPodsFilterPlugin and returns it. -func NewNoPodsFilterPlugin(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { +func NewNoPodsFilterPlugin(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) { return &noPodsFilterPlugin{}, nil } @@ -223,7 +223,7 @@ func (pl *reverseNumericMapPlugin) NormalizeScore(_ context.Context, _ *framewor } func newReverseNumericMapPlugin() frameworkruntime.PluginFactory { - return func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { + return func(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) { return &reverseNumericMapPlugin{}, nil } } @@ -252,7 +252,7 @@ func (pl *trueMapPlugin) NormalizeScore(_ context.Context, _ *framework.CycleSta } func newTrueMapPlugin() frameworkruntime.PluginFactory { - return func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { + return func(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) { return &trueMapPlugin{}, nil } } @@ -291,7 +291,7 @@ func (s *fakeNodeSelector) Filter(_ context.Context, _ *framework.CycleState, _ return nil } -func newFakeNodeSelector(args runtime.Object, _ framework.Handle) (framework.Plugin, error) { +func newFakeNodeSelector(_ context.Context, args runtime.Object, _ framework.Handle) (framework.Plugin, error) { pl := &fakeNodeSelector{} if err := frameworkruntime.DecodeInto(args, &pl.fakeNodeSelectorArgs); err != nil { return nil, err @@ -333,7 +333,7 @@ func (f *fakeNodeSelectorDependOnPodAnnotation) Filter(_ context.Context, _ *fra return nil } -func newFakeNodeSelectorDependOnPodAnnotation(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { +func newFakeNodeSelectorDependOnPodAnnotation(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) { return &fakeNodeSelectorDependOnPodAnnotation{}, nil } @@ -2257,7 +2257,7 @@ func TestSchedulerSchedulePod(t *testing.T) { "node1": framework.Unschedulable, }), ), - tf.RegisterPluginAsExtensions("FakeFilter2", func(configuration runtime.Object, f framework.Handle) (framework.Plugin, error) { + tf.RegisterPluginAsExtensions("FakeFilter2", func(_ context.Context, configuration runtime.Object, f framework.Handle) (framework.Plugin, error) { return tf.FakePreFilterAndFilterPlugin{ FakePreFilterPlugin: &tf.FakePreFilterPlugin{ Result: nil, @@ -2488,7 +2488,7 @@ func TestFindFitPredicateCallCounts(t *testing.T) { plugin := tf.FakeFilterPlugin{} registerFakeFilterFunc := tf.RegisterFilterPlugin( "FakeFilter", - func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) { + func(_ context.Context, _ runtime.Object, fh framework.Handle) (framework.Plugin, error) { return &plugin, nil }, ) @@ -3063,7 +3063,7 @@ func TestPreferNominatedNodeFilterCallCounts(t *testing.T) { plugin := tf.FakeFilterPlugin{FailedNodeReturnCodeMap: test.nodeReturnCodeMap} registerFakeFilterFunc := tf.RegisterFilterPlugin( "FakeFilter", - func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) { + func(_ context.Context, _ runtime.Object, fh framework.Handle) (framework.Plugin, error) { return &plugin, nil }, ) @@ -3279,7 +3279,7 @@ func setupTestSchedulerWithVolumeBinding(ctx context.Context, t *testing.T, volu fns := []tf.RegisterPluginFunc{ tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), - tf.RegisterPluginAsExtensions(volumebinding.Name, func(plArgs runtime.Object, handle framework.Handle) (framework.Plugin, error) { + tf.RegisterPluginAsExtensions(volumebinding.Name, func(ctx context.Context, plArgs runtime.Object, handle framework.Handle) (framework.Plugin, error) { return &volumebinding.VolumeBinding{Binder: volumeBinder, PVCLister: pvcInformer.Lister()}, nil }, "PreFilter", "Filter", "Reserve", "PreBind"), } diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index bc85e70eaeb..57e9aa96ce6 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -538,7 +538,7 @@ func TestInitPluginsWithIndexers(t *testing.T) { { name: "register indexer, no conflicts", entrypoints: map[string]frameworkruntime.PluginFactory{ - "AddIndexer": func(obj runtime.Object, handle framework.Handle) (framework.Plugin, error) { + "AddIndexer": func(ctx context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) { podInformer := handle.SharedInformerFactory().Core().V1().Pods() err := podInformer.Informer().GetIndexer().AddIndexers(cache.Indexers{ "nodeName": indexByPodSpecNodeName, @@ -551,14 +551,14 @@ func TestInitPluginsWithIndexers(t *testing.T) { name: "register the same indexer name multiple times, conflict", // order of registration doesn't matter entrypoints: map[string]frameworkruntime.PluginFactory{ - "AddIndexer1": func(obj runtime.Object, handle framework.Handle) (framework.Plugin, error) { + "AddIndexer1": func(ctx context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) { podInformer := handle.SharedInformerFactory().Core().V1().Pods() err := podInformer.Informer().GetIndexer().AddIndexers(cache.Indexers{ "nodeName": indexByPodSpecNodeName, }) return &TestPlugin{name: "AddIndexer1"}, err }, - "AddIndexer2": func(obj runtime.Object, handle framework.Handle) (framework.Plugin, error) { + "AddIndexer2": func(ctx context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) { podInformer := handle.SharedInformerFactory().Core().V1().Pods() err := podInformer.Informer().GetIndexer().AddIndexers(cache.Indexers{ "nodeName": indexByPodAnnotationNodeName, @@ -572,14 +572,14 @@ func TestInitPluginsWithIndexers(t *testing.T) { name: "register the same indexer body with different names, no conflicts", // order of registration doesn't matter entrypoints: map[string]frameworkruntime.PluginFactory{ - "AddIndexer1": func(obj runtime.Object, handle framework.Handle) (framework.Plugin, error) { + "AddIndexer1": func(ctx context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) { podInformer := handle.SharedInformerFactory().Core().V1().Pods() err := podInformer.Informer().GetIndexer().AddIndexers(cache.Indexers{ "nodeName1": indexByPodSpecNodeName, }) return &TestPlugin{name: "AddIndexer1"}, err }, - "AddIndexer2": func(obj runtime.Object, handle framework.Handle) (framework.Plugin, error) { + "AddIndexer2": func(ctx context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) { podInformer := handle.SharedInformerFactory().Core().V1().Pods() err := podInformer.Informer().GetIndexer().AddIndexers(cache.Indexers{ "nodeName2": indexByPodAnnotationNodeName, @@ -819,13 +819,15 @@ func Test_buildQueueingHintMap(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, !tt.featuregateDisabled)() - logger, _ := ktesting.NewTestContext(t) + logger, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() registry := frameworkruntime.Registry{} cfgPls := &schedulerapi.Plugins{} plugins := append(tt.plugins, &fakebindPlugin{}, &fakeQueueSortPlugin{}) for _, pl := range plugins { tmpPl := pl - if err := registry.Register(pl.Name(), func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { + if err := registry.Register(pl.Name(), func(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) { return tmpPl, nil }); err != nil { t.Fatalf("fail to register filter plugin (%s)", pl.Name()) @@ -834,9 +836,7 @@ func Test_buildQueueingHintMap(t *testing.T) { } profile := schedulerapi.KubeSchedulerProfile{Plugins: cfgPls} - stopCh := make(chan struct{}) - defer close(stopCh) - fwk, err := newFramework(registry, profile, stopCh) + fwk, err := newFramework(ctx, registry, profile) if err != nil { t.Fatal(err) } @@ -1010,13 +1010,16 @@ func Test_UnionedGVKs(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() registry := plugins.NewInTreeRegistry() cfgPls := &schedulerapi.Plugins{MultiPoint: tt.plugins} plugins := []framework.Plugin{&fakeNodePlugin{}, &fakePodPlugin{}, &fakeNoopPlugin{}, &fakeNoopRuntimePlugin{}, &fakeQueueSortPlugin{}, &fakebindPlugin{}} for _, pl := range plugins { tmpPl := pl - if err := registry.Register(pl.Name(), func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { + if err := registry.Register(pl.Name(), func(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) { return tmpPl, nil }); err != nil { t.Fatalf("fail to register filter plugin (%s)", pl.Name()) @@ -1024,9 +1027,7 @@ func Test_UnionedGVKs(t *testing.T) { } profile := schedulerapi.KubeSchedulerProfile{Plugins: cfgPls, PluginConfig: defaults.PluginConfigsV1} - stopCh := make(chan struct{}) - defer close(stopCh) - fwk, err := newFramework(registry, profile, stopCh) + fwk, err := newFramework(ctx, registry, profile) if err != nil { t.Fatal(err) } @@ -1043,8 +1044,8 @@ func Test_UnionedGVKs(t *testing.T) { } } -func newFramework(r frameworkruntime.Registry, profile schedulerapi.KubeSchedulerProfile, stopCh <-chan struct{}) (framework.Framework, error) { - return frameworkruntime.NewFramework(context.Background(), r, &profile, +func newFramework(ctx context.Context, r frameworkruntime.Registry, profile schedulerapi.KubeSchedulerProfile) (framework.Framework, error) { + return frameworkruntime.NewFramework(ctx, r, &profile, frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(nil, nil)), frameworkruntime.WithInformerFactory(informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 0)), ) diff --git a/pkg/scheduler/testing/framework/fake_extender.go b/pkg/scheduler/testing/framework/fake_extender.go index 5cad19a3bed..42d8dfad948 100644 --- a/pkg/scheduler/testing/framework/fake_extender.go +++ b/pkg/scheduler/testing/framework/fake_extender.go @@ -112,7 +112,7 @@ type node2PrioritizerPlugin struct{} // NewNode2PrioritizerPlugin returns a factory function to build node2PrioritizerPlugin. func NewNode2PrioritizerPlugin() frameworkruntime.PluginFactory { - return func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { + return func(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) { return &node2PrioritizerPlugin{}, nil } } diff --git a/pkg/scheduler/testing/framework/fake_plugins.go b/pkg/scheduler/testing/framework/fake_plugins.go index 61d1b870c79..d4c3b480b8d 100644 --- a/pkg/scheduler/testing/framework/fake_plugins.go +++ b/pkg/scheduler/testing/framework/fake_plugins.go @@ -45,7 +45,7 @@ func (pl *FalseFilterPlugin) Filter(_ context.Context, _ *framework.CycleState, } // NewFalseFilterPlugin initializes a FalseFilterPlugin and returns it. -func NewFalseFilterPlugin(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { +func NewFalseFilterPlugin(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) { return &FalseFilterPlugin{}, nil } @@ -63,7 +63,7 @@ func (pl *TrueFilterPlugin) Filter(_ context.Context, _ *framework.CycleState, p } // NewTrueFilterPlugin initializes a TrueFilterPlugin and returns it. -func NewTrueFilterPlugin(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { +func NewTrueFilterPlugin(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) { return &TrueFilterPlugin{}, nil } @@ -102,7 +102,7 @@ func (pl *FakeFilterPlugin) Filter(_ context.Context, _ *framework.CycleState, p // NewFakeFilterPlugin initializes a fakeFilterPlugin and returns it. func NewFakeFilterPlugin(failedNodeReturnCodeMap map[string]framework.Code) frameworkruntime.PluginFactory { - return func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { + return func(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) { return &FakeFilterPlugin{ FailedNodeReturnCodeMap: failedNodeReturnCodeMap, }, nil @@ -131,7 +131,7 @@ func (pl *MatchFilterPlugin) Filter(_ context.Context, _ *framework.CycleState, } // NewMatchFilterPlugin initializes a MatchFilterPlugin and returns it. -func NewMatchFilterPlugin(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { +func NewMatchFilterPlugin(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) { return &MatchFilterPlugin{}, nil } @@ -159,7 +159,7 @@ func (pl *FakePreFilterPlugin) PreFilterExtensions() framework.PreFilterExtensio // NewFakePreFilterPlugin initializes a fakePreFilterPlugin and returns it. func NewFakePreFilterPlugin(name string, result *framework.PreFilterResult, status *framework.Status) frameworkruntime.PluginFactory { - return func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { + return func(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) { return &FakePreFilterPlugin{ Result: result, Status: status, @@ -189,7 +189,7 @@ func (pl *FakeReservePlugin) Unreserve(_ context.Context, _ *framework.CycleStat // NewFakeReservePlugin initializes a fakeReservePlugin and returns it. func NewFakeReservePlugin(status *framework.Status) frameworkruntime.PluginFactory { - return func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { + return func(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) { return &FakeReservePlugin{ Status: status, }, nil @@ -213,7 +213,7 @@ func (pl *FakePreBindPlugin) PreBind(_ context.Context, _ *framework.CycleState, // NewFakePreBindPlugin initializes a fakePreBindPlugin and returns it. func NewFakePreBindPlugin(status *framework.Status) frameworkruntime.PluginFactory { - return func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { + return func(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) { return &FakePreBindPlugin{ Status: status, }, nil @@ -238,7 +238,7 @@ func (pl *FakePermitPlugin) Permit(_ context.Context, _ *framework.CycleState, _ // NewFakePermitPlugin initializes a fakePermitPlugin and returns it. func NewFakePermitPlugin(status *framework.Status, timeout time.Duration) frameworkruntime.PluginFactory { - return func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { + return func(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) { return &FakePermitPlugin{ Status: status, Timeout: timeout, @@ -271,7 +271,7 @@ func (pl *FakePreScoreAndScorePlugin) PreScore(ctx context.Context, state *frame } func NewFakePreScoreAndScorePlugin(name string, score int64, preScoreStatus, scoreStatus *framework.Status) frameworkruntime.PluginFactory { - return func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { + return func(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) { return &FakePreScoreAndScorePlugin{ name: name, score: score, diff --git a/test/integration/scheduler/plugins/plugins_test.go b/test/integration/scheduler/plugins/plugins_test.go index 7f63c93a377..31caf0d5927 100644 --- a/test/integration/scheduler/plugins/plugins_test.go +++ b/test/integration/scheduler/plugins/plugins_test.go @@ -62,7 +62,7 @@ var ( // newPlugin returns a plugin factory with specified Plugin. func newPlugin(plugin framework.Plugin) frameworkruntime.PluginFactory { - return func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) { + return func(_ context.Context, _ runtime.Object, fh framework.Handle) (framework.Plugin, error) { switch pl := plugin.(type) { case *PermitPlugin: pl.fh = fh @@ -2518,7 +2518,7 @@ func (j *JobPlugin) PostBind(_ context.Context, state *framework.CycleState, p * func TestActivatePods(t *testing.T) { var jobPlugin *JobPlugin // Create a plugin registry for testing. Register a Job plugin. - registry := frameworkruntime.Registry{jobPluginName: func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) { + registry := frameworkruntime.Registry{jobPluginName: func(_ context.Context, _ runtime.Object, fh framework.Handle) (framework.Plugin, error) { jobPlugin = &JobPlugin{podLister: fh.SharedInformerFactory().Core().V1().Pods().Lister()} return jobPlugin, nil }} diff --git a/test/integration/scheduler/preemption/preemption_test.go b/test/integration/scheduler/preemption/preemption_test.go index 6b47548f686..cef7f00a23a 100644 --- a/test/integration/scheduler/preemption/preemption_test.go +++ b/test/integration/scheduler/preemption/preemption_test.go @@ -159,7 +159,7 @@ func TestPreemption(t *testing.T) { // Initialize scheduler with a filter plugin. var filter tokenFilter registry := make(frameworkruntime.Registry) - err := registry.Register(filterPluginName, func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) { + err := registry.Register(filterPluginName, func(_ context.Context, _ runtime.Object, fh framework.Handle) (framework.Plugin, error) { return &filter, nil }) if err != nil { @@ -1046,7 +1046,7 @@ func (af *alwaysFail) PreBind(_ context.Context, _ *framework.CycleState, p *v1. return framework.NewStatus(framework.Unschedulable) } -func newAlwaysFail(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { +func newAlwaysFail(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) { return &alwaysFail{}, nil } diff --git a/test/integration/scheduler/queue_test.go b/test/integration/scheduler/queue_test.go index 10c9eb32a9a..5988a72b2aa 100644 --- a/test/integration/scheduler/queue_test.go +++ b/test/integration/scheduler/queue_test.go @@ -340,7 +340,7 @@ func TestCustomResourceEnqueue(t *testing.T) { } registry := frameworkruntime.Registry{ - "fakeCRPlugin": func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) { + "fakeCRPlugin": func(_ context.Context, _ runtime.Object, fh framework.Handle) (framework.Plugin, error) { return &fakeCRPlugin{}, nil }, } @@ -447,8 +447,8 @@ func TestCustomResourceEnqueue(t *testing.T) { func TestRequeueByBindFailure(t *testing.T) { fakeBind := &firstFailBindPlugin{} registry := frameworkruntime.Registry{ - "firstFailBindPlugin": func(o runtime.Object, fh framework.Handle) (framework.Plugin, error) { - binder, err := defaultbinder.New(nil, fh) + "firstFailBindPlugin": func(ctx context.Context, o runtime.Object, fh framework.Handle) (framework.Plugin, error) { + binder, err := defaultbinder.New(ctx, nil, fh) if err != nil { return nil, err } @@ -539,7 +539,7 @@ func TestRequeueByPermitRejection(t *testing.T) { queueingHintCalledCounter := 0 fakePermit := &fakePermitPlugin{} registry := frameworkruntime.Registry{ - fakePermitPluginName: func(o runtime.Object, fh framework.Handle) (framework.Plugin, error) { + fakePermitPluginName: func(ctx context.Context, o runtime.Object, fh framework.Handle) (framework.Plugin, error) { fakePermit = &fakePermitPlugin{ frameworkHandler: fh, schedulingHint: func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint { diff --git a/test/integration/scheduler/util.go b/test/integration/scheduler/util.go index 126a575e63e..3b8ff05c62e 100644 --- a/test/integration/scheduler/util.go +++ b/test/integration/scheduler/util.go @@ -17,6 +17,7 @@ limitations under the License. package scheduler import ( + "context" "testing" "time" @@ -81,7 +82,7 @@ func InitTestSchedulerForFrameworkTest(t *testing.T, testCtx *testutils.TestCont // NewPlugin returns a plugin factory with specified Plugin. func NewPlugin(plugin framework.Plugin) frameworkruntime.PluginFactory { - return func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) { + return func(_ context.Context, _ runtime.Object, fh framework.Handle) (framework.Plugin, error) { return plugin, nil } }