fix data races for other usage of Q

This commit is contained in:
skilxn-go 2020-02-20 00:44:40 +08:00
parent 7e33feec57
commit 74718adf10

View File

@ -310,34 +310,42 @@ func TestPriorityQueue_Pop(t *testing.T) {
func TestPriorityQueue_Update(t *testing.T) { func TestPriorityQueue_Update(t *testing.T) {
q := createAndRunPriorityQueue(newDefaultFramework()) q := createAndRunPriorityQueue(newDefaultFramework())
q.Update(nil, &highPriorityPod) q.Update(nil, &highPriorityPod)
q.lock.RLock()
if _, exists, _ := q.activeQ.Get(newPodInfoNoTimestamp(&highPriorityPod)); !exists { if _, exists, _ := q.activeQ.Get(newPodInfoNoTimestamp(&highPriorityPod)); !exists {
t.Errorf("Expected %v to be added to activeQ.", highPriorityPod.Name) t.Errorf("Expected %v to be added to activeQ.", highPriorityPod.Name)
} }
q.lock.RUnlock()
if len(q.nominatedPods.nominatedPods) != 0 { if len(q.nominatedPods.nominatedPods) != 0 {
t.Errorf("Expected nomindatePods to be empty: %v", q.nominatedPods) t.Errorf("Expected nomindatePods to be empty: %v", q.nominatedPods)
} }
// Update highPriorityPod and add a nominatedNodeName to it. // Update highPriorityPod and add a nominatedNodeName to it.
q.Update(&highPriorityPod, &highPriNominatedPod) q.Update(&highPriorityPod, &highPriNominatedPod)
q.lock.RLock()
if q.activeQ.Len() != 1 { if q.activeQ.Len() != 1 {
t.Error("Expected only one item in activeQ.") t.Error("Expected only one item in activeQ.")
} }
q.lock.RUnlock()
if len(q.nominatedPods.nominatedPods) != 1 { if len(q.nominatedPods.nominatedPods) != 1 {
t.Errorf("Expected one item in nomindatePods map: %v", q.nominatedPods) t.Errorf("Expected one item in nomindatePods map: %v", q.nominatedPods)
} }
// Updating an unschedulable pod which is not in any of the two queues, should // Updating an unschedulable pod which is not in any of the two queues, should
// add the pod to activeQ. // add the pod to activeQ.
q.Update(&unschedulablePod, &unschedulablePod) q.Update(&unschedulablePod, &unschedulablePod)
q.lock.RLock()
if _, exists, _ := q.activeQ.Get(newPodInfoNoTimestamp(&unschedulablePod)); !exists { if _, exists, _ := q.activeQ.Get(newPodInfoNoTimestamp(&unschedulablePod)); !exists {
t.Errorf("Expected %v to be added to activeQ.", unschedulablePod.Name) t.Errorf("Expected %v to be added to activeQ.", unschedulablePod.Name)
} }
q.lock.RUnlock()
// Updating a pod that is already in activeQ, should not change it. // Updating a pod that is already in activeQ, should not change it.
q.Update(&unschedulablePod, &unschedulablePod) q.Update(&unschedulablePod, &unschedulablePod)
if len(q.unschedulableQ.podInfoMap) != 0 { if len(q.unschedulableQ.podInfoMap) != 0 {
t.Error("Expected unschedulableQ to be empty.") t.Error("Expected unschedulableQ to be empty.")
} }
q.lock.RLock()
if _, exists, _ := q.activeQ.Get(newPodInfoNoTimestamp(&unschedulablePod)); !exists { if _, exists, _ := q.activeQ.Get(newPodInfoNoTimestamp(&unschedulablePod)); !exists {
t.Errorf("Expected: %v to be added to activeQ.", unschedulablePod.Name) t.Errorf("Expected: %v to be added to activeQ.", unschedulablePod.Name)
} }
q.lock.RUnlock()
if p, err := q.Pop(); err != nil || p.Pod != &highPriNominatedPod { if p, err := q.Pop(); err != nil || p.Pod != &highPriNominatedPod {
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Pod.Name) t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Pod.Name)
} }
@ -362,12 +370,14 @@ func TestPriorityQueue_Delete(t *testing.T) {
if err := q.Delete(&highPriNominatedPod); err != nil { if err := q.Delete(&highPriNominatedPod); err != nil {
t.Errorf("delete failed: %v", err) t.Errorf("delete failed: %v", err)
} }
q.lock.RLock()
if _, exists, _ := q.activeQ.Get(newPodInfoNoTimestamp(&unschedulablePod)); !exists { if _, exists, _ := q.activeQ.Get(newPodInfoNoTimestamp(&unschedulablePod)); !exists {
t.Errorf("Expected %v to be in activeQ.", unschedulablePod.Name) t.Errorf("Expected %v to be in activeQ.", unschedulablePod.Name)
} }
if _, exists, _ := q.activeQ.Get(newPodInfoNoTimestamp(&highPriNominatedPod)); exists { if _, exists, _ := q.activeQ.Get(newPodInfoNoTimestamp(&highPriNominatedPod)); exists {
t.Errorf("Didn't expect %v to be in activeQ.", highPriorityPod.Name) t.Errorf("Didn't expect %v to be in activeQ.", highPriorityPod.Name)
} }
q.lock.RUnlock()
if len(q.nominatedPods.nominatedPods) != 1 { if len(q.nominatedPods.nominatedPods) != 1 {
t.Errorf("Expected nomindatePods to have only 'unschedulablePod': %v", q.nominatedPods.nominatedPods) t.Errorf("Expected nomindatePods to have only 'unschedulablePod': %v", q.nominatedPods.nominatedPods)
} }
@ -385,8 +395,8 @@ func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
q.AddUnschedulableIfNotPresent(q.newPodInfo(&unschedulablePod), q.SchedulingCycle()) q.AddUnschedulableIfNotPresent(q.newPodInfo(&unschedulablePod), q.SchedulingCycle())
q.AddUnschedulableIfNotPresent(q.newPodInfo(&highPriorityPod), q.SchedulingCycle()) q.AddUnschedulableIfNotPresent(q.newPodInfo(&highPriorityPod), q.SchedulingCycle())
q.MoveAllToActiveOrBackoffQueue("test") q.MoveAllToActiveOrBackoffQueue("test")
q.lock.Lock() q.lock.RLock()
defer q.lock.Unlock() defer q.lock.RUnlock()
if q.activeQ.Len() != 1 { if q.activeQ.Len() != 1 {
t.Error("Expected 1 item to be in activeQ") t.Error("Expected 1 item to be in activeQ")
} }
@ -446,9 +456,11 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) {
if getUnschedulablePod(q, affinityPod) != nil { if getUnschedulablePod(q, affinityPod) != nil {
t.Error("affinityPod is still in the unschedulableQ.") t.Error("affinityPod is still in the unschedulableQ.")
} }
q.lock.RLock()
if _, exists, _ := q.activeQ.Get(newPodInfoNoTimestamp(affinityPod)); !exists { if _, exists, _ := q.activeQ.Get(newPodInfoNoTimestamp(affinityPod)); !exists {
t.Error("affinityPod is not moved to activeQ.") t.Error("affinityPod is not moved to activeQ.")
} }
q.lock.RUnlock()
// Check that the other pod is still in the unschedulableQ. // Check that the other pod is still in the unschedulableQ.
if getUnschedulablePod(q, &unschedulablePod) == nil { if getUnschedulablePod(q, &unschedulablePod) == nil {
t.Error("unschedulablePod is not in the unschedulableQ.") t.Error("unschedulablePod is not in the unschedulableQ.")
@ -1177,6 +1189,7 @@ func TestPodTimestamp(t *testing.T) {
op(queue, test.operands[i]) op(queue, test.operands[i])
} }
queue.lock.Lock()
for i := 0; i < len(test.expected); i++ { for i := 0; i < len(test.expected); i++ {
if pInfo, err := queue.activeQ.Pop(); err != nil { if pInfo, err := queue.activeQ.Pop(); err != nil {
t.Errorf("Error while popping the head of the queue: %v", err) t.Errorf("Error while popping the head of the queue: %v", err)
@ -1184,6 +1197,7 @@ func TestPodTimestamp(t *testing.T) {
podInfoList = append(podInfoList, pInfo.(*framework.PodInfo)) podInfoList = append(podInfoList, pInfo.(*framework.PodInfo))
} }
} }
queue.lock.Unlock()
if !reflect.DeepEqual(test.expected, podInfoList) { if !reflect.DeepEqual(test.expected, podInfoList) {
t.Errorf("Unexpected PodInfo list. Expected: %v, got: %v", t.Errorf("Unexpected PodInfo list. Expected: %v, got: %v",
@ -1561,9 +1575,11 @@ func TestBackOffFlow(t *testing.T) {
// An event happens. // An event happens.
q.MoveAllToActiveOrBackoffQueue("deleted pod") q.MoveAllToActiveOrBackoffQueue("deleted pod")
q.lock.RLock()
if _, ok, _ := q.podBackoffQ.Get(podInfo); !ok { if _, ok, _ := q.podBackoffQ.Get(podInfo); !ok {
t.Errorf("pod %v is not in the backoff queue", podID) t.Errorf("pod %v is not in the backoff queue", podID)
} }
q.lock.RUnlock()
// Check backoff duration. // Check backoff duration.
deadline := q.getBackoffTime(podInfo) deadline := q.getBackoffTime(podInfo)
@ -1576,15 +1592,19 @@ func TestBackOffFlow(t *testing.T) {
cl.Step(time.Millisecond) cl.Step(time.Millisecond)
q.flushBackoffQCompleted() q.flushBackoffQCompleted()
// Still in backoff queue after an early flush. // Still in backoff queue after an early flush.
q.lock.RLock()
if _, ok, _ := q.podBackoffQ.Get(podInfo); !ok { if _, ok, _ := q.podBackoffQ.Get(podInfo); !ok {
t.Errorf("pod %v is not in the backoff queue", podID) t.Errorf("pod %v is not in the backoff queue", podID)
} }
q.lock.RUnlock()
// Moved out of the backoff queue after timeout. // Moved out of the backoff queue after timeout.
cl.Step(backoff) cl.Step(backoff)
q.flushBackoffQCompleted() q.flushBackoffQCompleted()
q.lock.RLock()
if _, ok, _ := q.podBackoffQ.Get(podInfo); ok { if _, ok, _ := q.podBackoffQ.Get(podInfo); ok {
t.Errorf("pod %v is still in the backoff queue", podID) t.Errorf("pod %v is still in the backoff queue", podID)
} }
q.lock.RUnlock()
}) })
} }
} }