Merge pull request #109185 from kerthcet/fix/goroutine-leak-in-nodelifecycle-test

resolve goroutine leak in nodelifecycle tests
This commit is contained in:
Kubernetes Prow Robot 2022-08-23 21:19:48 -07:00 committed by GitHub
commit 9c88c73de0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -41,10 +41,10 @@ import (
var timeForControllerToProgress = 500 * time.Millisecond var timeForControllerToProgress = 500 * time.Millisecond
func getPodsAssignedToNode(c *fake.Clientset) GetPodsByNodeNameFunc { func getPodsAssignedToNode(ctx context.Context, c *fake.Clientset) GetPodsByNodeNameFunc {
return func(nodeName string) ([]*v1.Pod, error) { return func(nodeName string) ([]*v1.Pod, error) {
selector := fields.SelectorFromSet(fields.Set{"spec.nodeName": nodeName}) selector := fields.SelectorFromSet(fields.Set{"spec.nodeName": nodeName})
pods, err := c.CoreV1().Pods(v1.NamespaceAll).List(context.TODO(), metav1.ListOptions{ pods, err := c.CoreV1().Pods(v1.NamespaceAll).List(ctx, metav1.ListOptions{
FieldSelector: selector.String(), FieldSelector: selector.String(),
LabelSelector: labels.Everything().String(), LabelSelector: labels.Everything().String(),
}) })
@ -95,7 +95,7 @@ func setupNewNoExecuteTaintManager(ctx context.Context, fakeClientSet *fake.Clie
informerFactory := informers.NewSharedInformerFactory(fakeClientSet, 0) informerFactory := informers.NewSharedInformerFactory(fakeClientSet, 0)
podIndexer := informerFactory.Core().V1().Pods().Informer().GetIndexer() podIndexer := informerFactory.Core().V1().Pods().Informer().GetIndexer()
nodeIndexer := informerFactory.Core().V1().Nodes().Informer().GetIndexer() nodeIndexer := informerFactory.Core().V1().Nodes().Informer().GetIndexer()
mgr := NewNoExecuteTaintManager(ctx, fakeClientSet, informerFactory.Core().V1().Pods().Lister(), informerFactory.Core().V1().Nodes().Lister(), getPodsAssignedToNode(fakeClientSet)) mgr := NewNoExecuteTaintManager(ctx, fakeClientSet, informerFactory.Core().V1().Pods().Lister(), informerFactory.Core().V1().Nodes().Lister(), getPodsAssignedToNode(ctx, fakeClientSet))
return mgr, podIndexer, nodeIndexer return mgr, podIndexer, nodeIndexer
} }
@ -217,18 +217,19 @@ func TestCreatePod(t *testing.T) {
} }
func TestDeletePod(t *testing.T) { func TestDeletePod(t *testing.T) {
stopCh := make(chan struct{}) ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fakeClientset := fake.NewSimpleClientset() fakeClientset := fake.NewSimpleClientset()
controller, _, _ := setupNewNoExecuteTaintManager(context.TODO(), fakeClientset) controller, _, _ := setupNewNoExecuteTaintManager(ctx, fakeClientset)
controller.recorder = testutil.NewFakeRecorder() controller.recorder = testutil.NewFakeRecorder()
go controller.Run(context.TODO()) go controller.Run(ctx)
controller.taintedNodes = map[string][]v1.Taint{ controller.taintedNodes = map[string][]v1.Taint{
"node1": {createNoExecuteTaint(1)}, "node1": {createNoExecuteTaint(1)},
} }
controller.PodUpdated(testutil.NewPod("pod1", "node1"), nil) controller.PodUpdated(testutil.NewPod("pod1", "node1"), nil)
// wait a bit to see if nothing will panic // wait a bit to see if nothing will panic
time.Sleep(timeForControllerToProgress) time.Sleep(timeForControllerToProgress)
close(stopCh)
} }
func TestUpdatePod(t *testing.T) { func TestUpdatePod(t *testing.T) {
@ -357,7 +358,7 @@ func TestCreateNode(t *testing.T) {
for _, item := range testCases { for _, item := range testCases {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: item.pods}) fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: item.pods})
controller, _, nodeIndexer := setupNewNoExecuteTaintManager(context.TODO(), fakeClientset) controller, _, nodeIndexer := setupNewNoExecuteTaintManager(ctx, fakeClientset)
nodeIndexer.Add(item.node) nodeIndexer.Add(item.node)
controller.recorder = testutil.NewFakeRecorder() controller.recorder = testutil.NewFakeRecorder()
go controller.Run(ctx) go controller.Run(ctx)
@ -374,7 +375,7 @@ func TestCreateNode(t *testing.T) {
func TestDeleteNode(t *testing.T) { func TestDeleteNode(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
fakeClientset := fake.NewSimpleClientset() fakeClientset := fake.NewSimpleClientset()
controller, _, _ := setupNewNoExecuteTaintManager(context.TODO(), fakeClientset) controller, _, _ := setupNewNoExecuteTaintManager(ctx, fakeClientset)
controller.recorder = testutil.NewFakeRecorder() controller.recorder = testutil.NewFakeRecorder()
controller.taintedNodes = map[string][]v1.Taint{ controller.taintedNodes = map[string][]v1.Taint{
"node1": {createNoExecuteTaint(1)}, "node1": {createNoExecuteTaint(1)},
@ -485,12 +486,14 @@ func TestUpdateNode(t *testing.T) {
for _, item := range testCases { for _, item := range testCases {
t.Run(item.description, func(t *testing.T) { t.Run(item.description, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodDisruptionConditions, item.enablePodDisruptionConditions)() defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodDisruptionConditions, item.enablePodDisruptionConditions)()
stopCh := make(chan struct{}) ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: item.pods}) fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: item.pods})
controller, _, nodeIndexer := setupNewNoExecuteTaintManager(context.TODO(), fakeClientset) controller, _, nodeIndexer := setupNewNoExecuteTaintManager(ctx, fakeClientset)
nodeIndexer.Add(item.newNode) nodeIndexer.Add(item.newNode)
controller.recorder = testutil.NewFakeRecorder() controller.recorder = testutil.NewFakeRecorder()
go controller.Run(context.TODO()) go controller.Run(ctx)
controller.NodeUpdated(item.oldNode, item.newNode) controller.NodeUpdated(item.oldNode, item.newNode)
// wait a bit // wait a bit
time.Sleep(timeForControllerToProgress) time.Sleep(timeForControllerToProgress)
@ -499,7 +502,6 @@ func TestUpdateNode(t *testing.T) {
} }
verifyPodActions(t, item.description, fakeClientset, item.expectPatch, item.expectDelete) verifyPodActions(t, item.description, fakeClientset, item.expectPatch, item.expectDelete)
close(stopCh)
}) })
} }
} }
@ -526,9 +528,9 @@ func TestUpdateNodeWithMultipleTaints(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO()) ctx, cancel := context.WithCancel(context.TODO())
fakeClientset := fake.NewSimpleClientset(pod) fakeClientset := fake.NewSimpleClientset(pod)
controller, _, nodeIndexer := setupNewNoExecuteTaintManager(context.TODO(), fakeClientset) controller, _, nodeIndexer := setupNewNoExecuteTaintManager(ctx, fakeClientset)
controller.recorder = testutil.NewFakeRecorder() controller.recorder = testutil.NewFakeRecorder()
go controller.Run(context.TODO()) go controller.Run(ctx)
// no taint // no taint
nodeIndexer.Add(untaintedNode) nodeIndexer.Add(untaintedNode)
@ -609,39 +611,58 @@ func TestUpdateNodeWithMultiplePods(t *testing.T) {
} }
for _, item := range testCases { for _, item := range testCases {
t.Logf("Starting testcase %q", item.description) t.Run(item.description, func(t *testing.T) {
t.Logf("Starting testcase %q", item.description)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stopCh := make(chan struct{}) fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: item.pods})
fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: item.pods}) sort.Sort(item.expectedDeleteTimes)
sort.Sort(item.expectedDeleteTimes) controller, _, nodeIndexer := setupNewNoExecuteTaintManager(ctx, fakeClientset)
controller, _, nodeIndexer := setupNewNoExecuteTaintManager(context.TODO(), fakeClientset) nodeIndexer.Add(item.newNode)
nodeIndexer.Add(item.newNode) controller.recorder = testutil.NewFakeRecorder()
controller.recorder = testutil.NewFakeRecorder() go controller.Run(ctx)
go controller.Run(context.TODO()) controller.NodeUpdated(item.oldNode, item.newNode)
controller.NodeUpdated(item.oldNode, item.newNode)
startedAt := time.Now() startedAt := time.Now()
for i := range item.expectedDeleteTimes { for i := range item.expectedDeleteTimes {
if i == 0 || item.expectedDeleteTimes[i-1].timestamp != item.expectedDeleteTimes[i].timestamp { if i == 0 || item.expectedDeleteTimes[i-1].timestamp != item.expectedDeleteTimes[i].timestamp {
// compute a grace duration to give controller time to process updates. Choose big // compute a grace duration to give controller time to process updates. Choose big
// enough intervals in the test cases above to avoid flakes. // enough intervals in the test cases above to avoid flakes.
var increment time.Duration var increment time.Duration
if i == len(item.expectedDeleteTimes)-1 || item.expectedDeleteTimes[i+1].timestamp == item.expectedDeleteTimes[i].timestamp { if i == len(item.expectedDeleteTimes)-1 || item.expectedDeleteTimes[i+1].timestamp == item.expectedDeleteTimes[i].timestamp {
increment = 500 * time.Millisecond increment = 500 * time.Millisecond
} else { } else {
increment = ((item.expectedDeleteTimes[i+1].timestamp - item.expectedDeleteTimes[i].timestamp) / time.Duration(2)) increment = ((item.expectedDeleteTimes[i+1].timestamp - item.expectedDeleteTimes[i].timestamp) / time.Duration(2))
}
sleepTime := item.expectedDeleteTimes[i].timestamp - time.Since(startedAt) + increment
if sleepTime < 0 {
sleepTime = 0
}
t.Logf("Sleeping for %v", sleepTime)
time.Sleep(sleepTime)
} }
sleepTime := item.expectedDeleteTimes[i].timestamp - time.Since(startedAt) + increment for delay, podName := range item.expectedDeleteTimes[i].names {
if sleepTime < 0 { deleted := false
sleepTime = 0 for _, action := range fakeClientset.Actions() {
deleteAction, ok := action.(clienttesting.DeleteActionImpl)
if !ok {
t.Logf("Found not-delete action with verb %v. Ignoring.", action.GetVerb())
continue
}
if deleteAction.GetResource().Resource != "pods" {
continue
}
if podName == deleteAction.GetName() {
deleted = true
}
}
if !deleted {
t.Errorf("Failed to deleted pod %v after %v", podName, delay)
}
} }
t.Logf("Sleeping for %v", sleepTime)
time.Sleep(sleepTime)
}
for delay, podName := range item.expectedDeleteTimes[i].names {
deleted := false
for _, action := range fakeClientset.Actions() { for _, action := range fakeClientset.Actions() {
deleteAction, ok := action.(clienttesting.DeleteActionImpl) deleteAction, ok := action.(clienttesting.DeleteActionImpl)
if !ok { if !ok {
@ -651,38 +672,20 @@ func TestUpdateNodeWithMultiplePods(t *testing.T) {
if deleteAction.GetResource().Resource != "pods" { if deleteAction.GetResource().Resource != "pods" {
continue continue
} }
if podName == deleteAction.GetName() { deletedPodName := deleteAction.GetName()
deleted = true expected := false
for _, podName := range item.expectedDeleteTimes[i].names {
if podName == deletedPodName {
expected = true
}
}
if !expected {
t.Errorf("Pod %v was deleted even though it shouldn't have", deletedPodName)
} }
} }
if !deleted { fakeClientset.ClearActions()
t.Errorf("Failed to deleted pod %v after %v", podName, delay)
}
} }
for _, action := range fakeClientset.Actions() { })
deleteAction, ok := action.(clienttesting.DeleteActionImpl)
if !ok {
t.Logf("Found not-delete action with verb %v. Ignoring.", action.GetVerb())
continue
}
if deleteAction.GetResource().Resource != "pods" {
continue
}
deletedPodName := deleteAction.GetName()
expected := false
for _, podName := range item.expectedDeleteTimes[i].names {
if podName == deletedPodName {
expected = true
}
}
if !expected {
t.Errorf("Pod %v was deleted even though it shouldn't have", deletedPodName)
}
}
fakeClientset.ClearActions()
}
close(stopCh)
} }
} }
@ -812,33 +815,35 @@ func TestEventualConsistency(t *testing.T) {
} }
for _, item := range testCases { for _, item := range testCases {
stopCh := make(chan struct{}) t.Run(item.description, func(t *testing.T) {
fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: item.pods}) ctx, cancel := context.WithCancel(context.Background())
controller, podIndexer, nodeIndexer := setupNewNoExecuteTaintManager(context.TODO(), fakeClientset) defer cancel()
nodeIndexer.Add(item.newNode)
controller.recorder = testutil.NewFakeRecorder()
go controller.Run(context.TODO())
if item.prevPod != nil { fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: item.pods})
podIndexer.Add(item.prevPod) controller, podIndexer, nodeIndexer := setupNewNoExecuteTaintManager(ctx, fakeClientset)
controller.PodUpdated(nil, item.prevPod) nodeIndexer.Add(item.newNode)
} controller.recorder = testutil.NewFakeRecorder()
go controller.Run(ctx)
// First we simulate NodeUpdate that should delete 'pod1'. It doesn't know about 'pod2' yet. if item.prevPod != nil {
controller.NodeUpdated(item.oldNode, item.newNode) podIndexer.Add(item.prevPod)
// TODO(mborsz): Remove this sleep and other sleeps in this file. controller.PodUpdated(nil, item.prevPod)
time.Sleep(timeForControllerToProgress) }
verifyPodActions(t, item.description, fakeClientset, false, item.expectDelete) // First we simulate NodeUpdate that should delete 'pod1'. It doesn't know about 'pod2' yet.
fakeClientset.ClearActions() controller.NodeUpdated(item.oldNode, item.newNode)
// TODO(mborsz): Remove this sleep and other sleeps in this file.
time.Sleep(timeForControllerToProgress)
// And now the delayed update of 'pod2' comes to the TaintManager. We should delete it as well. verifyPodActions(t, item.description, fakeClientset, false, item.expectDelete)
podIndexer.Update(item.newPod) fakeClientset.ClearActions()
controller.PodUpdated(item.prevPod, item.newPod)
// wait a bit
time.Sleep(timeForControllerToProgress)
close(stopCh) // And now the delayed update of 'pod2' comes to the TaintManager. We should delete it as well.
podIndexer.Update(item.newPod)
controller.PodUpdated(item.prevPod, item.newPod)
// wait a bit
time.Sleep(timeForControllerToProgress)
})
} }
} }