test/integration/scheduler: fix data races

The plugins get called by scheduler goroutines. At least the polling seems to
be done concurrently and thus needs locking.

Locking the PreBindPlugin state is less obvious. It might be that the scheduler
is really done with the test pod, but that ordering doesn't seem to be enough
for the race detector. It's simpler to add mutex locking.
This commit is contained in:
Patrick Ohly 2023-03-29 20:57:41 +02:00
parent f70c26d495
commit 9e9a6cde4b

View File

@ -14,13 +14,15 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
// Package plugins contains functional tests for scheduler plugin support.
// Beware that the plugins in this directory are not meant to be used in
// performance tests because they don't behave like real plugins.
package plugins
import (
"context"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
@ -65,7 +67,7 @@ var (
)
type PreEnqueuePlugin struct {
called int32
called int
admit bool
}
@ -76,23 +78,62 @@ type PreFilterPlugin struct {
}
type ScorePlugin struct {
mutex sync.Mutex
failScore bool
numScoreCalled int32
numScoreCalled int
highScoreNode string
}
// deepCopy returns a new ScorePlugin carrying the current values of failScore,
// numScoreCalled and highScoreNode. The fields are read while sp.mutex is
// held. The mutex itself is not copied, so the returned value starts out with
// its own zero-value mutex.
func (sp *ScorePlugin) deepCopy() *ScorePlugin {
	sp.mutex.Lock()
	defer sp.mutex.Unlock()

	snapshot := new(ScorePlugin)
	snapshot.failScore = sp.failScore
	snapshot.numScoreCalled = sp.numScoreCalled
	snapshot.highScoreNode = sp.highScoreNode
	return snapshot
}
// ScoreWithNormalizePlugin is a test score plugin that keeps counters of how
// often it has been invoked.
type ScoreWithNormalizePlugin struct {
// mutex is held by Score and deepCopy while they access the counters below.
mutex sync.Mutex
// numScoreCalled is incremented on every call to Score.
numScoreCalled int
// numNormalizeScoreCalled is checked by TestNormalizeScorePlugin to be
// non-zero; presumably incremented by the normalize-score callback, which is
// not visible here — TODO confirm.
numNormalizeScoreCalled int
}
// deepCopy returns a new ScoreWithNormalizePlugin holding the current values
// of both call counters. The counters are read while sp.mutex is held; the
// returned value has its own zero-value mutex.
func (sp *ScoreWithNormalizePlugin) deepCopy() *ScoreWithNormalizePlugin {
	sp.mutex.Lock()
	defer sp.mutex.Unlock()

	var snapshot ScoreWithNormalizePlugin
	snapshot.numScoreCalled = sp.numScoreCalled
	snapshot.numNormalizeScoreCalled = sp.numNormalizeScoreCalled
	return &snapshot
}
type FilterPlugin struct {
numFilterCalled int32
mutex sync.Mutex
numFilterCalled int
failFilter bool
rejectFilter bool
numCalledPerPod map[string]int
sync.RWMutex
}
// deepCopy returns a new FilterPlugin holding the current values of
// numFilterCalled, failFilter and rejectFilter plus an independent copy of the
// numCalledPerPod map. All fields are read while fp.mutex is held; the
// returned value has its own zero-value mutex.
//
// The copy's numCalledPerPod is always a non-nil map, even when the source map
// is nil.
func (fp *FilterPlugin) deepCopy() *FilterPlugin {
	fp.mutex.Lock()
	defer fp.mutex.Unlock()

	clone := &FilterPlugin{
		numFilterCalled: fp.numFilterCalled,
		failFilter:      fp.failFilter,
		rejectFilter:    fp.rejectFilter,
		// The final size is known, so allocate it up front instead of letting
		// the map grow while it is filled. len of a nil map is 0.
		numCalledPerPod: make(map[string]int, len(fp.numCalledPerPod)),
	}
	for pod, counter := range fp.numCalledPerPod {
		clone.numCalledPerPod[pod] = counter
	}
	return clone
}
type PostFilterPlugin struct {
@ -118,6 +159,7 @@ type PreScorePlugin struct {
}
type PreBindPlugin struct {
mutex sync.Mutex
numPreBindCalled int
failPreBind bool
rejectPreBind bool
@ -127,7 +169,34 @@ type PreBindPlugin struct {
podUIDs map[types.UID]struct{}
}
// set updates the three behaviour flags of the plugin in one step while
// holding pp.mutex: failPreBind takes fail, rejectPreBind takes reject and
// succeedOnRetry takes succeed.
func (pp *PreBindPlugin) set(fail, reject, succeed bool) {
	pp.mutex.Lock()
	defer pp.mutex.Unlock()

	pp.failPreBind, pp.rejectPreBind, pp.succeedOnRetry = fail, reject, succeed
}
// deepCopy returns a new PreBindPlugin holding the current values of
// numPreBindCalled, failPreBind, rejectPreBind and succeedOnRetry plus an
// independent copy of the podUIDs set. All fields are read while pp.mutex is
// held; the returned value has its own zero-value mutex.
//
// The copy's podUIDs is always a non-nil map, even when the source map is nil.
func (pp *PreBindPlugin) deepCopy() *PreBindPlugin {
	pp.mutex.Lock()
	defer pp.mutex.Unlock()

	clone := &PreBindPlugin{
		numPreBindCalled: pp.numPreBindCalled,
		failPreBind:      pp.failPreBind,
		rejectPreBind:    pp.rejectPreBind,
		succeedOnRetry:   pp.succeedOnRetry,
		// The final size is known, so allocate it up front instead of letting
		// the map grow while it is filled. len of a nil map is 0.
		podUIDs: make(map[types.UID]struct{}, len(pp.podUIDs)),
	}
	for uid := range pp.podUIDs {
		clone.podUIDs[uid] = struct{}{}
	}
	return clone
}
type BindPlugin struct {
mutex sync.Mutex
name string
numBindCalled int
bindStatus *framework.Status
@ -135,13 +204,39 @@ type BindPlugin struct {
pluginInvokeEventChan chan pluginInvokeEvent
}
// deepCopy returns a new BindPlugin carrying the current values of name,
// numBindCalled, bindStatus, client and pluginInvokeEventChan. The fields are
// read while bp.mutex is held; the returned value has its own zero-value
// mutex.
//
// The field values are assigned as-is, so the bindStatus pointer and the
// pluginInvokeEventChan channel in the copy refer to the same status object
// and channel as the source.
func (bp *BindPlugin) deepCopy() *BindPlugin {
	bp.mutex.Lock()
	defer bp.mutex.Unlock()

	snapshot := new(BindPlugin)
	snapshot.name = bp.name
	snapshot.numBindCalled = bp.numBindCalled
	snapshot.bindStatus = bp.bindStatus
	snapshot.client = bp.client
	snapshot.pluginInvokeEventChan = bp.pluginInvokeEventChan
	return snapshot
}
// PostBindPlugin is a test plugin that counts its PostBind invocations and can
// report each one on a channel.
type PostBindPlugin struct {
// mutex is held by PostBind and deepCopy while they access the fields below.
mutex sync.Mutex
// name is carried over by deepCopy; presumably the value returned by Name(),
// whose body is not visible here — TODO confirm.
name string
// numPostBindCalled is incremented on every call to PostBind.
numPostBindCalled int
// pluginInvokeEventChan, when non-nil, receives a pluginInvokeEvent with the
// plugin name and the current call count on every PostBind call.
pluginInvokeEventChan chan pluginInvokeEvent
}
// deepCopy returns a new PostBindPlugin carrying the current values of name,
// numPostBindCalled and pluginInvokeEventChan. The fields are read while
// pp.mutex is held; the returned value has its own zero-value mutex. The
// channel value is assigned as-is, so source and copy refer to the same
// channel.
func (pp *PostBindPlugin) deepCopy() *PostBindPlugin {
	pp.mutex.Lock()
	defer pp.mutex.Unlock()

	var snapshot PostBindPlugin
	snapshot.name = pp.name
	snapshot.numPostBindCalled = pp.numPostBindCalled
	snapshot.pluginInvokeEventChan = pp.pluginInvokeEventChan
	return &snapshot
}
type PermitPlugin struct {
mutex sync.Mutex
name string
numPermitCalled int
failPermit bool
@ -156,6 +251,26 @@ type PermitPlugin struct {
fh framework.Handle
}
// deepCopy returns a new PermitPlugin carrying the current values of every
// field except the mutex. The fields are read while pp.mutex is held; the
// returned value has its own zero-value mutex. Field values are assigned
// as-is, so the fh handle in the copy is the same one the source uses.
func (pp *PermitPlugin) deepCopy() *PermitPlugin {
	pp.mutex.Lock()
	defer pp.mutex.Unlock()

	snapshot := new(PermitPlugin)

	// Identity and call counter.
	snapshot.name = pp.name
	snapshot.numPermitCalled = pp.numPermitCalled

	// Behaviour switches.
	snapshot.failPermit = pp.failPermit
	snapshot.rejectPermit = pp.rejectPermit
	snapshot.timeoutPermit = pp.timeoutPermit
	snapshot.waitAndRejectPermit = pp.waitAndRejectPermit
	snapshot.waitAndAllowPermit = pp.waitAndAllowPermit
	snapshot.cancelled = pp.cancelled

	// Pod selectors and the framework handle.
	snapshot.waitingPod = pp.waitingPod
	snapshot.rejectingPod = pp.rejectingPod
	snapshot.allowingPod = pp.allowingPod
	snapshot.fh = pp.fh

	return snapshot
}
const (
enqueuePluginName = "enqueue-plugin"
prefilterPluginName = "prefilter-plugin"
@ -216,13 +331,16 @@ func (sp *ScorePlugin) Name() string {
// Score returns the score of scheduling a pod on a specific node.
func (sp *ScorePlugin) Score(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) {
curCalled := atomic.AddInt32(&sp.numScoreCalled, 1)
sp.mutex.Lock()
defer sp.mutex.Unlock()
sp.numScoreCalled++
if sp.failScore {
return 0, framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", p.Name))
}
score := int64(1)
if curCalled == 1 {
if sp.numScoreCalled == 1 {
// The first node is scored the highest, the rest is scored lower.
sp.highScoreNode = nodeName
score = framework.MaxNodeScore
@ -241,6 +359,9 @@ func (sp *ScoreWithNormalizePlugin) Name() string {
// Score returns the score of scheduling a pod on a specific node.
func (sp *ScoreWithNormalizePlugin) Score(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) {
sp.mutex.Lock()
defer sp.mutex.Unlock()
sp.numScoreCalled++
score := int64(10)
return score, nil
@ -263,12 +384,12 @@ func (fp *FilterPlugin) Name() string {
// Filter is a test function that returns an error or nil, depending on the
// value of "failFilter".
func (fp *FilterPlugin) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
atomic.AddInt32(&fp.numFilterCalled, 1)
fp.mutex.Lock()
defer fp.mutex.Unlock()
fp.numFilterCalled++
if fp.numCalledPerPod != nil {
fp.Lock()
fp.numCalledPerPod[fmt.Sprintf("%v/%v", pod.Namespace, pod.Name)]++
fp.Unlock()
}
if fp.failFilter {
@ -328,6 +449,9 @@ func (pp *PreBindPlugin) Name() string {
// PreBind is a test function that returns a nil status or an error status for testing.
func (pp *PreBindPlugin) PreBind(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
pp.mutex.Lock()
defer pp.mutex.Unlock()
pp.numPreBindCalled++
if _, tried := pp.podUIDs[pod.UID]; tried && pp.succeedOnRetry {
return nil
@ -349,6 +473,9 @@ func (bp *BindPlugin) Name() string {
}
func (bp *BindPlugin) Bind(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status {
bp.mutex.Lock()
defer bp.mutex.Unlock()
bp.numBindCalled++
if bp.pluginInvokeEventChan != nil {
bp.pluginInvokeEventChan <- pluginInvokeEvent{pluginName: bp.Name(), val: bp.numBindCalled}
@ -374,6 +501,9 @@ func (pp *PostBindPlugin) Name() string {
// PostBind is a test function, which counts the number of times called.
func (pp *PostBindPlugin) PostBind(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) {
pp.mutex.Lock()
defer pp.mutex.Unlock()
pp.numPostBindCalled++
if pp.pluginInvokeEventChan != nil {
pp.pluginInvokeEventChan <- pluginInvokeEvent{pluginName: pp.Name(), val: pp.numPostBindCalled}
@ -443,6 +573,9 @@ func (pp *PermitPlugin) Name() string {
// Permit implements the permit test plugin.
func (pp *PermitPlugin) Permit(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (*framework.Status, time.Duration) {
pp.mutex.Lock()
defer pp.mutex.Unlock()
pp.numPermitCalled++
if pp.failPermit {
return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name)), 0
@ -454,6 +587,8 @@ func (pp *PermitPlugin) Permit(ctx context.Context, state *framework.CycleState,
go func() {
select {
case <-ctx.Done():
pp.mutex.Lock()
defer pp.mutex.Unlock()
pp.cancelled = true
}
}()
@ -487,6 +622,8 @@ func (pp *PermitPlugin) allowAllPods() {
// rejectAllPods walks every waiting pod known to the framework handle and
// rejects it on behalf of this plugin with the message "rejectAllPods".
// pp.mutex is held for the whole iteration.
func (pp *PermitPlugin) rejectAllPods() {
	pp.mutex.Lock()
	defer pp.mutex.Unlock()

	reject := func(wp framework.WaitingPod) {
		wp.Reject(pp.name, "rejectAllPods")
	}
	pp.fh.IterateOverWaitingPods(reject)
}
@ -559,18 +696,18 @@ func TestPreFilterPlugin(t *testing.T) {
// TestPostFilterPlugin tests invocation of postFilter plugins.
func TestPostFilterPlugin(t *testing.T) {
var numNodes int32 = 1
numNodes := 1
tests := []struct {
name string
numNodes int32
numNodes int
rejectFilter bool
failScore bool
rejectPostFilter bool
rejectPostFilter2 bool
breakPostFilter bool
breakPostFilter2 bool
expectFilterNumCalled int32
expectScoreNumCalled int32
expectFilterNumCalled int
expectScoreNumCalled int
expectPostFilterNumCalled int
}{
{
@ -712,20 +849,20 @@ func TestPostFilterPlugin(t *testing.T) {
t.Errorf("Didn't expect the pod to be scheduled.")
}
if numFilterCalled := atomic.LoadInt32(&filterPlugin.numFilterCalled); numFilterCalled < tt.expectFilterNumCalled {
if numFilterCalled := filterPlugin.deepCopy().numFilterCalled; numFilterCalled < tt.expectFilterNumCalled {
t.Errorf("Expected the filter plugin to be called at least %v times, but got %v.", tt.expectFilterNumCalled, numFilterCalled)
}
if numScoreCalled := atomic.LoadInt32(&scorePlugin.numScoreCalled); numScoreCalled < tt.expectScoreNumCalled {
if numScoreCalled := scorePlugin.deepCopy().numScoreCalled; numScoreCalled < tt.expectScoreNumCalled {
t.Errorf("Expected the score plugin to be called at least %v times, but got %v.", tt.expectScoreNumCalled, numScoreCalled)
}
} else {
if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil {
t.Errorf("Expected the pod to be scheduled. error: %v", err)
}
if numFilterCalled := atomic.LoadInt32(&filterPlugin.numFilterCalled); numFilterCalled != tt.expectFilterNumCalled {
if numFilterCalled := filterPlugin.deepCopy().numFilterCalled; numFilterCalled != tt.expectFilterNumCalled {
t.Errorf("Expected the filter plugin to be called %v times, but got %v.", tt.expectFilterNumCalled, numFilterCalled)
}
if numScoreCalled := atomic.LoadInt32(&scorePlugin.numScoreCalled); numScoreCalled != tt.expectScoreNumCalled {
if numScoreCalled := scorePlugin.deepCopy().numScoreCalled; numScoreCalled != tt.expectScoreNumCalled {
t.Errorf("Expected the score plugin to be called %v times, but got %v.", tt.expectScoreNumCalled, numScoreCalled)
}
}
@ -792,7 +929,7 @@ func TestScorePlugin(t *testing.T) {
}
}
if numScoreCalled := atomic.LoadInt32(&scorePlugin.numScoreCalled); numScoreCalled == 0 {
if numScoreCalled := scorePlugin.deepCopy().numScoreCalled; numScoreCalled == 0 {
t.Errorf("Expected the score plugin to be called.")
}
})
@ -820,10 +957,11 @@ func TestNormalizeScorePlugin(t *testing.T) {
t.Errorf("Expected the pod to be scheduled. error: %v", err)
}
if scoreWithNormalizePlugin.numScoreCalled == 0 {
p := scoreWithNormalizePlugin.deepCopy()
if p.numScoreCalled == 0 {
t.Errorf("Expected the score plugin to be called.")
}
if scoreWithNormalizePlugin.numNormalizeScoreCalled == 0 {
if p.numNormalizeScoreCalled == 0 {
t.Error("Expected the normalize score plugin to be called")
}
}
@ -980,9 +1118,8 @@ func TestPrebindPlugin(t *testing.T) {
}
}
preBindPlugin.failPreBind = test.fail
preBindPlugin.rejectPreBind = test.reject
preBindPlugin.succeedOnRetry = test.succeedOnRetry
preBindPlugin.set(test.fail, test.reject, test.succeedOnRetry)
// Create a best effort pod.
pod, err := createPausePod(testCtx.ClientSet,
initPausePod(&testutils.PausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
@ -1006,7 +1143,8 @@ func TestPrebindPlugin(t *testing.T) {
t.Errorf("Expected the pod to be scheduled. error: %v", err)
}
if preBindPlugin.numPreBindCalled == 0 {
p := preBindPlugin.deepCopy()
if p.numPreBindCalled == 0 {
t.Errorf("Expected the prebind plugin to be called.")
}
@ -1014,7 +1152,7 @@ func TestPrebindPlugin(t *testing.T) {
if err := wait.Poll(10*time.Millisecond, 15*time.Second, func() (bool, error) {
// 2 means the unschedulable pod is expected to be retried at least twice.
// (one initial attempt plus the one moved by the preBind pod)
return int(filterPlugin.numFilterCalled) >= 2*nodesNum, nil
return filterPlugin.deepCopy().numFilterCalled >= 2*nodesNum, nil
}); err != nil {
t.Errorf("Timed out waiting for the unschedulable Pod to be retried at least twice.")
}
@ -1523,27 +1661,30 @@ func TestBindPlugin(t *testing.T) {
if err != nil {
t.Errorf("can't get pod: %v", err)
}
p1 := bindPlugin1.deepCopy()
p2 := bindPlugin2.deepCopy()
if test.expectBoundByScheduler {
if pod.Annotations[bindPluginAnnotation] != "" {
t.Errorf("Expected the pod to be bound by scheduler instead of by bindplugin %s", pod.Annotations[bindPluginAnnotation])
}
if bindPlugin1.numBindCalled != 1 || bindPlugin2.numBindCalled != 1 {
t.Errorf("Expected each bind plugin to be called once, was called %d and %d times.", bindPlugin1.numBindCalled, bindPlugin2.numBindCalled)
if p1.numBindCalled != 1 || p2.numBindCalled != 1 {
t.Errorf("Expected each bind plugin to be called once, was called %d and %d times.", p1.numBindCalled, p2.numBindCalled)
}
} else {
if pod.Annotations[bindPluginAnnotation] != test.expectBindPluginName {
t.Errorf("Expected the pod to be bound by bindplugin %s instead of by bindplugin %s", test.expectBindPluginName, pod.Annotations[bindPluginAnnotation])
}
if bindPlugin1.numBindCalled != 1 {
t.Errorf("Expected %s to be called once, was called %d times.", bindPlugin1.Name(), bindPlugin1.numBindCalled)
if p1.numBindCalled != 1 {
t.Errorf("Expected %s to be called once, was called %d times.", p1.Name(), p1.numBindCalled)
}
if test.expectBindPluginName == bindPlugin1.Name() && bindPlugin2.numBindCalled > 0 {
if test.expectBindPluginName == p1.Name() && p2.numBindCalled > 0 {
// expect bindplugin1 succeeded to bind the pod and bindplugin2 should not be called.
t.Errorf("Expected %s not to be called, was called %d times.", bindPlugin2.Name(), bindPlugin1.numBindCalled)
t.Errorf("Expected %s not to be called, was called %d times.", p2.Name(), p2.numBindCalled)
}
}
if err = wait.Poll(10*time.Millisecond, 30*time.Second, func() (done bool, err error) {
return postBindPlugin.numPostBindCalled == 1, nil
p := postBindPlugin.deepCopy()
return p.numPostBindCalled == 1, nil
}); err != nil {
t.Errorf("Expected the postbind plugin to be called once, was called %d times.", postBindPlugin.numPostBindCalled)
}
@ -1555,8 +1696,9 @@ func TestBindPlugin(t *testing.T) {
if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(testCtx.ClientSet, pod.Namespace, pod.Name)); err != nil {
t.Errorf("Expected a scheduling error, but didn't get it. error: %v", err)
}
if postBindPlugin.numPostBindCalled > 0 {
t.Errorf("Didn't expect the postbind plugin to be called %d times.", postBindPlugin.numPostBindCalled)
p := postBindPlugin.deepCopy()
if p.numPostBindCalled > 0 {
t.Errorf("Didn't expect the postbind plugin to be called %d times.", p.numPostBindCalled)
}
}
for j := range test.expectInvokeEvents {
@ -1732,7 +1874,8 @@ func TestPermitPlugin(t *testing.T) {
}
}
if perPlugin.numPermitCalled == 0 {
p := perPlugin.deepCopy()
if p.numPermitCalled == 0 {
t.Errorf("Expected the permit plugin to be called.")
}
})
@ -1825,7 +1968,9 @@ func TestPermitPluginsCancelled(t *testing.T) {
perPlugin1.rejectAllPods()
// Wait some time for the permit plugins to be cancelled
err = wait.Poll(10*time.Millisecond, 30*time.Second, func() (bool, error) {
return perPlugin1.cancelled && perPlugin2.cancelled, nil
p1 := perPlugin1.deepCopy()
p2 := perPlugin2.deepCopy()
return p1.cancelled && p2.cancelled, nil
})
if err != nil {
t.Errorf("Expected all permit plugins to be cancelled")
@ -1910,7 +2055,8 @@ func TestCoSchedulingWithPermitPlugin(t *testing.T) {
}
}
if permitPlugin.numPermitCalled == 0 {
p := permitPlugin.deepCopy()
if p.numPermitCalled == 0 {
t.Errorf("Expected the permit plugin to be called.")
}
})
@ -2249,9 +2395,8 @@ func TestPreemptWithPermitPlugin(t *testing.T) {
t.Fatalf("Expected the waiting pod to get preempted.")
}
filterPlugin.RLock()
waitingPodCalled := filterPlugin.numCalledPerPod[fmt.Sprintf("%v/%v", w.Namespace, w.Name)]
filterPlugin.RUnlock()
p := filterPlugin.deepCopy()
waitingPodCalled := p.numCalledPerPod[fmt.Sprintf("%v/%v", w.Namespace, w.Name)]
if waitingPodCalled > tt.maxNumWaitingPodCalled {
t.Fatalf("Expected the waiting pod to be called %v times at most, but got %v", tt.maxNumWaitingPodCalled, waitingPodCalled)
}