From 36d1af6145267ae924b5d0f53af9c263092020e4 Mon Sep 17 00:00:00 2001
From: Dmitry Shulyak
Date: Fri, 18 Nov 2016 13:18:50 +0200
Subject: [PATCH] Validate pet set scale up/down order

This change implements test cases to validate that:

- scale up is done in order
- scale down is done in reverse order
- if any of the pets is unhealthy, scale up/down will halt

Change-Id: I4487a7c9823d94a2995bbb06accdfd8f3001354c
---
 test/e2e/network_partition.go |   8 +--
 test/e2e/petset.go            | 126 +++++++++++++++++++++++++++++++---
 2 files changed, 119 insertions(+), 15 deletions(-)

diff --git a/test/e2e/network_partition.go b/test/e2e/network_partition.go
index 4566328d1d8..d0303482996 100644
--- a/test/e2e/network_partition.go
+++ b/test/e2e/network_partition.go
@@ -150,7 +150,7 @@ var _ = framework.KubeDescribe("Network Partition [Disruptive] [Slow]", func() {
 	})
 
 	// What happens in this test:
-	// Network traffic from a node to master is cut off to simulate network partition
+	// Network traffic from a node to master is cut off to simulate network partition
 	// Expect to observe:
 	// 1. Node is marked NotReady after timeout by nodecontroller (40seconds)
 	// 2. All pods on node are marked NotReady shortly after #1
@@ -398,7 +398,7 @@ var _ = framework.KubeDescribe("Network Partition [Disruptive] [Slow]", func() {
 		restartNodes(f, nodeNames)
 
 		By("waiting for pods to be running again")
-		pst.waitForRunning(ps.Spec.Replicas, ps)
+		pst.waitForRunningAndReady(ps.Spec.Replicas, ps)
 	})
 
 	It("should not reschedule pets if there is a network partition [Slow] [Disruptive] [Feature:PetSet]", func() {
@@ -407,7 +407,7 @@ var _ = framework.KubeDescribe("Network Partition [Disruptive] [Slow]", func() {
 		Expect(err).NotTo(HaveOccurred())
 
 		pst := statefulSetTester{c: c}
-		pst.waitForRunning(ps.Spec.Replicas, ps)
+		pst.waitForRunningAndReady(ps.Spec.Replicas, ps)
 
 		pod := pst.getPodList(ps).Items[0]
 		node, err := c.Core().Nodes().Get(pod.Spec.NodeName)
@@ -428,7 +428,7 @@ var _ = framework.KubeDescribe("Network Partition [Disruptive] [Slow]", func() {
 		}
 
 		By("waiting for pods to be running again")
-		pst.waitForRunning(ps.Spec.Replicas, ps)
+		pst.waitForRunningAndReady(ps.Spec.Replicas, ps)
 	})
 })
diff --git a/test/e2e/petset.go b/test/e2e/petset.go
index 7e54cfb0eb1..5a0e07d3e94 100644
--- a/test/e2e/petset.go
+++ b/test/e2e/petset.go
@@ -36,8 +36,10 @@ import (
 	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
 	"k8s.io/kubernetes/pkg/controller/petset"
 	"k8s.io/kubernetes/pkg/labels"
+	klabels "k8s.io/kubernetes/pkg/labels"
 	"k8s.io/kubernetes/pkg/runtime"
 	"k8s.io/kubernetes/pkg/types"
+	"k8s.io/kubernetes/pkg/util/intstr"
 	"k8s.io/kubernetes/pkg/util/sets"
 	"k8s.io/kubernetes/pkg/util/wait"
 	utilyaml "k8s.io/kubernetes/pkg/util/yaml"
@@ -149,13 +151,13 @@ var _ = framework.KubeDescribe("StatefulSet [Slow] [Feature:PetSet]", func() {
 
 			pst := statefulSetTester{c: c}
 
-			pst.waitForRunning(1, ps)
+			pst.waitForRunningAndReady(1, ps)
 
 			By("Marking pet at index 0 as healthy.")
 			pst.setHealthy(ps)
 
 			By("Waiting for pet at index 1 to enter running.")
-			pst.waitForRunning(2, ps)
+			pst.waitForRunningAndReady(2, ps)
 
 			// Now we have 1 healthy and 1 unhealthy pet. Deleting the healthy pet should *not*
 			// create a new pet till the remaining pet becomes healthy, which won't happen till
@@ -183,7 +185,7 @@ var _ = framework.KubeDescribe("StatefulSet [Slow] [Feature:PetSet]", func() {
 
 			pst := statefulSetTester{c: c}
 
-			pst.waitForRunning(ps.Spec.Replicas, ps)
+			pst.waitForRunningAndReady(ps.Spec.Replicas, ps)
 
 			newImage := newNginxImage
 			oldImage := ps.Spec.Template.Spec.Containers[0].Image
@@ -199,7 +201,7 @@ var _ = framework.KubeDescribe("StatefulSet [Slow] [Feature:PetSet]", func() {
 			pst.deletePetAtIndex(updateIndex, ps)
 
 			By("Waiting for all stateful pods to be running again")
-			pst.waitForRunning(ps.Spec.Replicas, ps)
+			pst.waitForRunningAndReady(ps.Spec.Replicas, ps)
 
 			By(fmt.Sprintf("Verifying stateful pod at index %d is updated", updateIndex))
 			verify := func(pod *api.Pod) {
@@ -208,6 +210,83 @@ var _ = framework.KubeDescribe("StatefulSet [Slow] [Feature:PetSet]", func() {
 			}
 			pst.verifyPodAtIndex(updateIndex, ps, verify)
 		})
+
+		It("Scaling should happen in predictable order and halt if any pet is unhealthy", func() {
+			psLabels := klabels.Set(labels)
+			By("Initializing watcher for selector " + psLabels.String())
+			watcher, err := f.ClientSet.Core().Pods(ns).Watch(api.ListOptions{
+				LabelSelector: psLabels.AsSelector(),
+			})
+			Expect(err).NotTo(HaveOccurred())
+
+			By("Creating stateful set " + psName + " in namespace " + ns)
+			testProbe := &api.Probe{Handler: api.Handler{HTTPGet: &api.HTTPGetAction{
+				Path: "/index.html",
+				Port: intstr.IntOrString{IntVal: 80}}}}
+			ps := newStatefulSet(psName, ns, headlessSvcName, 1, nil, nil, psLabels)
+			ps.Spec.Template.Spec.Containers[0].ReadinessProbe = testProbe
+			ps, err = c.Apps().StatefulSets(ns).Create(ps)
+			Expect(err).NotTo(HaveOccurred())
+
+			By("Waiting until all stateful set " + psName + " replicas are running in namespace " + ns)
+			pst := &statefulSetTester{c: c}
+			pst.waitForRunningAndReady(ps.Spec.Replicas, ps)
+
+			By("Confirming that stateful set scale up will halt with unhealthy pet")
+			pst.breakProbe(ps, testProbe)
+			pst.waitForRunningAndNotReady(ps.Spec.Replicas, ps)
+			pst.update(ps.Namespace, ps.Name, func(ps *apps.StatefulSet) { ps.Spec.Replicas = 3 })
+			pst.confirmPetCount(1, ps, 10*time.Second)
+
+			By("Scaling up stateful set " + psName + " to 3 replicas and waiting until all of them are running in namespace " + ns)
+			pst.restoreProbe(ps, testProbe)
+			pst.waitForRunningAndReady(3, ps)
+
+			By("Verifying that stateful set " + psName + " was scaled up in order")
+			expectedOrder := []string{"pet-0", "pet-1", "pet-2"}
+			_, err = watch.Until(statefulsetTimeout, watcher, func(event watch.Event) (bool, error) {
+				if event.Type != watch.Added {
+					return false, nil
+				}
+				pod := event.Object.(*api.Pod)
+				if pod.Name == expectedOrder[0] {
+					expectedOrder = expectedOrder[1:]
+				}
+				return len(expectedOrder) == 0, nil
+			})
+			Expect(err).NotTo(HaveOccurred())
+
+			By("Confirming that stateful set scale down will halt with unhealthy pet")
+			watcher, err = f.ClientSet.Core().Pods(ns).Watch(api.ListOptions{
+				LabelSelector: psLabels.AsSelector(),
+			})
+			Expect(err).NotTo(HaveOccurred())
+
+			pst.breakProbe(ps, testProbe)
+			pst.waitForRunningAndNotReady(3, ps)
+			pst.update(ps.Namespace, ps.Name, func(ps *apps.StatefulSet) { ps.Spec.Replicas = 0 })
+			pst.confirmPetCount(3, ps, 10*time.Second)
+
+			By("Scaling down stateful set " + psName + " to 0 replicas and waiting until none of the pods are running in namespace " + ns)
+			pst.restoreProbe(ps, testProbe)
+			pst.scale(ps, 0)
+
+			By("Verifying that stateful set " + psName + " was scaled down in reverse order")
+			expectedOrder = []string{"pet-2", "pet-1", "pet-0"}
+			_, err = watch.Until(statefulsetTimeout, watcher, func(event watch.Event) (bool, error) {
+				if event.Type != watch.Deleted {
+					return false, nil
+				}
+				pod := event.Object.(*api.Pod)
+				if pod.Name == expectedOrder[0] {
+					expectedOrder = expectedOrder[1:]
+				}
+				return len(expectedOrder) == 0, nil
+			})
+			Expect(err).NotTo(HaveOccurred())
+		})
 	})
 
 	framework.KubeDescribe("Deploy clustered applications [Slow] [Feature:PetSet]", func() {
@@ -414,7 +493,7 @@ func (c *clusterAppTester) run() {
 
 		if restartCluster {
 			By("Restarting stateful set " + ps.Name)
 			c.tester.restart(ps)
-			c.tester.waitForRunning(ps.Spec.Replicas, ps)
+			c.tester.waitForRunningAndReady(ps.Spec.Replicas, ps)
 		}
 	}
@@ -605,7 +684,7 @@ func (p *statefulSetTester) createStatefulSet(manifestPath, ns string) *apps.Sta
 
 	framework.Logf(fmt.Sprintf("creating statefulset %v/%v with %d replicas and selector %+v", ps.Namespace, ps.Name, ps.Spec.Replicas, ps.Spec.Selector))
 	framework.RunKubectlOrDie("create", "-f", mkpath("petset.yaml"), fmt.Sprintf("--namespace=%v", ns))
-	p.waitForRunning(ps.Spec.Replicas, ps)
+	p.waitForRunningAndReady(ps.Spec.Replicas, ps)
 	return ps
 }
 
@@ -656,7 +735,7 @@ func (p *statefulSetTester) saturate(ps *apps.StatefulSet) {
 	var i int32
 	for i = 0; i < ps.Spec.Replicas; i++ {
 		framework.Logf("Waiting for pet at index " + fmt.Sprintf("%v", i+1) + " to enter Running")
-		p.waitForRunning(i+1, ps)
+		p.waitForRunningAndReady(i+1, ps)
 		framework.Logf("Marking pet at index " + fmt.Sprintf("%v", i) + " healthy")
 		p.setHealthy(ps)
 	}
@@ -757,7 +836,7 @@ func (p *statefulSetTester) confirmPetCount(count int, ps *apps.StatefulSet, tim
 	}
 }
 
-func (p *statefulSetTester) waitForRunning(numPets int32, ps *apps.StatefulSet) {
+func (p *statefulSetTester) waitForRunning(numPets int32, ps *apps.StatefulSet, shouldBeReady bool) {
 	pollErr := wait.PollImmediate(statefulsetPoll, statefulsetTimeout,
 		func() (bool, error) {
 			podList := p.getPodList(ps)
@@ -770,8 +849,9 @@ func (p *statefulSetTester) waitForRunning(numPets int32, ps *apps.StatefulSet)
 			}
 			for _, p := range podList.Items {
 				isReady := api.IsPodReady(&p)
-				if p.Status.Phase != api.PodRunning || !isReady {
-					framework.Logf("Waiting for pod %v to enter %v - Ready=True, currently %v - Ready=%v", p.Name, api.PodRunning, p.Status.Phase, isReady)
+				desiredReadiness := shouldBeReady == isReady
+				framework.Logf("Waiting for pod %v to enter %v - Ready=%v, currently %v - Ready=%v", p.Name, api.PodRunning, shouldBeReady, p.Status.Phase, isReady)
+				if p.Status.Phase != api.PodRunning || !desiredReadiness {
 					return false, nil
 				}
 			}
@@ -780,8 +860,32 @@ func (p *statefulSetTester) waitForRunning(numPets int32, ps *apps.StatefulSet)
 	if pollErr != nil {
 		framework.Failf("Failed waiting for pods to enter running: %v", pollErr)
 	}
+}
 
-	p.waitForStatus(ps, numPets)
+func (p *statefulSetTester) waitForRunningAndReady(numPets int32, ps *apps.StatefulSet) {
+	p.waitForRunning(numPets, ps, true)
+}
+
+func (p *statefulSetTester) waitForRunningAndNotReady(numPets int32, ps *apps.StatefulSet) {
+	p.waitForRunning(numPets, ps, false)
+}
+
+func (p *statefulSetTester) breakProbe(ps *apps.StatefulSet, probe *api.Probe) error {
+	path := probe.HTTPGet.Path
+	if path == "" {
+		return fmt.Errorf("Path expected to be not empty: %v", path)
+	}
+	cmd := fmt.Sprintf("mv -v /usr/share/nginx/html%v /tmp/", path)
+	return p.execInPets(ps, cmd)
+}
+
+func (p *statefulSetTester) restoreProbe(ps *apps.StatefulSet, probe *api.Probe) error {
+	path := probe.HTTPGet.Path
+	if path == "" {
+		return fmt.Errorf("Path expected to be not empty: %v", path)
+	}
+	cmd := fmt.Sprintf("mv -v /tmp%v /usr/share/nginx/html/", path)
+	return p.execInPets(ps, cmd)
 }
 
 func (p *statefulSetTester) setHealthy(ps *apps.StatefulSet) {
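
The breakProbe and restoreProbe helpers above delegate to p.execInPets, which is not part of this diff. A minimal sketch of what such a helper could look like, assuming the e2e framework's RunHostCmd wrapper around kubectl exec is available (the pre-existing helper in test/e2e/petset.go may differ):

func (p *statefulSetTester) execInPets(ps *apps.StatefulSet, cmd string) error {
	// Sketch only: run the given shell command inside every pet of the set
	// and fail fast on the first error.
	podList := p.getPodList(ps)
	for _, pet := range podList.Items {
		stdout, err := framework.RunHostCmd(pet.Namespace, pet.Name, cmd)
		framework.Logf("stdout of %v on %v: %v", cmd, pet.Name, stdout)
		if err != nil {
			return err
		}
	}
	return nil
}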