From 31a4b13238f68648b7f1de5f3d319d4a4d6d583c Mon Sep 17 00:00:00 2001 From: NoicFank <1015542478@qq.com> Date: Wed, 24 Jan 2024 21:21:21 +0800 Subject: [PATCH] enhancement(scheduler): share waitingPods among profiles --- .../default_preemption_test.go | 4 + pkg/scheduler/framework/runtime/framework.go | 10 +- .../framework/runtime/framework_test.go | 17 +- .../framework/runtime/waiting_pods_map.go | 4 +- pkg/scheduler/schedule_one_test.go | 6 +- pkg/scheduler/scheduler.go | 3 + pkg/scheduler/scheduler_test.go | 193 ++++++++++++++++++ 7 files changed, 229 insertions(+), 8 deletions(-) diff --git a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go index f67600fa5f3..536ac111715 100644 --- a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go +++ b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go @@ -364,6 +364,7 @@ func TestPostFilter(t *testing.T) { frameworkruntime.WithExtenders(extenders), frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.pods, tt.nodes)), frameworkruntime.WithLogger(logger), + frameworkruntime.WithWaitingPods(frameworkruntime.NewWaitingPodsMap()), ) if err != nil { t.Fatal(err) @@ -1702,6 +1703,8 @@ func TestPreempt(t *testing.T) { ctx, cancel := context.WithCancel(ctx) defer cancel() + waitingPods := frameworkruntime.NewWaitingPodsMap() + cache := internalcache.New(ctx, time.Duration(0)) for _, pod := range test.pods { cache.AddPod(logger, pod) @@ -1745,6 +1748,7 @@ func TestPreempt(t *testing.T) { frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(test.pods, nodes)), frameworkruntime.WithInformerFactory(informerFactory), + frameworkruntime.WithWaitingPods(waitingPods), frameworkruntime.WithLogger(logger), ) if err != nil { diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go index 44a28e3c991..ed1b76a585a 100644 --- a/pkg/scheduler/framework/runtime/framework.go +++ b/pkg/scheduler/framework/runtime/framework.go @@ -132,6 +132,7 @@ type frameworkOptions struct { extenders []framework.Extender captureProfile CaptureProfile parallelizer parallelize.Parallelizer + waitingPods *waitingPodsMap logger *klog.Logger } @@ -221,6 +222,13 @@ func WithMetricsRecorder(r *metrics.MetricAsyncRecorder) Option { } } +// WithWaitingPods sets waitingPods for the scheduling frameworkImpl. +func WithWaitingPods(wp *waitingPodsMap) Option { + return func(o *frameworkOptions) { + o.waitingPods = wp + } +} + // WithLogger overrides the default logger from k8s.io/klog. 
func WithLogger(logger klog.Logger) Option { return func(o *frameworkOptions) { @@ -254,7 +262,7 @@ func NewFramework(ctx context.Context, r Registry, profile *config.KubeScheduler registry: r, snapshotSharedLister: options.snapshotSharedLister, scorePluginWeight: make(map[string]int), - waitingPods: newWaitingPodsMap(), + waitingPods: options.waitingPods, clientSet: options.clientSet, kubeConfig: options.kubeConfig, eventRecorder: options.eventRecorder, diff --git a/pkg/scheduler/framework/runtime/framework_test.go b/pkg/scheduler/framework/runtime/framework_test.go index f1eb3fdce0d..8e9638ee4d6 100644 --- a/pkg/scheduler/framework/runtime/framework_test.go +++ b/pkg/scheduler/framework/runtime/framework_test.go @@ -2808,7 +2808,9 @@ func TestPermitPlugins(t *testing.T) { profile := config.KubeSchedulerProfile{Plugins: configPlugins} ctx, cancel := context.WithCancel(context.Background()) defer cancel() - f, err := newFrameworkWithQueueSortAndBind(ctx, registry, profile) + f, err := newFrameworkWithQueueSortAndBind(ctx, registry, profile, + WithWaitingPods(NewWaitingPodsMap()), + ) if err != nil { t.Fatalf("fail to create framework: %s", err) } @@ -2990,7 +2992,10 @@ func TestRecordingMetrics(t *testing.T) { SchedulerName: testProfileName, Plugins: plugins, } - f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile, withMetricsRecorder(recorder)) + f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile, + withMetricsRecorder(recorder), + WithWaitingPods(NewWaitingPodsMap()), + ) if err != nil { cancel() t.Fatalf("Failed to create framework for testing: %v", err) @@ -3160,7 +3165,9 @@ func TestPermitWaitDurationMetric(t *testing.T) { profile := config.KubeSchedulerProfile{Plugins: plugins} ctx, cancel := context.WithCancel(context.Background()) defer cancel() - f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile) + f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile, + WithWaitingPods(NewWaitingPodsMap()), + ) if err != nil { t.Fatalf("Failed to create framework for testing: %v", err) } @@ -3216,7 +3223,9 @@ func TestWaitOnPermit(t *testing.T) { profile := config.KubeSchedulerProfile{Plugins: plugins} ctx, cancel := context.WithCancel(context.Background()) defer cancel() - f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile) + f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile, + WithWaitingPods(NewWaitingPodsMap()), + ) if err != nil { t.Fatalf("Failed to create framework for testing: %v", err) } diff --git a/pkg/scheduler/framework/runtime/waiting_pods_map.go b/pkg/scheduler/framework/runtime/waiting_pods_map.go index 04466575229..3d40d4a52a1 100644 --- a/pkg/scheduler/framework/runtime/waiting_pods_map.go +++ b/pkg/scheduler/framework/runtime/waiting_pods_map.go @@ -32,8 +32,8 @@ type waitingPodsMap struct { mu sync.RWMutex } -// newWaitingPodsMap returns a new waitingPodsMap. -func newWaitingPodsMap() *waitingPodsMap { +// NewWaitingPodsMap returns a new waitingPodsMap. 
+func NewWaitingPodsMap() *waitingPodsMap { return &waitingPodsMap{ pods: make(map[types.UID]*waitingPod), } diff --git a/pkg/scheduler/schedule_one_test.go b/pkg/scheduler/schedule_one_test.go index 9b487b1da10..68a78ff6385 100644 --- a/pkg/scheduler/schedule_one_test.go +++ b/pkg/scheduler/schedule_one_test.go @@ -778,7 +778,9 @@ func TestSchedulerScheduleOne(t *testing.T) { registerPluginFuncs, testSchedulerName, frameworkruntime.WithClientSet(client), - frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName))) + frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)), + frameworkruntime.WithWaitingPods(frameworkruntime.NewWaitingPodsMap()), + ) if err != nil { t.Fatal(err) } @@ -3539,6 +3541,7 @@ func setupTestScheduler(ctx context.Context, t *testing.T, queuedPodStore *clien informerFactory = informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(), 0) } schedulingQueue := internalqueue.NewTestQueueWithInformerFactory(ctx, nil, informerFactory) + waitingPods := frameworkruntime.NewWaitingPodsMap() fwk, _ := tf.NewFramework( ctx, @@ -3548,6 +3551,7 @@ func setupTestScheduler(ctx context.Context, t *testing.T, queuedPodStore *clien frameworkruntime.WithEventRecorder(recorder), frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())), + frameworkruntime.WithWaitingPods(waitingPods), ) errChan := make(chan error, 1) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index df007aa82fc..8288f02bd01 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -292,6 +292,8 @@ func New(ctx context.Context, snapshot := internalcache.NewEmptySnapshot() metricsRecorder := metrics.NewMetricsAsyncRecorder(1000, time.Second, stopEverything) + // waitingPods holds all the pods that are in the scheduler and waiting in the permit stage + waitingPods := frameworkruntime.NewWaitingPodsMap() profiles, err := profile.NewMap(ctx, options.profiles, registry, recorderFactory, frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion), @@ -303,6 +305,7 @@ func New(ctx context.Context, frameworkruntime.WithParallelism(int(options.parallelism)), frameworkruntime.WithExtenders(extenders), frameworkruntime.WithMetricsRecorder(metricsRecorder), + frameworkruntime.WithWaitingPods(waitingPods), ) if err != nil { return nil, fmt.Errorf("initializing profiles: %v", err) diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 5bac7cf4398..a506227300e 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -21,13 +21,16 @@ import ( "fmt" "sort" "strings" + "sync" "testing" "time" "github.com/google/go-cmp/cmp" v1 "k8s.io/api/core/v1" + eventsv1 "k8s.io/api/events/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/sets" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" @@ -437,12 +440,14 @@ func initScheduler(ctx context.Context, cache internalcache.Cache, queue interna tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), } eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()}) + waitingPods := frameworkruntime.NewWaitingPodsMap() fwk, err := tf.NewFramework(ctx, registerPluginFuncs, testSchedulerName, frameworkruntime.WithClientSet(client), 
frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)), + frameworkruntime.WithWaitingPods(waitingPods), ) if err != nil { return nil, nil, err @@ -591,6 +596,7 @@ const ( queueSort = "no-op-queue-sort-plugin" fakeBind = "bind-plugin" emptyEventExtensions = "emptyEventExtensions" + fakePermit = "fakePermit" ) func Test_buildQueueingHintMap(t *testing.T) { @@ -905,6 +911,160 @@ func newFramework(ctx context.Context, r frameworkruntime.Registry, profile sche ) } +func TestFrameworkHandler_IterateOverWaitingPods(t *testing.T) { + const ( + testSchedulerProfile1 = "test-scheduler-profile-1" + testSchedulerProfile2 = "test-scheduler-profile-2" + testSchedulerProfile3 = "test-scheduler-profile-3" + ) + + nodes := []runtime.Object{ + st.MakeNode().Name("node1").UID("node1").Obj(), + st.MakeNode().Name("node2").UID("node2").Obj(), + st.MakeNode().Name("node3").UID("node3").Obj(), + } + + cases := []struct { + name string + profiles []schedulerapi.KubeSchedulerProfile + waitSchedulingPods []*v1.Pod + expectPodNamesInWaitingPods []string + }{ + { + name: "pods with same profile are waiting on permit stage", + profiles: []schedulerapi.KubeSchedulerProfile{ + { + SchedulerName: testSchedulerProfile1, + Plugins: &schedulerapi.Plugins{ + QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}}, + Permit: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: fakePermit}}}, + Bind: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}}, + }, + }, + }, + waitSchedulingPods: []*v1.Pod{ + st.MakePod().Name("pod1").UID("pod1").SchedulerName(testSchedulerProfile1).Obj(), + st.MakePod().Name("pod2").UID("pod2").SchedulerName(testSchedulerProfile1).Obj(), + st.MakePod().Name("pod3").UID("pod3").SchedulerName(testSchedulerProfile1).Obj(), + }, + expectPodNamesInWaitingPods: []string{"pod1", "pod2", "pod3"}, + }, + { + name: "pods with different profiles are waiting on permit stage", + profiles: []schedulerapi.KubeSchedulerProfile{ + { + SchedulerName: testSchedulerProfile1, + Plugins: &schedulerapi.Plugins{ + QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}}, + Permit: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: fakePermit}}}, + Bind: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}}, + }, + }, + { + SchedulerName: testSchedulerProfile2, + Plugins: &schedulerapi.Plugins{ + QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}}, + Permit: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: fakePermit}}}, + Bind: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}}, + }, + }, + { + SchedulerName: testSchedulerProfile3, + Plugins: &schedulerapi.Plugins{ + QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}}, + Permit: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: fakePermit}}}, + Bind: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}}, + }, + }, + }, + waitSchedulingPods: []*v1.Pod{ + st.MakePod().Name("pod1").UID("pod1").SchedulerName(testSchedulerProfile1).Obj(), + st.MakePod().Name("pod2").UID("pod2").SchedulerName(testSchedulerProfile1).Obj(), + st.MakePod().Name("pod3").UID("pod3").SchedulerName(testSchedulerProfile2).Obj(), + st.MakePod().Name("pod4").UID("pod4").SchedulerName(testSchedulerProfile3).Obj(), + }, + 
expectPodNamesInWaitingPods: []string{"pod1", "pod2", "pod3", "pod4"}, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + // Set up scheduler for the 3 nodes. + objs := append([]runtime.Object{&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ""}}}, nodes...) + fakeClient := fake.NewSimpleClientset(objs...) + informerFactory := informers.NewSharedInformerFactory(fakeClient, 0) + eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: fakeClient.EventsV1()}) + eventRecorder := eventBroadcaster.NewRecorder(scheme.Scheme, fakePermit) + + outOfTreeRegistry := frameworkruntime.Registry{ + fakePermit: newFakePermitPlugin(eventRecorder), + } + + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + scheduler, err := New( + ctx, + fakeClient, + informerFactory, + nil, + profile.NewRecorderFactory(eventBroadcaster), + WithProfiles(tc.profiles...), + WithFrameworkOutOfTreeRegistry(outOfTreeRegistry), + ) + + if err != nil { + t.Fatalf("Failed to create scheduler: %v", err) + } + + var wg sync.WaitGroup + waitSchedulingPodNumber := len(tc.waitSchedulingPods) + wg.Add(waitSchedulingPodNumber) + stopFn, err := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) { + e, ok := obj.(*eventsv1.Event) + if !ok || (e.Reason != podWaitingReason) { + return + } + wg.Done() + }) + if err != nil { + t.Fatal(err) + } + defer stopFn() + + // Run scheduler. + informerFactory.Start(ctx.Done()) + informerFactory.WaitForCacheSync(ctx.Done()) + go scheduler.Run(ctx) + + // Send pods to be scheduled. + for _, p := range tc.waitSchedulingPods { + _, err = fakeClient.CoreV1().Pods("").Create(ctx, p, metav1.CreateOptions{}) + if err != nil { + t.Fatal(err) + } + } + + // Wait all pods in waitSchedulingPods to be scheduled. + wg.Wait() + + // Ensure that all waitingPods in scheduler can be obtained from any profiles. + for _, fwk := range scheduler.Profiles { + actualPodNamesInWaitingPods := sets.NewString() + fwk.IterateOverWaitingPods(func(pod framework.WaitingPod) { + actualPodNamesInWaitingPods.Insert(pod.GetPod().Name) + }) + // Validate the name of pods in waitingPods matches expectations. + if actualPodNamesInWaitingPods.Len() != len(tc.expectPodNamesInWaitingPods) || + !actualPodNamesInWaitingPods.HasAll(tc.expectPodNamesInWaitingPods...) { + t.Fatalf("Unexpected waitingPods in scheduler profile %s, expect: %#v, got: %#v", fwk.ProfileName(), tc.expectPodNamesInWaitingPods, actualPodNamesInWaitingPods.List()) + } + } + }) + } +} + var _ framework.QueueSortPlugin = &fakeQueueSortPlugin{} // fakeQueueSortPlugin is a no-op implementation for QueueSort extension point. @@ -1004,3 +1164,36 @@ func (*emptyEventsToRegisterPlugin) Filter(_ context.Context, _ *framework.Cycle } func (*emptyEventsToRegisterPlugin) EventsToRegister() []framework.ClusterEventWithHint { return nil } + +// fakePermitPlugin only implements PermitPlugin interface. 
+type fakePermitPlugin struct {
+	eventRecorder events.EventRecorder
+}
+
+func newFakePermitPlugin(eventRecorder events.EventRecorder) frameworkruntime.PluginFactory {
+	return func(ctx context.Context, configuration runtime.Object, f framework.Handle) (framework.Plugin, error) {
+		pl := &fakePermitPlugin{
+			eventRecorder: eventRecorder,
+		}
+		return pl, nil
+	}
+}
+
+func (f fakePermitPlugin) Name() string {
+	return fakePermit
+}
+
+const (
+	podWaitingReason = "podWaiting"
+)
+
+func (f fakePermitPlugin) Permit(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (*framework.Status, time.Duration) {
+	defer func() {
+		// Send an event with the podWaiting reason to broadcast that this pod is now waiting in the permit stage.
+		f.eventRecorder.Eventf(p, nil, v1.EventTypeWarning, podWaitingReason, "", "")
+	}()
+
+	return framework.NewStatus(framework.Wait), 100 * time.Second
+}
+
+var _ framework.PermitPlugin = &fakePermitPlugin{}
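
The sketch below is not part of the patch; it is a minimal, self-contained Go model of the pattern the change introduces: previously each call to NewFramework built its own map via newWaitingPodsMap(), so IterateOverWaitingPods could only see pods waiting under that framework's own profile. After this change, one waitingPodsMap is created in scheduler.New and handed to every profile's framework via WithWaitingPods, so iterating from any profile observes pods parked at the Permit stage by all profiles. The names sharedWaitingPods and profileFramework are illustrative stand-ins for runtime.waitingPodsMap and frameworkImpl, not real scheduler types.

package main

import (
	"fmt"
	"sync"
)

// sharedWaitingPods plays the role of runtime.waitingPodsMap: one
// mutex-guarded map of pods currently parked at the Permit stage.
type sharedWaitingPods struct {
	mu   sync.RWMutex
	pods map[string]string // pod UID -> pod name
}

func newSharedWaitingPods() *sharedWaitingPods {
	return &sharedWaitingPods{pods: make(map[string]string)}
}

func (w *sharedWaitingPods) add(uid, name string) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.pods[uid] = name
}

func (w *sharedWaitingPods) iterate(fn func(name string)) {
	w.mu.RLock()
	defer w.mu.RUnlock()
	for _, name := range w.pods {
		fn(name)
	}
}

// profileFramework stands in for a per-profile frameworkImpl. Each profile
// gets its own framework, but all of them hold a pointer to the same
// waiting-pods map, which is the effect of passing WithWaitingPods(waitingPods)
// into profile.NewMap in this patch.
type profileFramework struct {
	profileName string
	waitingPods *sharedWaitingPods
}

// permitWait models a Permit plugin returning framework.Wait for a pod.
func (f *profileFramework) permitWait(uid, podName string) {
	f.waitingPods.add(uid, podName)
}

// iterateOverWaitingPods models framework.Handle.IterateOverWaitingPods.
func (f *profileFramework) iterateOverWaitingPods(fn func(name string)) {
	f.waitingPods.iterate(fn)
}

func main() {
	// One map, created once (as in scheduler.New) and shared by every profile.
	shared := newSharedWaitingPods()

	p1 := &profileFramework{profileName: "profile-1", waitingPods: shared}
	p2 := &profileFramework{profileName: "profile-2", waitingPods: shared}

	p1.permitWait("uid-1", "pod1") // pod1 waits at Permit under profile-1
	p2.permitWait("uid-2", "pod2") // pod2 waits at Permit under profile-2

	// Iterating from either profile now observes both waiting pods.
	p1.iterateOverWaitingPods(func(name string) {
		fmt.Printf("%s sees waiting pod: %s\n", p1.profileName, name)
	})
}

This shared view is what TestFrameworkHandler_IterateOverWaitingPods above exercises against the real scheduler: pods are held at Permit by the fake plugin under several profiles, and the test asserts that every profile's framework reports all of them.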