Merge pull request #79313 from chenchun/fix-flaky

Add Bind extension point to the scheduling framework
This commit is contained in:
Kubernetes Prow Robot
2019-07-15 18:31:38 -07:00
committed by GitHub
5 changed files with 333 additions and 19 deletions

View File

@@ -39,6 +39,7 @@ type framework struct {
prefilterPlugins []PrefilterPlugin
reservePlugins []ReservePlugin
prebindPlugins []PrebindPlugin
bindPlugins []BindPlugin
postbindPlugins []PostbindPlugin
unreservePlugins []UnreservePlugin
permitPlugins []PermitPlugin
@@ -128,6 +129,20 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
}
}
if plugins.Bind != nil {
for _, pb := range plugins.Bind.Enabled {
if pg, ok := f.plugins[pb.Name]; ok {
p, ok := pg.(BindPlugin)
if !ok {
return nil, fmt.Errorf("plugin %v does not extend bind plugin", pb.Name)
}
f.bindPlugins = append(f.bindPlugins, p)
} else {
return nil, fmt.Errorf("bind plugin %v does not exist", pb.Name)
}
}
}
if plugins.PostBind != nil {
for _, pb := range plugins.PostBind.Enabled {
if pg, ok := f.plugins[pb.Name]; ok {
@@ -243,6 +258,27 @@ func (f *framework) RunPrebindPlugins(
return nil
}
// RunBindPlugins invokes the configured bind plugins one at a time, in order,
// and stops at the first plugin that does not answer with a `Skip` status. A
// success status from that plugin is returned as-is; any other status is logged
// and converted into an Error status. When no bind plugin is configured a fresh
// Skip status is returned; when every plugin skips, the status of the last
// plugin is returned.
func (f *framework) RunBindPlugins(pc *PluginContext, pod *v1.Pod, nodeName string) *Status {
	if len(f.bindPlugins) == 0 {
		return NewStatus(Skip, "")
	}
	var result *Status
	for _, plugin := range f.bindPlugins {
		result = plugin.Bind(pc, pod, nodeName)
		switch {
		case result != nil && result.Code() == Skip:
			// This plugin declined to handle the pod; give the next one a chance.
			continue
		case result.IsSuccess():
			return result
		default:
			errMsg := fmt.Sprintf("bind plugin %v failed to bind pod %v/%v: %v", plugin.Name(), pod.Namespace, pod.Name, result.Message())
			klog.Error(errMsg)
			return NewStatus(Error, errMsg)
		}
	}
	return result
}
// RunPostbindPlugins runs the set of configured postbind plugins.
func (f *framework) RunPostbindPlugins(
pc *PluginContext, pod *v1.Pod, nodeName string) {

View File

@@ -42,6 +42,8 @@ const (
Unschedulable
// Wait is used when a permit plugin finds a pod scheduling should wait.
Wait
// Skip is used when a bind plugin chooses to skip binding.
Skip
)
// Status indicates the result of running a plugin. It consists of a code and a
@@ -192,6 +194,19 @@ type PermitPlugin interface {
Permit(pc *PluginContext, p *v1.Pod, nodeName string) (*Status, time.Duration)
}
// BindPlugin is an interface that must be implemented by "bind" plugins. Bind
// plugins are used to bind a pod to a Node.
type BindPlugin interface {
Plugin
// Bind is invoked only after all pre-bind plugins have completed. Bind plugins
// are called in the configured order, and each one may choose whether or not to
// handle the given Pod. Once a bind plugin handles a Pod, the remaining bind
// plugins are skipped. A bind plugin that does not handle the Pod must return
// Skip as its Status code. If a bind plugin returns an Error, the pod is
// rejected and will not be bound.
Bind(pc *PluginContext, p *v1.Pod, nodeName string) *Status
}
// Framework manages the set of plugins in use by the scheduling framework.
// Configured plugins are called at specified points in a scheduling context.
type Framework interface {
@@ -231,6 +246,13 @@ type Framework interface {
// Note that if multiple plugins asked to wait, then we wait for the minimum
// timeout duration.
RunPermitPlugins(pc *PluginContext, pod *v1.Pod, nodeName string) *Status
// RunBindPlugins runs the set of configured bind plugins. A bind plugin may choose
// whether or not to handle the given Pod. If a bind plugin chooses to skip the
// binding, it should return a status with the Skip code. Otherwise, it should return
// "Error" or "Success". If none of the plugins handled binding, RunBindPlugins
// returns a status with the Skip code.
RunBindPlugins(pc *PluginContext, pod *v1.Pod, nodeName string) *Status
}
// FrameworkHandle provides data and some tools that plugins can use. It is

View File

@@ -175,6 +175,10 @@ func (*fakeFramework) RunPrebindPlugins(pc *framework.PluginContext, pod *v1.Pod
return nil
}
// RunBindPlugins is a stub of the bind extension point for the fake framework:
// it ignores its arguments, runs no plugin and always returns a nil status.
func (*fakeFramework) RunBindPlugins(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status {
return nil
}
func (*fakeFramework) RunPostbindPlugins(pc *framework.PluginContext, pod *v1.Pod, nodeName string) {}
func (*fakeFramework) RunReservePlugins(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status {

View File

@@ -415,11 +415,26 @@ func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
// bind binds a pod to a given node defined in a binding object. We expect this to run asynchronously, so we
// handle binding metrics internally.
func (sched *Scheduler) bind(assumed *v1.Pod, b *v1.Binding) error {
func (sched *Scheduler) bind(assumed *v1.Pod, targetNode string, pluginContext *framework.PluginContext) error {
bindingStart := time.Now()
// If binding succeeded then PodScheduled condition will be updated in apiserver so that
// it's atomic with setting host.
err := sched.config.GetBinder(assumed).Bind(b)
bindStatus := sched.config.Framework.RunBindPlugins(pluginContext, assumed, targetNode)
var err error
if !bindStatus.IsSuccess() {
if bindStatus.Code() == framework.Skip {
// All bind plugins chose to skip binding of this pod, call original binding function.
// If binding succeeds then PodScheduled condition will be updated in apiserver so that
// it's atomic with setting host.
err = sched.config.GetBinder(assumed).Bind(&v1.Binding{
ObjectMeta: metav1.ObjectMeta{Namespace: assumed.Namespace, Name: assumed.Name, UID: assumed.UID},
Target: v1.ObjectReference{
Kind: "Node",
Name: targetNode,
},
})
} else {
err = fmt.Errorf("Bind failure, code: %d: %v", bindStatus.Code(), bindStatus.Message())
}
}
if finErr := sched.config.SchedulerCache.FinishBinding(assumed); finErr != nil {
klog.Errorf("scheduler cache FinishBinding failed: %v", finErr)
}
@@ -437,7 +452,7 @@ func (sched *Scheduler) bind(assumed *v1.Pod, b *v1.Binding) error {
metrics.DeprecatedBindingLatency.Observe(metrics.SinceInMicroseconds(bindingStart))
metrics.SchedulingLatency.WithLabelValues(metrics.Binding).Observe(metrics.SinceInSeconds(bindingStart))
metrics.DeprecatedSchedulingLatency.WithLabelValues(metrics.Binding).Observe(metrics.SinceInSeconds(bindingStart))
sched.config.Recorder.Eventf(assumed, nil, v1.EventTypeNormal, "Scheduled", "Binding", "Successfully assigned %v/%v to %v", assumed.Namespace, assumed.Name, b.Target.Name)
sched.config.Recorder.Eventf(assumed, nil, v1.EventTypeNormal, "Scheduled", "Binding", "Successfully assigned %v/%v to %v", assumed.Namespace, assumed.Name, targetNode)
return nil
}
@@ -578,13 +593,7 @@ func (sched *Scheduler) scheduleOne() {
return
}
err := sched.bind(assumedPod, &v1.Binding{
ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
Target: v1.ObjectReference{
Kind: "Node",
Name: scheduleResult.SuggestedHost,
},
})
err := sched.bind(assumedPod, scheduleResult.SuggestedHost, pluginContext)
metrics.E2eSchedulingLatency.Observe(metrics.SinceInSeconds(start))
metrics.DeprecatedE2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))
if err != nil {

View File

@@ -21,9 +21,11 @@ import (
"testing"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
schedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)
@@ -45,12 +47,24 @@ type PrebindPlugin struct {
rejectPrebind bool
}
// BindPlugin is a test bind plugin whose name and returned status are
// configurable, so that several instances can be registered side by side.
type BindPlugin struct {
// numBindCalled counts the calls to Bind since the last reset.
numBindCalled int
// PluginName is the name reported by Name.
PluginName string
// bindStatus is the status Bind returns; when it is a success status, Bind
// also issues a real binding request through client.
bindStatus *framework.Status
// client is the API client Bind uses to bind the pod.
client *clientset.Clientset
// pluginInvokeEventChan, when non-nil, receives one event per Bind call.
pluginInvokeEventChan chan pluginInvokeEvent
}
type PostbindPlugin struct {
numPostbindCalled int
name string
numPostbindCalled int
pluginInvokeEventChan chan pluginInvokeEvent
}
type UnreservePlugin struct {
numUnreserveCalled int
name string
numUnreserveCalled int
pluginInvokeEventChan chan pluginInvokeEvent
}
type PermitPlugin struct {
@@ -75,6 +89,7 @@ const (
var _ = framework.PrefilterPlugin(&PrefilterPlugin{})
var _ = framework.ReservePlugin(&ReservePlugin{})
var _ = framework.PrebindPlugin(&PrebindPlugin{})
var _ = framework.BindPlugin(&BindPlugin{})
var _ = framework.PostbindPlugin(&PostbindPlugin{})
var _ = framework.UnreservePlugin(&UnreservePlugin{})
var _ = framework.PermitPlugin(&PermitPlugin{})
@@ -132,21 +147,54 @@ func (pp *PrebindPlugin) reset() {
pp.rejectPrebind = false
}
// bindPluginAnnotation is the annotation key under which the test BindPlugin
// stores its own name in the Binding it submits; TestBindPlugin reads it back
// from the pod to tell which bind plugin, if any, bound the pod.
const bindPluginAnnotation = "bindPluginName"
// NewPrebindPlugin is the factory for prebind plugin. Both arguments are
// ignored; it always returns pbdPlugin and a nil error.
func NewPrebindPlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
return pbdPlugin, nil
}
var ptbdPlugin = &PostbindPlugin{}
// Name returns name of the plugin, as configured in PluginName.
func (bp *BindPlugin) Name() string {
return bp.PluginName
}
// Bind is a test function that counts its invocations, reports each call on
// pluginInvokeEventChan when that channel is set, and returns the preconfigured
// bindStatus. When that status is a success, the pod is first bound to nodeName
// through the API client, with the plugin's name stored under the
// bindPluginAnnotation key of the Binding; if that request fails, an Error
// status describing the failure is returned instead.
func (bp *BindPlugin) Bind(pc *framework.PluginContext, p *v1.Pod, nodeName string) *framework.Status {
	bp.numBindCalled++
	if events := bp.pluginInvokeEventChan; events != nil {
		events <- pluginInvokeEvent{pluginName: bp.Name(), val: bp.numBindCalled}
	}
	// A non-success status is handed back untouched, without contacting the API server.
	if !bp.bindStatus.IsSuccess() {
		return bp.bindStatus
	}
	binding := &v1.Binding{
		ObjectMeta: metav1.ObjectMeta{
			Namespace:   p.Namespace,
			Name:        p.Name,
			UID:         p.UID,
			Annotations: map[string]string{bindPluginAnnotation: bp.Name()},
		},
		Target: v1.ObjectReference{
			Kind: "Node",
			Name: nodeName,
		},
	}
	if err := bp.client.CoreV1().Pods(p.Namespace).Bind(binding); err != nil {
		return framework.NewStatus(framework.Error, fmt.Sprintf("bind failed: %v", err))
	}
	return bp.bindStatus
}
// reset sets the Bind invocation counter (numBindCalled) back to zero. The
// other fields of the plugin are left as they are.
func (bp *BindPlugin) reset() {
bp.numBindCalled = 0
}
var ptbdPlugin = &PostbindPlugin{name: postbindPluginName}
// Name returns name of the plugin.
func (pp *PostbindPlugin) Name() string {
return postbindPluginName
return pp.name
}
// Postbind is a test function that counts how many times it has been invoked
// and, when an event channel is set, publishes the updated count on it.
func (pp *PostbindPlugin) Postbind(pc *framework.PluginContext, pod *v1.Pod, nodeName string) {
	pp.numPostbindCalled++
	events := pp.pluginInvokeEventChan
	if events == nil {
		return
	}
	events <- pluginInvokeEvent{pluginName: pp.Name(), val: pp.numPostbindCalled}
}
// reset used to reset postbind plugin.
@@ -190,17 +238,20 @@ func NewPrefilterPlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framew
return pfPlugin, nil
}
var unresPlugin = &UnreservePlugin{}
var unresPlugin = &UnreservePlugin{name: unreservePluginName}
// Name returns name of the plugin.
func (up *UnreservePlugin) Name() string {
return unreservePluginName
return up.name
}
// Unreserve is a test function that counts how many times it has been invoked
// and, when an event channel is set, publishes the updated count on it. It
// returns nothing and never fails.
func (up *UnreservePlugin) Unreserve(pc *framework.PluginContext, pod *v1.Pod, nodeName string) {
	up.numUnreserveCalled++
	events := up.pluginInvokeEventChan
	if events == nil {
		return
	}
	events <- pluginInvokeEvent{pluginName: up.Name(), val: up.numUnreserveCalled}
}
// reset used to reset numUnreserveCalled.
@@ -616,6 +667,198 @@ func TestUnreservePlugin(t *testing.T) {
}
}
// pluginInvokeEvent records a single plugin invocation. The test plugins send
// one on their event channel per call so that a test can check the order in
// which plugins ran.
type pluginInvokeEvent struct {
// pluginName is the name of the plugin that was invoked.
pluginName string
// val is the sending plugin's invocation counter right after this call.
val int
}
// TestBindPlugin tests invocation of bind plugins. Two bind plugins, an
// unreserve plugin and a postbind plugin are registered. Each test case sets the
// status the two bind plugins return, creates a pod and then verifies who bound
// the pod (a bind plugin or the scheduler's default binder), how often each
// plugin was called, and the order in which the plugins were invoked.
func TestBindPlugin(t *testing.T) {
	testContext := initTestMaster(t, "bind-plugin", nil)
	bindPlugin1 := &BindPlugin{PluginName: "bind-plugin-1", client: testContext.clientSet}
	bindPlugin2 := &BindPlugin{PluginName: "bind-plugin-2", client: testContext.clientSet}
	unreservePlugin := &UnreservePlugin{name: "mock-unreserve-plugin"}
	postbindPlugin := &PostbindPlugin{name: "mock-post-bind-plugin"}
	// Create a plugin registry for testing. Register an unreserve plugin, two bind plugins and a postbind plugin.
	registry := framework.Registry{
		unreservePlugin.Name(): func(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
			return unreservePlugin, nil
		},
		bindPlugin1.Name(): func(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
			return bindPlugin1, nil
		},
		bindPlugin2.Name(): func(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
			return bindPlugin2, nil
		},
		postbindPlugin.Name(): func(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
			return postbindPlugin, nil
		},
	}

	// Setup initial unreserve, bind and postbind plugins for testing.
	plugins := &schedulerconfig.Plugins{
		Unreserve: &schedulerconfig.PluginSet{
			Enabled: []schedulerconfig.Plugin{{Name: unreservePlugin.Name()}},
		},
		Bind: &schedulerconfig.PluginSet{
			Enabled: []schedulerconfig.Plugin{{Name: bindPlugin1.Name()}, {Name: bindPlugin2.Name()}},
		},
		PostBind: &schedulerconfig.PluginSet{
			Enabled: []schedulerconfig.Plugin{{Name: postbindPlugin.Name()}},
		},
	}
	// Set unreserve, bind and postbind config for testing.
	pluginConfig := []schedulerconfig.PluginConfig{
		{
			Name: unreservePlugin.Name(),
			Args: runtime.Unknown{},
		},
		{
			Name: bindPlugin1.Name(),
			Args: runtime.Unknown{},
		},
		{
			Name: bindPlugin2.Name(),
			Args: runtime.Unknown{},
		},
		{
			Name: postbindPlugin.Name(),
			Args: runtime.Unknown{},
		},
	}

	// Create the master and the scheduler with the test plugin set.
	context := initTestSchedulerWithOptions(t, testContext,
		false, nil, registry, plugins, pluginConfig, false, time.Second)
	defer cleanupTest(t, context)

	cs := context.clientSet
	// Add a few nodes.
	_, err := createNodes(cs, "test-node", nil, 2)
	if err != nil {
		t.Fatalf("Cannot create nodes: %v", err)
	}

	tests := []struct {
		bindPluginStatuses     []*framework.Status
		expectBoundByScheduler bool   // true means this test case expecting scheduler would bind pods
		expectBoundByPlugin    bool   // true means this test case expecting a plugin would bind pods
		expectBindPluginName   string // expecting plugin name to bind pods
		expectInvokeEvents     []pluginInvokeEvent
	}{
		// both bind plugins skipped binding the pod, so the scheduler bound the pod
		{
			bindPluginStatuses:     []*framework.Status{framework.NewStatus(framework.Skip, ""), framework.NewStatus(framework.Skip, "")},
			expectBoundByScheduler: true,
			expectInvokeEvents:     []pluginInvokeEvent{{pluginName: bindPlugin1.Name(), val: 1}, {pluginName: bindPlugin2.Name(), val: 1}, {pluginName: postbindPlugin.Name(), val: 1}},
		},
		// bindplugin2 succeeded to bind the pod
		{
			bindPluginStatuses:   []*framework.Status{framework.NewStatus(framework.Skip, ""), framework.NewStatus(framework.Success, "")},
			expectBoundByPlugin:  true,
			expectBindPluginName: bindPlugin2.Name(),
			expectInvokeEvents:   []pluginInvokeEvent{{pluginName: bindPlugin1.Name(), val: 1}, {pluginName: bindPlugin2.Name(), val: 1}, {pluginName: postbindPlugin.Name(), val: 1}},
		},
		// bindplugin1 succeeded to bind the pod
		{
			bindPluginStatuses:   []*framework.Status{framework.NewStatus(framework.Success, ""), framework.NewStatus(framework.Success, "")},
			expectBoundByPlugin:  true,
			expectBindPluginName: bindPlugin1.Name(),
			expectInvokeEvents:   []pluginInvokeEvent{{pluginName: bindPlugin1.Name(), val: 1}, {pluginName: postbindPlugin.Name(), val: 1}},
		},
		// bind plugin fails to bind the pod
		{
			bindPluginStatuses: []*framework.Status{framework.NewStatus(framework.Error, "failed to bind"), framework.NewStatus(framework.Success, "")},
			expectInvokeEvents: []pluginInvokeEvent{{pluginName: bindPlugin1.Name(), val: 1}, {pluginName: unreservePlugin.Name(), val: 1}, {pluginName: bindPlugin1.Name(), val: 2}, {pluginName: unreservePlugin.Name(), val: 2}},
		},
	}

	var pluginInvokeEventChan chan pluginInvokeEvent
	for i, test := range tests {
		bindPlugin1.bindStatus = test.bindPluginStatuses[0]
		bindPlugin2.bindStatus = test.bindPluginStatuses[1]

		// Every case gets a fresh channel so that events left over from the
		// previous case cannot leak into this one.
		pluginInvokeEventChan = make(chan pluginInvokeEvent, 10)
		bindPlugin1.pluginInvokeEventChan = pluginInvokeEventChan
		bindPlugin2.pluginInvokeEventChan = pluginInvokeEventChan
		unreservePlugin.pluginInvokeEventChan = pluginInvokeEventChan
		postbindPlugin.pluginInvokeEventChan = pluginInvokeEventChan

		// Create a best effort pod.
		pod, err := createPausePod(cs,
			initPausePod(cs, &pausePodConfig{Name: "test-pod", Namespace: context.ns.Name}))
		if err != nil {
			// Without a pod nothing below can be verified; carrying on would only
			// produce misleading follow-on failures and long timeouts.
			t.Fatalf("Error while creating a test pod: %v", err)
		}

		if test.expectBoundByScheduler || test.expectBoundByPlugin {
			// the pod is expected to be bound, either by the scheduler or by a bind plugin
			if err = waitForPodToSchedule(cs, pod); err != nil {
				t.Errorf("test #%v: Expected the pod to be scheduled. error: %v", i, err)
				continue
			}
			pod, err = cs.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{})
			if err != nil {
				t.Errorf("can't get pod: %v", err)
			}
			if test.expectBoundByScheduler {
				if pod.Annotations[bindPluginAnnotation] != "" {
					t.Errorf("test #%v: Expected the pod to be binded by scheduler instead of by bindplugin %s", i, pod.Annotations[bindPluginAnnotation])
				}
				if bindPlugin1.numBindCalled != 1 || bindPlugin2.numBindCalled != 1 {
					t.Errorf("test #%v: Expected each bind plugin to be called once, was called %d and %d times.", i, bindPlugin1.numBindCalled, bindPlugin2.numBindCalled)
				}
			} else {
				if pod.Annotations[bindPluginAnnotation] != test.expectBindPluginName {
					t.Errorf("test #%v: Expected the pod to be binded by bindplugin %s instead of by bindplugin %s", i, test.expectBindPluginName, pod.Annotations[bindPluginAnnotation])
				}
				if bindPlugin1.numBindCalled != 1 {
					t.Errorf("test #%v: Expected %s to be called once, was called %d times.", i, bindPlugin1.Name(), bindPlugin1.numBindCalled)
				}
				if test.expectBindPluginName == bindPlugin1.Name() && bindPlugin2.numBindCalled > 0 {
					// expect bindplugin1 succeeded to bind the pod and bindplugin2 should not be called.
					// Report bindPlugin2's own counter: it is the plugin this message is about.
					t.Errorf("test #%v: Expected %s not to be called, was called %d times.", i, bindPlugin2.Name(), bindPlugin2.numBindCalled)
				}
			}
			if err = wait.Poll(10*time.Millisecond, 30*time.Second, func() (done bool, err error) {
				return postbindPlugin.numPostbindCalled == 1, nil
			}); err != nil {
				t.Errorf("test #%v: Expected the postbind plugin to be called once, was called %d times.", i, postbindPlugin.numPostbindCalled)
			}
			if unreservePlugin.numUnreserveCalled != 0 {
				t.Errorf("test #%v: Expected the unreserve plugin not to be called, was called %d times.", i, unreservePlugin.numUnreserveCalled)
			}
		} else {
			// bind plugin fails to bind the pod
			if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(cs, pod.Namespace, pod.Name)); err != nil {
				t.Errorf("test #%v: Expected a scheduling error, but didn't get it. error: %v", i, err)
			}
			if postbindPlugin.numPostbindCalled > 0 {
				t.Errorf("test #%v: Didn't expected the postbind plugin to be called %d times.", i, postbindPlugin.numPostbindCalled)
			}
		}

		// Check that the plugins were invoked in the expected order.
		for j := range test.expectInvokeEvents {
			expectEvent := test.expectInvokeEvents[j]
			select {
			case event := <-pluginInvokeEventChan:
				if event.pluginName != expectEvent.pluginName {
					t.Errorf("test #%v: Expect invoke event %d from plugin %s instead of %s", i, j, expectEvent.pluginName, event.pluginName)
				}
				if event.val != expectEvent.val {
					t.Errorf("test #%v: Expect val of invoke event %d to be %d instead of %d", i, j, expectEvent.val, event.val)
				}
			case <-time.After(time.Second * 30):
				t.Errorf("test #%v: Waiting for invoke event %d timeout.", i, j)
			}
		}

		// Reset the plugin counters and remove the pod before the next case.
		postbindPlugin.reset()
		bindPlugin1.reset()
		bindPlugin2.reset()
		unreservePlugin.reset()
		cleanupPods(cs, t, []*v1.Pod{pod})
	}
}
// TestPostbindPlugin tests invocation of postbind plugins.
func TestPostbindPlugin(t *testing.T) {
// Create a plugin registry for testing. Register a prebind and a postbind plugin.