From fb9cafc99be94a73d9b92545164dbf336bbd230a Mon Sep 17 00:00:00 2001
From: Wei Huang
Date: Wed, 30 Jun 2021 09:57:53 -0700
Subject: [PATCH] sched: provide an option for plugin developers to move pods to activeQ

---
 pkg/scheduler/framework/interface.go          |  25 ++++
 pkg/scheduler/internal/queue/events.go        |   3 +
 .../internal/queue/scheduling_queue.go        |  56 +++++++
 pkg/scheduler/scheduler.go                    |  18 +++
 test/integration/scheduler/framework_test.go  | 138 ++++++++++++++++++
 5 files changed, 240 insertions(+)

diff --git a/pkg/scheduler/framework/interface.go b/pkg/scheduler/framework/interface.go
index 589a3997c90..f5dd413c3ca 100644
--- a/pkg/scheduler/framework/interface.go
+++ b/pkg/scheduler/framework/interface.go
@@ -23,6 +23,7 @@ import (
 	"errors"
 	"math"
 	"strings"
+	"sync"
 	"time"

 	"github.com/google/go-cmp/cmp"
@@ -105,6 +106,30 @@ const (
 	MaxTotalScore int64 = math.MaxInt64
 )

+// PodsToActivateKey is a reserved state key for stashing pods.
+// If the stashed pods are present in unschedulableQ or backoffQ, they will be
+// activated (i.e., moved to activeQ) in two phases:
+// - end of a scheduling cycle if it succeeds (will be cleared from `PodsToActivate` if activated)
+// - end of a binding cycle if it succeeds
+var PodsToActivateKey StateKey = "kubernetes.io/pods-to-activate"
+
+// PodsToActivate stores pods to be activated.
+type PodsToActivate struct {
+	sync.Mutex
+	// Map is keyed with namespaced pod name, and valued with the pod.
+	Map map[string]*v1.Pod
+}
+
+// Clone just returns the same state.
+func (s *PodsToActivate) Clone() StateData {
+	return s
+}
+
+// NewPodsToActivate instantiates a PodsToActivate object.
+func NewPodsToActivate() *PodsToActivate {
+	return &PodsToActivate{Map: make(map[string]*v1.Pod)}
+}
+
 // Status indicates the result of running a plugin. It consists of a code, a
 // message, (optionally) an error and an plugin name it fails by. When the status
 // code is not `Success`, the reasons should explain why.
diff --git a/pkg/scheduler/internal/queue/events.go b/pkg/scheduler/internal/queue/events.go
index 7954ee55e51..475b6635440 100644
--- a/pkg/scheduler/internal/queue/events.go
+++ b/pkg/scheduler/internal/queue/events.go
@@ -27,6 +27,9 @@ const (
 	ScheduleAttemptFailure = "ScheduleAttemptFailure"
 	// BackoffComplete is the event when a pod finishes backoff.
 	BackoffComplete = "BackoffComplete"
+	// ForceActivate is the event when a pod is moved from unschedulableQ/backoffQ
+	// to activeQ. Usually it's triggered by plugin implementations.
+	ForceActivate = "ForceActivate"
 )

 var (
diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go
index 6e08221df1f..b0bac0a128b 100644
--- a/pkg/scheduler/internal/queue/scheduling_queue.go
+++ b/pkg/scheduler/internal/queue/scheduling_queue.go
@@ -77,6 +77,10 @@ type PreEnqueueCheck func(pod *v1.Pod) bool
 type SchedulingQueue interface {
 	framework.PodNominator
 	Add(pod *v1.Pod) error
+	// Activate moves the given pods to activeQ iff they're in unschedulableQ or backoffQ.
+	// The passed-in pods are originally compiled from plugins that want to activate Pods,
+	// by injecting the pods through a reserved CycleState struct (PodsToActivate).
+	Activate(pods map[string]*v1.Pod)
 	// AddUnschedulableIfNotPresent adds an unschedulable pod back to scheduling queue.
 	// The podSchedulingCycle represents the current scheduling cycle number which can be
 	// returned by calling SchedulingCycle().
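Note for plugin authors (illustration, not part of the diff): the plugin-side half of this contract is plain CycleState bookkeeping. The sketch below shows a hypothetical helper, activateSiblings, that an extension point such as Permit or PostBind could call to stash pods it wants moved to activeQ. Only PodsToActivateKey and PodsToActivate come from this patch; the helper name and package are illustrative.

package jobplugin // hypothetical plugin package

import (
	v1 "k8s.io/api/core/v1"

	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// activateSiblings records the given pods in the reserved PodsToActivate entry of
// the CycleState, so the scheduler activates them (unschedulableQ/backoffQ -> activeQ)
// at the end of a successful scheduling or binding cycle.
func activateSiblings(state *framework.CycleState, siblings []*v1.Pod) {
	c, err := state.Read(framework.PodsToActivateKey)
	if err != nil {
		// The key is written by the scheduler at the start of each cycle; if it is
		// absent (e.g. an older scheduler), there is nothing to stash into.
		return
	}
	s, ok := c.(*framework.PodsToActivate)
	if !ok {
		return
	}
	s.Lock()
	defer s.Unlock()
	for _, pod := range siblings {
		// Entries are keyed by namespaced pod name, matching the map consumed by
		// SchedulingQueue.Activate.
		s.Map[pod.Namespace+"/"+pod.Name] = pod
	}
}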
@@ -301,6 +305,58 @@ func (p *PriorityQueue) Add(pod *v1.Pod) error {
 	return nil
 }

+// Activate moves the given pods to activeQ iff they're in unschedulableQ or backoffQ.
+func (p *PriorityQueue) Activate(pods map[string]*v1.Pod) {
+	p.lock.Lock()
+	defer p.lock.Unlock()
+
+	activated := false
+	for _, pod := range pods {
+		if p.activate(pod) {
+			activated = true
+		}
+	}
+
+	if activated {
+		p.cond.Broadcast()
+	}
+}
+
+func (p *PriorityQueue) activate(pod *v1.Pod) bool {
+	// Verify if the pod is present in activeQ.
+	if _, exists, _ := p.activeQ.Get(newQueuedPodInfoForLookup(pod)); exists {
+		// No need to activate if it's already present in activeQ.
+		return false
+	}
+	var pInfo *framework.QueuedPodInfo
+	// Verify if the pod is present in unschedulableQ or backoffQ.
+	if pInfo = p.unschedulableQ.get(pod); pInfo == nil {
+		// If the pod doesn't belong to unschedulableQ or backoffQ, don't activate it.
+		if obj, exists, _ := p.podBackoffQ.Get(newQueuedPodInfoForLookup(pod)); !exists {
+			klog.ErrorS(nil, "To-activate pod does not exist in unschedulableQ or backoffQ", "pod", klog.KObj(pod))
+			return false
+		} else {
+			pInfo = obj.(*framework.QueuedPodInfo)
+		}
+	}
+
+	if pInfo == nil {
+		// Redundant safe check. We shouldn't reach here.
+		klog.ErrorS(nil, "Internal error: cannot obtain pInfo")
+		return false
+	}
+
+	if err := p.activeQ.Add(pInfo); err != nil {
+		klog.ErrorS(err, "Error adding pod to the scheduling queue", "pod", klog.KObj(pod))
+		return false
+	}
+	p.unschedulableQ.delete(pod)
+	p.podBackoffQ.Delete(pInfo)
+	metrics.SchedulerQueueIncomingPods.WithLabelValues("active", ForceActivate).Inc()
+	p.PodNominator.AddNominatedPod(pInfo.PodInfo, "")
+	return true
+}
+
 // isPodBackingoff returns true if a pod is still waiting for its backoff timer.
 // If this returns true, the pod should not be re-tried.
 func (p *PriorityQueue) isPodBackingoff(podInfo *framework.QueuedPodInfo) bool {
diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index f521d0541b4..40042f9dec5 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -519,6 +519,10 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 	start := time.Now()
 	state := framework.NewCycleState()
 	state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent)
+	// Initialize an empty podsToActivate struct, which will be filled up by plugins or stay empty.
+	podsToActivate := framework.NewPodsToActivate()
+	state.Write(framework.PodsToActivateKey, podsToActivate)
+
 	schedulingCycleCtx, cancel := context.WithCancel(ctx)
 	defer cancel()
 	scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, sched.Extenders, fwk, state, pod)
@@ -607,6 +611,13 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 		return
 	}

+	// At the end of a successful scheduling cycle, pop and move up Pods if needed.
+	if len(podsToActivate.Map) != 0 {
+		sched.SchedulingQueue.Activate(podsToActivate.Map)
+		// Clear the entries after activation.
+		podsToActivate.Map = make(map[string]*v1.Pod)
+	}
+
 	// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
 	go func() {
 		bindingCycleCtx, cancel := context.WithCancel(ctx)
@@ -666,6 +677,13 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 			// Run "postbind" plugins.
 			fwk.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
+
+			// At the end of a successful binding cycle, move up Pods if needed.
+			if len(podsToActivate.Map) != 0 {
+				sched.SchedulingQueue.Activate(podsToActivate.Map)
+				// Unlike the logic in scheduling cycle, we don't bother deleting the entries
+				// as `podsToActivate.Map` is no longer consumed.
+			}
 		}
 	}()
 }
diff --git a/test/integration/scheduler/framework_test.go b/test/integration/scheduler/framework_test.go
index 083dc51e552..d1a7c24091b 100644
--- a/test/integration/scheduler/framework_test.go
+++ b/test/integration/scheduler/framework_test.go
@@ -27,9 +27,11 @@ import (
 	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
 	clientset "k8s.io/client-go/kubernetes"
+	listersv1 "k8s.io/client-go/listers/core/v1"
 	"k8s.io/kube-scheduler/config/v1beta2"
 	"k8s.io/kubernetes/pkg/scheduler"
 	schedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
@@ -39,6 +41,7 @@ import (
 	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
 	st "k8s.io/kubernetes/pkg/scheduler/testing"
 	testutils "k8s.io/kubernetes/test/integration/util"
+	imageutils "k8s.io/kubernetes/test/utils/image"
 	"k8s.io/utils/pointer"
 )

@@ -1941,6 +1944,141 @@ func TestPreemptWithPermitPlugin(t *testing.T) {
 	testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{waitingPod, runningPod, preemptorPod})
 }

+const (
+	jobPluginName = "job plugin"
+)
+
+var _ framework.PreFilterPlugin = &JobPlugin{}
+var _ framework.PostBindPlugin = &JobPlugin{}
+
+type JobPlugin struct {
+	podLister     listersv1.PodLister
+	podsActivated bool
+}
+
+func (j *JobPlugin) Name() string {
+	return jobPluginName
+}
+
+func (j *JobPlugin) PreFilter(_ context.Context, _ *framework.CycleState, p *v1.Pod) *framework.Status {
+	labelSelector := labels.SelectorFromSet(labels.Set{"driver": ""})
+	driverPods, err := j.podLister.Pods(p.Namespace).List(labelSelector)
+	if err != nil {
+		return framework.AsStatus(err)
+	}
+	if len(driverPods) == 0 {
+		return framework.NewStatus(framework.UnschedulableAndUnresolvable, "unable to find driver pod")
+	}
+	return nil
+}
+
+func (j *JobPlugin) PreFilterExtensions() framework.PreFilterExtensions {
+	return nil
+}
+
+func (j *JobPlugin) PostBind(_ context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) {
+	if _, ok := p.Labels["driver"]; !ok {
+		return
+	}
+
+	// If it's a driver pod, proactively move the other executor pods to accelerate the scheduling.
+	labelSelector := labels.SelectorFromSet(labels.Set{"executor": ""})
+	podsToActivate, err := j.podLister.Pods(p.Namespace).List(labelSelector)
+	if err == nil && len(podsToActivate) != 0 {
+		c, err := state.Read(framework.PodsToActivateKey)
+		if err == nil {
+			if s, ok := c.(*framework.PodsToActivate); ok {
+				s.Lock()
+				for _, pod := range podsToActivate {
+					namespacedName := fmt.Sprintf("%v/%v", pod.Namespace, pod.Name)
+					s.Map[namespacedName] = pod
+				}
+				s.Unlock()
+				j.podsActivated = true
+			}
+		}
+	}
+}
+
+// This test simulates a typical spark job workflow.
+// - N executor pods are created, but kept pending due to the missing driver pod.
+// - When the driver pod gets created and scheduled, proactively move the executors to activeQ
+//   and thus accelerate the entire job workflow.
+func TestActivatePods(t *testing.T) {
+	var jobPlugin *JobPlugin
+	// Create a plugin registry for testing. Register a Job plugin.
+	registry := frameworkruntime.Registry{jobPluginName: func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) {
+		jobPlugin = &JobPlugin{podLister: fh.SharedInformerFactory().Core().V1().Pods().Lister()}
+		return jobPlugin, nil
+	}}
+
+	// Set up the Job plugin for testing at the PreFilter and PostBind extension points.
+	cfg := configtesting.V1beta2ToInternalWithDefaults(t, v1beta2.KubeSchedulerConfiguration{
+		Profiles: []v1beta2.KubeSchedulerProfile{{
+			SchedulerName: pointer.StringPtr(v1.DefaultSchedulerName),
+			Plugins: &v1beta2.Plugins{
+				PreFilter: v1beta2.PluginSet{
+					Enabled: []v1beta2.Plugin{
+						{Name: jobPluginName},
+					},
+				},
+				PostBind: v1beta2.PluginSet{
+					Enabled: []v1beta2.Plugin{
+						{Name: jobPluginName},
+					},
+				},
+			},
+		}},
+	})
+
+	// Create the API server and the scheduler with the test plugin set.
+	testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "job-plugin", nil), 1,
+		scheduler.WithProfiles(cfg.Profiles...),
+		scheduler.WithFrameworkOutOfTreeRegistry(registry))
+	defer testutils.CleanupTest(t, testCtx)
+
+	cs := testCtx.ClientSet
+	ns := testCtx.NS.Name
+	pause := imageutils.GetPauseImageName()
+
+	// First, create 2 executor pods.
+	var pods []*v1.Pod
+	for i := 1; i <= 2; i++ {
+		name := fmt.Sprintf("executor-%v", i)
+		executor := st.MakePod().Name(name).Namespace(ns).Label("executor", "").Container(pause).Obj()
+		pods = append(pods, executor)
+		if _, err := cs.CoreV1().Pods(executor.Namespace).Create(context.TODO(), executor, metav1.CreateOptions{}); err != nil {
+			t.Fatalf("Failed to create pod %v: %v", executor.Name, err)
+		}
+	}
+
+	// Wait for the 2 executor pods to be unschedulable.
+	for _, pod := range pods {
+		if err := waitForPodUnschedulable(cs, pod); err != nil {
+			t.Errorf("Failed to wait for Pod %v to be unschedulable: %v", pod.Name, err)
+		}
+	}
+
+	// Create a driver pod.
+	driver := st.MakePod().Name("driver").Namespace(ns).Label("driver", "").Container(pause).Obj()
+	pods = append(pods, driver)
+	if _, err := cs.CoreV1().Pods(driver.Namespace).Create(context.TODO(), driver, metav1.CreateOptions{}); err != nil {
+		t.Fatalf("Failed to create pod %v: %v", driver.Name, err)
+	}
+
+	// Verify that all pods get scheduled.
+	for _, pod := range pods {
+		if err := waitForPodToScheduleWithTimeout(cs, pod, wait.ForeverTestTimeout); err != nil {
+			t.Fatalf("Failed to wait for Pod %v to be schedulable: %v", pod.Name, err)
+		}
+	}
+
+	// Lastly, verify that the pod activation logic was really invoked.
+	if !jobPlugin.podsActivated {
+		t.Errorf("JobPlugin's pod activation logic was not called")
+	}
+}
+
 func initTestSchedulerForFrameworkTest(t *testing.T, testCtx *testutils.TestContext, nodeCount int, opts ...scheduler.Option) *testutils.TestContext {
 	testCtx = testutils.InitTestSchedulerWithOptions(t, testCtx, nil, opts...)
 	testutils.SyncInformerFactory(testCtx)
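For an out-of-tree consumer, a plugin equivalent to the JobPlugin above is typically compiled into its own scheduler binary and registered through the plugin registry rather than the test helper used here. A minimal sketch, assuming a hypothetical package example.com/jobplugin that exports a Name constant and a factory New matching the framework plugin-factory signature; the scheduler profile then enables the plugin at PreFilter and PostBind, as in the test configuration above:

package main

import (
	"os"

	"k8s.io/kubernetes/cmd/kube-scheduler/app"

	// Hypothetical out-of-tree package holding a plugin like the JobPlugin above.
	"example.com/jobplugin"
)

func main() {
	// Build a kube-scheduler command with the extra plugin registered out of tree;
	// the plugin only runs for profiles that enable it by name.
	command := app.NewSchedulerCommand(
		app.WithPlugin(jobplugin.Name, jobplugin.New),
	)
	if err := command.Execute(); err != nil {
		os.Exit(1)
	}
}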