Validate pet set scale up/down order

This change implements test cases to validate that:

    scale up will be done in order
    scale down in reverse order
    if any of the pets will be unhealthy, scale up/down will halt

Change-Id: I4487a7c9823d94a2995bbb06accdfd8f3001354c
This commit is contained in:
Dmitry Shulyak 2016-11-18 13:18:50 +02:00
parent 3b43ce8e5c
commit 36d1af6145
2 changed files with 119 additions and 15 deletions

View File

@ -150,7 +150,7 @@ var _ = framework.KubeDescribe("Network Partition [Disruptive] [Slow]", func() {
})
// What happens in this test:
// Network traffic from a node to master is cut off to simulate network partition
// Network traffic from a node to master is cut off to simulate network partition
// Expect to observe:
// 1. Node is marked NotReady after timeout by nodecontroller (40seconds)
// 2. All pods on node are marked NotReady shortly after #1
@ -398,7 +398,7 @@ var _ = framework.KubeDescribe("Network Partition [Disruptive] [Slow]", func() {
restartNodes(f, nodeNames)
By("waiting for pods to be running again")
pst.waitForRunning(ps.Spec.Replicas, ps)
pst.waitForRunningAndReady(ps.Spec.Replicas, ps)
})
It("should not reschedule pets if there is a network partition [Slow] [Disruptive] [Feature:PetSet]", func() {
@ -407,7 +407,7 @@ var _ = framework.KubeDescribe("Network Partition [Disruptive] [Slow]", func() {
Expect(err).NotTo(HaveOccurred())
pst := statefulSetTester{c: c}
pst.waitForRunning(ps.Spec.Replicas, ps)
pst.waitForRunningAndReady(ps.Spec.Replicas, ps)
pod := pst.getPodList(ps).Items[0]
node, err := c.Core().Nodes().Get(pod.Spec.NodeName)
@ -428,7 +428,7 @@ var _ = framework.KubeDescribe("Network Partition [Disruptive] [Slow]", func() {
}
By("waiting for pods to be running again")
pst.waitForRunning(ps.Spec.Replicas, ps)
pst.waitForRunningAndReady(ps.Spec.Replicas, ps)
})
})

View File

@ -36,8 +36,10 @@ import (
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/controller/petset"
"k8s.io/kubernetes/pkg/labels"
klabels "k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/intstr"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"
utilyaml "k8s.io/kubernetes/pkg/util/yaml"
@ -149,13 +151,13 @@ var _ = framework.KubeDescribe("StatefulSet [Slow] [Feature:PetSet]", func() {
pst := statefulSetTester{c: c}
pst.waitForRunning(1, ps)
pst.waitForRunningAndReady(1, ps)
By("Marking pet at index 0 as healthy.")
pst.setHealthy(ps)
By("Waiting for pet at index 1 to enter running.")
pst.waitForRunning(2, ps)
pst.waitForRunningAndReady(2, ps)
// Now we have 1 healthy and 1 unhealthy pet. Deleting the healthy pet should *not*
// create a new pet till the remaining pet becomes healthy, which won't happen till
@ -183,7 +185,7 @@ var _ = framework.KubeDescribe("StatefulSet [Slow] [Feature:PetSet]", func() {
pst := statefulSetTester{c: c}
pst.waitForRunning(ps.Spec.Replicas, ps)
pst.waitForRunningAndReady(ps.Spec.Replicas, ps)
newImage := newNginxImage
oldImage := ps.Spec.Template.Spec.Containers[0].Image
@ -199,7 +201,7 @@ var _ = framework.KubeDescribe("StatefulSet [Slow] [Feature:PetSet]", func() {
pst.deletePetAtIndex(updateIndex, ps)
By("Waiting for all stateful pods to be running again")
pst.waitForRunning(ps.Spec.Replicas, ps)
pst.waitForRunningAndReady(ps.Spec.Replicas, ps)
By(fmt.Sprintf("Verifying stateful pod at index %d is updated", updateIndex))
verify := func(pod *api.Pod) {
@ -208,6 +210,83 @@ var _ = framework.KubeDescribe("StatefulSet [Slow] [Feature:PetSet]", func() {
}
pst.verifyPodAtIndex(updateIndex, ps, verify)
})
It("Scaling should happen in predictable order and halt if any pet is unhealthy", func() {
psLabels := klabels.Set(labels)
By("Initializing watcher for selector " + psLabels.String())
watcher, err := f.ClientSet.Core().Pods(ns).Watch(api.ListOptions{
LabelSelector: psLabels.AsSelector(),
})
Expect(err).NotTo(HaveOccurred())
By("Creating stateful set " + psName + " in namespace " + ns)
testProbe := &api.Probe{Handler: api.Handler{HTTPGet: &api.HTTPGetAction{
Path: "/index.html",
Port: intstr.IntOrString{IntVal: 80}}}}
ps := newStatefulSet(psName, ns, headlessSvcName, 1, nil, nil, psLabels)
ps.Spec.Template.Spec.Containers[0].ReadinessProbe = testProbe
ps, err = c.Apps().StatefulSets(ns).Create(ps)
Expect(err).NotTo(HaveOccurred())
By("Waiting until all stateful set " + psName + " replicas will be running in namespace " + ns)
pst := &statefulSetTester{c: c}
pst.waitForRunningAndReady(ps.Spec.Replicas, ps)
By("Confirming that stateful set scale up will halt with unhealthy pet")
pst.breakProbe(ps, testProbe)
pst.waitForRunningAndNotReady(ps.Spec.Replicas, ps)
pst.update(ps.Namespace, ps.Name, func(ps *apps.StatefulSet) { ps.Spec.Replicas = 3 })
pst.confirmPetCount(1, ps, 10*time.Second)
By("Scaling up stateful set " + psName + " to 3 replicas and waiting until all of them will be running in namespace " + ns)
pst.restoreProbe(ps, testProbe)
pst.waitForRunningAndReady(3, ps)
By("Verifying that stateful set " + psName + " was scaled up in order")
expectedOrder := []string{"pet-0", "pet-1", "pet-2"}
_, err = watch.Until(statefulsetTimeout, watcher, func(event watch.Event) (bool, error) {
if event.Type != watch.Added {
return false, nil
}
pod := event.Object.(*api.Pod)
if pod.Name == expectedOrder[0] {
expectedOrder = expectedOrder[1:]
}
return len(expectedOrder) == 0, nil
})
Expect(err).NotTo(HaveOccurred())
By("Scale down will halt with unhealthy pet")
watcher, err = f.ClientSet.Core().Pods(ns).Watch(api.ListOptions{
LabelSelector: psLabels.AsSelector(),
})
Expect(err).NotTo(HaveOccurred())
pst.breakProbe(ps, testProbe)
pst.waitForRunningAndNotReady(3, ps)
pst.update(ps.Namespace, ps.Name, func(ps *apps.StatefulSet) { ps.Spec.Replicas = 0 })
pst.confirmPetCount(3, ps, 10*time.Second)
By("Scaling down stateful set " + psName + " to 0 replicas and waiting until none of pods will run in namespace" + ns)
pst.restoreProbe(ps, testProbe)
pst.scale(ps, 0)
By("Verifying that stateful set " + psName + " was scaled down in reverse order")
expectedOrder = []string{"pet-2", "pet-1", "pet-0"}
_, err = watch.Until(statefulsetTimeout, watcher, func(event watch.Event) (bool, error) {
if event.Type != watch.Deleted {
return false, nil
}
pod := event.Object.(*api.Pod)
if pod.Name == expectedOrder[0] {
expectedOrder = expectedOrder[1:]
}
return len(expectedOrder) == 0, nil
})
Expect(err).NotTo(HaveOccurred())
})
})
framework.KubeDescribe("Deploy clustered applications [Slow] [Feature:PetSet]", func() {
@ -414,7 +493,7 @@ func (c *clusterAppTester) run() {
if restartCluster {
By("Restarting stateful set " + ps.Name)
c.tester.restart(ps)
c.tester.waitForRunning(ps.Spec.Replicas, ps)
c.tester.waitForRunningAndReady(ps.Spec.Replicas, ps)
}
}
@ -605,7 +684,7 @@ func (p *statefulSetTester) createStatefulSet(manifestPath, ns string) *apps.Sta
framework.Logf(fmt.Sprintf("creating statefulset %v/%v with %d replicas and selector %+v", ps.Namespace, ps.Name, ps.Spec.Replicas, ps.Spec.Selector))
framework.RunKubectlOrDie("create", "-f", mkpath("petset.yaml"), fmt.Sprintf("--namespace=%v", ns))
p.waitForRunning(ps.Spec.Replicas, ps)
p.waitForRunningAndReady(ps.Spec.Replicas, ps)
return ps
}
@ -656,7 +735,7 @@ func (p *statefulSetTester) saturate(ps *apps.StatefulSet) {
var i int32
for i = 0; i < ps.Spec.Replicas; i++ {
framework.Logf("Waiting for pet at index " + fmt.Sprintf("%v", i+1) + " to enter Running")
p.waitForRunning(i+1, ps)
p.waitForRunningAndReady(i+1, ps)
framework.Logf("Marking pet at index " + fmt.Sprintf("%v", i) + " healthy")
p.setHealthy(ps)
}
@ -757,7 +836,7 @@ func (p *statefulSetTester) confirmPetCount(count int, ps *apps.StatefulSet, tim
}
}
func (p *statefulSetTester) waitForRunning(numPets int32, ps *apps.StatefulSet) {
func (p *statefulSetTester) waitForRunning(numPets int32, ps *apps.StatefulSet, shouldBeReady bool) {
pollErr := wait.PollImmediate(statefulsetPoll, statefulsetTimeout,
func() (bool, error) {
podList := p.getPodList(ps)
@ -770,8 +849,9 @@ func (p *statefulSetTester) waitForRunning(numPets int32, ps *apps.StatefulSet)
}
for _, p := range podList.Items {
isReady := api.IsPodReady(&p)
if p.Status.Phase != api.PodRunning || !isReady {
framework.Logf("Waiting for pod %v to enter %v - Ready=True, currently %v - Ready=%v", p.Name, api.PodRunning, p.Status.Phase, isReady)
desiredReadiness := shouldBeReady == isReady
framework.Logf("Waiting for pod %v to enter %v - Ready=%v, currently %v - Ready=%v", p.Name, api.PodRunning, shouldBeReady, p.Status.Phase, isReady)
if p.Status.Phase != api.PodRunning || !desiredReadiness {
return false, nil
}
}
@ -780,8 +860,32 @@ func (p *statefulSetTester) waitForRunning(numPets int32, ps *apps.StatefulSet)
if pollErr != nil {
framework.Failf("Failed waiting for pods to enter running: %v", pollErr)
}
}
p.waitForStatus(ps, numPets)
func (p *statefulSetTester) waitForRunningAndReady(numPets int32, ps *apps.StatefulSet) {
p.waitForRunning(numPets, ps, true)
}
func (p *statefulSetTester) waitForRunningAndNotReady(numPets int32, ps *apps.StatefulSet) {
p.waitForRunning(numPets, ps, false)
}
func (p *statefulSetTester) breakProbe(ps *apps.StatefulSet, probe *api.Probe) error {
path := probe.HTTPGet.Path
if path == "" {
return fmt.Errorf("Path expected to be not empty: %v", path)
}
cmd := fmt.Sprintf("mv -v /usr/share/nginx/html%v /tmp/", path)
return p.execInPets(ps, cmd)
}
func (p *statefulSetTester) restoreProbe(ps *apps.StatefulSet, probe *api.Probe) error {
path := probe.HTTPGet.Path
if path == "" {
return fmt.Errorf("Path expected to be not empty: %v", path)
}
cmd := fmt.Sprintf("mv -v /tmp%v /usr/share/nginx/html/", path)
return p.execInPets(ps, cmd)
}
func (p *statefulSetTester) setHealthy(ps *apps.StatefulSet) {