diff --git a/test/integration/scheduler/plugins/plugins_test.go b/test/integration/scheduler/plugins/plugins_test.go index 2ed0eb76264..b0651cf1d81 100644 --- a/test/integration/scheduler/plugins/plugins_test.go +++ b/test/integration/scheduler/plugins/plugins_test.go @@ -14,13 +14,15 @@ See the License for the specific language governing permissions and limitations under the License. */ +// Package plugins contains functional tests for scheduler plugin support. +// Beware that the plugins in this directory are not meant to be used in +// performance tests because they don't behave like real plugins. package plugins import ( "context" "fmt" "sync" - "sync/atomic" "testing" "time" @@ -65,7 +67,7 @@ var ( ) type PreEnqueuePlugin struct { - called int32 + called int admit bool } @@ -76,23 +78,62 @@ type PreFilterPlugin struct { } type ScorePlugin struct { + mutex sync.Mutex failScore bool - numScoreCalled int32 + numScoreCalled int highScoreNode string } +func (sp *ScorePlugin) deepCopy() *ScorePlugin { + sp.mutex.Lock() + defer sp.mutex.Unlock() + + return &ScorePlugin{ + failScore: sp.failScore, + numScoreCalled: sp.numScoreCalled, + highScoreNode: sp.highScoreNode, + } +} + type ScoreWithNormalizePlugin struct { + mutex sync.Mutex numScoreCalled int numNormalizeScoreCalled int } +func (sp *ScoreWithNormalizePlugin) deepCopy() *ScoreWithNormalizePlugin { + sp.mutex.Lock() + defer sp.mutex.Unlock() + + return &ScoreWithNormalizePlugin{ + numScoreCalled: sp.numScoreCalled, + numNormalizeScoreCalled: sp.numNormalizeScoreCalled, + } +} + type FilterPlugin struct { - numFilterCalled int32 + mutex sync.Mutex + numFilterCalled int failFilter bool rejectFilter bool numCalledPerPod map[string]int - sync.RWMutex +} + +func (fp *FilterPlugin) deepCopy() *FilterPlugin { + fp.mutex.Lock() + defer fp.mutex.Unlock() + + clone := &FilterPlugin{ + numFilterCalled: fp.numFilterCalled, + failFilter: fp.failFilter, + rejectFilter: fp.rejectFilter, + numCalledPerPod: make(map[string]int), + } + for pod, counter := range fp.numCalledPerPod { + clone.numCalledPerPod[pod] = counter + } + return clone } type PostFilterPlugin struct { @@ -118,6 +159,7 @@ type PreScorePlugin struct { } type PreBindPlugin struct { + mutex sync.Mutex numPreBindCalled int failPreBind bool rejectPreBind bool @@ -127,7 +169,34 @@ type PreBindPlugin struct { podUIDs map[types.UID]struct{} } +func (pp *PreBindPlugin) set(fail, reject, succeed bool) { + pp.mutex.Lock() + defer pp.mutex.Unlock() + + pp.failPreBind = fail + pp.rejectPreBind = reject + pp.succeedOnRetry = succeed +} + +func (pp *PreBindPlugin) deepCopy() *PreBindPlugin { + pp.mutex.Lock() + defer pp.mutex.Unlock() + + clone := &PreBindPlugin{ + numPreBindCalled: pp.numPreBindCalled, + failPreBind: pp.failPreBind, + rejectPreBind: pp.rejectPreBind, + succeedOnRetry: pp.succeedOnRetry, + podUIDs: make(map[types.UID]struct{}), + } + for uid := range pp.podUIDs { + clone.podUIDs[uid] = struct{}{} + } + return clone +} + type BindPlugin struct { + mutex sync.Mutex name string numBindCalled int bindStatus *framework.Status @@ -135,13 +204,39 @@ type BindPlugin struct { pluginInvokeEventChan chan pluginInvokeEvent } +func (bp *BindPlugin) deepCopy() *BindPlugin { + bp.mutex.Lock() + defer bp.mutex.Unlock() + + return &BindPlugin{ + name: bp.name, + numBindCalled: bp.numBindCalled, + bindStatus: bp.bindStatus, + client: bp.client, + pluginInvokeEventChan: bp.pluginInvokeEventChan, + } +} + type PostBindPlugin struct { + mutex sync.Mutex name string numPostBindCalled int pluginInvokeEventChan chan pluginInvokeEvent } +func (pp *PostBindPlugin) deepCopy() *PostBindPlugin { + pp.mutex.Lock() + defer pp.mutex.Unlock() + + return &PostBindPlugin{ + name: pp.name, + numPostBindCalled: pp.numPostBindCalled, + pluginInvokeEventChan: pp.pluginInvokeEventChan, + } +} + type PermitPlugin struct { + mutex sync.Mutex name string numPermitCalled int failPermit bool @@ -156,6 +251,26 @@ type PermitPlugin struct { fh framework.Handle } +func (pp *PermitPlugin) deepCopy() *PermitPlugin { + pp.mutex.Lock() + defer pp.mutex.Unlock() + + return &PermitPlugin{ + name: pp.name, + numPermitCalled: pp.numPermitCalled, + failPermit: pp.failPermit, + rejectPermit: pp.rejectPermit, + timeoutPermit: pp.timeoutPermit, + waitAndRejectPermit: pp.waitAndRejectPermit, + waitAndAllowPermit: pp.waitAndAllowPermit, + cancelled: pp.cancelled, + waitingPod: pp.waitingPod, + rejectingPod: pp.rejectingPod, + allowingPod: pp.allowingPod, + fh: pp.fh, + } +} + const ( enqueuePluginName = "enqueue-plugin" prefilterPluginName = "prefilter-plugin" @@ -216,13 +331,16 @@ func (sp *ScorePlugin) Name() string { // Score returns the score of scheduling a pod on a specific node. func (sp *ScorePlugin) Score(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) { - curCalled := atomic.AddInt32(&sp.numScoreCalled, 1) + sp.mutex.Lock() + defer sp.mutex.Unlock() + + sp.numScoreCalled++ if sp.failScore { return 0, framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", p.Name)) } score := int64(1) - if curCalled == 1 { + if sp.numScoreCalled == 1 { // The first node is scored the highest, the rest is scored lower. sp.highScoreNode = nodeName score = framework.MaxNodeScore @@ -241,6 +359,9 @@ func (sp *ScoreWithNormalizePlugin) Name() string { // Score returns the score of scheduling a pod on a specific node. func (sp *ScoreWithNormalizePlugin) Score(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) { + sp.mutex.Lock() + defer sp.mutex.Unlock() + sp.numScoreCalled++ score := int64(10) return score, nil @@ -263,12 +384,12 @@ func (fp *FilterPlugin) Name() string { // Filter is a test function that returns an error or nil, depending on the // value of "failFilter". func (fp *FilterPlugin) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status { - atomic.AddInt32(&fp.numFilterCalled, 1) + fp.mutex.Lock() + defer fp.mutex.Unlock() + fp.numFilterCalled++ if fp.numCalledPerPod != nil { - fp.Lock() fp.numCalledPerPod[fmt.Sprintf("%v/%v", pod.Namespace, pod.Name)]++ - fp.Unlock() } if fp.failFilter { @@ -328,6 +449,9 @@ func (pp *PreBindPlugin) Name() string { // PreBind is a test function that returns (true, nil) or errors for testing. func (pp *PreBindPlugin) PreBind(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { + pp.mutex.Lock() + defer pp.mutex.Unlock() + pp.numPreBindCalled++ if _, tried := pp.podUIDs[pod.UID]; tried && pp.succeedOnRetry { return nil @@ -349,6 +473,9 @@ func (bp *BindPlugin) Name() string { } func (bp *BindPlugin) Bind(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status { + bp.mutex.Lock() + defer bp.mutex.Unlock() + bp.numBindCalled++ if bp.pluginInvokeEventChan != nil { bp.pluginInvokeEventChan <- pluginInvokeEvent{pluginName: bp.Name(), val: bp.numBindCalled} @@ -374,6 +501,9 @@ func (pp *PostBindPlugin) Name() string { // PostBind is a test function, which counts the number of times called. func (pp *PostBindPlugin) PostBind(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) { + pp.mutex.Lock() + defer pp.mutex.Unlock() + pp.numPostBindCalled++ if pp.pluginInvokeEventChan != nil { pp.pluginInvokeEventChan <- pluginInvokeEvent{pluginName: pp.Name(), val: pp.numPostBindCalled} @@ -443,6 +573,9 @@ func (pp *PermitPlugin) Name() string { // Permit implements the permit test plugin. func (pp *PermitPlugin) Permit(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (*framework.Status, time.Duration) { + pp.mutex.Lock() + defer pp.mutex.Unlock() + pp.numPermitCalled++ if pp.failPermit { return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name)), 0 @@ -454,6 +587,8 @@ func (pp *PermitPlugin) Permit(ctx context.Context, state *framework.CycleState, go func() { select { case <-ctx.Done(): + pp.mutex.Lock() + defer pp.mutex.Unlock() pp.cancelled = true } }() @@ -487,6 +622,8 @@ func (pp *PermitPlugin) allowAllPods() { // rejectAllPods rejects all waiting pods. func (pp *PermitPlugin) rejectAllPods() { + pp.mutex.Lock() + defer pp.mutex.Unlock() pp.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) { wp.Reject(pp.name, "rejectAllPods") }) } @@ -559,18 +696,18 @@ func TestPreFilterPlugin(t *testing.T) { // TestPostFilterPlugin tests invocation of postFilter plugins. func TestPostFilterPlugin(t *testing.T) { - var numNodes int32 = 1 + numNodes := 1 tests := []struct { name string - numNodes int32 + numNodes int rejectFilter bool failScore bool rejectPostFilter bool rejectPostFilter2 bool breakPostFilter bool breakPostFilter2 bool - expectFilterNumCalled int32 - expectScoreNumCalled int32 + expectFilterNumCalled int + expectScoreNumCalled int expectPostFilterNumCalled int }{ { @@ -712,20 +849,20 @@ func TestPostFilterPlugin(t *testing.T) { t.Errorf("Didn't expect the pod to be scheduled.") } - if numFilterCalled := atomic.LoadInt32(&filterPlugin.numFilterCalled); numFilterCalled < tt.expectFilterNumCalled { + if numFilterCalled := filterPlugin.deepCopy().numFilterCalled; numFilterCalled < tt.expectFilterNumCalled { t.Errorf("Expected the filter plugin to be called at least %v times, but got %v.", tt.expectFilterNumCalled, numFilterCalled) } - if numScoreCalled := atomic.LoadInt32(&scorePlugin.numScoreCalled); numScoreCalled < tt.expectScoreNumCalled { + if numScoreCalled := scorePlugin.deepCopy().numScoreCalled; numScoreCalled < tt.expectScoreNumCalled { t.Errorf("Expected the score plugin to be called at least %v times, but got %v.", tt.expectScoreNumCalled, numScoreCalled) } } else { if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil { t.Errorf("Expected the pod to be scheduled. error: %v", err) } - if numFilterCalled := atomic.LoadInt32(&filterPlugin.numFilterCalled); numFilterCalled != tt.expectFilterNumCalled { + if numFilterCalled := filterPlugin.deepCopy().numFilterCalled; numFilterCalled != tt.expectFilterNumCalled { t.Errorf("Expected the filter plugin to be called %v times, but got %v.", tt.expectFilterNumCalled, numFilterCalled) } - if numScoreCalled := atomic.LoadInt32(&scorePlugin.numScoreCalled); numScoreCalled != tt.expectScoreNumCalled { + if numScoreCalled := scorePlugin.deepCopy().numScoreCalled; numScoreCalled != tt.expectScoreNumCalled { t.Errorf("Expected the score plugin to be called %v times, but got %v.", tt.expectScoreNumCalled, numScoreCalled) } } @@ -792,7 +929,7 @@ func TestScorePlugin(t *testing.T) { } } - if numScoreCalled := atomic.LoadInt32(&scorePlugin.numScoreCalled); numScoreCalled == 0 { + if numScoreCalled := scorePlugin.deepCopy().numScoreCalled; numScoreCalled == 0 { t.Errorf("Expected the score plugin to be called.") } }) @@ -820,10 +957,11 @@ func TestNormalizeScorePlugin(t *testing.T) { t.Errorf("Expected the pod to be scheduled. error: %v", err) } - if scoreWithNormalizePlugin.numScoreCalled == 0 { + p := scoreWithNormalizePlugin.deepCopy() + if p.numScoreCalled == 0 { t.Errorf("Expected the score plugin to be called.") } - if scoreWithNormalizePlugin.numNormalizeScoreCalled == 0 { + if p.numNormalizeScoreCalled == 0 { t.Error("Expected the normalize score plugin to be called") } } @@ -980,9 +1118,8 @@ func TestPrebindPlugin(t *testing.T) { } } - preBindPlugin.failPreBind = test.fail - preBindPlugin.rejectPreBind = test.reject - preBindPlugin.succeedOnRetry = test.succeedOnRetry + preBindPlugin.set(test.fail, test.reject, test.succeedOnRetry) + // Create a best effort pod. pod, err := createPausePod(testCtx.ClientSet, initPausePod(&testutils.PausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name})) @@ -1006,7 +1143,8 @@ func TestPrebindPlugin(t *testing.T) { t.Errorf("Expected the pod to be scheduled. error: %v", err) } - if preBindPlugin.numPreBindCalled == 0 { + p := preBindPlugin.deepCopy() + if p.numPreBindCalled == 0 { t.Errorf("Expected the prebind plugin to be called.") } @@ -1014,7 +1152,7 @@ func TestPrebindPlugin(t *testing.T) { if err := wait.Poll(10*time.Millisecond, 15*time.Second, func() (bool, error) { // 2 means the unschedulable pod is expected to be retried at least twice. // (one initial attempt plus the one moved by the preBind pod) - return int(filterPlugin.numFilterCalled) >= 2*nodesNum, nil + return filterPlugin.deepCopy().numFilterCalled >= 2*nodesNum, nil }); err != nil { t.Errorf("Timed out waiting for the unschedulable Pod to be retried at least twice.") } @@ -1523,27 +1661,30 @@ func TestBindPlugin(t *testing.T) { if err != nil { t.Errorf("can't get pod: %v", err) } + p1 := bindPlugin1.deepCopy() + p2 := bindPlugin2.deepCopy() if test.expectBoundByScheduler { if pod.Annotations[bindPluginAnnotation] != "" { t.Errorf("Expected the pod to be bound by scheduler instead of by bindplugin %s", pod.Annotations[bindPluginAnnotation]) } - if bindPlugin1.numBindCalled != 1 || bindPlugin2.numBindCalled != 1 { - t.Errorf("Expected each bind plugin to be called once, was called %d and %d times.", bindPlugin1.numBindCalled, bindPlugin2.numBindCalled) + if p1.numBindCalled != 1 || p2.numBindCalled != 1 { + t.Errorf("Expected each bind plugin to be called once, was called %d and %d times.", p1.numBindCalled, p2.numBindCalled) } } else { if pod.Annotations[bindPluginAnnotation] != test.expectBindPluginName { t.Errorf("Expected the pod to be bound by bindplugin %s instead of by bindplugin %s", test.expectBindPluginName, pod.Annotations[bindPluginAnnotation]) } - if bindPlugin1.numBindCalled != 1 { - t.Errorf("Expected %s to be called once, was called %d times.", bindPlugin1.Name(), bindPlugin1.numBindCalled) + if p1.numBindCalled != 1 { + t.Errorf("Expected %s to be called once, was called %d times.", p1.Name(), p1.numBindCalled) } - if test.expectBindPluginName == bindPlugin1.Name() && bindPlugin2.numBindCalled > 0 { + if test.expectBindPluginName == p1.Name() && p2.numBindCalled > 0 { // expect bindplugin1 succeeded to bind the pod and bindplugin2 should not be called. - t.Errorf("Expected %s not to be called, was called %d times.", bindPlugin2.Name(), bindPlugin1.numBindCalled) + t.Errorf("Expected %s not to be called, was called %d times.", p2.Name(), p2.numBindCalled) } } if err = wait.Poll(10*time.Millisecond, 30*time.Second, func() (done bool, err error) { - return postBindPlugin.numPostBindCalled == 1, nil + p := postBindPlugin.deepCopy() + return p.numPostBindCalled == 1, nil }); err != nil { t.Errorf("Expected the postbind plugin to be called once, was called %d times.", postBindPlugin.numPostBindCalled) } @@ -1555,8 +1696,9 @@ func TestBindPlugin(t *testing.T) { if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(testCtx.ClientSet, pod.Namespace, pod.Name)); err != nil { t.Errorf("Expected a scheduling error, but didn't get it. error: %v", err) } - if postBindPlugin.numPostBindCalled > 0 { - t.Errorf("Didn't expect the postbind plugin to be called %d times.", postBindPlugin.numPostBindCalled) + p := postBindPlugin.deepCopy() + if p.numPostBindCalled > 0 { + t.Errorf("Didn't expect the postbind plugin to be called %d times.", p.numPostBindCalled) } } for j := range test.expectInvokeEvents { @@ -1732,7 +1874,8 @@ func TestPermitPlugin(t *testing.T) { } } - if perPlugin.numPermitCalled == 0 { + p := perPlugin.deepCopy() + if p.numPermitCalled == 0 { t.Errorf("Expected the permit plugin to be called.") } }) @@ -1825,7 +1968,9 @@ func TestPermitPluginsCancelled(t *testing.T) { perPlugin1.rejectAllPods() // Wait some time for the permit plugins to be cancelled err = wait.Poll(10*time.Millisecond, 30*time.Second, func() (bool, error) { - return perPlugin1.cancelled && perPlugin2.cancelled, nil + p1 := perPlugin1.deepCopy() + p2 := perPlugin2.deepCopy() + return p1.cancelled && p2.cancelled, nil }) if err != nil { t.Errorf("Expected all permit plugins to be cancelled") @@ -1910,7 +2055,8 @@ func TestCoSchedulingWithPermitPlugin(t *testing.T) { } } - if permitPlugin.numPermitCalled == 0 { + p := permitPlugin.deepCopy() + if p.numPermitCalled == 0 { t.Errorf("Expected the permit plugin to be called.") } }) @@ -2249,9 +2395,8 @@ func TestPreemptWithPermitPlugin(t *testing.T) { t.Fatalf("Expected the waiting pod to get preempted.") } - filterPlugin.RLock() - waitingPodCalled := filterPlugin.numCalledPerPod[fmt.Sprintf("%v/%v", w.Namespace, w.Name)] - filterPlugin.RUnlock() + p := filterPlugin.deepCopy() + waitingPodCalled := p.numCalledPerPod[fmt.Sprintf("%v/%v", w.Namespace, w.Name)] if waitingPodCalled > tt.maxNumWaitingPodCalled { t.Fatalf("Expected the waiting pod to be called %v times at most, but got %v", tt.maxNumWaitingPodCalled, waitingPodCalled) }