Fix flaky test TestBindPlugin

This commit is contained in:
Chun Chen 2019-07-02 13:13:47 +08:00 committed by Chun Chen
parent 3bb1a081ab
commit e04f03d3cb
3 changed files with 98 additions and 48 deletions

View File

@ -201,7 +201,9 @@ type BindPlugin interface {
// Bind plugins will not be called until all pre-bind plugins have completed. Each // Bind plugins will not be called until all pre-bind plugins have completed. Each
// bind plugin is called in the configured order. A bind plugin may choose whether // bind plugin is called in the configured order. A bind plugin may choose whether
// or not to handle the given Pod. If a bind plugin chooses to handle a Pod, the // or not to handle the given Pod. If a bind plugin chooses to handle a Pod, the
// remaining bind plugins are skipped. // remaining bind plugins are skipped. When a bind plugin does not handle a pod,
// it must return Skip in its Status code. If a bind plugin returns an Error, the
// pod is rejected and will not be bound.
Bind(pc *PluginContext, p *v1.Pod, nodeName string) *Status Bind(pc *PluginContext, p *v1.Pod, nodeName string) *Status
} }

View File

@ -419,20 +419,21 @@ func (sched *Scheduler) bind(assumed *v1.Pod, targetNode string, pluginContext *
bindingStart := time.Now() bindingStart := time.Now()
bindStatus := sched.config.Framework.RunBindPlugins(pluginContext, assumed, targetNode) bindStatus := sched.config.Framework.RunBindPlugins(pluginContext, assumed, targetNode)
var err error var err error
if bindStatus != nil && bindStatus.Code() == framework.Skip { if !bindStatus.IsSuccess() {
// All bind plugins chooses to skip binding of this pod, call original binding func. if bindStatus.Code() == framework.Skip {
// All bind plugins chose to skip binding of this pod, call original binding function.
// If binding succeeded then PodScheduled condition will be updated in apiserver so that // If binding succeeds then PodScheduled condition will be updated in apiserver so that
// it's atomic with setting host. // it's atomic with setting host.
err = sched.config.GetBinder(assumed).Bind(&v1.Binding{ err = sched.config.GetBinder(assumed).Bind(&v1.Binding{
ObjectMeta: metav1.ObjectMeta{Namespace: assumed.Namespace, Name: assumed.Name, UID: assumed.UID}, ObjectMeta: metav1.ObjectMeta{Namespace: assumed.Namespace, Name: assumed.Name, UID: assumed.UID},
Target: v1.ObjectReference{ Target: v1.ObjectReference{
Kind: "Node", Kind: "Node",
Name: targetNode, Name: targetNode,
}, },
}) })
} else if !bindStatus.IsSuccess() { } else {
err = fmt.Errorf("scheduler RunBindPlugins failed for pod %v/%v: code %d, err %v", assumed.Namespace, assumed.Name, bindStatus.Code(), err) err = fmt.Errorf("Bind failure, code: %d: %v", bindStatus.Code(), bindStatus.Message())
}
} }
if finErr := sched.config.SchedulerCache.FinishBinding(assumed); finErr != nil { if finErr := sched.config.SchedulerCache.FinishBinding(assumed); finErr != nil {
klog.Errorf("scheduler cache FinishBinding failed: %v", finErr) klog.Errorf("scheduler cache FinishBinding failed: %v", finErr)

View File

@ -21,7 +21,7 @@ import (
"testing" "testing"
"time" "time"
v1 "k8s.io/api/core/v1" "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
@ -48,18 +48,23 @@ type PrebindPlugin struct {
} }
type BindPlugin struct { type BindPlugin struct {
numBindCalled int numBindCalled int
PluginName string PluginName string
bindStatus *framework.Status bindStatus *framework.Status
client *clientset.Clientset client *clientset.Clientset
pluginInvokeEventChan chan pluginInvokeEvent
} }
type PostbindPlugin struct { type PostbindPlugin struct {
numPostbindCalled int name string
numPostbindCalled int
pluginInvokeEventChan chan pluginInvokeEvent
} }
type UnreservePlugin struct { type UnreservePlugin struct {
numUnreserveCalled int name string
numUnreserveCalled int
pluginInvokeEventChan chan pluginInvokeEvent
} }
type PermitPlugin struct { type PermitPlugin struct {
@ -155,6 +160,9 @@ func (bp *BindPlugin) Name() string {
func (bp *BindPlugin) Bind(pc *framework.PluginContext, p *v1.Pod, nodeName string) *framework.Status { func (bp *BindPlugin) Bind(pc *framework.PluginContext, p *v1.Pod, nodeName string) *framework.Status {
bp.numBindCalled++ bp.numBindCalled++
if bp.pluginInvokeEventChan != nil {
bp.pluginInvokeEventChan <- pluginInvokeEvent{pluginName: bp.Name(), val: bp.numBindCalled}
}
if bp.bindStatus.IsSuccess() { if bp.bindStatus.IsSuccess() {
if err := bp.client.CoreV1().Pods(p.Namespace).Bind(&v1.Binding{ if err := bp.client.CoreV1().Pods(p.Namespace).Bind(&v1.Binding{
ObjectMeta: metav1.ObjectMeta{Namespace: p.Namespace, Name: p.Name, UID: p.UID, Annotations: map[string]string{bindPluginAnnotation: bp.Name()}}, ObjectMeta: metav1.ObjectMeta{Namespace: p.Namespace, Name: p.Name, UID: p.UID, Annotations: map[string]string{bindPluginAnnotation: bp.Name()}},
@ -174,16 +182,19 @@ func (bp *BindPlugin) reset() {
bp.numBindCalled = 0 bp.numBindCalled = 0
} }
var ptbdPlugin = &PostbindPlugin{} var ptbdPlugin = &PostbindPlugin{name: postbindPluginName}
// Name returns name of the plugin. // Name returns name of the plugin.
func (pp *PostbindPlugin) Name() string { func (pp *PostbindPlugin) Name() string {
return postbindPluginName return pp.name
} }
// Postbind is a test function, which counts the number of times called. // Postbind is a test function, which counts the number of times called.
func (pp *PostbindPlugin) Postbind(pc *framework.PluginContext, pod *v1.Pod, nodeName string) { func (pp *PostbindPlugin) Postbind(pc *framework.PluginContext, pod *v1.Pod, nodeName string) {
pp.numPostbindCalled++ pp.numPostbindCalled++
if pp.pluginInvokeEventChan != nil {
pp.pluginInvokeEventChan <- pluginInvokeEvent{pluginName: pp.Name(), val: pp.numPostbindCalled}
}
} }
// reset used to reset postbind plugin. // reset used to reset postbind plugin.
@ -227,17 +238,20 @@ func NewPrefilterPlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framew
return pfPlugin, nil return pfPlugin, nil
} }
var unresPlugin = &UnreservePlugin{} var unresPlugin = &UnreservePlugin{name: unreservePluginName}
// Name returns name of the plugin. // Name returns name of the plugin.
func (up *UnreservePlugin) Name() string { func (up *UnreservePlugin) Name() string {
return unreservePluginName return up.name
} }
// Unreserve is a test function that returns an error or nil, depending on the // Unreserve is a test function that returns an error or nil, depending on the
// value of "failUnreserve". // value of "failUnreserve".
func (up *UnreservePlugin) Unreserve(pc *framework.PluginContext, pod *v1.Pod, nodeName string) { func (up *UnreservePlugin) Unreserve(pc *framework.PluginContext, pod *v1.Pod, nodeName string) {
up.numUnreserveCalled++ up.numUnreserveCalled++
if up.pluginInvokeEventChan != nil {
up.pluginInvokeEventChan <- pluginInvokeEvent{pluginName: up.Name(), val: up.numUnreserveCalled}
}
} }
// reset used to reset numUnreserveCalled. // reset used to reset numUnreserveCalled.
@ -653,38 +667,50 @@ func TestUnreservePlugin(t *testing.T) {
} }
} }
type pluginInvokeEvent struct {
pluginName string
val int
}
// TestBindPlugin tests invocation of bind plugins. // TestBindPlugin tests invocation of bind plugins.
func TestBindPlugin(t *testing.T) { func TestBindPlugin(t *testing.T) {
testContext := initTestMaster(t, "bind-plugin", nil) testContext := initTestMaster(t, "bind-plugin", nil)
bindPlugin1, bindPlugin2 := &BindPlugin{PluginName: "bind-plugin-1", client: testContext.clientSet}, &BindPlugin{PluginName: "bind-plugin-2", client: testContext.clientSet} bindPlugin1 := &BindPlugin{PluginName: "bind-plugin-1", client: testContext.clientSet}
bindPlugin2 := &BindPlugin{PluginName: "bind-plugin-2", client: testContext.clientSet}
unreservePlugin := &UnreservePlugin{name: "mock-unreserve-plugin"}
postbindPlugin := &PostbindPlugin{name: "mock-post-bind-plugin"}
// Create a plugin registry for testing. Register an unreserve, a bind plugin and a postBind plugin. // Create a plugin registry for testing. Register an unreserve, a bind plugin and a postBind plugin.
registry := framework.Registry{ registry := framework.Registry{
unreservePluginName: NewUnreservePlugin, unreservePlugin.Name(): func(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
return unreservePlugin, nil
},
bindPlugin1.Name(): func(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) { bindPlugin1.Name(): func(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
return bindPlugin1, nil return bindPlugin1, nil
}, },
bindPlugin2.Name(): func(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) { bindPlugin2.Name(): func(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
return bindPlugin2, nil return bindPlugin2, nil
}, },
postbindPluginName: NewPostbindPlugin, postbindPlugin.Name(): func(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
return postbindPlugin, nil
},
} }
// Setup initial unreserve and bind plugins for testing. // Setup initial unreserve and bind plugins for testing.
plugins := &schedulerconfig.Plugins{ plugins := &schedulerconfig.Plugins{
Unreserve: &schedulerconfig.PluginSet{ Unreserve: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{{Name: unreservePluginName}}, Enabled: []schedulerconfig.Plugin{{Name: unreservePlugin.Name()}},
}, },
Bind: &schedulerconfig.PluginSet{ Bind: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{{Name: bindPlugin1.Name()}, {Name: bindPlugin2.Name()}}, Enabled: []schedulerconfig.Plugin{{Name: bindPlugin1.Name()}, {Name: bindPlugin2.Name()}},
}, },
PostBind: &schedulerconfig.PluginSet{ PostBind: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{{Name: postbindPluginName}}, Enabled: []schedulerconfig.Plugin{{Name: postbindPlugin.Name()}},
}, },
} }
// Set reserve and bind config for testing // Set reserve and bind config for testing
pluginConfig := []schedulerconfig.PluginConfig{ pluginConfig := []schedulerconfig.PluginConfig{
{ {
Name: unreservePluginName, Name: unreservePlugin.Name(),
Args: runtime.Unknown{}, Args: runtime.Unknown{},
}, },
{ {
@ -696,7 +722,7 @@ func TestBindPlugin(t *testing.T) {
Args: runtime.Unknown{}, Args: runtime.Unknown{},
}, },
{ {
Name: postbindPluginName, Name: postbindPlugin.Name(),
Args: runtime.Unknown{}, Args: runtime.Unknown{},
}, },
} }
@ -718,34 +744,46 @@ func TestBindPlugin(t *testing.T) {
expectBoundByScheduler bool // true means this test case expecting scheduler would bind pods expectBoundByScheduler bool // true means this test case expecting scheduler would bind pods
expectBoundByPlugin bool // true means this test case expecting a plugin would bind pods expectBoundByPlugin bool // true means this test case expecting a plugin would bind pods
expectBindPluginName string // expecting plugin name to bind pods expectBindPluginName string // expecting plugin name to bind pods
expectInvokeEvents []pluginInvokeEvent
}{ }{
// bind plugins skiped to bind the pod and scheduler binded the pod // bind plugins skipped to bind the pod and scheduler bond the pod
{ {
bindPluginStatuses: []*framework.Status{framework.NewStatus(framework.Skip, ""), framework.NewStatus(framework.Skip, "")}, bindPluginStatuses: []*framework.Status{framework.NewStatus(framework.Skip, ""), framework.NewStatus(framework.Skip, "")},
expectBoundByScheduler: true, expectBoundByScheduler: true,
expectInvokeEvents: []pluginInvokeEvent{{pluginName: bindPlugin1.Name(), val: 1}, {pluginName: bindPlugin2.Name(), val: 1}, {pluginName: postbindPlugin.Name(), val: 1}},
}, },
// bindplugin2 succeeded to bind the pod // bindplugin2 succeeded to bind the pod
{ {
bindPluginStatuses: []*framework.Status{framework.NewStatus(framework.Skip, ""), framework.NewStatus(framework.Success, "")}, bindPluginStatuses: []*framework.Status{framework.NewStatus(framework.Skip, ""), framework.NewStatus(framework.Success, "")},
expectBoundByPlugin: true, expectBoundByPlugin: true,
expectBindPluginName: bindPlugin2.Name(), expectBindPluginName: bindPlugin2.Name(),
expectInvokeEvents: []pluginInvokeEvent{{pluginName: bindPlugin1.Name(), val: 1}, {pluginName: bindPlugin2.Name(), val: 1}, {pluginName: postbindPlugin.Name(), val: 1}},
}, },
// bindplugin1 succeeded to bind the pod // bindplugin1 succeeded to bind the pod
{ {
bindPluginStatuses: []*framework.Status{framework.NewStatus(framework.Success, ""), framework.NewStatus(framework.Success, "")}, bindPluginStatuses: []*framework.Status{framework.NewStatus(framework.Success, ""), framework.NewStatus(framework.Success, "")},
expectBoundByPlugin: true, expectBoundByPlugin: true,
expectBindPluginName: bindPlugin1.Name(), expectBindPluginName: bindPlugin1.Name(),
expectInvokeEvents: []pluginInvokeEvent{{pluginName: bindPlugin1.Name(), val: 1}, {pluginName: postbindPlugin.Name(), val: 1}},
}, },
// bind plugin fails to bind the pod // bind plugin fails to bind the pod
{ {
bindPluginStatuses: []*framework.Status{framework.NewStatus(framework.Error, "failed to bind"), framework.NewStatus(framework.Success, "")}, bindPluginStatuses: []*framework.Status{framework.NewStatus(framework.Error, "failed to bind"), framework.NewStatus(framework.Success, "")},
expectInvokeEvents: []pluginInvokeEvent{{pluginName: bindPlugin1.Name(), val: 1}, {pluginName: unreservePlugin.Name(), val: 1}, {pluginName: bindPlugin1.Name(), val: 2}, {pluginName: unreservePlugin.Name(), val: 2}},
}, },
} }
var pluginInvokeEventChan chan pluginInvokeEvent
for i, test := range tests { for i, test := range tests {
bindPlugin1.bindStatus = test.bindPluginStatuses[0] bindPlugin1.bindStatus = test.bindPluginStatuses[0]
bindPlugin2.bindStatus = test.bindPluginStatuses[1] bindPlugin2.bindStatus = test.bindPluginStatuses[1]
pluginInvokeEventChan = make(chan pluginInvokeEvent, 10)
bindPlugin1.pluginInvokeEventChan = pluginInvokeEventChan
bindPlugin2.pluginInvokeEventChan = pluginInvokeEventChan
unreservePlugin.pluginInvokeEventChan = pluginInvokeEventChan
postbindPlugin.pluginInvokeEventChan = pluginInvokeEventChan
// Create a best effort pod. // Create a best effort pod.
pod, err := createPausePod(cs, pod, err := createPausePod(cs,
initPausePod(cs, &pausePodConfig{Name: "test-pod", Namespace: context.ns.Name})) initPausePod(cs, &pausePodConfig{Name: "test-pod", Namespace: context.ns.Name}))
@ -754,7 +792,7 @@ func TestBindPlugin(t *testing.T) {
} }
if test.expectBoundByScheduler || test.expectBoundByPlugin { if test.expectBoundByScheduler || test.expectBoundByPlugin {
// bind plugins skiped to bind the pod // bind plugins skipped to bind the pod
if err = waitForPodToSchedule(cs, pod); err != nil { if err = waitForPodToSchedule(cs, pod); err != nil {
t.Errorf("test #%v: Expected the pod to be scheduled. error: %v", i, err) t.Errorf("test #%v: Expected the pod to be scheduled. error: %v", i, err)
continue continue
@ -783,31 +821,40 @@ func TestBindPlugin(t *testing.T) {
} }
} }
if err = wait.Poll(10*time.Millisecond, 30*time.Second, func() (done bool, err error) { if err = wait.Poll(10*time.Millisecond, 30*time.Second, func() (done bool, err error) {
return ptbdPlugin.numPostbindCalled == 1, nil return postbindPlugin.numPostbindCalled == 1, nil
}); err != nil { }); err != nil {
t.Errorf("test #%v: Expected the postbind plugin to be called once, was called %d times.", i, ptbdPlugin.numPostbindCalled) t.Errorf("test #%v: Expected the postbind plugin to be called once, was called %d times.", i, postbindPlugin.numPostbindCalled)
} }
if unresPlugin.numUnreserveCalled != 0 { if unreservePlugin.numUnreserveCalled != 0 {
t.Errorf("test #%v: Expected the unreserve plugin not to be called, was called %d times.", i, unresPlugin.numUnreserveCalled) t.Errorf("test #%v: Expected the unreserve plugin not to be called, was called %d times.", i, unreservePlugin.numUnreserveCalled)
} }
} else { } else {
// bind plugin fails to bind the pod // bind plugin fails to bind the pod
if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(cs, pod.Namespace, pod.Name)); err != nil { if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(cs, pod.Namespace, pod.Name)); err != nil {
t.Errorf("test #%v: Expected a scheduling error, but didn't get it. error: %v", i, err) t.Errorf("test #%v: Expected a scheduling error, but didn't get it. error: %v", i, err)
} }
if ptbdPlugin.numPostbindCalled > 0 { if postbindPlugin.numPostbindCalled > 0 {
t.Errorf("test #%v: Didn't expected the postbind plugin to be called %d times.", i, ptbdPlugin.numPostbindCalled) t.Errorf("test #%v: Didn't expected the postbind plugin to be called %d times.", i, postbindPlugin.numPostbindCalled)
}
if err = wait.Poll(10*time.Millisecond, 30*time.Second, func() (done bool, err error) {
return unresPlugin.numUnreserveCalled == 1, nil
}); err != nil {
t.Errorf("test #%v: Expected the unreserve plugin to be called once, was called %d times.", i, unresPlugin.numUnreserveCalled)
} }
} }
ptbdPlugin.reset() for j := range test.expectInvokeEvents {
expectEvent := test.expectInvokeEvents[j]
select {
case event := <-pluginInvokeEventChan:
if event.pluginName != expectEvent.pluginName {
t.Errorf("test #%v: Expect invoke event %d from plugin %s instead of %s", i, j, expectEvent.pluginName, event.pluginName)
}
if event.val != expectEvent.val {
t.Errorf("test #%v: Expect val of invoke event %d to be %d instead of %d", i, j, expectEvent.val, event.val)
}
case <-time.After(time.Second * 30):
t.Errorf("test #%v: Waiting for invoke event %d timeout.", i, j)
}
}
postbindPlugin.reset()
bindPlugin1.reset() bindPlugin1.reset()
bindPlugin2.reset() bindPlugin2.reset()
unresPlugin.reset() unreservePlugin.reset()
cleanupPods(cs, t, []*v1.Pod{pod}) cleanupPods(cs, t, []*v1.Pod{pod})
} }
} }