From b339c0a8bf75ff246440fbd84a3cf93b1f4418bd Mon Sep 17 00:00:00 2001 From: Chun Chen Date: Thu, 30 May 2019 09:45:55 +0800 Subject: [PATCH] Add Bind extension point of the scheduling framework --- pkg/scheduler/framework/v1alpha1/framework.go | 36 ++++ pkg/scheduler/framework/v1alpha1/interface.go | 20 ++ .../internal/queue/scheduling_queue_test.go | 4 + pkg/scheduler/scheduler.go | 32 +-- test/integration/scheduler/framework_test.go | 197 ++++++++++++++++++ 5 files changed, 277 insertions(+), 12 deletions(-) diff --git a/pkg/scheduler/framework/v1alpha1/framework.go b/pkg/scheduler/framework/v1alpha1/framework.go index bf73965d6e6..c79850ca59a 100644 --- a/pkg/scheduler/framework/v1alpha1/framework.go +++ b/pkg/scheduler/framework/v1alpha1/framework.go @@ -39,6 +39,7 @@ type framework struct { prefilterPlugins []PrefilterPlugin reservePlugins []ReservePlugin prebindPlugins []PrebindPlugin + bindPlugins []BindPlugin postbindPlugins []PostbindPlugin unreservePlugins []UnreservePlugin permitPlugins []PermitPlugin @@ -128,6 +129,20 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi } } + if plugins.Bind != nil { + for _, pb := range plugins.Bind.Enabled { + if pg, ok := f.plugins[pb.Name]; ok { + p, ok := pg.(BindPlugin) + if !ok { + return nil, fmt.Errorf("plugin %v does not extend bind plugin", pb.Name) + } + f.bindPlugins = append(f.bindPlugins, p) + } else { + return nil, fmt.Errorf("bind plugin %v does not exist", pb.Name) + } + } + } + if plugins.PostBind != nil { for _, pb := range plugins.PostBind.Enabled { if pg, ok := f.plugins[pb.Name]; ok { @@ -243,6 +258,27 @@ func (f *framework) RunPrebindPlugins( return nil } +// RunBindPlugins runs the set of configured bind plugins until one returns a non `Skip` status. It returns a `Skip` status when no bind plugin is configured or every plugin skips, and an `Error` status when a plugin handles the pod but does not succeed. 
+func (f *framework) RunBindPlugins(pc *PluginContext, pod *v1.Pod, nodeName string) *Status { + if len(f.bindPlugins) == 0 { + return NewStatus(Skip, "") + } + var status *Status + for _, bp := range f.bindPlugins { + status = bp.Bind(pc, pod, nodeName) + if status != nil && status.Code() == Skip { + continue + } + if !status.IsSuccess() { + msg := fmt.Sprintf("bind plugin %v failed to bind pod %v/%v: %v", bp.Name(), pod.Namespace, pod.Name, status.Message()) + klog.Error(msg) + return NewStatus(Error, msg) + } + return status + } + return status +} + // RunPostbindPlugins runs the set of configured postbind plugins. func (f *framework) RunPostbindPlugins( pc *PluginContext, pod *v1.Pod, nodeName string) { diff --git a/pkg/scheduler/framework/v1alpha1/interface.go b/pkg/scheduler/framework/v1alpha1/interface.go index fa29340ea73..b5897cca39d 100644 --- a/pkg/scheduler/framework/v1alpha1/interface.go +++ b/pkg/scheduler/framework/v1alpha1/interface.go @@ -42,6 +42,8 @@ const ( Unschedulable // Wait is used when a permit plugin finds a pod scheduling should wait. Wait + // Skip is used when a bind plugin chooses to skip binding. + Skip ) // Status indicates the result of running a plugin. It consists of a code and a @@ -192,6 +194,17 @@ type PermitPlugin interface { Permit(pc *PluginContext, p *v1.Pod, nodeName string) (*Status, time.Duration) } +// BindPlugin is an interface that must be implemented by "bind" plugins. Bind +// plugins are used to bind a pod to a Node. +type BindPlugin interface { + Plugin + // Bind plugins will not be called until all pre-bind plugins have completed. Each + // bind plugin is called in the configured order. A bind plugin may choose whether + // or not to handle the given Pod. If a bind plugin chooses to handle a Pod, the + // remaining bind plugins are skipped. + Bind(pc *PluginContext, p *v1.Pod, nodeName string) *Status +} + // Framework manages the set of plugins in use by the scheduling framework. 
// Configured plugins are called at specified points in a scheduling context. type Framework interface { @@ -231,6 +244,13 @@ type Framework interface { // Note that if multiple plugins asked to wait, then we wait for the minimum // timeout duration. RunPermitPlugins(pc *PluginContext, pod *v1.Pod, nodeName string) *Status + + // RunBindPlugins runs the set of configured bind plugins. A bind plugin may choose + // whether or not to handle the given Pod. If a bind plugin chooses to skip the + // binding, it should return a status with code Skip. Otherwise, it should return "Error" + // or "Success". If none of the plugins handled binding, RunBindPlugins returns + // a status with code Skip. + RunBindPlugins(pc *PluginContext, pod *v1.Pod, nodeName string) *Status } // FrameworkHandle provides data and some tools that plugins can use. It is diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index 4a7fdecab0f..2197a54f207 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -175,6 +175,10 @@ func (*fakeFramework) RunPrebindPlugins(pc *framework.PluginContext, pod *v1.Pod return nil } +func (*fakeFramework) RunBindPlugins(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status { + return nil +} + func (*fakeFramework) RunPostbindPlugins(pc *framework.PluginContext, pod *v1.Pod, nodeName string) {} func (*fakeFramework) RunReservePlugins(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status { diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 8faa07c490e..43042014200 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -412,11 +412,25 @@ func (sched *Scheduler) assume(assumed *v1.Pod, host string) error { // bind binds a pod to a given node defined in a binding object. 
We expect this to run asynchronously, so we // handle binding metrics internally. -func (sched *Scheduler) bind(assumed *v1.Pod, b *v1.Binding) error { +func (sched *Scheduler) bind(assumed *v1.Pod, targetNode string, pluginContext *framework.PluginContext) error { bindingStart := time.Now() - // If binding succeeded then PodScheduled condition will be updated in apiserver so that - // it's atomic with setting host. - err := sched.config.GetBinder(assumed).Bind(b) + bindStatus := sched.config.Framework.RunBindPlugins(pluginContext, assumed, targetNode) + var err error + if bindStatus != nil && bindStatus.Code() == framework.Skip { + // All bind plugins chose to skip binding of this pod, so fall back to the default binder. + + // If binding succeeded then PodScheduled condition will be updated in apiserver so that + // it's atomic with setting host. + err = sched.config.GetBinder(assumed).Bind(&v1.Binding{ + ObjectMeta: metav1.ObjectMeta{Namespace: assumed.Namespace, Name: assumed.Name, UID: assumed.UID}, + Target: v1.ObjectReference{ + Kind: "Node", + Name: targetNode, + }, + }) + } else if !bindStatus.IsSuccess() { + err = fmt.Errorf("scheduler RunBindPlugins failed for pod %v/%v: code %d, err %v", assumed.Namespace, assumed.Name, bindStatus.Code(), bindStatus.Message()) + } if finErr := sched.config.SchedulerCache.FinishBinding(assumed); finErr != nil { klog.Errorf("scheduler cache FinishBinding failed: %v", finErr) } @@ -434,7 +448,7 @@ func (sched *Scheduler) bind(assumed *v1.Pod, b *v1.Binding) error { metrics.DeprecatedBindingLatency.Observe(metrics.SinceInMicroseconds(bindingStart)) metrics.SchedulingLatency.WithLabelValues(metrics.Binding).Observe(metrics.SinceInSeconds(bindingStart)) metrics.DeprecatedSchedulingLatency.WithLabelValues(metrics.Binding).Observe(metrics.SinceInSeconds(bindingStart)) - sched.config.Recorder.Eventf(assumed, v1.EventTypeNormal, "Scheduled", "Successfully assigned %v/%v to %v", assumed.Namespace, assumed.Name, b.Target.Name) + 
sched.config.Recorder.Eventf(assumed, v1.EventTypeNormal, "Scheduled", "Successfully assigned %v/%v to %v", assumed.Namespace, assumed.Name, targetNode) return nil } @@ -575,13 +589,7 @@ func (sched *Scheduler) scheduleOne() { return } - err := sched.bind(assumedPod, &v1.Binding{ - ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID}, - Target: v1.ObjectReference{ - Kind: "Node", - Name: scheduleResult.SuggestedHost, - }, - }) + err := sched.bind(assumedPod, scheduleResult.SuggestedHost, pluginContext) metrics.E2eSchedulingLatency.Observe(metrics.SinceInSeconds(start)) metrics.DeprecatedE2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start)) if err != nil { diff --git a/test/integration/scheduler/framework_test.go b/test/integration/scheduler/framework_test.go index 78d9d00eb57..cb557ef7f15 100644 --- a/test/integration/scheduler/framework_test.go +++ b/test/integration/scheduler/framework_test.go @@ -22,8 +22,10 @@ import ( "time" "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" + clientset "k8s.io/client-go/kubernetes" schedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" ) @@ -34,6 +36,7 @@ type TesterPlugin struct { numPrefilterCalled int numReserveCalled int numPrebindCalled int + numBindCalled int numPostbindCalled int numUnreserveCalled int failPrefilter bool @@ -47,6 +50,7 @@ type TesterPlugin struct { timeoutPermit bool waitAndRejectPermit bool waitAndAllowPermit bool + bindStatus *framework.Status } type PrefilterPlugin struct { @@ -61,6 +65,12 @@ type PrebindPlugin struct { TesterPlugin } +type BindPlugin struct { + PluginName string + TesterPlugin + client *clientset.Clientset +} + type PostbindPlugin struct { TesterPlugin } @@ -86,6 +96,7 @@ const ( var _ = framework.PrefilterPlugin(&PrefilterPlugin{}) var _ = 
framework.ReservePlugin(&ReservePlugin{}) var _ = framework.PrebindPlugin(&PrebindPlugin{}) +var _ = framework.BindPlugin(&BindPlugin{}) var _ = framework.PostbindPlugin(&PostbindPlugin{}) var _ = framework.UnreservePlugin(&UnreservePlugin{}) var _ = framework.PermitPlugin(&PermitPlugin{}) @@ -136,11 +147,38 @@ func (pp *PrebindPlugin) reset() { pp.numPrebindCalled = 0 } +const bindPluginAnnotation = "bindPluginName" + // NewPrebindPlugin is the factory for prebind plugin. func NewPrebindPlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) { return pbdPlugin, nil } +func (bp *BindPlugin) Name() string { + return bp.PluginName +} + +func (bp *BindPlugin) Bind(pc *framework.PluginContext, p *v1.Pod, nodeName string) *framework.Status { + bp.numBindCalled++ + if bp.bindStatus.IsSuccess() { + if err := bp.client.CoreV1().Pods(p.Namespace).Bind(&v1.Binding{ + ObjectMeta: metav1.ObjectMeta{Namespace: p.Namespace, Name: p.Name, UID: p.UID, Annotations: map[string]string{bindPluginAnnotation: bp.Name()}}, + Target: v1.ObjectReference{ + Kind: "Node", + Name: nodeName, + }, + }); err != nil { + return framework.NewStatus(framework.Error, fmt.Sprintf("bind failed: %v", err)) + } + } + return bp.bindStatus +} + +// reset used to reset numBindCalled. +func (bp *BindPlugin) reset() { + bp.numBindCalled = 0 +} + var ptbdPlugin = &PostbindPlugin{} // Name returns name of the plugin. @@ -599,6 +637,165 @@ func TestUnreservePlugin(t *testing.T) { } } +// TestBindPlugin tests invocation of bind plugins. +func TestBindPlugin(t *testing.T) { + testContext := initTestMaster(t, "bind-plugin", nil) + bindPlugin1, bindPlugin2 := &BindPlugin{PluginName: "bind-plugin-1", client: testContext.clientSet}, &BindPlugin{PluginName: "bind-plugin-2", client: testContext.clientSet} + // Create a plugin registry for testing. Register an unreserve, a bind plugin and a postBind plugin. 
+ registry := framework.Registry{ + unreservePluginName: NewUnreservePlugin, + bindPlugin1.Name(): func(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) { + return bindPlugin1, nil + }, + bindPlugin2.Name(): func(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) { + return bindPlugin2, nil + }, + postbindPluginName: NewPostbindPlugin, + } + + // Setup initial unreserve, bind and postbind plugins for testing. + plugins := &schedulerconfig.Plugins{ + Unreserve: &schedulerconfig.PluginSet{ + Enabled: []schedulerconfig.Plugin{{Name: unreservePluginName}}, + }, + Bind: &schedulerconfig.PluginSet{ + Enabled: []schedulerconfig.Plugin{{Name: bindPlugin1.Name()}, {Name: bindPlugin2.Name()}}, + }, + PostBind: &schedulerconfig.PluginSet{ + Enabled: []schedulerconfig.Plugin{{Name: postbindPluginName}}, + }, + } + // Set unreserve, bind and postbind config for testing + pluginConfig := []schedulerconfig.PluginConfig{ + { + Name: unreservePluginName, + Args: runtime.Unknown{}, + }, + { + Name: bindPlugin1.Name(), + Args: runtime.Unknown{}, + }, + { + Name: bindPlugin2.Name(), + Args: runtime.Unknown{}, + }, + { + Name: postbindPluginName, + Args: runtime.Unknown{}, + }, + } + + // Create the master and the scheduler with the test plugin set. + context := initTestSchedulerWithOptions(t, testContext, + false, nil, registry, plugins, pluginConfig, false, time.Second) + defer cleanupTest(t, context) + + cs := context.clientSet + // Add a few nodes. 
+ _, err := createNodes(cs, "test-node", nil, 2) + if err != nil { + t.Fatalf("Cannot create nodes: %v", err) + } + + tests := []struct { + bindPluginStatuses []*framework.Status + expectBoundByScheduler bool // true means this test case expecting scheduler would bind pods + expectBoundByPlugin bool // true means this test case expecting a plugin would bind pods + expectBindPluginName string // expecting plugin name to bind pods + }{ + // both bind plugins skipped binding the pod, so the scheduler bound the pod + { + bindPluginStatuses: []*framework.Status{framework.NewStatus(framework.Skip, ""), framework.NewStatus(framework.Skip, "")}, + expectBoundByScheduler: true, + }, + // bindplugin2 succeeded to bind the pod + { + bindPluginStatuses: []*framework.Status{framework.NewStatus(framework.Skip, ""), framework.NewStatus(framework.Success, "")}, + expectBoundByPlugin: true, + expectBindPluginName: bindPlugin2.Name(), + }, + // bindplugin1 succeeded to bind the pod + { + bindPluginStatuses: []*framework.Status{framework.NewStatus(framework.Success, ""), framework.NewStatus(framework.Success, "")}, + expectBoundByPlugin: true, + expectBindPluginName: bindPlugin1.Name(), + }, + // bind plugin fails to bind the pod + { + bindPluginStatuses: []*framework.Status{framework.NewStatus(framework.Error, "failed to bind"), framework.NewStatus(framework.Success, "")}, + }, + } + + for i, test := range tests { + bindPlugin1.bindStatus = test.bindPluginStatuses[0] + bindPlugin2.bindStatus = test.bindPluginStatuses[1] + + // Create a best effort pod. + pod, err := createPausePod(cs, + initPausePod(cs, &pausePodConfig{Name: "test-pod", Namespace: context.ns.Name})) + if err != nil { + t.Errorf("Error while creating a test pod: %v", err) + } + + if test.expectBoundByScheduler || test.expectBoundByPlugin { + // the pod is expected to be bound, either by the scheduler or by a bind plugin + if err = waitForPodToSchedule(cs, pod); err != nil { + t.Errorf("test #%v: Expected the pod to be scheduled. 
error: %v", i, err) + continue + } + pod, err = cs.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{}) + if err != nil { + t.Errorf("can't get pod: %v", err) + } + if test.expectBoundByScheduler { + if pod.Annotations[bindPluginAnnotation] != "" { + t.Errorf("test #%v: Expected the pod to be binded by scheduler instead of by bindplugin %s", i, pod.Annotations[bindPluginAnnotation]) + } + if bindPlugin1.numBindCalled != 1 || bindPlugin2.numBindCalled != 1 { + t.Errorf("test #%v: Expected each bind plugin to be called once, was called %d and %d times.", i, bindPlugin1.numBindCalled, bindPlugin2.numBindCalled) + } + } else { + if pod.Annotations[bindPluginAnnotation] != test.expectBindPluginName { + t.Errorf("test #%v: Expected the pod to be binded by bindplugin %s instead of by bindplugin %s", i, test.expectBindPluginName, pod.Annotations[bindPluginAnnotation]) + } + if bindPlugin1.numBindCalled != 1 { + t.Errorf("test #%v: Expected %s to be called once, was called %d times.", i, bindPlugin1.Name(), bindPlugin1.numBindCalled) + } + if test.expectBindPluginName == bindPlugin1.Name() && bindPlugin2.numBindCalled > 0 { + // expect bindplugin1 succeeded to bind the pod and bindplugin2 should not be called. 
+ t.Errorf("test #%v: Expected %s not to be called, was called %d times.", i, bindPlugin2.Name(), bindPlugin2.numBindCalled) + } + } + if err = wait.Poll(10*time.Millisecond, 30*time.Second, func() (done bool, err error) { + return ptbdPlugin.numPostbindCalled == 1, nil + }); err != nil { + t.Errorf("test #%v: Expected the postbind plugin to be called once, was called %d times.", i, ptbdPlugin.numPostbindCalled) + } + if unresPlugin.numUnreserveCalled != 0 { + t.Errorf("test #%v: Expected the unreserve plugin not to be called, was called %d times.", i, unresPlugin.numUnreserveCalled) + } + } else { + // bind plugin fails to bind the pod + if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(cs, pod.Namespace, pod.Name)); err != nil { + t.Errorf("test #%v: Expected a scheduling error, but didn't get it. error: %v", i, err) + } + if ptbdPlugin.numPostbindCalled > 0 { + t.Errorf("test #%v: Didn't expected the postbind plugin to be called %d times.", i, ptbdPlugin.numPostbindCalled) + } + if err = wait.Poll(10*time.Millisecond, 30*time.Second, func() (done bool, err error) { + return unresPlugin.numUnreserveCalled == 1, nil + }); err != nil { + t.Errorf("test #%v: Expected the unreserve plugin to be called once, was called %d times.", i, unresPlugin.numUnreserveCalled) + } + } + ptbdPlugin.reset() + bindPlugin1.reset() + bindPlugin2.reset() + unresPlugin.reset() + cleanupPods(cs, t, []*v1.Pod{pod}) + } +} + // TestPostbindPlugin tests invocation of postbind plugins. func TestPostbindPlugin(t *testing.T) { // Create a plugin registry for testing. Register a prebind and a postbind plugin.