diff --git a/test/e2e/apps/BUILD b/test/e2e/apps/BUILD index 2fc391c2e8c..ba8a93c495e 100644 --- a/test/e2e/apps/BUILD +++ b/test/e2e/apps/BUILD @@ -45,16 +45,19 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/json:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", + "//staging/src/k8s.io/client-go/dynamic:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/typed/apps/v1:go_default_library", @@ -78,6 +81,7 @@ go_library( "//test/utils:go_default_library", "//test/utils/image:go_default_library", "//vendor/github.com/davecgh/go-spew/spew:go_default_library", + "//vendor/github.com/evanphx/json-patch:go_default_library", "//vendor/github.com/onsi/ginkgo:go_default_library", "//vendor/github.com/onsi/gomega:go_default_library", "//vendor/k8s.io/utils/pointer:go_default_library", diff --git a/test/e2e/apps/disruption.go b/test/e2e/apps/disruption.go index 
b43af02efd3..b785218fd61 100644 --- a/test/e2e/apps/disruption.go +++ b/test/e2e/apps/disruption.go @@ -21,16 +21,25 @@ import ( "fmt" "time" + jsonpatch "github.com/evanphx/json-patch" "github.com/onsi/ginkgo" "github.com/onsi/gomega" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" policyv1beta1 "k8s.io/api/policy/v1beta1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/json" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" + clientscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/util/retry" podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/test/e2e/framework" @@ -45,24 +54,48 @@ const ( bigClusterSize = 7 schedulingTimeout = 10 * time.Minute timeout = 60 * time.Second + defaultName = "foo" ) +var defaultLabels = map[string]string{"foo": "bar"} + var _ = SIGDescribe("DisruptionController", func() { f := framework.NewDefaultFramework("disruption") var ns string var cs kubernetes.Interface + var dc dynamic.Interface ginkgo.BeforeEach(func() { cs = f.ClientSet ns = f.Namespace.Name + dc = f.DynamicClient + }) + + ginkgo.Context("Listing PodDisruptionBudgets for all namespaces", func() { + anotherFramework := framework.NewDefaultFramework("disruption-2") + + ginkgo.It("should list and delete a collection of PodDisruptionBudgets", func() { + specialLabels := map[string]string{"foo_pdb": "bar_pdb"} + labelSelector := labels.SelectorFromSet(specialLabels).String() + createPDBMinAvailableOrDie(cs, ns, defaultName, intstr.FromInt(2), specialLabels) + createPDBMinAvailableOrDie(cs, ns, "foo2", intstr.FromString("1%"), specialLabels) + createPDBMinAvailableOrDie(anotherFramework.ClientSet, anotherFramework.Namespace.Name, "foo3", 
intstr.FromInt(2), specialLabels) + + ginkgo.By("listing a collection of PDBs across all namespaces") + listPDBs(cs, metav1.NamespaceAll, labelSelector, 3, []string{defaultName, "foo2", "foo3"}) + + ginkgo.By("listing a collection of PDBs in namespace " + ns) + listPDBs(cs, ns, labelSelector, 2, []string{defaultName, "foo2"}) + deletePDBCollection(cs, ns) + }) }) ginkgo.It("should create a PodDisruptionBudget", func() { - createPDBMinAvailableOrDie(cs, ns, intstr.FromString("1%")) + createPDBMinAvailableOrDie(cs, ns, defaultName, intstr.FromString("1%"), defaultLabels) }) - ginkgo.It("should update PodDisruptionBudget status", func() { - createPDBMinAvailableOrDie(cs, ns, intstr.FromInt(2)) + ginkgo.It("should observe PodDisruptionBudget status updated", func() { + createPDBMinAvailableOrDie(cs, ns, defaultName, intstr.FromInt(1), defaultLabels) createPodsOrDie(cs, ns, 3) waitForPodsOrDie(cs, ns, 3) @@ -70,7 +103,7 @@ var _ = SIGDescribe("DisruptionController", func() { // Since disruptionAllowed starts out 0, if we see it ever become positive, // that means the controller is working. err := wait.PollImmediate(framework.Poll, timeout, func() (bool, error) { - pdb, err := cs.PolicyV1beta1().PodDisruptionBudgets(ns).Get(context.TODO(), "foo", metav1.GetOptions{}) + pdb, err := cs.PolicyV1beta1().PodDisruptionBudgets(ns).Get(context.TODO(), defaultName, metav1.GetOptions{}) if err != nil { return false, err } @@ -79,6 +112,34 @@ var _ = SIGDescribe("DisruptionController", func() { framework.ExpectNoError(err) }) + ginkgo.It("should update/patch PodDisruptionBudget status", func() { + createPDBMinAvailableOrDie(cs, ns, defaultName, intstr.FromInt(1), defaultLabels) + + ginkgo.By("Updating PodDisruptionBudget status") + // PDB status can be updated by both PDB controller and the status API. The test selects `DisruptedPods` field to show immediate update via API. + // The pod has to exist, otherwise will be removed by the controller.
Other fields may not reflect the change from API. + createPodsOrDie(cs, ns, 1) + waitForPodsOrDie(cs, ns, 1) + pod, _ := locateRunningPod(cs, ns) + updatePDBOrDie(cs, ns, defaultName, func(old *policyv1beta1.PodDisruptionBudget) *policyv1beta1.PodDisruptionBudget { + old.Status.DisruptedPods = make(map[string]metav1.Time) + old.Status.DisruptedPods[pod.Name] = metav1.NewTime(time.Now()) + return old + }, cs.PolicyV1beta1().PodDisruptionBudgets(ns).UpdateStatus) + // fetch again to make sure the update from API was effective + updated := getPDBStatusOrDie(dc, ns, defaultName) + framework.ExpectHaveKey(updated.Status.DisruptedPods, pod.Name, "Expecting the DisruptedPods have %s", pod.Name) + + ginkgo.By("Patching PodDisruptionBudget status") + patched, _ := patchPDBOrDie(cs, dc, ns, defaultName, func(old *policyv1beta1.PodDisruptionBudget) (bytes []byte, err error) { + oldBytes, _ := json.Marshal(old) + old.Status.DisruptedPods = make(map[string]metav1.Time) + newBytes, _ := json.Marshal(old) + return jsonpatch.CreateMergePatch(oldBytes, newBytes) + }, "status") + framework.ExpectEmpty(patched.Status.DisruptedPods, "Expecting the PodDisruptionBudget's be empty") + }) + evictionCases := []struct { description string minAvailable intstr.IntOrString @@ -166,11 +227,11 @@ var _ = SIGDescribe("DisruptionController", func() { } if c.minAvailable.String() != "" { - createPDBMinAvailableOrDie(cs, ns, c.minAvailable) + createPDBMinAvailableOrDie(cs, ns, defaultName, c.minAvailable, defaultLabels) } if c.maxUnavailable.String() != "" { - createPDBMaxUnavailableOrDie(cs, ns, c.maxUnavailable) + createPDBMaxUnavailableOrDie(cs, ns, defaultName, c.maxUnavailable) } // Locate a running pod. 
@@ -209,7 +270,7 @@ var _ = SIGDescribe("DisruptionController", func() { ginkgo.It("should block an eviction until the PDB is updated to allow it", func() { ginkgo.By("Creating a pdb that targets all three pods in a test replica set") - createPDBMinAvailableOrDie(cs, ns, intstr.FromInt(3)) + createPDBMinAvailableOrDie(cs, ns, defaultName, intstr.FromInt(3), defaultLabels) createReplicaSetOrDie(cs, ns, 3, false) ginkgo.By("First trying to evict a pod which shouldn't be evictable") @@ -227,21 +288,57 @@ var _ = SIGDescribe("DisruptionController", func() { gomega.Expect(err).Should(gomega.MatchError("Cannot evict pod as it would violate the pod's disruption budget.")) ginkgo.By("Updating the pdb to allow a pod to be evicted") - updatePDBMinAvailableOrDie(cs, ns, intstr.FromInt(2)) + updatePDBOrDie(cs, ns, defaultName, func(pdb *policyv1beta1.PodDisruptionBudget) *policyv1beta1.PodDisruptionBudget { + newMinAvailable := intstr.FromInt(2) + pdb.Spec.MinAvailable = &newMinAvailable + return pdb + }, cs.PolicyV1beta1().PodDisruptionBudgets(ns).Update) ginkgo.By("Trying to evict the same pod we tried earlier which should now be evictable") waitForPodsOrDie(cs, ns, 3) waitForPdbToObserveHealthyPods(cs, ns, 3) err = cs.CoreV1().Pods(ns).Evict(e) framework.ExpectNoError(err) // the eviction is now allowed + + ginkgo.By("Patching the pdb to disallow a pod to be evicted") + patchPDBOrDie(cs, dc, ns, defaultName, func(old *policyv1beta1.PodDisruptionBudget) (bytes []byte, err error) { + oldData, err := json.Marshal(old) + old.Spec.MinAvailable = nil + maxUnavailable := intstr.FromInt(0) + old.Spec.MaxUnavailable = &maxUnavailable + newData, _ := json.Marshal(old) + return jsonpatch.CreateMergePatch(oldData, newData) + }) + + pod, err = locateRunningPod(cs, ns) // locate a new running pod + framework.ExpectNoError(err) + waitForPodsOrDie(cs, ns, 3) + e = &policyv1beta1.Eviction{ + ObjectMeta: metav1.ObjectMeta{ + Name: pod.Name, + Namespace: ns, + }, + } + err = 
cs.CoreV1().Pods(ns).Evict(e) + gomega.Expect(err).Should(gomega.MatchError("Cannot evict pod as it would violate the pod's disruption budget.")) + + ginkgo.By("Deleting the pdb to allow a pod to be evicted") + deletePDBOrDie(cs, ns, defaultName) + + ginkgo.By("Trying to evict the same pod we tried earlier which should now be evictable") + waitForPodsOrDie(cs, ns, 3) + err = cs.CoreV1().Pods(ns).Evict(e) + framework.ExpectNoError(err) // the eviction is now allowed }) + }) -func createPDBMinAvailableOrDie(cs kubernetes.Interface, ns string, minAvailable intstr.IntOrString) { +func createPDBMinAvailableOrDie(cs kubernetes.Interface, ns string, name string, minAvailable intstr.IntOrString, labels map[string]string) { pdb := policyv1beta1.PodDisruptionBudget{ ObjectMeta: metav1.ObjectMeta{ - Name: "foo", + Name: name, Namespace: ns, + Labels: labels, }, Spec: policyv1beta1.PodDisruptionBudgetSpec{ Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}, @@ -250,13 +347,13 @@ func createPDBMinAvailableOrDie(cs kubernetes.Interface, ns string, minAvailable } _, err := cs.PolicyV1beta1().PodDisruptionBudgets(ns).Create(context.TODO(), &pdb, metav1.CreateOptions{}) framework.ExpectNoError(err, "Waiting for the pdb to be created with minAvailable %d in namespace %s", minAvailable.IntVal, ns) - waitForPdbToBeProcessed(cs, ns) + waitForPdbToBeProcessed(cs, ns, name) } -func createPDBMaxUnavailableOrDie(cs kubernetes.Interface, ns string, maxUnavailable intstr.IntOrString) { +func createPDBMaxUnavailableOrDie(cs kubernetes.Interface, ns string, name string, maxUnavailable intstr.IntOrString) { pdb := policyv1beta1.PodDisruptionBudget{ ObjectMeta: metav1.ObjectMeta{ - Name: "foo", + Name: name, Namespace: ns, }, Spec: policyv1beta1.PodDisruptionBudgetSpec{ @@ -266,24 +363,83 @@ func createPDBMaxUnavailableOrDie(cs kubernetes.Interface, ns string, maxUnavail } _, err := cs.PolicyV1beta1().PodDisruptionBudgets(ns).Create(context.TODO(), &pdb, 
metav1.CreateOptions{}) framework.ExpectNoError(err, "Waiting for the pdb to be created with maxUnavailable %d in namespace %s", maxUnavailable.IntVal, ns) - waitForPdbToBeProcessed(cs, ns) + waitForPdbToBeProcessed(cs, ns, name) } -func updatePDBMinAvailableOrDie(cs kubernetes.Interface, ns string, minAvailable intstr.IntOrString) { - err := retry.RetryOnConflict(retry.DefaultRetry, func() error { - old, err := cs.PolicyV1beta1().PodDisruptionBudgets(ns).Get(context.TODO(), "foo", metav1.GetOptions{}) +type updateFunc func(pdb *policyv1beta1.PodDisruptionBudget) *policyv1beta1.PodDisruptionBudget +type updateRestAPI func(ctx context.Context, podDisruptionBudget *policyv1beta1.PodDisruptionBudget, opts metav1.UpdateOptions) (*policyv1beta1.PodDisruptionBudget, error) +type patchFunc func(pdb *policyv1beta1.PodDisruptionBudget) ([]byte, error) + +func updatePDBOrDie(cs kubernetes.Interface, ns string, name string, f updateFunc, api updateRestAPI) (updated *policyv1beta1.PodDisruptionBudget, err error) { + err = retry.RetryOnConflict(retry.DefaultRetry, func() error { + old, err := cs.PolicyV1beta1().PodDisruptionBudgets(ns).Get(context.TODO(), name, metav1.GetOptions{}) if err != nil { return err } - old.Spec.MinAvailable = &minAvailable - if _, err := cs.PolicyV1beta1().PodDisruptionBudgets(ns).Update(context.TODO(), old, metav1.UpdateOptions{}); err != nil { + old = f(old) + if updated, err = api(context.TODO(), old, metav1.UpdateOptions{}); err != nil { + return err + } + return nil + }) + + framework.ExpectNoError(err, "Waiting for the PDB update to be processed in namespace %s", ns) + waitForPdbToBeProcessed(cs, ns, name) + return updated, err +} + +func patchPDBOrDie(cs kubernetes.Interface, dc dynamic.Interface, ns string, name string, f patchFunc, subresources ...string) (updated *policyv1beta1.PodDisruptionBudget, err error) { + err = retry.RetryOnConflict(retry.DefaultRetry, func() error { + old := getPDBStatusOrDie(dc, ns, name) + patchBytes, err := 
f(old) + if updated, err = cs.PolicyV1beta1().PodDisruptionBudgets(ns).Patch(context.TODO(), old.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}, subresources...); err != nil { return err } return nil }) framework.ExpectNoError(err, "Waiting for the pdb update to be processed in namespace %s", ns) - waitForPdbToBeProcessed(cs, ns) + waitForPdbToBeProcessed(cs, ns, name) + return updated, err +} + +func deletePDBOrDie(cs kubernetes.Interface, ns string, name string) { + err := cs.PolicyV1beta1().PodDisruptionBudgets(ns).Delete(context.TODO(), name, &metav1.DeleteOptions{}) + framework.ExpectNoError(err, "Deleting pdb in namespace %s", ns) + waitForPdbToBeDeleted(cs, ns, name) +} + +func listPDBs(cs kubernetes.Interface, ns string, labelSelector string, count int, expectedPDBNames []string) { + pdbList, err := cs.PolicyV1beta1().PodDisruptionBudgets(ns).List(context.TODO(), metav1.ListOptions{LabelSelector: labelSelector}) + framework.ExpectNoError(err, "Listing PDB set in namespace %s", ns) + framework.ExpectEqual(len(pdbList.Items), count, "Expecting %d PDBs returned in namespace %s", count, ns) + + pdbNames := make([]string, 0) + for _, item := range pdbList.Items { + pdbNames = append(pdbNames, item.Name) + } + framework.ExpectConsistOf(pdbNames, expectedPDBNames, "Expecting returned PDBs '%s' in namespace %s", expectedPDBNames, ns) +} + +func deletePDBCollection(cs kubernetes.Interface, ns string) { + ginkgo.By("deleting a collection of PDBs") + err := cs.PolicyV1beta1().PodDisruptionBudgets(ns).DeleteCollection(context.TODO(), &metav1.DeleteOptions{}, metav1.ListOptions{}) + framework.ExpectNoError(err, "Deleting PDB set in namespace %s", ns) + + waitForPDBCollectionToBeDeleted(cs, ns) +} + +func waitForPDBCollectionToBeDeleted(cs kubernetes.Interface, ns string) { + ginkgo.By("Waiting for the PDB collection to be deleted") + wait.PollImmediate(framework.Poll, schedulingTimeout, func() (bool, error) { + pdbList, err := 
cs.PolicyV1beta1().PodDisruptionBudgets(ns).List(context.TODO(), metav1.ListOptions{}) + if err != nil { + return false, err + } + framework.ExpectNoError(err, "Listing PDB set in namespace %s", ns) + framework.ExpectEqual(len(pdbList.Items), 0, "Expecting No PDBs returned in namespace %s", ns) + return true, nil + }) } func createPodsOrDie(cs kubernetes.Interface, ns string, n int) { @@ -397,10 +553,10 @@ func locateRunningPod(cs kubernetes.Interface, ns string) (pod *v1.Pod, err erro return pod, err } -func waitForPdbToBeProcessed(cs kubernetes.Interface, ns string) { +func waitForPdbToBeProcessed(cs kubernetes.Interface, ns string, name string) { ginkgo.By("Waiting for the pdb to be processed") err := wait.PollImmediate(framework.Poll, schedulingTimeout, func() (bool, error) { - pdb, err := cs.PolicyV1beta1().PodDisruptionBudgets(ns).Get(context.TODO(), "foo", metav1.GetOptions{}) + pdb, err := cs.PolicyV1beta1().PodDisruptionBudgets(ns).Get(context.TODO(), name, metav1.GetOptions{}) if err != nil { return false, err } @@ -412,6 +568,21 @@ func waitForPdbToBeProcessed(cs kubernetes.Interface, ns string) { framework.ExpectNoError(err, "Waiting for the pdb to be processed in namespace %s", ns) } +func waitForPdbToBeDeleted(cs kubernetes.Interface, ns string, name string) { + ginkgo.By("Waiting for the pdb to be deleted") + err := wait.PollImmediate(framework.Poll, schedulingTimeout, func() (bool, error) { + _, err := cs.PolicyV1beta1().PodDisruptionBudgets(ns).Get(context.TODO(), name, metav1.GetOptions{}) + if apierrors.IsNotFound(err) { + return true, nil // done + } + if err != nil { + return false, err + } + return false, nil + }) + framework.ExpectNoError(err, "Waiting for the pdb to be deleted in namespace %s", ns) +} + func waitForPdbToObserveHealthyPods(cs kubernetes.Interface, ns string, healthyCount int32) { ginkgo.By("Waiting for the pdb to observed all healthy pods") err := wait.PollImmediate(framework.Poll, wait.ForeverTestTimeout, func() (bool, 
error) { @@ -426,3 +597,23 @@ func waitForPdbToObserveHealthyPods(cs kubernetes.Interface, ns string, healthyC }) framework.ExpectNoError(err, "Waiting for the pdb in namespace %s to observed %d healthy pods", ns, healthyCount) } + +func getPDBStatusOrDie(dc dynamic.Interface, ns string, name string) *policyv1beta1.PodDisruptionBudget { + pdbStatusResource := policyv1beta1.SchemeGroupVersion.WithResource("poddisruptionbudgets") + unstruct, err := dc.Resource(pdbStatusResource).Namespace(ns).Get(name, metav1.GetOptions{}, "status") + pdb, err := unstructuredToPDB(unstruct) + framework.ExpectNoError(err, "Getting the status of the pdb %s in namespace %s", name, ns) + return pdb +} + +func unstructuredToPDB(obj *unstructured.Unstructured) (*policyv1beta1.PodDisruptionBudget, error) { + json, err := runtime.Encode(unstructured.UnstructuredJSONScheme, obj) + if err != nil { + return nil, err + } + pdb := &policyv1beta1.PodDisruptionBudget{} + err = runtime.DecodeInto(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), json, pdb) + pdb.Kind = "" + pdb.APIVersion = "" + return pdb, err +} diff --git a/test/e2e/framework/expect.go b/test/e2e/framework/expect.go index 07b2de07002..b7584e654fa 100644 --- a/test/e2e/framework/expect.go +++ b/test/e2e/framework/expect.go @@ -45,3 +45,18 @@ func ExpectNoError(err error, explain ...interface{}) { func ExpectNoErrorWithOffset(offset int, err error, explain ...interface{}) { gomega.ExpectWithOffset(1+offset, err).NotTo(gomega.HaveOccurred(), explain...) } + +// ExpectConsistOf expects actual contains precisely the extra elements. The ordering of the elements does not matter. +func ExpectConsistOf(actual interface{}, extra interface{}, explain ...interface{}) { + gomega.ExpectWithOffset(1, actual).To(gomega.ConsistOf(extra), explain...) 
+} + +// ExpectHaveKey expects the actual map has the key in the keyset +func ExpectHaveKey(actual interface{}, key interface{}, explain ...interface{}) { + gomega.ExpectWithOffset(1, actual).To(gomega.HaveKey(key), explain...) +} + +// ExpectEmpty expects actual is empty +func ExpectEmpty(actual interface{}, explain ...interface{}) { + gomega.ExpectWithOffset(1, actual).To(gomega.BeEmpty(), explain...) +}