StatefulSet: Stop using initialized annotation in e2e tests.

Instead, use Readiness to pause and resume individual Pods to verify
that the StatefulSet controller performs the right actions one at a
time.
This commit is contained in:
Anthony Yeh 2017-07-19 15:29:21 -07:00
parent d2791d46e3
commit 48e8370674
No known key found for this signature in database
GPG Key ID: 339F46A383E6ED08
3 changed files with 119 additions and 107 deletions

View File

@ -28,7 +28,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
klabels "k8s.io/apimachinery/pkg/labels" klabels "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
@ -92,13 +91,12 @@ var _ = SIGDescribe("StatefulSet", func() {
It("should provide basic identity", func() { It("should provide basic identity", func() {
By("Creating statefulset " + ssName + " in namespace " + ns) By("Creating statefulset " + ssName + " in namespace " + ns)
*(ss.Spec.Replicas) = 3 *(ss.Spec.Replicas) = 3
framework.SetStatefulSetInitializedAnnotation(ss, "false") sst := framework.NewStatefulSetTester(c)
sst.PauseNewPods(ss)
_, err := c.AppsV1beta1().StatefulSets(ns).Create(ss) _, err := c.AppsV1beta1().StatefulSets(ns).Create(ss)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
sst := framework.NewStatefulSetTester(c)
By("Saturating stateful set " + ss.Name) By("Saturating stateful set " + ss.Name)
sst.Saturate(ss) sst.Saturate(ss)
@ -130,7 +128,8 @@ var _ = SIGDescribe("StatefulSet", func() {
It("should adopt matching orphans and release non-matching pods", func() { It("should adopt matching orphans and release non-matching pods", func() {
By("Creating statefulset " + ssName + " in namespace " + ns) By("Creating statefulset " + ssName + " in namespace " + ns)
*(ss.Spec.Replicas) = 1 *(ss.Spec.Replicas) = 1
framework.SetStatefulSetInitializedAnnotation(ss, "false") sst := framework.NewStatefulSetTester(c)
sst.PauseNewPods(ss)
// Replace ss with the one returned from Create() so it has the UID. // Replace ss with the one returned from Create() so it has the UID.
// Save Kind since it won't be populated in the returned ss. // Save Kind since it won't be populated in the returned ss.
@ -139,8 +138,6 @@ var _ = SIGDescribe("StatefulSet", func() {
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
ss.Kind = kind ss.Kind = kind
sst := framework.NewStatefulSetTester(c)
By("Saturating stateful set " + ss.Name) By("Saturating stateful set " + ss.Name)
sst.Saturate(ss) sst.Saturate(ss)
pods := sst.GetPodList(ss) pods := sst.GetPodList(ss)
@ -214,20 +211,19 @@ var _ = SIGDescribe("StatefulSet", func() {
It("should not deadlock when a pod's predecessor fails", func() { It("should not deadlock when a pod's predecessor fails", func() {
By("Creating statefulset " + ssName + " in namespace " + ns) By("Creating statefulset " + ssName + " in namespace " + ns)
*(ss.Spec.Replicas) = 2 *(ss.Spec.Replicas) = 2
framework.SetStatefulSetInitializedAnnotation(ss, "false") sst := framework.NewStatefulSetTester(c)
sst.PauseNewPods(ss)
_, err := c.AppsV1beta1().StatefulSets(ns).Create(ss) _, err := c.AppsV1beta1().StatefulSets(ns).Create(ss)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
sst := framework.NewStatefulSetTester(c) sst.WaitForRunning(1, 0, ss)
sst.WaitForRunningAndReady(1, ss) By("Resuming stateful pod at index 0.")
sst.ResumeNextPod(ss)
By("Marking stateful pod at index 0 as healthy.")
sst.SetHealthy(ss)
By("Waiting for stateful pod at index 1 to enter running.") By("Waiting for stateful pod at index 1 to enter running.")
sst.WaitForRunningAndReady(2, ss) sst.WaitForRunning(2, 1, ss)
// Now we have 1 healthy and 1 unhealthy stateful pod. Deleting the healthy stateful pod should *not* // Now we have 1 healthy and 1 unhealthy stateful pod. Deleting the healthy stateful pod should *not*
// create a new stateful pod till the remaining stateful pod becomes healthy, which won't happen till // create a new stateful pod till the remaining stateful pod becomes healthy, which won't happen till
@ -237,7 +233,7 @@ var _ = SIGDescribe("StatefulSet", func() {
sst.DeleteStatefulPodAtIndex(0, ss) sst.DeleteStatefulPodAtIndex(0, ss)
By("Confirming stateful pod at index 0 is recreated.") By("Confirming stateful pod at index 0 is recreated.")
sst.WaitForRunningAndReady(2, ss) sst.WaitForRunning(2, 0, ss)
By("Deleting unhealthy stateful pod at index 1.") By("Deleting unhealthy stateful pod at index 1.")
sst.DeleteStatefulPodAtIndex(1, ss) sst.DeleteStatefulPodAtIndex(1, ss)
@ -248,14 +244,11 @@ var _ = SIGDescribe("StatefulSet", func() {
It("should perform rolling updates and roll backs of template modifications", func() { It("should perform rolling updates and roll backs of template modifications", func() {
By("Creating a new StatefulSet") By("Creating a new StatefulSet")
testProbe := &v1.Probe{Handler: v1.Handler{HTTPGet: &v1.HTTPGetAction{
Path: "/index.html",
Port: intstr.IntOrString{IntVal: 80}}}}
ss := framework.NewStatefulSet("ss2", ns, headlessSvcName, 3, nil, nil, labels) ss := framework.NewStatefulSet("ss2", ns, headlessSvcName, 3, nil, nil, labels)
ss.Spec.Template.Spec.Containers[0].ReadinessProbe = testProbe sst := framework.NewStatefulSetTester(c)
sst.SetHttpProbe(ss)
ss, err := c.AppsV1beta1().StatefulSets(ns).Create(ss) ss, err := c.AppsV1beta1().StatefulSets(ns).Create(ss)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
sst := framework.NewStatefulSetTester(c)
sst.WaitForRunningAndReady(*ss.Spec.Replicas, ss) sst.WaitForRunningAndReady(*ss.Spec.Replicas, ss)
ss = sst.WaitForStatus(ss) ss = sst.WaitForStatus(ss)
currentRevision, updateRevision := ss.Status.CurrentRevision, ss.Status.UpdateRevision currentRevision, updateRevision := ss.Status.CurrentRevision, ss.Status.UpdateRevision
@ -272,7 +265,7 @@ var _ = SIGDescribe("StatefulSet", func() {
currentRevision)) currentRevision))
} }
sst.SortStatefulPods(pods) sst.SortStatefulPods(pods)
sst.BreakPodProbe(ss, &pods.Items[1], testProbe) sst.BreakPodHttpProbe(ss, &pods.Items[1])
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
ss, pods = sst.WaitForPodNotReady(ss, pods.Items[1].Name) ss, pods = sst.WaitForPodNotReady(ss, pods.Items[1].Name)
newImage := NewNginxImage newImage := NewNginxImage
@ -294,7 +287,7 @@ var _ = SIGDescribe("StatefulSet", func() {
By("Updating Pods in reverse ordinal order") By("Updating Pods in reverse ordinal order")
pods = sst.GetPodList(ss) pods = sst.GetPodList(ss)
sst.SortStatefulPods(pods) sst.SortStatefulPods(pods)
sst.RestorePodProbe(ss, &pods.Items[1], testProbe) sst.RestorePodHttpProbe(ss, &pods.Items[1])
ss, pods = sst.WaitForPodReady(ss, pods.Items[1].Name) ss, pods = sst.WaitForPodReady(ss, pods.Items[1].Name)
ss, pods = sst.WaitForRollingUpdate(ss) ss, pods = sst.WaitForRollingUpdate(ss)
Expect(ss.Status.CurrentRevision).To(Equal(updateRevision), Expect(ss.Status.CurrentRevision).To(Equal(updateRevision),
@ -319,7 +312,7 @@ var _ = SIGDescribe("StatefulSet", func() {
} }
By("Rolling back to a previous revision") By("Rolling back to a previous revision")
sst.BreakPodProbe(ss, &pods.Items[1], testProbe) sst.BreakPodHttpProbe(ss, &pods.Items[1])
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
ss, pods = sst.WaitForPodNotReady(ss, pods.Items[1].Name) ss, pods = sst.WaitForPodNotReady(ss, pods.Items[1].Name)
priorRevision := currentRevision priorRevision := currentRevision
@ -338,7 +331,7 @@ var _ = SIGDescribe("StatefulSet", func() {
By("Rolling back update in reverse ordinal order") By("Rolling back update in reverse ordinal order")
pods = sst.GetPodList(ss) pods = sst.GetPodList(ss)
sst.SortStatefulPods(pods) sst.SortStatefulPods(pods)
sst.RestorePodProbe(ss, &pods.Items[1], testProbe) sst.RestorePodHttpProbe(ss, &pods.Items[1])
ss, pods = sst.WaitForPodReady(ss, pods.Items[1].Name) ss, pods = sst.WaitForPodReady(ss, pods.Items[1].Name)
ss, pods = sst.WaitForRollingUpdate(ss) ss, pods = sst.WaitForRollingUpdate(ss)
Expect(ss.Status.CurrentRevision).To(Equal(priorRevision), Expect(ss.Status.CurrentRevision).To(Equal(priorRevision),
@ -366,11 +359,9 @@ var _ = SIGDescribe("StatefulSet", func() {
It("should perform canary updates and phased rolling updates of template modifications", func() { It("should perform canary updates and phased rolling updates of template modifications", func() {
By("Creating a new StaefulSet") By("Creating a new StaefulSet")
testProbe := &v1.Probe{Handler: v1.Handler{HTTPGet: &v1.HTTPGetAction{
Path: "/index.html",
Port: intstr.IntOrString{IntVal: 80}}}}
ss := framework.NewStatefulSet("ss2", ns, headlessSvcName, 3, nil, nil, labels) ss := framework.NewStatefulSet("ss2", ns, headlessSvcName, 3, nil, nil, labels)
ss.Spec.Template.Spec.Containers[0].ReadinessProbe = testProbe sst := framework.NewStatefulSetTester(c)
sst.SetHttpProbe(ss)
ss.Spec.UpdateStrategy = apps.StatefulSetUpdateStrategy{ ss.Spec.UpdateStrategy = apps.StatefulSetUpdateStrategy{
Type: apps.RollingUpdateStatefulSetStrategyType, Type: apps.RollingUpdateStatefulSetStrategyType,
RollingUpdate: func() *apps.RollingUpdateStatefulSetStrategy { RollingUpdate: func() *apps.RollingUpdateStatefulSetStrategy {
@ -383,7 +374,6 @@ var _ = SIGDescribe("StatefulSet", func() {
} }
ss, err := c.AppsV1beta1().StatefulSets(ns).Create(ss) ss, err := c.AppsV1beta1().StatefulSets(ns).Create(ss)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
sst := framework.NewStatefulSetTester(c)
sst.WaitForRunningAndReady(*ss.Spec.Replicas, ss) sst.WaitForRunningAndReady(*ss.Spec.Replicas, ss)
ss = sst.WaitForStatus(ss) ss = sst.WaitForStatus(ss)
currentRevision, updateRevision := ss.Status.CurrentRevision, ss.Status.UpdateRevision currentRevision, updateRevision := ss.Status.CurrentRevision, ss.Status.UpdateRevision
@ -578,17 +568,14 @@ var _ = SIGDescribe("StatefulSet", func() {
It("should implement legacy replacement when the update strategy is OnDelete", func() { It("should implement legacy replacement when the update strategy is OnDelete", func() {
By("Creating a new StatefulSet") By("Creating a new StatefulSet")
testProbe := &v1.Probe{Handler: v1.Handler{HTTPGet: &v1.HTTPGetAction{
Path: "/index.html",
Port: intstr.IntOrString{IntVal: 80}}}}
ss := framework.NewStatefulSet("ss2", ns, headlessSvcName, 3, nil, nil, labels) ss := framework.NewStatefulSet("ss2", ns, headlessSvcName, 3, nil, nil, labels)
ss.Spec.Template.Spec.Containers[0].ReadinessProbe = testProbe sst := framework.NewStatefulSetTester(c)
sst.SetHttpProbe(ss)
ss.Spec.UpdateStrategy = apps.StatefulSetUpdateStrategy{ ss.Spec.UpdateStrategy = apps.StatefulSetUpdateStrategy{
Type: apps.OnDeleteStatefulSetStrategyType, Type: apps.OnDeleteStatefulSetStrategyType,
} }
ss, err := c.AppsV1beta1().StatefulSets(ns).Create(ss) ss, err := c.AppsV1beta1().StatefulSets(ns).Create(ss)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
sst := framework.NewStatefulSetTester(c)
sst.WaitForRunningAndReady(*ss.Spec.Replicas, ss) sst.WaitForRunningAndReady(*ss.Spec.Replicas, ss)
ss = sst.WaitForStatus(ss) ss = sst.WaitForStatus(ss)
currentRevision, updateRevision := ss.Status.CurrentRevision, ss.Status.UpdateRevision currentRevision, updateRevision := ss.Status.CurrentRevision, ss.Status.UpdateRevision
@ -668,27 +655,24 @@ var _ = SIGDescribe("StatefulSet", func() {
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
By("Creating stateful set " + ssName + " in namespace " + ns) By("Creating stateful set " + ssName + " in namespace " + ns)
testProbe := &v1.Probe{Handler: v1.Handler{HTTPGet: &v1.HTTPGetAction{
Path: "/index.html",
Port: intstr.IntOrString{IntVal: 80}}}}
ss := framework.NewStatefulSet(ssName, ns, headlessSvcName, 1, nil, nil, psLabels) ss := framework.NewStatefulSet(ssName, ns, headlessSvcName, 1, nil, nil, psLabels)
ss.Spec.Template.Spec.Containers[0].ReadinessProbe = testProbe sst := framework.NewStatefulSetTester(c)
sst.SetHttpProbe(ss)
ss, err = c.AppsV1beta1().StatefulSets(ns).Create(ss) ss, err = c.AppsV1beta1().StatefulSets(ns).Create(ss)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
By("Waiting until all stateful set " + ssName + " replicas will be running in namespace " + ns) By("Waiting until all stateful set " + ssName + " replicas will be running in namespace " + ns)
sst := framework.NewStatefulSetTester(c)
sst.WaitForRunningAndReady(*ss.Spec.Replicas, ss) sst.WaitForRunningAndReady(*ss.Spec.Replicas, ss)
By("Confirming that stateful set scale up will halt with unhealthy stateful pod") By("Confirming that stateful set scale up will halt with unhealthy stateful pod")
sst.BreakProbe(ss, testProbe) sst.BreakHttpProbe(ss)
sst.WaitForRunningAndNotReady(*ss.Spec.Replicas, ss) sst.WaitForRunningAndNotReady(*ss.Spec.Replicas, ss)
sst.WaitForStatusReadyReplicas(ss, 0) sst.WaitForStatusReadyReplicas(ss, 0)
sst.UpdateReplicas(ss, 3) sst.UpdateReplicas(ss, 3)
sst.ConfirmStatefulPodCount(1, ss, 10*time.Second, true) sst.ConfirmStatefulPodCount(1, ss, 10*time.Second, true)
By("Scaling up stateful set " + ssName + " to 3 replicas and waiting until all of them will be running in namespace " + ns) By("Scaling up stateful set " + ssName + " to 3 replicas and waiting until all of them will be running in namespace " + ns)
sst.RestoreProbe(ss, testProbe) sst.RestoreHttpProbe(ss)
sst.WaitForRunningAndReady(3, ss) sst.WaitForRunningAndReady(3, ss)
By("Verifying that stateful set " + ssName + " was scaled up in order") By("Verifying that stateful set " + ssName + " was scaled up in order")
@ -712,14 +696,14 @@ var _ = SIGDescribe("StatefulSet", func() {
}) })
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
sst.BreakProbe(ss, testProbe) sst.BreakHttpProbe(ss)
sst.WaitForStatusReadyReplicas(ss, 0) sst.WaitForStatusReadyReplicas(ss, 0)
sst.WaitForRunningAndNotReady(3, ss) sst.WaitForRunningAndNotReady(3, ss)
sst.UpdateReplicas(ss, 0) sst.UpdateReplicas(ss, 0)
sst.ConfirmStatefulPodCount(3, ss, 10*time.Second, true) sst.ConfirmStatefulPodCount(3, ss, 10*time.Second, true)
By("Scaling down stateful set " + ssName + " to 0 replicas and waiting until none of pods will run in namespace" + ns) By("Scaling down stateful set " + ssName + " to 0 replicas and waiting until none of pods will run in namespace" + ns)
sst.RestoreProbe(ss, testProbe) sst.RestoreHttpProbe(ss)
sst.Scale(ss, 0) sst.Scale(ss, 0)
By("Verifying that stateful set " + ssName + " was scaled down in reverse order") By("Verifying that stateful set " + ssName + " was scaled down in reverse order")
@ -742,39 +726,36 @@ var _ = SIGDescribe("StatefulSet", func() {
psLabels := klabels.Set(labels) psLabels := klabels.Set(labels)
By("Creating stateful set " + ssName + " in namespace " + ns) By("Creating stateful set " + ssName + " in namespace " + ns)
testProbe := &v1.Probe{Handler: v1.Handler{HTTPGet: &v1.HTTPGetAction{
Path: "/index.html",
Port: intstr.IntOrString{IntVal: 80}}}}
ss := framework.NewStatefulSet(ssName, ns, headlessSvcName, 1, nil, nil, psLabels) ss := framework.NewStatefulSet(ssName, ns, headlessSvcName, 1, nil, nil, psLabels)
ss.Spec.PodManagementPolicy = apps.ParallelPodManagement ss.Spec.PodManagementPolicy = apps.ParallelPodManagement
ss.Spec.Template.Spec.Containers[0].ReadinessProbe = testProbe sst := framework.NewStatefulSetTester(c)
sst.SetHttpProbe(ss)
ss, err := c.AppsV1beta1().StatefulSets(ns).Create(ss) ss, err := c.AppsV1beta1().StatefulSets(ns).Create(ss)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
By("Waiting until all stateful set " + ssName + " replicas will be running in namespace " + ns) By("Waiting until all stateful set " + ssName + " replicas will be running in namespace " + ns)
sst := framework.NewStatefulSetTester(c)
sst.WaitForRunningAndReady(*ss.Spec.Replicas, ss) sst.WaitForRunningAndReady(*ss.Spec.Replicas, ss)
By("Confirming that stateful set scale up will not halt with unhealthy stateful pod") By("Confirming that stateful set scale up will not halt with unhealthy stateful pod")
sst.BreakProbe(ss, testProbe) sst.BreakHttpProbe(ss)
sst.WaitForRunningAndNotReady(*ss.Spec.Replicas, ss) sst.WaitForRunningAndNotReady(*ss.Spec.Replicas, ss)
sst.WaitForStatusReadyReplicas(ss, 0) sst.WaitForStatusReadyReplicas(ss, 0)
sst.UpdateReplicas(ss, 3) sst.UpdateReplicas(ss, 3)
sst.ConfirmStatefulPodCount(3, ss, 10*time.Second, false) sst.ConfirmStatefulPodCount(3, ss, 10*time.Second, false)
By("Scaling up stateful set " + ssName + " to 3 replicas and waiting until all of them will be running in namespace " + ns) By("Scaling up stateful set " + ssName + " to 3 replicas and waiting until all of them will be running in namespace " + ns)
sst.RestoreProbe(ss, testProbe) sst.RestoreHttpProbe(ss)
sst.WaitForRunningAndReady(3, ss) sst.WaitForRunningAndReady(3, ss)
By("Scale down will not halt with unhealthy stateful pod") By("Scale down will not halt with unhealthy stateful pod")
sst.BreakProbe(ss, testProbe) sst.BreakHttpProbe(ss)
sst.WaitForStatusReadyReplicas(ss, 0) sst.WaitForStatusReadyReplicas(ss, 0)
sst.WaitForRunningAndNotReady(3, ss) sst.WaitForRunningAndNotReady(3, ss)
sst.UpdateReplicas(ss, 0) sst.UpdateReplicas(ss, 0)
sst.ConfirmStatefulPodCount(0, ss, 10*time.Second, false) sst.ConfirmStatefulPodCount(0, ss, 10*time.Second, false)
By("Scaling down stateful set " + ssName + " to 0 replicas and waiting until none of pods will run in namespace" + ns) By("Scaling down stateful set " + ssName + " to 0 replicas and waiting until none of pods will run in namespace" + ns)
sst.RestoreProbe(ss, testProbe) sst.RestoreHttpProbe(ss)
sst.Scale(ss, 0) sst.Scale(ss, 0)
sst.WaitForStatusReadyReplicas(ss, 0) sst.WaitForStatusReadyReplicas(ss, 0)
}) })

View File

@ -19,6 +19,7 @@ package framework
import ( import (
"fmt" "fmt"
"path/filepath" "path/filepath"
"reflect"
"regexp" "regexp"
"sort" "sort"
"strconv" "strconv"
@ -33,6 +34,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
@ -160,10 +162,10 @@ func (s *StatefulSetTester) CheckHostname(ss *apps.StatefulSet) error {
func (s *StatefulSetTester) Saturate(ss *apps.StatefulSet) { func (s *StatefulSetTester) Saturate(ss *apps.StatefulSet) {
var i int32 var i int32
for i = 0; i < *(ss.Spec.Replicas); i++ { for i = 0; i < *(ss.Spec.Replicas); i++ {
Logf("Waiting for stateful pod at index " + fmt.Sprintf("%v", i+1) + " to enter Running") Logf("Waiting for stateful pod at index %v to enter Running", i)
s.WaitForRunningAndReady(i+1, ss) s.WaitForRunning(i+1, i, ss)
Logf("Marking stateful pod at index " + fmt.Sprintf("%v", i) + " healthy") Logf("Resuming stateful pod at index %v", i)
s.SetHealthy(ss) s.ResumeNextPod(ss)
} }
} }
@ -282,18 +284,22 @@ func (s *StatefulSetTester) ConfirmStatefulPodCount(count int, ss *apps.Stateful
} }
} }
func (s *StatefulSetTester) waitForRunning(numStatefulPods int32, ss *apps.StatefulSet, shouldBeReady bool) { // WaitForRunning waits for numPodsRunning in ss to be Running and for the first
// numPodsReady ordinals to be Ready.
func (s *StatefulSetTester) WaitForRunning(numPodsRunning, numPodsReady int32, ss *apps.StatefulSet) {
pollErr := wait.PollImmediate(StatefulSetPoll, StatefulSetTimeout, pollErr := wait.PollImmediate(StatefulSetPoll, StatefulSetTimeout,
func() (bool, error) { func() (bool, error) {
podList := s.GetPodList(ss) podList := s.GetPodList(ss)
if int32(len(podList.Items)) < numStatefulPods { s.SortStatefulPods(podList)
Logf("Found %d stateful pods, waiting for %d", len(podList.Items), numStatefulPods) if int32(len(podList.Items)) < numPodsRunning {
Logf("Found %d stateful pods, waiting for %d", len(podList.Items), numPodsRunning)
return false, nil return false, nil
} }
if int32(len(podList.Items)) > numStatefulPods { if int32(len(podList.Items)) > numPodsRunning {
return false, fmt.Errorf("Too many pods scheduled, expected %d got %d", numStatefulPods, len(podList.Items)) return false, fmt.Errorf("Too many pods scheduled, expected %d got %d", numPodsRunning, len(podList.Items))
} }
for _, p := range podList.Items { for _, p := range podList.Items {
shouldBeReady := getStatefulPodOrdinal(&p) < int(numPodsReady)
isReady := podutil.IsPodReady(&p) isReady := podutil.IsPodReady(&p)
desiredReadiness := shouldBeReady == isReady desiredReadiness := shouldBeReady == isReady
Logf("Waiting for pod %v to enter %v - Ready=%v, currently %v - Ready=%v", p.Name, v1.PodRunning, shouldBeReady, p.Status.Phase, isReady) Logf("Waiting for pod %v to enter %v - Ready=%v, currently %v - Ready=%v", p.Name, v1.PodRunning, shouldBeReady, p.Status.Phase, isReady)
@ -339,7 +345,7 @@ func (s *StatefulSetTester) WaitForStatus(set *apps.StatefulSet) *apps.StatefulS
// WaitForRunningAndReady waits for numStatefulPods in ss to be Running and Ready. // WaitForRunningAndReady waits for numStatefulPods in ss to be Running and Ready.
func (s *StatefulSetTester) WaitForRunningAndReady(numStatefulPods int32, ss *apps.StatefulSet) { func (s *StatefulSetTester) WaitForRunningAndReady(numStatefulPods int32, ss *apps.StatefulSet) {
s.waitForRunning(numStatefulPods, ss, true) s.WaitForRunning(numStatefulPods, numStatefulPods, ss)
} }
// WaitForPodReady waits for the Pod named podName in set to exist and have a Ready condition. // WaitForPodReady waits for the Pod named podName in set to exist and have a Ready condition.
@ -473,12 +479,31 @@ func (s *StatefulSetTester) WaitForPartitionedRollingUpdate(set *apps.StatefulSe
// WaitForRunningAndReady waits for numStatefulPods in ss to be Running and not Ready. // WaitForRunningAndReady waits for numStatefulPods in ss to be Running and not Ready.
func (s *StatefulSetTester) WaitForRunningAndNotReady(numStatefulPods int32, ss *apps.StatefulSet) { func (s *StatefulSetTester) WaitForRunningAndNotReady(numStatefulPods int32, ss *apps.StatefulSet) {
s.waitForRunning(numStatefulPods, ss, false) s.WaitForRunning(numStatefulPods, 0, ss)
} }
// BreakProbe breaks the readiness probe for Nginx StatefulSet containers in ss. var httpProbe = &v1.Probe{
func (s *StatefulSetTester) BreakProbe(ss *apps.StatefulSet, probe *v1.Probe) error { Handler: v1.Handler{
path := probe.HTTPGet.Path HTTPGet: &v1.HTTPGetAction{
Path: "/index.html",
Port: intstr.IntOrString{IntVal: 80},
},
},
PeriodSeconds: 1,
SuccessThreshold: 1,
FailureThreshold: 1,
}
// SetHttpProbe sets the pod template's ReadinessProbe for Nginx StatefulSet containers.
// This probe can then be controlled with BreakHttpProbe() and RestoreHttpProbe().
// Note that this cannot be used together with PauseNewPods().
func (s *StatefulSetTester) SetHttpProbe(ss *apps.StatefulSet) {
ss.Spec.Template.Spec.Containers[0].ReadinessProbe = httpProbe
}
// BreakHttpProbe breaks the readiness probe for Nginx StatefulSet containers in ss.
func (s *StatefulSetTester) BreakHttpProbe(ss *apps.StatefulSet) error {
path := httpProbe.HTTPGet.Path
if path == "" { if path == "" {
return fmt.Errorf("Path expected to be not empty: %v", path) return fmt.Errorf("Path expected to be not empty: %v", path)
} }
@ -486,9 +511,9 @@ func (s *StatefulSetTester) BreakProbe(ss *apps.StatefulSet, probe *v1.Probe) er
return s.ExecInStatefulPods(ss, cmd) return s.ExecInStatefulPods(ss, cmd)
} }
// BreakProbe breaks the readiness probe for Nginx StatefulSet containers in pod. // BreakPodHttpProbe breaks the readiness probe for Nginx StatefulSet containers in one pod.
func (s *StatefulSetTester) BreakPodProbe(ss *apps.StatefulSet, pod *v1.Pod, probe *v1.Probe) error { func (s *StatefulSetTester) BreakPodHttpProbe(ss *apps.StatefulSet, pod *v1.Pod) error {
path := probe.HTTPGet.Path path := httpProbe.HTTPGet.Path
if path == "" { if path == "" {
return fmt.Errorf("Path expected to be not empty: %v", path) return fmt.Errorf("Path expected to be not empty: %v", path)
} }
@ -498,9 +523,9 @@ func (s *StatefulSetTester) BreakPodProbe(ss *apps.StatefulSet, pod *v1.Pod, pro
return err return err
} }
// RestoreProbe restores the readiness probe for Nginx StatefulSet containers in ss. // RestoreHttpProbe restores the readiness probe for Nginx StatefulSet containers in ss.
func (s *StatefulSetTester) RestoreProbe(ss *apps.StatefulSet, probe *v1.Probe) error { func (s *StatefulSetTester) RestoreHttpProbe(ss *apps.StatefulSet) error {
path := probe.HTTPGet.Path path := httpProbe.HTTPGet.Path
if path == "" { if path == "" {
return fmt.Errorf("Path expected to be not empty: %v", path) return fmt.Errorf("Path expected to be not empty: %v", path)
} }
@ -508,9 +533,9 @@ func (s *StatefulSetTester) RestoreProbe(ss *apps.StatefulSet, probe *v1.Probe)
return s.ExecInStatefulPods(ss, cmd) return s.ExecInStatefulPods(ss, cmd)
} }
// RestoreProbe restores the readiness probe for Nginx StatefulSet containers in pod. // RestorePodHttpProbe restores the readiness probe for Nginx StatefulSet containers in pod.
func (s *StatefulSetTester) RestorePodProbe(ss *apps.StatefulSet, pod *v1.Pod, probe *v1.Probe) error { func (s *StatefulSetTester) RestorePodHttpProbe(ss *apps.StatefulSet, pod *v1.Pod) error {
path := probe.HTTPGet.Path path := httpProbe.HTTPGet.Path
if path == "" { if path == "" {
return fmt.Errorf("Path expected to be not empty: %v", path) return fmt.Errorf("Path expected to be not empty: %v", path)
} }
@ -520,26 +545,50 @@ func (s *StatefulSetTester) RestorePodProbe(ss *apps.StatefulSet, pod *v1.Pod, p
return err return err
} }
// SetHealthy updates the StatefulSet InitAnnotation to true in order to set a StatefulSet Pod to be Running and Ready. var pauseProbe = &v1.Probe{
func (s *StatefulSetTester) SetHealthy(ss *apps.StatefulSet) { Handler: v1.Handler{
Exec: &v1.ExecAction{Command: []string{"test", "-f", "/tmp/statefulset-continue"}},
},
PeriodSeconds: 1,
SuccessThreshold: 1,
FailureThreshold: 1,
}
func hasPauseProbe(pod *v1.Pod) bool {
probe := pod.Spec.Containers[0].ReadinessProbe
return probe != nil && reflect.DeepEqual(probe.Exec.Command, pauseProbe.Exec.Command)
}
// PauseNewPods adds an always-failing ReadinessProbe to the StatefulSet PodTemplate.
// This causes all newly-created Pods to stay Unready until they are manually resumed
// with ResumeNextPod().
// Note that this cannot be used together with SetHttpProbe().
func (s *StatefulSetTester) PauseNewPods(ss *apps.StatefulSet) {
ss.Spec.Template.Spec.Containers[0].ReadinessProbe = pauseProbe
}
// ResumeNextPod allows the next Pod in the StatefulSet to continue by removing the ReadinessProbe
// added by PauseNewPods(), if it's still there.
// It fails the test if it finds any pods that are not in phase Running,
// or if it finds more than one paused Pod existing at the same time.
// This is a no-op if there are no paused pods.
func (s *StatefulSetTester) ResumeNextPod(ss *apps.StatefulSet) {
podList := s.GetPodList(ss) podList := s.GetPodList(ss)
markedHealthyPod := "" resumedPod := ""
for _, pod := range podList.Items { for _, pod := range podList.Items {
if pod.Status.Phase != v1.PodRunning { if pod.Status.Phase != v1.PodRunning {
Failf("Found pod in %v cannot set health", pod.Status.Phase) Failf("Found pod in phase %q, cannot resume", pod.Status.Phase)
} }
if IsStatefulSetPodInitialized(pod) { if podutil.IsPodReady(&pod) || !hasPauseProbe(&pod) {
continue continue
} }
if markedHealthyPod != "" { if resumedPod != "" {
Failf("Found multiple non-healthy stateful pods: %v and %v", pod.Name, markedHealthyPod) Failf("Found multiple paused stateful pods: %v and %v", pod.Name, resumedPod)
} }
p, err := UpdatePodWithRetries(s.c, pod.Namespace, pod.Name, func(update *v1.Pod) { _, err := RunHostCmd(pod.Namespace, pod.Name, "touch /tmp/statefulset-continue")
update.Annotations[apps.StatefulSetInitAnnotation] = "true"
})
ExpectNoError(err) ExpectNoError(err)
Logf("Set annotation %v to %v on pod %v", apps.StatefulSetInitAnnotation, p.Annotations[apps.StatefulSetInitAnnotation], pod.Name) Logf("Resumed pod %v", pod.Name)
markedHealthyPod = pod.Name resumedPod = pod.Name
} }
} }
@ -682,19 +731,6 @@ func DeleteAllStatefulSets(c clientset.Interface, ns string) {
} }
} }
// IsStatefulSetPodInitialized returns true if pod's StatefulSetInitAnnotation exists and is set to true.
func IsStatefulSetPodInitialized(pod v1.Pod) bool {
initialized, ok := pod.Annotations[apps.StatefulSetInitAnnotation]
if !ok {
return false
}
inited, err := strconv.ParseBool(initialized)
if err != nil {
Failf("Couldn't parse statefulset init annotations %v", initialized)
}
return inited
}
// NewStatefulSetPVC returns a PersistentVolumeClaim named name, for testing StatefulSets. // NewStatefulSetPVC returns a PersistentVolumeClaim named name, for testing StatefulSets.
func NewStatefulSetPVC(name string) v1.PersistentVolumeClaim { func NewStatefulSetPVC(name string) v1.PersistentVolumeClaim {
return v1.PersistentVolumeClaim{ return v1.PersistentVolumeClaim{
@ -776,11 +812,6 @@ func NewStatefulSet(name, ns, governingSvcName string, replicas int32, statefulP
} }
} }
// SetStatefulSetInitializedAnnotation sets teh StatefulSetInitAnnotation to value.
func SetStatefulSetInitializedAnnotation(ss *apps.StatefulSet, value string) {
ss.Spec.Template.ObjectMeta.Annotations["pod.alpha.kubernetes.io/initialized"] = value
}
var statefulPodRegex = regexp.MustCompile("(.*)-([0-9]+)$") var statefulPodRegex = regexp.MustCompile("(.*)-([0-9]+)$")
func getStatefulPodOrdinal(pod *v1.Pod) int { func getStatefulPodOrdinal(pod *v1.Pod) int {

View File

@ -62,12 +62,12 @@ func (t *StatefulSetUpgradeTest) Setup(f *framework.Framework) {
t.set = framework.NewStatefulSet(ssName, ns, headlessSvcName, 2, statefulPodMounts, podMounts, labels) t.set = framework.NewStatefulSet(ssName, ns, headlessSvcName, 2, statefulPodMounts, podMounts, labels)
t.service = framework.CreateStatefulSetService(ssName, labels) t.service = framework.CreateStatefulSetService(ssName, labels)
*(t.set.Spec.Replicas) = 3 *(t.set.Spec.Replicas) = 3
framework.SetStatefulSetInitializedAnnotation(t.set, "false") t.tester = framework.NewStatefulSetTester(f.ClientSet)
t.tester.PauseNewPods(t.set)
By("Creating service " + headlessSvcName + " in namespace " + ns) By("Creating service " + headlessSvcName + " in namespace " + ns)
_, err := f.ClientSet.Core().Services(ns).Create(t.service) _, err := f.ClientSet.Core().Services(ns).Create(t.service)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
t.tester = framework.NewStatefulSetTester(f.ClientSet)
By("Creating statefulset " + ssName + " in namespace " + ns) By("Creating statefulset " + ssName + " in namespace " + ns)
*(t.set.Spec.Replicas) = 3 *(t.set.Spec.Replicas) = 3