From 28b057ebe9f4f00123f868c39b9f089065c7e9b9 Mon Sep 17 00:00:00 2001 From: ymqytw Date: Thu, 5 Jan 2017 14:29:45 -0800 Subject: [PATCH] fix evictions test and refactor the test --- test/integration/evictions/evictions_test.go | 232 ++++++++++--------- 1 file changed, 126 insertions(+), 106 deletions(-) diff --git a/test/integration/evictions/evictions_test.go b/test/integration/evictions/evictions_test.go index 29d838960af..342c12881be 100644 --- a/test/integration/evictions/evictions_test.go +++ b/test/integration/evictions/evictions_test.go @@ -21,6 +21,8 @@ package evictions import ( "fmt" "net/http/httptest" + "sync" + "sync/atomic" "testing" "time" @@ -33,15 +35,138 @@ import ( "k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/controller/disruption" "k8s.io/kubernetes/pkg/controller/informers" + utilerrors "k8s.io/kubernetes/pkg/util/errors" "k8s.io/kubernetes/pkg/util/intstr" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/test/integration/framework" ) const ( - defaultTimeout = 10 * time.Minute + numOfEvictions = 10 ) +// TestConcurrentEvictionRequests is to make sure pod disruption budgets (PDB) controller is able to +// handle concurrent eviction requests. Original issue:#37605 +func TestConcurrentEvictionRequests(t *testing.T) { + podNameFormat := "test-pod-%d" + + s, rm, podInformer, clientSet := rmSetup(t) + defer s.Close() + + ns := framework.CreateTestingNamespace("concurrent-eviction-requests", s, t) + defer framework.DeleteTestingNamespace(ns, s, t) + + stopCh := make(chan struct{}) + go podInformer.Run(stopCh) + go rm.Run(stopCh) + defer close(stopCh) + + config := restclient.Config{Host: s.URL} + clientSet, err := clientset.NewForConfig(&config) + if err != nil { + t.Fatalf("Failed to create clientset: %v", err) + } + + var gracePeriodSeconds int64 = 30 + deleteOption := &v1.DeleteOptions{ + GracePeriodSeconds: &gracePeriodSeconds, + } + + // Generate numOfEvictions pods to evict + for i := 0; i < numOfEvictions; i++ { + podName := fmt.Sprintf(podNameFormat, i) + pod := newPod(podName) + + if _, err := clientSet.Core().Pods(ns.Name).Create(pod); err != nil { + t.Errorf("Failed to create pod: %v", err) + } + + addPodConditionReady(pod) + if _, err := clientSet.Core().Pods(ns.Name).UpdateStatus(pod); err != nil { + t.Fatal(err) + } + } + + waitToObservePods(t, podInformer, numOfEvictions) + + pdb := newPDB() + if _, err := clientSet.Policy().PodDisruptionBudgets(ns.Name).Create(pdb); err != nil { + t.Errorf("Failed to create PodDisruptionBudget: %v", err) + } + + waitPDBStable(t, clientSet, numOfEvictions, ns.Name, pdb.Name) + + var numberPodsEvicted uint32 = 0 + errCh := make(chan error, 3*numOfEvictions) + var wg sync.WaitGroup + // spawn numOfEvictions goroutines to concurrently evict the pods + for i := 0; i < numOfEvictions; i++ { + wg.Add(1) + go func(id int, errCh chan error) { + defer wg.Done() + podName := fmt.Sprintf(podNameFormat, id) + eviction := newEviction(ns.Name, podName, deleteOption) + + err := wait.PollImmediate(5*time.Second, 60*time.Second, func() (bool, error) { + e := clientSet.Policy().Evictions(ns.Name).Evict(eviction) + switch { + case errors.IsTooManyRequests(e): + return false, nil + case errors.IsConflict(e): + return false, fmt.Errorf("Unexpected Conflict (409) error caused by failing to handle concurrent PDB updates: %v", e) + case e == nil: + return true, nil + default: + return false, e + } + }) + + if err != nil { + errCh <- err + // should not return here otherwise we would leak the pod + } + + _, err = clientSet.Core().Pods(ns.Name).Get(podName, metav1.GetOptions{}) + switch { + case errors.IsNotFound(err): + atomic.AddUint32(&numberPodsEvicted, 1) + // pod was evicted and deleted so return from goroutine immediately + return + case err == nil: + // this shouldn't happen if the pod was evicted successfully + errCh <- fmt.Errorf("Pod %q is expected to be evicted", podName) + default: + errCh <- err + } + + // delete pod which still exists due to error + e := clientSet.Core().Pods(ns.Name).Delete(podName, deleteOption) + if e != nil { + errCh <- e + } + + }(i, errCh) + } + + wg.Wait() + + close(errCh) + var errList []error + if err := clientSet.Policy().PodDisruptionBudgets(ns.Name).Delete(pdb.Name, deleteOption); err != nil { + errList = append(errList, fmt.Errorf("Failed to delete PodDisruptionBudget: %v", err)) + } + for err := range errCh { + errList = append(errList, err) + } + if len(errList) > 0 { + t.Fatal(utilerrors.NewAggregate(errList)) + } + + if atomic.LoadUint32(&numberPodsEvicted) != numOfEvictions { + t.Fatalf("fewer number of successful evictions than expected :", numberPodsEvicted) + } +} + func newPod(podName string) *v1.Pod { return &v1.Pod{ ObjectMeta: v1.ObjectMeta{ @@ -121,111 +246,6 @@ func rmSetup(t *testing.T) (*httptest.Server, *disruption.DisruptionController, return s, rm, informers.Pods().Informer(), clientSet } -func TestConcurrentEvictionRequests(t *testing.T) { - podNameFormat := "test-pod-%d" - - s, rm, podInformer, clientSet := rmSetup(t) - defer s.Close() - - ns := framework.CreateTestingNamespace("concurrent-eviction-requests", s, t) - defer framework.DeleteTestingNamespace(ns, s, t) - - stopCh := make(chan struct{}) - go podInformer.Run(stopCh) - go rm.Run(stopCh) - - config := restclient.Config{Host: s.URL} - clientSet, err := clientset.NewForConfig(&config) - - var gracePeriodSeconds int64 = 30 - deleteOption := &v1.DeleteOptions{ - GracePeriodSeconds: &gracePeriodSeconds, - } - - // Generate 10 pods to evict - for i := 0; i < 10; i++ { - podName := fmt.Sprintf(podNameFormat, i) - pod := newPod(podName) - - if _, err := clientSet.Core().Pods(ns.Name).Create(pod); err != nil { - t.Errorf("Failed to create pod: %v", err) - } - - addPodConditionReady(pod) - if _, err := clientSet.Core().Pods(ns.Name).UpdateStatus(pod); err != nil { - t.Fatal(err) - } - } - - waitToObservePods(t, podInformer, 10) - - pdb := newPDB() - if _, err := clientSet.Policy().PodDisruptionBudgets(ns.Name).Create(pdb); err != nil { - t.Errorf("Failed to create PodDisruptionBudget: %v", err) - } - - waitPDBStable(t, clientSet, 10, ns.Name, pdb.Name) - - doneCh := make(chan bool, 10) - errCh := make(chan error, 1) - // spawn 10 goroutine to concurrently evict the pods - for i := 0; i < 10; i++ { - go func(id int, doneCh chan bool, errCh chan error) { - evictionName := fmt.Sprintf(podNameFormat, id) - eviction := newEviction(ns.Name, evictionName, deleteOption) - - var e error - for { - e = clientSet.Policy().Evictions(ns.Name).Evict(eviction) - if errors.IsTooManyRequests(e) { - time.Sleep(5 * time.Second) - } else { - break - } - } - if e != nil { - if errors.IsConflict(err) { - fmt.Errorf("Unexpected Conflict (409) error caused by failing to handle concurrent PDB updates: %v", e) - } else { - errCh <- e - } - return - } - doneCh <- true - }(i, doneCh, errCh) - } - - doneCount := 0 - for { - select { - case err := <-errCh: - t.Errorf("%v", err) - return - case <-doneCh: - doneCount++ - if doneCount == 10 { - return - } - case <-time.After(defaultTimeout): - t.Errorf("Eviction did not complete within %v", defaultTimeout) - } - } - - for i := 0; i < 10; i++ { - podName := fmt.Sprintf(podNameFormat, i) - _, err := clientSet.Core().Pods(ns.Name).Get(podName, metav1.GetOptions{}) - if !errors.IsNotFound(err) { - t.Errorf("Pod %q is expected to be evicted", podName) - } - } - - if err := clientSet.Policy().PodDisruptionBudgets(ns.Name).Delete(pdb.Name, deleteOption); err != nil { - t.Errorf("Failed to delete PodDisruptionBudget: %v", err) - } - - close(stopCh) -} - // wait for the podInformer to observe the pods. Call this function before // running the RS controller to prevent the rc manager from creating new pods // rather than adopting the existing ones.