diff --git a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go index 536ac111715..f67600fa5f3 100644 --- a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go +++ b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go @@ -364,7 +364,6 @@ func TestPostFilter(t *testing.T) { frameworkruntime.WithExtenders(extenders), frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.pods, tt.nodes)), frameworkruntime.WithLogger(logger), - frameworkruntime.WithWaitingPods(frameworkruntime.NewWaitingPodsMap()), ) if err != nil { t.Fatal(err) @@ -1703,8 +1702,6 @@ func TestPreempt(t *testing.T) { ctx, cancel := context.WithCancel(ctx) defer cancel() - waitingPods := frameworkruntime.NewWaitingPodsMap() - cache := internalcache.New(ctx, time.Duration(0)) for _, pod := range test.pods { cache.AddPod(logger, pod) @@ -1748,7 +1745,6 @@ func TestPreempt(t *testing.T) { frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(test.pods, nodes)), frameworkruntime.WithInformerFactory(informerFactory), - frameworkruntime.WithWaitingPods(waitingPods), frameworkruntime.WithLogger(logger), ) if err != nil { diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go index ed1b76a585a..44a28e3c991 100644 --- a/pkg/scheduler/framework/runtime/framework.go +++ b/pkg/scheduler/framework/runtime/framework.go @@ -132,7 +132,6 @@ type frameworkOptions struct { extenders []framework.Extender captureProfile CaptureProfile parallelizer parallelize.Parallelizer - waitingPods *waitingPodsMap logger *klog.Logger } @@ -222,13 +221,6 @@ func WithMetricsRecorder(r *metrics.MetricAsyncRecorder) Option { } } -// WithWaitingPods sets waitingPods for the scheduling frameworkImpl. -func WithWaitingPods(wp *waitingPodsMap) Option { - return func(o *frameworkOptions) { - o.waitingPods = wp - } -} - // WithLogger overrides the default logger from k8s.io/klog. func WithLogger(logger klog.Logger) Option { return func(o *frameworkOptions) { @@ -262,7 +254,7 @@ func NewFramework(ctx context.Context, r Registry, profile *config.KubeScheduler registry: r, snapshotSharedLister: options.snapshotSharedLister, scorePluginWeight: make(map[string]int), - waitingPods: options.waitingPods, + waitingPods: newWaitingPodsMap(), clientSet: options.clientSet, kubeConfig: options.kubeConfig, eventRecorder: options.eventRecorder, diff --git a/pkg/scheduler/framework/runtime/framework_test.go b/pkg/scheduler/framework/runtime/framework_test.go index 8e9638ee4d6..f1eb3fdce0d 100644 --- a/pkg/scheduler/framework/runtime/framework_test.go +++ b/pkg/scheduler/framework/runtime/framework_test.go @@ -2808,9 +2808,7 @@ func TestPermitPlugins(t *testing.T) { profile := config.KubeSchedulerProfile{Plugins: configPlugins} ctx, cancel := context.WithCancel(context.Background()) defer cancel() - f, err := newFrameworkWithQueueSortAndBind(ctx, registry, profile, - WithWaitingPods(NewWaitingPodsMap()), - ) + f, err := newFrameworkWithQueueSortAndBind(ctx, registry, profile) if err != nil { t.Fatalf("fail to create framework: %s", err) } @@ -2992,10 +2990,7 @@ func TestRecordingMetrics(t *testing.T) { SchedulerName: testProfileName, Plugins: plugins, } - f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile, - withMetricsRecorder(recorder), - WithWaitingPods(NewWaitingPodsMap()), - ) + f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile, withMetricsRecorder(recorder)) if err != nil { cancel() t.Fatalf("Failed to create framework for testing: %v", err) @@ -3165,9 +3160,7 @@ func TestPermitWaitDurationMetric(t *testing.T) { profile := config.KubeSchedulerProfile{Plugins: plugins} ctx, cancel := context.WithCancel(context.Background()) defer cancel() - f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile, - WithWaitingPods(NewWaitingPodsMap()), - ) + f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile) if err != nil { t.Fatalf("Failed to create framework for testing: %v", err) } @@ -3223,9 +3216,7 @@ func TestWaitOnPermit(t *testing.T) { profile := config.KubeSchedulerProfile{Plugins: plugins} ctx, cancel := context.WithCancel(context.Background()) defer cancel() - f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile, - WithWaitingPods(NewWaitingPodsMap()), - ) + f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile) if err != nil { t.Fatalf("Failed to create framework for testing: %v", err) } diff --git a/pkg/scheduler/framework/runtime/waiting_pods_map.go b/pkg/scheduler/framework/runtime/waiting_pods_map.go index 3d40d4a52a1..04466575229 100644 --- a/pkg/scheduler/framework/runtime/waiting_pods_map.go +++ b/pkg/scheduler/framework/runtime/waiting_pods_map.go @@ -32,8 +32,8 @@ type waitingPodsMap struct { mu sync.RWMutex } -// NewWaitingPodsMap returns a new waitingPodsMap. -func NewWaitingPodsMap() *waitingPodsMap { +// newWaitingPodsMap returns a new waitingPodsMap. +func newWaitingPodsMap() *waitingPodsMap { return &waitingPodsMap{ pods: make(map[types.UID]*waitingPod), } diff --git a/pkg/scheduler/schedule_one_test.go b/pkg/scheduler/schedule_one_test.go index 8bd346901bb..e4d33039688 100644 --- a/pkg/scheduler/schedule_one_test.go +++ b/pkg/scheduler/schedule_one_test.go @@ -773,9 +773,7 @@ func TestSchedulerScheduleOne(t *testing.T) { registerPluginFuncs, testSchedulerName, frameworkruntime.WithClientSet(client), - frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)), - frameworkruntime.WithWaitingPods(frameworkruntime.NewWaitingPodsMap()), - ) + frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName))) if err != nil { t.Fatal(err) } @@ -3525,7 +3523,6 @@ func setupTestScheduler(ctx context.Context, t *testing.T, queuedPodStore *clien informerFactory = informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(), 0) } schedulingQueue := internalqueue.NewTestQueueWithInformerFactory(ctx, nil, informerFactory) - waitingPods := frameworkruntime.NewWaitingPodsMap() fwk, _ := tf.NewFramework( ctx, @@ -3535,7 +3532,6 @@ func setupTestScheduler(ctx context.Context, t *testing.T, queuedPodStore *clien frameworkruntime.WithEventRecorder(recorder), frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), - frameworkruntime.WithWaitingPods(waitingPods), ) errChan := make(chan error, 1) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 5efc1fc0a78..76e0cbb01e1 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -292,8 +292,6 @@ func New(ctx context.Context, snapshot := internalcache.NewEmptySnapshot() metricsRecorder := metrics.NewMetricsAsyncRecorder(1000, time.Second, stopEverything) - // waitingPods holds all the pods that are in the scheduler and waiting in the permit stage - waitingPods := frameworkruntime.NewWaitingPodsMap() profiles, err := profile.NewMap(ctx, options.profiles, registry, recorderFactory, frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion), @@ -305,7 +303,6 @@ func New(ctx context.Context, frameworkruntime.WithParallelism(int(options.parallelism)), frameworkruntime.WithExtenders(extenders), frameworkruntime.WithMetricsRecorder(metricsRecorder), - frameworkruntime.WithWaitingPods(waitingPods), ) if err != nil { return nil, fmt.Errorf("initializing profiles: %v", err) diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 08f4b255338..ce636f6da77 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -21,16 +21,13 @@ import ( "fmt" "sort" "strings" - "sync" "testing" "time" "github.com/google/go-cmp/cmp" v1 "k8s.io/api/core/v1" - eventsv1 "k8s.io/api/events/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/sets" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" @@ -440,14 +437,12 @@ func initScheduler(ctx context.Context, cache internalcache.Cache, queue interna tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), } eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()}) - waitingPods := frameworkruntime.NewWaitingPodsMap() fwk, err := tf.NewFramework(ctx, registerPluginFuncs, testSchedulerName, frameworkruntime.WithClientSet(client), frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)), - frameworkruntime.WithWaitingPods(waitingPods), ) if err != nil { return nil, nil, err @@ -596,7 +591,6 @@ const ( queueSort = "no-op-queue-sort-plugin" fakeBind = "bind-plugin" emptyEventExtensions = "emptyEventExtensions" - fakePermit = "fakePermit" ) func Test_buildQueueingHintMap(t *testing.T) { @@ -911,160 +905,6 @@ func newFramework(ctx context.Context, r frameworkruntime.Registry, profile sche ) } -func TestFrameworkHandler_IterateOverWaitingPods(t *testing.T) { - const ( - testSchedulerProfile1 = "test-scheduler-profile-1" - testSchedulerProfile2 = "test-scheduler-profile-2" - testSchedulerProfile3 = "test-scheduler-profile-3" - ) - - nodes := []runtime.Object{ - st.MakeNode().Name("node1").UID("node1").Obj(), - st.MakeNode().Name("node2").UID("node2").Obj(), - st.MakeNode().Name("node3").UID("node3").Obj(), - } - - cases := []struct { - name string - profiles []schedulerapi.KubeSchedulerProfile - waitSchedulingPods []*v1.Pod - expectPodNamesInWaitingPods []string - }{ - { - name: "pods with same profile are waiting on permit stage", - profiles: []schedulerapi.KubeSchedulerProfile{ - { - SchedulerName: testSchedulerProfile1, - Plugins: &schedulerapi.Plugins{ - QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}}, - Permit: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: fakePermit}}}, - Bind: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}}, - }, - }, - }, - waitSchedulingPods: []*v1.Pod{ - st.MakePod().Name("pod1").UID("pod1").SchedulerName(testSchedulerProfile1).Obj(), - st.MakePod().Name("pod2").UID("pod2").SchedulerName(testSchedulerProfile1).Obj(), - st.MakePod().Name("pod3").UID("pod3").SchedulerName(testSchedulerProfile1).Obj(), - }, - expectPodNamesInWaitingPods: []string{"pod1", "pod2", "pod3"}, - }, - { - name: "pods with different profiles are waiting on permit stage", - profiles: []schedulerapi.KubeSchedulerProfile{ - { - SchedulerName: testSchedulerProfile1, - Plugins: &schedulerapi.Plugins{ - QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}}, - Permit: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: fakePermit}}}, - Bind: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}}, - }, - }, - { - SchedulerName: testSchedulerProfile2, - Plugins: &schedulerapi.Plugins{ - QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}}, - Permit: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: fakePermit}}}, - Bind: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}}, - }, - }, - { - SchedulerName: testSchedulerProfile3, - Plugins: &schedulerapi.Plugins{ - QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}}, - Permit: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: fakePermit}}}, - Bind: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}}, - }, - }, - }, - waitSchedulingPods: []*v1.Pod{ - st.MakePod().Name("pod1").UID("pod1").SchedulerName(testSchedulerProfile1).Obj(), - st.MakePod().Name("pod2").UID("pod2").SchedulerName(testSchedulerProfile1).Obj(), - st.MakePod().Name("pod3").UID("pod3").SchedulerName(testSchedulerProfile2).Obj(), - st.MakePod().Name("pod4").UID("pod4").SchedulerName(testSchedulerProfile3).Obj(), - }, - expectPodNamesInWaitingPods: []string{"pod1", "pod2", "pod3", "pod4"}, - }, - } - - for _, tc := range cases { - t.Run(tc.name, func(t *testing.T) { - // Set up scheduler for the 3 nodes. - objs := append([]runtime.Object{&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ""}}}, nodes...) - fakeClient := fake.NewSimpleClientset(objs...) - informerFactory := informers.NewSharedInformerFactory(fakeClient, 0) - eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: fakeClient.EventsV1()}) - eventRecorder := eventBroadcaster.NewRecorder(scheme.Scheme, fakePermit) - - outOfTreeRegistry := frameworkruntime.Registry{ - fakePermit: newFakePermitPlugin(eventRecorder), - } - - _, ctx := ktesting.NewTestContext(t) - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - scheduler, err := New( - ctx, - fakeClient, - informerFactory, - nil, - profile.NewRecorderFactory(eventBroadcaster), - WithProfiles(tc.profiles...), - WithFrameworkOutOfTreeRegistry(outOfTreeRegistry), - ) - - if err != nil { - t.Fatalf("Failed to create scheduler: %v", err) - } - - var wg sync.WaitGroup - waitSchedulingPodNumber := len(tc.waitSchedulingPods) - wg.Add(waitSchedulingPodNumber) - stopFn, err := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) { - e, ok := obj.(*eventsv1.Event) - if !ok || (e.Reason != podWaitingReason) { - return - } - wg.Done() - }) - if err != nil { - t.Fatal(err) - } - defer stopFn() - - // Run scheduler. - informerFactory.Start(ctx.Done()) - informerFactory.WaitForCacheSync(ctx.Done()) - go scheduler.Run(ctx) - - // Send pods to be scheduled. - for _, p := range tc.waitSchedulingPods { - _, err = fakeClient.CoreV1().Pods("").Create(ctx, p, metav1.CreateOptions{}) - if err != nil { - t.Fatal(err) - } - } - - // Wait all pods in waitSchedulingPods to be scheduled. - wg.Wait() - - // Ensure that all waitingPods in scheduler can be obtained from any profiles. - for _, fwk := range scheduler.Profiles { - actualPodNamesInWaitingPods := sets.NewString() - fwk.IterateOverWaitingPods(func(pod framework.WaitingPod) { - actualPodNamesInWaitingPods.Insert(pod.GetPod().Name) - }) - // Validate the name of pods in waitingPods matches expectations. - if actualPodNamesInWaitingPods.Len() != len(tc.expectPodNamesInWaitingPods) || - !actualPodNamesInWaitingPods.HasAll(tc.expectPodNamesInWaitingPods...) { - t.Fatalf("Unexpected waitingPods in scheduler profile %s, expect: %#v, got: %#v", fwk.ProfileName(), tc.expectPodNamesInWaitingPods, actualPodNamesInWaitingPods.List()) - } - } - }) - } -} - var _ framework.QueueSortPlugin = &fakeQueueSortPlugin{} // fakeQueueSortPlugin is a no-op implementation for QueueSort extension point. @@ -1164,36 +1004,3 @@ func (*emptyEventsToRegisterPlugin) Filter(_ context.Context, _ *framework.Cycle } func (*emptyEventsToRegisterPlugin) EventsToRegister() []framework.ClusterEventWithHint { return nil } - -// fakePermitPlugin only implements PermitPlugin interface. -type fakePermitPlugin struct { - eventRecorder events.EventRecorder -} - -func newFakePermitPlugin(eventRecorder events.EventRecorder) frameworkruntime.PluginFactory { - return func(ctx context.Context, configuration runtime.Object, f framework.Handle) (framework.Plugin, error) { - pl := &fakePermitPlugin{ - eventRecorder: eventRecorder, - } - return pl, nil - } -} - -func (f fakePermitPlugin) Name() string { - return fakePermit -} - -const ( - podWaitingReason = "podWaiting" -) - -func (f fakePermitPlugin) Permit(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (*framework.Status, time.Duration) { - defer func() { - // Send event with podWaiting reason to broadcast this pod is already waiting in the permit stage. - f.eventRecorder.Eventf(p, nil, v1.EventTypeWarning, podWaitingReason, "", "") - }() - - return framework.NewStatus(framework.Wait), 100 * time.Second -} - -var _ framework.PermitPlugin = &fakePermitPlugin{}