Merge pull request #117113 from pohly/test-integration-race-detection-scheduler

test/integration/scheduler: fix data races
This commit is contained in:
Kubernetes Prow Robot 2023-05-17 21:26:33 -07:00 committed by GitHub
commit 3ac21a5a30
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -14,13 +14,15 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
// Package plugins contains functional tests for scheduler plugin support.
// Beware that the plugins in this directory are not meant to be used in
// performance tests because they don't behave like real plugins.
package plugins
import (
"context"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
@ -65,7 +67,7 @@ var (
)
type PreEnqueuePlugin struct {
called int32
called int
admit bool
}
@ -76,23 +78,62 @@ type PreFilterPlugin struct {
}
type ScorePlugin struct {
mutex sync.Mutex
failScore bool
numScoreCalled int32
numScoreCalled int
highScoreNode string
}
func (sp *ScorePlugin) deepCopy() *ScorePlugin {
sp.mutex.Lock()
defer sp.mutex.Unlock()
return &ScorePlugin{
failScore: sp.failScore,
numScoreCalled: sp.numScoreCalled,
highScoreNode: sp.highScoreNode,
}
}
type ScoreWithNormalizePlugin struct {
mutex sync.Mutex
numScoreCalled int
numNormalizeScoreCalled int
}
func (sp *ScoreWithNormalizePlugin) deepCopy() *ScoreWithNormalizePlugin {
sp.mutex.Lock()
defer sp.mutex.Unlock()
return &ScoreWithNormalizePlugin{
numScoreCalled: sp.numScoreCalled,
numNormalizeScoreCalled: sp.numNormalizeScoreCalled,
}
}
type FilterPlugin struct {
numFilterCalled int32
mutex sync.Mutex
numFilterCalled int
failFilter bool
rejectFilter bool
numCalledPerPod map[string]int
sync.RWMutex
}
func (fp *FilterPlugin) deepCopy() *FilterPlugin {
fp.mutex.Lock()
defer fp.mutex.Unlock()
clone := &FilterPlugin{
numFilterCalled: fp.numFilterCalled,
failFilter: fp.failFilter,
rejectFilter: fp.rejectFilter,
numCalledPerPod: make(map[string]int),
}
for pod, counter := range fp.numCalledPerPod {
clone.numCalledPerPod[pod] = counter
}
return clone
}
type PostFilterPlugin struct {
@ -118,6 +159,7 @@ type PreScorePlugin struct {
}
type PreBindPlugin struct {
mutex sync.Mutex
numPreBindCalled int
failPreBind bool
rejectPreBind bool
@ -127,7 +169,34 @@ type PreBindPlugin struct {
podUIDs map[types.UID]struct{}
}
func (pp *PreBindPlugin) set(fail, reject, succeed bool) {
pp.mutex.Lock()
defer pp.mutex.Unlock()
pp.failPreBind = fail
pp.rejectPreBind = reject
pp.succeedOnRetry = succeed
}
func (pp *PreBindPlugin) deepCopy() *PreBindPlugin {
pp.mutex.Lock()
defer pp.mutex.Unlock()
clone := &PreBindPlugin{
numPreBindCalled: pp.numPreBindCalled,
failPreBind: pp.failPreBind,
rejectPreBind: pp.rejectPreBind,
succeedOnRetry: pp.succeedOnRetry,
podUIDs: make(map[types.UID]struct{}),
}
for uid := range pp.podUIDs {
clone.podUIDs[uid] = struct{}{}
}
return clone
}
type BindPlugin struct {
mutex sync.Mutex
name string
numBindCalled int
bindStatus *framework.Status
@ -135,13 +204,39 @@ type BindPlugin struct {
pluginInvokeEventChan chan pluginInvokeEvent
}
func (bp *BindPlugin) deepCopy() *BindPlugin {
bp.mutex.Lock()
defer bp.mutex.Unlock()
return &BindPlugin{
name: bp.name,
numBindCalled: bp.numBindCalled,
bindStatus: bp.bindStatus,
client: bp.client,
pluginInvokeEventChan: bp.pluginInvokeEventChan,
}
}
type PostBindPlugin struct {
mutex sync.Mutex
name string
numPostBindCalled int
pluginInvokeEventChan chan pluginInvokeEvent
}
func (pp *PostBindPlugin) deepCopy() *PostBindPlugin {
pp.mutex.Lock()
defer pp.mutex.Unlock()
return &PostBindPlugin{
name: pp.name,
numPostBindCalled: pp.numPostBindCalled,
pluginInvokeEventChan: pp.pluginInvokeEventChan,
}
}
type PermitPlugin struct {
mutex sync.Mutex
name string
numPermitCalled int
failPermit bool
@ -156,6 +251,26 @@ type PermitPlugin struct {
fh framework.Handle
}
func (pp *PermitPlugin) deepCopy() *PermitPlugin {
pp.mutex.Lock()
defer pp.mutex.Unlock()
return &PermitPlugin{
name: pp.name,
numPermitCalled: pp.numPermitCalled,
failPermit: pp.failPermit,
rejectPermit: pp.rejectPermit,
timeoutPermit: pp.timeoutPermit,
waitAndRejectPermit: pp.waitAndRejectPermit,
waitAndAllowPermit: pp.waitAndAllowPermit,
cancelled: pp.cancelled,
waitingPod: pp.waitingPod,
rejectingPod: pp.rejectingPod,
allowingPod: pp.allowingPod,
fh: pp.fh,
}
}
const (
enqueuePluginName = "enqueue-plugin"
prefilterPluginName = "prefilter-plugin"
@ -216,13 +331,16 @@ func (sp *ScorePlugin) Name() string {
// Score returns the score of scheduling a pod on a specific node.
func (sp *ScorePlugin) Score(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) {
curCalled := atomic.AddInt32(&sp.numScoreCalled, 1)
sp.mutex.Lock()
defer sp.mutex.Unlock()
sp.numScoreCalled++
if sp.failScore {
return 0, framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", p.Name))
}
score := int64(1)
if curCalled == 1 {
if sp.numScoreCalled == 1 {
// The first node is scored the highest, the rest is scored lower.
sp.highScoreNode = nodeName
score = framework.MaxNodeScore
@ -241,6 +359,9 @@ func (sp *ScoreWithNormalizePlugin) Name() string {
// Score returns the score of scheduling a pod on a specific node.
func (sp *ScoreWithNormalizePlugin) Score(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) {
sp.mutex.Lock()
defer sp.mutex.Unlock()
sp.numScoreCalled++
score := int64(10)
return score, nil
@ -263,12 +384,12 @@ func (fp *FilterPlugin) Name() string {
// Filter is a test function that returns an error or nil, depending on the
// value of "failFilter".
func (fp *FilterPlugin) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
atomic.AddInt32(&fp.numFilterCalled, 1)
fp.mutex.Lock()
defer fp.mutex.Unlock()
fp.numFilterCalled++
if fp.numCalledPerPod != nil {
fp.Lock()
fp.numCalledPerPod[fmt.Sprintf("%v/%v", pod.Namespace, pod.Name)]++
fp.Unlock()
}
if fp.failFilter {
@ -328,6 +449,9 @@ func (pp *PreBindPlugin) Name() string {
// PreBind is a test function that returns (true, nil) or errors for testing.
func (pp *PreBindPlugin) PreBind(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
pp.mutex.Lock()
defer pp.mutex.Unlock()
pp.numPreBindCalled++
if _, tried := pp.podUIDs[pod.UID]; tried && pp.succeedOnRetry {
return nil
@ -349,6 +473,9 @@ func (bp *BindPlugin) Name() string {
}
func (bp *BindPlugin) Bind(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status {
bp.mutex.Lock()
defer bp.mutex.Unlock()
bp.numBindCalled++
if bp.pluginInvokeEventChan != nil {
bp.pluginInvokeEventChan <- pluginInvokeEvent{pluginName: bp.Name(), val: bp.numBindCalled}
@ -374,6 +501,9 @@ func (pp *PostBindPlugin) Name() string {
// PostBind is a test function, which counts the number of times called.
func (pp *PostBindPlugin) PostBind(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) {
pp.mutex.Lock()
defer pp.mutex.Unlock()
pp.numPostBindCalled++
if pp.pluginInvokeEventChan != nil {
pp.pluginInvokeEventChan <- pluginInvokeEvent{pluginName: pp.Name(), val: pp.numPostBindCalled}
@ -443,6 +573,9 @@ func (pp *PermitPlugin) Name() string {
// Permit implements the permit test plugin.
func (pp *PermitPlugin) Permit(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (*framework.Status, time.Duration) {
pp.mutex.Lock()
defer pp.mutex.Unlock()
pp.numPermitCalled++
if pp.failPermit {
return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name)), 0
@ -454,6 +587,8 @@ func (pp *PermitPlugin) Permit(ctx context.Context, state *framework.CycleState,
go func() {
select {
case <-ctx.Done():
pp.mutex.Lock()
defer pp.mutex.Unlock()
pp.cancelled = true
}
}()
@ -487,6 +622,8 @@ func (pp *PermitPlugin) allowAllPods() {
// rejectAllPods rejects all waiting pods.
func (pp *PermitPlugin) rejectAllPods() {
pp.mutex.Lock()
defer pp.mutex.Unlock()
pp.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) { wp.Reject(pp.name, "rejectAllPods") })
}
@ -559,18 +696,18 @@ func TestPreFilterPlugin(t *testing.T) {
// TestPostFilterPlugin tests invocation of postFilter plugins.
func TestPostFilterPlugin(t *testing.T) {
var numNodes int32 = 1
numNodes := 1
tests := []struct {
name string
numNodes int32
numNodes int
rejectFilter bool
failScore bool
rejectPostFilter bool
rejectPostFilter2 bool
breakPostFilter bool
breakPostFilter2 bool
expectFilterNumCalled int32
expectScoreNumCalled int32
expectFilterNumCalled int
expectScoreNumCalled int
expectPostFilterNumCalled int
}{
{
@ -712,20 +849,20 @@ func TestPostFilterPlugin(t *testing.T) {
t.Errorf("Didn't expect the pod to be scheduled.")
}
if numFilterCalled := atomic.LoadInt32(&filterPlugin.numFilterCalled); numFilterCalled < tt.expectFilterNumCalled {
if numFilterCalled := filterPlugin.deepCopy().numFilterCalled; numFilterCalled < tt.expectFilterNumCalled {
t.Errorf("Expected the filter plugin to be called at least %v times, but got %v.", tt.expectFilterNumCalled, numFilterCalled)
}
if numScoreCalled := atomic.LoadInt32(&scorePlugin.numScoreCalled); numScoreCalled < tt.expectScoreNumCalled {
if numScoreCalled := scorePlugin.deepCopy().numScoreCalled; numScoreCalled < tt.expectScoreNumCalled {
t.Errorf("Expected the score plugin to be called at least %v times, but got %v.", tt.expectScoreNumCalled, numScoreCalled)
}
} else {
if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil {
t.Errorf("Expected the pod to be scheduled. error: %v", err)
}
if numFilterCalled := atomic.LoadInt32(&filterPlugin.numFilterCalled); numFilterCalled != tt.expectFilterNumCalled {
if numFilterCalled := filterPlugin.deepCopy().numFilterCalled; numFilterCalled != tt.expectFilterNumCalled {
t.Errorf("Expected the filter plugin to be called %v times, but got %v.", tt.expectFilterNumCalled, numFilterCalled)
}
if numScoreCalled := atomic.LoadInt32(&scorePlugin.numScoreCalled); numScoreCalled != tt.expectScoreNumCalled {
if numScoreCalled := scorePlugin.deepCopy().numScoreCalled; numScoreCalled != tt.expectScoreNumCalled {
t.Errorf("Expected the score plugin to be called %v times, but got %v.", tt.expectScoreNumCalled, numScoreCalled)
}
}
@ -792,7 +929,7 @@ func TestScorePlugin(t *testing.T) {
}
}
if numScoreCalled := atomic.LoadInt32(&scorePlugin.numScoreCalled); numScoreCalled == 0 {
if numScoreCalled := scorePlugin.deepCopy().numScoreCalled; numScoreCalled == 0 {
t.Errorf("Expected the score plugin to be called.")
}
})
@ -820,10 +957,11 @@ func TestNormalizeScorePlugin(t *testing.T) {
t.Errorf("Expected the pod to be scheduled. error: %v", err)
}
if scoreWithNormalizePlugin.numScoreCalled == 0 {
p := scoreWithNormalizePlugin.deepCopy()
if p.numScoreCalled == 0 {
t.Errorf("Expected the score plugin to be called.")
}
if scoreWithNormalizePlugin.numNormalizeScoreCalled == 0 {
if p.numNormalizeScoreCalled == 0 {
t.Error("Expected the normalize score plugin to be called")
}
}
@ -980,9 +1118,8 @@ func TestPrebindPlugin(t *testing.T) {
}
}
preBindPlugin.failPreBind = test.fail
preBindPlugin.rejectPreBind = test.reject
preBindPlugin.succeedOnRetry = test.succeedOnRetry
preBindPlugin.set(test.fail, test.reject, test.succeedOnRetry)
// Create a best effort pod.
pod, err := createPausePod(testCtx.ClientSet,
initPausePod(&testutils.PausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
@ -1006,7 +1143,8 @@ func TestPrebindPlugin(t *testing.T) {
t.Errorf("Expected the pod to be scheduled. error: %v", err)
}
if preBindPlugin.numPreBindCalled == 0 {
p := preBindPlugin.deepCopy()
if p.numPreBindCalled == 0 {
t.Errorf("Expected the prebind plugin to be called.")
}
@ -1014,7 +1152,7 @@ func TestPrebindPlugin(t *testing.T) {
if err := wait.Poll(10*time.Millisecond, 15*time.Second, func() (bool, error) {
// 2 means the unschedulable pod is expected to be retried at least twice.
// (one initial attempt plus the one moved by the preBind pod)
return int(filterPlugin.numFilterCalled) >= 2*nodesNum, nil
return filterPlugin.deepCopy().numFilterCalled >= 2*nodesNum, nil
}); err != nil {
t.Errorf("Timed out waiting for the unschedulable Pod to be retried at least twice.")
}
@ -1523,27 +1661,30 @@ func TestBindPlugin(t *testing.T) {
if err != nil {
t.Errorf("can't get pod: %v", err)
}
p1 := bindPlugin1.deepCopy()
p2 := bindPlugin2.deepCopy()
if test.expectBoundByScheduler {
if pod.Annotations[bindPluginAnnotation] != "" {
t.Errorf("Expected the pod to be bound by scheduler instead of by bindplugin %s", pod.Annotations[bindPluginAnnotation])
}
if bindPlugin1.numBindCalled != 1 || bindPlugin2.numBindCalled != 1 {
t.Errorf("Expected each bind plugin to be called once, was called %d and %d times.", bindPlugin1.numBindCalled, bindPlugin2.numBindCalled)
if p1.numBindCalled != 1 || p2.numBindCalled != 1 {
t.Errorf("Expected each bind plugin to be called once, was called %d and %d times.", p1.numBindCalled, p2.numBindCalled)
}
} else {
if pod.Annotations[bindPluginAnnotation] != test.expectBindPluginName {
t.Errorf("Expected the pod to be bound by bindplugin %s instead of by bindplugin %s", test.expectBindPluginName, pod.Annotations[bindPluginAnnotation])
}
if bindPlugin1.numBindCalled != 1 {
t.Errorf("Expected %s to be called once, was called %d times.", bindPlugin1.Name(), bindPlugin1.numBindCalled)
if p1.numBindCalled != 1 {
t.Errorf("Expected %s to be called once, was called %d times.", p1.Name(), p1.numBindCalled)
}
if test.expectBindPluginName == bindPlugin1.Name() && bindPlugin2.numBindCalled > 0 {
if test.expectBindPluginName == p1.Name() && p2.numBindCalled > 0 {
// expect bindplugin1 succeeded to bind the pod and bindplugin2 should not be called.
t.Errorf("Expected %s not to be called, was called %d times.", bindPlugin2.Name(), bindPlugin1.numBindCalled)
t.Errorf("Expected %s not to be called, was called %d times.", p2.Name(), p2.numBindCalled)
}
}
if err = wait.Poll(10*time.Millisecond, 30*time.Second, func() (done bool, err error) {
return postBindPlugin.numPostBindCalled == 1, nil
p := postBindPlugin.deepCopy()
return p.numPostBindCalled == 1, nil
}); err != nil {
t.Errorf("Expected the postbind plugin to be called once, was called %d times.", postBindPlugin.numPostBindCalled)
}
@ -1555,8 +1696,9 @@ func TestBindPlugin(t *testing.T) {
if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(testCtx.ClientSet, pod.Namespace, pod.Name)); err != nil {
t.Errorf("Expected a scheduling error, but didn't get it. error: %v", err)
}
if postBindPlugin.numPostBindCalled > 0 {
t.Errorf("Didn't expect the postbind plugin to be called %d times.", postBindPlugin.numPostBindCalled)
p := postBindPlugin.deepCopy()
if p.numPostBindCalled > 0 {
t.Errorf("Didn't expect the postbind plugin to be called %d times.", p.numPostBindCalled)
}
}
for j := range test.expectInvokeEvents {
@ -1732,7 +1874,8 @@ func TestPermitPlugin(t *testing.T) {
}
}
if perPlugin.numPermitCalled == 0 {
p := perPlugin.deepCopy()
if p.numPermitCalled == 0 {
t.Errorf("Expected the permit plugin to be called.")
}
})
@ -1825,7 +1968,9 @@ func TestPermitPluginsCancelled(t *testing.T) {
perPlugin1.rejectAllPods()
// Wait some time for the permit plugins to be cancelled
err = wait.Poll(10*time.Millisecond, 30*time.Second, func() (bool, error) {
return perPlugin1.cancelled && perPlugin2.cancelled, nil
p1 := perPlugin1.deepCopy()
p2 := perPlugin2.deepCopy()
return p1.cancelled && p2.cancelled, nil
})
if err != nil {
t.Errorf("Expected all permit plugins to be cancelled")
@ -1910,7 +2055,8 @@ func TestCoSchedulingWithPermitPlugin(t *testing.T) {
}
}
if permitPlugin.numPermitCalled == 0 {
p := permitPlugin.deepCopy()
if p.numPermitCalled == 0 {
t.Errorf("Expected the permit plugin to be called.")
}
})
@ -2249,9 +2395,8 @@ func TestPreemptWithPermitPlugin(t *testing.T) {
t.Fatalf("Expected the waiting pod to get preempted.")
}
filterPlugin.RLock()
waitingPodCalled := filterPlugin.numCalledPerPod[fmt.Sprintf("%v/%v", w.Namespace, w.Name)]
filterPlugin.RUnlock()
p := filterPlugin.deepCopy()
waitingPodCalled := p.numCalledPerPod[fmt.Sprintf("%v/%v", w.Namespace, w.Name)]
if waitingPodCalled > tt.maxNumWaitingPodCalled {
t.Fatalf("Expected the waiting pod to be called %v times at most, but got %v", tt.maxNumWaitingPodCalled, waitingPodCalled)
}